Scaling computation resources
Geneva jobs split computational work into smaller batches and schedule them as tasks that are distributed across the cluster. As each task completes, it writes its output to a checkpoint file. If a job is interrupted or run again, Geneva checks whether a checkpoint for the computation already exists and only kicks off computations that do not have one. Computation capacity is usually the bottleneck for job execution, so to complete a job's tasks more quickly, increase the amount of CPU/GPU resources available.
Estimating CPU and memory requirements
CPU
Geneva can run up to concurrency tasks in parallel. Within each task, it can run up to intra_applier_concurrency UDF applications concurrently. If your UDF requests udf.num_cpus CPUs, the peak CPU requirement is approximately:
total_cpus ≈ concurrency * intra_applier_concurrency * udf.num_cpus
concurrency and intra_applier_concurrency are parameters on Table.backfill(...).
In most cases you do not need to change udf.num_cpus, unless the UDF itself uses multiple threads. If your UDF is single-threaded but you want to speed it up with more parallelism, prefer increasing concurrency first. If you stop seeing meaningful improvements, then try increasing intra_applier_concurrency.
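As a sketch of the arithmetic above, the helpers below evaluate the peak-CPU formula and, by inverting it, the largest concurrency that fits a given CPU budget. The function names are hypothetical, and cluster_cpus is assumed to be the number of CPUs actually available to Ray workers. The default intra_applier_concurrency=1 mirrors the advice to raise concurrency first.

```python
import math


def peak_cpus(concurrency: int, intra_applier_concurrency: int, udf_num_cpus: float = 1.0) -> float:
    """Approximate peak CPU demand: concurrency * intra_applier_concurrency * udf.num_cpus."""
    return concurrency * intra_applier_concurrency * udf_num_cpus


def max_concurrency(cluster_cpus: float, intra_applier_concurrency: int = 1, udf_num_cpus: float = 1.0) -> int:
    """Largest concurrency whose peak CPU demand still fits in cluster_cpus (at least 1)."""
    cpus_per_task = intra_applier_concurrency * udf_num_cpus
    return max(1, math.floor(cluster_cpus / cpus_per_task))


# 8 task processes, 4 concurrent UDF applications each, 1 CPU per UDF call
print(peak_cpus(8, 4, 1.0))    # 32.0
# Task processes a 64-CPU cluster can keep busy with single-threaded appliers
print(max_concurrency(64))     # 64
# Same cluster with intra_applier_concurrency=4
print(max_concurrency(64, 4))  # 16
```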
Memory
Geneva defaults to checkpoint batches of up to 100 rows (checkpoint_size=100). Peak memory usage scales with both the input rows held for processing and the output rows buffered before they are written. A rough upper bound is:
- worker_num_cpus is the number of CPUs on the Ray worker.
- task_size is a tunable parameter (rows per task). It can be set on the UDF (e.g. @udf(..., task_size=...)) and on Table.backfill(..., task_size=...); backfill-level values take precedence. If task_size is larger than the number of rows in a fragment, it behaves the same as task_size = fragment_num_rows. By default, task_size = num_rows / (concurrency * intra_applier_concurrency * 2), where num_rows is the number of rows in the table, or the number of rows selected by your filter.
- input_row_size / output_row_size are the average bytes per row after materialization in memory.
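If this estimate exceeds the memory available to your workers, request more memory for the UDF (e.g. @udf(..., memory=...)) and/or reduce checkpoint_size / task_size.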
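The task_size rules above can be summarized in a small helper. This is a hypothetical sketch of the documented precedence (backfill-level value, then UDF-level value, then the default), not Geneva's own code. Rounding the default down to a whole number of rows, with a minimum of 1, is an assumption of the sketch.

```python
from typing import Optional


def resolve_task_size(
    num_rows: int,
    concurrency: int,
    intra_applier_concurrency: int,
    fragment_num_rows: int,
    udf_task_size: Optional[int] = None,
    backfill_task_size: Optional[int] = None,
) -> int:
    """Effective rows per task for one fragment."""
    if backfill_task_size is not None:
        # A value passed to Table.backfill takes precedence.
        task_size = backfill_task_size
    elif udf_task_size is not None:
        # Otherwise use the value set on @udf.
        task_size = udf_task_size
    else:
        # Default: spread rows over twice the number of parallel UDF slots.
        # Rounding down (minimum 1) is an assumption of this sketch.
        task_size = max(1, num_rows // (concurrency * intra_applier_concurrency * 2))
    # A task never covers more rows than the fragment holds.
    return min(task_size, fragment_num_rows)


print(resolve_task_size(1_000_000, 8, 4, fragment_num_rows=100_000))  # 15625
print(resolve_task_size(1_000_000, 8, 4, fragment_num_rows=10_000))   # 10000
print(resolve_task_size(1_000_000, 8, 4, 100_000,
                        udf_task_size=500, backfill_task_size=2000))  # 2000
```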
Typical per-row sizes:
- Images: ~200KB–2MB (depends on dataset and encoding)
- Videos: ~10MB–200MB
- Embeddings: dimension * data_type_size bytes (e.g. float32 embeddings use 4 bytes per value, so a 1536-dim embedding is 1536 * 4 = 6144 bytes)
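The embedding rule of thumb can be computed directly. This sketch takes element sizes from Python's struct module and ignores any per-row container overhead, so treat the result as a lower bound on the materialized size. Multiplying by the default checkpoint_size of 100 gives the size of one full batch of such rows; the helper name is hypothetical.

```python
import struct

# Bytes per value for common element types, via struct format codes:
# "e" = float16, "f" = float32, "d" = float64.
DTYPE_SIZES = {
    name: struct.calcsize(code)
    for name, code in [("float16", "e"), ("float32", "f"), ("float64", "d")]
}


def embedding_row_bytes(dimension: int, dtype: str = "float32") -> int:
    """Size of one embedding row: dimension * data_type_size."""
    return dimension * DTYPE_SIZES[dtype]


row = embedding_row_bytes(1536)  # 6144
batch = row * 100                # one full batch at checkpoint_size=100: 614400
print(row, batch)
```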
GKE node pools
GKE + KubeRay can autoscale the number of VM nodes on demand. Limits on the amount of resources provisioned are configured through node pools, which can be scaled vertically (machine type) or horizontally (number of nodes). Applying Kubernetes labels to the node pool machines lets you control which resources different jobs in your cluster use.
Options on Table.backfill(..)
The Table.backfill(..) method has several optional arguments for tuning performance. To saturate the CPUs in the cluster, the main arguments to change are concurrency, which controls the number of task processes, and intra_applier_concurrency, which controls the number of task threads per task process.
commit_granularity controls how frequently fragments are committed so that partial results can become visible to table readers.
Setting a smaller checkpoint_size produces finer-grained checkpoints and provides more frequent proof of life while a job is executing. This is useful when the computation on your data is expensive.
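To get a feel for how checkpoint_size trades checkpoint frequency against the number of checkpoint writes, the sketch below assumes a UDF applier processes rows one after another at a steady per-row cost. The helper names and the 2 s/row figure are illustrative assumptions. The final comment shows how such options might be passed to Table.backfill; the column-name argument there is a hypothetical placeholder.

```python
def checkpoint_interval_s(checkpoint_size: int, seconds_per_row: float) -> float:
    """Approximate time between checkpoints from one applier,
    assuming rows are processed sequentially at a steady rate."""
    return checkpoint_size * seconds_per_row


def checkpoints_written(num_rows: int, checkpoint_size: int) -> int:
    """Number of checkpoint batches needed to cover num_rows (ceiling division)."""
    return -(-num_rows // checkpoint_size)


# An expensive UDF taking about 2 s per row
print(checkpoint_interval_s(100, 2.0))  # 200.0 (default checkpoint_size)
print(checkpoint_interval_s(10, 2.0))   # 20.0
print(checkpoints_written(1_000_000, 100), checkpoints_written(1_000_000, 10))  # 10000 100000

# Hypothetical call; assumes `table` is a Geneva table and that backfill takes
# the target column name first:
# table.backfill("embedding", concurrency=16, intra_applier_concurrency=1, checkpoint_size=10)
```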
Reference: