Skip to main content
When Geneva runs in distributed mode, jobs are deployed against a Kubernetes KubeRay instance that dynamically provisions a Ray cluster. Job execution time depends on sufficient CPU/GPU resources for computation and sufficient write bandwidth to store the output values. Tuning the performance of a job boils down to configuring the table or cluster resources.

Scaling computation resources

Geneva jobs can split and schedule computational work into smaller batches that are assigned to tasks which are distributed across the cluster. As each task completes, it writes its output into a checkpoint file. If a job is interrupted or run again, Geneva will look to see if a checkpoint for the computation is already present and if not will kick off computations. Usually computation capacity is the bottleneck for job execution. To complete all of a job’s tasks more quickly, you just need to increase the amount of CPU/GPU resources available.

Estimating CPU and memory requirements

CPU

Geneva can run up to concurrency tasks in parallel. Within each task, it can run up to intra_applier_concurrency UDF applications concurrently. If your UDF requests udf.num_cpus CPUs, the peak CPU required is approximately: total_cpus ≈ concurrency * intra_applier_concurrency * udf.num_cpus concurrency and intra_applier_concurrency are parameters on Table.backfill(...). In most cases you do not need to change udf.num_cpus, unless the UDF itself uses multiple threads. If your UDF is single-threaded but you want to speed it up with more parallelism, prefer increasing concurrency first. If you stop seeing meaningful improvements, then try increasing intra_applier_concurrency.

Memory

Geneva defaults to checkpoint batches of up to 100 rows (checkpoint_size=100). Peak memory usage scales with both the input rows held for processing and the output rows buffered before they are written. A rough upper bound is:
memory_bytes ≈
  input_row_size  * min(checkpoint_size * worker_num_cpus, task_size) +
  output_row_size * min(checkpoint_size * intra_applier_concurrency, task_size)
  • worker_num_cpus is the number of CPUs on the Ray worker.
  • task_size is a tunable parameter (rows per task). It can be set on the UDF (e.g. @udf(..., task_size=...)) and on Table.backfill(..., task_size=...) (backfill-level values take precedence). If task_size is larger than the number of rows in a fragment, it behaves the same as task_size = fragment_num_rows. By default, it is: task_size = num_rows / (concurrency * intra_applier_concurrency * 2) (where num_rows is the number of rows in the table, or the number of rows selected by your filter)
  • input_row_size / output_row_size are the average bytes per row after materialization in memory.
For primitive numeric columns (ints/floats), these sizes are usually small. For images, videos, embeddings, etc. you may need to explicitly provision more memory (e.g. via @udf(..., memory=...)) and/or reduce checkpoint_size / task_size. Typical per-row sizes:
  • Images: ~200KB–2MB (depends on dataset and encoding)
  • Videos: ~10MB–200MB
  • Embeddings: dimension * data_type_size bytes (e.g. float32 embeddings use 4 bytes per value, so a 1536-dim embedding is 1536 * 4 = 6144 bytes)

GKE node pools

GKE + KubeRay can autoscale the number of VM nodes on demand. Limitations on the amount of resources provisioned are configured via node pools. Node pools can be managed to scale vertically (type of machine) or horizontally (# of nodes). Properly applying Kubernetes labels to the node pool machines allows you to control resources for different jobs in your cluster.

Options on Table.backfill(..)

The Table.backfill(..) method has several optional arguments to tune performance. To saturate the CPUs in the cluster, the main arguments to change are concurrency which controls the number of task processes and intra_applier_concurrency which controls the number of task threads per task process. commit_granularity controls how frequently fragments are committed so that partial results can become visible to table readers. Setting checkpoint_size smaller introduces finer-grained checkpoints and can help provide more frequent proof of life as a job is being executed. This is useful if the computation on your data is expensive. Reference:

Balancing write bandwidth

While computation can be broken down into small tasks, new Lance column data for each fragment must be written out in a serialized fashion. Each fragment has a writer that waits for checkpointed results to arrive, sequences them, and then serially writes out the new data file. Writers can be a bottleneck if a Lance dataset has a small number of fragments, especially if the amount of data being written out is comparatively large. Maximizing parallel write throughput can be achieved by having more fragments than nodes in the cluster.

Symptom: Computation tasks complete but writers seem to hang

Certain jobs that take a small dataset and expand it may appear as if the writer has frozen. An example is a table that contains a list of URLs pointing to large media files. This list is relatively small (< 100MB) and can fit into a single fragment. A UDF that downloads will fetch all the data and then attempt to write all of it out through the single writer. This single writer can then be responsible for serially writing out 500+GB of data to a single file! To mitigate this, you can load your initial table so that there will be multiple fragments. Each fragment with new outputs can be written in parallel with higher write throughput.