ZenML helps you standardize your ML workflows as ML Pipelines consisting of decoupled, modular Steps. This enables you to write portable code that can be moved from experimentation to production in seconds.
If you are new to MLOps and would like to learn more about ML pipelines in general, check out ZenBytes, our lesson series on practical MLOps, where we introduce ML pipelines in more detail in ZenBytes lesson 1.1.
Steps are the atomic components of a ZenML pipeline. Each step is defined by its inputs, the logic it applies and its outputs. Here is a very basic example of such a step, which uses a utility function to load the Digits dataset:
import numpy as np
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split

from zenml.steps import Output, step


@step
def digits_data_loader() -> Output(
    X_train=np.ndarray, X_test=np.ndarray, y_train=np.ndarray, y_test=np.ndarray
):
    """Loads the digits dataset as a tuple of flattened numpy arrays."""
    digits = load_digits()
    data = digits.images.reshape((len(digits.images), -1))
    X_train, X_test, y_train, y_test = train_test_split(
        data, digits.target, test_size=0.2, shuffle=False
    )
    return X_train, X_test, y_train, y_test
As this step has multiple outputs, we need to use the zenml.steps.step_output.Output class to indicate the names of each output. These names can be used to directly access the outputs of steps after running a pipeline, as we will see in a later chapter.

Let's come up with a second step that consumes the output of our first step and performs some sort of transformation on it. In this case, let's train a support vector machine classifier on the training data using sklearn:
import numpy as np
from sklearn.base import ClassifierMixin
from sklearn.svm import SVC

from zenml.steps import step


@step
def svc_trainer(
    X_train: np.ndarray,
    y_train: np.ndarray,
) -> ClassifierMixin:
    """Train a sklearn SVC classifier."""
    model = SVC(gamma=0.001)
    model.fit(X_train, y_train)
    return model
Next, we will combine our two steps into our first ML pipeline.

In case you want to run the step function outside the context of a ZenML pipeline, all you need to do is call the .entrypoint() method with the same input signature. For example:
svc_trainer.entrypoint(X_train=..., y_train=...)
Using the Class-based API
In ZenML there are two different ways to define pipelines and steps. What you have seen in this section so far is the Functional API, where steps and pipelines are defined as Python functions with a @step or @pipeline decorator respectively. This is the API used primarily throughout the ZenML docs and examples.

Alternatively, you can also define steps and pipelines using the Class-Based API by creating Python classes that subclass ZenML's abstract base classes BaseStep and BasePipeline directly. Internally, both APIs result in similar definitions, so it is entirely up to you which API to use.
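As a rough, hedged sketch (assuming the abstract methods are called entrypoint and connect in this ZenML version, and that the class names below are placeholders), the trainer step and a pipeline from this section could be written with the Class-Based API roughly like this:

import numpy as np
from sklearn.base import ClassifierMixin
from sklearn.svm import SVC

from zenml.pipelines import BasePipeline
from zenml.steps import BaseStep


class SVCTrainerStep(BaseStep):
    """Class-based counterpart of the svc_trainer step (placeholder name)."""

    def entrypoint(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
    ) -> ClassifierMixin:
        model = SVC(gamma=0.001)
        model.fit(X_train, y_train)
        return model


class FirstPipeline(BasePipeline):
    """Class-based pipeline that routes the data loader outputs into the trainer."""

    def connect(self, data_loader, trainer):
        X_train, X_test, y_train, y_test = data_loader()
        trainer(X_train=X_train, y_train=y_train)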
Let us now define our first ML pipeline. The pipeline definition is agnostic of the step implementations; it simply routes the outputs of each step into the inputs of the steps that follow. You can think of it as a recipe for how we want data to flow through our steps.
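As a minimal sketch (the names first_pipeline and first_pipeline_instance are chosen here to match the run output shown further below), such a definition with the Functional API could look like this:

from zenml.pipelines import pipeline


@pipeline
def first_pipeline(step_1, step_2):
    # Route the data loader outputs into the trainer step.
    X_train, X_test, y_train, y_test = step_1()
    step_2(X_train=X_train, y_train=y_train)


# Instantiate the pipeline with concrete step instances.
first_pipeline_instance = first_pipeline(
    step_1=digits_data_loader(),
    step_2=svc_trainer(),
)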
You can then execute your pipeline instance with the .run() method:
first_pipeline_instance.run()
You should see the following output in your terminal:
Registered new pipeline with name `first_pipeline`.
Creating run `first_pipeline-03_Oct_22-14_08_44_284312` for pipeline `first_pipeline` (Caching enabled)
Using stack `default` to run pipeline `first_pipeline`...
Step `digits_data_loader` has started.
Step `digits_data_loader` has finished in 0.121s.
Step `svc_trainer` has started.
Step `svc_trainer` has finished in 0.099s.
Pipeline run `first_pipeline-03_Oct_22-14_08_44_284312` has finished in 0.236s.
Pipeline visualization can be seen in the ZenML Dashboard. Run `zenml up` to see your pipeline!
We will dive deeper into how to inspect the finished run within the chapter on Accessing Pipeline Runs.
Notice the last log line, which points to the command for viewing your pipeline in the ZenML Dashboard. Check out the dashboard guide in the next section to inspect your pipeline there.
When running a pipeline by calling my_pipeline.run(), ZenML uses the current date and time as the name for the pipeline run. In order to change the name for a run, pass run_name as a parameter to the run() function:
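For example (the run name below is just a placeholder):

first_pipeline_instance.run(run_name="custom_pipeline_run_name")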
Once a pipeline has been executed, it is represented by a PipelineSpec that uniquely identifies it. Therefore, you cannot edit a pipeline after it has been run once. To iterate quickly on pipelines, there are three options:
Pipeline runs can be created without being associated with a pipeline explicitly. These are called unlisted runs and can be created by passing the unlisted parameter when running a pipeline: pipeline_instance.run(unlisted=True).
Pipelines can be deleted and created again using zenml pipeline delete <PIPELINE_ID_OR_NAME>.
Pipelines can be given unique names each time they are run to uniquely identify them.