Common Data Operations with TAPE

In this notebook, we’ll highlight a handful of common dataframe operations that can be performed within TAPE.

Note: TAPE extends the Pandas/Dask API, and so users familiar with those APIs can expect many operations to be near-identical when working with TAPE.

Let’s consider a small example dataset of Stripe 82 RR Lyrae stars:

[1]:
from tape import Ensemble

ens = Ensemble()

ens.from_dataset("s82_rrlyrae", sorted=True)
[1]:
<tape.ensemble.Ensemble at 0x7fb6d424af20>

Inspection

These functions provide views into the contents of your Ensemble's dataframes, which is especially important when dealing with data volumes too large to be brought into memory all at once.

Lazy View of an EnsembleFrame

The most basic inspection method is to simply display the EnsembleFrame (dataframe) objects themselves. This returns a lazy view of the EnsembleFrame: no data is loaded, and only the column names, dtypes, and partition layout are shown.

[2]:
ens.object
[2]:
Dask DataFrame Structure:
ra dec rExt d rGC uF gF rF iF zF VF ugmin ugminErr grmin grminErr type P uA u0 uE uT gA g0 gE gT rA r0 rE rT iA i0 iE iT zA z0 zE zT
npartitions=5
4099 float64 float64 float64 float64 float64 float64 float64 float64 float64 float64 float64 float64 float64 float64 float64 string float64 float64 float64 float64 int64 float64 float64 float64 int64 float64 float64 float64 int64 float64 float64 float64 int64 float64 float64 float64 int64
921268 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
3138275 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
5011634 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
Dask Name: merge, 10 expressions
[3]:
ens.source
[3]:
Dask DataFrame Structure:
ra dec mjd flux error band
npartitions=5
4099 float64 float64 float64 float64 float64 string
921268 ... ... ... ... ... ...
... ... ... ... ... ... ...
3138275 ... ... ... ... ... ...
5011634 ... ... ... ... ... ...
Dask Name: merge, 8 expressions

Using compute() to view the data

When an EnsembleFrame’s contents are small enough to fit into memory, you can use compute() to view the actual data.

Note: compute() actually materializes the data in memory, running whatever loading, filtering, or analysis steps are needed to produce the result, so it can take a long time!

[4]:
ens.object.compute()
[4]:
ra dec rExt d rGC uF gF rF iF zF ... rE rT iA i0 iE iT zA z0 zE zT
#id
4099 0.935679 1.115859 0.089 17.75 20.03 18.134 16.989 16.777 16.703 16.685 ... 51075.295112 103 0.317851 16.548633 51075.295084 102 0.302557 16.539893 51075.288235 100
13350 0.283437 1.178522 0.080 24.77 26.55 18.839 17.679 17.544 17.497 17.501 ... 54025.326474 112 0.642111 17.147570 54025.326185 116 0.583437 17.190782 54025.327901 114
15927 3.254658 -0.584066 0.090 29.12 30.96 19.288 18.058 17.859 17.792 17.780 ... 53680.226214 108 0.368674 17.610787 53680.243421 104 0.345422 17.615747 53680.247101 100
20406 3.244369 0.218891 0.088 9.13 12.76 16.715 15.543 15.336 15.286 15.276 ... 54000.276631 108 0.342734 15.118909 54000.293780 102 0.303788 15.132053 54000.296412 100
21992 4.315354 1.054582 0.077 7.35 11.54 16.186 15.040 14.909 14.864 14.853 ... 53698.243534 114 0.661144 14.523218 53698.249941 111 0.619123 14.524697 53698.245861 114
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
4956681 58.700931 1.228830 1.051 36.88 43.44 19.476 18.513 18.454 18.480 18.505 ... 53272.495204 105 0.730201 18.060217 53272.485932 113 0.583450 18.176327 53272.488449 117
4983075 57.156605 0.134676 0.527 29.15 35.64 19.114 18.054 17.869 17.818 17.842 ... 54064.415166 101 0.278196 17.676564 54064.408405 101 0.247065 17.722055 54064.404969 100
4984662 57.128875 -0.389138 0.584 39.05 45.43 19.745 18.701 18.489 18.454 18.452 ... 53994.472022 104 0.359037 18.272159 53994.465668 108 0.309688 18.303267 53994.475330 100
4992418 57.151443 0.892965 0.479 31.46 37.97 19.279 18.214 18.042 18.000 17.997 ... 53681.416038 109 0.560006 17.708949 53681.425071 107 0.512215 17.718540 53681.406509 115
5011634 56.510537 -0.561729 0.451 30.58 36.98 19.232 18.106 18.041 18.058 18.116 ... 52934.364056 0 0.281035 17.916158 52934.356016 1 0.255751 17.987333 52934.369944 0

483 rows × 37 columns

Grab small in-memory views with head()

Often, you’ll want to peek at your data even when the full dataset is too large to fit in memory.

Note: some partitions may be empty, and head() will have to traverse these empty partitions to find enough rows for your result. For an empty table with many partitions (O(100k)), head() can be costly even though the result is ultimately empty.

[5]:
ens.source.head(5)  # grabs the first 5 rows

# can also use tail to grab the last 5 rows
[5]:
ra dec mjd flux error band
#id
4099 0.935679 1.115859 53974.306211 18.337 0.022 u
4099 0.935679 1.115859 53989.309618 -99.990 0.033 u
4099 0.935679 1.115859 53997.405253 -99.990 0.025 u
4099 0.935679 1.115859 54000.285068 -99.990 0.020 u
4099 0.935679 1.115859 52557.310873 16.677 0.011 z
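The note above describes head() walking through partitions in order until it has collected enough rows. The toy simulation below shows why empty partitions still cost a visit. It is a pure-pandas sketch: the list of partitions and the helper head_across_partitions are hypothetical stand-ins, not TAPE's or Dask's actual code.

```python
import pandas as pd

# Hypothetical partitions: plain pandas DataFrames, two of them empty,
# standing in for the partitions of a Dask/TAPE dataframe.
empty = pd.Series([], dtype="float64")
partitions = [
    pd.DataFrame({"flux": empty}),
    pd.DataFrame({"flux": empty}),
    pd.DataFrame({"flux": [18.3, 17.9]}),
    pd.DataFrame({"flux": [16.7, 16.9, 17.1]}),
]


def head_across_partitions(parts, n):
    """Collect the first n rows, visiting partitions in order (hypothetical helper)."""
    collected = []
    n_rows = 0
    visited = 0
    for part in parts:
        visited += 1  # every partition is touched, even the empty ones
        collected.append(part.head(n - n_rows))
        n_rows += len(collected[-1])
        if n_rows >= n:
            break
    return pd.concat(collected), visited


result, visited = head_across_partitions(partitions, 3)
print(len(result), visited)  # 3 4
```

With many empty partitions at the front, the number of visits grows with the partition count even though no rows are found in them.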

Getting Individual Lightcurves

Several methods exist to access individual lightcurves within the Ensemble.

Access using a known ID

If you’d like to access a particular lightcurve given its ID, you can use the to_timeseries() function. It takes an object ID and returns a TimeSeries object (see working_with_the_timeseries).

Note that this loads data from all available bands.

[6]:
ts = ens.to_timeseries(13350)
ts.data
[6]:
ra dec mjd flux error band
band index
u 0 0.283437 1.178522 53656.246172 19.214 0.053 u
1 0.283437 1.178522 54000.283248 19.016 0.036 u
2 0.283437 1.178522 54007.333339 18.774 0.023 u
3 0.283437 1.178522 51075.300645 19.323 0.030 u
4 0.283437 1.178522 52557.308234 18.560 0.019 u
... ... ... ... ... ... ...
60 0.283437 1.178522 53665.237450 -99.990 0.714 u
61 0.283437 1.178522 53668.249081 19.140 0.035 u
62 0.283437 1.178522 53670.253933 18.465 0.018 u
63 0.283437 1.178522 53680.226504 18.932 0.025 u
64 0.283437 1.178522 53693.228399 18.511 0.030 u

325 rows × 6 columns
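Because ts.data is indexed by (band, index), a single band can be pulled out with .loc, as the plotting cell below does for each band in turn. The sketch uses a small, made-up pandas frame with the same index layout; the values are invented for illustration.

```python
import pandas as pd

# A hypothetical miniature of ts.data: rows indexed by (band, index),
# like the to_timeseries() output shown above (values are made up).
data = pd.DataFrame(
    {
        "mjd": [53656.3, 54000.4, 53656.2, 54000.3],
        "flux": [17.8, 17.7, 19.2, 19.0],
        "band": ["g", "g", "u", "u"],
    },
    index=pd.MultiIndex.from_tuples(
        [("g", 0), ("g", 1), ("u", 0), ("u", 1)], names=["band", "index"]
    ),
)

# Selecting a value of the outer level returns only that band's rows,
# with the "band" level dropped from the index.
g_band = data.loc["g"]
print(g_band["flux"].tolist())  # [17.8, 17.7]
```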

[7]:
import matplotlib.pyplot as plt

for band in ts.data.band.unique():
    plt.errorbar(
        ts.data.loc[band]["mjd"],
        ts.data.loc[band]["flux"],
        yerr=ts.data.loc[band]["error"],
        fmt=".",
        label=band,
    )

plt.ylim(16, 20)
plt.legend()
plt.title(ts.meta["id"])
[7]:
Text(0.5, 1.0, '13350')
[Figure: flux versus MJD with error bars for object 13350, one series per band]

Access a random lightcurve

Alternatively, if you aren’t interested in a particular lightcurve, you can draw a random one from the Ensemble using Ensemble.select_random_timeseries().

[8]:
ens.select_random_timeseries(seed=1).data
Selected Object 4455741 from Partition 4
[8]:
ra dec mjd flux error band
band index
g 0 -50.103767 -0.884631 53989.151215 17.152 0.016 g
1 -50.103767 -0.884631 53995.159131 16.678 0.015 g
2 -50.103767 -0.884631 54388.185506 16.623 0.009 g
3 -50.103767 -0.884631 54403.099282 16.677 0.012 g
4 -50.103767 -0.884631 54406.093110 17.019 0.014 g
... ... ... ... ... ... ... ...
r 52 -50.103767 -0.884631 53665.096219 16.822 0.011 r
53 -50.103767 -0.884631 53668.107851 16.905 0.004 r
54 -50.103767 -0.884631 53680.085273 16.932 0.006 r
55 -50.103767 -0.884631 53693.087179 16.875 0.005 r
g 55 -50.103767 -0.884631 53272.186738 17.123 0.005 g

280 rows × 6 columns
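The printed message above reports both a partition and an object, which suggests select_random_timeseries() draws within partitions. The sketch below ignores partitions and only illustrates why passing a seed makes the choice repeatable. pick_random_id is a hypothetical helper built on NumPy's random Generator, not TAPE's implementation.

```python
import numpy as np

# Object IDs taken from the object table shown earlier.
object_ids = np.array([4099, 13350, 15927, 20406, 21992])


def pick_random_id(ids, seed=None):
    """Draw one object ID uniformly at random (hypothetical helper)."""
    rng = np.random.default_rng(seed)
    return ids[rng.integers(len(ids))]


first = pick_random_id(object_ids, seed=1)
second = pick_random_id(object_ids, seed=1)
print(first == second)  # True: the same seed picks the same object
```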

Filtering

Queries

Queries mirror the Pandas implementation: query() takes a string containing a boolean expression over the column names and keeps the rows for which that expression is True.

[9]:
# define a query to remove the top 5% of flux values
highest_flux = ens.source[ens._flux_col].quantile(0.95).compute()
ens.source.query(f"{ens._flux_col} < {highest_flux}").compute()
[9]:
ra dec mjd flux error band
#id
4099 0.935679 1.115859 54359.270027 -99.990 0.022 u
4099 0.935679 1.115859 54365.309090 18.420 0.021 u
4099 0.935679 1.115859 54373.383315 -99.990 0.034 u
4099 0.935679 1.115859 53626.288499 16.823 0.004 g
4099 0.935679 1.115859 53294.301322 18.477 0.020 u
... ... ... ... ... ... ...
5011634 56.510537 -0.561729 52224.370556 19.872 0.043 u
5011634 56.510537 -0.561729 52234.327053 20.245 0.065 u
5011634 56.510537 -0.561729 53669.494075 18.919 0.012 g
5011634 56.510537 -0.561729 53288.409232 18.905 0.009 g
5011634 56.510537 -0.561729 53294.456904 18.815 0.009 g

134684 rows × 6 columns
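The same percentile-then-query pattern can be reproduced in plain pandas on a few made-up flux values. It also shows that query() hands back a new frame and leaves the original untouched, which is the behavior behind the update_ensemble() requirement in TAPE.

```python
import pandas as pd

# A small stand-in for the source table (values are made up).
source = pd.DataFrame({"flux": [16.7, 17.5, 18.3, 19.0, 20.2]})

# Compute the 95th percentile, then keep the rows strictly below it.
highest_flux = source["flux"].quantile(0.95)
filtered = source.query(f"flux < {highest_flux}")

# query() returns a new frame; `source` still has all five rows.
print(len(source), len(filtered))  # 5 4
```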

Note: Filtering, like any operation that modifies a dataframe, returns a new dataframe and does not automatically update the Ensemble. If you’d like to update the Ensemble with the result of such an operation, append .update_ensemble() to the end of the call.

Filtering by Number of Observations

Filters based on the number of observations are more directly supported within the TAPE API. First, compute the number of observations per lightcurve with the dedicated function Ensemble.calc_nobs():

[10]:
ens.calc_nobs(by_band=True, temporary=False)

ens.object.head(5)[["nobs_u", "nobs_g", "nobs_r", "nobs_i", "nobs_z", "nobs_total"]]
[10]:
nobs_u nobs_g nobs_r nobs_i nobs_z nobs_total
#id
4099 64 64 64 64 64 320
13350 65 65 65 65 65 325
15927 64 64 64 64 64 320
20406 65 65 65 65 65 325
21992 79 79 79 79 79 395
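For intuition, the per-band columns above can be approximated in plain pandas by counting rows per (object, band) pair and pivoting the bands into columns. This is a sketch on a made-up source table; the output layout is inferred from the columns shown above, not taken from TAPE's implementation of calc_nobs().

```python
import pandas as pd

# Toy source table indexed by object id (values are made up).
source = pd.DataFrame(
    {"band": ["u", "u", "g", "g", "g", "u"]},
    index=pd.Index([1, 1, 1, 2, 2, 2], name="id"),
)

# Count observations per (object, band) and pivot bands into columns.
nobs = source.groupby(["id", "band"]).size().unstack(fill_value=0)
nobs.columns = [f"nobs_{band}" for band in nobs.columns]
nobs["nobs_total"] = nobs.sum(axis=1)
print(nobs)
```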

You can then query on these columns as normal.

[11]:
ens.object.query("nobs_total > 322")[["nobs_u", "nobs_g", "nobs_r", "nobs_i", "nobs_z", "nobs_total"]].head(5)
[11]:
nobs_u nobs_g nobs_r nobs_i nobs_z nobs_total
#id
13350 65 65 65 65 65 325
20406 65 65 65 65 65 325
21992 79 79 79 79 79 395
46988 65 65 65 65 65 325
91658 65 65 65 65 65 325

Alternatively, if you’d just like to quickly filter by the total number of observations, you can use Ensemble.prune(). Unlike query(), prune() updates the Ensemble directly, as the ens.object call that follows shows.

[12]:
ens.prune(322)  # for this dataset, keeps the same objects as the query above
ens.object[["nobs_total"]].head(5)
[12]:
nobs_total
#id
13350 325
20406 325
21992 395
46988 325
91658 325
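A rough pandas analogue of this pruning step is sketched below on made-up tables. It assumes that prune() keeps objects with at least the given number of observations and that the removal is then reflected in the source table; both points are assumptions about TAPE's behavior, not a description of its code.

```python
import pandas as pd

# Toy object and source tables keyed by object id (values are made up).
obj = pd.DataFrame(
    {"nobs_total": [320, 325, 395]},
    index=pd.Index([4099, 13350, 21992], name="id"),
)
src = pd.DataFrame(
    {"flux": [18.3, 19.2, 15.1, 16.7]},
    index=pd.Index([4099, 13350, 21992, 4099], name="id"),
)

threshold = 322
# Assumed cutoff: keep objects with at least `threshold` observations.
kept_obj = obj[obj["nobs_total"] >= threshold]
# Drop source rows belonging to any object that was removed.
kept_src = src[src.index.isin(kept_obj.index)]
print(kept_obj.index.tolist(), kept_src.index.tolist())
```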

Removing NaNs

Removing rows with NaN values follows the Pandas API, using dropna():

[13]:
# Remove any rows with a NaN value in any of the specified columns
ens.source.dropna(subset=["flux", "mjd", "error", "band"]).update_ensemble()
ens.source
[13]:
Dask DataFrame Structure:
ra dec mjd flux error band
npartitions=5
4099 float64 float64 float64 float64 float64 string
921268 ... ... ... ... ... ...
... ... ... ... ... ... ...
3138275 ... ... ... ... ... ...
5011634 ... ... ... ... ... ...
Dask Name: dropna, 34 expressions
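The subset argument restricts which columns are checked, so a NaN elsewhere does not remove a row. Note also that the -99.990 entries visible in the flux column earlier are ordinary numbers rather than NaNs, so dropna() leaves them in place. A small pandas sketch with made-up rows:

```python
import numpy as np
import pandas as pd

# Made-up rows mimicking the source table.
src = pd.DataFrame(
    {
        "mjd": [53974.3, 53989.3, np.nan, 53997.4],
        "flux": [18.337, np.nan, 16.677, -99.99],
        "error": [0.022, 0.033, 0.011, 0.025],
        "band": ["u", "u", "z", "u"],
        "ra": [np.nan, 0.9, 0.9, 0.9],  # NaN here is ignored: "ra" is not in subset
    }
)

clean = src.dropna(subset=["flux", "mjd", "error", "band"])
print(clean.index.tolist())  # [0, 3]: rows 1 and 2 had NaNs in checked columns
```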

Analysis

Applying Functions with Ensemble.batch()

The Ensemble provides a powerful batching interface, Ensemble.batch(), with built-in parallelization (provided the input data is split into multiple partitions).

[14]:
import numpy as np


# Defining a simple function
def my_flux_average(flux_array, band_array, method="mean", band=None):
    """Return the mean or median of the fluxes in a single band"""
    if band is None:
        return None  # no band requested, nothing to average
    band_flux = flux_array[band_array == band]  # keep only this band's fluxes
    if method == "mean":
        res = np.mean(band_flux)
    elif method == "median":
        res = np.median(band_flux)
    else:
        raise ValueError(f"Unknown method: {method!r}")
    return res

With the function defined, we next supply it to Ensemble.batch(). The column labels of the Ensemble columns we want to use as arguments must be provided, as well as any keyword arguments. In this case, we pass along "flux" and "band", so that the Ensemble will map those columns to flux_array and band_array respectively. We also pass method='median' and band='g', which will pass those kwargs along to my_flux_average.

[15]:
# Applying the function to the ensemble
res = ens.batch(my_flux_average, "flux", "band", meta=None, method="median", band="g")
res.compute()
Temporary columns dropped from Object Table: ['nobs_total']
Using generated label, result_1, for a batch result.
[15]:
result
#id
13350 17.8260
20406 15.6750
21992 15.1520
46988 15.5000
91658 16.1550
... ...
4920018 14.8620
4947744 18.5270
4983075 18.8380
4984662 19.5020
4992418 19.0345

103 rows × 1 columns
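Conceptually, batch() calls the function once per lightcurve, passing the selected columns as arrays. The single-process pandas groupby below is only an analogy on made-up data; TAPE itself distributes the work across Dask partitions.

```python
import numpy as np
import pandas as pd


def my_flux_average(flux_array, band_array, method="mean", band=None):
    """Same idea as the function above: mean or median flux in one band."""
    if band is None:
        return None
    band_flux = flux_array[band_array == band]
    if method == "mean":
        return np.mean(band_flux)
    if method == "median":
        return np.median(band_flux)
    raise ValueError(f"Unknown method: {method!r}")


# Toy source table indexed by object id (values are made up).
src = pd.DataFrame(
    {
        "flux": [17.8, 17.9, 19.2, 15.6, 15.8, 15.7],
        "band": ["g", "g", "u", "g", "g", "g"],
    },
    index=pd.Index([13350, 13350, 13350, 20406, 20406, 20406], name="id"),
)

# One call per lightcurve, mapping "flux" and "band" to the function's arrays.
res = src.groupby(level="id").apply(
    lambda lc: my_flux_average(
        lc["flux"].to_numpy(), lc["band"].to_numpy(), method="median", band="g"
    )
)
print(res)
```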

Ensemble.batch() supports many different variations of custom user functions, and additionally has a small suite of tailored analysis functions designed for it. For more details on batch, see the batch showcase.

Column Assignment

EnsembleFrames support column assignment through the Pandas assign() function. We can pass in either a callable or a Series to assign to the new column, and the new column's name is taken from the keyword argument name.

For example, if we want to compute the lower bound of an error range as the estimated flux minus twice the estimated error, we would use:

[16]:
lower_bnd = ens.source.assign(lower_bnd=lambda x: x["flux"] - 2.0 * x["error"])
lower_bnd.head(5)
[16]:
ra dec mjd flux error band lower_bnd
#id
13350 0.283437 1.178522 54029.250968 18.722 0.030 u 18.662
13350 0.283437 1.178522 54348.318361 19.191 0.029 u 19.133
13350 0.283437 1.178522 53680.226504 18.932 0.025 u 18.882
13350 0.283437 1.178522 53693.228399 18.511 0.030 u 18.451
13350 0.283437 1.178522 53698.225791 18.581 0.034 u 18.513

We can also assign our computed batch result as a new object column using the same methodology.

[17]:
ens.object.assign(g_average=res["result"])[["ra", "dec", "g_average"]].head(5)
[17]:
ra dec g_average
#id
13350 0.283437 1.178522 17.826
20406 3.244369 0.218891 15.675
21992 4.315354 1.054582 15.152
46988 2.426843 -0.562932 15.500
91658 0.846748 -0.994204 16.155

Dask Tips

Using persist() to Save Computation Time

When calling compute(), all work needed to produce the in-memory result is performed. This work is repeated each time compute() is called, which can duplicate a lot of computation, especially in exploratory notebooks where you’re testing different workflows. In such cases, it can be advantageous to call persist().

persist() returns a new lazy view of a result, but begins computing that result behind the scenes and keeps it in memory, so successive calls on the persisted frame simply grab the stored result rather than recomputing it. Because the result is held in memory, persist() should only be used when your data fits into memory.

[18]:
source_persisted = ens.source.persist()  # persist() returns a new frame backed by the computed data
source_persisted.compute()  # which allows compute to just pull the stored result immediately
[18]:
ra dec mjd flux error band
#id
13350 0.283437 1.178522 53656.246172 19.214 0.053 u
13350 0.283437 1.178522 54000.283248 19.016 0.036 u
13350 0.283437 1.178522 54007.333339 18.774 0.023 u
13350 0.283437 1.178522 51075.300645 19.323 0.030 u
13350 0.283437 1.178522 52557.308234 18.560 0.019 u
... ... ... ... ... ... ...
4992418 57.151443 0.892965 52558.478255 20.058 0.044 u
4992418 57.151443 0.892965 51875.256823 19.929 0.041 u
4992418 57.151443 0.892965 54047.434791 20.291 0.195 u
4992418 57.151443 0.892965 53634.428794 19.987 0.102 u
4992418 57.151443 0.892965 52935.376893 20.385 0.060 u

38510 rows × 6 columns

Repartitioning

With Dask and TAPE, data is stored in separate sub-containers called “partitions”. Dask has recommendations (https://docs.dask.org/en/stable/best-practices.html#dask-best-practices) for the optimal amount of data stored in a given partition, and even if the initial data follows these recommendations, filtering steps can leave partitions with very little data. In this case, it may be best to call repartition().

[19]:
ens.source.repartition(partition_size="100MB")  # ~100MB per partition is generally recommended
# In this case, we have a small set of data that easily fits into one partition
[19]:
Dask DataFrame Structure:
ra dec mjd flux error band
npartitions=1
4099 float64 float64 float64 float64 float64 string
5011634 ... ... ... ... ... ...
Dask Name: repartition, 42 expressions
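A back-of-the-envelope check of why one partition suffices here: divide the in-memory size of the data by the target partition size. estimate_npartitions is a hypothetical helper on a made-up frame of zeros sized like the source table; it is not how Dask's partition_size option works internally.

```python
import math

import numpy as np
import pandas as pd


def estimate_npartitions(df, target_bytes=100_000_000):
    """Hypothetical helper: number of ~target_bytes partitions needed for df."""
    total_bytes = df.memory_usage(deep=True).sum()
    return max(1, math.ceil(total_bytes / target_bytes))


# Toy frame: 38,510 rows (the source table size shown earlier), 5 float64 columns.
df = pd.DataFrame(np.zeros((38_510, 5)), columns=["ra", "dec", "mjd", "flux", "error"])
print(estimate_npartitions(df))  # 1: about 1.5 MB is far below 100 MB
print(estimate_npartitions(df, target_bytes=500_000))  # 4
```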

Sampling

In addition to filtering by specific constraints, it’s possible to select a subset of your data to work with. Ensemble.sample() will randomly select a fraction of objects from the full object list. This will return a new ensemble object to work with.

[20]:
subset_ens = ens.sample(frac=0.5)  # select ~half of the objects

print("Number of pre-sampled objects: ", len(ens.object))
print("Number of post-sampled objects: ", len(subset_ens.object))
Number of pre-sampled objects:  103
Number of post-sampled objects:  52

For reproducible results, you can also specify a random seed via the random_state parameter. By reusing the same random_state, you can ensure that a given Ensemble is always sampled the same way.

[21]:
subset_ens = ens.sample(
    frac=0.2,  # select ~a fifth of the objects
    random_state=53783594,  # set a random seed for reproducibility
)

print("Number of pre-sampled objects: ", len(ens.object))
print("Number of post-sampled objects: ", len(subset_ens.object))
Number of pre-sampled objects:  103
Number of post-sampled objects:  21
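A pandas sketch of the same idea on made-up tables: sample a fraction of the objects with a fixed random_state, then keep only the sources that belong to them. That Ensemble.sample() keeps the source table consistent with the sampled objects is an assumption here, suggested by it returning a new ensemble. In pandas the fraction is applied to the whole table, whereas a Dask frame samples each partition, hence the "~" in the comments above.

```python
import pandas as pd

# Toy object and source tables keyed by object id (values are made up).
obj = pd.DataFrame({"ra": range(10)}, index=pd.Index(range(100, 110), name="id"))
src = pd.DataFrame(
    {"flux": range(30)},
    index=pd.Index([i for i in range(100, 110) for _ in range(3)], name="id"),
)

# Sample a fraction of objects with a fixed seed, then keep only their sources.
sampled_obj = obj.sample(frac=0.2, random_state=53783594)
sampled_src = src[src.index.isin(sampled_obj.index)]

# Re-running with the same random_state selects the same objects.
again = obj.sample(frac=0.2, random_state=53783594)
print(sampled_obj.index.equals(again.index))  # True
print(len(sampled_obj), len(sampled_src))  # 2 6
```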

Note: Using Ensemble.sample() to filter large datasets is not recommended, as it does not handle repartitioning. Instead, consider partition slicing, shown below.

[22]:
# partition slicing

# specify a subset of partitions, propagates to the object table automatically
ens.source.partitions[0:1].update_ensemble()
[22]:
<tape.ensemble.Ensemble at 0x7fb6d424af20>

Saving Intermediate Results

In some situations, you may find yourself running a given workflow many times. Due to the nature of lazy computation, this involves repeated execution of data I/O, pre-processing steps, initial analysis, etc. In these situations, it may be more efficient to save the ensemble state to disk once these initial processing steps are complete. To accomplish this, we can use the Ensemble.save_ensemble() function.

[23]:
ens.object.head(5)
[23]:
ra dec rExt d rGC uF gF rF iF zF ... iT zA z0 zE zT nobs_g nobs_i nobs_r nobs_u nobs_z
#id
13350 0.283437 1.178522 0.080 24.77 26.55 18.839 17.679 17.544 17.497 17.501 ... 116 0.583437 17.190782 54025.327901 114 65 65 65 65 65
20406 3.244369 0.218891 0.088 9.13 12.76 16.715 15.543 15.336 15.286 15.276 ... 102 0.303788 15.132053 54000.296412 100 65 65 65 65 65
21992 4.315354 1.054582 0.077 7.35 11.54 16.186 15.040 14.909 14.864 14.853 ... 111 0.619123 14.524697 53698.245861 114 79 79 79 79 79
46988 2.426843 -0.562932 0.140 7.85 11.69 16.424 15.194 15.038 14.993 15.004 ... 112 0.540125 14.715373 52253.188970 108 65 65 65 65 65
91658 0.846748 -0.994204 0.098 11.07 14.04 17.087 15.933 15.791 15.733 15.731 ... 119 0.510250 15.470862 54348.319546 110 65 65 65 65 65

5 rows × 42 columns

[24]:
ens.save_ensemble(".", "ensemble", additional_frames=False)  # Saves to disk
Saved to ./ensemble

The above command creates an “ensemble” directory in the current working directory. This directory contains a subdirectory of Parquet files for each EnsembleFrame that is saved. The additional_frames kwarg controls which additional EnsembleFrames are included: setting it to True saves all of them, setting it to False (as here) saves none, and listing specific frames saves only those. The object frame (unless it has no columns) and the source frame are always saved.

From here, we can load the ensemble back from disk.

[25]:
new_ens = Ensemble()
new_ens.from_ensemble("./ensemble")
[25]:
<tape.ensemble.Ensemble at 0x7fb670b36860>