This is the Ray Python tutorial we wished existed when we onboarded our first ML engineer onto Ray. It starts with a working five-line example, walks up to a real KubeRay cluster, and stops at the point where the docs become sufficient. No toy examples that fall apart at production scale.

You should leave with: a working Ray script, the right mental model for tasks versus actors, and a sized cluster to point at real work.

Install and warm-up

pip install "ray[default]==2.46.0"

The [default] extras pull in the dashboard and the cluster launcher. The bare ray package is Ray Core only: tasks, actors, and the object store, without the dashboard.

import time

import ray

ray.init()

@ray.remote
def slow_add(a, b):
    time.sleep(0.5)  # simulate half a second of work
    return a + b

futures = [slow_add.remote(i, i + 1) for i in range(20)]
print(ray.get(futures))

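Twenty 0.5-second tasks take 10 seconds when run back to back. Ray runs them in parallel, one per CPU core by default, so an 8-core laptop finishes in about 1.5 seconds plus startup overhead, and a machine with 20 or more cores finishes in well under a second. You just used Ray.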
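How fast the batch finishes depends on how many CPU cores Ray sees. The arithmetic can be sketched as below, assuming every task reserves one CPU (Ray's default for tasks) and ignoring scheduling and worker startup overhead. The function name is made up for illustration.

```python
import math

def estimated_wall_seconds(num_tasks, task_seconds, num_cpus):
    """Rough wall-clock estimate for equal-length tasks that each reserve one CPU."""
    waves = math.ceil(num_tasks / num_cpus)  # tasks run in batches of num_cpus
    return waves * task_seconds

for cpus in (1, 4, 8, 20):
    print(cpus, "CPUs ->", estimated_wall_seconds(20, 0.5, cpus), "s")
```

Treat the result as a lower bound: the first run also pays for starting worker processes.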

For background on what Ray is and how it differs from Spark or Dask, see our Apache Ray distributed Python guide.

Tasks versus actors

Two primitives, two mental models.

Tasks are stateless. @ray.remote on a function. Each call is independent and can run anywhere. Use tasks for embarrassingly parallel work, map-reduce, ETL, hyperparameter trials.

Actors are stateful. @ray.remote on a class. Each instance is pinned to one worker; calls go to that instance. Use actors for model replicas, parameter servers, in-memory caches, anything where state matters.

@ray.remote
class Counter:
    def __init__(self):
        self.n = 0
    def incr(self):
        self.n += 1
        return self.n

c = Counter.remote()
print(ray.get([c.incr.remote() for _ in range(5)]))

The Counter actor returns 1, 2, 3, 4, 5 in order because a default actor runs one method call at a time, in the order the caller submitted them. That guarantee is the whole point.

The object store

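Large objects do not move through your driver. When a task returns a 4 GB tensor, Ray stores it in the object store (Plasma) on the node that produced it and gives you a reference. Tasks on the same node read it through zero-copy shared memory; a task scheduled on another node triggers a direct node-to-node transfer.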

import numpy as np

@ray.remote
def make_big():
    return np.zeros((1024, 1024, 1024), dtype=np.float32)  # 4 GB

@ray.remote
def consume(arr):
    return float(arr.sum())

ref = make_big.remote()
print(ray.get(consume.remote(ref)))

The 4 GB array never passes through the driver. Only the reference does, and the data itself crosses the network only when a consumer is scheduled on a different node. This is the single most important Ray performance behavior to internalize.

Ray Data: distributed dataframes done right

For batch processing of large datasets (this needs the data extras: pip install "ray[data]==2.46.0"):

import ray

ds = ray.data.read_parquet("s3://bucket/big.parquet")
ds = ds.map(lambda row: {"score": row["x"] * 2 + row["y"]})
ds.write_parquet("s3://bucket/output.parquet")

Ray Data autoscales the operator pool, handles backpressure, and streams between stages. It is the right tool for distributed Python ETL when you do not need SQL semantics. For the tradeoffs versus Spark see our Ray vs Spark comparison.

A real distributed example

A task that downloads URLs and extracts their text, with concurrent network IO. Each call returns only a small record per URL, which keeps the memory needed to collect results bounded.

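import ray
import requests
from bs4 import BeautifulSoup

ray.init()

@ray.remote
def fetch(url):
    try:
        r = requests.get(url, timeout=10)
        r.raise_for_status()  # count 4xx/5xx responses as failures
        # Strip the markup and keep only the visible text.
        text = BeautifulSoup(r.text, "html.parser").get_text(" ", strip=True)
        return {"url": url, "ok": True, "len": len(text)}
    except Exception as e:
        return {"url": url, "ok": False, "err": str(e)}

# Read one URL per line, skipping blank lines.
with open("urls.txt") as f:
    urls = [line.strip() for line in f if line.strip()]

results = ray.get([fetch.remote(u) for u in urls])
print(f"{sum(r['ok'] for r in results)} / {len(urls)} succeeded")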

Submit 50,000 URLs and Ray dispatches them across the cluster. Add @ray.remote(num_cpus=0.25) to pack four fetch tasks per CPU since they are network-bound.

Resource requests

Every task or actor can declare resources:

@ray.remote(num_cpus=2, num_gpus=0.5, memory=4*1024**3)
def train_step(batch):
    ...

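Ray's scheduler uses these for placement: a task starts only on a node with enough unreserved resources. The values are logical, so Ray does not stop a task from using more CPU or GPU memory than it declared. Fractional GPUs (num_gpus=0.5) are real: Ray will pack two replicas onto one A100, and it is up to you to make sure both fit in GPU memory. This is one of the few distributed compute systems where this works out of the box.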
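The packing arithmetic is worth doing before you pick values. The sketch below mirrors the logical bookkeeping only; it is not Ray's scheduler, and max_concurrent is a hypothetical helper. The resource names follow the keys Ray reports ("CPU", "GPU", "memory").

```python
from fractions import Fraction

def max_concurrent(node, request):
    """How many copies of `request` fit on `node` under logical accounting.

    Both arguments are dicts such as {"CPU": 8, "GPU": 1}.
    """
    positive = {name: amt for name, amt in request.items() if amt > 0}
    if not positive:
        raise ValueError("request must reserve at least one resource")
    # Fractions avoid float rounding surprises with values like 0.1.
    return int(min(
        Fraction(str(node.get(name, 0))) // Fraction(str(amt))
        for name, amt in positive.items()
    ))

node = {"CPU": 8, "GPU": 1, "memory": 16 * 1024**3}
request = {"CPU": 2, "GPU": 0.5, "memory": 4 * 1024**3}
print(max_concurrent(node, request))  # the GPU is the limiting resource
```

Here CPU and memory would each allow four copies, but the half-GPU request caps the node at two.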

From laptop to cluster

One line changes.

Local: ray.init(). Starts a local Ray instance on your machine and connects to it.

Cluster: ray.init(address="ray://head.example.com:10001"). Connects to a remote head node through Ray Client. Your script stays on your machine; the tasks and actors it creates run on the cluster.

The standard production deployment is KubeRay on a Kubernetes cluster. A minimal RayCluster manifest:

apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: dev
spec:
  rayVersion: "2.46.0"
  headGroupSpec:
    rayStartParams: {}
    template:
      spec:
        containers:
        - name: ray-head
          image: rayproject/ray:2.46.0
          resources:
            limits: { cpu: "4", memory: "8Gi" }
  workerGroupSpecs:
  - replicas: 2
    minReplicas: 0
    maxReplicas: 16
    groupName: cpu-workers
    rayStartParams: {}
    template:
      spec:
        containers:
        - name: ray-worker
          image: rayproject/ray:2.46.0
          resources:
            limits: { cpu: "8", memory: "16Gi" }

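Apply with kubectl apply -f cluster.yaml. As written, the worker group stays at two replicas; the minReplicas and maxReplicas bounds take effect only once autoscaling is enabled (enableInTreeAutoscaling: true under spec). Port-forward the head node's client port (10001) and connect from your laptop.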

Sizing recommendations

Work                                     | Min cluster    | Recommended
Hyperparameter sweep, 200 trials         | 4 nodes        | 8 GPU nodes
Distributed PyTorch (BERT-base)          | 2 GPU nodes    | 8 GPU nodes
Distributed PyTorch (Llama 8B fine-tune) | 8 GPU nodes    | 16 GPU nodes
Batch inference, 100M docs               | 4 GPU nodes    | 16 GPU nodes
Ray Serve LLM, 1000 RPS                  | 2 GPU replicas | 4-8 replicas
RL training (simple env)                 | 8 CPU nodes    | 32 CPU nodes

For tuning specifically see our Ray Tune hyperparameter guide.

Operational rules we learned the hard way

Externalize the GCS to managed Redis.        |####################| critical
Always set object spilling to local NVMe.    |####################| critical
Pin Ray version per cluster, never auto.     |##################  | high
Set num_cpus and num_gpus on every task.     |#################   | high
Use Ray Dashboard plus Prometheus metrics.   |###############     | high
Run a permanent dev cluster, not ephemeral.  |##########          | medium
Externalize logs to S3 or CloudWatch.        |#########           | medium

Skipping the first two costs you a week of debugging the first time the head node restarts.
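As an illustration of the second rule, Ray accepts an object spilling configuration as a JSON string when a cluster starts. A minimal sketch, assuming a hypothetical NVMe mount at /mnt/nvme/ray_spill; spill_system_config is a made-up helper, and the exact option may differ between Ray versions, so check the docs for the version you pin.

```python
import json

def spill_system_config(directory):
    """Build the _system_config dict that points object spilling at `directory`."""
    return {
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {"directory_path": directory}}
        )
    }

config = spill_system_config("/mnt/nvme/ray_spill")  # hypothetical mount point
print(config)
```

For a local instance you would pass it as ray.init(_system_config=config). It has to be set where the cluster starts, not when connecting to one that is already running.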

Where Ray fits with managed APIs

Ray decides where compute happens on hardware you control. Calling out to OpenAI or Anthropic from inside a Ray task is fine, but you should put a gateway in front of those calls — Swfte Connect handles retry, caching, and provider failover so a single misbehaving model does not break a 50,000-task batch. The Ray cluster handles compute orchestration; the gateway handles model orchestration.

Canonical references

For more concrete examples see our Ray example workloads post.

What to do this quarter

  1. Pip install Ray and run the five-line example from this post on a laptop. Total time: 10 minutes.
  2. Pick one Python script that uses multiprocessing or asyncio and port it to Ray. The first port takes a day; the second takes an hour.
  3. Stand up a small KubeRay cluster on three nodes, even if the cluster is mostly idle. Having a permanent Ray cluster removes the activation energy for using it.
  4. Externalize the GCS to a managed Redis. Skip this and your first head-node restart costs you all in-flight work.
  5. Add Prometheus scraping of Ray's /metrics endpoint. The dashboard is fine; long-term metrics need real time-series storage.