Base Implementation
ZenML aims to enable orchestration with any orchestration tool. This is where the `BaseOrchestrator` comes into play. It abstracts away many of the ZenML-specific details from the actual implementation and exposes a simplified interface:
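A simplified sketch of that interface is shown below; the exact parameter names and types vary between ZenML versions, so treat this as illustrative rather than the real source:

```python
from abc import ABC, abstractmethod
from typing import Any

from zenml.stack import StackComponent


class BaseOrchestrator(StackComponent, ABC):
    """Base class for all ZenML orchestrators (simplified sketch)."""

    @abstractmethod
    def prepare_or_run_pipeline(
        self, deployment: "PipelineDeployment", stack: "Stack"
    ) -> Any:
        """Runs the pipeline immediately, or prepares it to be run
        later/elsewhere (e.g. on a schedule or a remote backend)."""
```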
Build your own custom orchestrator
If you want to create your own custom flavor for an orchestrator, you can follow these steps:

- Create a class which inherits from the `BaseOrchestrator` class and implement the abstract `prepare_or_run_pipeline` method.
- If you need to provide any configuration, create a class which inherits from the `BaseOrchestratorConfig` class and add your configuration parameters.
- Bring the implementation and the configuration together by inheriting from the `BaseOrchestratorFlavor` class. Make sure that you give a `name` to the flavor through its abstract property, as sketched below.
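Putting these pieces together, a minimal flavor can look like the following sketch (the class names and the `my_custom_setting` parameter are illustrative, and the import path may differ between ZenML versions):

```python
from typing import Any, Type

from zenml.orchestrators import (
    BaseOrchestrator,
    BaseOrchestratorConfig,
    BaseOrchestratorFlavor,
)


class MyOrchestratorConfig(BaseOrchestratorConfig):
    """Configuration for the custom orchestrator."""

    my_custom_setting: str = "default"


class MyOrchestrator(BaseOrchestrator):
    """The actual orchestrator implementation."""

    def prepare_or_run_pipeline(self, deployment, stack) -> Any:
        ...  # run each step here, or hand the pipeline off to a backend


class MyOrchestratorFlavor(BaseOrchestratorFlavor):
    """Ties the config and the implementation together."""

    @property
    def name(self) -> str:
        return "my_orchestrator"

    @property
    def config_class(self) -> Type[MyOrchestratorConfig]:
        return MyOrchestratorConfig

    @property
    def implementation_class(self) -> Type[MyOrchestrator]:
        return MyOrchestrator
```

Once this is in place, the flavor can be registered through the CLI, e.g. with `zenml orchestrator flavor register <path.to.MyOrchestratorFlavor>`.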
- The `CustomOrchestratorFlavor` class is imported and utilized upon the creation of the custom flavor through the CLI.
- The `CustomOrchestratorConfig` class is imported when someone tries to register/update a stack component with this custom flavor. In particular, during the registration process of the stack component, the config will be used to validate the values given by the user. As `Config` objects are inherently `pydantic` objects, you can also add your own custom validators here.
- The `CustomOrchestrator` only comes into play when the component is ultimately in use.
The design behind this interaction lets us separate the configuration of the flavor from its implementation. This way you can register flavors and components even when the major dependencies behind their implementation are not installed locally (assuming the `CustomOrchestratorFlavor` and the `CustomOrchestratorConfig` are implemented in a different module/path than the actual `CustomOrchestrator`).
Some additional implementation details
Not all orchestrators are created equal. Here are a few basic categories that differentiate them.

Direct Orchestration

The implementation of a `local` orchestrator can be summarized as follows:
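A minimal sketch of the idea (the `deployment.steps` attribute is an assumption for illustration): every step is executed immediately, in an order that respects the step dependencies.

```python
def prepare_or_run_pipeline(self, deployment, stack):
    # The local orchestrator runs every step right away, in-process,
    # iterating over the steps in dependency (topological) order.
    for step in deployment.steps.values():
        self.run_step(step)
```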
Python Operator based Orchestration

The `airflow` orchestrator has a slightly more complex implementation of the `prepare_or_run_pipeline()` method. Instead of immediately executing a step, a `PythonOperator` is created which contains a `_step_callable`. This `_step_callable` will ultimately execute the `self.run_step(...)` method of the orchestrator. The `PythonOperator`s are assembled into an Airflow DAG, which is returned. Through some Airflow magic, this DAG is loaded by the connected Airflow instance and orchestrated either directly or on a set schedule.
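A sketch of this pattern is shown below; the `deployment` attributes and the DAG id are illustrative, and the real implementation handles considerably more configuration:

```python
from airflow import DAG
from airflow.operators.python import PythonOperator


def prepare_or_run_pipeline(self, deployment, stack):
    dag = DAG(dag_id=deployment.run_name)  # illustrative id
    tasks = {}

    for name, step in deployment.steps.items():
        # Bind `step` as a default argument so each callable
        # captures its own step, not the loop variable.
        def _step_callable(step=step):
            # Deferred execution: Airflow invokes this when the task runs.
            return self.run_step(step)

        tasks[name] = PythonOperator(
            task_id=name, python_callable=_step_callable, dag=dag
        )

    # Wire up the dependencies between the operators.
    for name, step in deployment.steps.items():
        for upstream in step.upstream_steps:
            tasks[name].set_upstream(tasks[upstream])

    return dag  # picked up by the connected Airflow instance
```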
Container-based Orchestration

The `kubeflow` orchestrator is a great example of container-based orchestration. In an implementation-specific method called `prepare_pipeline_deployment()`, a Docker image containing the complete project context is built.

Within `prepare_or_run_pipeline()`, a YAML file is created as an intermediate representation of the pipeline and uploaded to the Kubeflow instance. To create this YAML file, a callable is defined within which a `dsl.ContainerOp` is created for each step. This `ContainerOp` contains the container entrypoint command and arguments that will make the image run just that one step. The `ContainerOp`s are assembled according to their interdependencies inside a `dsl.Pipeline`, which can then be compiled into the YAML file.
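A condensed sketch of this flow, using the kfp v1 SDK (the `steps`, `image`, and entrypoint attributes are illustrative names, and the steps are assumed to be iterated in dependency order):

```python
from kfp import dsl
from kfp.compiler import Compiler


def _build_and_compile_pipeline(steps, image):
    @dsl.pipeline(name="zenml-pipeline")
    def kfp_pipeline():
        ops = {}
        for name, step in steps.items():
            # One container per step; the entrypoint command/args make
            # the image execute just this single step.
            op = dsl.ContainerOp(
                name=name,
                image=image,
                command=step.entrypoint_command,
                arguments=step.entrypoint_arguments,
            )
            # Encode the interdependencies between the steps.
            for upstream in step.upstream_steps:
                op.after(ops[upstream])
            ops[name] = op

    # Compile the assembled pipeline into the intermediate YAML file.
    Compiler().compile(kfp_pipeline, "pipeline.yaml")
```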
Handling per-step resources
If your orchestrator allows specification of per-step resources, make sure to handle the configurations defined on each step:
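A sketch of how this could look, assuming ZenML's `ResourceSettings` attribute names; the `set_*` helpers are hypothetical placeholders for whatever resource API your orchestration backend exposes:

```python
# Hypothetical: translate a step's resource settings into
# orchestrator-specific resource requests.
resource_settings = step.config.resource_settings

if resource_settings.cpu_count is not None:
    set_cpu_request(resource_settings.cpu_count)  # hypothetical helper
if resource_settings.memory is not None:
    set_memory_request(resource_settings.memory)  # e.g. "8GB"
if resource_settings.gpu_count is not None:
    set_gpu_request(resource_settings.gpu_count)
```

Base Implementation of the Step Entrypoint Configuration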
Within the base Docker images that are used for container-based orchestration, the `src.zenml.entrypoints.entrypoint.py` is the default entrypoint to run a specific step. It does so by loading an orchestrator-specific `StepEntrypointConfiguration` object. This object is then used to parse all entrypoint arguments (e.g. `--step_name <STEP_NAME>`). Finally, the `StepEntrypointConfiguration.run()` method is used to execute the step. Under the hood, this will eventually also call the orchestrator's `run_step()` method.
The `StepEntrypointConfiguration` is the base class that already implements most of the required functionality. Let's dive right into it:

- It defines some mandatory arguments for the step entrypoint. These are set as constants at the top of the file and used as the minimum required arguments.
- The `run()` method uses the parsed arguments to set up all required prerequisites before ultimately executing the step.

Schematically, the `StepEntrypointConfiguration` looks like:
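The sketch below is a simplified outline, not the exact source; the parent class is omitted and the signatures are abbreviated, so consult your ZenML version for the real definition:

```python
from typing import Any, List, Optional, Set


class StepEntrypointConfiguration:  # simplified; the real class has a parent
    """Simplified sketch of the step entrypoint configuration base class."""

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """All options the entrypoint requires, e.g. `step_name`."""
        ...

    @classmethod
    def get_entrypoint_arguments(cls, **kwargs: Any) -> List[str]:
        """CLI arguments (e.g. `--step_name <STEP_NAME>`) providing a
        value for every option declared above."""
        ...

    def get_run_name(self, pipeline_name: str) -> Optional[str]:
        """Optionally provides a custom run name (see below)."""
        ...

    def run(self) -> None:
        """Parses the entrypoint arguments, sets up all required
        prerequisites, and then executes the step (which eventually
        calls the orchestrator's `run_step()`)."""
        ...
```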
Build your own Step Entrypoint Configuration
If you need to customize what happens when a step gets executed inside the entrypoint, you can subclass from the `StepEntrypointConfiguration` class.

If you want to provide a custom run name (this has to be the same for all steps that are executed as part of the same pipeline run), you can override the `get_run_name(...)` method.
If you need to pass additional arguments to the entrypoint, there are two methods that you need to implement:

- `get_entrypoint_options()`: This method should return all the additional options that you require in the entrypoint.
- `get_entrypoint_arguments(...)`: This method should return a list of arguments that should be passed to the entrypoint. The arguments need to provide values for all options defined in the `get_entrypoint_options()` method mentioned above.
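A minimal sketch of such a subclass, assuming a hypothetical custom `--my_option` argument (the option name is illustrative and the import path may vary between ZenML versions):

```python
from typing import Any, List, Set

from zenml.entrypoints import StepEntrypointConfiguration

MY_OPTION = "my_option"  # hypothetical custom option


class MyStepEntrypointConfiguration(StepEntrypointConfiguration):
    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        # Everything the base class needs, plus the custom option.
        return super().get_entrypoint_options() | {MY_OPTION}

    @classmethod
    def get_entrypoint_arguments(cls, **kwargs: Any) -> List[str]:
        # Supply a value for every option declared above.
        return super().get_entrypoint_arguments(**kwargs) + [
            f"--{MY_OPTION}",
            str(kwargs[MY_OPTION]),
        ]
```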