Ensemble Batch Showcase#
Ensemble.batch is a versatile function that lets users pass in external functions that operate on groupings of Ensemble data; most commonly, these are functions that calculate something per lightcurve. Because external functions can have a huge variety of inputs and outputs, this notebook serves as a collection of example functions and shows how batch can be used with them. The hope is that one of the functions here resembles the function you are trying to apply via batch, so that the example can be used as a template for getting your function to work.
Generate some toy data and create an Ensemble#
[1]:
from tape import Ensemble, ColumnMapper, TapeFrame
import numpy as np
import pandas as pd
import sys
[2]:
# Generate some fake data
np.random.seed(1)
obj_ids = []
mjds = []
for i in range(10, 110):
    obj_ids.append(np.array([i] * 1250))
    mjds.append(np.arange(0.0, 1250.0, 1.0))
obj_ids = np.concatenate(obj_ids)
mjds = np.concatenate(mjds)
flux = 10 * np.random.random(125000)
err = flux / 10
band = np.random.choice(["g", "r"], 125000)
source_dict = {"id": obj_ids, "mjd": mjds, "flux": flux, "err": err, "band": band}
[3]:
# Load the data into an Ensemble
ens = Ensemble()
ens.from_source_dict(
    source_dict,
    column_mapper=ColumnMapper(id_col="id", time_col="mjd", flux_col="flux", err_col="err", band_col="band"),
    sorted=True,
)
[3]:
<tape.ensemble.Ensemble at 0x7fb6df7aaaa0>
Case 1: A Simple Mean#
We define a simple function that takes in an array-like argument, flux, and returns its mean.
[4]:
# Case 1: Simple
def my_mean(flux):
    return np.mean(flux)
my_mean([1, 2, 3, 4, 5])
[4]:
3.0
To run my_mean with Ensemble.batch, we pass the function followed by its argument(s) as separate arguments to batch. Here we pass “flux” as a string; batch uses that column label to select the data that my_mean is evaluated on.
[5]:
# Default batch
res1 = ens.batch(
    my_mean, "flux"
)  # "flux" is provided to have TAPE pass the "flux" column data along to my_mean
res1.compute() # Compute to see the result
Using generated label, result_1, for a batch result.
[5]:
| result | |
|---|---|
| id | |
| 10 | 5.020567 |
| 11 | 5.041076 |
| 12 | 4.916761 |
| 13 | 5.033744 |
| 14 | 5.084872 |
| ... | ... |
| 105 | 5.113830 |
| 106 | 5.097340 |
| 107 | 5.013111 |
| 108 | 4.962582 |
| 109 | 5.143362 |
100 rows × 1 columns
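As a rough mental model only, the default per-object grouping resembles a pandas groupby on the id column. The sketch below uses plain pandas rather than TAPE, and the toy table and its values are hypothetical; it is not how batch is implemented internally.

```python
import numpy as np
import pandas as pd


def my_mean(flux):
    return np.mean(flux)


# Small stand-in for the Ensemble source table (hypothetical values)
toy = pd.DataFrame(
    {
        "id": [10, 10, 10, 11, 11],
        "flux": [1.0, 2.0, 3.0, 4.0, 6.0],
    }
)

# Group rows by object id and hand each group's flux values to the function
toy_result = toy.groupby("id")["flux"].apply(my_mean).to_frame("result")
print(toy_result)
```

Each id ends up with one row holding the value returned by the function, which mirrors the shape of the batch result shown above.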
By default, Ensemble.batch groups each lightcurve together (grouping on the specified id column). However, batch also supports custom grouping assignments. Below, we instead pass on=["band"], telling batch to calculate the mean over all data from each band.
[6]:
# Batch with custom grouping
res2 = ens.batch(my_mean, "flux", on=["band"])
res2.compute()
Using generated label, result_2, for a batch result.
[6]:
| result | |
|---|---|
| id | |
| r | 5.006652 |
| g | 4.993567 |
This can be extended to more than a single column. Below, we group by id and then sub-group by band. In Pandas, an operation like this would return a multi-index, but because Dask does not support multi-indexes, we return sub-groupings as columns.
[7]:
# Multi-level groupbys
res3 = ens.batch(my_mean, "flux", on=["id", "band"])
res3.compute()
Using generated label, result_3, for a batch result.
[7]:
| band | result | |
|---|---|---|
| id | ||
| 10 | r | 4.915863 |
| 10 | g | 5.131125 |
| 11 | g | 5.147449 |
| 11 | r | 4.922463 |
| 12 | r | 4.942122 |
| ... | ... | ... |
| 107 | r | 4.992102 |
| 108 | r | 4.977693 |
| 108 | g | 4.947759 |
| 109 | g | 5.188093 |
| 109 | r | 5.102989 |
200 rows × 2 columns
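The difference between a multi-index and sub-groupings returned as columns can be illustrated in plain pandas. This is a sketch with a hypothetical toy table, not TAPE's internal logic.

```python
import pandas as pd

# Hypothetical toy table with two objects observed in two bands
toy = pd.DataFrame(
    {
        "id": [10, 10, 10, 10, 11, 11],
        "band": ["g", "r", "g", "r", "g", "r"],
        "flux": [1.0, 2.0, 3.0, 4.0, 5.0, 7.0],
    }
)

# pandas returns a two-level (id, band) MultiIndex here
multi = toy.groupby(["id", "band"])["flux"].mean().to_frame("result")

# Moving the "band" level into a column leaves a single-level "id" index,
# which matches the layout of the batch output above
flat = multi.reset_index(level="band")
print(flat)
```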
Sub-grouping by photometric band is a use case we expect to be common in TAPE workflows, and so there is the by_band kwarg available within batch. This will ensure that the last sub-grouping level is on band and will return independent columns for each band result.
[8]:
# Batch with the by_band flag
res4 = ens.batch(my_mean, "flux", by_band=True)
res4.compute()
Using generated label, result_4, for a batch result.
[8]:
| result_g | result_r | |
|---|---|---|
| id | ||
| 10 | 5.131125 | 4.915863 |
| 11 | 5.147449 | 4.922463 |
| 12 | 4.891963 | 4.942122 |
| 13 | 4.927702 | 5.135140 |
| 14 | 5.116642 | 5.052591 |
| ... | ... | ... |
| 105 | 4.990176 | 5.231312 |
| 106 | 5.123191 | 5.070562 |
| 107 | 5.032880 | 4.992102 |
| 108 | 4.947759 | 4.977693 |
| 109 | 5.188093 | 5.102989 |
100 rows × 2 columns
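The by_band layout is comparable to pivoting the band sub-group into separate columns. The following plain-pandas sketch (hypothetical toy values, not TAPE's implementation) produces the same kind of result_g / result_r columns.

```python
import pandas as pd

# Hypothetical toy table with two objects observed in two bands
toy = pd.DataFrame(
    {
        "id": [10, 10, 10, 10, 11, 11],
        "band": ["g", "r", "g", "r", "g", "r"],
        "flux": [1.0, 2.0, 3.0, 4.0, 5.0, 7.0],
    }
)

# Compute the per-(id, band) mean, then spread the band level across columns
per_band = toy.groupby(["id", "band"])["flux"].mean().unstack("band")

# Rename the columns to follow the result_<band> pattern seen above
per_band.columns = [f"result_{band}" for band in per_band.columns]
print(per_band)
```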
Case 2: Functions That Return a Series#
In case 2, we write a function that returns a pandas.Series object, with the min and max of the flux array stored under separate index labels of the output series.
[9]:
def my_bounds(flux):
    return pd.Series({"min": np.min(flux), "max": np.max(flux)})
# Function output
my_bounds([1, 2, 3, 4, 5])
[9]:
min 1
max 5
dtype: int64
As in case 1, we’re able to pass this function and the “flux” column along to run the function. However, this time we need to set the meta. The meta is a required component of Dask's lazy evaluation: because Dask does not actually compute results until requested to, meta describes the expected form of the output. In this case, we just need to let Dask know that a min and a max column will be present in a dataframe (TAPE will always return a dataframe) and that both will hold float values.
For more information on the Dask meta argument, read their documentation.
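To make the idea of meta more concrete: as we understand it, Dask accepts meta either as a mapping of column names to dtypes or as an empty pandas object with the matching columns and dtypes. The sketch below builds that empty-frame form with plain pandas; the variable names are illustrative only.

```python
import pandas as pd

# The dict form: output column name -> dtype
meta_dict = {"min": float, "max": float}

# The same information expressed as an empty pandas DataFrame:
# zero rows, but the column names and dtypes of the expected output
meta_frame = pd.DataFrame(
    {name: pd.Series(dtype=dtype) for name, dtype in meta_dict.items()}
)
print(meta_frame.dtypes)
```

Either form only describes the output; no data is computed when it is constructed.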
[10]:
# Default Batch
res1 = ens.batch(my_bounds, "flux", meta={"min": float, "max": float}) # Requires meta to be set
res1.compute()
Using generated label, result_5, for a batch result.
[10]:
| min | max | |
|---|---|---|
| id | ||
| 10 | 0.001144 | 9.974850 |
| 11 | 0.010329 | 9.985991 |
| 12 | 0.000970 | 9.998758 |
| 13 | 0.001413 | 9.975477 |
| 14 | 0.006086 | 9.983582 |
| ... | ... | ... |
| 105 | 0.010900 | 9.987430 |
| 106 | 0.005584 | 9.999650 |
| 107 | 0.014817 | 9.992852 |
| 108 | 0.001185 | 9.994509 |
| 109 | 0.008270 | 9.986481 |
100 rows × 2 columns
The same flexibility with grouping extends to case 2, again with the need to specify the meta. Note that the meta given to Ensemble.batch remains the same, since it depends only on the function output; batch handles the meta for any columns generated by the grouping on its own.
[11]:
# Multi-level groupbys, note that meta does not need to change
res2 = ens.batch(
    my_bounds, "flux", on=["id", "band"], meta={"min": float, "max": float}
)  # Requires meta to be set
res2.compute()
Using generated label, result_6, for a batch result.
[11]:
| band | min | max | |
|---|---|---|---|
| id | |||
| 10 | r | 0.001144 | 9.974850 |
| 10 | g | 0.004020 | 9.950523 |
| 11 | g | 0.025076 | 9.985205 |
| 11 | r | 0.010329 | 9.985991 |
| 12 | r | 0.003984 | 9.998758 |
| ... | ... | ... | ... |
| 107 | r | 0.031415 | 9.990664 |
| 108 | r | 0.001185 | 9.987623 |
| 108 | g | 0.001418 | 9.994509 |
| 109 | g | 0.036095 | 9.982185 |
| 109 | r | 0.008270 | 9.986481 |
200 rows × 3 columns
Using the by_band kwarg extends the output columns to be per-band.
[12]:
# Using by_band
res3 = ens.batch(
    my_bounds, "flux", by_band=True, meta={"min": float, "max": float}
)  # Requires meta to be set
res3.compute()
Using generated label, result_7, for a batch result.
[12]:
| max_g | max_r | min_g | min_r | |
|---|---|---|---|---|
| id | ||||
| 10 | 9.950523 | 9.974850 | 0.004020 | 0.001144 |
| 11 | 9.985205 | 9.985991 | 0.025076 | 0.010329 |
| 12 | 9.971388 | 9.998758 | 0.000970 | 0.003984 |
| 13 | 9.967007 | 9.975477 | 0.001413 | 0.003930 |
| 14 | 9.983582 | 9.977627 | 0.018308 | 0.006086 |
| ... | ... | ... | ... | ... |
| 105 | 9.986839 | 9.987430 | 0.010900 | 0.028731 |
| 106 | 9.987471 | 9.999650 | 0.005584 | 0.017656 |
| 107 | 9.992852 | 9.990664 | 0.014817 | 0.031415 |
| 108 | 9.994509 | 9.987623 | 0.001418 | 0.001185 |
| 109 | 9.982185 | 9.986481 | 0.036095 | 0.008270 |
100 rows × 4 columns
Case 3: Functions That Return a DataFrame#
Here we define a function, my_bounds_df that computes the same quantities as my_bounds above, but in this case we return a dataframe of the results.
[13]:
def my_bounds_df(flux):
    return pd.DataFrame({"min": [np.min(flux)], "max": [np.max(flux)]})
my_bounds_df([1, 2, 3, 4, 5])
[13]:
| min | max | |
|---|---|---|
| 0 | 1 | 5 |
This is perfectly reasonable, but there is currently an issue to watch out for when passing a function like this through batch.
[14]:
# Default Batch, some things to watch out for
res1 = ens.batch(my_bounds_df, "flux", meta={"min": float, "max": float})
res1.compute()
Using generated label, result_8, for a batch result.
[14]:
| min | max | ||
|---|---|---|---|
| id | |||
| 10 | 0 | 0.001144 | 9.974850 |
| 11 | 0 | 0.010329 | 9.985991 |
| 12 | 0 | 0.000970 | 9.998758 |
| 13 | 0 | 0.001413 | 9.975477 |
| 14 | 0 | 0.006086 | 9.983582 |
| ... | ... | ... | ... |
| 105 | 0 | 0.010900 | 9.987430 |
| 106 | 0 | 0.005584 | 9.999650 |
| 107 | 0 | 0.014817 | 9.992852 |
| 108 | 0 | 0.001185 | 9.994509 |
| 109 | 0 | 0.008270 | 9.986481 |
100 rows × 2 columns
As with the series, we needed to pass the meta kwarg to let TAPE know which output columns to expect from the function. However, we see that our result carries over the index generated by the dataframe in addition to the batch index, represented as a multi-index. At the time of this notebook's creation, Dask does not have explicit support for multi-indexes. We can see this problem in the following cells.
[15]:
# Pandas resolves these indexes as a multi-index
res1.reset_index().compute()
[15]:
| id | level_1 | min | max | |
|---|---|---|---|---|
| 0 | 10 | 0 | 0.001144 | 9.974850 |
| 1 | 11 | 0 | 0.010329 | 9.985991 |
| 2 | 12 | 0 | 0.000970 | 9.998758 |
| 3 | 13 | 0 | 0.001413 | 9.975477 |
| 4 | 14 | 0 | 0.006086 | 9.983582 |
| ... | ... | ... | ... | ... |
| 95 | 105 | 0 | 0.010900 | 9.987430 |
| 96 | 106 | 0 | 0.005584 | 9.999650 |
| 97 | 107 | 0 | 0.014817 | 9.992852 |
| 98 | 108 | 0 | 0.001185 | 9.994509 |
| 99 | 109 | 0 | 0.008270 | 9.986481 |
100 rows × 4 columns
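The extra level_1 column is not specific to TAPE. A plain-pandas sketch with hypothetical toy values shows the same behavior: when the applied function returns a DataFrame, pandas stacks each group's own row index underneath the grouping key.

```python
import numpy as np
import pandas as pd


def my_bounds_df(flux):
    return pd.DataFrame({"min": [np.min(flux)], "max": [np.max(flux)]})


# Hypothetical toy table with two objects
toy = pd.DataFrame({"id": [10, 10, 11, 11], "flux": [1.0, 3.0, 2.0, 8.0]})

# Each group returns a one-row DataFrame with its own index (0),
# so the combined result is indexed by (id, <unnamed level>)
stacked = toy.groupby("id")["flux"].apply(my_bounds_df)
print(stacked.index.nlevels)

# Resetting the index names the unnamed second level "level_1"
print(list(stacked.reset_index().columns))
```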
[16]:
# Dask assumes there's just a single index column being sent to the dataframe columns
res1.reset_index()
[16]:
| index | min | max | |
|---|---|---|---|
| npartitions=1 | |||
| int64 | float64 | float64 | |
| ... | ... | ... |
When Dask and the underlying Pandas disagree on what the dataframe looks like, it becomes difficult for you as the user to work with the dataframe. Dask won’t recognize any calls to “id” or “level_1” here and will only accept a call to “index”, which in turn Pandas won’t understand. If this is the issue you run into, we recommend trying to modify your function to use a non-dataframe output format. However, in the case that this isn’t possible, here’s a somewhat hacky way to work around it.
We can resolve this by updating the Dask meta manually, to re-align Dask and Pandas.
[17]:
# If it's not too compute intensive, grabbing the actual dataframe is the easiest way forward
real_meta_from_result = res1.reset_index().head(0)
real_meta_from_result
[17]:
| id | level_1 | min | max |
|---|
[18]:
# otherwise, can generate this ourselves
real_meta_from_dataframe = TapeFrame(columns=["id", "level_1", "min", "max"])
real_meta_from_dataframe
[18]:
| id | level_1 | min | max |
|---|
[19]:
# Update the metadata
res1_noindex = res1.reset_index()
res1_noindex = res1_noindex.map_partitions(TapeFrame, meta=real_meta_from_dataframe)
res1_noindex
[19]:
| id | level_1 | min | max | |
|---|---|---|---|---|
| npartitions=1 | ||||
| object | object | object | object | |
| ... | ... | ... | ... |
Note that in the above, we’ve reset the index because Dask will not support meta that tracks a multi-index. In the case of this function, we gain no information from the “level_1” column, and it would be nice to re-establish “id” as the index, so we close the loop by executing the commands in the next cell.
[20]:
# Drop the uninformative "level_1" column and restore "id" as the index
res1 = res1_noindex.drop(columns=["level_1"]).set_index("id")
res1.compute()
[20]:
| min | max | |
|---|---|---|
| id | ||
| 10 | 0.001144 | 9.974850 |
| 11 | 0.010329 | 9.985991 |
| 12 | 0.000970 | 9.998758 |
| 13 | 0.001413 | 9.975477 |
| 14 | 0.006086 | 9.983582 |
| ... | ... | ... |
| 105 | 0.010900 | 9.987430 |
| 106 | 0.005584 | 9.999650 |
| 107 | 0.014817 | 9.992852 |
| 108 | 0.001185 | 9.994509 |
| 109 | 0.008270 | 9.986481 |
100 rows × 2 columns
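When the DataFrame-returning function cannot be changed directly, one way to follow the recommendation of a non-dataframe output is a thin wrapper that converts the single-row DataFrame into a Series. This is a minimal sketch; bounds_as_series is a hypothetical name, and we assume it would then be passed to batch like my_bounds in case 2, with the same meta.

```python
import numpy as np
import pandas as pd


def my_bounds_df(flux):
    return pd.DataFrame({"min": [np.min(flux)], "max": [np.max(flux)]})


def bounds_as_series(flux):
    # Take the only row of the DataFrame; pandas returns it as a Series
    # whose index labels are the original column names
    return my_bounds_df(flux).iloc[0]


bounds = bounds_as_series([1, 2, 3, 4, 5])
print(bounds)
```

Because the wrapper's output no longer carries its own row index, there is no second index level for Dask and Pandas to disagree about.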
Case 4: Functions that Require Non-Array Inputs#
Let’s return to case 1, but this time instead of the list-like flux argument, let’s say that the function needs to take in a dataframe with a column titled my_flux.
[21]:
# Case 4: DataFrame input
def my_mean_from_df(df):
    return np.mean(df["my_flux"])
df = pd.DataFrame({"my_flux": [1, 2, 3, 4, 5]})
my_mean_from_df(df)
[21]:
3.0
In this case, batch won’t be able to directly provide inputs to this function, as batch passes the column data along to the function as arrays. However, we can make this function usable with batch by wrapping it in another function.
[22]:
def mean_wrapper(flux):
    df = pd.DataFrame({"my_flux": flux})
    return my_mean_from_df(df)
# Can pass the wrapper function along to batch
res1 = ens.batch(mean_wrapper, "flux")
res1.compute()
Using generated label, result_9, for a batch result.
[22]:
| result | |
|---|---|
| id | |
| 10 | 5.020567 |
| 11 | 5.041076 |
| 12 | 4.916761 |
| 13 | 5.033744 |
| 14 | 5.084872 |
| ... | ... |
| 105 | 5.113830 |
| 106 | 5.097340 |
| 107 | 5.013111 |
| 108 | 4.962582 |
| 109 | 5.143362 |
100 rows × 1 columns
This is a really simple case, but highlights that in some cases a wrapper function can be written to serve as a middle man between your function and batch, even doing work to sort or filter your data on a per function call basis if not done as a pre-filter step for your Ensemble.
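As a sketch of the per-call sorting and filtering mentioned above, a wrapper can clean each group's data before handing it to the wrapped function. The wrapper name and the toy inputs are hypothetical; following the pattern from case 1, we assume the two column labels (for example "mjd" and "flux") would be passed to batch as separate arguments after the function.

```python
import numpy as np
import pandas as pd


def my_mean_from_df(df):
    return np.mean(df["my_flux"])


def sorted_clean_mean_wrapper(time, flux):
    # Build the frame the wrapped function expects, keeping time for sorting
    df = pd.DataFrame({"time": time, "my_flux": flux})
    # Per-call cleanup: drop rows with missing flux, then order by time
    df = df.dropna(subset=["my_flux"]).sort_values("time")
    return my_mean_from_df(df)


# Direct call with unsorted times and one missing flux value
value = sorted_clean_mean_wrapper([3.0, 1.0, 2.0], [4.0, np.nan, 2.0])
print(value)
```

Sorting does not change a mean, but the same structure applies to functions whose result does depend on time ordering.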
Case 5: TAPE Analysis Functions#
TAPE analysis functions are a special case of input function to Ensemble.batch. Information that is normally required, such as the column labels to pass to the function and the meta, is passed along from the function to Ensemble.batch internally, meaning you just need to specify the function and any additional kwargs. For this case, let’s leverage the light-curve package, which implements the extraction of many light curve features used in astrophysics. Feature extraction from this package is also supported within TAPE as an analysis function.
[23]:
# Grab two feature extraction methods from light-curve
from light_curve import Periodogram, OtsuSplit
In the example below, we apply the Lomb-Scargle Periodogram to our Ensemble light curves. Note again that the meta we had to configure above is already handled by TAPE in this case, and the needed timeseries columns are passed along internally as well.
[24]:
# Find periods using Lomb-Scargle periodogram
periodogram = Periodogram(peaks=1, nyquist=0.1, max_freq_factor=10, fast=False)
# Use r band only
res_per = ens.batch(periodogram, band_to_calc="r") # band_to_calc is a kwarg of Periodogram
res_per.compute()
Using generated label, result_10, for a batch result.
[24]:
| period_0 | period_s_to_n_0 | |
|---|---|---|
| id | ||
| 10 | 0.210117 | 6.017269 |
| 11 | 2.254403 | 7.155673 |
| 12 | 0.205399 | 7.303439 |
| 13 | 0.677417 | 5.520282 |
| 14 | 8.170040 | 6.012586 |
| ... | ... | ... |
| 105 | 0.225024 | 6.872417 |
| 106 | 0.696414 | 7.074310 |
| 107 | 0.471936 | 6.576487 |
| 108 | 0.276805 | 6.236394 |
| 109 | 0.365316 | 6.042835 |
100 rows × 2 columns
Next, we use the OtsuSplit function, used to perform automatic thresholding. In this case, we also supply the by_band kwarg to get a result per photometric band.
[25]:
res_otsu = ens.batch(OtsuSplit(), band_to_calc=None, by_band=True)
res_otsu.compute()
Using generated label, result_11, for a batch result.
[25]:
| otsu_lower_to_all_ratio_g | otsu_lower_to_all_ratio_r | otsu_mean_diff_g | otsu_mean_diff_r | otsu_std_lower_g | otsu_std_lower_r | otsu_std_upper_g | otsu_std_upper_r | |
|---|---|---|---|---|---|---|---|---|
| id | ||||||||
| 10 | 0.478618 | 0.481308 | 4.945568 | 5.055986 | 1.496743 | 1.355024 | 1.391181 | 1.541987 |
| 11 | 0.496206 | 0.475465 | 4.957610 | 4.924894 | 1.435659 | 1.373389 | 1.421739 | 1.500613 |
| 12 | 0.487342 | 0.493528 | 4.935361 | 5.189914 | 1.338911 | 1.324070 | 1.490207 | 1.479580 |
| 13 | 0.513912 | 0.478873 | 5.089806 | 4.839217 | 1.436066 | 1.456344 | 1.436931 | 1.439613 |
| 14 | 0.439683 | 0.453226 | 4.827796 | 5.149632 | 1.406289 | 1.472390 | 1.541064 | 1.462766 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 105 | 0.512315 | 0.492980 | 4.883700 | 4.856093 | 1.482071 | 1.501231 | 1.493872 | 1.369657 |
| 106 | 0.482704 | 0.480456 | 5.046406 | 5.072484 | 1.451172 | 1.363104 | 1.443635 | 1.477108 |
| 107 | 0.484472 | 0.488449 | 4.988996 | 4.860512 | 1.456886 | 1.397071 | 1.470777 | 1.421263 |
| 108 | 0.532488 | 0.479806 | 5.191130 | 4.917644 | 1.546710 | 1.452074 | 1.465029 | 1.478669 |
| 109 | 0.480607 | 0.485540 | 5.092411 | 4.930022 | 1.426610 | 1.433209 | 1.320875 | 1.380337 |
100 rows × 8 columns
Using light-curve package features#
Ensemble.batch() also supports the use of light-curve package feature extractors:
[26]:
import light_curve as licu
extractor = licu.Extractor(licu.Amplitude(), licu.AndersonDarlingNormal(), licu.StetsonK())
# band_to_calc="g" restricts the calculation to g-band sources;
# band_to_calc=None would instead ignore the band column and use all sources for each object
res = ens.batch(extractor, band_to_calc="g")
res.compute()
Using generated label, result_12, for a batch result.
[26]:
| amplitude | anderson_darling_normal | stetson_K | |
|---|---|---|---|
| id | |||
| 10 | 4.973251 | 6.598161 | 0.999129 |
| 11 | 4.980065 | 7.228309 | 0.985758 |
| 12 | 4.985209 | 7.046509 | 0.999740 |
| 13 | 4.982797 | 7.737570 | 0.999787 |
| 14 | 4.982637 | 5.463445 | 0.994758 |
| ... | ... | ... | ... |
| 105 | 4.987969 | 5.364664 | 0.997364 |
| 106 | 4.990943 | 7.245717 | 0.997570 |
| 107 | 4.989018 | 7.244391 | 0.994809 |
| 108 | 4.996546 | 8.042666 | 0.999837 |
| 109 | 4.973045 | 9.282865 | 0.987262 |
100 rows × 3 columns
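For comparison with the case 1 pattern, a feature such as the amplitude can also be written by hand as an ordinary function. The sketch below assumes the light-curve Amplitude feature is defined as half the peak-to-peak range, which is consistent with values near 5 for fluxes drawn between 0 and 10; half_amplitude is a hypothetical helper, not part of TAPE or light-curve.

```python
import numpy as np


def half_amplitude(flux):
    # Assumed definition: half of the difference between max and min
    flux = np.asarray(flux, dtype=float)
    return 0.5 * (np.max(flux) - np.min(flux))


amp = half_amplitude([1, 2, 3, 4, 5])
print(amp)
```

A function like this takes a single array-like argument and returns a scalar, so it has the same shape as my_mean in case 1.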