Source code for pylangacq.chat

"""Interfacing with CHAT data files."""

import collections
import concurrent.futures as cf
import dataclasses
import datetime
import functools
import itertools
import json
import os
import re
import shutil
import tempfile
import uuid
import warnings
import zipfile
from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple, Union

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from dateutil.parser import parse as parse_date
from dateutil.parser import ParserError
from tabulate import tabulate

import pylangacq
from .measures import _get_ipsyn, _get_mlum, _get_mluw, _get_ttr
from .objects import (
    Gra,
    Token,
    Utterance,
    _sort_keys,
    _CLITICS,
    _PRECLITIC,
    _POSTCLITIC,
)
from ._clean_utterance import _clean_utterance


# Default text encoding used when reading CHAT data files.
_ENCODING = "utf-8"

# Default file extension of CHAT data files.
_CHAT_EXTENSION = ".cha"

# Matches two runs of digits joined by an underscore and enclosed in a pair of
# \x15 control characters (a hyphen is tolerated just inside either delimiter).
# The two digit runs are captured as groups 1 and 2.
_TIMER_MARKS_REGEX = re.compile(r"\x15-?(\d+)_(\d+)-?\x15")

# Directory under the user's home directory, plus a JSON file inside it.
# NOTE(review): presumably where downloaded remote datasets and their
# bookkeeping live -- confirm against the caching helpers.
_CACHED_DATA_DIR = os.path.join(os.path.expanduser("~"), ".pylangacq")
_CACHED_DATA_JSON_PATH = os.path.join(_CACHED_DATA_DIR, "cached_data.json")

# NOTE(review): presumably the characters that may begin a CHAT line -- confirm
# against the parser that uses this set.
_CHAT_LINE_INDICATORS = frozenset({"@", "*", "%"})

# Matches an entire string of the form "@name" or "@name:<whitespace>value".
# Group 1 is the name (no "@" or ":" inside); group 3 is the value, if present.
_HEADER_REGEX = re.compile(r"\A@([^@:]+)(:\s+(\S[\S\s]+))?\Z")

# Matches an optional "<prefix>$", a core containing neither "$" nor "~", and an
# optional "~<suffix>". Groups 2, 3 and 5 are the prefix, core and suffix.
_CLITIC_REGEX = re.compile(r"((.+)\$)?([^$~]+)(~(.+))?")


def _params_in_docstring(*params, class_method=True):
    docstring = ""

    if "participants" in params:
        docstring += """
        participants : str or iterable of str, optional
            Participants of interest. You may pass in a string (e.g., ``"CHI"``
            for studying child speech)
            or an iterable of strings (e.g., ``{"MOT", "INV"}``). Only the specified
            participants are included.
            If you pass in ``None`` (the default), all participants are included.
            This parameter cannot be used together with ``exclude``.
        exclude : str or iterable of str, optional
            Participants to exclude. You may pass in a string (e.g., ``"CHI"``
            for child-directed speech)
            or an iterable of strings (e.g., ``{"MOT", "INV"}``). Only the specified
            participants are excluded.
            If you pass in ``None`` (the default), no participants are excluded.
            This parameter cannot be used together with ``participants``."""

    if "by_utterances" in params:
        docstring += """
        by_utterances : bool, optional
            If ``True``, the resulting objects are wrapped as a list at the utterance
            level.
            If ``False`` (the default), such utterance-level list structure
            does not exist."""

    if "by_files" in params:
        docstring += """
        by_files : bool, optional
            If ``True``, return a list X of results, where len(X) is the number of
            files in the ``Reader`` object, and each element in X is the result for one
            file; the ordering of X corresponds to that of the file paths from
            :func:`~pylangacq.Reader.file_paths`.
            If ``False`` (the default), return the result that collapses the file
            distinction just described for when ``by_files`` is ``True``."""

    if "keep_case" in params:
        docstring += """
        keep_case : bool, optional
            If ``True`` (the default), case distinctions are kept, e.g.,
            word tokens like "the" and "The" are treated as distinct.
            If ``False``, all word tokens are forced to be in lowercase
            as a preprocessing step.
            CHAT data from CHILDES intentionally does not follow the orthographic
            convention of capitalizing the first letter of a sentence in the
            transcriptions (as would have been done in many European languages),
            and so leaving keep_case as True is appropriate in most cases."""

    if "match" in params:
        docstring += """
        match : str, optional
            If provided, only the file paths that match this string
            (by regular expression matching) are read and parsed.
            For example, to work with the American English dataset Brown (containing
            data for the children Adam, Eve, and Sarah),
            you can pass in ``"Eve"`` here to only handle the data for Eve, since
            the unzipped Brown data from CHILDES has a directory structure of
            ``Brown/Eve/xxx.cha`` for Eve's data.
            If this parameter is not specified or ``None`` is passed in (the default),
            such file path filtering does not apply.
        exclude : str, optional
            If provided, the file paths that match this string (by regular expression
            matching) are excluded for reading and parsing."""

    if "encoding" in params:
        docstring += """
        encoding : str, optional
            Text encoding to parse the CHAT data. The default value is ``"utf-8"``
            for Unicode UTF-8."""

    if "extension" in params:
        docstring += """
        extension : str, optional
            File extension for CHAT data files. The default value is ``".cha"``."""

    if "cls" in params:
        docstring += """
        cls : type, optional
            Either :class:`~pylangacq.Reader` (the default),
            or a subclass from it that expects the same arguments for the methods
            :func:`~pylangacq.Reader.from_zip`, :func:`~pylangacq.Reader.from_dir`,
            and :func:`~pylangacq.Reader.from_files`.
            Pass in your own :class:`~pylangacq.Reader` subclass
            for new or modified behavior of the returned reader object."""

    if "parallel" in params:
        docstring += """
        parallel : bool, optional
            If ``True`` (the default), CHAT reading and parsing is parallelized
            for speed-up, because in most cases multiple CHAT data files and/or strings
            are being handled.
            Under certain circumstances (e.g., your application is already parallelized
            and further parallelization from within PyLangAcq might be undesirable),
            you may like to consider setting this parameter to ``False``."""

    if "use_cached" in params:
        docstring += """
        use_cached : bool, optional
            If ``True`` (the default), and if the path is a URL for a remote ZIP
            archive, then CHAT reading attempts to use the previously downloaded
            data cached on disk. This setting allows you to call
            this function with the same URL repeatedly without hitting the CHILDES /
            TalkBank server more than once for the same data.
            Pass in ``False`` to force a new download; the upstream CHILDES / TalkBank
            data is updated in minor ways from time to time, e.g., for CHAT format,
            header/metadata information, updated annotations.
            See also the helper functions: :func:`pylangacq.chat.cached_data_info`,
            :func:`pylangacq.chat.remove_cached_data`."""

    if "session" in params:
        docstring += """
        session : requests.Session, optional
            If the path is a URL for a remote ZIP archive, data downloading is
            done with reasonable settings of retries and timeout by default,
            in order to be robust against intermittent network issues.
            If necessary, pass in your own instance of :class:`requests.Session`
            to customize."""

    if not class_method:
        docstring = docstring.replace("\n        ", "\n    ")

    def real_decorator(func):
        returns_none = (
            f"Returns\n{'        ' if class_method else '    '}-------"
            not in func.__doc__
        )
        if returns_none:
            func.__doc__ += docstring
        else:
            if class_method:
                returns_header = "\n\n        Returns\n        -------"
            else:
                returns_header = "\n\n    Returns\n    -------"
            func.__doc__ = func.__doc__.replace(
                returns_header, docstring + returns_header
            )

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper

    return real_decorator


def _deprecate_warning(what, since_version, use_instead):
    """Throws a FutureWarning for deprecation.

    FutureWarning is used instead of DeprecationWarning, because Python
    does not show DeprecationWarning by default.

    Parameters
    ----------
    what : str
        What to deprecate.
    since_version : str
        Version "x.y.z" since which the deprecation is in effect.
    use_instead : str
        Use this instead.
    """
    warnings.warn(
        f"'{what}' has been deprecated since PyLangAcq v{since_version}. "
        f"Please use {use_instead} instead.",
        FutureWarning,
    )


class _list(list):
    def __repr__(self):
        return "\n".join(x._to_str() for x in self)

    def _repr_html_(self) -> str:
        return "\n".join(x._repr_html_() for x in self)


@dataclasses.dataclass
class _File:
    """A single parsed unit of CHAT data: one file, or one in-memory string.

    ``__slots__`` is declared by hand, so instances carry exactly the three
    attributes below and no per-instance ``__dict__``. This works with the
    dataclass decorator because none of the fields has a default value.

    Attributes
    ----------
    file_path : str
        Path of the data file. For data that comes from in-memory strings,
        this is the identifier standing in for a path instead.
    header : dict
        The header information of this file.
    utterances : List[Utterance]
        The utterances of this file.
    """

    __slots__ = ("file_path", "header", "utterances")

    file_path: str
    header: Dict
    utterances: List[Utterance]


class Reader:
    """A reader that handles CHAT data."""

    def __init__(self):
        """Initialize an empty reader."""
        self._files = collections.deque()

    def _parse_chat_strs(
        self, strs: List[str], file_paths: List[str], parallel: bool
    ) -> None:
        """Parse CHAT strings and store the results as this reader's files.

        ``strs`` and ``file_paths`` are paired up positionally, and each pair is
        handed to ``self._parse_chat_str``. Any data previously held by the
        reader is replaced.

        Parameters
        ----------
        strs : List[str]
            CHAT data strings.
        file_paths : List[str]
            File paths (or identifiers) that go with ``strs``.
        parallel : bool
            If ``True``, parsing is distributed over a process pool.
        """
        if parallel:
            with cf.ProcessPoolExecutor() as executor:
                self._files = collections.deque(
                    executor.map(self._parse_chat_str, strs, file_paths)
                )
        else:
            self._files = collections.deque(
                self._parse_chat_str(s, f) for s, f in zip(strs, file_paths)
            )

    def __len__(self):
        raise NotImplementedError(
            "__len__ of a CHAT reader is intentionally undefined. "
            "Intuitively, there are different lengths one may refer to: "
            "Number of files in this reader? Utterances? Words? Something else?"
        )

    def _get_reader_from_files(self, files: "Iterable[_File]") -> "pylangacq.Reader":
        """Return a new reader of the same class that holds the given files."""
        reader = self.__class__()
        reader._files = collections.deque(files)
        return reader

    def __iter__(self):
        """Iterate over the data file by file, each wrapped as its own reader."""
        yield from (self._get_reader_from_files([f]) for f in self._files)

    def __getitem__(self, item):
        """Return a new reader for the file(s) selected by an index or a slice.

        Slices follow regular sequence semantics, including negative steps
        (e.g., ``reader[::-1]``). As with slicing a list, the files themselves
        are shared with the new reader rather than copied.

        Raises
        ------
        TypeError
            If ``item`` is neither an integer nor a slice.
        IndexError
            If an integer index is out of range.
        """
        if isinstance(item, int):
            return self._get_reader_from_files([self._files[item]])
        if isinstance(item, slice):
            # A deque cannot be sliced, and itertools.islice() rejects the
            # negative values that slice.indices() yields for reversed slices,
            # so slice a shallow list copy of the files instead.
            return self._get_reader_from_files(list(self._files)[item])
        raise TypeError(
            f"Reader indices must be integers or slices, not {type(item)}"
        )

    def __setitem__(self, key, value):
        raise NotImplementedError(
            "Mutating the CHAT reader by targeting the individual files through "
            "indices is not supported. Please use the implemented Reader object "
            "methods to add or remove data."
        )
[docs] def clear(self) -> None: """Remove all data from this reader.""" self._files = collections.deque()
def __add__(self, other: "pylangacq.Reader") -> "pylangacq.Reader": if not issubclass(other.__class__, Reader): raise TypeError(f'cannot concatenate "{other.__class__}" to a reader') return self._get_reader_from_files(self._files + other._files) def _append(self, left_or_right, reader: "pylangacq.Reader") -> None: func = "extendleft" if left_or_right == "left" else "extend" if not issubclass(reader.__class__, Reader): raise TypeError(f"not a Reader object: {type(reader)}") getattr(self._files, func)(reader._files)
[docs] def append(self, reader: "pylangacq.Reader") -> None: """Append data from another reader. New data is appended as-is with no filtering of any sort, even for files whose file paths duplicate those already in the current reader. Parameters ---------- reader : Reader A reader from which to append data """ self._append("right", reader)
[docs] def append_left(self, reader: "pylangacq.Reader") -> None: """Left-append data from another reader. New data is appended as-is with no filtering of any sort, even for files whose file paths duplicate those already in the current reader. Parameters ---------- reader : Reader A reader from which to left-append data """ self._append("left", reader)
def _extend(self, left_or_right, readers: "Iterable[pylangacq.Reader]") -> None: # Loop through each object in ``readers`` explicitly, so that we have # a chance to check that the object is indeed a Reader instance. new_files = [] for reader in readers: if not issubclass(reader.__class__, Reader): raise TypeError(f"not a Reader object: {type(reader)}") new_files.extend(reader._files) func = "extendleft" if left_or_right == "left" else "extend" getattr(self._files, func)(new_files)
[docs] def extend(self, readers: "Iterable[pylangacq.Reader]") -> None: """Extend data from other readers. New data is appended as-is with no filtering of any sort, even for files whose file paths duplicate those already in the current reader. Parameters ---------- readers : Iterable[Reader] Readers from which to extend data """ # Loop through each object in ``readers`` explicitly, so that we have # a chance to check that the object is indeed a Reader instance. self._extend("right", readers)
[docs] def extend_left(self, readers: "Iterable[pylangacq.Reader]") -> None: """Left-extend data from other readers. New data is appended as-is with no filtering of any sort, even for files whose file paths duplicate those already in the current reader. Parameters ---------- readers : Iterable[Reader] Readers from which to extend data """ self._extend("left", readers)
def _pop(self, left_or_right) -> "pylangacq.Reader": func = "popleft" if left_or_right == "left" else "pop" file_ = getattr(self._files, func)() return self._get_reader_from_files([file_])
[docs] def pop(self) -> "pylangacq.Reader": """Drop the last data file from the reader and return it as a reader. Returns ------- :class:`pylangacq.Reader` """ return self._pop("right")
[docs] def pop_left(self) -> "pylangacq.Reader": """Drop the first data file from the reader and return it as a reader. Returns ------- :class:`pylangacq.Reader` """ return self._pop("left")
[docs] @_params_in_docstring("match", "exclude") def filter(self, match: str = None, exclude: str = None) -> "pylangacq.Reader": """Return a new reader filtered by file paths. Parameters ---------- Returns ------- :class:`pylangacq.Reader` Raises ------ TypeError If neither ``match`` nor ``exclude`` is specified. """ if not match and not exclude: raise TypeError("At least one of {match, exclude} must be specified") reader = self.__class__() file_paths = set(self._filter_file_paths(self.file_paths(), match, exclude)) reader._files = collections.deque( f for f in self._files if f.file_path in file_paths ) return reader
@staticmethod def _flatten(item_type, nested) -> Union[List, Set]: if item_type == list: return [item for items in nested for item in items] elif item_type == int: return sum(nested) elif item_type == set: return set().union(*nested) elif item_type == collections.Counter: return sum(nested, collections.Counter()) else: raise ValueError(f"unrecognized item type: {item_type}")
[docs] @_params_in_docstring("participants", "exclude", "by_files") def utterances( self, participants=None, exclude=None, by_files=False ) -> Union[List[Utterance], List[List[Utterance]]]: """Return the utterances. Parameters ---------- Returns ------- List[Utterance] if ``by_files`` is ``False``, otherwise List[List[Utterance]] """ result_by_files = self._filter_utterances_by_participants(participants, exclude) if by_files: return result_by_files else: return self._flatten(list, result_by_files)
def _get_result_by_utterances_by_files(self, result, by_utterances, by_files): if by_files and by_utterances: pass elif by_files and not by_utterances: result = [self._flatten(list, f) for f in result] elif not by_files and by_utterances: result = self._flatten(list, result) else: # not by_files and not by_utterances result = self._flatten(list, (self._flatten(list, f) for f in result)) return result
[docs] @_params_in_docstring("participants", "exclude", "by_utterances", "by_files") def tokens( self, participants=None, exclude=None, by_utterances=False, by_files=False ) -> Union[List[Token], List[List[Token]], List[List[List[Token]]]]: """Return the tokens. Parameters ---------- Returns ------- List[List[List[Token]]] if both ``by_utterances`` and ``by_files`` are ``True`` List[List[Token]] if ``by_utterances`` is ``True`` and ``by_files`` is ``False`` List[List[Token]] if ``by_utterances`` is ``False`` and ``by_files`` is ``True`` List[Token] if both ``by_utterances`` and ``by_files`` are ``False`` """ utterances = self.utterances( participants=participants, exclude=exclude, by_files=True ) result = [[u.tokens for u in us] for us in utterances] return self._get_result_by_utterances_by_files(result, by_utterances, by_files)
[docs] @_params_in_docstring("participants", "exclude", "by_utterances", "by_files") def words( self, participants=None, exclude=None, by_utterances=False, by_files=False ) -> Union[List[str], List[List[str]], List[List[List[str]]]]: """Return the words. Parameters ---------- Returns ------- List[List[List[str]]] if both ``by_utterances`` and ``by_files`` are ``True`` List[List[str]] if ``by_utterances`` is ``True`` and ``by_files`` is ``False`` List[List[str]] if ``by_utterances`` is ``False`` and ``by_files`` is ``True`` List[str] if both ``by_utterances`` and ``by_files`` are ``False`` """ tokens = self.tokens( participants=participants, exclude=exclude, by_utterances=True, by_files=True, ) result = [ [[t.word for t in ts if t.word not in _CLITICS] for ts in tss] for tss in tokens ] return self._get_result_by_utterances_by_files(result, by_utterances, by_files)
def _filter_utterances_by_participants( self, participants, exclude ) -> List[List[Utterance]]: if participants and exclude: raise TypeError( "participants and exclude cannot be specified at the same time: " f"{participants}, {exclude}" ) if participants is None: participants: List[Set] = self.participants(by_files=True) elif type(participants) is str: participants: List[Set] = [{participants} for _ in range(self.n_files())] elif hasattr(participants, "__iter__"): participants: List[Set] = [set(participants) for _ in range(self.n_files())] else: raise ValueError( "participants must be one of {None, a string, an iterable of strings}: " f"{participants}" ) if exclude is None: pass elif type(exclude) is str: participants: List[Set] = [p - {exclude} for p in participants] elif hasattr(exclude, "__iter__"): participants: List[Set] = [p - set(exclude) for p in participants] else: raise ValueError( "exclude must be one of {None, a string, an iterable of strings}: " f"{exclude}" ) return [ [u for u in us if u.participant in ps] for us, ps in zip([f.utterances for f in self._files], participants) ]
[docs] def headers(self) -> List[Dict]: """Return the headers. Returns ------- List[Dict] """ return [f.header for f in self._files]
[docs] def file_paths(self) -> List[str]: """Return the file paths. If the data comes from in-memory strings, then the "file paths" are arbitrary UUID random strings. Returns ------- List[str] """ return [f.file_path for f in self._files]
[docs] def n_files(self) -> int: """Return the number of files.""" return len(self._files)
[docs] @_params_in_docstring("by_files") def participants(self, by_files=False) -> Union[Set[str], List[Set[str]]]: """Return the participants (e.g., CHI, MOT). Parameters ---------- Returns ------- Set[str] if ``by_files`` is ``False``, otherwise List[Set[str]] """ result_by_files = [{u.participant for u in f.utterances} for f in self._files] if by_files: return result_by_files else: return self._flatten(set, result_by_files)
[docs] @_params_in_docstring("by_files") def languages(self, by_files=False) -> Union[Set[str], List[List[str]]]: """Return the languages in the data. Parameters ---------- Returns ------- Set[str] if ``by_files`` is ``False``, otherwise List[List[str]] When ``by_files`` is ``True``, the ordering of languages given by the list indicates language dominance. Such ordering would not make sense when ``by_files`` is ``False``, in which case the returned object is a set instead of a list. """ result_by_files = [f.header.get("Languages", []) for f in self._files] if by_files: return result_by_files else: return set(self._flatten(list, result_by_files))
[docs] @_params_in_docstring("by_files") def dates_of_recording( self, by_files=False ) -> Union[Set[datetime.date], List[Set[datetime.date]]]: """Return the dates of recording. Parameters ---------- Returns ------- Set[datetime.date] if ``by_files`` is ``False``, otherwise List[Set[datetime.date]]] """ result_by_files = [f.header.get("Date", set()) for f in self._files] if by_files: return result_by_files else: return self._flatten(set, result_by_files)
[docs] def ages( self, participant="CHI", months=False ) -> Union[List[Tuple[int, int, int]], List[float]]: """Return the ages of the given participant in the data. Parameters ---------- participant : str, optional Participant of interest, which defaults to the typical use case of ``"CHI"`` for the target child. months : bool, optional If ``False`` (the default), age is represented as a tuple of (years, months, days), e.g., "1;06.00" in CHAT becomes ``(1, 6, 0)``. If ``True``, age is a float for the number of months, e.g., "1;06.00" in CHAT becomes ``18.0`` for 18 months. Returns ------- List[Tuple[int, int, int]] if ``months`` is ``False``, otherwise List[float] """ result_by_files = [] for f in self._files: try: age = f.header["Participants"][participant]["age"] year_str, _, month_day = age.partition(";") month_str, _, day_str = month_day.partition(".") year_int = int(year_str) if year_str.isdigit() else 0 month_int = int(month_str) if month_str.isdigit() else 0 day_int = int(day_str) if day_str.isdigit() else 0 if months: result = year_int * 12 + month_int + day_int / 30 else: result = (year_int, month_int, day_int) except (KeyError, IndexError, ValueError): result = None result_by_files.append(result) return result_by_files
[docs] @_params_in_docstring("participants", "exclude", "by_files") def tagged_sents( self, participants=None, exclude=None, by_files=False ) -> Union[List[List[Token]], List[List[List[Token]]]]: """Return the tagged sents. .. deprecated:: 0.13.0 Please use :func:`~pylangacq.Reader.tokens` with ``by_utterances=True`` instead. Parameters ---------- Returns ------- List[List[Token]] if ``by_files`` is ``False``, otherwise List[List[List[Token]]] """ _deprecate_warning( "tagged_sents", "0.13.0", "the `.tokens()` method with by_utterances=True", ) utterances = self._filter_utterances_by_participants(participants, exclude) result_by_files = [[u.tokens for u in us] for us in utterances] if by_files: return result_by_files else: return self._flatten(list, result_by_files)
[docs] @_params_in_docstring("participants", "exclude", "by_files") def tagged_words( self, participants=None, exclude=None, by_files=False ) -> Union[List[Token], List[List[Token]]]: """Return the tagged words. .. deprecated:: 0.13.0 Please use :func:`~pylangacq.Reader.tokens` with ``by_utterances=False`` instead. Parameters ---------- Returns ------- List[Token] if ``by_files`` is ``False``, otherwise List[List[Token]] """ _deprecate_warning( "tagged_words", "0.13.0", "the `.tokens()` method with by_utterances=False" ) utterances = self._filter_utterances_by_participants(participants, exclude) result_by_files = [[word for u in us for word in u.tokens] for us in utterances] if by_files: return result_by_files else: return self._flatten(list, result_by_files)
[docs] @_params_in_docstring("participants", "exclude", "by_files") def sents( self, participants=None, exclude=None, by_files=False ) -> Union[List[List[str]], List[List[List[str]]]]: """Return the sents. .. deprecated:: 0.13.0 Please use :func:`~pylangacq.Reader.words` with ``by_utterances=True`` instead. Parameters ---------- Returns ------- List[List[str]] if ``by_files`` is ``False``, otherwise List[List[List[str]]] """ _deprecate_warning( "words", "0.13.0", "the `.words()` method with by_utterances=True" ) utterances = self._filter_utterances_by_participants(participants, exclude) result_by_files = [ [[t.word for t in u.tokens] for u in us] for us in utterances ] if by_files: return result_by_files else: return self._flatten(list, result_by_files)
[docs] def mlum(self, participant="CHI", exclude_switch: bool = False) -> List[float]: """Return the mean lengths of utterance in morphemes. Parameters ---------- participant : str, optional Participant of interest, which defaults to the typical use case of ``"CHI"`` for the target child. exclude_switch : bool, optional If ``True``, exclude words with the suffix "@s" for switching to another language (not uncommon in code-mixing or multilingual acquisition). The default is ``False``. Returns ------- List[float] """ return _get_mlum( self.tokens(participants=participant, by_utterances=True, by_files=True), exclude_switch, )
[docs] def mlu(self, participant="CHI", exclude_switch: bool = False) -> List[float]: """Return the mean lengths of utterance (MLU). This method is equivalent to :func:`~pylangacq.Reader.mlum`. Parameters ---------- participant : str, optional Participant of interest, which defaults to the typical use case of ``"CHI"`` for the target child. exclude_switch : bool, optional If ``True``, exclude words with the suffix "@s" for switching to another language (not uncommon in code-mixing or multilingual acquisition). The default is ``False``. Returns ------- List[float] """ return self.mlum(participant=participant, exclude_switch=exclude_switch)
[docs] def mluw(self, participant="CHI", exclude_switch: bool = False) -> List[float]: """Return the mean lengths of utterance in words. Parameters ---------- participant : str, optional Participant of interest, which defaults to the typical use case of ``"CHI"`` for the target child. exclude_switch : bool, optional If ``True``, exclude words with the suffix "@s" for switching to another language (not uncommon in code-mixing or multilingual acquisition). The default is ``False``. Returns ------- List[float] """ return _get_mluw( self.words(participants=participant, by_utterances=True, by_files=True), exclude_switch, )
[docs] def ttr(self, keep_case=True, participant="CHI") -> List[float]: """Return the type-token ratios (TTR). Parameters ---------- keep_case : bool, optional If ``True`` (the default), case distinctions are kept, e.g., word tokens like "the" and "The" are treated as distinct. If ``False``, all word tokens are forced to be in lowercase as a preprocessing step. CHAT data from CHILDES intentionally does not follow the orthographic convention of capitalizing the first letter of a sentence in the transcriptions (as would have been done in many European languages), and so leaving keep_case as True is appropriate in most cases. participant : str, optional Participant of interest, which defaults to the typical use case of ``"CHI"`` for the target child. Returns ------- List[float] """ return _get_ttr( self.word_frequencies( keep_case=keep_case, participants=participant, by_files=True ) )
[docs] def ipsyn(self, participant="CHI") -> List[int]: """Return the indexes of productive syntax (IPSyn). Parameters ---------- participant : str, optional Participant of interest, which defaults to the typical use case of ``"CHI"`` for the target child. Returns ------- List[float] """ return _get_ipsyn( self.tokens(participants=participant, by_utterances=True, by_files=True) )
[docs] @_params_in_docstring("keep_case", "participants", "exclude", "by_files") def word_ngrams( self, n, keep_case=True, participants=None, exclude=None, by_files=False ) -> Union[collections.Counter, List[collections.Counter]]: """Return word ngrams. Parameters ---------- Returns ------- collections.Counter if ``by_files`` is ``False``, otherwise List[collections.Counter] """ err_msg = f"n must be a positive integer: {n}" if type(n) is not int: raise TypeError(err_msg) elif n < 1: raise ValueError(err_msg) result_by_files = [] for sents_in_file in self.words( participants=participants, exclude=exclude, by_utterances=True, by_files=True, ): result_for_file = collections.Counter() for sent in sents_in_file: if len(sent) < n: continue if not keep_case: sent = [w.lower() for w in sent] ngrams = zip(*[sent[i:] for i in range(n)]) result_for_file.update(ngrams) result_by_files.append(result_for_file) if by_files: return result_by_files else: return self._flatten(collections.Counter, result_by_files)
[docs] @_params_in_docstring("keep_case", "participants", "exclude", "by_files") def word_frequencies( self, keep_case=True, participants=None, exclude=None, by_files=False ) -> Union[collections.Counter, List[collections.Counter]]: """Return word frequencies. Parameters ---------- Returns ------- collections.Counter if ``by_files`` is ``False``, otherwise List[collections.Counter] """ result_by_files = self.word_ngrams( 1, keep_case=keep_case, participants=participants, exclude=exclude, by_files=True, ) result_by_files = [ collections.Counter({k[0]: v for k, v in r.items()}) for r in result_by_files ] if by_files: return result_by_files else: return self._flatten(collections.Counter, result_by_files)
[docs] @classmethod @_params_in_docstring("parallel") def from_strs( cls, strs: List[str], ids: List[str] = None, parallel: bool = True ) -> "pylangacq.Reader": """Instantiate a reader from in-memory CHAT data strings. Parameters ---------- strs : List[str] List of CHAT data strings. The ordering of the strings determines that of the parsed CHAT data in the resulting reader. ids : List[str], optional List of identifiers. If not provided, UUID random strings are used. When file paths are referred to in other parts of this package, they mean these identifiers if you have instantiated the reader by this method. Returns ------- :class:`pylangacq.Reader` """ strs = list(strs) if ids is None: ids = [_get_uuid() for _ in range(len(strs))] else: ids = list(ids) if len(strs) != len(ids): raise ValueError( f"strs and ids must have the same size: {len(strs)} and {len(ids)}" ) reader = cls() reader._parse_chat_strs(strs, ids, parallel) return reader
[docs] @classmethod @_params_in_docstring("match", "exclude", "encoding", "parallel") def from_files( cls, paths: List[str], match: str = None, exclude: str = None, encoding: str = _ENCODING, parallel: bool = True, ) -> "pylangacq.Reader": """Instantiate a reader from local CHAT data files. Parameters ---------- paths : List[str] List of local file paths of the CHAT data. The ordering of the paths determines that of the parsed CHAT data in the resulting reader. Returns ------- :class:`pylangacq.Reader` """ # Inner function with file closing and closure to wrap in the given encoding def _open_file(path: str) -> str: with open(path, encoding=encoding) as f: return f.read() paths = cls._filter_file_paths(paths, match, exclude) if parallel: with cf.ThreadPoolExecutor() as executor: strs = list(executor.map(_open_file, paths)) else: strs = [_open_file(p) for p in paths] return cls.from_strs(strs, paths, parallel=parallel)
@staticmethod def _filter_file_paths( paths: List[str], match: str = None, exclude: str = None ) -> List[str]: paths = list(paths) if match: regex = re.compile(match) paths = [p for p in paths if regex.search(p)] if exclude: regex = re.compile(exclude) paths = [p for p in paths if not regex.search(p)] return paths
@classmethod
@_params_in_docstring("match", "exclude", "extension", "encoding", "parallel")
def from_dir(
    cls,
    path: str,
    match: str = None,
    exclude: str = None,
    extension: str = _CHAT_EXTENSION,
    encoding: str = _ENCODING,
    parallel: bool = True,
) -> "pylangacq.Reader":
    """Instantiate a reader from a local directory with CHAT data files.

    Parameters
    ----------
    path : str
        Local directory that contains CHAT data files. Files are searched
        for recursively under this directory, and those that satisfy
        ``match`` and ``extension`` are parsed and handled by the reader.

    Returns
    -------
    :class:`pylangacq.Reader`
    """
    # Walk the directory tree and keep every file with the wanted extension.
    found = [
        os.path.join(dirpath, filename)
        for dirpath, _, filenames in os.walk(path)
        for filename in filenames
        if filename.endswith(extension)
    ]
    # Sort so that the file ordering doesn't depend on the traversal order.
    found.sort()
    return cls.from_files(
        found,
        match=match,
        exclude=exclude,
        encoding=encoding,
        parallel=parallel,
    )
@classmethod
@_params_in_docstring(
    "match",
    "exclude",
    "extension",
    "encoding",
    "parallel",
    "use_cached",
    "session",
)
def from_zip(
    cls,
    path: str,
    match: str = None,
    exclude: str = None,
    extension: str = _CHAT_EXTENSION,
    encoding: str = _ENCODING,
    parallel: bool = True,
    use_cached: bool = True,
    session: requests.Session = None,
) -> "pylangacq.Reader":
    """Instantiate a reader from a local or remote ZIP file.

    A local ZIP file is unzipped into a temporary directory that is
    discarded once the reader has been created.

    A remote ZIP file is downloaded and unzipped into a subdirectory of
    the cache directory (``~/.pylangacq``), and a record for its URL is
    kept. A later call with the same URL and ``use_cached=True`` reads the
    unzipped files from that subdirectory without downloading again.
    With ``use_cached=False``, any cached copy for the URL is removed and
    the data is downloaded afresh.

    Parameters
    ----------
    path : str
        Either a local file path or a URL (one that begins with
        ``"https://"`` or ``"http://"``) for a ZIP file containing CHAT data
        files. For instance, you can provide either a local path to a ZIP
        file downloaded from CHILDES, or simply a URL such as
        ``"https://childes.talkbank.org/data/Eng-NA/Brown.zip"``.

    Returns
    -------
    :class:`pylangacq.Reader`
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        is_url = path.startswith(("https://", "http://"))
        unzip_dir = cls._retrieve_unzip_dir(path) if is_url else None
        created_cache_entry = False

        if is_url and (not use_cached or not unzip_dir):
            # Remote data that is either not cached yet or must be refreshed.
            if unzip_dir:
                remove_cached_data(path)
            zip_path = os.path.join(temp_dir, os.path.basename(path))
            _download_file(path, zip_path, session)
            unzip_dir = cls._create_unzip_dir(path)
            created_cache_entry = True
        elif is_url and unzip_dir:
            # Remote data already unzipped in the cache: nothing to extract.
            zip_path = None
        else:
            # Local ZIP file: unzip into the throwaway temp dir.
            zip_path = path
            unzip_dir = temp_dir

        if zip_path:
            try:
                with zipfile.ZipFile(zip_path) as zfile:
                    zfile.extractall(unzip_dir)
            except Exception:
                # Don't leave behind a cache record that points to an empty
                # or half-extracted directory; otherwise the next call with
                # ``use_cached=True`` would silently read no data at all.
                if created_cache_entry:
                    remove_cached_data(path)
                raise

        # Must happen inside the `with` block: for a local ZIP file,
        # the unzipped files live in the temp dir.
        reader = cls.from_dir(
            unzip_dir,
            match=match,
            exclude=exclude,
            extension=extension,
            encoding=encoding,
            parallel=parallel,
        )

    # Unzipped files from `.from_zip` have the unwieldy temp dir in the file path.
    # Strip it only as a leading prefix, not wherever the string occurs.
    for f in reader._files:
        file_path = f.file_path
        if file_path.startswith(unzip_dir):
            file_path = file_path[len(unzip_dir):]
        f.file_path = file_path.lstrip(os.sep)

    return reader
@staticmethod
def _retrieve_unzip_dir(url: str) -> Union[str, None]:
    """Return the cache directory recorded for ``url``, or ``None``.

    ``None`` is returned if the JSON file of cache records does not exist,
    or if it has no ``"subdir"`` recorded for ``url``.
    """
    try:
        # `with` closes the file handle right after the JSON is parsed.
        with open(_CACHED_DATA_JSON_PATH, encoding="utf-8") as f:
            existing_records = json.load(f)
    except FileNotFoundError:
        return None
    subdir = existing_records.get(url, {}).get("subdir")
    if subdir is None:
        return None
    return os.path.join(_CACHED_DATA_DIR, subdir)


@staticmethod
def _create_unzip_dir(url: str) -> str:
    """Create a new cache subdirectory for ``url`` and record it.

    The subdirectory is named by a random UUID. The record written to the
    JSON file holds the subdirectory name, the URL, and the creation time
    in ISO format. An existing record for the same URL is replaced.

    Returns
    -------
    str
        Path of the newly created subdirectory.
    """
    if not os.path.isdir(_CACHED_DATA_DIR):
        _initialize_cached_data_dir()
    subdir = _get_uuid()
    unzip_dir = os.path.join(_CACHED_DATA_DIR, subdir)
    os.makedirs(unzip_dir)
    new_record = {
        "subdir": subdir,
        "url": url,
        "cached_at": datetime.datetime.now().isoformat(),
    }
    try:
        with open(_CACHED_DATA_JSON_PATH, encoding="utf-8") as f:
            existing_records = json.load(f)
    except FileNotFoundError:
        # The cache dir exists but the records file is gone: start afresh
        # rather than failing after the subdirectory has been created.
        existing_records = {}
    _write_cached_data_json({**existing_records, url: new_record})
    return unzip_dir
def to_strs(self, tabular: bool = True) -> Generator[str, None, None]:
    """Yield CHAT data strings.

    .. note::
        The header information may not be completely reproduced in the
        output CHAT strings. Known issues all have to do with a header
        field used multiple times in the original CHAT data.
        For ``Date``, only the first date of recording is retained in the
        output string. For all other multiply used header fields
        (e.g., ``Tape Location``, ``Time Duration``),
        only the last value in a given CHAT file is retained.
        Note that ``ID`` for participant information is not affected.

    Parameters
    ----------
    tabular : bool, optional
        If ``True``, adjust spacing such that the three tiers of
        the utterance, %mor, and %gra are aligned in a tabular form.
        Note that such alignment would drop annotations (e.g., pauses)
        on the main utterance tier.

    Yields
    ------
    str
        CHAT data string for one file.
    """
    # Header fields that come first, in this order, in the output.
    leading_keys = ("UTF8", "PID", "Languages", "Participants", "Date", "Types")

    for chat_file in self._files:
        header = chat_file.header
        chunks = []

        for key in _sort_keys(header.keys(), first=leading_keys):
            if key == "Languages":
                chunks.append(f"@Languages:\t{' , '.join(header['Languages'])}\n")

            elif key == "Participants":
                participants = header["Participants"]
                labels = [
                    f"{code} {demographics['name']} {demographics['role']}"
                    for code, demographics in participants.items()
                ]
                chunks.append(f"@Participants:\t{' , '.join(labels)}\n")

                # One @ID line per participant, right after @Participants.
                for code, d in participants.items():  # d = demographics
                    fields = (
                        d["language"],
                        d["corpus"],
                        code,
                        d["age"],
                        d["sex"],
                        d["group"],
                        d["ses"],
                        d["role"],
                        d["education"],
                        d["custom"],
                    )
                    id_line = "".join(f"{field}|" for field in fields)
                    chunks.append(f"@ID:\t{id_line}\n")

            elif key == "Date":
                # TODO: A CHAT file may have more than one recording date.
                dates = sorted(header["Date"])
                if not dates:
                    continue
                chunks.append(
                    f"@Date:\t{dates[0].strftime('%d-%b-%Y').upper()}\n"
                )

            elif header[key]:
                chunks.append(f"@{key}:\t{header[key]}\n")

            else:
                # A header without any value, e.g., "@UTF8".
                chunks.append(f"@{key}\n")

        chunks.extend(u._to_str(tabular=tabular) for u in chat_file.utterances)
        yield "".join(chunks)
@_params_in_docstring("participants", "exclude")
def head(self, n: int = 5, participants=None, exclude=None):
    """Return the first several utterances.

    Parameters
    ----------
    n : int, optional
        The number of utterances to return.

    Returns
    -------
    list of utterances
    """
    first_n = slice(None, n)
    return self._head_or_tail(first_n, participants, exclude)
@_params_in_docstring("participants", "exclude")
def tail(self, n: int = 5, participants=None, exclude=None):
    """Return the last several utterances.

    Parameters
    ----------
    n : int, optional
        The number of utterances to return. If ``n`` is 0, no utterances
        are returned.

    Returns
    -------
    list of utterances
    """
    # ``slice(-0, None)`` is ``slice(0, None)``, which would select *all*
    # the utterances, so zero needs an explicitly empty slice.
    last_n = slice(-n, None) if n else slice(0, 0)
    return self._head_or_tail(last_n, participants, exclude)
def _head_or_tail(self, slice_, participants, exclude):
    """Return the utterances selected by ``slice_``, across all files."""
    all_utterances = self.utterances(
        participants=participants, exclude=exclude, by_files=False
    )
    return _list(all_utterances[slice_])


def _get_info_summary(self) -> str:
    """Return a three-line summary: file, utterance, and word counts."""
    n_files = len(self._files)
    n_utterances = 0
    n_words = 0
    for f in self._files:
        n_utterances += len(f.utterances)
        for u in f.utterances:
            n_words += len(u.tokens)
    return f"{n_files} files\n{n_utterances} utterances\n{n_words} words"


def _get_info_details_of_file(self, f: _File) -> Dict:
    """Return the utterance and word counts (and file path) of one file.

    The file path is left out if it is a UUID string, i.e., an identifier
    generated by this package instead of a real path.
    """
    utterances = f.utterances
    details = {
        "Utterance Count": len(utterances),
        "Word Count": sum(len(u.tokens) for u in utterances),
    }
    if _is_uuid(f.file_path):
        return details
    details["File Path"] = f.file_path
    return details
def info(self, verbose=False) -> None:
    """Print a summary of this Reader's data.

    The overall summary is always printed. If there are at least two files,
    a table with per-file details follows. Unless ``verbose`` is ``True``,
    the table is limited to the first five files.

    Parameters
    ----------
    verbose : bool, optional
        If ``True`` (default is ``False``), show the details of all the files.
    """
    print(self._get_info_summary())
    if len(self._files) < 2:
        return

    details = [self._get_info_details_of_file(f) for f in self._files]
    max_n_files_if_not_verbose = 5
    truncated = not verbose and len(details) > max_n_files_if_not_verbose
    if truncated:
        details = details[:max_n_files_if_not_verbose]

    # One index label per row actually shown. The number of labels must not
    # exceed the number of rows when there are fewer files than the cap.
    indices = [f"#{i}" for i in range(1, len(details) + 1)]
    output = tabulate(details, headers="keys", showindex=indices)

    # Only hint at `verbose` when some files have really been left out.
    if truncated:
        output += "\n...\n(set `verbose` to True for all the files)"
    print(output)
def to_chat(
    self,
    path: str,
    is_dir: bool = False,
    filenames: Iterable[str] = None,
    tabular: bool = True,
    encoding: str = _ENCODING,
) -> None:
    """Export to CHAT data files.

    Parameters
    ----------
    path : str
        The path to a file where you want to output the CHAT data,
        e.g., `"data.cha"`, `"foo/bar/data.cha"`.
    is_dir : bool, optional
        If ``True`` (default is ``False``), then ``path`` is interpreted as
        a directory instead. The CHAT data is written to possibly multiple
        files under this directory. The number of files you get can be
        checked by calling :func:`~pylangacq.Reader.n_files`, which depends
        on how this reader object is created.
    filenames : Iterable[str], optional
        Used only when ``is_dir`` is ``True``. These are the filenames of
        the CHAT files to write. If ``None`` or not given,
        {0001.cha, 0002.cha, ...} are used.
    tabular : bool, optional
        If ``True``, adjust spacing such that the three tiers of
        the utterance, %mor, and %gra are aligned in a tabular form.
        Note that such alignment would drop annotations (e.g., pauses)
        on the main utterance tier.
    encoding : str, optional
        Text encoding to output the CHAT data as. The default value is
        ``"utf-8"`` for Unicode UTF-8.

    Raises
    ------
    ValueError
        - If you attempt to output data to a single local file, but the
          CHAT data in this reader appears to be organized in multiple files.
        - If ``path`` is meant to be a file path but has no filename part.
        - If you attempt to output data to a directory while providing your
          own filenames, but the number of your filenames doesn't match
          the number of CHAT files in this reader object.
    """
    if is_dir:
        out_dir = path
        if filenames is None:
            # Default filenames: 0001.cha, 0002.cha, ...
            filenames = [
                f"{number:04d}.cha" for number in range(1, len(self._files) + 1)
            ]
        else:
            filenames = list(filenames)
            if len(filenames) != len(self._files):
                raise ValueError(
                    f"There are {len(self._files)} CHAT files to create, "
                    f"but you've provided {len(filenames)} filenames."
                )
    else:
        if self.n_files() > 1:
            raise ValueError(
                "The CHAT data in this reader object exists in more than one file. "
                "(Call the `.n_files()` method to check.) It is not possible to "
                "output data from multiple files to a single local file. "
                "To output data, set `is_dir` to `True`, and pass in a directory "
                "(not a file path) to `path`."
            )
        out_dir, basename = os.path.split(path)
        if not basename:
            if out_dir.endswith(os.sep):
                raise ValueError(
                    f"You've passed in {out_dir} as `path` that looks like a directory "
                    f"instead of a path to a file, because {out_dir} ends with the "
                    f"directory separator {os.sep}. "
                    "As an example, a path to a file should look like "
                    f"foo{os.sep}bar.cha"
                )
            raise ValueError(
                f"You've passed in {out_dir} as `path`, but it doesn't look like "
                "a path to a file."
            )
        filenames = [basename]

    # An empty directory part means the current working directory.
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    for filename, chat_str in zip(filenames, self.to_strs(tabular=tabular)):
        target = os.path.join(out_dir, filename)
        with open(target, "w", encoding=encoding) as out:
            out.write(chat_str)
def _parse_chat_str(self, chat_str, file_path) -> _File:
    """Parse one CHAT data string into a ``_File`` of header and utterances."""
    lines = self._get_lines(chat_str)
    header = self._get_header(lines)
    all_tiers = self._get_all_tiers(lines)
    utterances = self._get_utterances(all_tiers)
    return _File(file_path, header, utterances)


def _get_participant_code(self, tier_markers: Iterable[str]) -> Union[str, None]:
    """Return the first tier marker not starting with ``%``, else ``None``."""
    for tier_marker in tier_markers:
        if not tier_marker.startswith("%"):
            return tier_marker
    return None


def _get_utterances(self, all_tiers: Iterable[Dict[str, str]]) -> List[Utterance]:
    """Build the utterances from the tiers of each utterance.

    For each utterance, the words of the cleaned-up main tier are aligned
    with the items of the %mor tier (clitics are split off as items of
    their own) and of the %gra tier, one ``Token`` per aligned position.
    The ``%umor`` and ``%ugra`` tiers are removed from the given dicts.

    Raises
    ------
    ValueError
        If the main tier and %mor, or %mor and %gra, cannot be aligned.
    """
    result_list = []

    for tiermarker_to_line in all_tiers:
        # TODO: Handle the new tiers for Universal Dependencies
        for unhandled_tier in ("%umor", "%ugra"):
            if unhandled_tier in tiermarker_to_line:
                del tiermarker_to_line[unhandled_tier]

        participant_code = self._get_participant_code(tiermarker_to_line.keys())
        if participant_code is None:
            continue

        # get the plain words from utterance tier
        utterance_line = _clean_utterance(tiermarker_to_line[participant_code])
        forms = utterance_line.split()

        # %mor tier
        # The clitic indices are positions in `mor_items`.
        preclitic_indices = set()
        postclitic_indices = set()
        mor_items = []
        if "%mor" in tiermarker_to_line:
            for item in tiermarker_to_line["%mor"].split():
                match = _CLITIC_REGEX.search(item)
                _, morph_preclitic, morph_core, _, morph_postclitic = match.groups()

                if morph_preclitic:
                    for morph in morph_preclitic.split("$"):
                        preclitic_indices.add(len(mor_items))
                        mor_items.append(morph)

                mor_items.append(morph_core)

                if morph_postclitic:
                    for morph in morph_postclitic.split("~"):
                        postclitic_indices.add(len(mor_items))
                        mor_items.append(morph)

        if mor_items and (
            (len(forms) + len(preclitic_indices) + len(postclitic_indices))
            != len(mor_items)
        ):
            raise ValueError(
                "cannot align the utterance and %mor tiers:\n"
                f"Tiers --\n{tiermarker_to_line}\n"
                f"Cleaned-up utterance --\n{utterance_line}\n"
                f"Parsed %mor tier --\n{mor_items}"
            )

        # %gra tier
        gra_items = (
            tiermarker_to_line["%gra"].split()
            if "%gra" in tiermarker_to_line
            else []
        )

        if mor_items and gra_items and (len(mor_items) != len(gra_items)):
            raise ValueError(
                f"cannot align the %mor and %gra tiers:\n"
                f"Tiers --\n{tiermarker_to_line}\n"
                f"Parsed %mor tier --\n{mor_items}\n"
                f"parsed %gra tier --\n{gra_items}"
            )

        # utterance tier
        if mor_items and (preclitic_indices or postclitic_indices):
            # Clitics have no word of their own on the main tier, so put
            # placeholders at their positions to keep the tiers aligned.
            word_iterator = iter(forms)
            utterance_items = [""] * len(mor_items)
            for j in range(len(mor_items)):
                if j in postclitic_indices:
                    utterance_items[j] = _POSTCLITIC
                elif j in preclitic_indices:
                    utterance_items[j] = _PRECLITIC
                else:
                    utterance_items[j] = next(word_iterator)
        else:
            utterance_items = forms

        # determine what to yield (and how) to create the generator
        if not mor_items:
            mor_items = [None] * len(utterance_items)
        if not gra_items:
            gra_items = [None] * len(utterance_items)

        sent: List[Token] = []

        for word, mor, gra in zip(utterance_items, mor_items, gra_items):
            try:
                pos, _, mor = mor.partition("|")
            except AttributeError:
                # `mor` is None when there's no %mor tier.
                pos, mor = None, None

            output_word = Token(
                _clean_word(word),
                self._preprocess_pos(pos),
                mor,
                self._get_gra(gra),
            )
            sent.append(self._preprocess_token(output_word))

        time_marks = self._get_time_marks(tiermarker_to_line[participant_code])
        u = Utterance(participant_code, sent, time_marks, tiermarker_to_line)
        result_list.append(self._preprocess_utterance(u))

    return result_list


@staticmethod
def _preprocess_token(t: Token):
    """Override this method in a child class for custom behavior."""
    return t


@staticmethod
def _preprocess_utterance(u: Utterance):
    """Override this method in a child class for custom behavior."""
    return u


@staticmethod
def _preprocess_pos(pos: str) -> str:
    """If POS tag preprocessing is needed, create a child class of Reader
    and override this method."""
    return pos


@staticmethod
def _get_time_marks(line: str) -> Union[Tuple[int, int], None]:
    """Return the (start, end) time marks found in ``line``, else ``None``."""
    match = _TIMER_MARKS_REGEX.search(line)
    if match:
        time_marks = match.groups()
        return int(time_marks[0]), int(time_marks[1])
    else:
        return None


@staticmethod
def _get_gra(raw_gra: Optional[str]) -> Union[Gra, None]:
    """Parse a ``dep|head|rel`` item of the %gra tier into a ``Gra``.

    ``None`` is returned if ``raw_gra`` is ``None`` or cannot be parsed.
    """
    if raw_gra is None:
        return None
    try:
        dep, head, rel = raw_gra.strip().split("|", 2)
        dep = int(dep)
        head = int(head)
        return Gra(dep, head, rel)
    except (ValueError, TypeError):
        return None


def _get_all_tiers(self, lines: List[str]) -> Iterable[Dict[str, str]]:
    """Group the lines into one dict of tiers per utterance.

    Each dict maps the participant code to the main tier line, and each
    dependent tier marker (e.g., ``%mor``) to its line. Header lines
    (those starting with ``@``) are skipped.
    """
    index_to_tiers: Dict[int, Dict[str, str]] = {}
    index_ = -1  # utterance index (1st utterance is index 0)
    utterance = None

    for line in lines:
        if line.startswith("@"):
            continue

        line_split = line.split()

        if line.startswith("*"):
            index_ += 1
            participant_code = line_split[0].lstrip("*").rstrip(":")
            utterance = " ".join(line_split[1:])
            index_to_tiers[index_] = {participant_code: utterance}

        elif utterance and line.startswith("%"):
            tier_marker = line_split[0].rstrip(":")
            index_to_tiers[index_][tier_marker] = " ".join(line_split[1:])

    return index_to_tiers.values()


def _get_header(self, lines: List[str]) -> Dict:
    """Parse the header lines into a dict.

    ``Participants``, ``ID``, and ``Birth of ...`` headers are merged into
    the ``"Participants"`` entry, a dict keyed by participant code.
    ``Date`` maps to a set of dates, ``Languages`` to an ordered list, and
    any other header to its string value (the last one wins if repeated).
    ``@Begin`` and ``@End`` are ignored. Dates that cannot be parsed are
    skipped. An empty dict is returned if no entry has a truthy value.
    """
    headname_to_entry = {}

    for line in lines:
        header_re_search = _HEADER_REGEX.search(line)
        if not header_re_search:
            continue

        if line.startswith("@Begin") or line.startswith("@End"):
            continue

        # find head, e.g., "Languages", "Participants", "ID" etc
        head = header_re_search.group(1)
        line = header_re_search.group(3)

        if head == "Participants":
            participants = line.split(",")

            for participant in participants:
                participant = participant.strip()
                # code = participant code, e.g. CHI, MOT
                code, _, participant_label = participant.partition(" ")
                # Only the name is kept here; what follows it in the label
                # is not stored ("role" is filled in from the ID header).
                participant_name, _, _ = participant_label.partition(" ")
                headname_to_entry.setdefault("Participants", {})
                headname_to_entry["Participants"][code] = {"name": participant_name}

        elif head == "ID":
            participant_info = line.split("|")[:-1]  # final empty str removed
            code = participant_info[2]
            # participant_info contains these in order:
            #   language, corpus, code, age, sex, group, SES, role,
            #   education, custom
            del participant_info[2]  # remove code info (3rd in list)
            participant_info_heads = [
                "language",
                "corpus",
                "age",
                "sex",
                "group",
                "ses",
                "role",
                "education",
                "custom",
            ]
            head_to_info = dict(zip(participant_info_heads, participant_info))

            participants_entry = headname_to_entry.setdefault("Participants", {})
            participants_entry.setdefault(code, {}).update(head_to_info)

        elif head == "Date":
            try:
                date = self._header_line_to_date(line.strip())
            except (TypeError, ValueError, ParserError):
                continue

            headname_to_entry.setdefault("Date", set()).add(date)

        elif head.startswith("Birth of"):
            try:
                # e.g., header is 'Birth of CHI', participant is 'CHI'
                # A head not made of exactly three words raises ValueError
                # here, and is skipped like an unparsable date.
                _, _, participant = head.split()
                date = self._header_line_to_date(line.strip())
            except (TypeError, ValueError, ParserError):
                continue

            # The "Participants" entry may not exist yet if this header
            # comes before any Participants or ID header.
            participants_entry = headname_to_entry.setdefault("Participants", {})
            participants_entry.setdefault(participant, {})["dob"] = date

        elif head == "Languages":
            languages = []  # not set; ordering indicates language dominance

            for language in line.strip().split(","):
                language = language.strip()
                if language:
                    languages.append(language)

            headname_to_entry["Languages"] = languages

        else:
            headname_to_entry[head] = line or ""

    return headname_to_entry if any(headname_to_entry.values()) else {}


@staticmethod
def _header_line_to_date(line: str) -> datetime.date:
    """Parse the value of a date header into a ``datetime.date``."""
    return parse_date(line).date()


@staticmethod
def _get_lines(raw_str: str) -> List[str]:
    """Split a CHAT data string into logical lines.

    Blank lines are dropped and each line is stripped of surrounding
    whitespace. A line that doesn't start with ``@``, ``*``, or ``%``
    is a continuation, and is appended (after a space) to the line before.
    """
    lines: List[str] = []

    raw_str = (raw_str or "").strip()
    if not raw_str:
        return lines

    for line in raw_str.splitlines():
        line = line.strip()
        if not line:
            continue

        # TODO: Why did I do this?
        if line.startswith("%xpho:") or line.startswith("%xmod:"):
            line = line.replace("%x", "%", 1)

        # A continuation line is merged into the previous line. If there is
        # no previous line (the data starts with such a line), it is kept
        # as is instead of popping from an empty list.
        if line[0] not in _CHAT_LINE_INDICATORS and lines:
            previous_line = lines.pop()
            line = f"{previous_line} {line}"

        lines.append(line)

    return lines
def _get_uuid() -> str: """This function goes hand-in-hand with _is_uuid() below.""" return str(uuid.uuid4()) def _is_uuid(s: str) -> bool: """This function goes hand-in-hand with _get_uuid() above.""" # Implementation from https://stackoverflow.com/a/33245493 try: uuid_obj = uuid.UUID(s, version=4) except ValueError: return False return str(uuid_obj) == s def _initialize_cached_data_dir() -> None: if os.path.isdir(_CACHED_DATA_DIR): shutil.rmtree(_CACHED_DATA_DIR) os.makedirs(_CACHED_DATA_DIR) with open(os.path.join(_CACHED_DATA_DIR, "README.txt"), "w", encoding="utf-8") as f: f.write( "The contents of this directory are automatically managed by " "the PyLangAcq library. Please do not edit anything on your own.\n" ) with open(_CACHED_DATA_JSON_PATH, "w", encoding="utf-8") as f: f.write("{}") def _write_cached_data_json(records: Dict) -> None: with open(_CACHED_DATA_JSON_PATH, "w", encoding="utf-8") as f: json.dump(records, f, indent=4)
def cached_data_info() -> Set[str]:
    """Return the information of the cached datasets.

    Returns
    -------
    Set[str]
        A set of the URLs for the cached CHILDES / TalkBank datasets.
        The set is empty if the file of cache records does not exist.
    """
    try:
        # `with` closes the file handle right after the JSON is parsed.
        with open(_CACHED_DATA_JSON_PATH, encoding="utf-8") as f:
            existing_records = json.load(f)
    except FileNotFoundError:
        return set()
    return set(existing_records)
def remove_cached_data(url: str = None) -> None:
    """Remove data cached on disk.

    If the file of cache records does not exist, the cache directory is
    (re)initialized and nothing else is done.

    Parameters
    ----------
    url : str, optional
        If provided, remove only the data specified by this URL.
        If not provided, all cached data is removed.

    Raises
    ------
    KeyError
        If ``url`` is provided but not found among the cached data.
    """
    try:
        # `with` closes the file handle right after the JSON is parsed.
        with open(_CACHED_DATA_JSON_PATH, encoding="utf-8") as f:
            existing_records = json.load(f)
    except FileNotFoundError:
        _initialize_cached_data_dir()
        return

    if url is None:
        for record in existing_records.values():
            shutil.rmtree(os.path.join(_CACHED_DATA_DIR, record["subdir"]))
        _initialize_cached_data_dir()
        return

    subdir = existing_records.get(url, {}).get("subdir")
    if not subdir:
        raise KeyError(f"url not found among the cached data: {url}")

    # Drop the record first, then the unzipped files themselves.
    del existing_records[url]
    _write_cached_data_json(existing_records)
    shutil.rmtree(os.path.join(_CACHED_DATA_DIR, subdir))
@_params_in_docstring("match", "exclude", "encoding", "cls", class_method=False)
def read_chat(
    path: str,
    match: str = None,
    exclude: str = None,
    encoding: str = _ENCODING,
    cls: type = Reader,
) -> "pylangacq.Reader":
    """Create a reader of CHAT data.

    Depending on what ``path`` looks like, the reader is created by
    the ``from_zip``, ``from_dir``, or ``from_files`` method of ``cls``.

    Parameters
    ----------
    path : str
        A path that points to one of the following:

        - ZIP file. Either a local ``.zip`` file path or a URL (one that
          begins with ``"https://"`` or ``"http://"``).
          Example of a URL: ``"https://childes.talkbank.org/data/Eng-NA/Brown.zip"``
        - A local directory, for files under this directory recursively.
        - A single ``.cha`` CHAT file.

    Returns
    -------
    :class:`~pylangacq.Reader`

    Raises
    ------
    TypeError
        If ``cls`` is neither ``Reader`` nor a child class of it.
    ValueError
        If ``path`` is not one of the accepted choices.
    """
    is_reader_class = cls == Reader or issubclass(cls, Reader)
    if not is_reader_class:
        raise TypeError(f"Only a Reader class or its child class is allowed: {cls}")

    # Just in case the user provides a CHILDES web link like
    # https://childes.talkbank.org/access/Eng-NA/Brown.html
    # instead of https://childes.talkbank.org/data/Eng-NA/Brown.zip.
    # The subdomain can be "childes", "phonbank", "ca", etc.
    # This hack is just for convenience.
    # Not sure if we should encourage using the .html link,
    # since I can't guarantee the URL format...
    if re.search(r"https://\S+\.talkbank\.org/access/\S+\.html", path):
        path = path.replace("/access/", "/data/").replace(".html", ".zip")

    options = {"match": match, "exclude": exclude, "encoding": encoding}
    lowered = path.lower()

    if lowered.endswith(".zip"):
        return cls.from_zip(path, **options)
    if os.path.isdir(path):
        return cls.from_dir(path, **options)
    if lowered.endswith(_CHAT_EXTENSION):
        return cls.from_files([path], **options)

    raise ValueError(
        "path is not one of the accepted choices of "
        f"{{.zip file, local directory, .cha file}}: {path}"
    )
def _clean_word(word):
    """Clean the word.

    The characters ``(``, ``)``, ``:``, ``;``, and ``+`` are removed,
    anything from the first ``@`` onwards is dropped,
    and one leading ``&`` is stripped.

    Parameters
    ----------
    word : str

    Returns
    -------
    str
    """
    new_word = (
        word.replace("(", "")
        .replace(")", "")
        .replace(":", "")
        .replace(";", "")
        .replace("+", "")
    )

    # Keep only what comes before the first "@", if there is any.
    new_word = new_word.partition("@")[0]

    if new_word.startswith("&"):
        new_word = new_word[1:]

    return new_word


class _HTTPSession(requests.Session):
    """A requests session with retries and a default timeout.

    Parameters
    ----------
    max_retries : int, optional
        Total number of retries allowed, for both http:// and https://.
    backoff_factor : float, optional
        Backoff factor passed on to ``Retry``.
    timeout : int, optional
        Timeout applied to a request that doesn't specify its own.
    """

    def __init__(
        self, max_retries: int = 10, backoff_factor: float = 0.1, timeout: int = 10
    ):
        super().__init__()
        retry = Retry(total=max_retries, backoff_factor=backoff_factor)
        adapter = HTTPAdapter(max_retries=retry)
        self.mount("http://", adapter)
        self.mount("https://", adapter)
        self.timeout = timeout

    def request(self, *args, **kwargs):
        # Apply the session-wide timeout unless the caller has passed one.
        kwargs.setdefault("timeout", self.timeout)
        return super().request(*args, **kwargs)


def _download_file(url, path, session=None):
    """Download the content at ``url`` and write it to the file at ``path``.

    Parameters
    ----------
    url : str
        URL to download from.
    path : str
        Local file path to write the downloaded content to.
    session : requests.Session, optional
        Session to make the request with. If not provided, a new
        ``_HTTPSession`` is created, and closed when the download is over.

    Raises
    ------
    requests.HTTPError
        If the server responds with an error status (4xx or 5xx).
    """
    owns_session = session is None
    if owns_session:
        session = _HTTPSession()
    try:
        with session.get(url, stream=True) as r:
            # Fail early on an error response, rather than saving the body
            # of an error page as if it were the requested file.
            r.raise_for_status()
            # The output file is created only once the response is known
            # to be good.
            with open(path, "wb") as f:
                shutil.copyfileobj(r.raw, f)
    finally:
        # Only close a session created here; the caller's is left open.
        if owns_session:
            session.close()