cupynumeric-parallel-data-load

Load a sharded, on-disk dataset (sharded .npy, Parquet/Arrow, raw binary, sharded HDF5, custom layouts) into a distributed cuPyNumeric ndarray via a manual partition + leaf @task launch with CPU/OMP/GPU variants. Use when no single-call loader fits, including when per-shard row counts differ across files. Prefer cupynumeric.load or legate.io.hdf5.from_file when they apply.

# Parallel sharded data -> cupynumeric load

**Why this skill exists.** cupynumeric mirrors NumPy's array API,
including `cupynumeric.load` for a single `.npy` file. Beyond that,
file *loading* lives in Legate, not cupynumeric:

| Format | Built-in loader |
|---|---|
| Single `.npy` | `cupynumeric.load(path)` (NumPy-API parity) |
| HDF5 (single file) | `legate.io.hdf5.from_file` / `from_file_batched` |
| Sharded multi-file (any format), Parquet/Arrow, raw binary, custom layouts | **No built-in loader — this skill.** |

This skill shows the canonical way to fill the gap in the last row:
write a Legate Python task that calls the third-party reader the
format needs (`h5py`, `pyarrow`, `np.memmap`, ...) inside the
task body, and let Legate distribute the reads across GPUs / nodes.
For the formats with a built-in loader, prefer it unless you need a
custom in-task body (mmap-based loader, format-specific decoder,
sidecar metadata, partial / sharded reads).

Canonical pattern: **manual partition + manual task launch, sized to
the machine, not the files.** Only axis 0 is sharded; trailing axes
ride along inside each tile. Per-shard row counts may differ across
files (only `dtype` and trailing axes must match); the launch fills
every available processor regardless of how many files there are.

`.npy` is the worked example because the header carries shape and
dtype on disk, but the skeleton applies to any format with cheap
range/slice reads (raw binary, HDF5, Parquet/Arrow — see "Other
formats" below). Reference implementation:
[`assets/examples/parallel_npy_load.py`](assets/examples/parallel_npy_load.py).

## Data layout assumption

This skill is purely about **loading** — it assumes the data is already
laid out on a shared filesystem in some predictable, indexable way.
Producing those files is out of scope (the example ships a `write`
subcommand for convenience, but real users bring their own).

The worked example assumes one specific layout:

- A directory containing files named `shard_0000.npy`, `shard_0001.npy`,
  ... in a contiguous integer sequence (zero-padded width 4).
- All shards share the same `dtype` and the same trailing axes
  (`shape[1:]`); **axis 0 (rows per shard) may differ across files** —
  the recipe builds a cumulative row-offset table and reads each
  file's overlapping slice from inside the leaf task.
- The directory is visible to every rank (shared filesystem for
  multi-node runs).
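The cumulative row-offset table is what lets unequal shards coexist with processor-sized tiles. The pure-NumPy sketch below shows the lookup a leaf task would perform. Given `cum_rows` and a tile's global row range `[lo, hi)`, it finds each shard that overlaps the tile and the slice to read from it. The helper name `overlapping_slices` and its tuple layout are hypothetical and are not taken from the worked example.

```python
import numpy as np


def overlapping_slices(cum_rows, lo, hi):
    """Map the global row range [lo, hi) onto per-file slices.

    cum_rows has length N+1: cum_rows[i] is the first global row of file i
    and cum_rows[-1] is the total row count. Returns a list of
    (file_index, file_lo, file_hi, tile_lo) tuples: copy rows
    [file_lo, file_hi) of file `file_index` into the tile at row tile_lo.
    """
    cum_rows = np.asarray(cum_rows, dtype=np.int64)
    if hi <= lo:
        return []
    # File containing global row `lo`, and file containing global row `hi - 1`.
    # side="right" steps past zero-row shards that share the same offset.
    first = int(np.searchsorted(cum_rows, lo, side="right")) - 1
    last = int(np.searchsorted(cum_rows, hi - 1, side="right")) - 1
    out = []
    for f in range(first, last + 1):
        start = max(lo, int(cum_rows[f]))
        stop = min(hi, int(cum_rows[f + 1]))
        if stop > start:  # skip shards with no rows in this tile
            base = int(cum_rows[f])
            out.append((f, start - base, stop - base, start - lo))
    return out
```

Inside a leaf task, each tuple would become one read, such as `np.load(paths[f], mmap_mode="r")[file_lo:file_hi]`, copied into the task's tile starting at row `tile_lo`.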

The example's `discover_layout()` prints what it found and hard-fails
with a descriptive error when the layout is wrong (missing directory,
no shards, mismatched `dtype` / trailing axes, or a hole in the
contiguous `shard_NNNN.npy` sequence).
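The contiguity part of that validation is easy to get subtly wrong. For example, a plain `sorted(glob(...))` silently skips a missing `shard_0003.npy`. The sketch below shows one way to check it, assuming exactly the four-digit `shard_NNNN.npy` naming described above. `check_contiguous` is a hypothetical helper, not the worked example's `discover_layout()`.

```python
import re
from pathlib import Path

# Assumed naming convention: shard_0000.npy, shard_0001.npy, ... (4 digits).
SHARD_RE = re.compile(r"shard_(\d{4})\.npy")


def check_contiguous(shard_dir):
    """Return shard paths sorted by index; raise if the sequence has a hole."""
    shard_dir = Path(shard_dir)
    if not shard_dir.is_dir():
        raise FileNotFoundError(f"shard directory not found: {shard_dir}")
    indexed = []
    for p in shard_dir.iterdir():
        m = SHARD_RE.fullmatch(p.name)
        if m:  # ignore unrelated files (sidecars, logs, ...)
            indexed.append((int(m.group(1)), p))
    if not indexed:
        raise RuntimeError(f"no shard_NNNN.npy files in {shard_dir}")
    indexed.sort()
    indices = [i for i, _ in indexed]
    # The sequence must start at 0 and have no gaps up to the largest index.
    missing = sorted(set(range(indices[-1] + 1)) - set(indices))
    if missing:
        raise RuntimeError(f"hole in shard sequence: missing indices {missing[:10]}")
    return [p for _, p in indexed]
```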

If your data lives in a different layout — fixed-stride raw binary, an
HDF5 file with one dataset per shard, a directory tree, ... — only the
glob pattern, the per-file reader (step 4 below), and the metadata
discovery (step 1 below) change. The partitioning and launch machinery
is layout-agnostic.
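Fixed-stride raw binary illustrates how small the swap is. The file has no header, so `dtype` and the trailing shape have to come from the caller or a sidecar. The row count then follows from the file size, and `np.memmap` gives cheap range reads. This is a sketch that assumes headerless files of C-ordered rows. The helper names `raw_rows` and `read_raw_rows` are hypothetical.

```python
import math
import os

import numpy as np


def raw_rows(path, dtype, trailing_shape):
    """Rows in a headerless file of C-ordered rows, each of shape trailing_shape."""
    row_bytes = np.dtype(dtype).itemsize * math.prod(trailing_shape)
    if row_bytes == 0:
        raise ValueError("rows must have a nonzero byte size")
    size = os.path.getsize(path)
    if size % row_bytes:
        raise RuntimeError(f"{path}: {size} bytes is not a multiple of row size {row_bytes}")
    return size // row_bytes


def read_raw_rows(path, dtype, trailing_shape, lo, hi):
    """Copy rows [lo, hi) out of the file through a read-only memory map."""
    trailing_shape = tuple(trailing_shape)
    n = raw_rows(path, dtype, trailing_shape)
    if n == 0:  # np.memmap cannot map an empty file
        return np.empty((0,) + trailing_shape, dtype=dtype)
    mm = np.memmap(path, dtype=dtype, mode="r", shape=(n,) + trailing_shape)
    return np.array(mm[lo:hi])  # real copy, so the map can be released
```

`raw_rows` plays the role of step 1's metadata discovery, and `read_raw_rows` plays the role of step 4's per-file reader. The partition and launch code does not change.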

## When to use

See the format table above for the routing decision (built-in loader
vs. this skill). Beyond that, two additional cues that this skill is
the right fit:

- Replacing sequential `np.concatenate([read(f) for f in files])` with
  parallel per-GPU reads.
- Demonstrating how a user-defined Legate Python task writes into a
  cupynumeric output array via a manual launch.

## Examples

Paths below are written relative to this skill's directory (the script
ships at `assets/examples/parallel_npy_load.py`). Adjust the prefix to
match wherever your skill is installed (e.g.
`skills/cupynumeric-parallel-data-load/assets/...` if the skill lives
under a top-level `skills/` directory).

```bash
# Single-node, 4 GPUs.
legate --gpus 4 --fbmem 4000 --min-gpu-chunk 1 \
    assets/examples/parallel_npy_load.py \
    read --shard-dir /shared/scratch/demo
```

```bash
# Multi-node, 2 nodes x 4 GPUs (slurm), shared filesystem at --shard-dir.
# Generate the shards once on rank 0, then re-run `read` at any scale.
legate --launcher srun --nodes 2 --cpus 1 \
    assets/examples/parallel_npy_load.py \
    write --shard-dir /shared/scratch/demo

legate --launcher srun --nodes 2 --ranks-per-node 1 \
    --gpus 4 --fbmem 4000 --min-gpu-chunk 1 \
    assets/examples/parallel_npy_load.py \
    read --shard-dir /shared/scratch/demo
```

No layout flags — the read driver walks every `.npy` header to recover
per-file row counts, the trailing shape, and the dtype, then derives
`tile_rows` from the available processor count.

`--min-gpu-chunk 1` is only needed when the per-tile element count is
below Legate's default minimum chunk size for GPU launches (e.g. the
worked example's defaults — total rows split across 4 GPUs at
`~1M` per tile — fall below the threshold and would otherwise be
folded onto a single GPU). For production-sized datasets (tens of
millions of elements per tile or larger) you can drop the flag and
let Legate use its default. A moderate value (e.g. `--min-gpu-chunk 1024`)
is a reasonable middle ground. Normally sized tiles still spread across
every GPU, but Legate avoids launching GPU tasks on tiles so small that
per-task overhead outweighs the work.
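Whether the flag matters comes down to the element count of a full tile. The back-of-the-envelope helper below assumes the ceil-sized axis-0 tiling described in step 3. The threshold is passed in rather than hard-coded, because the effective minimum chunk size depends on your Legate configuration. Both function names are hypothetical.

```python
import math


def elements_per_tile(total_rows, trailing_shape, num_procs):
    """Elements in one full tile when axis 0 is split ceil-wise over num_procs."""
    tile_rows = max(1, -(-total_rows // num_procs))  # ceil division, at least 1 row
    return tile_rows * math.prod(trailing_shape)


def tiles_below_chunk(total_rows, trailing_shape, num_procs, min_chunk):
    """True if full tiles are smaller than min_chunk, so the launch may be coalesced."""
    return elements_per_tile(total_rows, trailing_shape, num_procs) < min_chunk
```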

## Instructions

Five steps, using `.npy` as the worked example; only step 1 (parsing the
format header) and step 4 (the per-file reader inside the task body)
are format-specific. The other three (allocate destination, partition,
fence) are reused unchanged across formats — see "Other formats" below
for the swap-points.
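One way to keep those swap-points explicit is to put the two format-specific pieces behind a small interface, sketched here with `typing.Protocol`. `ShardFormat`, `NpyFormat`, `shard_meta`, and `read_rows` are hypothetical names for illustration. The worked example does not necessarily structure its code this way.

```python
from typing import Protocol

import numpy as np


class ShardFormat(Protocol):
    """The two format-specific hooks: step 1 (metadata) and step 4 (reader)."""

    def shard_meta(self, path) -> tuple[int, tuple[int, ...], np.dtype]:
        """Return (rows along axis 0, shape[1:], dtype) without reading the data."""
        ...

    def read_rows(self, path, lo: int, hi: int) -> np.ndarray:
        """Return rows [lo, hi) of one shard as an in-memory array."""
        ...


class NpyFormat:
    """.npy implementation: the header gives the metadata, mmap gives cheap slices."""

    def shard_meta(self, path):
        hdr = np.load(path, mmap_mode="r")
        return int(hdr.shape[0]), tuple(hdr.shape[1:]), hdr.dtype

    def read_rows(self, path, lo, hi):
        # Slicing the memory map faults in only the pages backing rows [lo, hi)
        # (plus OS readahead); np.array(...) copies them into a plain ndarray.
        return np.array(np.load(path, mmap_mode="r")[lo:hi])
```

Supporting another format then means writing another class with the same two methods. Steps 2, 3, and 5 only ever see the `(rows, trailing_shape, dtype)` triple and the returned row blocks.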

### 1. Read the metadata from every shard

Scan the directory and peek at every `.npy` header (`np.load` with
`mmap_mode="r"` parses the header and maps the data lazily, without
reading it). The header carries the per-shard shape and dtype, so the
driver can recover total rows, trailing shape, and a cumulative
row-offset table without loading any array data:

```python
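from pathlib import Path

import numpy as np

SHARD_DIR = Path("/shared/scratch/demo")       # directory of shard_NNNN.npy files

paths = sorted(SHARD_DIR.glob("shard_*.npy"))  # zero-padded names sort in index order
if not paths:
    raise RuntimeError(f"no shard_*.npy files found in {SHARD_DIR}")

per_file_rows = []                       # rows along axis 0 per file
trailing_shape = None                    # shape[1:], must match across files
dtype = None
for p in paths:
    hdr = np.load(p, mmap_mode="r")      # header parsed; data mapped, not read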
    if trailing_shape is None:
        trailing_shape = tuple(hdr.shape[1:])
        dtype = hdr.dtype
    elif tuple(hdr.shape[1:]) != trailing_shape or hdr.dtype != dtype:
        raise RuntimeError(
            f"{p.name}: trailing shape / dtype mismatch "
            f"({hdr.shape[1:]}/{hdr.dtype} vs {trailing_shape}/{dtype})"
        )
    per_file_rows.append(int(hdr.shape[0]))

cum_rows = np.cumsum([0] + per_file_rows, dtype=np.int64)  # length N+1
total_rows = int(cum_rows[-1])
```

The snippet above enforces matching `dtype` and `trailing_shape` (i.e.
`shape[1:]`) across files. **Per-shard row counts may differ** — the
cum-rows table handles that. Production code should also verify that
names form a contiguous `shard_0000.npy ... shard_NNNN.npy` sequence
(omitted from the snippet for brevity; see `discover_layout()` in the
worked example). Discovery relies only on what the
on-disk format itself exposes (the `.npy` header here, `.shape` /
`.dtype` for HDF5, etc.); any sidecar (manifest, content hashes) is a
separate verification step on top.
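If you would rather not create a memory map per shard during discovery, NumPy's `numpy.lib.format` module can parse the `.npy` header directly. The sketch below assumes only format versions 1.0 and 2.0 need support and rejects anything else. It also returns `fortran_order`, which the `np.load` snippet above does not check. `peek_npy_header` is a hypothetical helper name.

```python
import numpy as np
from numpy.lib import format as npy_format


def peek_npy_header(path):
    """Return (shape, fortran_order, dtype) by parsing only the .npy header."""
    with open(path, "rb") as f:
        version = npy_format.read_magic(f)  # (major, minor) format version
        if version == (1, 0):
            return npy_format.read_array_header_1_0(f)
        if version == (2, 0):
            return npy_format.read_array_header_2_0(f)
        raise RuntimeError(f"{path}: unsupported .npy format version {version}")
```

A Fortran-ordered shard still slices correctly through `np.load(..., mmap_mode="r")`. However, its rows are no longer contiguous on disk, so row-range reads become strided. Discovery is a cheap place to flag that.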

### 2. Create the cupynumeric output store from the metadata

The total array spans `total_rows` along axis 0; trailing axes come
from `trailing_shape` unchanged. Use `cn.empty` rather than `cn.zeros`:
the task overwrites every cell, so zero-initialization would be wasted
work.

```python
import cupynumeric as cn

total_shape = (total_rows,) + trailing_shape
out = cn.empty(total_shape, dtype=dtype)
```

### 3. Tile the store by processor count

The launch shape is sized to the **available processors**, not to the
file count. Pick `tile_rows = ceil(total_rows / num_processors)` and
partition axis 0 by that tile size. Trailing axes are not partitioned
(tile spans the full extent there). The last tile is allowed to be
short — that's exactly what `partition_by_tiling` supports — so the
recipe needs no divisibility constraint.
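The tiling arithmetic is worth checking by hand, because a ceil-sized tile can leave a processor idle when the array is tiny. The pure-Python sketch below computes what the axis-0 tiling produces. `tile_plan` is a hypothetical name, and `num_procs` is whatever processor count you size the launch to.

```python
def tile_plan(total_rows, num_procs):
    """Return (tile_rows, num_tiles, last_tile_rows) for a ceil-sized axis-0 tiling."""
    if total_rows <= 0 or num_procs <= 0:
        raise ValueError("total_rows and num_procs must be positive")
    tile_rows = -(-total_rows // num_procs)   # ceil(total_rows / num_procs)
    num_tiles = -(-total_rows // tile_rows)   # tiles the tiling actually produces
    last_tile_rows = total_rows - (num_tiles - 1) * tile_rows  # always >= 1
    return tile_rows, num_tiles, last_tile_rows
```

For example, `tile_plan(10, 4)` gives `(3, 4, 1)`: four tiles, with a one-row last tile. `tile_plan(9, 4)` gives `(3, 3, 3)`: only three tiles, so one of four processors gets no work. A short argument shows this can only happen when `total_rows < num_procs * (num_procs - 1)`, so it does not arise at realistic dataset sizes.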

```python
from legate.core import TaskTarget, get_legate_runtime
from legate.core.data_interface import as_logical_array

runtime = g
