Once a pipeline run has completed, we can access it using the post-execution utilities.
Each pipeline can have multiple runs associated with it, and for each run there might be several outputs for each step. Thus, to inspect a specific output, we first need to access the respective pipeline, then fetch the respective run, and then choose the step output of that specific run.
The overall hierarchy looks like this:
    pipelines -> runs -> steps -> outputs
    # where -> implies a 1-many relationship.
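To make the one-to-many nesting concrete, here is a minimal sketch that models the hierarchy with plain dataclasses. `ToyPipeline`, `ToyRun`, `ToyStep`, and `find_output` are hypothetical stand-ins invented for this illustration; they only mirror the shape of the hierarchy and are not ZenML classes.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


# Hypothetical stand-ins that only mirror the shape of the hierarchy;
# these are not ZenML classes.
@dataclass
class ToyStep:
    name: str
    outputs: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ToyRun:
    name: str
    steps: List[ToyStep] = field(default_factory=list)


@dataclass
class ToyPipeline:
    name: str
    runs: List[ToyRun] = field(default_factory=list)


def find_output(pipelines, pipeline_name, run_name, step_name, output_name):
    """Walk pipelines -> runs -> steps -> outputs, one level at a time."""
    pipeline = next(p for p in pipelines if p.name == pipeline_name)
    run = next(r for r in pipeline.runs if r.name == run_name)
    step = next(s for s in run.steps if s.name == step_name)
    return step.outputs[output_name]


pipelines = [
    ToyPipeline(
        name="example_pipeline",
        runs=[
            ToyRun(name="run_1", steps=[ToyStep("step_1", {"output_name": 1})]),
            ToyRun(name="run_2", steps=[ToyStep("step_1", {"output_name": 2})]),
        ],
    )
]

value = find_output(pipelines, "example_pipeline", "run_2", "step_1", "output_name")
print(value)
```

The same step name appears in both runs, so an output is only identified once the pipeline, the run, and the step have all been chosen.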
Let us investigate how to traverse this hierarchy level by level:
ZenML keeps a collection of all created pipelines with at least one run, sorted by the time of their first run from oldest to newest.
You can either access this collection via the get_pipelines() method or query a specific pipeline by name using get_pipeline(pipeline=...):
    from zenml.post_execution import get_pipelines, get_pipeline

    # get all pipelines from all stacks
    pipelines = get_pipelines()

    # now you can get pipelines by index
    pipeline_with_latest_initial_run_time = pipelines[-1]

    # or get one pipeline by name
    pipeline_x = get_pipeline(pipeline="example_pipeline")

    # or even use the pipeline class
    pipeline_x = get_pipeline(pipeline=example_pipeline)
Be careful when accessing pipelines by index. Even if you just ran a pipeline,
it might not be at index -1, because the pipelines are sorted by time of
first run. Instead, it is recommended to access the pipeline using the
pipeline class, an instance of the class, or the name of the pipeline as a
string: get_pipeline(pipeline=...).
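The pitfall can be reproduced without ZenML. The sketch below uses a made-up run history (the pipeline names and dates are assumptions for illustration) and mimics the documented ordering by sorting on the time of each pipeline's first run.

```python
from datetime import datetime

# Hypothetical run history: pipeline name -> times at which it was run.
run_times = {
    "pipeline_a": [datetime(2023, 1, 1), datetime(2023, 3, 1)],
    "pipeline_b": [datetime(2023, 2, 1)],
}

# Mimic the documented ordering: sort by time of the FIRST run, oldest first.
by_first_run = sorted(run_times, key=lambda name: min(run_times[name]))

# The pipeline that actually ran most recently.
most_recently_run = max(run_times, key=lambda name: max(run_times[name]))

print(by_first_run[-1])
print(most_recently_run)
```

Here `pipeline_a` ran last, but index -1 points at `pipeline_b`, because `pipeline_b` had the later first run.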
Using the CLI
You can also access your pipelines through the CLI by executing the following command on the terminal:
Each pipeline can be executed many times. You can get a list of all runs using the runs attribute of a pipeline.
    # get all runs of a pipeline chronologically ordered
    runs = pipeline_x.runs

    # get the last run by index, runs are ordered by execution time in ascending order
    last_run = runs[-1]

    # or get a specific run by name
    run = pipeline_x.get_run(run_name="my_run_name")
Alternatively, you can also access the runs from the pipeline class/instance itself:
    from zenml.pipelines import pipeline

    # Definition of pipeline
    @pipeline
    def example_pipeline(...):
        ...

    # Instantiation and execution of pipeline
    pipe = example_pipeline(...)
    pipe.run()

    # get all runs of the defined pipeline chronologically ordered
    runs = example_pipeline.get_runs()

    # get all runs of the instantiated pipeline chronologically ordered
    runs = pipe.get_runs()

    # get the last run by index, runs are ordered by execution time in ascending order
    last_run = runs[-1]

    # or get a specific run by name
    run = example_pipeline.get_run(run_name=...)
The Git commit SHA that the pipeline run was performed on. This will only be set if the pipeline code is in a git repository and there are no uncommitted files when running the pipeline.
The pipeline_configuration is a super object that contains all configuration of the pipeline and pipeline run, including pipeline-level BaseSettings, which we will learn more about later. You can also access the settings directly via the settings variable.
Within a given pipeline run you can now further zoom in on individual steps using the steps attribute or by querying a specific step using the get_step(step=...) method.
    # get all steps of a pipeline for a given run
    steps = run.steps

    # get the step that was executed first
    first_step = steps[0]

    # or get a specific step by name
    step = run.get_step(step="first_step")
The step name refers to the argument name used in the pipeline definition,
not the name of the step class that implements that step for a pipeline instance.
    # Definition of pipeline
    @pipeline
    def example_pipeline(step_1, step_2):
        ...

    # Initialize a new pipeline run
    pipe = example_pipeline(step_1=first_step(), step_2=second_step())
    pipe.run()

    # Get the first step
    pipe.get_runs()[-1].get_step(step="step_1")

    # This won't work:
    # pipe.get_runs()[-1].get_step(step="first_step")
The steps are ordered by time of execution. Depending on the
orchestrator, steps can be run in
parallel. Thus, accessing steps by index can be unreliable across different
runs, and it is recommended to access steps by the step class, an instance of
the class or even the name of the step as a string: get_step(step=...)
instead.
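A small illustration of why index-based access is fragile, using plain lists of step names rather than ZenML objects. The step names and the two orderings are assumptions chosen for the example.

```python
# Hypothetical step names in execution order for two runs of one pipeline.
# "load_a" and "load_b" do not depend on each other, so a parallel
# orchestrator may execute them in either order.
run_1_steps = ["load_a", "load_b", "train"]
run_2_steps = ["load_b", "load_a", "train"]


def step_position(step_names, name):
    """Look a step up by name and return its position in this run."""
    return step_names.index(name)


# Index 0 refers to different steps in the two runs ...
print(run_1_steps[0], run_2_steps[0])

# ... while a lookup by name finds the intended step in both.
print(step_position(run_1_steps, "load_a"), step_position(run_2_steps, "load_a"))
```

Code that relies on `steps[0]` would silently read a different step in the second run, whereas a lookup by name does not depend on execution order.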
Similar to the run, for reproducibility, you can use the step object to access:
BaseParameters via step.parameters: The parameters used to run the step.
Finally, this is how you can inspect the output of a step:
If there is only a single output, use the output attribute.
If there are multiple outputs, use the outputs attribute, which is a dictionary that can be indexed using the name of an output:
    # The outputs of a step
    # if there are multiple outputs they are accessible by name
    output = step.outputs["output_name"]

    # if there is only one output, use the `.output` property instead
    output = step.output

    # read the value into memory
    output.read()
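If the same code has to handle both single-output and multi-output steps, the choice between the two access patterns can be wrapped in a helper. The sketch below is a hypothetical convenience function, not part of ZenML; it works on a plain dictionary standing in for the outputs attribute of a step.

```python
from typing import Any, Dict, Optional


def pick_output(outputs: Dict[str, Any], name: Optional[str] = None) -> Any:
    """Return the only output, or the named one when there are several.

    `outputs` is a plain dict standing in for the `outputs` attribute of a
    step; this helper is hypothetical and not part of ZenML.
    """
    if name is not None:
        return outputs[name]
    if len(outputs) == 1:
        return next(iter(outputs.values()))
    raise ValueError(
        f"Step has {len(outputs)} outputs; pass one of {sorted(outputs)} as `name`."
    )


print(pick_output({"output_name": 42}))
print(pick_output({"accuracy": 0.9, "loss": 0.1}, name="loss"))
```

Raising an error when several outputs exist and no name is given avoids silently picking an arbitrary one.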
The names of the outputs can be found in the Output typing of your steps:
    from zenml.steps import step, Output

    @step
    def some_step() -> Output(output_name=int):
        ...
    # Definition of pipeline
    @pipeline
    def example_pipeline(step_1, step_2):
        ...

    # Initialize a new pipeline run
    pipe = example_pipeline(step_1=first_step(), step_2=second_step())
    pipe.run()

    # Get the first step
    step_1 = pipe.get_runs()[-1].get_step(step="step_1")
    output = step_1.output.read()
Final note: Fetching older pipeline runs within a step
While most of this document has focused on the so-called post-execution workflow (i.e. fetching objects after a pipeline has completed), the same utilities can also be used within a running pipeline.
This is often desirable when a pipeline runs continuously over time and decisions have to be made based on older runs.
For example, here we fetch, from within a step, the last pipeline run of the same pipeline:
    from zenml.post_execution import get_pipeline
    from zenml.environment import Environment
    from zenml.steps import step

    @step
    def my_step():
        # Fetch the current pipeline
        p = get_pipeline('pipeline_name')

        # Fetch an older run
        older_run = p.runs[-2]  # -1 will be the current run

        # Use the older run to make a decision
        ...
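Indexing with -2 raises an IndexError the first time the pipeline runs, since there is no older run yet. A minimal guard, assuming the runs are ordered oldest to newest and the last entry is the run currently executing, could look like the following. `previous_run` is a hypothetical helper that works on any sequence, not a ZenML function.

```python
from typing import Optional, Sequence, TypeVar

T = TypeVar("T")


def previous_run(runs: Sequence[T]) -> Optional[T]:
    """Return the run before the current one, or None on the first run.

    Assumes `runs` is ordered oldest to newest and that the last entry
    is the run currently executing.
    """
    if len(runs) < 2:
        return None
    return runs[-2]


print(previous_run(["run_1"]))
print(previous_run(["run_1", "run_2", "run_3"]))
```

Inside a step, the result can be checked for None before any decision is based on it.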
You can get a lot more metadata within a step as well, something we’ll learn in more detail in the advanced docs.