Pipeline¶
This page describes the pipeline architecture and its specifications.
A pipeline is a group of services chained together. It is defined by a JSON description of the services and their order, and it is exposed through a REST API that can be used to process data. Because a pipeline is only a composition of existing services, it does not need its own Pod or Docker image; it is stored solely in the database.
Architecture¶
For the general architecture of the project, see the global UML Diagram.
This sequence diagram illustrates the interaction between a user and a pipeline.
```mermaid
sequenceDiagram
    participant S1 as s1 - Service 1
    participant S2 as s2 - Service 2
    participant C as c - Client
    participant S3 as s3 - Storage
    participant E as e - Core engine
    C->>+E: POST(p.slug, data)
    E->>+S3: file_keys = for file in data: upload(file)
    S3-->>-E: return(200, file_key)
    E->>E: service_tasks = create_tasks()
    E->>+S1: POST(s.url, callback_url: str, service_tasks[0]: ServiceTask)
    Note right of S1: callback_url is the URL where the service should send the response
    Note right of S1: service_task should match the model
    S1-->>-E: return(200, Task added to the queue)
    E-->>-C: return(200, service_tasks)
    S1->>+S3: data = for key in service_task.task.data_in: get_file(service_task.s3_infos, key)
    S3-->>-S1: return(200, stream)
    S1->>S1: result = process(data)
    S1->>+S3: data_out = for res in result: upload_file(service_task.s3_infos, res)
    S3-->>-S1: return(200, key)
    S1->>S1: task_update = jsonable_encoder(TaskUpdate({status: finished, task.data_out: data_out}))
    S1->>+E: PATCH(callback_url, task_update)
    E-->>-S1: return(200, OK)
    E->>+S2: POST(s.url, callback_url: str, service_tasks[1]: ServiceTask)
    S2-->>-E: return(200, Task added to the queue)
    S2->>+S3: data = for key in service_task.task.data_in: get_file(service_task.s3_infos, key)
    S3-->>-S2: return(200, stream)
    S2->>S2: result = process(data)
    S2->>+S3: data_out = for res in result: upload_file(service_task.s3_infos, res)
    S3-->>-S2: return(200, key)
    S2->>S2: task_update = jsonable_encoder(TaskUpdate({status: finished, task.data_out: data_out}))
    S2->>+E: PATCH(callback_url, task_update)
    E-->>-S2: return(200, OK)
    loop Every second until status is FINISHED
        C->>E: GET(service_tasks[last])
        E-->>C: return(200, service_task)
    end
    C->>+S3: GET(service_tasks[last].data_out)
    S3-->>-C: return(200, stream)
```
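To make the service side of this exchange concrete, here is a minimal sketch of what a service does between receiving a ServiceTask and PATCHing the callback. Only `jsonable_encoder` is taken from the diagram; `get_file`, `upload_file` and `process` are hypothetical helpers, and the dictionary field names are assumptions:

```python
import requests
from fastapi.encoders import jsonable_encoder  # used in the diagram above


def handle_service_task(service_task: dict, callback_url: str) -> None:
    """Handle one ServiceTask as sketched in the sequence diagram."""
    # 1. Download every input file referenced by the task.
    data = [
        get_file(service_task["s3_infos"], key)  # hypothetical storage helper
        for key in service_task["task"]["data_in"]
    ]

    # 2. Run the service-specific processing.
    result = process(data)  # hypothetical processing function

    # 3. Upload every result and collect the returned storage keys.
    data_out = [
        upload_file(service_task["s3_infos"], res)  # hypothetical storage helper
        for res in result
    ]

    # 4. Report the finished task back to the Core engine.
    task_update = jsonable_encoder({"status": "finished", "data_out": data_out})
    requests.patch(callback_url, json=task_update)
```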
Specifications¶
Any service can be part of a pipeline, as long as it is registered with the Core engine.
Endpoints¶
A pipeline is registered on the Core engine URL under its slug. For example, if the pipeline slug is `my-pipeline`, the endpoints will be:

- `POST /my-pipeline`: Add a task to the pipeline
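For instance, a client could launch this pipeline and poll it as follows. This is a sketch assuming Python's requests library; the `/tasks/{id}` polling path, the input field name, and the JSON field names are assumptions based on the diagram, not the definitive API:

```python
import time

import requests  # third-party HTTP client

CORE_ENGINE_URL = "https://core-engine.example.com"  # hypothetical URL

# 1. Launch the pipeline by POSTing the input data to its slug.
with open("image.png", "rb") as image:
    response = requests.post(
        f"{CORE_ENGINE_URL}/my-pipeline",
        files={"image": image},  # input field name is an assumption
    )
response.raise_for_status()
service_tasks = response.json()

# 2. Poll the last task every second until its status is finished
#    (the /tasks/{id} path and the field names are assumptions).
last_task = service_tasks[-1]
while last_task["status"] != "finished":
    time.sleep(1)
    last_task = requests.get(f"{CORE_ENGINE_URL}/tasks/{last_task['id']}").json()

# 3. Download the results from storage using the keys in data_out.
for key in last_task["data_out"]:
    data = requests.get(key).content  # assumes keys resolve to downloadable URLs
```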
Models¶
The different models used in the pipeline are described below.
Pipeline model¶
This is the model of a pipeline:
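The model definition itself is not reproduced on this page. As an illustration only, a minimal Pydantic sketch consistent with the fields discussed below could look like this (Pydantic is suggested by the diagram's use of `jsonable_encoder`; the `name` field is an assumption):

```python
from pydantic import BaseModel


class Pipeline(BaseModel):
    """Illustrative sketch of the pipeline model."""

    name: str          # human-readable name (assumed field)
    slug: str          # slug under which the pipeline is exposed, e.g. "my-pipeline"
    steps: list[dict]  # ordered steps; see the Pipeline Step model below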
Pipeline Step model¶
Each pipeline is composed of steps. A step is a service that is part of the pipeline. It is defined by the following model:
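The model itself is not reproduced here; as a sketch built from the fields described in this section (`identifier`, `needs`, `condition`, `inputs`, `service_slug`), a step could look like this:

```python
from typing import Optional

from pydantic import BaseModel


class PipelineStep(BaseModel):
    """Illustrative sketch of a pipeline step."""

    identifier: str                  # unique name of the step within the pipeline
    needs: list[str]                 # identifiers of the steps that must run first
    condition: Optional[str] = None  # Python expression deciding whether the step runs
    inputs: list[str]                # e.g. "pipeline.image" or "face-detection.result"
    service_slug: str                # slug of the service executed by this step
```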
A pipeline step is linked to a service and can have a condition. The condition is a Python expression that is evaluated to decide whether the step should be executed. The condition can use the `needs` field to access the outputs of the previous steps. For example, if the step `step1` has an output `output1`, the condition of `step2` can be `output1 == "foo"`.
JSON representation¶
A JSON representation of a pipeline would look like this:
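The original example is not reproduced here. As an illustrative sketch assembled from the fields described below (the slug and the exact schema are assumptions), a two-step pipeline could look like this:

```json
{
  "slug": "face-blur",
  "steps": [
    {
      "identifier": "face-detection",
      "needs": [],
      "inputs": ["pipeline.image"],
      "service_slug": "face-detection"
    },
    {
      "identifier": "image-blur",
      "needs": ["face-detection"],
      "inputs": ["face-detection.result"],
      "service_slug": "image-blur"
    }
  ]
}
```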
The `needs` field is a list of the identifiers of the previous steps that must be executed before the current one, so the steps need to be ordered. The `inputs` field is a list of the inputs of the step. The `service_slug` field is the slug of the service that will be executed.

The `inputs` should be in the following format: `<step_identifier>.<output_name>`. For example, if the step `face-detection` has an output `result`, the input of the step `image-blur` should be `face-detection.result`. To access the inputs of the pipeline, the input should be `pipeline.<input_name>`. For example, if the pipeline has an input `image`, the input of the step `face-detection` should be `pipeline.image`.
Warning
The `inputs` need to be ordered in the same order as the `data_in_fields` of the service linked to the step.
On POST, the pipeline is validated and its steps are added to the database. After the pipeline is registered, it is available on the Core engine's `/pipeline-slug` endpoint.
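As an illustration, registering a pipeline description could look like the following sketch; the `/pipelines` registration path is an assumption, not a documented endpoint:

```python
import requests

# The JSON representation shown above, as a Python dict (abridged).
pipeline = {"slug": "face-blur", "steps": []}

# POST the description to the Core engine; /pipelines is an assumed path.
response = requests.post("https://core-engine.example.com/pipelines", json=pipeline)
response.raise_for_status()
```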
Execution¶
When launching a pipeline, the Core engine creates a task for each step of the pipeline. The tasks are executed in order: the Core engine waits for the previous task to finish before launching the next one. All the tasks are linked to an element called a PipelineExecution, which is used to store the inputs and outputs of the pipeline execution.
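A minimal sketch of this sequencing logic, assuming hypothetical `launch_task` and `get_task_status` helpers and an assumed `outputs` mapping on the PipelineExecution element:

```python
import time


def execute_pipeline(pipeline_execution, service_tasks):
    """Run the tasks of a pipeline execution one after the other (sketch)."""
    for task in service_tasks:
        launch_task(task)  # hypothetical helper: POST the task to its service
        # Wait for the current task to finish before launching the next one.
        while get_task_status(task) != "finished":  # hypothetical helper
            time.sleep(1)
        # Store the finished task's outputs on the PipelineExecution element,
        # where the following steps (and the client) can read them.
        pipeline_execution.outputs[task.step_identifier] = task.data_out
```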
PipelineExecution¶
The PipelineExecution model is defined as follows:
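The original definition is not reproduced here. Based on the description above (one task per step, storage of the execution's inputs and outputs), a Pydantic sketch could look like the following, where every field name should be treated as an assumption:

```python
from pydantic import BaseModel


class PipelineExecution(BaseModel):
    """Sketch of a pipeline execution; all field names are assumptions."""

    pipeline_id: int   # the pipeline being executed
    tasks: list[dict]  # one task per pipeline step, executed in order
    files: list[str]   # storage keys of the execution's inputs and outputs
```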