Using Exit Handlers to Create Robust ML Pipelines in Production
Author: Felix Schaumann, Machine Learning Engineering Lead
Vertex AI Pipelines offer a scalable way to automate and orchestrate your ML workflows in a serverless manner. Vertex AI supports Kubeflow Pipelines SDK v1.8.9 or later, so you can also use a variety of the open-source features available in Kubeflow within Vertex AI Pipelines.
While some Kubeflow features are commonly used in Vertex AI Pipelines, such as breaking your pipeline into individual tasks or controlling your workflow with conditions, other, often overlooked Kubeflow concepts are crucial for creating robust, production-ready ML pipelines. One of these features is the Exit Handler, and this post explains its usage in the context of deploying GPT-2 from HuggingFace and testing its performance using Vertex AI Pipelines and Locust.
Introducing Exit Handlers
When building a robust product, you must consider how your code handles errors. Python offers “try-except” statements for catching errors encountered at run-time. Additionally, Python’s “try” statement has an optional “finally” clause, which is intended to define clean-up actions that must be executed under all circumstances.
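For example, a local training script might release temporary resources in a “finally” clause regardless of whether training succeeds. The sketch below is purely illustrative; train_model and delete_temp_resources are hypothetical stand-ins for your own logic:

def train_model():
    # hypothetical stand-in for your actual training logic
    raise RuntimeError("GPU out of memory")

def delete_temp_resources():
    # hypothetical stand-in: delete temporary files or compute resources
    print("Cleaning up temporary resources")

def run_training_job():
    try:
        train_model()
    except RuntimeError as err:
        print(f"Training failed: {err}")
    finally:
        # the "finally" clause runs in every case: success, failure or interruption
        delete_temp_resources()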
Kubeflow offers a similar control flow through Exit Handlers. The example below (see Figure 1) ensures that resources created during model training in a Kubeflow pipeline are cleaned up, similar to what you would achieve with a “finally” clause locally.
You might notice that Kubeflow requires you to define your clean-up task before creating the dataset and training the model. Nevertheless, the exit handler will still run after your business logic. Your pipeline will execute Kubeflow’s exit handler in any of the following scenarios:
- Pipeline tasks complete successfully
- Any of the pipeline tasks within the exit handler’s block fail
- The pipeline is cancelled
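To make this concrete, here is a minimal sketch of the pattern using the Kubeflow Pipelines SDK v2 syntax; the create_dataset, train_model and clean_up components are hypothetical stand-ins for your own tasks:

from kfp import dsl

@dsl.component
def create_dataset() -> str:
    # hypothetical stand-in: returns the URI of a prepared dataset
    return "gs://my-bucket/prepared-dataset"

@dsl.component
def train_model(dataset: str):
    # hypothetical stand-in for your training logic
    print(f"Training on {dataset}")

@dsl.component
def clean_up():
    # hypothetical stand-in: delete temporary training resources
    print("Cleaning up training resources")

@dsl.pipeline(name="training-with-exit-handler")
def training_pipeline():
    # the clean-up task is declared first ...
    clean_up_task = clean_up()
    # ... but only runs once every task inside the ExitHandler block
    # has completed, failed or been cancelled
    with dsl.ExitHandler(clean_up_task):
        dataset_task = create_dataset()
        train_model(dataset=dataset_task.output)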
Creating Robust ML Pipelines
Common AI/ML use cases require a training and prediction pipeline to serve predictions offline. However, some use cases (for example, a chatbot) require an API that provides ad-hoc predictions such as answers, summaries or recommendations based on user input. In Vertex AI, you can deploy model endpoints to wrap your model in an API which can be queried upon user requests. As an example, we’ll deploy the open-source Large Language Model (LLM) GPT-2 from HuggingFace to provide predictions based on a user prompt.
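Before a pipeline can reference the model, it needs to exist in the Vertex AI Model Registry. A hedged sketch of uploading GPT-2 with the Vertex AI SDK could look like this; the project, region and serving container image are placeholder assumptions, and the container is assumed to wrap the HuggingFace model behind the given predict and health routes:

from google.cloud import aiplatform

# Placeholder project and region.
aiplatform.init(project="my-project", location="europe-west2")

# Assumption: a custom serving container wrapping HuggingFace's GPT-2
# has already been pushed to Artifact Registry.
model = aiplatform.Model.upload(
    display_name="gpt2",
    serving_container_image_uri=(
        "europe-west2-docker.pkg.dev/my-project/serving/gpt2:latest"
    ),
    serving_container_predict_route="/predict",
    serving_container_health_route="/health",
)
print(model.resource_name)  # referenced later by the pipeline's importer node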
Regardless of how you automate the deployment of your model, it’s important to ensure that your API serves predictions correctly and satisfies latency requirements to keep your end users happy! Based on what we know about Kubeflow and exit handlers, let’s implement a robust deployment workflow in a Vertex AI pipeline.
The deployment pipeline is broken down into three parts:
- Import model and tests: Import your pre-trained open-source model from Vertex AI Model Registry and your performance tests from Cloud Storage.
- Deploy and test model: Create a Vertex AI endpoint, deploy the model to it, and run performance tests against the deployed model API.
- Clean up resources: Delete the deployed model and endpoint.
This pipeline structure allows us to declare the clean-up operation as an exit handler which runs even if any of the tasks fail or the pipeline is cancelled. This is especially important for keeping your costs low if you deploy a model with GPU acceleration.
The following code shows a low-code implementation of the ML pipeline which uses Google Cloud Pipeline Components and Locust:
from kfp.dsl import pipeline, importer, ExitHandler
from google_cloud_pipeline_components.v1.endpoint import (
    EndpointCreateOp,
    EndpointDeleteOp,
    ModelDeployOp,
)
from .locust_component import LocustTestsOp


@pipeline
def deploy_and_test_model_pipeline():
    # 1. Import model and tests
    import_model_op = importer(...).set_display_name("Import model")
    import_tests_op = importer(...).set_display_name("Import tests")

    # 3. Clean up resources
    exit_handler_op = EndpointDeleteOp(...).set_display_name("Clean up")

    with ExitHandler(exit_handler_op, name="Deploy & test model"):
        # 2. Deploy and test model
        create_endpoint_op = EndpointCreateOp(...).set_display_name("Create endpoint")
        deploy_model_op = ModelDeployOp(...).set_display_name("Deploy model")
        locust_tests_op = LocustTestsOp(...).set_display_name("Test model performance")
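To run this pipeline on Vertex AI Pipelines, you would typically compile the pipeline definition and submit it as a pipeline job. A minimal sketch, assuming placeholder values for the project, region and pipeline root:

from google.cloud import aiplatform
from kfp import compiler

# Compile the pipeline definition above into a pipeline spec file.
compiler.Compiler().compile(
    pipeline_func=deploy_and_test_model_pipeline,
    package_path="deploy_and_test_model_pipeline.json",
)

# Submit the compiled pipeline to Vertex AI Pipelines (placeholder values).
aiplatform.init(project="my-project", location="europe-west2")
job = aiplatform.PipelineJob(
    display_name="deploy-and-test-gpt2",
    template_path="deploy_and_test_model_pipeline.json",
    pipeline_root="gs://my-bucket/pipeline-root",
)
job.run()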
1. Import model and tests
The pipeline imports a pre-trained model from the Model Registry using an importer node which ensures that the model is available for downstream tasks. Concurrently, tests are imported from Cloud Storage in the form of a locustfile.py as we’ll be testing the model’s performance using Locust. A test in Locust which monitors the latency of your deployed model can be as simple as:
import google.auth
import google.auth.transport.requests
from os import environ as env

from locust import HttpUser, task


class VertexEndpointUser(HttpUser):
    host = env["VERTEX_AI_ENDPOINT_HOST"]
    path = env["VERTEX_AI_ENDPOINT_PATH"]

    def on_start(self):
        # authenticate using default credentials
        creds, _ = google.auth.default()
        creds.refresh(google.auth.transport.requests.Request())
        self.client.headers = {"Authorization": f"Bearer {creds.token}"}

    @task
    def request_prediction(self):
        # send a prediction request to the Vertex AI endpoint
        prompt = "Write a dialog using Shakespeare language."
        body = {"instances": [{"text": prompt}]}
        with self.client.post(self.path, json=body, catch_response=True) as res:
            if res.status_code == 200:
                res.success()
            else:
                res.failure(f"Status code: {res.status_code} Reason: {res.reason}")
2. Deploy and test model
We use the “EndpointCreateOp” and “ModelDeployOp” tasks from the Model and Endpoint Components to create a model endpoint and deploy the model as an API. To run the performance tests, we implement a custom task which wraps Locust in a Kubeflow component and executes the tests imported by the importer node earlier.
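A hedged sketch of what such a custom component could look like, assuming the locustfile is passed in as an input artifact and Locust is installed at runtime; the component name, parameters, user count and run time are illustrative and not the actual implementation behind LocustTestsOp:

from kfp import dsl
from kfp.dsl import Artifact, HTML, Input, Output

@dsl.component(base_image="python:3.10", packages_to_install=["locust"])
def locust_tests(
    locustfile: Input[Artifact],
    endpoint_host: str,
    endpoint_path: str,
    test_report: Output[HTML],
):
    """Runs Locust headless against the deployed endpoint and saves the HTML report."""
    import os
    import subprocess

    # Expose the endpoint details to the locustfile via environment variables.
    os.environ["VERTEX_AI_ENDPOINT_HOST"] = endpoint_host
    os.environ["VERTEX_AI_ENDPOINT_PATH"] = endpoint_path

    # Run Locust headless and write the results as an HTML artifact.
    subprocess.run(
        [
            "locust",
            "-f", locustfile.path,
            "--headless",
            "--users", "10",
            "--spawn-rate", "2",
            "--run-time", "2m",
            "--html", test_report.path,
        ],
        check=True,
    )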
The Locust component provides the test results as an HTML artifact output on Google Cloud Storage. Since Vertex AI Pipelines supports several visualisation types such as HTML or Markdown outputs, you can open the results natively via the UI.
As shown in Figure 7, the Locust test report summarises the latency results of your model over time and the success rate of requests. These results and other useful information provided by Locust will help you to understand the performance of your model before making it available to your end-users in production.
3. Clean up resources
At the end of your pipeline, the deployed endpoint is deleted to ensure that you won’t incur further cloud costs. Going back to the advantages of using an exit handler in Kubeflow, your ML pipeline will be robust to task failures or pipeline cancellations as the exit handler will be executed under all circumstances.
Are you looking for scalable training and prediction pipelines for XGBoost or TensorFlow instead? Check out our MLOps Turbo Template based on Google Cloud and Vertex AI; we appreciate your contribution to our open-source GitHub repository!
Datatonic is Google Cloud’s Machine Learning Partner of the Year with a wealth of experience developing and deploying impactful Machine Learning models and MLOps Platform builds. Need help with developing an ML model, or deploying your Machine Learning models fast? Have a look at our MLOps 101 webinar, where our experts talk you through how to get started with Machine Learning at scale or get in touch to discuss your ML or MLOps requirements!