This is the Ray Python tutorial we wished existed when we onboarded our first ML engineer onto Ray. It starts with a working five-line example, walks up to a real KubeRay cluster, and stops at the point where the docs become sufficient. No toy examples that fall apart at production scale.
You should leave with: a working Ray script, the right mental model for tasks versus actors, and a sized cluster to point at real work.
Install and warm-up
```shell
pip install "ray[default]==2.46.0"
```
The [default] extra pulls in the dashboard and the cluster launcher. The bare ray package is Ray Core only: tasks, actors, and the object store, with no dashboard.
```python
import time
import ray

ray.init()

@ray.remote
def slow_add(a, b):
    time.sleep(0.5)
    return a + b

futures = [slow_add.remote(i, i + 1) for i in range(20)]
print(ray.get(futures))
```
Run sequentially, these twenty 0.5-second tasks would take 10 seconds. In parallel, an 8-core laptop finishes them in about 1.5 seconds (three waves of eight tasks). You just used Ray.
For background on what Ray is and how it differs from Spark or Dask, see our Apache Ray distributed Python guide.
Tasks versus actors
Two primitives, two mental models.
Tasks are stateless. @ray.remote on a function. Each call is independent and can run anywhere. Use tasks for embarrassingly parallel work, map-reduce, ETL, hyperparameter trials.
Actors are stateful. @ray.remote on a class. Each instance is pinned to one worker; calls go to that instance. Use actors for model replicas, parameter servers, in-memory caches, anything where state matters.
```python
@ray.remote
class Counter:
    def __init__(self):
        self.n = 0

    def incr(self):
        self.n += 1
        return self.n

c = Counter.remote()
print(ray.get([c.incr.remote() for _ in range(5)]))
```
The Counter actor returns 1, 2, 3, 4, 5 in order because actor calls are serialized per instance. That guarantee is the whole point.
The object store
Large objects do not move through your driver. When a task returns a 4 GB tensor, Ray stores it in the node-local Plasma object store and hands you a reference. Other tasks on the same node read it via zero-copy shared memory; tasks on other nodes fetch it over the network once and cache it in their local store.
```python
import numpy as np

@ray.remote
def make_big():
    return np.zeros((1024, 1024, 1024), dtype=np.float32)  # 4 GB

@ray.remote
def consume(arr):
    return float(arr.sum())

ref = make_big.remote()
print(ray.get(consume.remote(ref)))
```
The 4 GB array stays in shared memory. Only the reference travels across the network. This is the single most important Ray performance behavior to internalize.
Ray Data: distributed datasets done right
For batch processing of large datasets:
```python
import ray

ds = ray.data.read_parquet("s3://bucket/big.parquet")
ds = ds.map(lambda row: {"score": row["x"] * 2 + row["y"]})
ds.write_parquet("s3://bucket/output.parquet")
```
Ray Data autoscales the operator pool, handles backpressure, and streams between stages. It is the right tool for distributed Python ETL when you do not need SQL semantics. For the tradeoffs versus Spark see our Ray vs Spark comparison.
A real distributed example
A worker that downloads URLs and extracts text, with concurrent network IO and bounded memory.
```python
import ray
import requests
from bs4 import BeautifulSoup

ray.init()

@ray.remote
def fetch(url):
    try:
        r = requests.get(url, timeout=10)
        text = BeautifulSoup(r.text, "html.parser").get_text()
        return {"url": url, "ok": True, "len": len(text)}
    except Exception as e:
        return {"url": url, "ok": False, "err": str(e)}

urls = open("urls.txt").read().splitlines()
results = ray.get([fetch.remote(u) for u in urls])
print(f"{sum(r['ok'] for r in results)} / {len(urls)} succeeded")
```
Submit 50,000 URLs and Ray dispatches them across the cluster. Add @ray.remote(num_cpus=0.25) to pack four fetch tasks per CPU since they are network-bound.
Resource requests
Every task or actor can declare resources:
```python
@ray.remote(num_cpus=2, num_gpus=0.5, memory=4 * 1024**3)
def train_step(batch):
    ...
```
Ray's scheduler honors these. Fractional GPUs (num_gpus=0.5) are real: Ray will pack two replicas onto one A100. One caveat: Ray schedules by bookkeeping and does not enforce GPU memory isolation, so the two replicas must actually fit. This is still one of the few distributed compute systems where GPU sharing works out of the box.
From laptop to cluster
Two lines change.
Local: ray.init(). Starts a local Ray instance in the current process and connects to it.
Cluster: ray.init(address="ray://head.example.com:10001"). Connects to a remote head node. Your script runs on the cluster, your driver stays local.
The standard production deployment is KubeRay on a Kubernetes cluster. The minimum viable RayCluster resource:
```yaml
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: dev
spec:
  rayVersion: "2.46.0"
  headGroupSpec:
    rayStartParams: {}
    template:
      spec:
        containers:
          - name: ray-head
            image: rayproject/ray:2.46.0
            resources:
              limits: { cpu: "4", memory: "8Gi" }
  workerGroupSpecs:
    - replicas: 2
      minReplicas: 0
      maxReplicas: 16
      groupName: cpu-workers
      rayStartParams: {}
      template:
        spec:
          containers:
            - name: ray-worker
              image: rayproject/ray:2.46.0
              resources:
                limits: { cpu: "8", memory: "16Gi" }
```
Apply with kubectl apply -f cluster.yaml. Port-forward the head node's client port (10001) and connect from your laptop.
Sizing recommendations
| Work | Min cluster | Recommended |
|---|---|---|
| Hyperparameter sweep, 200 trials | 4 nodes | 8 GPU nodes |
| Distributed PyTorch (BERT-base) | 2 GPU nodes | 8 GPU nodes |
| Distributed PyTorch (Llama 8B fine-tune) | 8 GPU nodes | 16 GPU nodes |
| Batch inference, 100M docs | 4 GPU nodes | 16 GPU nodes |
| Ray Serve LLM, 1000 RPS | 2 GPU replicas | 4-8 replicas |
| RL training (simple env) | 8 CPU nodes | 32 CPU nodes |
For tuning specifically see our Ray Tune hyperparameter guide.
Operational rules we learned the hard way
| Rule | Priority |
|---|---|
| Externalize the GCS to managed Redis. | critical |
| Always set object spilling to local NVMe. | critical |
| Pin Ray version per cluster, never auto. | high |
| Set num_cpus and num_gpus on every task. | high |
| Use Ray Dashboard plus Prometheus metrics. | high |
| Run a permanent dev cluster, not ephemeral. | medium |
| Externalize logs to S3 or CloudWatch. | medium |
Skipping the first two costs you a week of debugging the first time the head node restarts.
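For the GCS rule concretely: with KubeRay you point the GCS at external Redis via the RayCluster spec. The fragment below assumes KubeRay ≥ 1.1 (gcsFaultToleranceOptions); the Redis address is a placeholder, and the field names are worth checking against your KubeRay version:

```yaml
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: dev
spec:
  gcsFaultToleranceOptions:
    # placeholder address: point at your managed Redis endpoint
    redisAddress: "redis.internal:6379"
  rayVersion: "2.46.0"
  # ... head and worker group specs as above ...
```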
Where Ray fits with managed APIs
Ray decides where compute happens on hardware you control. Calling out to OpenAI or Anthropic from inside a Ray task is fine, but you should put a gateway in front of those calls — Swfte Connect handles retry, caching, and provider failover so a single misbehaving model does not break a 50,000-task batch. The Ray cluster handles compute orchestration; the gateway handles model orchestration.
Canonical references
- Official tutorial: docs.ray.io/en/latest/ray-overview/getting-started.html.
- KubeRay: github.com/ray-project/kuberay.
- The original Ray paper: arxiv.org/abs/1712.05889.
For more concrete examples see our Ray example workloads post.
What to do this quarter
- Pip install Ray and run the five-line example from this post on a laptop. Total time: 10 minutes.
- Pick one Python script that uses multiprocessing or asyncio and port it to Ray. The first port takes a day; the second takes an hour.
- Stand up a small KubeRay cluster on three nodes, even if the cluster is mostly idle. Having a permanent Ray cluster removes the activation energy for using it.
- Externalize the GCS to a managed Redis. Skip this and your first head-node restart costs you all in-flight work.
- Add Prometheus scraping of Ray's /metrics endpoint. The dashboard is fine; long-term metrics need real time-series storage.