Batch scoring
This page describes how to use the H2O MLOps Python client for batch scoring.
For more information about batch scoring and the supported source and sink types, see Batch scoring.
Prerequisites
Before you begin:
- Connect to H2O MLOps. For instructions, see Connect to H2O MLOps.
- Create a workspace. For instructions, see Create a workspace.
- Register a model. For instructions, see Manage models.
- (Optional) If using Feature Store as a source or sink, ensure Feature Store is configured in your H2O MLOps deployment. For details, see Batch scoring.
Configure the input source
To list available source connectors, run:
mlops.batch_connectors.source_specs.list()
Use the following code to configure the input source:
- Amazon S3
- GCP
- Azure
- MinIO
- JDBC
- Feature Store
Amazon S3:
source = h2o_mlops.options.BatchSourceOptions(
    spec_uid="s3",
    config={
        "region": "us-west-2",
        "accessKeyID": credentials['AccessKeyId'],
        "secretAccessKey": credentials['SecretAccessKey'],
        "sessionToken": credentials['SessionToken'],
    },
    mime_type=h2o_mlops.types.MimeType.CSV,
    location="s3://<bucket-name>/<path-to-input-file>.csv",
)
Public S3 buckets are also supported as an input source. To read from a public S3 bucket, leave the access key and secret key fields empty. Only the input source supports public S3 buckets.
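As a rough sketch of the two S3 cases above (temporary credentials versus a public bucket), the config dictionary could be built by a small helper. The helper name `build_s3_config` is hypothetical and not part of the client. Treating "empty" as empty strings is an assumption; check the connector schema returned by `mlops.batch_connectors.source_specs.list()` before relying on it.

```python
from typing import Mapping, Optional


def build_s3_config(
    region: str, credentials: Optional[Mapping[str, str]] = None
) -> dict:
    """Build the config dict for the "s3" source spec (hypothetical helper).

    The key names are copied from the S3 example above.
    """
    if credentials is None:
        # Public bucket: leave the access key and secret key fields empty.
        # Assumption: "empty" means empty strings.
        return {"region": region, "accessKeyID": "", "secretAccessKey": ""}
    return {
        "region": region,
        "accessKeyID": credentials["AccessKeyId"],
        "secretAccessKey": credentials["SecretAccessKey"],
        "sessionToken": credentials["SessionToken"],
    }
```

The returned dictionary is meant to be passed as the `config` argument of `BatchSourceOptions`.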
GCP:
source = h2o_mlops.options.BatchSourceOptions(
    spec_uid="gcp",
    config={
        "projectID": credentials['projectID'],
        "credentials": credentials['credentials'],
    },
    mime_type=h2o_mlops.types.MimeType.CSV,
    location="<location>",
)
Azure:
source = h2o_mlops.options.BatchSourceOptions(
    spec_uid="azure",
    config={
        "accountKey": credentials['accountKey'],
        "sasToken": credentials['sasToken'],
        "containerName": credentials['containerName']
    },
    mime_type=h2o_mlops.types.MimeType.CSV,
    location="https://<container-name>.blob.core.windows.net/<path-to-file>.csv",
)
MinIO (uses the S3 connector with a custom endpoint):
source = h2o_mlops.options.BatchSourceOptions(
    spec_uid="s3",
    config={
        "region": "us-west-2",
        "accessKeyID": credentials['AccessKeyId'],
        "secretAccessKey": credentials['SecretAccessKey'],
        "sessionToken": credentials['SessionToken'],
        "pathStyle": True,
        "endpoint": "https://s3.minio.location"
    },
    mime_type=h2o_mlops.types.MimeType.CSV,
    location="s3://<bucket-name>/<path-to-input-file>.csv",
)
JDBC:
source = h2o_mlops.options.BatchSourceOptions(
    spec_uid="jdbc",
    config={
        "table": "table_with_data",
        "driver": "postgres",
        "numPartitions": 8,
        "lowerBound": "2023-01-01 00:00:00",
        "upperBound": "2024-01-01 00:00:00",
        "partitionColumn": "created_at",
        "secretParams": {
            "username": credentials["username"],
            "password": credentials["password"],
        }
    },
    mime_type=h2o_mlops.types.MimeType.JDBC,
    location="postgres://h2oai-postgresql.default:5432/db_name?user={{username}}&password={{password}}&sslmode=disable",
)
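In the JDBC example, the `{{username}}` and `{{password}}` placeholders in `location` share their names with the keys under `secretParams`. A pre-flight check along the following lines can catch a placeholder that has no matching secret before the job is submitted. This is only a client-side sketch: `missing_secret_params` is a hypothetical helper, and how the service actually resolves the placeholders is not described here.

```python
import re


def missing_secret_params(location: str, secret_params: dict) -> set:
    """Return placeholder names used in `location` that have no matching key
    in `secret_params` (hypothetical helper, plain string inspection only)."""
    placeholders = set(re.findall(r"\{\{(\w+)\}\}", location))
    return placeholders - set(secret_params)


location = (
    "postgres://h2oai-postgresql.default:5432/db_name"
    "?user={{username}}&password={{password}}&sslmode=disable"
)
# An empty set means every placeholder has a corresponding secret.
unresolved = missing_secret_params(location, {"username": "u", "password": "p"})
```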
H2O Feature Store manages versioned feature sets for machine learning workflows. To find the feature set ID and version, open the Feature Store UI, navigate to your project, and select the feature set. The ID and version are displayed on the feature set details page.
source = h2o_mlops.options.BatchSourceOptions(
    spec_uid="feature-store",
    config={
        "featureSetID": "<featureSetID>",  # UUID, for example, "550e8400-e29b-41d4-a716-446655440000"
        "featureSetVersion": "<featureSetVersion>",  # Dotted decimal, for example, "1.0"
    },
)
The Feature Store source does not require mime_type or location parameters because data format and location are managed by the Feature Store.
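Because the feature set ID is a UUID and the version is a dotted decimal, both values can be sanity-checked locally before they are placed in the config. The sketch below uses only the Python standard library. `looks_like_feature_set_ref` is a hypothetical helper, and the exact version pattern (digits separated by dots, such as "1.0") is an assumption based on the example in the comments above.

```python
import re
import uuid


def looks_like_feature_set_ref(feature_set_id: str, version: str) -> bool:
    """Return True if the ID parses as a UUID and the version is dotted decimal.

    Hypothetical helper; it checks format only, not that the feature set exists.
    """
    try:
        uuid.UUID(feature_set_id)
    except ValueError:
        return False
    # Assumption: versions look like "1.0" (digits separated by dots).
    return re.fullmatch(r"\d+(\.\d+)+", version) is not None
```

A leftover template value such as "<featureSetID>" fails the UUID check, which makes unfilled placeholders easy to spot.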
Configure the output location
To list available sink connectors, run:
mlops.batch_connectors.sink_specs.list()
This command returns schema details, supported paths, and MIME types.
Set up the output location where the batch scoring results will be stored:
- Amazon S3
- GCP
- Azure
- MinIO
- JDBC
- Feature Store
Amazon S3:
from datetime import datetime

# Append a timestamp so that each run writes to its own output path.
output_location = "s3://<bucket-name>/<path-to-output-directory>/" + datetime.now().strftime("%Y%m%d-%H%M%S")
sink = h2o_mlops.options.BatchSinkOptions(
    spec_uid="s3",
    config={
        "region": "us-west-2",
        "accessKeyID": credentials['AccessKeyId'],
        "secretAccessKey": credentials['SecretAccessKey'],
        "sessionToken": credentials['SessionToken'],
    },
    mime_type=h2o_mlops.types.MimeType.JSONL,
    location=output_location,
)
GCP:
from datetime import datetime

# Append a timestamp so that each run writes to its own output path.
output_location = "<location>" + datetime.now().strftime("%Y%m%d-%H%M%S")
sink = h2o_mlops.options.BatchSinkOptions(
    spec_uid="gcp",
    config={
        "projectID": credentials['projectID'],
        "credentials": credentials['credentials'],
    },
    mime_type=h2o_mlops.types.MimeType.JSONL,
    location=output_location,
)
Azure:
from datetime import datetime

# Append a timestamp so that each run writes to its own output path.
output_location = "https://<container-name>.blob.core.windows.net/<path-to-output-directory>/" + datetime.now().strftime("%Y%m%d-%H%M%S")
sink = h2o_mlops.options.BatchSinkOptions(
    spec_uid="azure",
    config={
        "accountKey": credentials['accountKey'],
        "sasToken": credentials['sasToken'],
        "containerName": credentials['containerName']
    },
    mime_type=h2o_mlops.types.MimeType.JSONL,
    location=output_location,
)
MinIO (uses the S3 connector with a custom endpoint):
from datetime import datetime

# Append a timestamp so that each run writes to its own output path.
output_location = "s3://<bucket-name>/" + datetime.now().strftime("%Y%m%d-%H%M%S")
sink = h2o_mlops.options.BatchSinkOptions(
    spec_uid="s3",
    config={
        "region": "us-west-2",
        "accessKeyID": credentials['AccessKeyId'],
        "secretAccessKey": credentials['SecretAccessKey'],
        "sessionToken": credentials['SessionToken'],
        "pathStyle": True,
        "endpoint": "https://s3.minio.location"
    },
    mime_type=h2o_mlops.types.MimeType.JSONL,
    location=output_location,
)
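The object-store sink examples above all append a timestamp to the output prefix so that repeated runs do not write to the same path. A minimal sketch of that pattern as a reusable function follows. `timestamped_location` is a hypothetical helper, and the optional `now` argument exists only to make the result reproducible in tests.

```python
from datetime import datetime
from typing import Optional


def timestamped_location(prefix: str, now: Optional[datetime] = None) -> str:
    """Append a YYYYMMDD-HHMMSS timestamp to an output prefix."""
    if now is None:
        now = datetime.now()
    return prefix + now.strftime("%Y%m%d-%H%M%S")


# Fixed timestamp for illustration; omit `now` to use the current time.
example = timestamped_location(
    "s3://<bucket-name>/<path-to-output-directory>/",
    now=datetime(2024, 1, 2, 3, 4, 5),
)
# example == "s3://<bucket-name>/<path-to-output-directory>/20240102-030405"
```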
JDBC:
sink = h2o_mlops.options.BatchSinkOptions(
    spec_uid="jdbc",
    config={
        "driver": "postgres",
        "table": "new_table",
        "secretParams": {
            "username": credentials["username"],
            "password": credentials["password"],
        }
    },
    mime_type=h2o_mlops.types.MimeType.JDBC,
    location="postgres://h2oai-postgresql.default:5432/db_name?user={{username}}&password={{password}}&sslmode=disable",
)
H2O Feature Store manages versioned feature sets for machine learning workflows. You can write batch scoring results to an existing feature set or create a new one.
Using an existing feature set:
To find the feature set ID and version, open the Feature Store UI, navigate to your project, and select the feature set. The ID and version are displayed on the feature set details page.
sink = h2o_mlops.options.BatchSinkOptions(
    spec_uid="feature-store",
    config={
        "featureSetID": "<featureSetID>",  # UUID, for example, "550e8400-e29b-41d4-a716-446655440000"
        "featureSetVersion": "<featureSetVersion>",  # Dotted decimal, for example, "1.0"
    },
)
Creating a new feature set:
To find the project ID, open the Feature Store UI and navigate to your project. The project ID is displayed on the project overview page.
sink = h2o_mlops.options.BatchSinkOptions(
    spec_uid="feature-store",
    config={
        "projectID": "<projectID>",  # Feature Store project ID
        "featureSetName": "<featureSetName>",  # Name for the new feature set
    },
)
- If featureSetID is provided, the featureSetVersion parameter is mandatory. You can find both values on the feature set details page in the Feature Store UI.
- To create a new feature set, provide projectID and featureSetName instead. You can find the project ID on the project overview page in the Feature Store UI.
- The Feature Store sink does not require mime_type or location parameters. Data format and location are managed by the Feature Store.
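The rules above can be expressed as a small client-side check that runs before the sink is constructed. This is a sketch only: `feature_store_sink_mode` is a hypothetical helper, and the service may apply additional validation that is not covered here.

```python
def feature_store_sink_mode(config: dict) -> str:
    """Classify a Feature Store sink config as "existing" or "new".

    Raises ValueError when the config satisfies neither rule.
    """
    if "featureSetID" in config:
        # An existing feature set needs both the ID and the version.
        if "featureSetVersion" not in config:
            raise ValueError("featureSetVersion is mandatory when featureSetID is provided")
        return "existing"
    if "projectID" in config and "featureSetName" in config:
        # A new feature set needs the project ID and a name.
        return "new"
    raise ValueError(
        "provide featureSetID and featureSetVersion, or projectID and featureSetName"
    )
```

For example, a config that contains only "featureSetID" raises ValueError, while one with "projectID" and "featureSetName" is classified as "new".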
Create a batch scoring job
First, retrieve the scoring runtime for the model:
scoring_runtime = model.experiment().scoring_runtimes[0]
Create the batch scoring job using the source and sink variables defined in the previous sections:
job = workspace.batch_scoring_jobs.create(
    source=source,
    sink=sink,
    model=model,
    scoring_runtime=scoring_runtime,
    kubernetes_options=h2o_mlops.options.BatchKubernetesOptions(
        replicas=2,
        min_replicas=1,
    ),
    mini_batch_size=100,  # Number of rows sent per request during batch processing
    name="DEMO JOB",
)
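Since mini_batch_size is the number of rows sent per request, the number of scoring requests for an input of known size can be estimated with a ceiling division. This is back-of-the-envelope arithmetic, assuming every request carries a full mini-batch except possibly the last one. `estimated_request_count` is a hypothetical helper, not a client function.

```python
def estimated_request_count(total_rows: int, mini_batch_size: int) -> int:
    """Estimate how many scoring requests a job sends: ceil(rows / batch size)."""
    if mini_batch_size <= 0:
        raise ValueError("mini_batch_size must be positive")
    if total_rows < 0:
        raise ValueError("total_rows must not be negative")
    # Ceiling division using integers only.
    return -(-total_rows // mini_batch_size)


# 1,050 rows with mini_batch_size=100: ten full requests plus one with 50 rows.
requests_needed = estimated_request_count(1050, 100)  # 11
```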
Retrieve the job ID:
job.uid
Include input features or an ID field along with the output
When you create a batch scoring job, you can include input data in the output for easier comparison.
You can use only one of these options:
- Set model_request_parameters.output_fields_type=h2o_mlops.types.OutputFieldsType.INCLUDE_ALL_INPUT_FEATURES to include all input features.
Example:
job = workspace.batch_scoring_jobs.create(
    source=source,
    sink=sink,
    model=model,
    scoring_runtime=scoring_runtime,
    kubernetes_options=h2o_mlops.options.BatchKubernetesOptions(
        replicas=2,
        min_replicas=1,
    ),
    mini_batch_size=100,  # Number of rows sent per request during batch processing
    model_request_parameters=h2o_mlops.options.ModelRequestParameters(
        output_fields_type=h2o_mlops.types.OutputFieldsType.INCLUDE_ALL_INPUT_FEATURES,
    ),
    name="DEMO JOB",
)
- Set model_request_parameters.id_field and model_request_parameters.output_fields_type=h2o_mlops.types.OutputFieldsType.INCLUDE_ID to include only one identifier column.
Example:
job = workspace.batch_scoring_jobs.create(
    source=source,
    sink=sink,
    model=model,
    scoring_runtime=scoring_runtime,
    kubernetes_options=h2o_mlops.options.BatchKubernetesOptions(
        replicas=2,
        min_replicas=1,
    ),
    mini_batch_size=100,  # Number of rows sent per request during batch processing
    model_request_parameters=h2o_mlops.options.ModelRequestParameters(
        output_fields_type=h2o_mlops.types.OutputFieldsType.INCLUDE_ID,
        id_field="id_field_name",  # Field name in the model schema, for example, "age"
    ),
    name="DEMO JOB",
)
Wait for job completion
The following call blocks until the job completes. While it runs, you can view the log output from both the scorer and the batch scoring job.
job.wait()
By default, this call prints logs while waiting. To wait for job completion without printing any logs, use:
job.wait(logs=False)
List all jobs
workspace.batch_scoring_jobs.list()
Retrieve a job by ID
workspace.batch_scoring_jobs.get(uid=...)
Cancel a job
job.cancel()
By default, this command blocks until the job is fully canceled. If you want to cancel without waiting for completion, use:
job.cancel(wait=False)
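Waiting and canceling can be combined so that interrupting a long wait (for example, with Ctrl+C) also cancels the job instead of leaving it running. The sketch below relies only on the job.wait(logs=...) and job.cancel(wait=...) calls shown on this page. `wait_or_cancel` is a hypothetical helper, and how job.wait() behaves when a job fails is not covered here.

```python
def wait_or_cancel(job, logs: bool = True) -> bool:
    """Wait for a batch scoring job and cancel it if the wait is interrupted.

    Returns True if the wait finished, False if the job was canceled.
    """
    try:
        job.wait(logs=logs)
        return True
    except KeyboardInterrupt:
        # Request cancellation without blocking until it completes.
        job.cancel(wait=False)
        return False
```

A typical call would be wait_or_cancel(job, logs=False) right after the job is created.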
Delete a job
job.delete()