Fast Data Processing with Ray

When I was working on a framework to process a massive amount of data in a producer–consumer or map–reduce style, I quickly realized that the hardest challenge was not writing the computation itself but deciding how to assign work across the workers. The right assignment keeps the system fast and memory-efficient; the wrong one creates stragglers or even out-of-memory errors.

This article walks through the evolution of workload assignment strategies I experimented with in Ray. Starting from the most straightforward static division, I gradually moved toward more dynamic schemes that better handle imbalance and tail effects. Along the way we will see some code sketches and discuss the trade-offs.

Static Assignment

The first attempt was, naturally, static assignment: divide the dataset into disjoint partitions and let each worker process exactly one of them. To prevent any single worker from holding too much data in memory, I also limited how many tasks could be in flight at once. A simplified version looks like this:

import numpy as np

# dataset_len, num_workers, worker_id, and max_inflight_data_tasks are assumed given.

# Ceiling division: rows per worker so that every row is covered
num_rows_to_process = dataset_len // num_workers + (
    0 if dataset_len % num_workers == 0 else 1
)

# This worker's contiguous slice of the dataset
start = worker_id * num_rows_to_process
end = dataset_len if worker_id == num_workers - 1 else (start + num_rows_to_process)

# Split the slice into sub-ranges to control in-flight tasks and avoid OOM
start_indices = np.arange(start, end, max_inflight_data_tasks)
end_indices = np.minimum(start_indices + max_inflight_data_tasks, end)

prepare_list = list(zip(start_indices, end_indices))

This ensures each worker has roughly the same amount of work, and memory usage is bounded. For small workloads, this might be sufficient. But when the workload grows massive, differences in worker speed, network latency, or data skew become significant. The slowest worker dominates the total runtime, and static assignment does not adapt.
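
To make the in-flight limit concrete, here is one possible way a worker could consume prepare_list, assuming one Ray task per row; process_row is a hypothetical stand-in for the real computation, since the original sketch does not show this part:

import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def process_row(row_index: int) -> int:
    # Stand-in for the real per-row computation
    return row_index

for range_start, range_end in prepare_list:
    # At most max_inflight_data_tasks tasks are outstanding at any time
    refs = [process_row.remote(i) for i in range(range_start, range_end)]
    batch_results = ray.get(refs)  # Wait for this batch before launching the next

Waiting on each batch bounds memory, at the cost of a small synchronization barrier between batches.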

Dynamic Assignment with Decaying Chunk Size

The next step was to make the assignment dynamic: instead of giving each worker a fixed slice, let workers ask for work ranges as they go. The manager keeps track of what is left and allocates a chunk on each request.

A simple heuristic is to reduce the chunk size as the workload gets closer to the end, so the tail is smoother:

import math
from typing import Optional, Tuple

class WorkManager:
    def __init__(self, workload: int, num_workers: int):
        self.next_start = 0
        # Initial chunk size: an even share per worker
        self.chunk_size = max(1, workload // num_workers)
        self.workload = workload

    def get_next_range(self) -> Optional[Tuple[int, int]]:
        if self.next_start >= self.workload:
            return None

        # Scale the chunk down by the fraction of work remaining
        remaining = (self.workload - self.next_start) / float(self.workload)
        chunk_size = max(1, math.floor(self.chunk_size * remaining))

        start = self.next_start
        end = min(self.next_start + chunk_size, self.workload)
        self.next_start = end
        return start, end

The idea looks nice, but in practice it creates imbalance: the earliest requests receive very large chunks, while later ones get much smaller ones. A worker that picks up one of those huge early chunks can itself become the straggler, so this unevenness does not eliminate the tail problem; it sometimes makes it worse.
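
To see the skew concretely, here is a toy run of the manager above with a 100-row workload and 4 workers; the exact numbers simply follow from the floor rounding:

manager = WorkManager(workload=100, num_workers=4)

ranges = []
next_range = manager.get_next_range()
while next_range is not None:
    ranges.append(next_range)
    next_range = manager.get_next_range()

# Early requests get big slices, later ones shrink toward single rows:
# (0, 25), (25, 43), (43, 57), (57, 67), ...
print(ranges[:4])

Whoever grabs (0, 25) is committed to a quarter of the workload in one shot, which is exactly the kind of commitment dynamic assignment was supposed to avoid.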

Dynamic Assignment with Granular Splitting

To fix that, I added another parameter: a split factor. Instead of dividing the workload only by the number of workers, divide it by the number of workers times the split factor, so the chunks are smaller and more numerous. Workers still request chunks dynamically, but the granularity is finer:

from typing import Optional, Tuple

class WorkManager:
    def __init__(self, workload: int, num_workers: int, split_factor: int = 1):
        self.next_start = 0
        # Finer granularity: split_factor chunks per worker instead of one
        self.chunk_size = max(1, workload // (num_workers * split_factor))
        self.workload = workload

    def get_next_range(self) -> Optional[Tuple[int, int]]:
        if self.next_start >= self.workload:
            return None

        start = self.next_start
        end = min(self.next_start + self.chunk_size, self.workload)
        self.next_start = end
        return start, end

This avoids the worst imbalance, because no worker is locked into a huge chunk upfront. Instead, fast workers simply request more chunks and compensate for the slow ones. But the chunk size stays fixed for the entire run, and we can do better.
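
For instance, with a split factor of 4, the same toy workload of 100 rows across 4 workers is carved into 17 uniform chunks of at most 6 rows, instead of four chunks of 25:

manager = WorkManager(workload=100, num_workers=4, split_factor=4)

ranges = []
next_range = manager.get_next_range()
while next_range is not None:
    ranges.append(next_range)
    next_range = manager.get_next_range()

# 17 uniform chunks: (0, 6), (6, 12), ..., (96, 100)
print(len(ranges), ranges[:3])

A fast worker can now pick up many of these small chunks while a slow worker holds only a few.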

Dynamic Assignment with Adaptive Chunk Size

Finally, I arrived at an elastic strategy: dynamic assignment with a dynamic chunk size. The idea is to hand out larger chunks at the beginning for throughput, then shrink the chunk size near the end to smooth out the tail. This combines the best of both worlds: efficiency in the bulk of the work, fairness in the final stage.

Here is a Ray-actor version:

import ray
from typing import Optional, Tuple

@ray.remote(num_cpus=0.01)
class WorkManager:
    def __init__(
        self,
        workload: int,
        num_workers: int,
        split_factor: int = 1,
        min_chunk_size: int = 10,
        max_chunk_size: int = 2000,
        tail_percentage: float = 0.2,
    ):
        self.next_start = 0
        self.split_factor = split_factor
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
        self.workload = workload
        self.tail = tail_percentage
        self.num_workers = num_workers

    def get_next_range(self, worker_id: int) -> Optional[Tuple[int, int]]:
        # worker_id is not used in the sizing logic here
        if self.next_start >= self.workload:
            return None

        remaining = self.workload - self.next_start
        estimated_chunks = self.num_workers * self.split_factor

        if remaining < self.tail * self.workload:
            # Tail phase: smaller chunks to avoid stragglers
            chunk_size = max(self.min_chunk_size, remaining // self.num_workers)
        else:
            # Main phase: larger, bounded chunks for throughput
            chunk_size = max(
                self.min_chunk_size,
                min(self.max_chunk_size, self.workload // estimated_chunks),
            )

        start = self.next_start
        end = min(start + chunk_size, self.workload)
        self.next_start = end
        return start, end

This way, fast workers keep the system moving, while the manager automatically shrinks the chunk size near the tail to avoid idle time. It is flexible, memory-safe, and reduces straggler effects.
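
To tie it together, here is a hypothetical end-to-end sketch built on the actor above: each worker loops, asking the manager for the next range until nothing is left. The per-range computation is a placeholder, and the workload size and worker count are arbitrary:

import ray

ray.init(ignore_reinit_error=True)

NUM_WORKERS = 8
WORKLOAD = 100_000

@ray.remote
def worker(worker_id: int, manager) -> int:
    processed = 0
    while True:
        work_range = ray.get(manager.get_next_range.remote(worker_id))
        if work_range is None:
            break  # Nothing left; the worker exits
        start, end = work_range
        # Placeholder for the real per-range computation
        processed += end - start
    return processed

manager = WorkManager.remote(workload=WORKLOAD, num_workers=NUM_WORKERS, split_factor=4)
totals = ray.get([worker.remote(i, manager) for i in range(NUM_WORKERS)])
assert sum(totals) == WORKLOAD  # Every row is assigned exactly once

Because the actor serializes calls to get_next_range, the ranges never overlap, and a single shared counter is all the coordination the scheme needs.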

Balancing workload in Ray is not a trivial problem. A static assignment is simple but brittle; a dynamic one with a fixed chunk size improves utilization but can still leave a tail. The most effective approach I found is to make the chunk size adaptive, large in the bulk and small near the end, so that workers stay both busy and balanced.

The progression from static to adaptive assignment mirrors a common theme in distributed systems: efficiency requires elasticity. We cannot perfectly predict how each worker will perform, but we can design the system to adapt as the computation unfolds. And that, more than any single heuristic, is what keeps large-scale data processing fast and stable in practice.



