Skip to content

Pipelines

Pipeline class enables users to interact with the functional properties of the Pipelines infrastructure such as create, read or delete pipelines. It can also be used for creating pipeline runs and jobs.

Parameters:

  • token (str, default: None ) –

    token copy from polly.

Usage

from polly.pipelines import Pipeline

pipeline = Pipeline(token)

create_run

create_run(pipeline_id, run_name=None, priority='low', tags={}, domain_context={})

This function is used to create a Pipeline run.

A run is a collection of jobs, this functions creates an empty run in which the jobs can be added.

Parameters:

  • pipeline_id (str) –

    pipeline_id for which the run is to be created

  • run_name (str, default: None ) –

    name of the run

  • priority (str, default: 'low' ) –

    priority of the run, can be low | medium | high

  • tags (dict, default: {} ) –

    a dict of key-value pair with tag_name -> tag_value mapping

  • domain_context (dict, default: {} ) –

    domain context for a run

Returns:

  • object

    It will return a JSON object which is the pipeline run. (See Examples)

Raises:

  • wrongParamException

    invalid parameter passed

get_job

get_job(job_id)

This function returns the job data for the provided job_id

Parameters:

  • job_id (str) –

    the job_id for which the data is required

Returns:

  • object

    It will return a JSON object with pipeline job data. (See Examples)

Raises:

  • wrongParamException

    invalid parameter passed

get_jobs

get_jobs(run_id)

This function returns the list of jobs executed for a run.

Parameters:

  • run_id (str) –

    the run_id for which the jobs are required

  • org_id ((str, Optional)) –

    to filter runs based on the org_id

  • user_id ((str, Optional)) –

    to filter the run_id based on user_id

  • page_size ((int, Optional)) –

    number of runs to be fetched per request, default = 10

  • page_after ((int, Optional)) –

    number of pages to be skipped, default = 0

Returns:

  • list

    It will return a list of JSON object with pipeline runs. (See Examples)

Raises:

  • wrongParamException

    invalid parameter passed

get_pipeline

get_pipeline(pipeline_id)

This function returns the pipeline data of the provided pipeline_id.

Parameters:

  • pipeline_id (str) –

    pipeline_id for required pipeline

Returns:

  • object

    It will return a JSON object with pipeline data. (See Examples)

Raises:

  • wrongParamException

    invalid parameter passed

get_pipelines

get_pipelines()

This function returns all the pipelines that the user have access to Please use this function with default values for the paramters.

Returns:

  • list

    It will return a list of JSON objects. (See Examples)

get_run

get_run(run_id)

This function returns the pipeline run data

Parameters:

  • run_id (str) –

    the run_id for which the data is required

Returns:

  • list

    It will return a list of JSON object with pipeline run data. (See Examples)

Raises:

  • wrongParamException

    invalid parameter passed

get_runs

get_runs(status=None, priority=None)

This function returns the list of pipeline runs

Parameters:

  • org_id ((str, Optional)) –

    to filter runs based on the org_id

  • user_id ((str, Optional)) –

    to filter the run_id based on user_id

  • page_size ((int, Optional)) –

    number of runs to be fetched per request, default = 10

  • page_after ((int, Optional)) –

    number of pages to be skipped, default = 0

Returns:

  • list

    It will return a list of JSON object with pipeline runs. (See Examples)

Raises:

  • wrongParamException

    invalid parameter passed

submit_job

submit_job(run_id, parameters, config, job_name=None)

This function is used for creating jobs for a particular run.

Parameters:

  • run_id (str) –

    run_id in which the job is to be created.

  • parameters (dict) –

    a key-value object of all the required parameters of pipeline

  • config (dict) –

    config definition for the pipeline job. should be of format {"infra": {"cpu": int, "memory": int, "storage": int}}

  • job_name ((str, Optional), default: None ) –

    name of the job, auto-generated if not assigned

Returns:

  • Object

    It will return a JSON object with pipeline data. (See Examples)

Raises:

  • wrongParamException

    invalid parameter passed

Examples

Pipeline class of polly-python can be initialised using the code block below:-

# Install polly python
pip install polly-python

# Import libraries
from polly.auth import Polly
from polly.pipelines import Pipeline

# Create omixatlas object and authenticate
AUTH_TOKEN=(os.environ['POLLY_REFRESH_TOKEN'])
pipelines = Pipelines(token=AUTH_TOKEN)

Get All Pipelines

pipelines.get_pipelines()
[
    {
        "id":"fb40282e-05d8-49c5-bd84-e5ad04cd8233",
        "type":"pipelines",
        "attributes":{
            "name":"toy",
            "display_name":"toy",
            "description":"Pipeline description",
            "executor":"nextflow",
            "deployment_stage":"dev",
            "config":{

            },
            "org_id":"1",
            "user_id":"1703140688",
            "user_name":"polly@elucidata.io",
            "created_at":1709473284864,
            "last_updated_at":1713784141145
        }
    }
]

Get Pipeline by ID

pipelines.get_pipeline(pipeline_id="fb40282e-05d8-49c5-bd84-e5ad04cd8233")
{
    "id":"fb40282e-05d8-49c5-bd84-e5ad04cd8233",
    "type":"pipelines",
    "attributes":{
        "name":"toy",
        "display_name":"toy",
        "description":"Pipeline description",
        "executor":"nextflow",
        "deployment_stage":"dev",
        "config":{

        },
        "org_id":"1",
        "user_id":"1703140688",
        "user_name":"polly@elucidata.io",
        "created_at":1709473284864,
        "last_updated_at":1713784141145
    }
}

Create a pipeline run

Users can create runs for pipelines they have access, as shown below:-

run = pipeline.create_run(
    pipeline_id=demo_pipeline_id,
    run_name="TEST_RUN",
    priority="medium",
    tags={},
    domain_context={},
)
{
    "id":"7c273042-a8dd-49dc-9208-a24ea4cc3296",
    "type":"runs",
    "attributes":{
        "name":"TEST_RUN",
        "created_at":17137412412778,
        "pipeline_id":"fb402852-05d8-49c5-bd84-e5ad04cd8233",
        "priority":"high",
        "domain_context":{

        },
        "tags":{
            "org_id":""
        },
        "org_id":"1",
        "user_id":"1658226496",
        "user_name":"john.doe@elucidata.io",
        "num_jobs":0,
        "num_successful_jobs":0,
        "num_failed_jobs":0,
        "status":"PENDING",
        "last_updated_at":1713794894778
    }
}

Submit pipeline job

Users can submit jobs for pipeline as shown below:-

job = pipeline.submit_job(
    run_id = "run_id",
    parameters = {
        "param_a": "value_a",
        "param_b": "value_b"
    },
    config = {
        "infra": {
            "cpu": 1,
            "memory": 2,
            "storage": 120
        }
    }
)

Get runs for the current user, and filter by status or priority

# Get Runs by user
runs = pipeline.get_runs()

# Get runs with filter
filtered_runs_by_status = pipeline.get_runs(
    status="PARTIALLY_COMPLETED"
)

# Get runs with filter
filtered_runs_by_priority = pipeline.get_runs(
    priority="low"
)
[
    {
        "id":"1659632141379__1686063841__1641216496",
        "type":"runs",
        "attributes":{
            "name":"Multiple OA",
            "created_at":1686063857177,
            "pipeline_id":"None",
            "priority":"high",
            "domain_context":{
                "source":"migration",
                "repo_id":"16596812811111",
                "repo_version":168412063841
            },
            "tags":{

            },
            "org_id":"1",
            "user_id":"1658226496",
            "user_name":"John Doe",
            "num_jobs":5,
            "num_successful_jobs":3,
            "num_failed_jobs":2,
            "status":"PARTIALLY_COMPLETED",
            "last_updated_at":1693139935542
        }
    }
]

Get a run by ID

pipeline.get_run(run_id="<RUN_ID>")
{
    "id":"8b3db5bd-12cf-4929-bdda-d1900553f5ad",
    "type":"runs",
    "attributes":{
        "name":"76afcb3d-f9d4-480c-91ec-9d23a2064e47",
        "created_at":1694256985387,
        "pipeline_id":"ce03e312-e9cf-46f0-985a-e14f93066cd3",
        "priority":"low",
        "domain_context":{

        },
        "tags":{

        },
        "org_id":"1",
        "user_id":"1658226496",
        "user_name":"aditya.asthana@elucidata.io",
        "num_jobs":1,
        "num_successful_jobs":0,
        "num_failed_jobs":1,
        "status":"ERRORED",
        "last_updated_at":1694257190419
    }
}

Get jobs inside a run

pipeline.get_jobs(run_id="<RUN_ID>")
[
    {
        "id":"56cb3a52-c280-4433-b73f-13875dbb2480",
        "type":"jobs",
        "attributes":{
            "name":"job_name",
            "parameters":{
                "x":12,
                "y":13
            },
            "config":{

            },
            "run_id":"8b3db5bd-12cf-4929-bdda-d1900553f5ad",
            "stage":"Processing",
            "progress":"1/2",
            "errored":true,
            "finished":false,
            "created_at":1694256987866,
            "last_updated_at":1694257190419
        }
    }
]

Get a job by ID

pipeline.get_job(job_id='<JOB_ID>')
{
    "id":"561b1a52-c280-4433-b73f-13875dbb2480",
    "type":"jobs",
    "attributes":{
        "name":"job#0",
        "parameters":{
            "x":12,
            "y":13
        },
        "config":{

        },
        "run_id":"41245bd-12cf-4929-bdda-d1900553f5ad",
        "stage":"Processing",
        "progress":"1/2",
        "errored":true,
        "finished":false,
        "created_at":1694256987866,
        "last_updated_at":1694257190419
    }
}