stateDiagram-v2
direction LR
state "hello stub" as start_at
state "hello python" as step_2
state "hello shell" as step_3
state "hello notebook" as step_4
state "Success" as success
[*] --> start_at
start_at --> step_2 : #9989;
step_2 --> step_3 : #9989;
step_3 --> step_4 : #9989;
step_4 --> success : #9989;
success --> [*]
Traversal
Start at hello stub.
If it succeeds, go to the next step of the pipeline, and so on until we reach the success state.
Any failure in the execution of a step would, by default, go to the fail state.
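The traversal rules above can be sketched in plain Python. This is a minimal illustration and not runnable's implementation: the `traverse` function, the `STEPS` mapping and the `run_step` callback are hypothetical names, and the step order follows the Python and YAML definitions in this section.

```python
from typing import Callable, Dict, List

# Hypothetical, simplified pipeline: each step names the step that follows it.
STEPS: Dict[str, Dict[str, str]] = {
    "hello stub": {"next": "hello python"},
    "hello python": {"next": "hello shell"},
    "hello shell": {"next": "hello notebook"},
    "hello notebook": {"next": "success"},
}


def traverse(
    steps: Dict[str, Dict[str, str]],
    start_at: str,
    run_step: Callable[[str], bool],
) -> List[str]:
    """Return the nodes visited, ending in 'success' or 'fail'."""
    visited: List[str] = []
    current = start_at
    while current not in ("success", "fail"):
        visited.append(current)
        if run_step(current):
            # A successful step hands over to its declared next node.
            current = steps[current]["next"]
        else:
            # Default behaviour: a failed step goes to the fail state.
            current = "fail"
    visited.append(current)
    return visited


# run_step is a stand-in for actually executing a task.
all_pass = traverse(STEPS, "hello stub", lambda name: True)
shell_fails = traverse(STEPS, "hello stub", lambda name: name != "hello shell")
print(all_pass)
print(shell_fails)
```

When every step succeeds the walk ends at `success`; when `hello shell` fails the walk stops there and ends at `fail`, so `hello notebook` is never visited.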
"""
You can execute this pipeline by:

    python examples/02-sequential/traversal.py

A pipeline can have any "tasks" as part of it. In the
below example, we have a mix of stub, python, shell and notebook tasks.

As with simpler tasks, the stdout and stderr of each task are captured
and stored in the catalog.

.catalog
└── cold-jennings-1534
    ├── examples
    │   └── common
    │       └── simple_notebook_out.ipynb
    ├── hello_notebook.execution.log
    ├── hello_python.execution.log
    └── hello_shell.execution.log

4 directories, 4 files
"""

from examples.common.functions import hello

from runnable import NotebookTask, Pipeline, PythonTask, ShellTask, Stub


def main():
    stub_task = Stub(name="hello stub")

    python_task = PythonTask(
        name="hello python",
        function=hello,
    )

    shell_task = ShellTask(
        name="hello shell",
        command="echo 'Hello World!'",
    )

    notebook_task = NotebookTask(
        name="hello notebook",
        notebook="examples/common/simple_notebook.ipynb",
        terminate_with_success=True,
    )

    # The pipeline has a mix of tasks.
    # The order of execution follows the order of the tasks in the list.
    pipeline = Pipeline(
        steps=[  # (2)
            stub_task,  # (1)
            python_task,
            shell_task,
            notebook_task,
        ]
    )

    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()
(1) Start the pipeline.
(2) The order of the steps is the execution order.
The first step is the step corresponding to start_at
The steps are defined as a mapping under steps, keyed by step name.
The next step after a successful execution of a step.
success as next node implies successful execution of the pipeline.
dag:
  description: |
    A pipeline can have any "tasks" as part of it. In the
    below example, we have a mix of stub, python, shell and notebook tasks.

    As with simpler tasks, the stdout and stderr of each task are captured
    and stored in the catalog.

    For example, the catalog structure for this execution would be:

    .catalog
    └── cold-jennings-1534
        ├── examples
        │   └── common
        │       └── simple_notebook_out.ipynb
        ├── hello_notebook.execution.log
        ├── hello_python.execution.log
        └── hello_shell.execution.log

    4 directories, 4 files

    The notebook simple_notebook_out.ipynb has the captured stdout of "Hello World!".

    You can run this pipeline as:
      runnable execute -f examples/02-sequential/traversal.yaml

  start_at: hello stub # (1)
  steps:
    hello stub:
      type: stub
      next: hello python # (2)
    hello python:
      type: task
      command_type: python
      command: examples.common.functions.hello # dotted path to the function.
      next: hello shell
    hello shell:
      type: task
      command_type: shell
      command: echo "Hello World!" # Command to run
      next: hello notebook
    hello notebook:
      type: task
      command_type: notebook
      command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project.
      next: success # (3)
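The Python definition relies on list order, while the YAML definition spells out start_at and next for every step. The sketch below shows how one could be derived from the other. It is an illustration only: `link_steps` is a hypothetical helper, not part of runnable, and it assumes the last step in the list should lead to success.

```python
from typing import Any, Dict, List


def link_steps(names: List[str]) -> Dict[str, Any]:
    """Build a start_at/steps mapping from an ordered list of step names."""
    if not names:
        raise ValueError("a pipeline needs at least one step")
    steps: Dict[str, Dict[str, str]] = {}
    for position, name in enumerate(names):
        is_last = position == len(names) - 1
        # Each step points at its successor; the last one points at success.
        steps[name] = {"next": "success" if is_last else names[position + 1]}
    # The first name in the list plays the role of start_at.
    return {"start_at": names[0], "steps": steps}


dag = link_steps(["hello stub", "hello python", "hello shell", "hello notebook"])
print(dag["start_at"])
for name, definition in dag["steps"].items():
    print(f"{name} -> {definition['next']}")
```

The resulting mapping has the same start_at and next values as the YAML definition, minus the task-specific fields such as type and command.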
"""
This pipeline showcases handling failures in a pipeline.

The path taken if none of the steps failed:
step_1 -> step_2 -> step_3 -> success

step_1 is a python function that raises an exception.
And we can instruct the pipeline to execute step_4 if step_1 fails
and then eventually succeed too.
step_1 -> step_4 -> success

This pattern is handy when you are expecting a failure of a step
and have ways to handle it.

Run this pipeline:
    python examples/02-sequential/on_failure_succeed.py
"""

from examples.common.functions import raise_ex

from runnable import Pipeline, PythonTask, Stub


def main():
    step_1 = PythonTask(name="step 1", function=raise_ex)  # This will fail

    step_2 = Stub(name="step 2")

    step_3 = Stub(name="step 3", terminate_with_success=True)
    step_4 = Stub(name="step 4", terminate_with_success=True)  # (1)

    # Route a failure of step 1 to step 4 instead of the default fail node.
    step_1.on_failure = step_4.name

    pipeline = Pipeline(
        steps=[step_1, step_2, step_3, [step_4]],
    )
    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()
(1) Setting terminate_with_success to True makes the pipeline traverse to the success node after this step.
dag:
  description: |
    This pipeline showcases handling failures in a pipeline.

    The path taken if none of the steps failed:
    step_1 -> step_2 -> step_3 -> success

    step_1 is a shell task that exits with a non-zero code.
    And we can instruct the pipeline to execute step_4 if step_1 fails
    and then eventually succeed too.
    step_1 -> step_4 -> success

    This pattern is handy when you are expecting a failure of a step
    and have ways to handle it.

    Run this pipeline as:
      runnable execute -f examples/02-sequential/on_failure_succeed.yaml

  start_at: step_1
  steps:
    step_1:
      type: task
      command_type: shell
      command: exit 1 # This will fail!
      next: step_2
      on_failure: step_4
    step_2:
      type: stub
      next: step_3
    step_3:
      type: stub
      next: success
    step_4:
      type: stub
      next: success
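To make the two paths in the description concrete, the sketch below walks a mapping shaped like the YAML above and follows on_failure when a step fails. It is an illustration and not runnable's implementation: `path_taken` is a hypothetical function, and the set of failing steps is supplied by hand instead of by running tasks.

```python
from typing import Dict, List, Set

# Mirrors the routing fields of the YAML definition; task details are omitted.
STEPS: Dict[str, Dict[str, str]] = {
    "step_1": {"next": "step_2", "on_failure": "step_4"},
    "step_2": {"next": "step_3"},
    "step_3": {"next": "success"},
    "step_4": {"next": "success"},
}


def path_taken(
    steps: Dict[str, Dict[str, str]], start_at: str, failing: Set[str]
) -> List[str]:
    """Return the nodes visited when the steps in `failing` fail."""
    path: List[str] = []
    current = start_at
    while current not in ("success", "fail"):
        path.append(current)
        definition = steps[current]
        if current in failing:
            # Use the declared handler if there is one, else fall back to fail.
            current = definition.get("on_failure", "fail")
        else:
            current = definition["next"]
    path.append(current)
    return path


happy = path_taken(STEPS, "step_1", failing=set())
handled = path_taken(STEPS, "step_1", failing={"step_1"})
unhandled = path_taken(STEPS, "step_1", failing={"step_2"})
print(happy)
print(handled)
print(unhandled)
```

Only step_1 declares a handler, so a failure in step_2 still takes the default route to fail.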
"""
This pipeline showcases handling failures in a pipeline.

The path taken if none of the steps failed:
step_1 -> step_2 -> step_3 -> success

step_1 is a python function that raises an exception.
And we can instruct the pipeline to execute step_4 if step_1 fails
and then eventually fail.
step_1 -> step_4 -> fail

This pattern is handy when you need to do something before eventually
failing (eg: sending a notification, updating status, etc...)

Run this pipeline as:
    python examples/02-sequential/on_failure_fail.py
"""

from examples.common.functions import raise_ex

from runnable import Pipeline, PythonTask, Stub


def main():
    step_1 = PythonTask(name="step 1", function=raise_ex)  # This will fail

    step_2 = Stub(name="step 2")

    step_3 = Stub(name="step 3", terminate_with_success=True)
    step_4 = Stub(name="step 4", terminate_with_failure=True)  # (1)

    # Route a failure of step 1 to step 4, which then ends in the fail node.
    step_1.on_failure = step_4.name

    # step_4 has to be part of the pipeline for on_failure to resolve;
    # the nested list mirrors the on_failure_succeed example.
    pipeline = Pipeline(
        steps=[step_1, step_2, step_3, [step_4]],
    )
    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()
(1) Setting terminate_with_failure to True makes the pipeline traverse to the fail node after this step.
dag:
  description: |
    This pipeline showcases handling failures in a pipeline.

    The path taken if none of the steps failed:
    step_1 -> step_2 -> step_3 -> success

    step_1 is a shell task that exits with a non-zero code.
    And we can instruct the pipeline to execute step_4 if step_1 fails
    and then eventually fail.
    step_1 -> step_4 -> fail

    This pattern is handy when you need to do something before eventually
    failing (eg: sending a notification, updating status, etc...)

    Run this pipeline as:
      runnable execute -f examples/02-sequential/default_fail.yaml

  start_at: step_1
  steps:
    step_1:
      type: task
      command_type: shell
      command: exit 1 # This will fail!
      next: step_2
      on_failure: step_4
    step_2:
      type: stub
      next: step_3
    step_3:
      type: stub
      next: success
    step_4:
      type: stub
      next: fail
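Both failure-handling examples depend on on_failure naming a step that is actually defined. The sketch below checks a mapping for targets that point nowhere. It is a hypothetical helper written for illustration; `dangling_targets` is not part of runnable, and this section does not describe how runnable itself validates a pipeline.

```python
from typing import Dict, List

# The two terminal nodes used throughout this section.
TERMINAL_NODES = {"success", "fail"}


def dangling_targets(steps: Dict[str, Dict[str, str]]) -> List[str]:
    """List every next/on_failure reference that is neither a step nor terminal."""
    problems: List[str] = []
    for name, definition in steps.items():
        for field in ("next", "on_failure"):
            target = definition.get(field)
            if target is None:
                continue  # on_failure is optional
            if target not in steps and target not in TERMINAL_NODES:
                problems.append(f"{name}.{field} -> {target}")
    return problems


valid = {
    "step_1": {"next": "step_2", "on_failure": "step_4"},
    "step_2": {"next": "step_3"},
    "step_3": {"next": "success"},
    "step_4": {"next": "fail"},
}
# Same pipeline with the failure handler left out of the definition.
broken = {name: definition for name, definition in valid.items() if name != "step_4"}

print(dangling_targets(valid))   # []
print(dangling_targets(broken))  # ['step_1.on_failure -> step_4']
```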