Pipelines
Pipeline class enables users to interact with the functional properties of the Pipelines infrastructure such as create, read or delete pipelines. It can also be used for creating pipeline runs and jobs.
Parameters:
-
token
(str
, default:None
) –token copy from polly.
Usage
from polly.pipelines import Pipeline
pipeline = Pipeline(token)
create_run
This function is used to create a Pipeline run.
A run is a collection of jobs, this functions creates an empty run in which the jobs can be added.
Parameters:
-
pipeline_id
(str
) –pipeline_id for which the run is to be created
-
run_name
(str
, default:None
) –name of the run
-
priority
(str
, default:'low'
) –priority of the run, can be low | medium | high
-
tags
(dict
, default:{}
) –a dict of key-value pair with tag_name -> tag_value mapping
-
domain_context
(dict
, default:{}
) –domain context for a run
Returns:
-
object
–It will return a JSON object which is the pipeline run. (See Examples)
Raises:
-
wrongParamException
–invalid parameter passed
get_job
This function returns the job data for the provided job_id
Parameters:
-
job_id
(str
) –the job_id for which the data is required
Returns:
-
object
–It will return a JSON object with pipeline job data. (See Examples)
Raises:
-
wrongParamException
–invalid parameter passed
get_jobs
This function returns the list of jobs executed for a run.
Parameters:
-
run_id
(str
) –the run_id for which the jobs are required
-
org_id
((str, Optional)
) –to filter runs based on the org_id
-
user_id
((str, Optional)
) –to filter the run_id based on user_id
-
page_size
((int, Optional)
) –number of runs to be fetched per request, default = 10
-
page_after
((int, Optional)
) –number of pages to be skipped, default = 0
Returns:
-
list
–It will return a list of JSON object with pipeline runs. (See Examples)
Raises:
-
wrongParamException
–invalid parameter passed
get_pipeline
This function returns the pipeline data of the provided pipeline_id.
Parameters:
-
pipeline_id
(str
) –pipeline_id for required pipeline
Returns:
-
object
–It will return a JSON object with pipeline data. (See Examples)
Raises:
-
wrongParamException
–invalid parameter passed
get_pipelines
This function returns all the pipelines that the user have access to Please use this function with default values for the paramters.
Returns:
-
list
–It will return a list of JSON objects. (See Examples)
get_run
This function returns the pipeline run data
Parameters:
-
run_id
(str
) –the run_id for which the data is required
Returns:
-
list
–It will return a list of JSON object with pipeline run data. (See Examples)
Raises:
-
wrongParamException
–invalid parameter passed
get_runs
This function returns the list of pipeline runs
Parameters:
-
org_id
((str, Optional)
) –to filter runs based on the org_id
-
user_id
((str, Optional)
) –to filter the run_id based on user_id
-
page_size
((int, Optional)
) –number of runs to be fetched per request, default = 10
-
page_after
((int, Optional)
) –number of pages to be skipped, default = 0
Returns:
-
list
–It will return a list of JSON object with pipeline runs. (See Examples)
Raises:
-
wrongParamException
–invalid parameter passed
submit_job
This function is used for creating jobs for a particular run.
Parameters:
-
run_id
(str
) –run_id in which the job is to be created.
-
parameters
(dict
) –a key-value object of all the required parameters of pipeline
-
config
(dict
) –config definition for the pipeline job. should be of format {"infra": {"cpu": int, "memory": int, "storage": int}}
-
job_name
((str, Optional)
, default:None
) –name of the job, auto-generated if not assigned
Returns:
-
Object
–It will return a JSON object with pipeline data. (See Examples)
Raises:
-
wrongParamException
–invalid parameter passed
Examples
Pipeline class of polly-python can be initialised using the code block below:-
# Install polly python
pip install polly-python
# Import libraries
from polly.auth import Polly
from polly.pipelines import Pipeline
# Create omixatlas object and authenticate
AUTH_TOKEN=(os.environ['POLLY_REFRESH_TOKEN'])
pipelines = Pipelines(token=AUTH_TOKEN)
Get All Pipelines
[
{
"id":"fb40282e-05d8-49c5-bd84-e5ad04cd8233",
"type":"pipelines",
"attributes":{
"name":"toy",
"display_name":"toy",
"description":"Pipeline description",
"executor":"nextflow",
"deployment_stage":"dev",
"config":{
},
"org_id":"1",
"user_id":"1703140688",
"user_name":"polly@elucidata.io",
"created_at":1709473284864,
"last_updated_at":1713784141145
}
}
]
Get Pipeline by ID
{
"id":"fb40282e-05d8-49c5-bd84-e5ad04cd8233",
"type":"pipelines",
"attributes":{
"name":"toy",
"display_name":"toy",
"description":"Pipeline description",
"executor":"nextflow",
"deployment_stage":"dev",
"config":{
},
"org_id":"1",
"user_id":"1703140688",
"user_name":"polly@elucidata.io",
"created_at":1709473284864,
"last_updated_at":1713784141145
}
}
Create a pipeline run
Users can create runs for pipelines they have access, as shown below:-
run = pipeline.create_run(
pipeline_id=demo_pipeline_id,
run_name="TEST_RUN",
priority="medium",
tags={},
domain_context={},
)
{
"id":"7c273042-a8dd-49dc-9208-a24ea4cc3296",
"type":"runs",
"attributes":{
"name":"TEST_RUN",
"created_at":17137412412778,
"pipeline_id":"fb402852-05d8-49c5-bd84-e5ad04cd8233",
"priority":"high",
"domain_context":{
},
"tags":{
"org_id":""
},
"org_id":"1",
"user_id":"1658226496",
"user_name":"john.doe@elucidata.io",
"num_jobs":0,
"num_successful_jobs":0,
"num_failed_jobs":0,
"status":"PENDING",
"last_updated_at":1713794894778
}
}
Submit pipeline job
Users can submit jobs for pipeline as shown below:-
job = pipeline.submit_job(
run_id = "run_id",
parameters = {
"param_a": "value_a",
"param_b": "value_b"
},
config = {
"infra": {
"cpu": 1,
"memory": 2,
"storage": 120
}
}
)
Get runs for the current user, and filter by status or priority
# Get Runs by user
runs = pipeline.get_runs()
# Get runs with filter
filtered_runs_by_status = pipeline.get_runs(
status="PARTIALLY_COMPLETED"
)
# Get runs with filter
filtered_runs_by_priority = pipeline.get_runs(
priority="low"
)
[
{
"id":"1659632141379__1686063841__1641216496",
"type":"runs",
"attributes":{
"name":"Multiple OA",
"created_at":1686063857177,
"pipeline_id":"None",
"priority":"high",
"domain_context":{
"source":"migration",
"repo_id":"16596812811111",
"repo_version":168412063841
},
"tags":{
},
"org_id":"1",
"user_id":"1658226496",
"user_name":"John Doe",
"num_jobs":5,
"num_successful_jobs":3,
"num_failed_jobs":2,
"status":"PARTIALLY_COMPLETED",
"last_updated_at":1693139935542
}
}
]
Get a run by ID
{
"id":"8b3db5bd-12cf-4929-bdda-d1900553f5ad",
"type":"runs",
"attributes":{
"name":"76afcb3d-f9d4-480c-91ec-9d23a2064e47",
"created_at":1694256985387,
"pipeline_id":"ce03e312-e9cf-46f0-985a-e14f93066cd3",
"priority":"low",
"domain_context":{
},
"tags":{
},
"org_id":"1",
"user_id":"1658226496",
"user_name":"aditya.asthana@elucidata.io",
"num_jobs":1,
"num_successful_jobs":0,
"num_failed_jobs":1,
"status":"ERRORED",
"last_updated_at":1694257190419
}
}
Get jobs inside a run
[
{
"id":"56cb3a52-c280-4433-b73f-13875dbb2480",
"type":"jobs",
"attributes":{
"name":"job_name",
"parameters":{
"x":12,
"y":13
},
"config":{
},
"run_id":"8b3db5bd-12cf-4929-bdda-d1900553f5ad",
"stage":"Processing",
"progress":"1/2",
"errored":true,
"finished":false,
"created_at":1694256987866,
"last_updated_at":1694257190419
}
}
]
Get a job by ID
{
"id":"561b1a52-c280-4433-b73f-13875dbb2480",
"type":"jobs",
"attributes":{
"name":"job#0",
"parameters":{
"x":12,
"y":13
},
"config":{
},
"run_id":"41245bd-12cf-4929-bdda-d1900553f5ad",
"stage":"Processing",
"progress":"1/2",
"errored":true,
"finished":false,
"created_at":1694256987866,
"last_updated_at":1694257190419
}
}