dask-optimization
Advanced sub-skill for Dask focused on distributed system performance, memory management, and task graph optimization. Covers cluster tuning, efficient serialization, data skew mitigation, and dashboard-driven debugging.
What this skill does
# Dask - Advanced Optimization & Cluster Tuning
Parallel computing is not "free". In a distributed environment, the cost of moving data (network I/O) and scheduling tasks can often exceed the computation time. This guide focuses on minimizing overhead and maximizing throughput.
## When to Use
- Your Dask jobs are failing with "KilledWorker" or "OutOfMemory" errors.
- The Dask Dashboard shows a lot of "red" (communication) or "gray" (idle) time.
- You need to process datasets that are 10x-100x larger than the total RAM of your cluster.
- You are building custom distributed algorithms using `dask.delayed` or Futures.
- You need to optimize resource allocation (CPU vs. Threads) for specific workloads.
## Reference Documentation
- **Best Practices**: https://docs.dask.org/en/latest/best-practices.html
- **Distributed Diagnostics**: https://distributed.dask.org/en/latest/diagnosing-performance.html
- **Memory Management**: https://distributed.dask.org/en/latest/worker.html#memory-management
- **Search patterns**: `client.scatter`, `dask.compute(optimize_graph=True)`, `repartition`, `client.restart`
## Core Principles
### 1. Communication is the Killer
The fastest distributed task is the one that doesn't need data from another machine. Aim for data locality.
### 2. The Goldilocks Chunk Size
- **Too small**: Scheduler is overwhelmed by millions of tiny tasks (Task Overhead).
- **Too large**: Tasks don't fit in memory, causing disk spilling or worker crashes.
- **Target**: 100MB - 300MB per chunk for most numeric data.
### 3. Computation vs. Serialization
Every object sent to a worker must be serialized (pickled). Large Python objects (like complex dicts) passed as arguments can slow down the cluster significantly.
## Quick Reference: Performance Profiling
```python
from dask.distributed import Client, performance_report
client = Client("tcp://scheduler-address:8786")
# Generate a detailed HTML report of the computation
with performance_report(filename="dask-report.html"):
result = big_computation.compute()
# Tip: Check the "Task Stream" for gaps. Gaps mean workers are idle
# waiting for the scheduler or network.
```
## Critical Rules
### ✅ DO
- **Use client.scatter** - If multiple tasks need the same large piece of data, send it to workers once.
- **Prefer map_partitions** - In DataFrames, this allows you to run a single optimized pandas operation per chunk instead of row-wise logic.
- **Use persist() for branching** - If you use the same intermediate result in two different computations, `persist()` it in memory to avoid re-calculating the entire graph twice.
- **Profile with the Dashboard** - Watch the "Memory" and "Task Stream" tabs in real-time.
- **Match Threads to Workload** - Use many threads for I/O bound tasks (web scraping, reading files) and 1 thread per worker for CPU-bound tasks (NumPy math, ML training) to avoid Global Interpreter Lock (GIL) issues.
### ❌ DON'T
- **Don't use compute() in loops** - This pulls data to the local machine and destroys parallel efficiency.
- **Don't pass Large Data as Arguments** - Instead of `delayed(func)(large_df)`, use `large_future = client.scatter(large_df)` then `delayed(func)(large_future)`.
- **Don't ignore Data Skew** - If one worker has 10GB of data and others have 100MB, the cluster is only as fast as the slowest worker. Use `repartition` or `rechunk`.
- **Don't use list(dask_collection)** - This forces an immediate compute of all elements into local memory.
## Data Locality & Communication
### Using scatter and Futures
```python
# ❌ BAD: Large object sent to every task (High Overhead)
large_lookup = load_heavy_dict()
results = [delayed(process)(x, large_lookup) for x in data]
# ✅ GOOD: Scatter once, use reference
[large_future] = client.scatter([large_lookup], broadcast=True)
results = [delayed(process)(x, large_future) for x in data]
```
## Memory Management Tuning
### Fighting the "KilledWorker"
Worker memory has thresholds:
- **Target (0.6)**: Dask tries to stay below this.
- **Spill (0.7)**: Dask starts moving data to disk.
- **Pause (0.8)**: Worker stops accepting new tasks.
- **Terminate (0.95)**: OS or Dask kills the worker.
```python
# Adjusting worker limits via config (distributed.yaml or code)
import dask
dask.config.set({"distributed.worker.memory.target": 0.45})
dask.config.set({"distributed.worker.memory.spill": 0.55})
```
## Task Graph Optimization
### Fusing Operations
Dask automatically "fuses" many operations into one task to reduce scheduler overhead.
```python
# Multiple operations on a DataArray
da = da + 1
da = da * 2
da = da.sum()
# When computing, Dask optimizes the graph
# You can inspect it:
da.visualize(filename='graph.pdf', optimize_graph=True)
```
## Practical Workflows
### 1. Optimizing Large Joins (Shuffling)
Joins are expensive because they require moving data between workers (shuffling).
```python
def optimized_join(left_ddf, right_ddf):
# 1. Ensure right side is small enough to be broadcasted
# or ensure both are partitioned by the join key.
# If one DF is small (e.g. < 100MB)
# result = left_ddf.map_partitions(lambda df: df.merge(small_df_local, on='key'))
# 2. If both are large, set the index first (triggers a shuffle once)
left_ddf = left_ddf.set_index('key')
right_ddf = right_ddf.set_index('key')
# 3. Subsequent joins will be "locally aligned" (Zero communication)
return left_ddf.merge(right_ddf, left_index=True, right_index=True)
```
### 2. High-Throughput I/O with Parquet
```python
def fast_save(ddf, path):
# 1. Categorical columns save massive space
ddf = ddf.categorize()
# 2. Write with efficient compression
# 'snappy' is usually the best balance for speed
ddf.to_parquet(path, engine='pyarrow', compression='snappy',
write_metadata_file=True)
```
### 3. Managing a Long-running Cluster
```python
# Prevent a "dirty" cluster from slowing down
def clean_cluster_state():
client.cancel(list(client.futures)) # Clear all references
client.restart() # Hard reset all workers
import gc
gc.collect() # Local cleanup
```
## Advanced Configuration
### Resource Tagging
Tell Dask to run specific tasks only on specific workers (e.g., those with a GPU).
```python
# Start worker with: dask-worker ... --resources "GPU=1"
# Submit task requesting resource
future = client.submit(my_gpu_function, data, resources={'GPU': 1})
```
## Common Pitfalls and Solutions
### The "Zombie Worker" (serialization error)
If your task requires a library that isn't installed on the workers, the task will fail repeatedly.
```python
# ✅ Solution: Use pip_install or conda_install via client
from dask.distributed import PipInstall
client.register_plugin(PipInstall(packages=["scikit-learn"]))
```
### Unmanaged Memory
Python's garbage collector isn't immediate. Sometimes workers appear full because "Unmanaged Memory" hasn't been freed.
```python
# ✅ Solution: Manually trigger GC on workers
def worker_gc():
import gc
return gc.collect()
client.run(worker_gc)
```
### Too Many Partitions
If you have 10,000 partitions for a 1GB dataset, you'll spend more time scheduling than calculating.
```python
# ✅ Solution: Repartition to fewer pieces
ddf = ddf.repartition(npartitions=20)
```
Dask Optimization is the art of balancing resources. By understanding the flow of data through the network and the mechanics of worker memory, you can scale Python logic to planetary-scale datasets with industrial reliability.
Related in Productivity
gitea-workflow
IncludedOrchestrate agile development workflows for Gitea repositories using the tea CLI. Use when working with Gitea-hosted repos and asking to 'run the workflow', 'continue working', 'what's next', 'complete the task cycle', 'start my day', 'end the sprint', 'implement the next task', or wanting guided step-by-step development assistance. Keywords: workflow, orchestrate, agile, task cycle, sprint, daily, implement, review, PR, standup, retrospective, gitea, tea.
microsoft-graph-gateway
IncludedRoute Microsoft Graph work in this workspace. Use when users want to read or write Outlook mail, calendar events, contacts, OneDrive or SharePoint files, Teams, Planner, To Do, users, groups, directory data, or arbitrary Microsoft Graph endpoints from VS Code. Prefer WorkIQ for common read scenarios. Use Microsoft Graph for write actions and gap-read scenarios that need exact Graph properties, filters, permissions, or endpoints.
copilotkit
IncludedUse when building with CopilotKit — setup, development, integrations, debugging, upgrading, or contributing. Routes to the appropriate specialized skill based on the task.
wordly-wisdom
IncludedProvides calibrated decision analysis using Charlie Munger-style multiple mental models, inversion, incentive mapping, circle-of-competence checks, misjudgment audits, second-order effects, and forecast updates. Use when the user asks for an oracle take, a hard call, a decision memo, a premortem, an outside view, a red-team, a sanity-check, what am I missing, think this through, or wants a strategy, hire, investment, plan, product, partnership, or major life choice analysed. Avoid for simple factual lookups or time-sensitive legal, medical, or market questions without fresh evidence.
swain-session
IncludedSession management and project status dashboard. Owns the full session lifecycle (start/work/close/resume), focus lane, bookmarks, worktree detection, and tab naming. Also serves as the project status dashboard — shows active epics, progress, actionable next steps, blocked items, tasks, GitHub issues, and recommendations. Worktree creation is deferred to swain-do task dispatch (SPEC-195). Triggers on: 'session', 'status', 'what's next', 'dashboard', 'overview', 'where are we', 'what should I work on', 'show me priorities', 'bookmark', 'focus on', 'session info'.
gandi
IncludedComprehensive Gandi domain registrar integration for domain and DNS management. Register and manage domains, create/update/delete DNS records (A, AAAA, CNAME, MX, TXT, SRV, and more), configure email forwarding and aliases, check SSL certificate status, create DNS snapshots for safe rollback, bulk update zone files, and monitor domain expiration. Supports multi-domain management, zone file import/export, and automated DNS backups. Includes both read-only and destructive operations with safety controls.