Ensemble Batch Showcase

Ensemble.batch is a versatile function that lets users apply external functions to groupings of Ensemble data. Most commonly, these functions calculate something per lightcurve. External functions can have a huge variety of inputs and outputs, so this notebook collects example functions and shows how batch can be used with each one. Ideally, one of these examples resembles the function you are trying to apply via batch, and you can use it as a template for getting your function to work.

Generate some toy data and create an Ensemble

[1]:
from tape import Ensemble, ColumnMapper, TapeFrame
import numpy as np
import pandas as pd
import sys
[2]:
# Generate some fake data

np.random.seed(1)

obj_ids = []
mjds = []
for i in range(10, 110):
    obj_ids.append(np.array([i] * 1250))
    mjds.append(np.arange(0.0, 1250.0, 1.0))

obj_ids = np.concatenate(obj_ids)
mjds = np.concatenate(mjds)

flux = 10 * np.random.random(125000)
err = flux / 10
band = np.random.choice(["g", "r"], 125000)

source_dict = {"id": obj_ids, "mjd": mjds, "flux": flux, "err": err, "band": band}
[3]:
# Load the data into an Ensemble
ens = Ensemble()

ens.from_source_dict(
    source_dict,
    column_mapper=ColumnMapper(id_col="id", time_col="mjd", flux_col="flux", err_col="err", band_col="band"),
    sorted=True,
)
[3]:
<tape.ensemble.Ensemble at 0x7fb6df7aaaa0>

Case 1: A Simple Mean

We define a simple function that takes an array-like argument, flux, and returns its mean.

[4]:
# Case 1: Simple
def my_mean(flux):
    return np.mean(flux)


my_mean([1, 2, 3, 4, 5])
[4]:
3.0

To run my_mean with Ensemble.batch, we pass the function first, followed by its argument(s) as additional arguments to batch. Here we pass “flux” as a string: batch looks up the data in the column with that label and passes it to the function for evaluation.

[5]:
# Default batch
res1 = ens.batch(
    my_mean, "flux"
)  # "flux" is provided to have TAPE pass the "flux" column data along to my_mean
res1.compute()  # Compute to see the result
Using generated label, result_1, for a batch result.
[5]:
result
id
10 5.020567
11 5.041076
12 4.916761
13 5.033744
14 5.084872
... ...
105 5.113830
106 5.097340
107 5.013111
108 4.962582
109 5.143362

100 rows × 1 columns
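For intuition, the default call above behaves much like a pandas groupby on the id column that hands each lightcurve's flux values to my_mean. The sketch below uses plain pandas on a tiny hypothetical table (toy_source is made up for illustration); it is not TAPE's implementation, which evaluates lazily on Dask partitions.

```python
import numpy as np
import pandas as pd


def my_mean(flux):
    return np.mean(flux)


# Hypothetical miniature source table: two lightcurves (ids 10 and 11), indexed by id
toy_source = pd.DataFrame(
    {
        "id": [10, 10, 10, 11, 11],
        "flux": [1.0, 2.0, 3.0, 4.0, 6.0],
        "band": ["g", "r", "g", "r", "g"],
    }
).set_index("id")

# Group rows by lightcurve id and pass each group's "flux" values to my_mean
per_lightcurve = toy_source.groupby(level="id")["flux"].agg(my_mean).to_frame("result")
print(per_lightcurve)
```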

By default, Ensemble.batch groups the data by lightcurve (grouping on the specified id column). However, batch also supports custom groupings. Below, we instead pass on=["band"], telling batch to calculate the mean over all data in each band.

[6]:
# Batch with custom grouping

res2 = ens.batch(my_mean, "flux", on=["band"])
res2.compute()
Using generated label, result_2, for a batch result.
[6]:
result
id
r 5.006652
g 4.993567

Grouping can also use more than one column. Below, we group by id and then sub-group by band. In pandas, an operation like this would return a multi-index, but because Dask does not support multi-indexes, batch returns the sub-groupings as columns.

[7]:
# Multi-level groupbys

res3 = ens.batch(my_mean, "flux", on=["id", "band"])
res3.compute()
Using generated label, result_3, for a batch result.
[7]:
band result
id
10 r 4.915863
10 g 5.131125
11 g 5.147449
11 r 4.922463
12 r 4.942122
... ... ...
107 r 4.992102
108 r 4.977693
108 g 4.947759
109 g 5.188093
109 r 5.102989

200 rows × 2 columns
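The flattening described above can be mimicked in plain pandas: grouping by id and band yields an (id, band) multi-index, and moving the band level into a column leaves a single id index with band and result columns. This is a sketch on a hypothetical toy table, not TAPE's code.

```python
import numpy as np
import pandas as pd


def my_mean(flux):
    return np.mean(flux)


# Hypothetical miniature source table indexed by lightcurve id
toy_source = pd.DataFrame(
    {
        "id": [10, 10, 10, 11, 11],
        "flux": [1.0, 2.0, 3.0, 4.0, 6.0],
        "band": ["g", "r", "g", "r", "g"],
    }
).set_index("id")

# Group by the "id" index level, then sub-group by the "band" column:
# pandas returns a Series with an (id, band) MultiIndex
grouped = toy_source.groupby(["id", "band"])["flux"].agg(my_mean)

# Move the "band" level out of the index into a column, leaving a single "id" index
flat = grouped.rename("result").reset_index(level="band")
print(flat)
```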

Sub-grouping by photometric band is a use case we expect to be common in TAPE workflows, so batch provides the by_band kwarg. It ensures that the last sub-grouping level is band and returns a separate result column for each band.

[8]:
# Batch with the by_band flag
res4 = ens.batch(my_mean, "flux", by_band=True)
res4.compute()
Using generated label, result_4, for a batch result.
[8]:
result_g result_r
id
10 5.131125 4.915863
11 5.147449 4.922463
12 4.891963 4.942122
13 4.927702 5.135140
14 5.116642 5.052591
... ... ...
105 4.990176 5.231312
106 5.123191 5.070562
107 5.032880 4.992102
108 4.947759 4.977693
109 5.188093 5.102989

100 rows × 2 columns
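In pandas terms, the by_band layout above resembles unstacking the band level of an (id, band) result into columns and naming them result_<band>. The sketch below reproduces that shape on a hypothetical toy table; the naming step is an assumption chosen to match the columns shown above, not TAPE's internal code.

```python
import numpy as np
import pandas as pd


def my_mean(flux):
    return np.mean(flux)


# Hypothetical miniature source table indexed by lightcurve id
toy_source = pd.DataFrame(
    {
        "id": [10, 10, 10, 11, 11],
        "flux": [1.0, 2.0, 3.0, 4.0, 6.0],
        "band": ["g", "r", "g", "r", "g"],
    }
).set_index("id")

grouped = toy_source.groupby(["id", "band"])["flux"].agg(my_mean)

# Pivot the band level into columns and name them result_<band>
by_band = grouped.unstack("band").add_prefix("result_")
by_band.columns.name = None  # drop the leftover "band" label on the column axis
print(by_band)
```

A lightcurve with no data in one band would get NaN in that band's column after the unstack.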

Case 2: Functions That Return a Series

In case 2, we write a function that returns a pandas.Series object. The series stores the min and max of the flux array under the index labels "min" and "max".

[9]:
def my_bounds(flux):
    return pd.Series({"min": np.min(flux), "max": np.max(flux)})


# Function output
my_bounds([1, 2, 3, 4, 5])
[9]:
min    1
max    5
dtype: int64

As in case 1, we can pass this function and the “flux” column to batch. This time, however, we also need to set the meta. The meta is a required component of Dask's lazy evaluation: because Dask does not compute results until they are requested, meta describes the expected form of the output. Here, we just need to tell Dask that the output dataframe (TAPE always returns a dataframe) will have a min and a max column, both holding float values.

For more information on the Dask meta argument, read their documentation.
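To make the meta concrete: Dask accepts meta in several forms, including a dict mapping column names to dtypes, and {"min": float, "max": float} stands for an empty dataframe with two float64 columns. The sketch below builds that empty frame with plain pandas; it only illustrates what the dict describes and is not a step TAPE requires.

```python
import pandas as pd

meta_spec = {"min": float, "max": float}

# An empty frame with the expected column names and dtypes is what meta stands in for
empty_meta = pd.DataFrame({name: pd.Series(dtype=dtype) for name, dtype in meta_spec.items()})
print(empty_meta.dtypes)
```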

[10]:
# Default Batch

res1 = ens.batch(my_bounds, "flux", meta={"min": float, "max": float})  # Requires meta to be set
res1.compute()
Using generated label, result_5, for a batch result.
[10]:
min max
id
10 0.001144 9.974850
11 0.010329 9.985991
12 0.000970 9.998758
13 0.001413 9.975477
14 0.006086 9.983582
... ... ...
105 0.010900 9.987430
106 0.005584 9.999650
107 0.014817 9.992852
108 0.001185 9.994509
109 0.008270 9.986481

100 rows × 2 columns

Case 2 supports the same flexible grouping, again with the meta specified. Note that the meta passed to Ensemble.batch stays the same: it depends only on the function output, and batch handles the meta for any columns generated by the grouping on its own.

[11]:
# Multi-level groupbys, note that meta does not need to change
res2 = ens.batch(
    my_bounds, "flux", on=["id", "band"], meta={"min": float, "max": float}
)  # Requires meta to be set
res2.compute()
Using generated label, result_6, for a batch result.
[11]:
band min max
id
10 r 0.001144 9.974850
10 g 0.004020 9.950523
11 g 0.025076 9.985205
11 r 0.010329 9.985991
12 r 0.003984 9.998758
... ... ... ...
107 r 0.031415 9.990664
108 r 0.001185 9.987623
108 g 0.001418 9.994509
109 g 0.036095 9.982185
109 r 0.008270 9.986481

200 rows × 3 columns

Using the by_band kwarg extends the output columns to be per-band.

[12]:
# Using by_band

res3 = ens.batch(
    my_bounds, "flux", by_band=True, meta={"min": float, "max": float}
)  # Requires meta to be set
res3.compute()
Using generated label, result_7, for a batch result.
[12]:
max_g max_r min_g min_r
id
10 9.950523 9.974850 0.004020 0.001144
11 9.985205 9.985991 0.025076 0.010329
12 9.971388 9.998758 0.000970 0.003984
13 9.967007 9.975477 0.001413 0.003930
14 9.983582 9.977627 0.018308 0.006086
... ... ... ... ...
105 9.986839 9.987430 0.010900 0.028731
106 9.987471 9.999650 0.005584 0.017656
107 9.992852 9.990664 0.014817 0.031415
108 9.994509 9.987623 0.001418 0.001185
109 9.982185 9.986481 0.036095 0.008270

100 rows × 4 columns
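The per-band column names above (max_g, max_r, min_g, min_r) can be reproduced in plain pandas by applying the Series-returning function per (id, band), unstacking the statistic and then the band into columns, and joining each pair into "<stat>_<band>". This is a hypothetical sketch on a toy table; the alphabetical column sort is chosen to match the output shown above, not taken from TAPE's code.

```python
import numpy as np
import pandas as pd


def my_bounds(flux):
    return pd.Series({"min": np.min(flux), "max": np.max(flux)})


# Hypothetical miniature source table indexed by lightcurve id
toy_source = pd.DataFrame(
    {
        "id": [10, 10, 10, 11, 11],
        "flux": [1.0, 2.0, 3.0, 4.0, 6.0],
        "band": ["g", "r", "g", "r", "g"],
    }
).set_index("id")

# Each (id, band) group returns a Series, so pandas stacks them into a Series
# indexed by (id, band, statistic); unstack the statistic level into columns
stats = toy_source.groupby(["id", "band"])["flux"].apply(my_bounds).unstack()

# Spread bands into columns and flatten ("max", "g") -> "max_g"
wide = stats.unstack("band")
wide.columns = [f"{stat}_{band}" for stat, band in wide.columns]
wide = wide.sort_index(axis=1)
print(wide)
```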

Case 3: Functions That Return a DataFrame

Here we define a function, my_bounds_df, that computes the same quantities as my_bounds above, but returns them as a dataframe.

[13]:
def my_bounds_df(flux):
    return pd.DataFrame({"min": [np.min(flux)], "max": [np.max(flux)]})


my_bounds_df([1, 2, 3, 4, 5])
[13]:
min max
0 1 5

This is perfectly reasonable, but there is currently an issue to watch out for when passing a function like this through batch.

[14]:
# Default Batch, some things to watch out for

res1 = ens.batch(my_bounds_df, "flux", meta={"min": float, "max": float})
res1.compute()
Using generated label, result_8, for a batch result.
[14]:
min max
id
10 0 0.001144 9.974850
11 0 0.010329 9.985991
12 0 0.000970 9.998758
13 0 0.001413 9.975477
14 0 0.006086 9.983582
... ... ... ...
105 0 0.010900 9.987430
106 0 0.005584 9.999650
107 0 0.014817 9.992852
108 0 0.001185 9.994509
109 0 0.008270 9.986481

100 rows × 2 columns

As with the series, we need to pass the meta kwarg to tell TAPE which output columns to expect from the function. However, the result carries over the index generated by the function's dataframe in addition to the batch index, forming a multi-index. At the time this notebook was created, Dask did not have explicit support for multi-indexes. The following cells demonstrate the problem.

[15]:
# Pandas resolves these indexes as a multi-index
res1.reset_index().compute()
[15]:
id level_1 min max
0 10 0 0.001144 9.974850
1 11 0 0.010329 9.985991
2 12 0 0.000970 9.998758
3 13 0 0.001413 9.975477
4 14 0 0.006086 9.983582
... ... ... ... ...
95 105 0 0.010900 9.987430
96 106 0 0.005584 9.999650
97 107 0 0.014817 9.992852
98 108 0 0.001185 9.994509
99 109 0 0.008270 9.986481

100 rows × 4 columns
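The extra level is standard pandas behaviour for group-wise functions that return a dataframe, so it can be reproduced with pandas alone. In this sketch (toy_source is a hypothetical table), each group returns a one-row dataframe whose own index is [0], pandas keeps that index as an unnamed level beneath "id", and reset_index names it "level_1".

```python
import numpy as np
import pandas as pd


def my_bounds_df(flux):
    return pd.DataFrame({"min": [np.min(flux)], "max": [np.max(flux)]})


# Hypothetical miniature source table indexed by lightcurve id
toy_source = pd.DataFrame(
    {"id": [10, 10, 10, 11, 11], "flux": [1.0, 2.0, 3.0, 4.0, 6.0]}
).set_index("id")

# The one-row dataframe index [0] from each call becomes a second, unnamed index level
res = toy_source.groupby(level="id")["flux"].apply(my_bounds_df)
print(res.index.names)

# reset_index names the unnamed level by its position, giving "level_1"
flat = res.reset_index()
print(flat.columns.tolist())
```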

[16]:
# Dask's metadata assumes a single index, so reset_index yields one column named "index"
res1.reset_index()
[16]:
Dask DataFrame Structure:
index min max
npartitions=1
int64 float64 float64
... ... ...
Dask Name: reset_index, 6 expressions

When Dask and the underlying pandas disagree on what the dataframe looks like, you will have trouble working with the dataframe: Dask won't recognize references to “id” or “level_1” here and will only accept “index”, which pandas in turn won't understand. If you run into this issue, we recommend modifying your function to use a non-dataframe output format. If that isn't possible, here's a somewhat hacky workaround.

We can resolve this by updating the Dask meta manually, to re-align Dask and Pandas.

[17]:
# If it's not too compute intensive, grabbing the actual dataframe is the easiest way forward
real_meta_from_result = res1.reset_index().head(0)
real_meta_from_result
[17]:
id level_1 min max
[18]:
# Otherwise, we can build it ourselves
real_meta_from_dataframe = TapeFrame(columns=["id", "level_1", "min", "max"])
real_meta_from_dataframe
[18]:
id level_1 min max
[19]:
# Update the metadata

res1_noindex = res1.reset_index()
res1_noindex = res1_noindex.map_partitions(TapeFrame, meta=real_meta_from_dataframe)
res1_noindex
[19]:
Dask DataFrame Structure:
id level_1 min max
npartitions=1
object object object object
... ... ... ...
Dask Name: tapeframe, 7 expressions

Note that above we reset the index first, because Dask will not support meta that tracks a multi-index. For this function, the “level_1” column carries no information, and it would be nice to re-establish “id” as the index, so we close the loop with the commands in the next cell. Also note that a meta built from column names alone, as above, types every column as object; if dtypes matter downstream, give the meta explicit dtypes.

[20]:
res1 = res1_noindex.drop(columns=["level_1"]).set_index("id")
res1.compute()
[20]:
min max
id
10 0.001144 9.974850
11 0.010329 9.985991
12 0.000970 9.998758
13 0.001413 9.975477
14 0.006086 9.983582
... ... ...
105 0.010900 9.987430
106 0.005584 9.999650
107 0.014817 9.992852
108 0.001185 9.994509
109 0.008270 9.986481

100 rows × 2 columns
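The recommended alternative, avoiding dataframe output altogether, can be as small as a wrapper that returns the single row as a Series, which turns a Case 3 function into a Case 2 function. The wrapper name my_bounds_df_as_series is hypothetical.

```python
import numpy as np
import pandas as pd


def my_bounds_df(flux):
    return pd.DataFrame({"min": [np.min(flux)], "max": [np.max(flux)]})


def my_bounds_df_as_series(flux):
    # Hypothetical wrapper: take the only row of the dataframe result as a Series,
    # so no per-call row index ends up in the batch result
    return my_bounds_df(flux).iloc[0]


row = my_bounds_df_as_series([1, 2, 3, 4, 5])
print(row)
```

It could then be passed exactly like my_bounds, e.g. ens.batch(my_bounds_df_as_series, "flux", meta={"min": float, "max": float}), which should give a result shaped like the Case 2 output (not run here).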

Case 4: Functions that Require Non-Array Inputs

Let’s return to case 1, but this time, instead of taking the array-like flux argument, suppose the function needs to take in a dataframe with a column named my_flux.

[21]:
# Case 4: DataFrame input
def my_mean_from_df(df):
    return np.mean(df["my_flux"])


df = pd.DataFrame({"my_flux": [1, 2, 3, 4, 5]})
my_mean_from_df(df)
[21]:
3.0

In this case, batch can't provide inputs to this function directly, because batch passes column data to the function as arrays. However, we can make the function usable with batch by wrapping it in another function.

[22]:
def mean_wrapper(flux):
    df = pd.DataFrame({"my_flux": flux})
    return my_mean_from_df(df)


# Can pass the wrapper function along to batch
res1 = ens.batch(mean_wrapper, "flux")
res1.compute()
Using generated label, result_9, for a batch result.
[22]:
result
id
10 5.020567
11 5.041076
12 4.916761
13 5.033744
14 5.084872
... ...
105 5.113830
106 5.097340
107 5.013111
108 4.962582
109 5.143362

100 rows × 1 columns

This is a very simple case, but it highlights that a wrapper function can serve as a middleman between your function and batch. The wrapper can even sort or filter the data on each function call, if that hasn't already been done as a pre-filtering step on your Ensemble.
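As a sketch of such a wrapper, the hypothetical filtered_mean_wrapper below builds the dataframe that my_mean_from_df expects, sorts it by time, and drops non-positive fluxes before computing the mean. Sorting does not change a mean; it is included only to show where order-dependent preparation would go.

```python
import numpy as np
import pandas as pd


def my_mean_from_df(df):
    return np.mean(df["my_flux"])


def filtered_mean_wrapper(mjd, flux):
    # Hypothetical wrapper: build the dataframe my_mean_from_df expects,
    # sort the lightcurve by time, and keep only positive fluxes
    df = pd.DataFrame({"mjd": mjd, "my_flux": flux}).sort_values("mjd")
    df = df[df["my_flux"] > 0]
    return my_mean_from_df(df)


print(filtered_mean_wrapper([3.0, 1.0, 2.0], [6.0, -1.0, 2.0]))
```

With TAPE this might be called as ens.batch(filtered_mean_wrapper, "mjd", "flux"), assuming batch forwards multiple column labels to the function positionally, in the order given.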

Case 5: TAPE Analysis Functions

TAPE analysis functions are a special case of input function to Ensemble.batch. Information that would normally be required, such as the column labels to pass to the function and the meta, is passed from the function to Ensemble.batch internally, so you only need to specify the function and any additional kwargs. For this case, let’s use the light-curve package, which implements the extraction of many light curve features used in astrophysics. Feature extraction from this package is supported within TAPE as an analysis function.

[23]:
# Grab two feature extractors from light-curve
from light_curve import Periodogram, OtsuSplit

In the example below, we apply the Lomb-Scargle periodogram to our Ensemble light curves. Note again that the meta we had to configure above is handled by TAPE here, and the needed timeseries columns are passed along internally as well.

[24]:
# Find periods using Lomb-Scargle periodogram
periodogram = Periodogram(peaks=1, nyquist=0.1, max_freq_factor=10, fast=False)

# Use r band only
res_per = ens.batch(periodogram, band_to_calc="r")  # band_to_calc="r" restricts the calculation to r-band sources
res_per.compute()
Using generated label, result_10, for a batch result.
[24]:
period_0 period_s_to_n_0
id
10 0.210117 6.017269
11 2.254403 7.155673
12 0.205399 7.303439
13 0.677417 5.520282
14 8.170040 6.012586
... ... ...
105 0.225024 6.872417
106 0.696414 7.074310
107 0.471936 6.576487
108 0.276805 6.236394
109 0.365316 6.042835

100 rows × 2 columns
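For readers curious about what a Lomb-Scargle periodogram computes, here is a minimal numpy sketch of the classical (unnormalized) Lomb-Scargle power, checked on a pure sine of period 5. It is not light-curve's implementation: Periodogram uses its own frequency grid (controlled above by nyquist and max_freq_factor) and reports a signal-to-noise value, so numbers will not match. Also, since the toy fluxes are uniform random noise, the periods found above reflect noise rather than real variability.

```python
import numpy as np


def lomb_scargle_power(t, y, freqs):
    """Classical Lomb-Scargle power; freqs are in cycles per time unit."""
    t = np.asarray(t, dtype=float)
    y = np.asarray(y, dtype=float) - np.mean(y)  # the periodogram assumes zero-mean data
    power = np.empty(len(freqs))
    for i, f in enumerate(freqs):
        w = 2 * np.pi * f
        # The time offset tau makes the sine and cosine terms orthogonal
        tau = np.arctan2(np.sum(np.sin(2 * w * t)), np.sum(np.cos(2 * w * t))) / (2 * w)
        c = np.cos(w * (t - tau))
        s = np.sin(w * (t - tau))
        power[i] = 0.5 * ((y @ c) ** 2 / (c @ c) + (y @ s) ** 2 / (s @ s))
    return power


# Unit-cadence sine with period 5; keep the grid below the 0.5 Nyquist frequency
t = np.arange(200.0)
y = np.sin(2 * np.pi * t / 5.0)
freqs = np.linspace(0.01, 0.45, 2000)
best_period = 1 / freqs[np.argmax(lomb_scargle_power(t, y, freqs))]
print(best_period)
```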

Next, we use the OtsuSplit feature, which performs automatic thresholding. Here we also supply the by_band kwarg to get a result per photometric band.

[25]:
res_otsu = ens.batch(OtsuSplit(), band_to_calc=None, by_band=True)
res_otsu.compute()
Using generated label, result_11, for a batch result.
[25]:
otsu_lower_to_all_ratio_g otsu_lower_to_all_ratio_r otsu_mean_diff_g otsu_mean_diff_r otsu_std_lower_g otsu_std_lower_r otsu_std_upper_g otsu_std_upper_r
id
10 0.478618 0.481308 4.945568 5.055986 1.496743 1.355024 1.391181 1.541987
11 0.496206 0.475465 4.957610 4.924894 1.435659 1.373389 1.421739 1.500613
12 0.487342 0.493528 4.935361 5.189914 1.338911 1.324070 1.490207 1.479580
13 0.513912 0.478873 5.089806 4.839217 1.436066 1.456344 1.436931 1.439613
14 0.439683 0.453226 4.827796 5.149632 1.406289 1.472390 1.541064 1.462766
... ... ... ... ... ... ... ... ...
105 0.512315 0.492980 4.883700 4.856093 1.482071 1.501231 1.493872 1.369657
106 0.482704 0.480456 5.046406 5.072484 1.451172 1.363104 1.443635 1.477108
107 0.484472 0.488449 4.988996 4.860512 1.456886 1.397071 1.470777 1.421263
108 0.532488 0.479806 5.191130 4.917644 1.546710 1.452074 1.465029 1.478669
109 0.480607 0.485540 5.092411 4.930022 1.426610 1.433209 1.320875 1.380337

100 rows × 8 columns
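To illustrate the idea behind OtsuSplit, the sketch below applies Otsu's criterion by brute force: it tries every split of the sorted values and keeps the one that maximizes the weighted between-class variance. The returned keys mirror the column names above, but this is not light-curve's implementation; in particular, the sketch uses the sample standard deviation (ddof=1), and light-curve's conventions may differ. It needs at least two values.

```python
import numpy as np


def otsu_split_sketch(values):
    """Brute-force Otsu split of 1-D values (illustrative, O(n^2))."""
    x = np.sort(np.asarray(values, dtype=float))
    n = x.size
    best_k, best_score = 1, -np.inf
    # Try every split: lower class = x[:k], upper class = x[k:]
    for k in range(1, n):
        lower, upper = x[:k], x[k:]
        # Otsu's criterion: maximize the weighted between-class variance
        score = (k / n) * ((n - k) / n) * (upper.mean() - lower.mean()) ** 2
        if score > best_score:
            best_k, best_score = k, score
    lower, upper = x[:best_k], x[best_k:]
    return {
        "mean_diff": upper.mean() - lower.mean(),
        "std_lower": lower.std(ddof=1) if lower.size > 1 else 0.0,
        "std_upper": upper.std(ddof=1) if upper.size > 1 else 0.0,
        "lower_to_all_ratio": lower.size / n,
    }


print(otsu_split_sketch([1.0, 1.0, 1.0, 9.0, 9.0, 9.0]))
```

For fluxes drawn uniformly from [0, 10), as in the toy data, a split near 5 with a lower-to-all ratio near 0.5 is expected, which is consistent with the table above.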

Using light-curve package features

Ensemble.batch() also supports light-curve's Extractor, which bundles several features into a single extractor:

[26]:
import light_curve as licu

extractor = licu.Extractor(licu.Amplitude(), licu.AndersonDarlingNormal(), licu.StetsonK())
# band_to_calc="g" uses only g-band sources; band_to_calc=None would ignore the band column and use all sources for each object
res = ens.batch(extractor, band_to_calc="g")
res.compute()
Using generated label, result_12, for a batch result.
[26]:
amplitude anderson_darling_normal stetson_K
id
10 4.973251 6.598161 0.999129
11 4.980065 7.228309 0.985758
12 4.985209 7.046509 0.999740
13 4.982797 7.737570 0.999787
14 4.982637 5.463445 0.994758
... ... ... ...
105 4.987969 5.364664 0.997364
106 4.990943 7.245717 0.997570
107 4.989018 7.244391 0.994809
108 4.996546 8.042666 0.999837
109 4.973045 9.282865 0.987262

100 rows × 3 columns
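As a sanity check on the amplitude column, light-curve documents Amplitude as half of the peak-to-peak range, (max - min) / 2. Because the toy fluxes are drawn uniformly from [0, 10), values just under 5 are expected, as seen above. A plain numpy version of that definition (half_amplitude is a hypothetical name) looks like this:

```python
import numpy as np


def half_amplitude(flux):
    # Half of the peak-to-peak range, the definition light-curve documents for Amplitude
    flux = np.asarray(flux, dtype=float)
    return 0.5 * (np.max(flux) - np.min(flux))


print(half_amplitude([1.0, 2.0, 3.0, 4.0, 5.0]))
```

Passed through batch like my_mean, e.g. ens.batch(half_amplitude, "flux", by_band=True), its result_g column would be expected to closely match the amplitude column above.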