Once you have a pipeline up and running, you can publish it so that it runs with different inputs. In SDK v1, this capability was known as *published pipelines*.
What has changed?
Batch endpoints propose a similar yet more powerful way to handle multiple assets running under a durable API, which is why the published pipelines functionality was moved to pipeline component deployments in batch endpoints.
Batch endpoints decouple the interface (the endpoint) from the actual implementation (the deployment) and let you decide which deployment serves the default implementation of the endpoint. Pipeline component deployments in batch endpoints let you deploy pipeline components instead of pipelines, which makes better use of reusable assets for organizations looking to streamline their MLOps practice.
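The endpoint/deployment decoupling can be illustrated with a minimal, framework-free sketch (this is plain Python, not the Azure ML SDK; all names are illustrative): an endpoint is a stable name that routes invocations to one of several interchangeable deployments, one of which is marked as the default.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional

@dataclass
class Deployment:
    """One concrete implementation running under an endpoint."""
    name: str
    component: str  # the pipeline component this deployment runs

@dataclass
class Endpoint:
    """A durable name that routes invocations to its deployments."""
    name: str
    deployments: Dict[str, Deployment] = field(default_factory=dict)
    default_deployment: Optional[str] = None

    def add(self, deployment: Deployment, make_default: bool = False) -> None:
        self.deployments[deployment.name] = deployment
        if make_default or self.default_deployment is None:
            self.default_deployment = deployment.name

    def invoke(self, deployment_name: Optional[str] = None) -> str:
        # Invoking the endpoint without a deployment name falls through to the default.
        target = self.deployments[deployment_name or self.default_deployment]
        return f"ran component '{target.component}' via deployment '{target.name}'"
```

Swapping the default deployment changes what the endpoint serves without changing its name, which is the property that makes the interface durable.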
The following table shows a comparison of each of the concepts:
| Concept | SDK v1 | SDK v2 |
| --- | --- | --- |
| Pipeline's REST endpoint for invocation | Pipeline endpoint | Batch endpoint |
| Pipeline's specific version under the endpoint | Published pipeline | Pipeline component deployment |
| Pipeline's arguments on invocation | Pipeline parameter | Job inputs |
| Job generated from a published pipeline | Pipeline job | Batch job |
To learn how to create your first pipeline component deployment, see How to deploy pipelines in Batch Endpoints.
Moving to batch endpoints
Use the following guidelines to learn how to move from SDK v1 to SDK v2 using the concepts in Batch Endpoints.
Publish a pipeline
Compare how publishing a pipeline has changed from v1 to v2:
In SDK v1, first we need to get the pipeline we want to publish:

```python
from azureml.pipeline.core import Pipeline

pipeline1 = Pipeline(workspace=ws, steps=[step1, step2])
```

We can publish the pipeline as follows:

```python
from azureml.pipeline.core import PipelineEndpoint

endpoint_name = "PipelineEndpointTest"
pipeline_endpoint = PipelineEndpoint.publish(
    workspace=ws,
    name=endpoint_name,
    pipeline=pipeline1,
    description="A hello world endpoint for component deployments",
)
```
In SDK v2, first we need to define the pipeline we want to publish:

```python
from azure.ai.ml import Input
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.constants import AssetTypes

@pipeline()
def pipeline(input_data: Input(type=AssetTypes.URI_FOLDER)):
    (...)
    return {
        (..)
    }
```
Batch endpoints don't deploy pipelines, but pipeline components. Components provide a more reliable way to keep the assets deployed under an endpoint under source control. You can convert any pipeline definition into a pipeline component as follows:
```python
pipeline_component = pipeline().component
```
As a best practice, we recommend registering pipeline components so you can version them in a centralized way inside the workspace or even in shared registries.

```python
ml_client.components.create_or_update(pipeline_component)
```
Then, we need to create the endpoint that hosts all the pipeline deployments:

```python
from azure.ai.ml.entities import BatchEndpoint

endpoint_name = "PipelineEndpointTest"
endpoint = BatchEndpoint(
    name=endpoint_name,
    description="A hello world endpoint for component deployments",
)
ml_client.batch_endpoints.begin_create_or_update(endpoint)
```
Create a deployment for the pipeline component:
```python
from azure.ai.ml.entities import PipelineComponentBatchDeployment

deployment_name = "hello-batch-dpl"
deployment = PipelineComponentBatchDeployment(
    name=deployment_name,
    description="A hello world deployment with a single step.",
    endpoint_name=endpoint.name,
    component=pipeline_component,
)
ml_client.batch_deployments.begin_create_or_update(deployment)
```
Submit a job to a pipeline endpoint
To call the default version of the pipeline in SDK v1, you can use:

```python
pipeline_endpoint = PipelineEndpoint.get(workspace=ws, name="PipelineEndpointTest")
run_id = pipeline_endpoint.submit("PipelineEndpointExperiment")
```

In SDK v2:

```python
job = ml_client.batch_endpoints.invoke(
    endpoint_name=endpoint_name,
)
```
Use `inputs` to indicate the inputs of the job if needed. See Create jobs and input data for batch endpoints for a more detailed explanation of how to indicate inputs and outputs.

```python
job = ml_client.batch_endpoints.invoke(
    endpoint_name=endpoint_name,
    inputs={
        "input_data": Input(type=AssetTypes.URI_FOLDER, path="./my_local_data")
    },
)
```
You can also submit a job to a specific version. In SDK v1:

```python
run_id = pipeline_endpoint.submit(endpoint_name, pipeline_version="0")
```
In batch endpoints, deployments aren't versioned. However, you can deploy multiple pipeline component versions under the same endpoint. In this sense, each pipeline version in v1 corresponds to a different pipeline component version and its corresponding deployment under the endpoint.
Then, you can invoke a specific deployment running under the endpoint if that deployment runs the version you're interested in:
```python
job = ml_client.batch_endpoints.invoke(
    endpoint_name=endpoint_name,
    deployment_name=deployment_name,
)
```
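Since deployments themselves aren't versioned, one way to preserve the v1 mental model is a naming convention that maps each pipeline component version to its own deployment under the endpoint. This helper is a hypothetical convention, not an SDK feature:

```python
def deployment_name_for(component_name: str, component_version: str) -> str:
    """Derive a per-version deployment name (hypothetical naming convention)."""
    # Deployment names must be unique within an endpoint, so encoding the
    # component version in the name yields one deployment per version.
    return f"{component_name}-v{component_version}"

# One deployment per component version under the same endpoint:
names = [deployment_name_for("hello-batch", v) for v in ("1", "2")]
```

You can then pass the derived name as `deployment_name` when invoking the endpoint to target the version you're interested in.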
Get all pipelines deployed
In SDK v1, all published pipelines can be retrieved as follows:

```python
all_pipelines = PublishedPipeline.get_all(ws)
```
In SDK v2, the following code lists all the endpoints existing in the workspace:

```python
all_endpoints = ml_client.batch_endpoints.list()
```
However, keep in mind that batch endpoints can host deployments that operationalize either pipelines or models. If you want a list of all the deployments that host pipelines, you can do the following:
```python
from azure.ai.ml.entities import PipelineComponentBatchDeployment

all_deployments = []
for endpoint in all_endpoints:
    all_deployments.extend(
        ml_client.batch_deployments.list(endpoint_name=endpoint.name)
    )

all_pipeline_deployments = [
    deployment
    for deployment in all_deployments
    if isinstance(deployment, PipelineComponentBatchDeployment)
]
```
Using the REST API
You can create jobs from the endpoints by using the REST API of the invocation URL. The following examples show how invocation has changed from v1 to v2.
In SDK v1:

```python
import requests

pipeline_endpoint = PipelineEndpoint.get(workspace=ws, name=endpoint_name)
rest_endpoint = pipeline_endpoint.endpoint

response = requests.post(
    rest_endpoint,
    headers=aad_token,
    json={
        "ExperimentName": "PipelineEndpointExperiment",
        "RunSource": "API",
        "ParameterAssignments": {"argument1": "united", "argument2": 45},
    },
)
```
Batch endpoints support multiple input types. The following example shows how to indicate two different inputs of type `string` and `numeric`. See Create jobs and input data for batch endpoints (REST) for more detailed examples:
```python
import requests

batch_endpoint = ml_client.batch_endpoints.get(endpoint_name)
rest_endpoint = batch_endpoint.invocation_url

response = requests.post(
    rest_endpoint,
    headers=aad_token,
    json={
        "properties": {
            "InputData": {
                "argument1": {
                    "JobInputType": "Literal",
                    "Value": "united"
                },
                "argument2": {
                    "JobInputType": "Literal",
                    "Value": 45
                }
            }
        }
    },
)
```
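If you submit many jobs, you can build the request body programmatically instead of writing the JSON by hand. The helper below is illustrative and not part of any SDK; it only wraps scalar values in the `Literal` job-input shape shown above:

```python
import json

def literal_input(value):
    """Wrap a scalar value in the 'Literal' job input shape used by the REST API."""
    return {"JobInputType": "Literal", "Value": value}

# Assemble the same body as in the example above:
body = {
    "properties": {
        "InputData": {
            "argument1": literal_input("united"),
            "argument2": literal_input(45),
        }
    }
}
payload = json.dumps(body)
```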
Next steps