Executor
Executors are the heart of runnable: they traverse the workflow and execute the tasks within it while coordinating with different services (e.g. run log, catalog, secrets).
To enable workflows to run in varied computational environments, we distinguish between two core functions of any workflow engine:
Graph Traversal

Involves following the user-defined workflow graph to its eventual conclusion. This encompasses the sequential execution of tasks as well as composite structures such as parallel paths. It also includes deciding which path to follow when a task fails, and maintaining the overall status of the graph execution.
Executing Individual Steps

This refers to the concrete execution of the task as specified by the user, along with allowing data to flow between tasks. It could involve activities such as launching a container or initiating a SQL query, among others.
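Graph traversal, in particular, can be sketched in a few lines. This is an illustrative sketch only: the graph structure (`next`, `on_failure`) and the terminal node names are assumptions, not runnable's actual internals.

```python
# Minimal sketch of graph traversal. The step dictionary layout ("command",
# "next", "on_failure") and the terminal node names are hypothetical.
def traverse(steps, start_at):
    """Walk a workflow graph until a terminal node is reached."""
    current = start_at
    while current not in ("success", "fail"):
        step = steps[current]
        try:
            step["command"]()          # execute the task at this node
            current = step["next"]     # follow the happy path
        except Exception:
            # Follow the on-failure path if one is defined, else terminate.
            current = step.get("on_failure", "fail")
    return current
```

The engine keeps walking until it lands on a terminal node, so a task failure either diverts traversal to an on-failure branch or ends the run as failed.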
Graph Traversal¶
In runnable, the graph traversal can be performed by runnable itself or handed over to other orchestration frameworks (e.g. Argo Workflows, AWS Step Functions).
Example
Below is a simple pipeline definition that does one task of printing "Hello World".
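A sketch of what such a YAML definition might look like is shown below. The exact schema is an assumption for illustration; field names may differ from the real examples/concepts/task_shell_simple.yaml.

```yaml
# Hypothetical sketch of a one-task pipeline; field names are assumptions.
dag:
  description: A simple pipeline with a single shell task
  start_at: shell_task
  steps:
    shell_task:
      type: task
      command_type: shell
      command: echo "Hello World"
      next: success
    success:
      type: success
    fail:
      type: fail
```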
The above pipeline can be executed with the default configuration to run locally, or translated to an Argo specification just by changing the configuration.
The configuration defines local compute as the execution environment, with the run log held completely in memory (buffered) and no other services active.
You can execute the pipeline in default configuration by:
runnable execute -f examples/concepts/task_shell_simple.yaml
- Run the pipeline in the local environment.
- Use the buffer as the run log; this will not persist the run log to disk.
- Do not move any files to central storage.
- Do not use any secrets manager.
- Do not integrate with any experiment tracking tools.
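A minimal configuration expressing these defaults might look like the following. The service type names (`local`, `buffered`) are assumptions for illustration and may not match the shipped defaults exactly.

```yaml
# Hypothetical default configuration; type names are assumptions.
executor:
  type: local
run_log_store:
  type: buffered
```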
In this configuration, we are using Argo Workflows as our workflow engine. We are also instructing the workflow engine to use a docker image, runnable:demo (defined in line #4), as our execution environment. Please read containerised environments for more information.

Since runnable needs to track the execution status of the workflow, we are using a run log which is persistent and available to jobs in the kubernetes environment.
You can execute the pipeline in argo configuration by:
runnable execute -f examples/concepts/task_shell_simple.yaml -c examples/configs/argo-config.yaml
- Use argo workflows as the execution engine to run the pipeline.
- Run this docker image for every step of the pipeline. The docker image should have the same directory structure as the project directory.
- Mount the volume from Kubernetes persistent volumes (runnable-volume) to /mnt directory.
- Resource constraints for the container runtime.
- Since every step runs in a container, the run log should be persisted. Here we are using the file-system as our run log store.
- The Kubernetes PVC is mounted to every container as /mnt; use that to surface the run log to every step.
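A configuration along these lines might look like the following sketch. The field names (image, persistent_volumes, log_folder) are assumptions and may differ from the real examples/configs/argo-config.yaml.

```yaml
# Hypothetical sketch of an Argo configuration; field names are assumptions.
executor:
  type: argo
  config:
    image: runnable:demo
    persistent_volumes:
      - name: runnable-volume
        mount_path: /mnt
run_log_store:
  type: file-system
  config:
    log_folder: /mnt/run_log_store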
In the generated Argo workflow template below:

- Lines 10-17 define a dag with tasks corresponding to the tasks in the example workflow.
- The graph traversal follows the same rules as our workflow: the step success-success-ou7qlf in line #15 only happens if the step shell-task-dz3l3t defined in line #12 succeeds.
- The execution fails if any of the tasks fail. Both Argo Workflows and the runnable run log mark the execution as failed.
As seen from the above example, once a pipeline is defined in runnable either via yaml or SDK, we can run the pipeline in different environments just by providing a different configuration. Most often, there is no need to change the code or deviate from standard best practices while coding.
Step Execution¶
Note

This section explains the internal mechanism of runnable and is not required reading if you just want to use different executors.
Independent of traversal, all the tasks are executed within the context of runnable.
A closer look at the actual task, as implemented in the transpiled Argo specification, details the inner workings. Below is a snippet of the Argo specification from lines 18 to 34.

The actual command to run is not the command defined in the workflow (i.e. echo hello world), but a command in the runnable CLI which specifies the workflow file, the step name and the configuration file.
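The shape of that injected command might look roughly like the sketch below. The subcommand and flag names are assumptions for illustration; only the ingredients (workflow file, step name, configuration file) come from the description above.

```shell
# Hypothetical shape of the injected command; subcommand and flag names assumed.
runnable execute-single-node <run-id> shell_task \
  --file examples/concepts/task_shell_simple.yaml \
  --config-file examples/configs/argo-config.yaml
```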
Context of runnable
Any task defined by the user as part of the workflow always runs as a sub-command of runnable. In that sense, runnable follows the decorator pattern without being part of the application codebase.
In a very simplistic sense, the below stubbed-code explains the context of runnable during execution of a task.
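The sketch below is purely illustrative: the class and function names (RunLog, Catalog, execute_step) are hypothetical and not runnable's real internals.

```python
# Illustrative stub of the context runnable provides around a task.
# All names here are hypothetical, not runnable's actual API.

class RunLog:
    """Maintains execution state, latest parameters and captured metrics."""
    def __init__(self):
        self.status = {}
        self.parameters = {}

class Catalog:
    """Central storage for artifacts flowing between steps."""
    def __init__(self):
        self._store = {}
    def get(self, name):
        return self._store.get(name)
    def put(self, name, artifact):
        self._store[name] = artifact

def execute_step(step_name, command, run_log, catalog, inputs=(), outputs=()):
    # Retrieve artifacts from previous steps that this step needs.
    artifacts = {name: catalog.get(name) for name in inputs}
    # Filter the parameters down to what the command expects.
    params = dict(run_log.parameters)
    try:
        # Execute the user's command with the resolved parameters/artifacts.
        result = command(params, artifacts)
    except Exception:
        # On failure, record it and re-raise for the workflow engine.
        run_log.status[step_name] = "FAIL"
        raise
    # On success, update parameters for downstream steps...
    run_log.parameters.update(result.get("parameters", {}))
    # ...put generated artifacts into central storage...
    for name in outputs:
        catalog.put(name, result["artifacts"][name])
    # ...and mark the step as completed.
    run_log.status[step_name] = "SUCCESS"
    return result
```

The annotated list that follows walks through each of these responsibilities in turn.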
- The run log maintains the state of the execution of the tasks and subsequently the pipeline. It also holds the latest state of parameters along with captured metrics.
- The catalog contains the information about the data flowing through the pipeline. You can get/put artifacts generated during the current execution of the pipeline to a central storage.
- Read the workflow and get the step definition, which holds the command or function to execute along with other optional information.
- Any artifacts from previous steps that are needed to execute the current step can be retrieved from the catalog.
- The current function or step might need only some of the parameters, cast as pydantic models; filter and cast them appropriately.
- At this point in time, we have the required parameters and data to execute the actual command. The command can internally request for more data using the python API or record experiment tracking metrics.
- If the task failed, we update the run log with that information and also raise an exception for the workflow engine to handle. Any on-failure traversals are already handled as part of the workflow definition.
- Upon successful execution, we update the run log with current state of parameters for downstream steps.
- Any artifacts generated from this step are put into the central storage for downstream steps.
- We send a success message to the workflow engine and mark the step as completed.