Building and Using a Workflow
In this tutorial, you'll learn about the workflow package in LabOne Q, what you can do with it, and how to make and use workflows of your own.
Imports
from laboneq import workflow
Define the tasks
A task is any Python function that represents a single unit of work within a workflow. The workflow stores the inputs and outputs of all its tasks.
Python functions can be marked as workflow tasks by decorating them with the workflow.task decorator. When used outside of a workflow, tasks behave as normal Python functions.
To learn more about tasks in LabOne Q, check out the tutorial on Tasks.
@workflow.task
def measure() -> int:
    return 100
@workflow.task
def analyze(measurement_result: int, threshold: int) -> str:
    if measurement_result < threshold:
        return "PASS"
    return "FAIL"
Define the workflow
In this section, we combine the tasks defined above into a single workflow.
A workflow itself should be as simple as possible and should not contain any complex operations; most of the work should happen within the tasks.
A workflow is defined by decorating a Python function with the @workflow.workflow decorator.
Important remarks
When a function is marked as a workflow, its execution flow has some limitations compared to normal Python:
- Only functions marked as tasks (or other workflows, see Nested workflows) should be called within a workflow definition.
- Python control-flow statements (if, else, for, return, etc.) should not be used in the workflow; use workflow.if_, workflow.for_ and workflow.return_ instead. These statements can still be used freely in tasks.
These limitations ensure that a graph of dependencies between tasks can be created, so that the workflow can fully control the execution flow.
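A minimal sketch of why these limitations exist, using a hypothetical toy (Node, deferred_task and run are illustrative names, not LabOne Q classes): calling a task while the workflow is being defined only adds a node to a dependency graph, so there is no concrete value yet for a Python if to test. The actual computation happens later, when the graph is evaluated in dependency order.

```python
import functools
from typing import Any, Callable, Dict, Optional


class Node:
    """Toy graph node: a function plus the arguments (values or other nodes) it needs."""

    def __init__(self, func: Callable[..., Any], *args: Any) -> None:
        self.func = func
        self.args = args

    def __bool__(self) -> bool:
        # A Python `if` needs a concrete value now, but it only exists at run time.
        raise TypeError(f"{self.func.__name__}() has not been executed yet")


def deferred_task(func: Callable[..., Any]) -> Callable[..., Node]:
    """Toy decorator: calling the task creates a graph node instead of running it."""

    @functools.wraps(func)
    def build_node(*args: Any) -> Node:
        return Node(func, *args)

    return build_node


def run(node: Any, cache: Dict[int, Any]) -> Any:
    """Evaluate a node after its dependencies; each node runs at most once."""
    if not isinstance(node, Node):
        return node  # a plain input value, e.g. a workflow argument
    if id(node) not in cache:
        args = [run(arg, cache) for arg in node.args]
        cache[id(node)] = node.func(*args)
    return cache[id(node)]


@deferred_task
def measure() -> int:
    return 100


@deferred_task
def analyze(measurement_result: int, threshold: int) -> str:
    return "PASS" if measurement_result < threshold else "FAIL"


# Definition phase: the calls only build the graph measure -> analyze.
measurement = measure()
outcome = analyze(measurement, 101)

branch_error: Optional[TypeError] = None
try:
    if outcome:  # what a plain Python `if` in a workflow definition would attempt
        pass
except TypeError as error:
    branch_error = error
    print("Cannot branch at definition time:", error)

# Execution phase: the graph is evaluated in dependency order.
print(run(outcome, {}))
```

This is also why branching and looping must be expressed through workflow constructs: they become part of the graph instead of being evaluated too early.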
Workflow references
While the workflow is being constructed, the actual variables (workflow inputs and task outputs) are replaced with Reference objects, which connect the producing and receiving ends of a variable.
By default, Reference supports only a subset of Python operations, for example __getitem__ and __getattr__. The full list of supported operations is given in the Reference documentation.
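The idea behind a reference can be sketched with a small toy class. ToyReference below is hypothetical and much simpler than LabOne Q's Reference: it records __getitem__ and __getattr__ operations while the workflow is being built, then replays them once the producing task has a real output. The produced value in the example is made up for illustration.

```python
from types import SimpleNamespace
from typing import Any, Dict, Tuple


class ToyReference:
    """Placeholder for a value that does not exist yet (illustration only)."""

    def __init__(self, source: str, ops: Tuple[Tuple[str, Any], ...] = ()) -> None:
        self._source = source  # name of the producing task or workflow input
        self._ops = ops  # operations recorded during construction, replayed later

    def __getitem__(self, key: Any) -> "ToyReference":
        return ToyReference(self._source, self._ops + (("item", key),))

    def __getattr__(self, name: str) -> "ToyReference":
        # Only called for attributes not found normally; private names are not recorded.
        if name.startswith("_"):
            raise AttributeError(name)
        return ToyReference(self._source, self._ops + (("attr", name),))

    def resolve(self, produced: Dict[str, Any]) -> Any:
        """Fetch the real value of the source and replay the recorded operations."""
        value = produced[self._source]
        for kind, arg in self._ops:
            value = value[arg] if kind == "item" else getattr(value, arg)
        return value


# Construction phase: the task output is only a reference; operations are recorded.
output_ref = ToyReference("measure")
first_point = output_ref.data[0]
unit = output_ref.unit

# Execution phase: hypothetical output of the producing task once it has run.
produced = {"measure": SimpleNamespace(data=[100, 101], unit="V")}
print(first_point.resolve(produced))
print(unit.resolve(produced))
```

Operations that the toy does not record (for example comparisons or arithmetic) would simply fail, which mirrors why only a limited set of operations can be used on references.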
Build a workflow
@workflow.workflow
def experiment(threshold: int):
    measurement = measure()
    analyze(measurement, threshold)
Instantiate and run the workflow
wf = experiment(threshold=101)
wf
result = wf.run()
Inspect the results
result
Inspecting the tasks
result.tasks
There are several ways to get individual tasks from the WorkflowResult, for example by position or by name:
result.tasks[1], result.tasks["analyze"]
Specific task lookup with indexing
The first element of the index is the name of the task and the second is an integer or a slice:
result.tasks["analyze", :] # All tasks named 'analyze'
result.tasks["analyze", 0] # First task entry for 'analyze'
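The lookup rules above (an integer picks a task by position, a bare name returns the first task with that name, and a (name, index) pair selects among the tasks with that name) can be modelled with a small container. ToyTasks is a hypothetical plain-Python illustration of these rules, not LabOne Q's implementation, and the task entries are sample data.

```python
from dataclasses import dataclass
from typing import Any, List, Union


@dataclass
class ToyTask:
    name: str
    output: Any


class ToyTasks:
    """Toy task list supporting tasks[i], tasks["name"] and tasks["name", i or slice]."""

    def __init__(self, tasks: List[ToyTask]) -> None:
        self._tasks = list(tasks)

    def __getitem__(self, key: Any) -> Union[ToyTask, List[ToyTask]]:
        if isinstance(key, int):
            return self._tasks[key]  # position in execution order
        if isinstance(key, str):
            return self[key, 0]  # a bare name means the first task with that name
        name, index = key
        matches = [t for t in self._tasks if t.name == name]
        return matches[index]  # an int gives one task, a slice gives a list


tasks = ToyTasks([
    ToyTask("measure_parameter", 90),
    ToyTask("analyze", "PASS"),
    ToyTask("measure_parameter", 100),
    ToyTask("analyze", "FAIL"),
])

print(tasks[1].output)  # by position
print(tasks["analyze"].output)  # first task named 'analyze'
print([t.output for t in tasks["analyze", :]])  # all tasks named 'analyze'
print(tasks["analyze", 1].output)  # second task named 'analyze'
```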
Inspecting individual task information
# Task output
result.tasks["analyze"].output
# Task input
result.tasks["analyze"].input
Iteration in a workflow
Previously, we defined a workflow with a single measurement value and checked the analysis result.
Now we define a new task, measure_parameter(...), which takes a parameter and returns a measurement that depends on it. We will find out for which parameter values our analysis passes.
To iterate over different values, we use workflow.for_(values).
@workflow.task
def measure_parameter(parameter: int) -> int:
    return 10 + parameter
@workflow.workflow
def experiment(parameters: list[int], threshold: int):
    with workflow.for_(parameters) as parameter:
        measurement = measure_parameter(parameter)
        analyze(measurement, threshold)
parameters = [80, 90, 100]
wf = experiment(parameters=parameters, threshold=100)
result = wf.run()
Let's look at the tasks stored in result:
for t in result.tasks:
    print(t)
Notice that the index of the iteration is automatically added to the TaskResult objects to distinguish between the different runs.
We can also set the index to a string of our choosing by passing a function as the second argument of workflow.for_:
@workflow.workflow
def experiment(parameters: list[int], threshold: int):
    with workflow.for_(parameters, lambda p: f"value = {p}") as parameter:
        measurement = measure_parameter(parameter)
        analyze(measurement, threshold)
parameters = [80, 90, 100]
wf = experiment(parameters=parameters, threshold=100)
result = wf.run()
for t in result.tasks:
    print(t)
Let's get all the analyze tasks:
result.tasks["analyze", :]
Note that result.tasks["analyze"] returns only the first task in the list above.
To inspect the index of a task, use the index property:
result.tasks["analyze"].index
Let's now display the output of each of the measure_parameter and analyze tasks:
for i, parameter in enumerate(parameters):
    analyze_task = result.tasks["analyze", i]
    measure_task = result.tasks["measure_parameter", i]
    print(
        f"Input parameter value: {parameter}. "
        f"Measurement result: {measure_task.output}. "
        f"Analysis result: {analyze_task.output}"
    )
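To cross-check the printed values, the same loop can be written as ordinary Python using undecorated copies of the two task bodies (no LabOne Q required). With threshold=100, the measurements are 90, 100 and 110, so only the first parameter passes.

```python
def measure_parameter(parameter: int) -> int:
    return 10 + parameter


def analyze(measurement_result: int, threshold: int) -> str:
    if measurement_result < threshold:
        return "PASS"
    return "FAIL"


parameters = [80, 90, 100]
threshold = 100

# Collect (parameter, measurement, analysis result) for every iteration.
rows = []
for parameter in parameters:
    measurement = measure_parameter(parameter)
    rows.append((parameter, measurement, analyze(measurement, threshold)))

for parameter, measurement, outcome in rows:
    print(
        f"Input parameter value: {parameter}. "
        f"Measurement result: {measurement}. "
        f"Analysis result: {outcome}"
    )
```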
Conditionals and workflow output
In the previous example, we defined a workflow that tries different parameters and reports which ones are successful. However, it iterates over all the given parameters even after the analysis has failed, which can be wasteful.
Now we define a workflow that returns as soon as the analysis fails:
- We use workflow.if_(condition) to check for the condition.
- We use workflow.return_(value) to exit the workflow immediately and set the workflow output value.
@workflow.workflow
def experiment(parameters: list[int], threshold: int):
    with workflow.for_(parameters) as parameter:
        measurement = measure_parameter(parameter)
        analyze_result = analyze(measurement, threshold)
        with workflow.if_(analyze_result == "FAIL"):
            workflow.return_(analyze_result)
    workflow.return_(analyze_result)
parameters = [80, 90, 100]
wf = experiment(parameters=parameters, threshold=100)
result = wf.run()
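As a sanity check on what this workflow should do, here is the same control flow written as an ordinary Python function. The task bodies are copied without decorators, and run_experiment is a hypothetical name used only here: workflow.if_ combined with workflow.return_ inside workflow.for_ corresponds to an early return from a loop, and the final workflow.return_ to a return after the loop.

```python
from typing import List, Optional, Tuple


def measure_parameter(parameter: int) -> int:
    return 10 + parameter


def analyze(measurement_result: int, threshold: int) -> str:
    return "PASS" if measurement_result < threshold else "FAIL"


def run_experiment(parameters: List[int], threshold: int) -> Tuple[Optional[str], int]:
    """Return the final analysis result and how many iterations actually ran."""
    analyze_result = None
    iterations = 0
    for parameter in parameters:
        iterations += 1
        analyze_result = analyze(measure_parameter(parameter), threshold)
        if analyze_result == "FAIL":
            return analyze_result, iterations  # early exit, like workflow.return_
    return analyze_result, iterations  # return after the loop has finished


print(run_experiment([80, 90, 100], threshold=100))
print(run_experiment([80, 85], threshold=100))
```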
When inspecting the results, we see only two entries: the iteration stopped after the second step, where the analyze task returned "FAIL", instead of going through all of the parameters.
for idx, analyze_task in enumerate(result.tasks["analyze", :]):
    measure_task = result.tasks["measure_parameter", idx]
    parameter = parameters[idx]
    print(
        f"Input parameter value: {parameter}. "
        f"Measurement result: {measure_task.output}. "
        f"Analysis result: {analyze_task.output}"
    )
We can also check whether the workflow passed as a whole, because we set the workflow output to the result of analyze:
print("Workflow outcome: ", result.output)
Our workflow did not pass with the given values; however, we now know the parameter range in which it can pass.
We modify the input parameters and examine the output of our workflow again:
parameters = [80, 85]
wf = experiment(parameters=parameters, threshold=100)
result = wf.run()
print("Workflow outcome: ", result.output)
Inspect a workflow that has failed
If an error occurs during the execution of a workflow, we can still inspect the tasks that ran up to the one that triggered the error by calling recover(). Note that recover() stores only one execution result and can be called only once; a second call to recover() raises an exception.
For experiment workflows, this is useful for debugging a failed compilation task by inspecting the experiment sequence produced by the previous task.
In this example, we make the analyze task raise a RuntimeError for certain inputs.
@workflow.task
def measure() -> int:
    return 100
@workflow.task
def analyze(measurement_result: int, threshold: int) -> str:
    # Deliberately raise an error unless both values are at least 100
    if not (measurement_result >= 100 and threshold >= 100):
        raise RuntimeError("Something went wrong.")
    if measurement_result < threshold:
        return "PASS"
    return "FAIL"
@workflow.workflow
def experiment(threshold: int):
    measurement = measure()
    result = analyze(measurement, threshold)
    workflow.return_(result)
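# This run fails: with threshold=99 the analyze task raises a RuntimeError
try:
    result = experiment(99).run()
except Exception as error:
    print(f"Workflow failed: {error!r}")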
recovered_result = experiment.recover()
recovered_result
# Check that the measure task returns a result that is >= 100
recovered_result.tasks["measure"].output
# Check the threshold value that was passed to the analyze task
recovered_result.tasks["analyze"].input
# Now we know we have to increase the value of the threshold
result = experiment(101).run()
result.output
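The recover-once behaviour described at the start of this section can be modelled in a few lines. ToyRunner is a hypothetical illustration, not the LabOne Q implementation: when a run fails, it keeps the partial record of the steps that completed before the error, and recover() hands that record out exactly once. For simplicity, the toy does not record the failing step itself.

```python
from typing import Callable, List, Optional, Tuple

Record = List[Tuple[str, object]]


class ToyRunner:
    """Runs named steps in order and keeps the partial record of a failed run."""

    def __init__(self) -> None:
        self._failed_record: Optional[Record] = None

    def run(self, steps: List[Tuple[str, Callable[[], object]]]) -> Record:
        record: Record = []
        try:
            for name, func in steps:
                record.append((name, func()))
        except Exception:
            self._failed_record = record  # only the latest failed run is kept
            raise
        return record

    def recover(self) -> Record:
        if self._failed_record is None:
            raise RuntimeError("Nothing to recover.")
        record, self._failed_record = self._failed_record, None  # handed out once
        return record


def failing_analyze() -> object:
    raise RuntimeError("Something went wrong.")


runner = ToyRunner()
try:
    runner.run([("measure", lambda: 100), ("analyze", failing_analyze)])
except RuntimeError as error:
    print("Run failed:", error)

recovered = runner.recover()
print(recovered)

second_error: Optional[RuntimeError] = None
try:
    runner.recover()
except RuntimeError as error:
    second_error = error
    print("Second recover() call failed:", error)
```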
Run a partial Workflow
During development, it is sometimes useful to execute only part of a workflow. For this reason, workflows can be executed up to a specific task.
In the example below, we use the previously defined workflow and execute only its measure() part to validate that everything works as expected, before continuing to analyze() and finishing the workflow.
First, we run the experiment workflow with the until argument:
exp = experiment(100)
partial_result = exp.run(until="measure")
partial_result.tasks
# Inspect the partial result
print(partial_result.tasks)
print(partial_result.tasks[0].output)
print(partial_result.output)
After checking that our simple task works as expected, we can resume the workflow by calling .resume() without the until argument.
The tasks that have already run are not executed again; their results are reused in the rest of the workflow execution.
result = exp.resume()
print(result.tasks)
print(result.output)
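The run-until-then-resume pattern can be sketched with a toy runner that remembers completed steps. PartialRunner, its run(until=...) and resume() methods, and the call counters are hypothetical illustrations that mirror the behaviour described above, not LabOne Q code. The threshold of 100 mirrors experiment(100), so the analysis returns "FAIL" because 100 is not below 100.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Step = Tuple[str, Callable[[Dict[str, Any]], Any]]


class PartialRunner:
    """Toy workflow: runs named steps in order, can stop after one and resume later."""

    def __init__(self, steps: List[Step]) -> None:
        self.steps = steps
        self.outputs: Dict[str, Any] = {}  # outputs of the steps that already ran

    def run(self, until: Optional[str] = None) -> Dict[str, Any]:
        for name, func in self.steps:
            if name not in self.outputs:  # already-executed steps are skipped
                self.outputs[name] = func(self.outputs)
            if name == until:
                break
        return dict(self.outputs)

    def resume(self) -> Dict[str, Any]:
        return self.run()  # continue to the end, reusing stored outputs


THRESHOLD = 100  # mirrors experiment(100) in the text
calls = {"measure": 0, "analyze": 0}


def measure_step(outputs: Dict[str, Any]) -> int:
    calls["measure"] += 1
    return 100


def analyze_step(outputs: Dict[str, Any]) -> str:
    calls["analyze"] += 1
    return "PASS" if outputs["measure"] < THRESHOLD else "FAIL"


toy_exp = PartialRunner([("measure", measure_step), ("analyze", analyze_step)])
partial = toy_exp.run(until="measure")
final = toy_exp.resume()
print(partial)
print(final)
print(calls)  # each step ran exactly once
```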
Nested workflows
Workflows can be nested. Within another workflow, they behave just like tasks, since they have inputs and an output.
To use a workflow within a workflow, do not instantiate it separately; instead, call the Python function decorated with @workflow.workflow with the desired arguments inside the outer workflow definition.
Because the workflow definition builds the graph of tasks and workflows to execute, calling a workflow function such as experiment(...) inside a workflow adds a sub-graph to the workflow being built.
When a workflow-decorated function is called within another workflow, it does not return a Workflow instance but the result of that workflow. This is the same kind of object that Workflow.run() returns when it is called outside of a workflow definition.
Building and running a nested workflow
In the example below, we create a new workflow that calculates a threshold, passes it to the previously defined experiment() workflow, and checks the output of experiment() with a new task:
@workflow.task
def calculate_threshold(value: int) -> int:
    return value - 5
@workflow.task
def validate_experiment_result(result: str) -> str:
    if result == "PASS":
        return "The experiment passed!"
    return "The experiment failed!"
@workflow.workflow
def calculate_threshold_experiment(value: int):
    threshold = calculate_threshold(value)
    result = experiment(threshold)
    result_validation = validate_experiment_result(result.output)
    workflow.return_(result_validation)
wf = calculate_threshold_experiment(110)
result = wf.run()
Inspecting the results
The result of a nested workflow is contained in the top-level WorkflowResult returned by Workflow.run().
Nested workflows can be inspected just like tasks.
result.tasks
# Threshold task inspection
result.tasks["calculate_threshold"].output
# Inspecting the nested WorkflowResult produced by `experiment()` workflow
result.tasks["experiment"].tasks
result.tasks["validate_experiment_result"].output
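For reference, the values this nested run should produce can be traced by hand from the task definitions. The sketch below reproduces the chain with undecorated copies of the task bodies (plain Python, no LabOne Q required): calculate_threshold(110) gives 105, the nested experiment measures 100, and because both values are at least 100 and 100 < 105, analyze returns "PASS".

```python
def calculate_threshold(value: int) -> int:
    return value - 5


def measure() -> int:
    return 100


def analyze(measurement_result: int, threshold: int) -> str:
    if not (measurement_result >= 100 and threshold >= 100):
        raise RuntimeError("Something went wrong.")
    if measurement_result < threshold:
        return "PASS"
    return "FAIL"


def validate_experiment_result(result: str) -> str:
    if result == "PASS":
        return "The experiment passed!"
    return "The experiment failed!"


threshold = calculate_threshold(110)  # outer task
experiment_output = analyze(measure(), threshold)  # body of the nested experiment()
validation = validate_experiment_result(experiment_output)  # outer task
print(threshold, experiment_output, validation)
```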
Inspecting the graph
The workflow graph, including the graphs of any nested workflows, can be accessed via the Workflow.graph attribute.
We use the workflow defined in the previous section.
Getting the graph as a tree
wf.graph.tree