Pipelines

The Pipeline class lets users interact with the Pipelines infrastructure, for example to create, read or delete pipelines. It can also be used to create pipeline batches and runs.

Parameters:

  • token (str, default: None ) –

    authentication token copied from Polly.

Usage

from polly.pipelines import Pipeline

pipeline = Pipeline(token)

cancel_run

cancel_run(run_id)

This function cancels a run.

Parameters:

  • run_id (str) –

    the run_id of the run to be cancelled

Returns:

  • object

    It will return a JSON object with pipeline run data. (See Examples)

Raises:

  • wrongParamException

    invalid parameter passed
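When several runs need to be cancelled, the call above can be wrapped in a loop that records which cancellations failed. The following is a minimal sketch: `cancel_runs` is a hypothetical helper, not part of polly-python, and it only assumes that the client exposes `cancel_run(run_id)` as documented above. It catches `Exception` broadly because the import path of `wrongParamException` is not shown on this page.

```python
def cancel_runs(pipeline, run_ids):
    """Hypothetical helper: cancel several runs and collect results and failures.

    `pipeline` is any object with a cancel_run(run_id) method, such as an
    authenticated Pipeline instance.
    """
    cancelled, failed = {}, {}
    for run_id in run_ids:
        try:
            cancelled[run_id] = pipeline.cancel_run(run_id)
        except Exception as exc:  # broad on purpose; narrow it once the exception class is importable
            failed[run_id] = str(exc)
    return cancelled, failed
```

With an authenticated client, `cancelled, failed = cancel_runs(pipeline, ["<RUN_ID_1>", "<RUN_ID_2>"])` would return the responses for the runs that were cancelled and the error messages for those that were not.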

create_batch

create_batch(pipeline_id, name=None, description=None, priority='low', tags={}, domain_context={})

This function is used to create a Pipeline batch.

A batch is a collection of runs. This function creates an empty batch to which runs can then be added.

Parameters:

  • pipeline_id (str) –

    pipeline_id for which the batch is to be created

  • name (str, default: None ) –

    name of the batch

  • description (str, default: None ) –

    description of the batch

  • priority (str, default: 'low' ) –

    priority of the batch, can be low | medium | high

  • tags (dict, default: {} ) –

    a dict of key-value pairs mapping tag_name -> tag_value

  • domain_context (dict, default: {} ) –

    domain context for a batch

Returns:

  • object

    It will return a JSON object which is the pipeline batch. (See Examples)

Raises:

  • wrongParamException

    invalid parameter passed
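The parameters above can be checked locally before the request is sent. The sketch below is an illustration only: `build_batch_kwargs` is a hypothetical helper, not part of polly-python, and it relies solely on the signature and the priority values documented above.

```python
ALLOWED_PRIORITIES = {"low", "medium", "high"}  # values listed in the parameter docs above


def build_batch_kwargs(pipeline_id, name=None, description=None,
                       priority="low", tags=None, domain_context=None):
    """Hypothetical helper: validate and assemble keyword arguments for create_batch."""
    if not isinstance(pipeline_id, str) or not pipeline_id:
        raise ValueError("pipeline_id must be a non-empty string")
    if priority not in ALLOWED_PRIORITIES:
        raise ValueError(f"priority must be one of {sorted(ALLOWED_PRIORITIES)}")
    return {
        "pipeline_id": pipeline_id,
        "name": name,
        "description": description,
        "priority": priority,
        # Fresh dicts on every call, so no state is shared between batches.
        "tags": dict(tags or {}),
        "domain_context": dict(domain_context or {}),
    }


kwargs = build_batch_kwargs("<PIPELINE_ID>", name="demo-batch", priority="medium")
# With an authenticated client: batch = pipeline.create_batch(**kwargs)
```

Copying `tags` and `domain_context` into new dicts avoids passing one shared mutable object to several calls.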

get_batch

get_batch(batch_id)

This function returns the data of the pipeline batch with the provided batch_id.

Parameters:

  • batch_id (str) –

    the batch_id for which the data is required

Returns:

  • list

    It will return a list of JSON objects with pipeline batch data. (See Examples)

Raises:

  • wrongParamException

    invalid parameter passed

get_pipeline

get_pipeline(pipeline_id)

This function returns the pipeline data of the provided pipeline_id.

Parameters:

  • pipeline_id (str) –

    pipeline_id for required pipeline

Returns:

  • object

    It will return a JSON object with pipeline data. (See Examples)

Raises:

  • wrongParamException

    invalid parameter passed

get_run

get_run(run_id)

This function returns the run data for the provided run_id.

Parameters:

  • run_id (str) –

    the run_id for which the data is required

Returns:

  • object

    It will return a JSON object with pipeline run data. (See Examples)

Raises:

  • wrongParamException

    invalid parameter passed
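A common use of get_run is polling a run until it reaches a final state. The sketch below is a minimal illustration under stated assumptions: `wait_for_run` is a hypothetical helper, the response is assumed to carry the status under `attributes.status` as in the Examples section, and the set of final statuses must be supplied by the caller because this page does not list them.

```python
import time


def wait_for_run(pipeline, run_id, terminal_statuses, interval=30.0, timeout=3600.0):
    """Hypothetical helper: poll get_run until the status is in terminal_statuses.

    Raises TimeoutError if no terminal status is seen within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        run = pipeline.get_run(run_id)
        status = run["attributes"]["status"]  # assumed response shape
        if status in terminal_statuses:
            return run
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run {run_id} is still {status} after {timeout} s")
        time.sleep(interval)
```

A fixed polling interval keeps the sketch short; for long-running pipelines a growing interval would reduce the number of requests.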

list_batches

list_batches(status=None, priority=None, search_term=None)

This function returns the list of pipeline batches, optionally filtered by status, priority or a search term.

Parameters:

  • status ((str, Optional), default: None ) –

    to filter batches based on the status

  • priority ((str, Optional), default: None ) –

    to filter the batches based on priority

  • search_term ((str, Optional), default: None ) –

    to filter the batches based on a search term.

Returns:

  • list

    It will return a list of JSON objects with pipeline batches. (See Examples)

Raises:

  • wrongParamException

    invalid parameter passed

list_pipelines

list_pipelines(search_term=None)

This function returns all the pipelines that the user has access to. Please use this function with the default values for the parameters.

Returns:

  • list

    It will return a list of JSON objects. (See Examples)

list_runs

list_runs(batch_id)

This function returns the list of runs executed for a batch.

Parameters:

  • batch_id (str) –

    the batch_id for which the runs are required

Returns:

  • list

    It will return a list of JSON objects with pipeline runs. (See Examples)

Raises:

  • wrongParamException

    invalid parameter passed

submit_run

submit_run(batch_id, parameters, config={}, run_name=None)

This function is used for creating runs for a particular batch.

Parameters:

  • batch_id (str) –

    batch_id in which the run is to be created.

  • parameters (dict) –

    a key-value object of all the required parameters of pipeline

  • config (dict, default: {} ) –

    config definition for the pipeline run. It should be of the format {"infra": {"cpu": int, "memory": int, "storage": int}}

  • run_name ((str, Optional), default: None ) –

    name of the run, auto-generated if not assigned

Returns:

  • object

    It will return a JSON object with pipeline data. (See Examples)

Raises:

  • wrongParamException

    invalid parameter passed
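The config format described above can be verified before a run is submitted. This is a minimal sketch: `validate_run_config` is a hypothetical helper, not part of polly-python, and it assumes that a non-empty config must provide all three infra keys as integers, which is an interpretation of the format shown above.

```python
INFRA_KEYS = ("cpu", "memory", "storage")


def validate_run_config(config):
    """Hypothetical helper: check config against the documented
    {"infra": {"cpu": int, "memory": int, "storage": int}} format."""
    if not config:  # an empty dict is the documented default
        return {}
    infra = config.get("infra")
    if not isinstance(infra, dict):
        raise ValueError('config must contain an "infra" dict')
    for key in INFRA_KEYS:
        value = infra.get(key)
        # bool is a subclass of int in Python, so reject it explicitly.
        if not isinstance(value, int) or isinstance(value, bool):
            raise ValueError(f'config["infra"]["{key}"] must be an int')
    return config


config = validate_run_config({"infra": {"cpu": 1, "memory": 2, "storage": 120}})
# With an authenticated client:
# run = pipeline.submit_run("<BATCH_ID>", {"param_a": "value_a"}, config=config)
```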

Examples

The Pipeline class of polly-python can be initialised as shown below:

# Install polly-python (run this in a shell, not in Python)
pip install polly-python

# Import libraries
import os

from polly.auth import Polly
from polly.pipelines import Pipeline

# Create a Pipeline object and authenticate
AUTH_TOKEN = os.environ["POLLY_REFRESH_TOKEN"]
pipelines = Pipeline(token=AUTH_TOKEN)

Get All Pipelines

pipelines.get_pipelines()
[
    {
        "id":"fb40282e-05d8-49c5-bd84-e5ad04cd8233",
        "type":"pipelines",
        "attributes":{
            "name":"toy",
            "display_name":"toy",
            "description":"Pipeline description",
            "executor":"nextflow",
            "deployment_stage":"dev",
            "config":{

            },
            "org_id":"1",
            "user_id":"1703140688",
            "user_name":"polly@elucidata.io",
            "created_at":1709473284864,
            "last_updated_at":1713784141145
        }
    }
]
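A list response like the one above can be searched locally, for instance to look up a pipeline ID by name. The sketch below is an illustration only: `find_pipeline_ids` is a hypothetical helper, and it assumes the record layout shown in the sample output (`id` at the top level, `name` under `attributes`).

```python
def find_pipeline_ids(pipeline_records, name):
    """Hypothetical helper: return the IDs of all pipeline records with the given name."""
    return [
        record["id"]
        for record in pipeline_records
        if record.get("attributes", {}).get("name") == name
    ]


# A trimmed copy of the sample response above.
sample = [
    {
        "id": "fb40282e-05d8-49c5-bd84-e5ad04cd8233",
        "type": "pipelines",
        "attributes": {"name": "toy", "executor": "nextflow"},
    }
]
toy_ids = find_pipeline_ids(sample, "toy")
```

The helper returns a list rather than a single ID because nothing on this page states that pipeline names are unique.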

Get Pipeline by ID

pipelines.get_pipeline(pipeline_id="fb40282e-05d8-49c5-bd84-e5ad04cd8233")
{
    "id":"fb40282e-05d8-49c5-bd84-e5ad04cd8233",
    "type":"pipelines",
    "attributes":{
        "name":"toy",
        "display_name":"toy",
        "description":"Pipeline description",
        "executor":"nextflow",
        "deployment_stage":"dev",
        "config":{

        },
        "org_id":"1",
        "user_id":"1703140688",
        "user_name":"polly@elucidata.io",
        "created_at":1709473284864,
        "last_updated_at":1713784141145
    }
}
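The `created_at` and `last_updated_at` values in the response look like Unix timestamps in milliseconds. Assuming that interpretation is right (the page does not state the unit), they can be converted to timezone-aware datetimes as sketched here; `ms_to_utc` is a hypothetical helper name.

```python
from datetime import datetime, timezone


def ms_to_utc(milliseconds):
    """Convert milliseconds since the Unix epoch to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(milliseconds / 1000, tz=timezone.utc)


# Timestamp fields copied from the sample response above.
attributes = {"created_at": 1709473284864, "last_updated_at": 1713784141145}
created = ms_to_utc(attributes["created_at"])
updated = ms_to_utc(attributes["last_updated_at"])
print(created.isoformat(), updated.isoformat())
```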

Create a pipeline run

Users can create runs for pipelines they have access to, as shown below:

run = pipeline.create_run(
    pipeline_id=demo_pipeline_id,
    run_name="TEST_RUN",
    priority="medium",
    tags={},
    domain_context={},
)
{
    "id":"7c273042-a8dd-49dc-9208-a24ea4cc3296",
    "type":"runs",
    "attributes":{
        "name":"TEST_RUN",
        "created_at":17137412412778,
        "pipeline_id":"fb402852-05d8-49c5-bd84-e5ad04cd8233",
        "priority":"high",
        "domain_context":{

        },
        "tags":{
            "org_id":""
        },
        "org_id":"1",
        "user_id":"1658226496",
        "user_name":"john.doe@elucidata.io",
        "num_jobs":0,
        "num_successful_jobs":0,
        "num_failed_jobs":0,
        "status":"PENDING",
        "last_updated_at":1713794894778
    }
}

Submit pipeline job

Users can submit jobs for a pipeline run as shown below:

job = pipeline.submit_job(
    run_id = "run_id",
    parameters = {
        "param_a": "value_a",
        "param_b": "value_b"
    },
    config = {
        "infra": {
            "cpu": 1,
            "memory": 2,
            "storage": 120
        }
    }
)

Get runs for the current user, and filter by status or priority

# Get all runs for the current user
runs = pipeline.get_runs()

# Get runs filtered by status
filtered_runs_by_status = pipeline.get_runs(
    status="PARTIALLY_COMPLETED"
)

# Get runs filtered by priority
filtered_runs_by_priority = pipeline.get_runs(
    priority="low"
)
[
    {
        "id":"1659632141379__1686063841__1641216496",
        "type":"runs",
        "attributes":{
            "name":"Multiple OA",
            "created_at":1686063857177,
            "pipeline_id":"None",
            "priority":"high",
            "domain_context":{
                "source":"migration",
                "repo_id":"16596812811111",
                "repo_version":168412063841
            },
            "tags":{

            },
            "org_id":"1",
            "user_id":"1658226496",
            "user_name":"John Doe",
            "num_jobs":5,
            "num_successful_jobs":3,
            "num_failed_jobs":2,
            "status":"PARTIALLY_COMPLETED",
            "last_updated_at":1693139935542
        }
    }
]
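The job counters in each record make it easy to summarise a list of runs. The following sketch tallies runs and jobs per status; `summarize_runs` is a hypothetical helper that assumes the record layout shown in the sample output above.

```python
def summarize_runs(runs):
    """Hypothetical helper: tally runs, jobs and failed jobs per status."""
    summary = {}
    for run in runs:
        attrs = run.get("attributes", {})
        entry = summary.setdefault(
            attrs.get("status", "UNKNOWN"),
            {"runs": 0, "jobs": 0, "failed_jobs": 0},
        )
        entry["runs"] += 1
        entry["jobs"] += attrs.get("num_jobs", 0)
        entry["failed_jobs"] += attrs.get("num_failed_jobs", 0)
    return summary


# Counters copied from the sample response above.
sample_runs = [
    {
        "id": "1659632141379__1686063841__1641216496",
        "attributes": {
            "num_jobs": 5,
            "num_successful_jobs": 3,
            "num_failed_jobs": 2,
            "status": "PARTIALLY_COMPLETED",
        },
    }
]
run_summary = summarize_runs(sample_runs)
```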

Get a run by ID

pipeline.get_run(run_id="<RUN_ID>")
{
    "id":"8b3db5bd-12cf-4929-bdda-d1900553f5ad",
    "type":"runs",
    "attributes":{
        "name":"76afcb3d-f9d4-480c-91ec-9d23a2064e47",
        "created_at":1694256985387,
        "pipeline_id":"ce03e312-e9cf-46f0-985a-e14f93066cd3",
        "priority":"low",
        "domain_context":{

        },
        "tags":{

        },
        "org_id":"1",
        "user_id":"1658226496",
        "user_name":"aditya.asthana@elucidata.io",
        "num_jobs":1,
        "num_successful_jobs":0,
        "num_failed_jobs":1,
        "status":"ERRORED",
        "last_updated_at":1694257190419
    }
}

Get jobs inside a run

pipeline.get_jobs(run_id="<RUN_ID>")
[
    {
        "id":"56cb3a52-c280-4433-b73f-13875dbb2480",
        "type":"jobs",
        "attributes":{
            "name":"job_name",
            "parameters":{
                "x":12,
                "y":13
            },
            "config":{

            },
            "run_id":"8b3db5bd-12cf-4929-bdda-d1900553f5ad",
            "stage":"Processing",
            "progress":"1/2",
            "errored":true,
            "finished":false,
            "created_at":1694256987866,
            "last_updated_at":1694257190419
        }
    }
]
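The `progress`, `errored` and `finished` fields of a job record can be reduced to a simple state and a completion fraction. This is a sketch under assumptions: `job_progress` is a hypothetical helper, and `progress` is assumed to be a "done/total" string as in the sample output above.

```python
def job_progress(job):
    """Hypothetical helper: return (state, fraction) for a job record.

    fraction is None when the progress string cannot be parsed.
    """
    attrs = job.get("attributes", {})
    done, _, total = str(attrs.get("progress", "")).partition("/")
    try:
        fraction = int(done) / int(total)
    except (ValueError, ZeroDivisionError):
        fraction = None
    if attrs.get("errored"):
        state = "errored"
    elif attrs.get("finished"):
        state = "finished"
    else:
        state = "running"
    return state, fraction


# Fields copied from the sample response above.
sample_job = {
    "id": "56cb3a52-c280-4433-b73f-13875dbb2480",
    "attributes": {"progress": "1/2", "errored": True, "finished": False},
}
state, fraction = job_progress(sample_job)
```

Checking `errored` before `finished` is a design choice of this sketch, so that a job flagged with both is reported as errored.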

Get a job by ID

pipeline.get_job(job_id='<JOB_ID>')
{
    "id":"561b1a52-c280-4433-b73f-13875dbb2480",
    "type":"jobs",
    "attributes":{
        "name":"job#0",
        "parameters":{
            "x":12,
            "y":13
        },
        "config":{

        },
        "run_id":"41245bd-12cf-4929-bdda-d1900553f5ad",
        "stage":"Processing",
        "progress":"1/2",
        "errored":true,
        "finished":false,
        "created_at":1694256987866,
        "last_updated_at":1694257190419
    }
}