Using Dask on Ray with the Ensemble

Ray is an open-source unified framework for scaling AI and Python applications. It provides a scheduler for Dask (dask_on_ray) that lets you build data analyses with Dask’s collections and execute the underlying tasks on a Ray cluster. Ray can be used with TAPE using the setup shown in the following example.

[1]:
import ray
from ray.util.dask import enable_dask_on_ray, disable_dask_on_ray
from tape import Ensemble
from tape.analysis.structurefunction2 import calc_sf2

context = ray.init()

# Use the Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()
2024-04-30 17:42:24,342 INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2024-04-30 17:42:34,016 WARNING services.py:1996 -- WARNING: The object store is using /tmp instead of /dev/shm because /dev/shm has only 67108864 bytes available. This will harm performance! You may be able to free up space by deleting files in /dev/shm. If you are inside a Docker container, you can increase /dev/shm size by passing '--shm-size=1.85gb' to 'docker run' (or add it to the run_options list in a Ray cluster config). Make sure to set this to more than 30% of available RAM.
2024-04-30 17:42:35,073 INFO worker.py:1749 -- Started a local Ray instance.
/home/docs/checkouts/readthedocs.org/user_builds/tape/envs/stable/lib/python3.10/site-packages/dask/config.py:789: FutureWarning: Dask configuration key 'shuffle' has been deprecated; please use 'dataframe.shuffle.algorithm' instead
  warnings.warn(
[1]:
<dask.config.set at 0x7f059e4e9870>

After importing ray, only two calls are needed. context = ray.init() starts a local Ray cluster and returns a context object, which we can use to retrieve the URL of the Ray dashboard, as shown below. enable_dask_on_ray() is a Dask configuration helper that sets the Ray scheduler as the global default, so all subsequent Dask work runs on the established Ray cluster.

[2]:
print(context.dashboard_url)

For TAPE, the only change needed is to specify client=False when initializing an Ensemble object, so that the Ensemble does not set up a Dask distributed client of its own. Because the Dask configuration has already been set, the Ensemble will automatically use the established Ray cluster.

[3]:
ens = Ensemble(client=False)  # Do not use a client

From here, we are free to work with TAPE as normal.

[4]:
ens.from_dataset("s82_qso", sorted=True)
ens.source = ens.source.repartition(npartitions=10)
# use_map=False because the naive repartition above can split a single
# object's sources across partitions
ens.batch(calc_sf2, use_map=False)
Using generated label, result_1, for a batch result.
[4]:
Dask DataFrame Structure:
                lc_id    band       dt      sf2  1_sigma
npartitions=10
70              int64  object  float64  float64  float64
349067            ...     ...      ...      ...      ...
...               ...     ...      ...      ...      ...
5414392           ...     ...      ...      ...      ...
7913279           ...     ...      ...      ...      ...
Dask Name: groupbyapply, 10 expressions
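
The reason for use_map=False can be illustrated with a toy example. The sketch below uses plain pandas rather than TAPE or Dask, and the table, the flux column, and the two hand-made "partitions" are hypothetical. It only mimics the difference between processing each partition independently and grouping by object across the whole table.

```python
import pandas as pd

# Hypothetical toy source table: two objects with three observations each.
source = pd.DataFrame(
    {"lc_id": [1, 1, 1, 2, 2, 2], "flux": [1.0, 2.0, 3.0, 10.0, 20.0, 30.0]}
)

# A naive split by row position cuts object 1 across both "partitions".
partitions = [source.iloc[:2], source.iloc[2:]]

# Per-partition processing (analogous to use_map=True): each partition is
# reduced on its own, so object 1 shows up twice with partial means.
per_partition = pd.concat(
    [part.groupby("lc_id")["flux"].mean() for part in partitions]
)

# Global grouping (analogous to use_map=False): all rows of an object are
# brought together before the function is applied.
global_result = source.groupby("lc_id")["flux"].mean()

print(per_partition)
print(global_result)
```

Object 1 appears twice in the per-partition result, each time with a mean computed from only part of its data, while the global grouping yields one correct value per object. This is why a groupby-based batch is the safer choice after a repartition that ignores object boundaries.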