Ray OOM Prevention: Best Practices

Background

When running distributed workloads with Ray, we’ve been intermittently hitting worker OOM issues.

Sometimes it’s minor — a few workers restart and end up corrupting Parquet outputs. Other times the entire job crashes. What makes this especially annoying is that it’s not deterministic. You rerun the same job and the OOM might not happen again. It’s very hard to reason about, and as a result we’ve been hesitant to scale jobs to larger sizes.

Fact Check

Worker Killed

There are two main cases: Ray proactively killing workers, and the OS killing them underneath Ray.

Raylet’s OOM monitor tracks total memory usage on a node, including worker heap, object store, and Raylet internal usage. Once it exceeds a threshold (95% by default), it will pick a worker and kill it:

[2025-12-05 08:50:23,174 E …] (raylet) node_manager.cc:3069:
1 Workers (tasks / actors) killed due to memory pressure (OOM),
0 Workers crashed due to other reasons at node (ID: …)

There is an important line in the doc:

The memory monitor avoids infinite loops of task retries by ensuring at least one task is able to run for each caller on each node. If it is unable to ensure this, the workload will fail with an OOM error.

In other words, if Ray believes no further progress can be made, it fails the workload immediately rather than retrying.

Another subtle point: Ray does not retry exceptions raised by application code. So even though the docs say the memory monitor may ignore max_retries and retry OOM-killed tasks, in practice, if the OOM surfaces as an application-level exception, setting max_retries=0 still causes an immediate failure:

ray.exceptions.OutOfMemoryError: Task was killed due to the node running low on memory. Memory on the node (…) was 45.62GB / 48.00GB (0.950375), which exceeds the memory usage threshold of 0.95. Ray killed this worker because it was the most recently scheduled task.

When deciding which worker to kill, Ray follows a policy:

  • Prefer killing remote tasks over actors (actors are stateful and harder to recover)
  • Prefer killing tasks from the caller with the most running tasks (fairness)
  • Among those, kill the most recently started task (least wasted work)

This avoids infinite retry loops.
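As a mental model (this is a sketch of the policy described above, not Ray's actual implementation), victim selection is roughly a lexicographic preference over candidate workers:

```python
from dataclasses import dataclass

@dataclass
class Worker:
    name: str
    is_actor: bool
    caller_running_tasks: int  # tasks in flight from the same caller
    start_time: float

def pick_victim(workers):
    # Lexicographic preference mirroring the bullets above:
    # tasks before actors, busiest caller first, newest task first.
    return max(
        workers,
        key=lambda w: (not w.is_actor, w.caller_running_tasks, w.start_time),
    )

workers = [
    Worker("actor-1", True, 9, 1.0),
    Worker("task-a", False, 3, 2.0),
    Worker("task-b", False, 3, 5.0),
]
print(pick_victim(workers).name)  # task-b
```

Note that the actor survives even though its caller has the most tasks in flight: being a task at all dominates the other criteria.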

In theory retries should make this safe. In practice, it depends on the task. If you’re doing something like writing a Parquet file that cannot resume, killing the worker just means corrupted or lost data. So we still want to avoid getting into this situation.

There are two environment variables that control this behavior:

  • RAY_memory_usage_threshold (default 0.95)
  • RAY_memory_monitor_refresh_ms (default 250 ms)

If you set the refresh interval to 0, you effectively disable Ray’s memory monitor.
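For example (the specific values here are illustrative, not recommendations), the variables must be in the environment before the Ray node starts:

```python
import os

# Illustrative values -- tune for your cluster. These are read when the
# Ray node starts, so they must be set before `ray start` (or before
# `ray.init()` for a local node).
os.environ["RAY_memory_usage_threshold"] = "0.9"
os.environ["RAY_memory_monitor_refresh_ms"] = "100"

# Setting the refresh interval to 0 disables the monitor entirely:
# os.environ["RAY_memory_monitor_refresh_ms"] = "0"
```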

OS OOM (Kernel Kill)

If Ray’s monitor is disabled, or memory spikes too quickly for it to react, the kernel OOM killer will step in.

You’ll see logs like:

Worker exit type: SYSTEM_ERROR Worker exit detail: The leased worker has unrecoverable failure.

or

A worker died or was killed while executing a task by an unexpected system error.

Possible causes: (1) SIGKILL by OOM killer due to high memory usage (2) ray stop --force (3) SIGSEGV or other unexpected errors

At this point Ray can’t really help. It just observes that the worker disappeared and may retry.

Remote Memory Option

So how do we reduce the chances of hitting these cases?


We can use:

process.options(memory=N * 1024 * 1024 * 1024)

to hint how much memory each worker needs.

But this is often misunderstood.

First, this is not a hard limit. It’s just a scheduling hint. If a node has 22GB and each worker requests 4GB, Ray will schedule 5 workers and stop:

Warning: The following resource request cannot be scheduled right now:
{'memory': ..., 'CPU': ...}

Second, Ray does not enforce this at runtime. If a worker exceeds the declared memory, nothing happens immediately. If enough workers exceed their estimates, you still hit the global threshold, and then either Ray or the OS starts killing processes.
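The scheduling side of this can be sketched with plain arithmetic, no cluster needed (the figures mirror the 22GB / 4GB example above):

```python
GiB = 1024 ** 3

node_memory = 22 * GiB  # node's advertised `memory` resource
per_worker = 4 * GiB    # value passed via .options(memory=...)

# Ray packs workers until the next request no longer fits; the sixth
# request sits pending and triggers the warning shown above.
workers_scheduled = node_memory // per_worker
print(workers_scheduled)  # 5
```

The 2GB left over is simply unused by these workers; it does not stop a worker that actually consumes 5GB from blowing past its 4GB declaration.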

Mitigation

The most reliable approach we found is to assume the worst case based on the smallest node.

Instead of thinking “how many workers can I run on average”, think:

On the node with the least memory, how many workers can I safely run?

Let the minimum memory across nodes be $M_{\min}$, and each worker needs $C$. Then we conservatively run:

$K = \lfloor M_{\min} / C \rfloor$ workers per node.

If the job runs on $N$ worker nodes, total workers = $N \cdot K$.

In our setup CPU is not the bottleneck — memory is. Each node has plenty of CPU, so we can back out num_cpus per worker from total CPU / total workers.
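Putting the whole sizing rule together (the node figures below are hypothetical):

```python
import math

def plan_workers(node_mem_gb, worker_mem_gb, node_cpus):
    """Conservative sizing: assume every node is as small as the
    smallest one. Inputs are per-node figures."""
    m_min = min(node_mem_gb)
    k = math.floor(m_min / worker_mem_gb)  # K: workers per node
    total_workers = len(node_mem_gb) * k   # N * K
    # Memory is the bottleneck, so spread total CPU over the workers.
    num_cpus = sum(node_cpus) / total_workers
    return k, total_workers, num_cpus

# Three nodes with 48/64/48 GB and 60 CPUs each; 8 GB per worker.
k, total, cpus = plan_workers([48, 64, 48], 8, [60, 60, 60])
print(k, total, cpus)  # 6 18 10.0
```

Note the 64GB node runs the same 6 workers as the 48GB nodes; the slack there is the price of never overcommitting the weakest node.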

For the head node, follow Ray’s guidance and don’t schedule tasks on it.

Also be careful with memory usage in the driver script, especially around ray.get. Even if each task returns a small value, every result carries metadata and object references, so calling ray.get on a large list of refs can put significant pressure on the head node, especially combined with GCS load.

Validation

All of the above is reasoning, so we validated it with a simple stress test.

We used a minimal task that aggressively consumes memory via mmap:

import mmap
import time

GiB = 1024 ** 3  # bytes per GiB

def consume_data(self, input_data):
    # Touch every page so the memory is actually committed, not just reserved.
    num_bytes = int(round(self._test_mem_size_gb * GiB))
    mm = mmap.mmap(-1, num_bytes)
    mm.write(b"\0" * num_bytes)
    time.sleep(0.1)
    return True, {"size": mm.tell() // GiB}

We ran two jobs. In both cases actual usage was about 5GB per worker.

With memory set to 4GB, Ray scheduled too many workers and the job quickly OOMed.

With memory set to 8GB, giving some buffer, Ray scheduled fewer workers and the job ran successfully. We repeated this multiple times with max_retries=0 to ensure any OOM would immediately fail, and the behavior was consistent.

If we increased the workload, the same pattern held. Slight underestimation still caused OOM. Generous overestimation stabilized the system.

Example

At some point experiments aren’t enough, so we just scaled it up.

Using this approach, we were able to run large-scale CPU jobs that we previously couldn’t stabilize:

  • 600 nodes × 60 CPUs, processing ~6.33M data in 108 minutes
  • 750 nodes × 60 CPUs, processing ~4M data in 42 minutes

There was one Raylet kill in the second run, but retry recovered it successfully.

Overall, the key takeaway is that Ray’s memory controls are not strict enforcement. Stability comes from conservative planning, especially based on the weakest node, and leaving enough buffer so neither Ray nor the OS is forced into reactive killing.



