The APIs on this page require Geneva v0.10.0 or later.
Geneva automatically packages and deploys your Python execution environment to its worker nodes, so distributed jobs run with the same environment and dependencies as your prototype. Geneva currently supports one processing backend, Ray, and there are three ways to connect to a Ray cluster:
  1. Local Ray
  2. KubeRay: create a cluster on demand in your Kubernetes cluster.
  3. Existing Ray Cluster

Ray Clusters

Local Ray

To execute jobs without an external Ray cluster, use LocalRayContext, which auto-creates a Ray cluster on your machine. Because it runs on your laptop or desktop, it is only suitable for prototyping on small datasets, but it is the easiest way to get started. Define the UDF, add a column, enter Connection.local_ray_context(), and trigger the job:
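import geneva
from geneva import udf
from geneva.db import Connection

db = geneva.connect("s3://my-bucket/my-db")
tbl = db.get_table("my_table")  # assumes the table has a string "filename" column

@udf
def filename_len(filename: str) -> int:
    return len(filename)

tbl.add_columns({"filename_len": filename_len})

with Connection.local_ray_context():
    tbl.backfill("filename_len")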
Geneva will package up your local environment and send it to each worker process, so they’ll have access to all the same dependencies as if you ran a simple Python script yourself.
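To make the packaging step concrete, here is a minimal sketch of zipping a working directory so it can be shipped to workers, using only the standard library. The helper `package_workdir` and its list of skipped directories are hypothetical illustrations. Geneva's actual packaging logic and exclusion rules may differ.

```python
import os
import tempfile
import zipfile
from pathlib import Path

SKIP_DIRS = {"__pycache__", ".git", ".venv"}  # hypothetical exclusion list


def package_workdir(src_dir: str, out_zip: str) -> list[str]:
    """Zip the files under src_dir into out_zip and return the archive paths, sorted.

    Hypothetical helper illustrating the idea; not Geneva's implementation.
    """
    root = Path(src_dir)
    written = []
    with zipfile.ZipFile(out_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for dirpath, dirnames, filenames in os.walk(root):
            # Prune in place so os.walk does not descend into skipped directories.
            dirnames[:] = [d for d in dirnames if d not in SKIP_DIRS]
            for name in filenames:
                full = Path(dirpath) / name
                arcname = full.relative_to(root).as_posix()
                zf.write(full, arcname)
                written.append(arcname)
    return sorted(written)


# Usage: build a throwaway project and package it (the zip lives outside the project).
with tempfile.TemporaryDirectory() as tmp:
    project = Path(tmp) / "project"
    (project / "pkg").mkdir(parents=True)
    (project / "pkg" / "udfs.py").write_text("def filename_len(f): return len(f)\n")
    (project / "__pycache__").mkdir()
    (project / "__pycache__" / "udfs.cpython-311.pyc").write_bytes(b"")
    packaged = package_workdir(str(project), str(Path(tmp) / "workdir.zip"))

print(packaged)  # ['pkg/udfs.py']
```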

KubeRay

If you have a Kubernetes cluster with the KubeRay operator (kuberay-operator) installed, Geneva can automatically provision RayClusters for you. To do so, define a Geneva cluster that describes the resource needs, Docker images, and other Ray configuration your job requires. Make sure your Kubernetes cluster has enough compute resources to provision the RayCluster. Here is an example Geneva cluster definition:
import geneva
from geneva.cluster import GenevaCluster, K8sConfigMethod
from geneva.cluster.builder import KubeRayClusterBuilder

db = geneva.connect("s3://my-bucket/my-db")

cluster_name = "my-geneva-cluster" # lowercase, numbers, hyphens only
service_account = "my-k8s-service-account" # k8s service account that Geneva runs as (k8s names cannot contain underscores)
k8s_namespace = "geneva"  # k8s namespace

cluster = (
    GenevaCluster.create_kuberay(cluster_name)
        .namespace(k8s_namespace)
        .aws_config(region="us-east-1") # only required if using AWS
        .config_method(K8sConfigMethod.LOCAL) # Load k8s config from `~/.kube/config`
        # (other options include EKS_AUTH to load from AWS EKS, or IN_CLUSTER to load the
        # config when running inside a pod in the cluster)
        .head_group(
            service_account=service_account,
            cpus=2,
            node_selector={"geneva.lancedb.com/ray-head":""}, # k8s label required for head in your cluster
        )
        .add_worker_group(
            KubeRayClusterBuilder.cpu_worker()
            .cpus(4)
            .memory("8Gi")
            .service_account(service_account)
            .build()
        )
        .add_worker_group(
            KubeRayClusterBuilder.gpu_worker() # defaults to 1 GPU
            .cpus(2)
            .memory("8Gi")
            .service_account(service_account)
            .build()
        )
        .build()
)

db.define_cluster(cluster_name, cluster)
# define_cluster stores the cluster metadata in persistent storage. The cluster can then
# be referenced by name and provisioned when creating an execution context.

table = db.get_table("my_table")
with db.context(cluster=cluster_name):
    table.backfill("my_udf")
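The cluster name (like other Kubernetes object names, such as the service account) must follow Kubernetes naming rules, so a quick local check can catch typos before calling define_cluster. The sketch below assumes the RFC 1123 label rule: lowercase alphanumerics and hyphens, starting and ending with an alphanumeric, at most 63 characters. `is_valid_k8s_name` is a hypothetical helper, not part of Geneva.

```python
import re

# RFC 1123 label. Service account names may also contain dots, so this
# check is stricter than Kubernetes requires for them. Underscores fail either way.
_RFC1123_LABEL = re.compile(r"[a-z0-9]([-a-z0-9]*[a-z0-9])?")


def is_valid_k8s_name(name: str) -> bool:
    """Hypothetical pre-flight check for cluster and service account names."""
    return len(name) <= 63 and _RFC1123_LABEL.fullmatch(name) is not None


for candidate in ["my-geneva-cluster", "my_k8s_service_account", "MyCluster", "-geneva"]:
    print(candidate, is_valid_k8s_name(candidate))
# my-geneva-cluster True
# my_k8s_service_account False
# MyCluster False
# -geneva False
```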

External Ray cluster

If you already have a Ray cluster, Geneva can execute jobs against it too. Define a Geneva cluster with GenevaCluster.create_external(), passing the cluster's name and its ray:// address. Here's an example:
import geneva
from geneva.cluster import GenevaCluster

db = geneva.connect(my_db_uri)
cluster_name = "my-geneva-external-cluster"

cluster = (
    GenevaCluster.create_external(cluster_name, "ray://my_ip:my_port")
    .build()
)
db.define_cluster(cluster_name, cluster)
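The address passed to create_external() uses Ray Client's ray://host:port form. As an illustrative sketch, the hypothetical helper below splits such an address and rejects malformed ones before you store the cluster definition. It requires an explicit port (Ray Client commonly listens on 10001). It is neither a Geneva nor a Ray API.

```python
from urllib.parse import urlsplit


def parse_ray_address(address: str) -> tuple[str, int]:
    """Split "ray://host:port" into (host, port). Hypothetical validation helper."""
    parts = urlsplit(address)
    if parts.scheme != "ray":
        raise ValueError(f"expected a ray:// address, got {address!r}")
    if not parts.hostname:
        raise ValueError(f"missing host in {address!r}")
    port = parts.port  # raises ValueError if the port is not a valid integer
    if port is None:
        raise ValueError(f"missing port in {address!r} (Ray Client often uses 10001)")
    return parts.hostname, port


print(parse_ray_address("ray://10.0.0.5:10001"))  # ('10.0.0.5', 10001)
```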

To pass environment variables to your workers in either a KubeRay or an external Ray cluster, use ray_init_kwargs:
import os

cluster = (
    GenevaCluster.create_kuberay(cluster_name) # or create_external(cluster_name, "ray://my_ip:my_port")
        .ray_init_kwargs({
            "runtime_env": {
                "env_vars": {
                    "MY_VAR": "value",
                    "AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"],
                },
            },
        })
        # ... other builder calls (namespace, head_group, add_worker_group, etc.)
        .build()
)
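`os.environ[...]` raises KeyError when the variable is not set locally. This is a minimal sketch that assumes you would rather skip unset variables than fail. It builds the runtime_env dictionary first and forwards only variables that exist. `build_runtime_env` is a hypothetical helper. Its result would be passed as `.ray_init_kwargs({"runtime_env": runtime_env})`.

```python
import os


def build_runtime_env(static_vars: dict[str, str], forward: list[str]) -> dict:
    """Build a Ray runtime_env dict (hypothetical helper).

    static_vars are sent as-is. Names in `forward` are copied from the local
    environment only if they are set, instead of raising KeyError.
    """
    env_vars = dict(static_vars)
    for name in forward:
        value = os.environ.get(name)
        if value is not None:
            env_vars[name] = value
    return {"env_vars": env_vars}


os.environ["GENEVA_DEMO_TOKEN"] = "abc123"  # illustrative value
os.environ.pop("GENEVA_DEMO_UNSET", None)   # make sure this one is absent
runtime_env = build_runtime_env(
    {"MY_VAR": "value"},
    forward=["GENEVA_DEMO_TOKEN", "GENEVA_DEMO_UNSET"],
)
print(runtime_env)
# {'env_vars': {'MY_VAR': 'value', 'GENEVA_DEMO_TOKEN': 'abc123'}}
```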

Dependencies and Manifests

Most UDFs require some dependencies: helper libraries like pillow for image processing, pre-trained models like open-clip to calculate embeddings, or even small config files. There are three ways to get them to workers:
  1. Define dependencies explicitly in a manifest
  2. Bake dependencies into an image
  3. Auto-upload local dependencies

Define dependencies explicitly in a manifest

We recommend defining dependencies explicitly: it’s the easiest way to understand exactly what’s running, and the least error-prone. To do so, define a Manifest, like so:
import geneva
from geneva.manifest import GenevaManifest

db = geneva.connect(my_db_uri)

manifest_name = "dev-manifest"
manifest = (
    GenevaManifest.create_pip(manifest_name)
        .pip(["lancedb", "numpy"])
        .build()
)

db.define_manifest(manifest_name, manifest)
After workers start up, this will run pip install lancedb numpy on them. You can also use .requirements_path(path) to point to a local requirements.txt file instead of listing packages inline. Note that attempting to use both .pip() and .requirements_path() will raise an exception. For conda-based dependencies, use GenevaManifest.create_conda() instead:
from geneva.manifest import GenevaManifest

manifest = (
    GenevaManifest.create_conda("my-conda-manifest")
        .conda({"dependencies": ["python=3.10", "numpy"]})
        .build()
)
You can also use .conda_environment_path(path) to point to a local environment.yml file. Note that attempting to use both .conda() and .conda_environment_path() will raise an exception.
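The mutual-exclusion rule can be pictured with a toy builder. This is a hypothetical sketch, not Geneva's implementation. It checks for the conflict in build(), whereas Geneva may raise as soon as the second method is called. The output dict mirrors Ray's runtime_env `pip` field, which accepts either a list of packages or a path to a requirements file.

```python
from typing import Optional


class PipSpecBuilder:
    """Toy builder: inline packages and a requirements file are mutually exclusive.

    Hypothetical class for illustration only; not Geneva's GenevaManifest.
    """

    def __init__(self) -> None:
        self._packages: Optional[list[str]] = None
        self._requirements_path: Optional[str] = None

    def pip(self, packages: list[str]) -> "PipSpecBuilder":
        self._packages = list(packages)
        return self

    def requirements_path(self, path: str) -> "PipSpecBuilder":
        self._requirements_path = path
        return self

    def build(self) -> dict:
        if self._packages is not None and self._requirements_path is not None:
            raise ValueError("use either .pip() or .requirements_path(), not both")
        if self._requirements_path is not None:
            return {"pip": self._requirements_path}
        return {"pip": self._packages or []}


print(PipSpecBuilder().pip(["lancedb", "numpy"]).build())  # {'pip': ['lancedb', 'numpy']}
try:
    PipSpecBuilder().pip(["numpy"]).requirements_path("requirements.txt").build()
except ValueError as exc:
    print(exc)  # use either .pip() or .requirements_path(), not both
```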

Bake dependencies into an image

Installing packages with the pip or conda methods adds startup time to every job. Once your jobs are stable in production, it is faster to build all your dependencies into the workers' images and reference those images instead:
import geneva
from geneva.manifest import GenevaManifest

db = geneva.connect(my_db_uri)

manifest_name = "prod-manifest"
manifest = (
    GenevaManifest.create_pip(manifest_name)
        .worker_image("myregistry.example.com/my-custom-worker-image:latest")
        .build()
)

db.define_manifest(manifest_name, manifest)
You can also define images in a cluster. Pass image= to the head_group method, and call .image() on the cpu_worker/gpu_worker builders, e.g.:
from geneva.cluster import GenevaCluster
from geneva.cluster.builder import KubeRayClusterBuilder

cluster = (
    GenevaCluster.create_kuberay(cluster_name)
        .head_group(
            image="myregistry.example.com/my-custom-head-image:latest",
        )
        .add_worker_group(
            KubeRayClusterBuilder.cpu_worker()
            .image("myregistry.example.com/my-custom-cpu-worker-image:latest")
            .build()
        )
        .add_worker_group(
            KubeRayClusterBuilder.gpu_worker()
            .image("myregistry.example.com/my-custom-gpu-worker-image:latest")
            .build()
        )
        .build()
)
If an image is defined in both a Cluster and a Manifest, the definition in the Manifest takes priority.
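The precedence rule amounts to a simple fallback chain. The sketch below is hypothetical. In particular, falling back to a default image when neither the Manifest nor the Cluster sets one is an assumption based on the default_worker_image() option, not documented behavior. The image names are placeholders.

```python
from typing import Optional


def resolve_image(
    manifest_image: Optional[str],
    cluster_image: Optional[str],
    default_image: str,
) -> str:
    """Manifest image wins, then the Cluster's image, then a default (hypothetical)."""
    return manifest_image or cluster_image or default_image


DEFAULT = "example.com/default-worker:latest"  # placeholder default image
print(resolve_image("myregistry.example.com/prod:latest",
                    "myregistry.example.com/cluster:latest", DEFAULT))
# myregistry.example.com/prod:latest
print(resolve_image(None, "myregistry.example.com/cluster:latest", DEFAULT))
# myregistry.example.com/cluster:latest
```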

Auto-upload local dependencies

Geneva can package your local environment and send it to Ray workers. This includes the current workspace root (if you're in a Python repo) or the current working directory (if you're not). GenevaManifest.create_site() additionally uploads your Python site-packages (as reported by site.getsitepackages()) to workers. This is not recommended for production, because uploading built dependencies is prone to problems such as architecture mismatches between your machine and the workers. It can, however, be a good way to iterate quickly during development. To upload site packages:
import geneva
from geneva.manifest import GenevaManifest

db = geneva.connect(my_db_uri)
manifest_name = "dev-manifest"
manifest = GenevaManifest.create_site(manifest_name).build()

db.define_manifest(manifest_name, manifest)
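Before relying on create_site(), it can help to see which directories site.getsitepackages() reports and how many compiled extension modules they contain. Those platform-specific binaries are the usual source of architecture mismatches. `find_native_extensions` is a hypothetical standard-library sketch. It identifies extensions by the suffixes listed in importlib.machinery.EXTENSION_SUFFIXES.

```python
import importlib.machinery
import os
import site


def find_native_extensions(roots: list[str]) -> list[str]:
    """List compiled extension modules under the given directories (hypothetical helper).

    These files are built for this machine's OS and CPU, so they are the most
    likely to break on workers with a different platform.
    """
    suffixes = tuple(importlib.machinery.EXTENSION_SUFFIXES)
    found = []
    for root in roots:
        if not os.path.isdir(root):
            continue
        for dirpath, _dirnames, filenames in os.walk(root):
            for name in filenames:
                if name.endswith(suffixes):
                    found.append(os.path.join(dirpath, name))
    return sorted(found)


site_dirs = site.getsitepackages()  # the directories create_site() uploads
native = find_native_extensions(site_dirs)
print(f"{len(native)} compiled extension modules under {site_dirs}")
```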

What’s in a manifest?

Here’s a summary of what’s in a manifest and how you can define it across the three manifest builder types.
| Contents | How you can define it | Factory method |
|---|---|---|
| Local working directory (or workspace root, if in a Python repo) | Uploaded automatically. | All |
| Local Python packages (site-packages) | Uploaded automatically. | create_site() |
| Pip packages to be installed | Use .pip(packages: list[str]) or .requirements_path(path: str). See Ray's RuntimeEnv docs for details. | create_pip() |
| Conda packages to be installed | Use .conda(deps: dict[str, Any]) or .conda_environment_path(path: str). See Ray's RuntimeEnv docs for details. | create_conda() |
| Local Python packages outside of site-packages | Use .py_modules(modules: list[str]) or .add_py_module(module: str). See Ray's RuntimeEnv docs for details. | All |
| Container image for head node | Use .head_image(head_image: str), or default_head_image() to use the default. If the image is also defined in the GenevaCluster, the image set here in the Manifest takes priority. | All |
| Container image for worker nodes | Use .worker_image(worker_image: str), or default_worker_image() to use the default for the current platform. As with the head image, this takes priority over any image set in the Cluster. | All |
If you want to see exactly what is being uploaded to the cluster, set .delete_local_zips(False) and .local_zip_output_dir(path) then examine the zip files in path.
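Once the zips are written, a few lines of standard-library code can list their contents. The sketch below assumes only that the archives end in .zip and sit inside the directory you passed to .local_zip_output_dir(). The archive and member names in the demo are made up, and `summarize_zips` is a hypothetical helper.

```python
import tempfile
import zipfile
from pathlib import Path


def summarize_zips(zip_dir: str) -> dict[str, list[str]]:
    """Map each .zip in zip_dir to its sorted member names (hypothetical helper)."""
    summary = {}
    for zip_path in sorted(Path(zip_dir).glob("*.zip")):
        with zipfile.ZipFile(zip_path) as zf:
            summary[zip_path.name] = sorted(zf.namelist())
    return summary


# Demo with a fabricated archive standing in for Geneva's output.
with tempfile.TemporaryDirectory() as out_dir:
    with zipfile.ZipFile(Path(out_dir) / "workdir.zip", "w") as zf:
        zf.writestr("udfs.py", "def f(): ...\n")
        zf.writestr("config/settings.yaml", "threshold: 0.5\n")
    contents = summarize_zips(out_dir)

print(contents)  # {'workdir.zip': ['config/settings.yaml', 'udfs.py']}
```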

Putting it all together: Execution Contexts

An execution context represents the concrete execution environment (Cluster and Manifest) used to execute a distributed job. Calling db.context() returns a context manager that provisions an execution cluster from the given Cluster and Manifest definitions and executes the job on it. Because you've already defined the cluster and manifest, you can reference them by name. Providing a manifest is optional. When the job completes, the context manager automatically de-provisions the cluster.
db = geneva.connect(my_db_uri)
tbl = db.get_table("my_table")

with db.context(cluster=cluster_name, manifest=manifest_name):
    tbl.backfill("embedding")
In a notebook environment, you can manually enter and exit the context manager in multiple steps like so:
ctx = db.context(cluster=cluster_name, manifest=manifest_name)
ctx.__enter__()

# ... do stuff

ctx.__exit__(None, None, None)
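Manually calling __enter__ and __exit__ works. The standard library's contextlib.ExitStack does the same bookkeeping and is harder to get wrong across notebook cells. The sketch below uses a stand-in context manager so it runs without a cluster. In practice you would pass db.context(cluster=cluster_name, manifest=manifest_name) to enter_context(). `fake_context` is hypothetical.

```python
from contextlib import ExitStack, contextmanager

log = []


@contextmanager
def fake_context(name: str):
    """Hypothetical stand-in for db.context(...) so this sketch runs without a cluster."""
    log.append(f"enter {name}")
    try:
        yield name
    finally:
        log.append(f"exit {name}")


# Cell 1: enter the context and keep it open across cells.
stack = ExitStack()
ctx = stack.enter_context(fake_context("my-geneva-cluster"))

# Cells 2..N: run jobs while the context is active.
log.append(f"backfill using {ctx}")

# Final cell: close() calls __exit__(None, None, None) on everything entered.
stack.close()
print(log)
# ['enter my-geneva-cluster', 'backfill using my-geneva-cluster', 'exit my-geneva-cluster']
```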