Source code for tape.ensemble_frame

from packaging.version import Version
import warnings

import numpy as np
import pandas as pd

import dask
import dask.dataframe as dd
import dask.array as da

from tape.utils import IndexCallable

from dask.dataframe.utils import meta_nonempty
from dask.dataframe.dispatch import make_meta_dispatch, pyarrow_schema_dispatch
from dask.dataframe.backends import _nonempty_index, meta_nonempty_dataframe, _nonempty_series

import dask_expr as dx
from dask_expr import get_collection_type
from dask_expr._collection import new_collection, from_dict
from dask_expr._expr import _emulate, ApplyConcatApply

SOURCE_FRAME_LABEL = "source"  # Reserved label for source table
OBJECT_FRAME_LABEL = "object"  # Reserved label for object table.

__all__ = [
    "EnsembleFrame",
    "EnsembleSeries",
    "ObjectFrame",
    "SourceFrame",
    "TapeFrame",
    "TapeObjectFrame",
    "TapeSourceFrame",
    "TapeSeries",
]

from functools import partial
from dask.dataframe.io.parquet.arrow import (
    ArrowDatasetEngine as DaskArrowDatasetEngine,
)


class TapeSeries(pd.Series):
    """A barebones extension of a Pandas series to be used for underlying Ensemble data.

    See https://pandas.pydata.org/docs/development/extending.html#subclassing-pandas-data-structures
    """

    @property
    def _constructor(self):
        """Type pandas uses for results with the same dimension as this series."""
        return TapeSeries

    @property
    def _constructor_sliced(self):
        # pandas documents this hook for lower-dimension results of a DataFrame,
        # not for a Series. It is kept so code that already reads the attribute
        # from a TapeSeries continues to work.
        return TapeSeries

    @property
    def _constructor_expanddim(self):
        """Type pandas uses when this series is expanded to a frame (e.g. ``to_frame``)."""
        return TapeFrame


class TapeFrame(pd.DataFrame):
    """A barebones extension of a Pandas frame to be used for underlying Ensemble data.

    See https://pandas.pydata.org/docs/development/extending.html#subclassing-pandas-data-structures
    """

    @property
    def _constructor(self):
        """Type pandas uses for results with the same dimension as this frame."""
        return TapeFrame

    @property
    def _constructor_sliced(self):
        """Type pandas uses for one-dimensional results, such as selecting a single column."""
        return TapeSeries

    @property
    def _constructor_expanddim(self):
        # pandas documents this hook for Series -> DataFrame expansion, not for a
        # DataFrame. It is kept so code that already reads the attribute from a
        # TapeFrame continues to work.
        return TapeFrame
class TapeArrowEngine(DaskArrowDatasetEngine):
    """
    Engine for reading parquet files into Tape and assigning the appropriate Dask meta.

    Based off of the approach used in dask_geopandas.io
    """

    @classmethod
    def _creates_meta(cls, meta, schema):
        """
        Converts the meta to a TapeFrame.
        """
        return TapeFrame(meta)

    @classmethod
    def _create_dd_meta(cls, dataset_info, use_nullable_dtypes=False):
        """Overriding private method for dask >= 2021.10.0

        Builds the meta with the parent engine and then converts it with
        ``cls._creates_meta`` so subclasses control the resulting frame type.
        ``use_nullable_dtypes`` is accepted but not forwarded to the parent.

        Raises
        ------
        ValueError
            If the schema has neither names nor metadata and the dataset
            contains no fragments.
        """
        meta = super()._create_dd_meta(dataset_info)

        schema = dataset_info["schema"]
        if not schema.names and not schema.metadata:
            # Only emptiness matters, so look at the first fragment instead of
            # materialising every fragment into a list.
            if next(iter(dataset_info["ds"].get_fragments()), None) is None:
                raise ValueError(
                    "No dataset parts discovered. Use dask.dataframe.read_parquet "
                    "to read it as an empty DataFrame"
                )
        meta = cls._creates_meta(meta, schema)
        return meta


class TapeSourceArrowEngine(TapeArrowEngine):
    """
    Barebones subclass of TapeArrowEngine for assigning the meta when loading from a parquet file
    of source data.
    """

    @classmethod
    def _creates_meta(cls, meta, schema):
        """
        Convert meta to a TapeSourceFrame
        """
        return TapeSourceFrame(meta)


class TapeObjectArrowEngine(TapeArrowEngine):
    """
    Barebones subclass of TapeArrowEngine for assigning the meta when loading from a parquet file
    of object data.
    """

    @classmethod
    def _creates_meta(cls, meta, schema):
        """
        Convert meta to a TapeObjectFrame
        """
        return TapeObjectFrame(meta)


class _Frame(dx.FrameBase):
    """Base class for extensions of Dask Dataframes that track additional Ensemble-related metadata.

    The tracked metadata is the frame's ``label``, the owning ``ensemble`` and a ``dirty`` flag.
    """

    _partition_type = TapeFrame

    def __init__(self, expr, label=None, ensemble=None):
        # We define relevant object fields before super().__init__ since that call may lead to a
        # map_partitions call which will assume these fields exist.
        self.label = label  # A label used by the Ensemble to identify this frame.
        self.ensemble = ensemble  # The Ensemble object containing this frame.
        self.dirty = False  # True if the underlying data is out of sync with the Ensemble
        super().__init__(expr)

    def is_dirty(self):
        """Return the current value of the ``dirty`` flag."""
        return self.dirty

    def set_dirty(self, dirty):
        """Set the ``dirty`` flag to ``dirty``."""
        self.dirty = dirty

    @property
    def _args(self):
        # Ensure our Dask extension can correctly be used by pickle.
        # See https://github.com/geopandas/dask-geopandas/issues/237
        return super()._args + (self.label, self.ensemble)

    @property
    def partitions(self):
        """Slice dataframe by partitions

        This allows partitionwise slicing of a TAPE EnsembleFrame. You can perform normal
        Numpy-style slicing, but now rather than slice elements of the array you
        slice along partitions so, for example, ``df.partitions[:5]`` produces a new
        Dask Dataframe of the first five partitions. Valid indexers are integers, sequences
        of integers, slices, or boolean masks.

        Note that reading this property marks this frame as dirty.

        Examples
        --------
        >>> df.partitions[0]  # doctest: +SKIP
        >>> df.partitions[:3]  # doctest: +SKIP
        >>> df.partitions[::10]  # doctest: +SKIP

        Returns
        -------
        A TAPE EnsembleFrame Object
        """
        self.set_dirty(True)
        return IndexCallable(self._partitions, self.is_dirty(), self.ensemble, self.label)

    def optimize(self, fuse: bool = True):
        """Return a new collection built from this frame's optimized expression."""
        result = new_collection(self.expr.optimize(fuse=fuse))
        return result

    def __dask_postpersist__(self):
        # Route the rebuild through ``_rebuild`` while reusing the parent's function and args.
        func, args = super().__dask_postpersist__()
        return self._rebuild, (func, args)

    def _rebuild(self, graph, func, args):
        """Rebuild the collection by calling ``func`` with ``graph`` and ``args``."""
        collection = func(graph, *args)
        return collection

    def _propagate_metadata(self, new_frame):
        """Propagates any relevant metadata to a new frame.

        Parameters
        ----------
        new_frame: `_Frame`
            A frame to propagate metadata to

        Returns
        -------
        new_frame: `_Frame`
            The modified frame
        """
        new_frame.label = self.label
        new_frame.ensemble = self.ensemble
        new_frame.set_dirty(self.is_dirty())
        return new_frame

    def copy(self):
        """Return a copy of this frame that carries the same metadata."""
        self_copy = super().copy()
        return self._propagate_metadata(self_copy)

    def assign(self, **kwargs):
        """Assign new columns to a DataFrame.

        This docstring was copied from dask.dataframe.DataFrame.assign.

        Some inconsistencies with the Dask version may exist.

        Returns a new object with all original columns in addition to new ones.
        Existing columns that are re-assigned will be overwritten.

        Parameters
        ----------
        **kwargs: `dict`
            The column names are keywords. If the values are callable, they are
            computed on the DataFrame and assigned to the new columns. The callable
            must not change input DataFrame (though pandas doesn't check it).
            If the values are not callable, (e.g. a Series, scalar, or array),
            they are simply assigned.

        Returns
        -------
        result: `tape._Frame`
            The modified frame, marked as dirty
        """
        result = self._propagate_metadata(super().assign(**kwargs))
        result.set_dirty(True)
        return result

    def query(self, expr, **kwargs):
        """Filter dataframe with complex expression

        Doc string below derived from dask.dataframe.core

        Blocked version of pd.DataFrame.query

        Parameters
        ----------
        expr: str
            The query string to evaluate.
            You can refer to column names that are not valid Python variable names
            by surrounding them in backticks.
            Dask does not fully support referring to variables using the '@' character,
            use f-strings or the ``local_dict`` keyword argument instead.
        **kwargs: `dict`
            See the documentation for eval() for complete details on the keyword arguments accepted
            by pandas.DataFrame.query().

        Returns
        -------
        result: `tape._Frame`
            The modified frame, marked as dirty

        Notes
        -----
        This is like the sequential version except that this will also happen
        in many threads. This may conflict with ``numexpr`` which will use
        multiple threads itself. We recommend that you set ``numexpr`` to use a
        single thread:

        .. code-block:: python

            import numexpr
            numexpr.set_num_threads(1)
        """
        result = self._propagate_metadata(super().query(expr, **kwargs))
        result.set_dirty(True)
        return result

    def sample(self, **kwargs):
        """Random sample of items from a Dataframe.

        Doc string below derived from dask.dataframe.core

        Parameters
        ----------
        frac: float, optional
            Approximate fraction of objects to return. This sampling fraction
            is applied to all partitions equally. Note that this is an
            approximate fraction. You should not expect exactly len(df) * frac
            items to be returned, as the exact number of elements selected will
            depend on how your data is partitioned (but should be pretty close
            in practice).
        replace: boolean, optional
            Sample with or without replacement. Default = False.
        random_state: int or np.random.RandomState
            If an int, we create a new RandomState with this as the seed;
            Otherwise we draw from the passed RandomState.

        Returns
        -------
        result: `tape._Frame`
            The modified frame, marked as dirty
        """
        result = self._propagate_metadata(super().sample(**kwargs))
        result.set_dirty(True)
        return result

    def merge(self, right, **kwargs):
        """Merge the Dataframe with another DataFrame

        Doc string below derived from dask.dataframe.core

        This will merge the two datasets, either on the indices, a certain column
        in each dataset or the index in one dataset and the column in another.

        Parameters
        ----------
        right: dask.dataframe.DataFrame
        how : {'left', 'right', 'outer', 'inner'}, default: 'inner'
            How to handle the operation of the two objects:

            - left: use calling frame's index (or column if on is specified)
            - right: use other frame's index
            - outer: form union of calling frame's index (or column if on is
              specified) with other frame's index, and sort it
              lexicographically
            - inner: form intersection of calling frame's index (or column if
              on is specified) with other frame's index, preserving the order
              of the calling's one

        on : label or list
            Column or index level names to join on. These must be found in both
            DataFrames. If on is None and not merging on indexes then this
            defaults to the intersection of the columns in both DataFrames.
        left_on : label or list, or array-like
            Column to join on in the left DataFrame. Other than in pandas
            arrays and lists are only support if their length is 1.
        right_on : label or list, or array-like
            Column to join on in the right DataFrame. Other than in pandas
            arrays and lists are only support if their length is 1.
        left_index : boolean, default False
            Use the index from the left DataFrame as the join key.
        right_index : boolean, default False
            Use the index from the right DataFrame as the join key.
        suffixes : 2-length sequence (tuple, list, ...)
            Suffix to apply to overlapping column names in the left and
            right side, respectively
        indicator : boolean or string, default False
            If True, adds a column to output DataFrame called "_merge" with
            information on the source of each row. If string, column with
            information on source of each row will be added to output DataFrame,
            and column will be named value of string. Information column is
            Categorical-type and takes on a value of "left_only" for observations
            whose merge key only appears in `left` DataFrame, "right_only" for
            observations whose merge key only appears in `right` DataFrame,
            and "both" if the observation's merge key is found in both.
        npartitions: int or None, optional
            The ideal number of output partitions. This is only utilised when
            performing a hash_join (merging on columns only). If ``None`` then
            ``npartitions = max(lhs.npartitions, rhs.npartitions)``.
            Default is ``None``.
        shuffle: {'disk', 'tasks', 'p2p'}, optional
            Either ``'disk'`` for single-node operation or ``'tasks'`` and
            ``'p2p'``` for distributed operation. Will be inferred by your
            current scheduler.
        broadcast: boolean or float, optional
            Whether to use a broadcast-based join in lieu of a shuffle-based
            join for supported cases. By default, a simple heuristic will be
            used to select the underlying algorithm. If a floating-point value
            is specified, that number will be used as the ``broadcast_bias``
            within the simple heuristic (a large number makes Dask more likely
            to choose the ``broacast_join`` code path). See ``broadcast_join``
            for more information.

        Returns
        -------
        result: `tape._Frame`
            The merged frame carrying this frame's metadata

        Notes
        -----
        There are three ways to join dataframes:

        1. Joining on indices. In this case the divisions are
           aligned using the function ``dask.dataframe.multi.align_partitions``.
           Afterwards, each partition is merged with the pandas merge function.

        2. Joining one on index and one on column. In this case the divisions of
           dataframe merged by index (:math:`d_i`) are used to divide the column
           merged dataframe (:math:`d_c`) one using
           ``dask.dataframe.multi.rearrange_by_divisions``. In this case the
           merged dataframe (:math:`d_m`) has the exact same divisions
           as (:math:`d_i`). This can lead to issues if you merge multiple rows from
           (:math:`d_c`) to one row in (:math:`d_i`).

        3. Joining both on columns. In this case a hash join is performed using
           ``dask.dataframe.multi.hash_join``.

        In some cases, you may see a ``MemoryError`` if the ``merge`` operation requires
        an internal ``shuffle``, because shuffling places all rows that have the same
        index in the same partition. To avoid this error, make sure all rows with the
        same ``on``-column value can fit on a single partition.
        """
        result = super().merge(right, **kwargs)
        return self._propagate_metadata(result)

    def join(self, other, **kwargs):
        """Join columns of another DataFrame. Note that if `other` is a different type,
        we expect the result to have the type of this object regardless of the value
        of the`how` parameter.

        This docstring was copied from pandas.core.frame.DataFrame.join.

        Some inconsistencies with this version may exist.

        Join columns with `other` DataFrame either on index or on a key
        column. Efficiently join multiple DataFrame objects by index at once by
        passing a list.

        Parameters
        ----------
        other : DataFrame, Series, or a list containing any combination of them
            Index should be similar to one of the columns in this one. If a
            Series is passed, its name attribute must be set, and that will be
            used as the column name in the resulting joined DataFrame.
        on : str, list of str, or array-like, optional
            Column or index level name(s) in the caller to join on the index
            in `other`, otherwise joins index-on-index. If multiple
            values given, the `other` DataFrame must have a MultiIndex. Can
            pass an array as the join key if it is not already contained in
            the calling DataFrame. Like an Excel VLOOKUP operation.
        how : {'left', 'right', 'outer', 'inner', 'cross'}, default 'left'
            How to handle the operation of the two objects.

            * left: use calling frame's index (or column if on is specified)
            * right: use `other`'s index.
            * outer: form union of calling frame's index (or column if on is
              specified) with `other`'s index, and sort it lexicographically.
            * inner: form intersection of calling frame's index (or column if
              on is specified) with `other`'s index, preserving the order
              of the calling's one.
            * cross: creates the cartesian product from both frames, preserves the order
              of the left keys.
        lsuffix : str, default ''
            Suffix to use from left frame's overlapping columns.
        rsuffix : str, default ''
            Suffix to use from right frame's overlapping columns.
        sort : bool, default False
            Order result DataFrame lexicographically by the join key. If False,
            the order of the join key depends on the join type (how keyword).
        validate : str, optional
            If specified, checks if join is of specified type.

            * "one_to_one" or "1:1": check if join keys are unique in both left
              and right datasets.
            * "one_to_many" or "1:m": check if join keys are unique in left dataset.
            * "many_to_one" or "m:1": check if join keys are unique in right dataset.
            * "many_to_many" or "m:m": allowed, but does not result in checks.

        Returns
        -------
        result: `tape._Frame`
            A TAPE dataframe containing columns from both the caller and `other`.
        """
        result = super().join(other, **kwargs)
        return self._propagate_metadata(result)

    def drop(self, labels=None, axis=0, columns=None, errors="raise"):
        """Drop specified labels from rows or columns.

        Doc string below derived from dask.dataframe.core

        Remove rows or columns by specifying label names and corresponding
        axis, or by directly specifying index or column names. When using a
        multi-index, labels on different levels can be removed by specifying
        the level. See the :ref:`user guide <advanced.shown_levels>`
        for more information about the now unused levels.

        Parameters
        ----------
        labels : single label or list-like
            Index or column labels to drop. A tuple will be used as a single
            label and not treated as a list-like.
        axis : {0 or 'index', 1 or 'columns'}, default 0
            Whether to drop labels from the index (0 or 'index') or
            columns (1 or 'columns').
        columns : single label or list-like
            Alternative to specifying axis (``labels, axis=1``
            is equivalent to ``columns=labels``).
        errors : {'ignore', 'raise'}, default 'raise'
            If 'ignore', suppress error and only existing labels are
            dropped.

        Returns
        -------
        result: `tape._Frame`
            The frame with the specified index or column labels removed, marked as dirty.
        """
        result = self._propagate_metadata(
            super().drop(labels=labels, axis=axis, columns=columns, errors=errors)
        )
        result.set_dirty(True)
        return result

    def dropna(self, **kwargs):
        """
        Remove missing values.

        Doc string below derived from dask.dataframe.core

        Parameters
        ----------
        how : {'any', 'all'}, default 'any'
            Determine if row or column is removed from DataFrame, when we have
            at least one NA or all NA.

            * 'any' : If any NA values are present, drop that row or column.
            * 'all' : If all values are NA, drop that row or column.
        thresh : int, optional
            Require that many non-NA values. Cannot be combined with how.
        subset : column label or sequence of labels, optional
            Labels along other axis to consider, e.g. if you are dropping rows
            these would be a list of columns to include.

        Returns
        -------
        result: `tape._Frame`
            The modified frame with NA entries dropped from it, marked as dirty.
        """
        result = self._propagate_metadata(super().dropna(**kwargs))
        result.set_dirty(True)
        return result

    def persist(self, **kwargs):
        """Persist this dask collection into memory

        Doc string below derived from dask.base

        This turns a lazy Dask collection into a Dask collection with the same
        metadata, but now with the results fully computed or actively computing
        in the background.

        The action of function differs significantly depending on the active
        task scheduler. If the task scheduler supports asynchronous computing,
        such as is the case of the dask.distributed scheduler, then persist
        will return *immediately* and the return value's task graph will
        contain Dask Future objects. However if the task scheduler only
        supports blocking computation then the call to persist will *block*
        and the return value's task graph will contain concrete Python results.

        This function is particularly useful when using distributed systems,
        because the results will be kept in distributed memory, rather than
        returned to the local process as with compute.

        Parameters
        ----------
        **kwargs
            Extra keywords to forward to the scheduler function.

        Returns
        -------
        result: `tape._Frame`
            The modified frame backed by in-memory data
        """
        result = super().persist(**kwargs)
        return self._propagate_metadata(result)

    def set_index(
        self,
        other,
        drop=True,
        sorted=False,
        npartitions=None,
        divisions=None,
        sort=True,
        **kwargs,
    ):
        """Set the DataFrame index (row labels) using an existing column.

        Doc string below derived from dask.dataframe.core

        If ``sort=False``, this function operates exactly like ``pandas.set_index``
        and sets the index on the DataFrame. If ``sort=True`` (default),
        this function also sorts the DataFrame by the new index. This can have a
        significant impact on performance, because joins, groupbys, lookups, etc.
        are all much faster on that column. However, this performance increase
        comes with a cost, sorting a parallel dataset requires expensive shuffles.
        Often we ``set_index`` once directly after data ingest and filtering and
        then perform many cheap computations off of the sorted dataset.

        With ``sort=True``, this function is much more expensive. Under normal
        operation this function does an initial pass over the index column to
        compute approximate quantiles to serve as future divisions. It then passes
        over the data a second time, splitting up each input partition into several
        pieces and sharing those pieces to all of the output partitions now in
        sorted order.

        In some cases we can alleviate those costs, for example if your dataset is
        sorted already then we can avoid making many small pieces or if you know
        good values to split the new index column then we can avoid the initial
        pass over the data. For example if your new index is a datetime index and
        your data is already sorted by day then this entire operation can be done
        for free. You can control these options with the following parameters.

        Parameters
        ----------
        other: string or Dask Series
            Column to use as index.
        drop: boolean, default True
            Delete column to be used as the new index.
        sorted: bool, optional
            If the index column is already sorted in increasing order.
            Defaults to False
        npartitions: int, None, or 'auto'
            The ideal number of output partitions. If None, use the same as
            the input. If 'auto' then decide by memory use.
            Only used when ``divisions`` is not given. If ``divisions`` is given,
            the number of output partitions will be ``len(divisions) - 1``.
        divisions: list, optional
            The "dividing lines" used to split the new index into partitions.
            For ``divisions=[0, 10, 50, 100]``, there would be three output partitions,
            where the new index contained [0, 10), [10, 50), and [50, 100), respectively.
            See https://docs.dask.org/en/latest/dataframe-design.html#partitions.
            If not given (default), good divisions are calculated by immediately computing
            the data and looking at the distribution of its values. For large datasets,
            this can be expensive.
            Note that if ``sorted=True``, specified divisions are assumed to match
            the existing partitions in the data; if this is untrue you should
            leave divisions empty and call ``repartition`` after ``set_index``.
        sort: bool, optional
            If ``True``, sort the DataFrame by the new index. Otherwise
            set the index on the individual existing partitions.
            Defaults to ``True``.
        shuffle: {'disk', 'tasks', 'p2p'}, optional
            Either ``'disk'`` for single-node operation or ``'tasks'`` and
            ``'p2p'`` for distributed operation. Will be inferred by your
            current scheduler.
        compute: bool, default False
            Whether or not to trigger an immediate computation. Defaults to False.
            Note, that even if you set ``compute=False``, an immediate computation
            will still be triggered if ``divisions`` is ``None``.
        partition_size: int, optional
            Desired size of each partitions in bytes.
            Only used when ``npartitions='auto'``

        Returns
        -------
        result: `tape._Frame`
            The indexed frame
        """
        # Forward the options by keyword so they cannot be mis-bound if the parent
        # declares its optional parameters in a different order.
        result = super().set_index(
            other,
            drop=drop,
            sorted=sorted,
            npartitions=npartitions,
            divisions=divisions,
            sort=sort,
            **kwargs,
        )
        return self._propagate_metadata(result)

    def map_partitions(self, func, *args, **kwargs):
        """Apply Python function on each DataFrame partition.

        The call is delegated to the parent class's ``map_partitions``; this
        override only decides whether this frame's metadata is copied onto the
        result.

        Parameters
        ----------
        func: function
            The function applied to each partition.
        *args, **kwargs
            Forwarded unchanged to the parent class's ``map_partitions``.

        Returns
        -------
        result
            The output of the parent ``map_partitions``. If it is an instance of
            this frame's class, an ``ObjectFrame`` or a ``SourceFrame``, this
            frame's label, ensemble and dirty flag are propagated to it; an
            ``ObjectFrame`` result that is not an instance of this frame's class
            additionally has its label reset to ``OBJECT_FRAME_LABEL``. Any other
            result is returned untouched.
        """
        result = super().map_partitions(func, *args, **kwargs)
        if isinstance(result, self.__class__):
            # If the output of func is another _Frame, let's propagate any metadata.
            return self._propagate_metadata(result)
        elif isinstance(result, ObjectFrame):
            result = self._propagate_metadata(result)
            result.label = OBJECT_FRAME_LABEL  # override the label
            return result
        elif isinstance(result, SourceFrame):
            # NOTE(review): unlike the ObjectFrame branch, the label is not reset to
            # SOURCE_FRAME_LABEL here - confirm whether that asymmetry is intended.
            return self._propagate_metadata(result)
        return result

    def compute(self, **kwargs):
        """Compute this Dask collection, returning the underlying dataframe or series.
        If tracked by an `Ensemble`, the `Ensemble` is informed of this operation and
        is given the opportunity to sync any of its tables prior to this Dask collection
        being computed.

        Doc string below derived from dask.dataframe.DataFrame.compute

        This turns a lazy Dask collection into its in-memory equivalent. For example
        a Dask array turns into a NumPy array and a Dask dataframe turns into a
        Pandas dataframe. The entire dataset must fit into memory before calling
        this operation.

        Parameters
        ----------
        scheduler: `string`, optional
            Which scheduler to use like “threads”, “synchronous” or “processes”.
            If not provided, the default is to check the global settings first,
            and then fall back to the collection defaults.
        optimize_graph: `bool`, optional
            If True [default], the graph is optimized before computation.
            Otherwise the graph is run as is. This can be useful for debugging.
        **kwargs: `dict`, optional
            Extra keywords to forward to the scheduler function.
        """
        if self.ensemble is not None:
            self.ensemble._lazy_sync_tables_from_frame(self)
        return super().compute(**kwargs)

    def repartition(
        self,
        divisions=None,
        npartitions=None,
        partition_size=None,
        freq=None,
        force=False,
    ):
        """Repartition dataframe along new divisions

        Doc string below derived from dask.dataframe.DataFrame

        Parameters
        ----------
        divisions : list, optional
            The "dividing lines" used to split the dataframe into partitions.
            For ``divisions=[0, 10, 50, 100]``, there would be three output partitions,
            where the new index contained [0, 10), [10, 50), and [50, 100), respectively.
            See https://docs.dask.org/en/latest/dataframe-design.html#partitions.
            Only used if npartitions and partition_size isn't specified.
            For convenience if given an integer this will defer to npartitions
            and if given a string it will defer to partition_size (see below)
        npartitions : int, optional
            Approximate number of partitions of output. Only used if partition_size
            isn't specified. The number of partitions used may be slightly
            lower than npartitions depending on data distribution, but will never be
            higher.
        partition_size: int or string, optional
            Max number of bytes of memory for each partition. Use numbers or
            strings like 5MB. If specified npartitions and divisions will be
            ignored. Note that the size reflects the number of bytes used as
            computed by ``pandas.DataFrame.memory_usage``, which will not
            necessarily match the size when storing to disk.

            .. warning::

               This keyword argument triggers computation to determine
               the memory size of each partition, which may be expensive.

        freq : str, pd.Timedelta
            A period on which to partition timeseries data like ``'7D'`` or
            ``'12h'`` or ``pd.Timedelta(hours=12)``. Assumes a datetime index.
        force : bool, default False
            Allows the expansion of the existing divisions.
            If False then the new divisions' lower and upper bounds must be
            the same as the old divisions'.

        Returns
        -------
        result: `tape._Frame`
            The repartitioned frame carrying this frame's metadata

        Notes
        -----
        Exactly one of `divisions`, `npartitions`, `partition_size`, or `freq`
        should be specified. A ``ValueError`` will be raised when that is
        not the case.

        Also note that ``len(divisons)`` is equal to ``npartitions + 1``. This is because ``divisions``
        represents the upper and lower bounds of each partition. The first item is the
        lower bound of the first partition, the second item is the lower bound of the
        second partition and the upper bound of the first partition, and so on.
        The second-to-last item is the lower bound of the last partition, and the last
        (extra) item is the upper bound of the last partition.

        Examples
        --------
        >>> df = df.repartition(npartitions=10)  # doctest: +SKIP
        >>> df = df.repartition(divisions=[0, 5, 10, 20])  # doctest: +SKIP
        >>> df = df.repartition(freq='7d')  # doctest: +SKIP
        """
        result = super().repartition(
            divisions=divisions,
            npartitions=npartitions,
            partition_size=partition_size,
            freq=freq,
            force=force,
        )
        return self._propagate_metadata(result)

    def head(self, n=5, compute=True, npartitions=None):
        """Returns `n` rows of data for previewing purposes.

        Parameters
        ----------
        n : int, optional
            The number of desired rows. Default is 5.
        compute : bool, optional
            Whether to compute the result immediately. Default is True.
        npartitions : int, optional
            `npartitions` is not supported and if provided will be ignored (a warning
            is issued). Instead all partitions may be used.

        Returns
        -------
        A pandas DataFrame with up to `n` rows of data when ``compute`` is True;
        otherwise the result of the Dask ``head`` method with ``compute=False``.
        """
        if npartitions is not None:
            warnings.warn(
                "The 'npartitions' parameter is not supported for TAPE dataframes. All partitions may be used.",
                stacklevel=2,  # attribute the warning to the caller of head()
            )

        if not compute:
            # Just use the Dask head method
            return super().head(n, compute=False)

        if n <= 0:
            return super().head(0)

        # Iterate over the partitions until we have enough rows
        dfs = []
        remaining_rows = n
        for partition in self.partitions:
            if remaining_rows == 0:
                break
            # Note that partition is itself a _Frame object, so we need to compute to avoid infinite recursion
            partition_head = partition.compute().head(remaining_rows)
            dfs.append(partition_head)
            remaining_rows -= len(partition_head)

        if not dfs:
            # pd.concat raises on an empty list; fall back to an empty preview instead.
            return super().head(0)
        return pd.concat(dfs)
class EnsembleSeries(_Frame, dd.Series):
    """Dask Series extension used for Ensemble data.

    Combines the Ensemble metadata tracking of ``_Frame`` with a Dask Series.
    """

    # Tracks the underlying data type
    _partition_type = TapeSeries
[docs] class EnsembleFrame( _Frame, dd.DataFrame ): # can use dd.DataFrame instead of dx.DataFrame if the config is set true (default in >=2024.3.0) """An extension for a Dask Dataframe for data used by a lightcurve Ensemble. The underlying non-parallel dataframes are TapeFrames and TapeSeries which extend Pandas frames. Examples ---------- Instantiation:: import tape ens = tape.Ensemble() data = {...} # Some data you want tracked by the Ensemble ensemble_frame = tape.EnsembleFrame.from_dict(data, label="my_frame", ensemble=ens) """
[docs] _partition_type = TapeFrame # Tracks the underlying data type
def __getitem__(self, key):
    """Select ``key`` from the frame, keeping Ensemble metadata on the result.

    Parameters
    ----------
    key
        The selector handed to the parent class's ``__getitem__``.

    Returns
    -------
    The parent's result. When that result is a ``_Frame``, this frame's
    metadata is propagated to it before it is returned.
    """
    selected = super().__getitem__(key)
    if not isinstance(selected, _Frame):
        return selected
    # Ensures that any _Frame metadata is propagated.
    return self._propagate_metadata(selected)
@classmethod
[docs] def from_tapeframe(cls, data, npartitions=None, chunksize=None, sort=True, label=None, ensemble=None): """Returns an EnsembleFrame constructed from a TapeFrame. Parameters ---------- data: `TapeFrame` Frame containing the underlying data fro the EnsembleFram npartitions: `int`, optional The number of partitions of the index to create. Note that depending on the size and index of the dataframe, the output may have fewer partitions than requested. chunksize: `int`, optional Size of the individual chunks of data in non-parallel objects that make up Dask frames. sort: `bool`, optional Whether to sort the frame by a default index. label: `str`, optional The label used to by the Ensemble to identify the frame. ensemble: `tape.Ensemble`, optional A link to the Ensemble object that owns this frame. Returns ---------- result: `tape.EnsembleFrame` The constructed EnsembleFrame object. """ result = dd.from_pandas(data, npartitions=npartitions, chunksize=chunksize, sort=sort) result.label = label result.ensemble = ensemble return result
@classmethod
[docs] def from_dask_dataframe(cl, df, ensemble=None, label=None): """Returns an EnsembleFrame constructed from a Dask dataframe. Parameters ---------- df: `dask.dataframe.DataFrame` or `list` a Dask dataframe to convert to an EnsembleFrame ensemble: `tape.ensemble.Ensemble`, optional A link to the Ensemble object that owns this frame. label: `str`, optional The label used to by the Ensemble to identify the frame. Returns ---------- result: `tape.EnsembleFrame` The constructed EnsembleFrame object. """ # Create a EnsembleFrame by mapping the partitions to the appropriate meta, TapeFrame # TODO(wbeebe@uw.edu): Determine if there is a better method result = df.map_partitions(TapeFrame) result.ensemble = ensemble result.label = label return result
[docs] def update_ensemble(self): """Updates the Ensemble linked by the `EnsembelFrame.ensemble` property to track this frame. Returns ---------- result: `tape.Ensemble` The Ensemble object which tracks this frame, `None` if no such Ensemble. """ if self.ensemble is None: return None # Update the Ensemble to track this frame and return the ensemble. return self.ensemble.update_frame(self)
@classmethod
[docs] def from_dict( cls, data, npartitions, orient="columns", dtype=None, columns=None, label=None, ensemble=None ): """ Construct a Tape EnsembleFrame from a Python Dictionary Parameters ---------- data : dict Of the form {field : array-like} or {field : dict}. npartitions : int The number of partitions of the index to create. Note that depending on the size and index of the dataframe, the output may have fewer partitions than requested. orient : {'columns', 'index', 'tight'}, default 'columns' The "orientation" of the data. If the keys of the passed dict should be the columns of the resulting DataFrame, pass 'columns' (default). Otherwise if the keys should be rows, pass 'index'. If 'tight', assume a dict with keys ['index', 'columns', 'data', 'index_names', 'column_names']. dtype: bool Data type to force, otherwise infer. columns: string, optional Column labels to use when ``orient='index'``. Raises a ValueError if used with ``orient='columns'`` or ``orient='tight'``. label: `str`, optional The label used to by the Ensemble to identify the frame. ensemble: `tape.ensemble.Ensemble`, optional A link to the Ensemble object that owns this frame. Returns ---------- result: `tape.EnsembleFrame` The constructed EnsembleFrame object. """ result = from_dict( data, npartitions=npartitions, orient=orient, dtype=dtype, columns=columns, constructor=cls._partition_type, ) result.label = label result.ensemble = ensemble return result
@classmethod
[docs] def from_parquet(cl, path, index=None, columns=None, label=None, ensemble=None, **kwargs): """Returns an EnsembleFrame constructed from loading a parquet file. Parameters ---------- path: `str` or `list` Source directory for data, or path(s) to individual parquet files. Prefix with a protocol like s3:// to read from alternative filesystems. To read from multiple files you can pass a globstring or a list of paths, with the caveat that they must all have the same protocol. index: `str`, `list`, `False`, optional Field name(s) to use as the output frame index. Default is None and index will be inferred from the pandas parquet file metadata, if present. Use False to read all fields as columns. columns: `str` or `list`, optional Field name(s) to read in as columns in the output. By default all non-index fields will be read (as determined by the pandas parquet metadata, if present). Provide a single field name instead of a list to read in the data as a Series. label: `str`, optional The label used to by the Ensemble to identify the frame. ensemble: `tape.ensemble.Ensemble`, optional A link to the Ensemble object that owns this frame. Returns ---------- result: `tape.EnsembleFrame` The constructed EnsembleFrame object. """ # Read the parquet file with an engine that will assume the meta is a TapeFrame which Dask will # instantiate as EnsembleFrame via its dispatcher. result = dd.read_parquet( path, index=index, columns=columns, split_row_groups=True, engine=TapeArrowEngine, **kwargs ) result.label = label result.ensemble = ensemble return result
[docs] def convert_flux_to_mag( self, flux_col, zero_point, err_col=None, zp_form="mag", out_col_name=None, ): """Converts this EnsembleFrame's flux column into a magnitude column, returning a new EnsembleFrame. Parameters ---------- flux_col: 'str' The name of the EnsembleFrame flux column to convert into magnitudes. zero_point: 'str' The name of the EnsembleFrame column containing the zero point information for column transformation. err_col: 'str', optional The name of the EnsembleFrame column containing the errors to propagate. Errors are propagated using the following approximation: Err= (2.5/log(10))*(flux_error/flux), which holds mainly when the error in flux is much smaller than the flux. zp_form: `str`, optional The form of the zero point column, either "flux" or "magnitude"/"mag". Determines how the zero point (zp) is applied in the conversion. If "flux", then the function is applied as mag=-2.5*log10(flux/zp), or if "magnitude", then mag=-2.5*log10(flux)+zp. out_col_name: 'str', optional The name of the output magnitude column, if None then the output is just the flux column name + "_mag". The error column is also generated as the out_col_name + "_err". Returns ---------- result: `tape.EnsembleFrame` A new EnsembleFrame object with a new magnitude (and error) column. """ if out_col_name is None: out_col_name = flux_col + "_mag" result = None if zp_form == "flux": # mag = -2.5*np.log10(flux/zp) result = self.assign(**{out_col_name: lambda x: -2.5 * np.log10(x[flux_col] / x[zero_point])}) elif zp_form == "magnitude" or zp_form == "mag": # mag = -2.5*np.log10(flux) + zp result = self.assign(**{out_col_name: lambda x: -2.5 * np.log10(x[flux_col]) + x[zero_point]}) else: raise ValueError(f"{zp_form} is not a valid zero_point format.") # Calculate Errors if err_col is not None: result = result.assign( **{out_col_name + "_err": lambda x: (2.5 / np.log(10)) * (x[err_col] / x[flux_col])} ) return result
[docs] def coalesce(self, input_cols, output_col, drop_inputs=False): """Combines multiple input columns into a single output column, with values equal to the first non-nan value encountered in the input cols. Parameters ---------- input_cols: `list` The list of column names to coalesce into a single column. output_col: `str`, optional The name of the coalesced output column. drop_inputs: `bool`, optional Determines whether the input columns are dropped or preserved. If a mapped column is an input and dropped, the output column is automatically assigned to replace that column mapping internally. Returns ------- ensemble: `tape.ensemble.Ensemble` An ensemble object. """ def coalesce_partition(df, input_cols, output_col): """Coalescing function for a single partition (pandas dataframe)""" # Create a subset dataframe per input column # Rename column to output to allow combination input_dfs = [] for col in input_cols: col_df = df[[col]] input_dfs.append(col_df.rename(columns={col: output_col})) # Combine each dataframe coal_df = input_dfs.pop() while input_dfs: coal_df = coal_df.combine_first(input_dfs.pop()) # Assign the output column to the partition dataframe out_df = df.assign(**{output_col: coal_df[output_col]}) return out_df table_ddf = self.map_partitions(lambda x: coalesce_partition(x, input_cols, output_col)) # Drop the input columns if wanted if drop_inputs: if self.ensemble is not None: # First check to see if any dropped columns were critical columns current_map = self.ensemble.make_column_map().map cols_to_update = [key for key in current_map if current_map[key] in input_cols] # Theoretically a user could assign multiple critical columns in the input cols, this is very # likely to be a mistake, so we throw a warning here to alert them. 
if len(cols_to_update) > 1: warnings.warn( """Warning: Coalesce (with column dropping) is needing to update more than one critical column mapping, please check that the resulting mapping is set as intended""" ) # Update critical columns to the new output column as needed if len(cols_to_update): # if not zero new_map = current_map for col in cols_to_update: new_map[col] = output_col new_colmap = self.ensemble.make_column_map() new_colmap.map = new_map # Update the mapping self.ensemble.update_column_mapping(new_colmap) table_ddf = table_ddf.drop(columns=input_cols) return table_ddf
class TapeSourceFrame(TapeFrame):
    """Pandas-level frame type for Ensemble source data.

    A minimal ``TapeFrame`` subclass whose constructor hooks return
    ``TapeSourceFrame`` itself.

    See https://pandas.pydata.org/docs/development/extending.html#subclassing-pandas-data-structures
    """

    @property
    def _constructor(self):
        # Constructor hook described in the pandas subclassing guide linked above.
        return TapeSourceFrame

    @property
    def _constructor_expanddim(self):
        # Same return type as ``_constructor``, mirroring ``TapeFrame``.
        return TapeSourceFrame
class TapeObjectFrame(TapeFrame):
    """Pandas-level frame type for Ensemble object data.

    A minimal ``TapeFrame`` subclass whose constructor hooks return
    ``TapeObjectFrame`` itself.

    See https://pandas.pydata.org/docs/development/extending.html#subclassing-pandas-data-structures
    """

    @property
    def _constructor(self):
        # Constructor hook described in the pandas subclassing guide linked above.
        return TapeObjectFrame

    @property
    def _constructor_expanddim(self):
        # Same return type as ``_constructor``, mirroring ``TapeFrame``.
        return TapeObjectFrame
class SourceFrame(EnsembleFrame):
    """A subclass of EnsembleFrame for Source data."""

    _partition_type = TapeSourceFrame  # Tracks the underlying data type

    def __init__(self, expr, ensemble=None):
        """Create a SourceFrame labelled with ``SOURCE_FRAME_LABEL``.

        Parameters
        ----------
        expr:
            The expression handed to the parent constructor.
        ensemble: `tape.ensemble.Ensemble`, optional
            A link to the Ensemble object that owns this frame.
        """
        # Set before super().__init__ since that call may lead to a map_partitions call which
        # will assume this field exists.
        self.dirty = False  # True if the underlying data is out of sync with the Ensemble
        # The label and ensemble must be forwarded: _Frame.__init__ assigns both from its own
        # arguments (defaulting to None), so setting them here and calling super().__init__(expr)
        # would reset them to None.
        super().__init__(expr, label=SOURCE_FRAME_LABEL, ensemble=ensemble)

    def __getitem__(self, key):
        """Select by ``key``, carrying this frame's metadata over to the result."""
        result = super().__getitem__(key)
        if isinstance(result, _Frame):
            # Ensures that we have any metadata
            result = self._propagate_metadata(result)
        return result

    @classmethod
    def from_parquet(
        cls,
        path,
        index=None,
        columns=None,
        ensemble=None,
        **kwargs,
    ):
        """Returns a SourceFrame constructed from loading a parquet file.

        Parameters
        ----------
        path: `str` or `list`
            Source directory for data, or path(s) to individual parquet files.
        index: `str`, `list`, `False`, optional
            Field name(s) to use as the output frame index.
        columns: `str` or `list`, optional
            Field name(s) to read in as columns in the output.
        ensemble: `tape.ensemble.Ensemble`, optional
            A link to the Ensemble object that owns this frame.
        **kwargs:
            Additional keyword arguments forwarded to ``dd.read_parquet``, matching
            ``EnsembleFrame.from_parquet``.

        Returns
        ----------
        result: `tape.SourceFrame`
            The constructed SourceFrame object.
        """
        # The engine assigns a TapeSourceFrame meta to the data it reads.
        result = dd.read_parquet(
            path,
            index=index,
            columns=columns,
            split_row_groups=True,
            engine=TapeSourceArrowEngine,
            **kwargs,
        )
        result.ensemble = ensemble
        result.label = SOURCE_FRAME_LABEL

        return result

    @classmethod
    def from_dask_dataframe(cls, df, ensemble=None):
        """Returns a SourceFrame constructed from a Dask dataframe.

        Parameters
        ----------
        df: `dask.dataframe.DataFrame` or `list`
            a Dask dataframe to convert to a SourceFrame
        ensemble: `tape.ensemble.Ensemble`, optional
            A link to the Ensemble object that owns this frame.

        Returns
        ----------
        result: `tape.SourceFrame`
            The constructed SourceFrame object.
        """
        # Create a SourceFrame by mapping the partitions to the appropriate meta, TapeSourceFrame
        # TODO(wbeebe@uw.edu): Determine if there is a better method
        result = df.map_partitions(TapeSourceFrame)
        result.ensemble = ensemble
        result.label = SOURCE_FRAME_LABEL
        return result
class ObjectFrame(EnsembleFrame):
    """A subclass of EnsembleFrame for Object data."""

    _partition_type = TapeObjectFrame  # Tracks the underlying data type

    def __init__(self, expr, ensemble=None):
        """Create an ObjectFrame labelled with ``OBJECT_FRAME_LABEL``.

        Parameters
        ----------
        expr:
            The expression handed to the parent constructor.
        ensemble: `tape.ensemble.Ensemble`, optional
            A link to the Ensemble object that owns this frame.
        """
        # Set before super().__init__ since that call may lead to a map_partitions call which
        # will assume this field exists.
        self.dirty = False  # True if the underlying data is out of sync with the Ensemble
        # The label and ensemble must be forwarded: _Frame.__init__ assigns both from its own
        # arguments (defaulting to None), so setting them here and calling super().__init__(expr)
        # would reset them to None.
        super().__init__(expr, label=OBJECT_FRAME_LABEL, ensemble=ensemble)

    @classmethod
    def from_parquet(
        cls,
        path,
        index=None,
        columns=None,
        ensemble=None,
        **kwargs,
    ):
        """Returns an ObjectFrame constructed from loading a parquet file.

        Parameters
        ----------
        path: `str` or `list`
            Source directory for data, or path(s) to individual parquet files.
        index: `str`, `list`, `False`, optional
            Field name(s) to use as the output frame index.
        columns: `str` or `list`, optional
            Field name(s) to read in as columns in the output.
        ensemble: `tape.ensemble.Ensemble`, optional
            A link to the Ensemble object that owns this frame.
        **kwargs:
            Additional keyword arguments forwarded to ``dd.read_parquet``, matching
            ``EnsembleFrame.from_parquet``.

        Returns
        ----------
        result: `tape.ObjectFrame`
            The constructed ObjectFrame object.
        """
        # Read in the object Parquet file
        result = dd.read_parquet(
            path,
            index=index,
            columns=columns,
            split_row_groups=True,
            engine=TapeObjectArrowEngine,
            **kwargs,
        )
        result.ensemble = ensemble
        result.label = OBJECT_FRAME_LABEL

        return result

    @classmethod
    def from_dask_dataframe(cls, df, ensemble=None):
        """Returns an ObjectFrame constructed from a Dask dataframe.

        Parameters
        ----------
        df: `dask.dataframe.DataFrame` or `list`
            a Dask dataframe to convert to an ObjectFrame
        ensemble: `tape.ensemble.Ensemble`, optional
            A link to the Ensemble object that owns this frame.

        Returns
        ----------
        result: `tape.ObjectFrame`
            The constructed ObjectFrame object.
        """
        # Create an ObjectFrame by mapping the partitions to the appropriate meta, TapeObjectFrame
        # TODO(wbeebe@uw.edu): Determine if there is a better method
        result = df.map_partitions(TapeObjectFrame)
        result.ensemble = ensemble
        result.label = OBJECT_FRAME_LABEL
        return result
# Dask Dataframes are constructed indirectly using method dispatching and inference on the
# underlying data. So to ensure our subclasses behave correctly, we register the methods
# below.
#
# For more information, see https://docs.dask.org/en/latest/dataframe-extend.html
#
# The following should ensure that any Dask Dataframes which use TapeSeries or TapeFrames as their
# underlying data will be resolved as EnsembleFrames or EnsembleSeries as their parallel
# counterparts. The underlying Dask Dataframe _meta will be a TapeSeries or TapeFrame.
#
# Note that with the change to the dask-expr backend, the `get_collection_type` method
# is used to register instead of the previously used `get_parallel_type`.
#
# Each handler below has its own name so that later definitions do not shadow earlier ones
# at module level; registration itself happens through the decorators.
get_collection_type.register(TapeSeries, lambda _: EnsembleSeries)
get_collection_type.register(TapeFrame, lambda _: EnsembleFrame)
get_collection_type.register(TapeObjectFrame, lambda _: ObjectFrame)
get_collection_type.register(TapeSourceFrame, lambda _: SourceFrame)


@make_meta_dispatch.register(TapeSeries)
def make_meta_series(x, index=None):
    # Create an empty TapeSeries to use as Dask's underlying object meta.
    # ``index`` is accepted but not used here.
    return x.head(0)


@make_meta_dispatch.register(TapeFrame)
def make_meta_frame(x, index=None):
    # Create an empty TapeFrame to use as Dask's underlying object meta.
    return x.head(0)


@meta_nonempty.register(TapeSeries)
def _nonempty_tapeseries(x, index=None):
    # Construct a new TapeSeries with the same underlying data.
    data = _nonempty_series(x)
    return TapeSeries(data)


@meta_nonempty.register(TapeFrame)
def _nonempty_tapeframe(x, index=None):
    # Construct a new TapeFrame with the same underlying data.
    df = meta_nonempty_dataframe(x)
    return TapeFrame(df)


@make_meta_dispatch.register(TapeObjectFrame)
def make_meta_objectframe(x, index=None):
    # Create an empty TapeObjectFrame to use as Dask's underlying object meta.
    return x.head(0)


@meta_nonempty.register(TapeObjectFrame)
def _nonempty_tapeobjectframe(x, index=None):
    # Construct a new TapeObjectFrame with the same underlying data.
    df = meta_nonempty_dataframe(x)
    return TapeObjectFrame(df)


@make_meta_dispatch.register(TapeSourceFrame)
def make_meta_sourceframe(x, index=None):
    # Create an empty TapeSourceFrame to use as Dask's underlying object meta.
    return x.head(0)


@meta_nonempty.register(TapeSourceFrame)
def _nonempty_tapesourceframe(x, index=None):
    # Construct a new TapeSourceFrame with the same underlying data.
    df = meta_nonempty_dataframe(x)
    return TapeSourceFrame(df)