Using Dask on Ray with the Ensemble
Ray is an open-source unified framework for scaling AI and Python applications. Ray provides a scheduler for Dask (dask_on_ray) that allows you to build data analyses using Dask’s collections and execute the underlying tasks on a Ray cluster. Ray can be used with TAPE through the setup shown in the following example.
[1]:
import ray
from ray.util.dask import enable_dask_on_ray, disable_dask_on_ray
from tape import Ensemble
from tape.analysis.structurefunction2 import calc_sf2
context = ray.init()
# Use the Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()
2024-08-09 19:33:11,508 INFO worker.py:1781 -- Started a local Ray instance.
[1]:
<dask.config.set at 0x7f26dd506e60>
We import Ray and then need to invoke only two commands. context = ray.init() starts a local Ray cluster, and the returned context object can be used to retrieve the URL of the Ray dashboard, as shown below. enable_dask_on_ray() is a Dask configuration helper that routes all Dask work to the established Ray cluster.
[2]:
print(context.dashboard_url)
For TAPE, the only change needed is to specify client=False when initializing an Ensemble object. Because the Dask configuration has already been set, the Ensemble will automatically use the established Ray cluster instead of its own Dask client.
[3]:
ens = Ensemble(client=False)  # Do not create a Dask client; use the Ray scheduler configured above
From here, we are free to work with TAPE as normal.
[4]:
ens.from_dataset("s82_qso", sorted=True)
ens.source = ens.source.repartition(npartitions=10)
# use_map=False because the naive repartition above can split a single object's sources across partitions
ens.batch(calc_sf2, use_map=False)
Using generated label, result_1, for a batch result.
[4]:
| | lc_id | band | dt | sf2 | 1_sigma |
|---|---|---|---|---|---|
| npartitions=10 | | | | | |
| 70 | int64 | object | float64 | float64 | float64 |
| 406146 | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... |
| 5509561 | ... | ... | ... | ... | ... |
| 7913279 | ... | ... | ... | ... | ... |
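To make the use_map=False reasoning more concrete, the toy sketch below uses plain Python rather than TAPE or Dask. It assumes, following the comment in the cell above, that a map-style batch evaluates the function on each partition independently, while use_map=False brings each object's sources together before evaluating it. The row-count splitting function, the toy source table, and the per-object source count (standing in for calc_sf2) are all hypothetical.

```python
from collections import defaultdict

# Hypothetical toy source table: (object_id, flux) rows sorted by object_id,
# loosely mimicking a source table loaded with sorted=True (4 rows per object).
sources = [(oid, 10.0 * oid + k) for oid in (1, 2, 3) for k in range(4)]


def naive_repartition(rows, npartitions):
    """Split rows into contiguous chunks of (nearly) equal length, ignoring
    object boundaries: a stand-in for naive repartitioning."""
    size, extra = divmod(len(rows), npartitions)
    chunks, start = [], 0
    for i in range(npartitions):
        end = start + size + (1 if i < extra else 0)
        chunks.append(rows[start:end])
        start = end
    return chunks


def per_partition_counts(chunks):
    """Map-style: compute a per-object statistic (here, a source count)
    on each chunk independently and concatenate the results."""
    results = []
    for chunk in chunks:
        counts = defaultdict(int)
        for oid, _ in chunk:
            counts[oid] += 1
        results.extend(sorted(counts.items()))
    return results


def grouped_counts(chunks):
    """Group-style: gather every object's rows across all chunks first."""
    counts = defaultdict(int)
    for chunk in chunks:
        for oid, _ in chunk:
            counts[oid] += 1
    return sorted(counts.items())


chunks = naive_repartition(sources, 5)
print(per_partition_counts(chunks))
print(grouped_counts(chunks))
```

With 12 rows split into 5 chunks, object 1's four sources land in two chunks, so the map-style path reports two partial counts for it (3 and 1), while the grouped path reports a single count of 4. A per-object statistic such as a structure function computed from split sources would be wrong in the same way.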