
Run a ZenML flow from Kestra to completion and stream logs in real time

Source

yaml
id: orchestrate-zenml
namespace: company.team

tasks:
  - id: run_zenml
    type: io.kestra.plugin.scripts.python.Commands
    containerImage: zenmldocker/zenml-server
    env:
      ZENML_STORE_API_KEY: "{{ secret('ZENML_API_KEY') }}" # ZenML service-account API key stored as a Kestra secret
      ZENML_STORE_URL: http://localhost:4200 # ZenML server started via Docker Compose (see below)
      ZENML_STACK: default
    commands:
      - python zenml_flow.py
    inputFiles:
      zenml_flow.py: |
        from zenml import pipeline, step

        @step
        def load_data() -> int:
            return 42

        @step
        def train_model(data: int) -> None:
            print(f"Training model with data: {data}")

        @pipeline
        def zenml_flow():
            dataset = load_data()
            train_model(dataset)

        if __name__ == "__main__":
            zenml_flow()

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 * * *" # daily at 9 AM

About this blueprint


This flow triggers a ZenML pipeline from Kestra, with ZenML running as a containerized service managed by Docker Compose. It executes a simple workflow that loads data and trains a model using ZenML’s pipeline framework.

Before running the flow, start a ZenML instance in Docker:

yaml
version: "3.9"

services:
  mysql:
    image: mysql:8.0
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=password

  zenml:
    image: zenmldocker/zenml-server
    ports:
      - "4200:8080" # the ZenML server listens on 8080 inside the container, exposed on host port 4200
    environment:
      # point the ZenML server at the MySQL backing store defined above
      - ZENML_STORE_URL=mysql://root:password@host.docker.internal/zenml
    links:
      - mysql
    depends_on:
      - mysql
    extra_hosts:
      - "host.docker.internal:host-gateway"
    restart: on-failure

After starting ZenML, create a user at http://localhost:4200 and generate an API key under Settings → Service Accounts. Store this key as a Kestra secret named ZENML_API_KEY before executing the flow.
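
With the open-source edition of Kestra, one way to store the key is as a base64-encoded environment variable prefixed with SECRET_ on the Kestra container; the secret('ZENML_API_KEY') expression then resolves to the decoded value. A minimal sketch, separate from the ZenML Compose file above (the service name and image tag are illustrative):

yaml
services:
  kestra:
    image: kestra/kestra:latest
    environment:
      # base64-encode the ZenML API key first, e.g. echo -n "<api-key>" | base64
      SECRET_ZENML_API_KEY: "<base64-encoded ZenML API key>"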

The flow is scheduled to run daily at 9 AM. Instead of embedding Python code inline, you can also use Kestra’s namespace files:

yaml
tasks:
  - id: run_zenml
    type: io.kestra.plugin.scripts.python.Commands
    containerImage: zenmldocker/zenml-server
    namespaceFiles:
      enabled: true
    env:
      ZENML_STORE_API_KEY: "{{ secret('ZENML_API_KEY') }}"
      ZENML_STORE_URL: http://localhost:4200
      ZENML_STACK: default
    commands:
      - python zenml_flow.py # add zenml_flow.py via the namespace Files editor

For scaling, you can use Kestra Enterprise’s task runners to execute the ML pipeline on cloud-based compute, for example AWS Batch running on a Fargate compute environment:

yaml
tasks:
  - id: run_zenml_on_aws_fargate
    type: io.kestra.plugin.scripts.python.Commands
    containerImage: zenmldocker/zenml-server
    namespaceFiles:
      enabled: true
    env:
      ZENML_STORE_API_KEY: "{{ secret('ZENML_API_KEY') }}"
      ZENML_STORE_URL: http://localhost:4200
      ZENML_STACK: default
    commands:
      - python zenml_flow.py
    taskRunner:
      type: io.kestra.plugin.ee.aws.runner.Batch
      region: us-east-1
      accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
      secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
      computeEnvironmentArn: "{{ namespace.compute_environment_arn }}"
      jobQueueArn: "{{ namespace.job_queue_arn }}"
      executionRoleArn: "{{ namespace.execution_role_arn }}"
      taskRoleArn: "{{ namespace.task_role_arn }}"
      bucket: kestra-us
