Source
id: orchestrate-zenml
namespace: company.team

tasks:
  - id: run_zenml
    type: io.kestra.plugin.scripts.python.Commands
    containerImage: zenmldocker/zenml-server
    env:
      ZENML_STORE_API_KEY: "{{ secret('ZENML_API_KEY') }}"
      ZENML_STORE_URL: http://localhost:4200
      ZENML_STACK: default
    commands:
      - python zenml_flow.py
    inputFiles:
      zenml_flow.py: |
        from zenml import pipeline, step

        @step
        def load_data() -> int:
            return 42

        @step
        def train_model(data: int) -> None:
            print(f"Training model with data: {data}")

        @pipeline
        def zenml_flow():
            dataset = load_data()
            train_model(dataset)

        if __name__ == "__main__":
            zenml_flow()

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 * * *" # daily at 9 AM
About this blueprint
Python Kestra
This flow runs a ZenML pipeline from Kestra, with the ZenML server running as a containerized service started with Docker Compose. The pipeline is a simple two-step workflow that loads data and trains a model using ZenML’s pipeline framework.
Before running the flow, start a ZenML instance in Docker:
version: "3.9"
services:
  mysql:
    image: mysql:8.0
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=password
  zenml:
    image: zenmldocker/zenml-server
    ports:
      - "4200:8080"
    environment:
      - ZENML_STORE_URL=mysql://root:password@host.docker.internal/zenml
    links:
      - mysql
    depends_on:
      - mysql
    extra_hosts:
      - "host.docker.internal:host-gateway"
    restart: on-failure
After starting ZenML, create a user via http://localhost:4200 and generate an API key under Settings → Service Accounts.
Store this key as a secret in Kestra (ZENML_API_KEY) before executing the flow.
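A missing secret or store URL usually surfaces only once ZenML tries to connect. As a minimal sketch, a preflight check could report unset settings up front. The helper names below (missing_zenml_settings, require_zenml_settings) are hypothetical and not part of ZenML or Kestra; only the two variable names come from the flow's env block.

```python
from typing import List, Mapping

# Variable names taken from the flow's `env` block; the helpers are illustrative.
REQUIRED_VARS = ("ZENML_STORE_URL", "ZENML_STORE_API_KEY")


def missing_zenml_settings(env: Mapping[str, str]) -> List[str]:
    """Return the required variable names that are unset or blank in `env`."""
    return [name for name in REQUIRED_VARS if not env.get(name, "").strip()]


def require_zenml_settings(env: Mapping[str, str]) -> None:
    """Raise a descriptive error if any required setting is missing."""
    missing = missing_zenml_settings(env)
    if missing:
        raise RuntimeError("Missing ZenML settings: " + ", ".join(missing))
```

One possible use is to call require_zenml_settings(os.environ) at the top of the script's main block, before zenml_flow() runs, so that a forgotten ZENML_API_KEY secret produces a clear message.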
The flow is scheduled to run daily at 9 AM. Instead of embedding Python code inline, you can also use Kestra’s namespace files:
tasks:
  - id: run_zenml
    type: io.kestra.plugin.scripts.python.Commands
    containerImage: zenmldocker/zenml-server
    namespaceFiles:
      enabled: true
    env:
      ZENML_STORE_API_KEY: "{{ secret('ZENML_API_KEY') }}"
      ZENML_STORE_URL: http://localhost:4200
      ZENML_STACK: default
    commands:
      - python zenml_flow.py # add the script in Files editor
To scale out, you can use Kestra Enterprise’s task runners to execute the ML pipeline on cloud-based compute. The example below uses the AWS Batch task runner, which can run jobs on compute environments such as AWS Fargate:
tasks:
  - id: run_zenml_on_aws_fargate
    type: io.kestra.plugin.scripts.python.Commands
    containerImage: zenmldocker/zenml-server
    namespaceFiles:
      enabled: true
    env:
      ZENML_STORE_API_KEY: "{{ secret('ZENML_API_KEY') }}"
      ZENML_STORE_URL: http://localhost:4200
      ZENML_STACK: default
    commands:
      - python zenml_flow.py
    taskRunner:
      type: io.kestra.plugin.ee.aws.runner.Batch
      region: us-east-1
      accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
      secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
      computeEnvironmentArn: "{{ namespace.compute_environment_arn }}"
      jobQueueArn: "{{ namespace.job_queue_arn }}"
      executionRoleArn: "{{ namespace.execution_role_arn }}"
      taskRoleArn: "{{ namespace.task_role_arn }}"
      bucket: kestra-us