A relatable example from data science is a grid search over hyperparameters, where the training pipeline needs
to run once for every hyperparameter value.
flowchart TD
gridSearch([Grid Search]):::green
success([Success]):::green
subgraph one[Parameter 1]
process_chunk1([Train model]):::yellow
success_chunk1([Success]):::yellow
process_chunk1 --> success_chunk1
end
subgraph two[Parameter ...]
process_chunk2([Train model]):::yellow
success_chunk2([Success]):::yellow
process_chunk2 --> success_chunk2
end
subgraph three[Parameter n]
process_chunk3([Train model]):::yellow
success_chunk3([Success]):::yellow
process_chunk3 --> success_chunk3
end
reduce([Reduce]):::green
gridSearch --> one --> reduce
gridSearch --> two --> reduce
gridSearch --> three --> reduce
reduce --> success
classDef yellow stroke:#FFFF00
classDef green stroke:#0f0
The reduce step is part of the map state definition.
Conceptually, a map node can be represented in Python as:
    for i in iterable_parameter:
        # a pipeline of steps
        x = execute_first_step(i)
        score = execute_second_step(i, x)

    reduce(score)  # could be as simple as a list of scores indexed by i or a custom reducer function/lambda
    ...
The returns of the tasks in the iterable branch are reduced to a list that follows
the order of the iterable. In the above example, the steps downstream of
iterate_task would have access to a parameter that is a list of all the values of x observed during the iteration.
For clarity, the default reducer is: lambda *x: list(x) # returns a list of the args
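The effect of the default reducer can be sketched in plain Python, without using runnable itself. The process_chunk function and the chunks list below are hypothetical stand-ins for a branch and its iterable; only the reducer expression comes from the text above.

```python
# Plain-Python illustration of the default reducer; this does not call runnable.
default_reducer = lambda *x: list(x)  # returns a list of the args


def process_chunk(chunk: int) -> int:
    # Hypothetical branch step: multiply the chunk by 10.
    return chunk * 10


chunks = [1, 2, 3]

# One returned value per iteration, in the order of the iterable.
per_iteration = [process_chunk(chunk) for chunk in chunks]

# The reducer receives the per-iteration values as positional arguments.
processed = default_reducer(*per_iteration)
print(processed)  # [10, 20, 30]
```

The reduced list keeps the order of the iterable, so processed[0] corresponds to the first chunk.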
The map state also accepts an argument, reducer, which can be a lambda or a function that
accepts *args (a non-keyword, variable-length argument list) and returns a reduced value.
The steps downstream of iterate_task would then use the reduced value.
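To make the *args contract concrete, here is a small plain-Python sketch of how a reducer could be applied to per-iteration values. The apply_reducer helper and the total function are hypothetical and are not part of runnable; they only illustrate that a lambda and a named function are interchangeable as long as both accept positional arguments.

```python
from typing import Any, Callable, List


def apply_reducer(reducer: Callable[..., Any], values: List[Any]) -> Any:
    # Hypothetical helper: each iteration's value becomes one positional argument.
    return reducer(*values)


def total(*args: int) -> int:
    # A named function works as a reducer as long as it accepts *args.
    return sum(args)


per_iteration = [10, 20, 30]  # e.g. chunk * 10 for chunks 1, 2, 3

as_list = apply_reducer(lambda *x: list(x), per_iteration)  # default behaviour
largest = apply_reducer(lambda *x: max(x), per_iteration)   # custom lambda
summed = apply_reducer(total, per_iteration)                # custom function

print(as_list, largest, summed)  # [10, 20, 30] 30 60
```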
A branch of a map step is considered successful only if the success step is reached at the end.
Steps of the pipeline can fail; the failure can be handled by on failure and
redirected to success if that is the desired behavior.
The map step is considered successful only if all the branches of the step have terminated successfully.
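The success rule can be sketched in plain Python as follows. This is an illustration of the rule only, under the assumption that a step signals failure by raising an exception; run_branch, map_succeeds and train are hypothetical names, and the sketch says nothing about how runnable schedules or stops branches internally.

```python
from typing import Callable, Iterable, List

Step = Callable[[int], None]


def run_branch(steps: List[Step], value: int) -> bool:
    """Return True only if every step completes, i.e. the end of the branch is reached."""
    try:
        for step in steps:
            step(value)
    except Exception:
        return False
    return True


def map_succeeds(steps: List[Step], values: Iterable[int]) -> bool:
    """The map succeeds only if all branches succeed; this sketch runs every branch."""
    results = [run_branch(steps, value) for value in values]
    return all(results)


def train(value: int) -> None:
    # Hypothetical step that fails for one particular value.
    if value == 2:
        raise ValueError(f"cannot train with value {value}")


all_ok = map_succeeds([train], [1, 3])
one_failed = map_succeeds([train], [1, 2, 3])
print(all_ok, one_failed)  # True False
```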
"""map states allows to repeat a branch for each value of an iterable.The below example can written, in python, as:chunks = [1, 2, 3]for chunk in chunks: # Any task within the pipeline can access the value of chunk as an argument. processed = process_chunk(chunk) # The value of processed for every iteration is the value returned by the steps # of the current execution. For example, the value of processed # for chunk=1, is chunk*10 = 10 for downstream steps. read_processed_chunk(chunk, processed)# Outside of loop, processed is a list of all the processed chunks.# This is also called as the reduce pattern.assert processed == [chunk * 10 for chunk in chunks]Run this pipeline as: python examples/07-map/map.py"""fromexamples.common.functionsimport(assert_default_reducer,process_chunk,read_processed_chunk,)fromrunnableimportMap,NotebookTask,Pipeline,PythonTask,ShellTaskdefiterable_branch(execute:bool=True):""" Use the pattern of using "execute" to control the execution of the pipeline. The same pipeline can be run independently from the command line. WARNING: If the execution is not controlled by "execute", the pipeline will be executed even during the definition of the branch in parallel steps. 
"""# The python function to process a single chunk of data.# In the example, we are multiplying the chunk by 10.process_chunk_task_python=PythonTask(name="execute_python",function=process_chunk,returns=["processed_python"],)# return parameters within a map branch have to be unique# The notebook takes in the value of processed_python as an argument.# and returns a new parameter "processed_notebook" which is 10*processed_pythonprocess_chunk_task_notebook=NotebookTask(name="execute_notebook",notebook="examples/common/process_chunk.ipynb",returns=["processed_notebook"],)# following the pattern, the shell takes in the value of processed_notebook as an argument.# and returns a new parameter "processed_shell" which is 10*processed_notebook.shell_command=""" if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \ && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then echo "yaay" else echo "naay" exit 1; fi export processed_shell=$( expr 10 '*' "$processed_notebook") """process_chunk_task_shell=ShellTask(name="execute_shell",command=shell_command,returns=["processed_shell"],)# A downstream step of process_<python, notebook, shell> which reads the parameter "processed".# The value of processed is within the context of the branch.# For example, for chunk=1, the value of processed_python is chunk*10 = 10# the value of processed_notebook is processed_python*10 = 100# the value of processed_shell is processed_notebook*10 = 1000read_chunk=PythonTask(name="read processed chunk",function=read_processed_chunk,terminate_with_success=True,)pipeline=Pipeline(steps=[process_chunk_task_python,process_chunk_task_notebook,process_chunk_task_shell,read_chunk,],)ifexecute:pipeline.execute()returnpipelinedefmain():# Create a map state which iterates over a list of chunks.# chunk is the value of the iterable.map_state=Map(name="map state",iterate_on="chunks",iterate_as="chunk",branch=iterable_branch(execute=False),)# Outside of the loop, processed is a list of all the 
processed chunks.# This is also called as the reduce pattern.# the value of processed_python is [10, 20, 30]# the value of processed_notebook is [100, 200, 300]# the value of processed_shell is [1000, 2000, 3000]collect=PythonTask(name="collect",function=assert_default_reducer,terminate_with_success=True,)pipeline=Pipeline(steps=[map_state,collect])pipeline.execute(parameters_file="examples/common/initial_parameters.yaml")returnpipelineif__name__=="__main__":main()
The same pipeline defined in YAML:

    branch: &branch
      start_at: execute_python
      steps:
        execute_python:
          type: task
          command: examples.common.functions.process_chunk
          returns:
            - name: processed_python
          next: execute_notebook
        execute_notebook:
          type: task
          command_type: notebook
          command: examples/common/process_chunk.ipynb
          returns:
            - name: processed_notebook
          next: execute_shell
        execute_shell:
          type: task
          command_type: shell
          command: |
            if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \
                && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then
                    echo "yaay"
                else
                    echo "naay"
                    exit 1;
            fi
            export processed_shell=$( expr 10 '*' "$processed_notebook")
          returns:
            - name: processed_shell
          next: read_chunk
        read_chunk:
          type: task
          command: examples.common.functions.read_processed_chunk
          next: success

    dag:
      description: |
        Map states allow repeating a branch for each value of an iterable.

        The below example can be written, in python, as:

        chunks = [1, 2, 3]

        for chunk in chunks:
            # Any task within the pipeline can access the value of chunk as an argument.
            processed = process_chunk(chunk)

            # The value of processed for every iteration is the value returned by the steps
            # of the current execution. For example, the value of processed
            # for chunk=1, is chunk*10 = 10 for downstream steps.
            read_processed_chunk(chunk, processed)

        # Outside of loop, processed is a list of all the processed chunks.
        # This is also called the reduce pattern.
        assert processed == [chunk * 10 for chunk in chunks]

        Run this pipeline as:
          runnable execute -f examples/07-map/map.yaml \
          -p examples/common/initial_parameters.yaml
      start_at: map_state
      steps:
        map_state:
          type: map
          branch: *branch
          iterate_on: chunks
          iterate_as: chunk
          next: collect
        collect:
          type: task
          command: examples.common.functions.assert_default_reducer
          next: success
The next example differs from the default reducer example only in that it uses a lambda *x: max(x) reducer.
"""map states allows to repeat a branch for each value of an iterable.The below example can written, in python, as:chunks = [1, 2, 3]for chunk in chunks: # Any task within the pipeline can access the value of chunk as an argument. processed = process_chunk(chunk) # The value of processed for every iteration is the value returned by the steps # of the current execution. For example, the value of processed # for chunk=1, is chunk*10 = 10 for downstream steps. read_processed_chunk(chunk, processed)It is possible to use a custom reducer, for example, this reducer is a max of the collection.# Once the reducer is applied, processed is reduced to a single value.assert processed == max(chunk * 10 for chunk in chunks)"""fromexamples.common.functionsimport(assert_custom_reducer,process_chunk,read_processed_chunk,)fromrunnableimportMap,NotebookTask,Pipeline,PythonTask,ShellTaskdefiterable_branch(execute:bool=True):""" Use the pattern of using "execute" to control the execution of the pipeline. The same pipeline can be run independently from the command line. WARNING: If the execution is not controlled by "execute", the pipeline will be executed even during the definition of the branch in parallel steps. 
"""# The python function to process a single chunk of data.# In the example, we are multiplying the chunk by 10.process_chunk_task_python=PythonTask(name="execute_python",function=process_chunk,returns=["processed_python"],)# return parameters within a map branch have to be unique# The notebook takes in the value of processed_python as an argument.# and returns a new parameter "processed_notebook" which is 10*processed_pythonprocess_chunk_task_notebook=NotebookTask(name="execute_notebook",notebook="examples/common/process_chunk.ipynb",returns=["processed_notebook"],)# following the pattern, the shell takes in the value of processed_notebook as an argument.# and returns a new parameter "processed_shell" which is 10*processed_notebook.shell_command=""" if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \ && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then echo "yaay" else echo "naay" exit 1; fi export processed_shell=$( expr 10 '*' "$processed_notebook") """process_chunk_task_shell=ShellTask(name="execute_shell",command=shell_command,returns=["processed_shell"],)# A downstream step of process_<python, notebook, shell> which reads the parameter "processed".# The value of processed is within the context of the branch.# For example, for chunk=1, the value of processed_python is chunk*10 = 10# the value of processed_notebook is processed_python*10 = 100# the value of processed_shell is processed_notebook*10 = 1000read_chunk=PythonTask(name="read processed chunk",function=read_processed_chunk,terminate_with_success=True,)pipeline=Pipeline(steps=[process_chunk_task_python,process_chunk_task_notebook,process_chunk_task_shell,read_chunk,],)ifexecute:pipeline.execute()returnpipelinedefmain():# Create a map state which iterates over a list of chunks.# chunk is the value of the iterable.# Upon completion of the map state, all the parameters of the tasks# within the pipeline will be processed by the reducer.# In this case, the reducer is the max of all 
the processed chunks.map_state=Map(name="map state",iterate_on="chunks",iterate_as="chunk",reducer="lambda *x: max(x)",branch=iterable_branch(execute=False),)collect=PythonTask(name="collect",function=assert_custom_reducer,terminate_with_success=True,)pipeline=Pipeline(steps=[map_state,collect])pipeline.execute(parameters_file="examples/common/initial_parameters.yaml")returnpipelineif__name__=="__main__":main()
The same pipeline defined in YAML:

    branch: &branch
      start_at: execute_python
      steps:
        execute_python:
          type: task
          command: examples.common.functions.process_chunk
          returns:
            - name: processed_python
          next: execute_notebook
        execute_notebook:
          type: task
          command_type: notebook
          command: examples/common/process_chunk.ipynb
          returns:
            - name: processed_notebook
          next: execute_shell
        execute_shell:
          type: task
          command_type: shell
          command: |
            if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \
                && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then
                    echo "yaay"
                else
                    echo "naay"
                    exit 1;
            fi
            export processed_shell=$( expr 10 '*' "$processed_notebook")
          returns:
            - name: processed_shell
          next: read_chunk
        read_chunk:
          type: task
          command: examples.common.functions.read_processed_chunk
          next: success

    dag:
      description: |
        Map states allow repeating a branch for each value of an iterable.

        The below example can be written, in python, as:

        chunks = [1, 2, 3]

        for chunk in chunks:
            # Any task within the pipeline can access the value of chunk as an argument.
            processed = process_chunk(chunk)

            # The value of processed for every iteration is the value returned by the steps
            # of the current execution. For example, the value of processed
            # for chunk=1, is chunk*10 = 10 for downstream steps.
            read_processed_chunk(chunk, processed)

        It is possible to use a custom reducer, for example, this reducer is a max of the collection.
        # Once the reducer is applied, processed is reduced to a single value.
        assert processed == max(chunk * 10 for chunk in chunks)

        Run this pipeline as:
          runnable execute -f examples/07-map/custom_reducer.yaml \
          -p examples/common/initial_parameters.yaml
      start_at: map_state
      steps:
        map_state:
          type: map
          branch: *branch
          iterate_on: chunks
          iterate_as: chunk
          reducer: "lambda *x: max(x)"
          next: collect
        collect:
          type: task
          command: examples.common.functions.assert_custom_reducer
          next: success