Using Exit Handlers to Create Robust ML Pipelines in Production

Vertex AI
Vertex AI Exit Handlers

Author: Felix Schaumann, Machine Learning Engineering Lead

Vertex AI Pipelines offer a scalable way to automate and orchestrate your ML workflows in a serverless manner. Vertex AI supports Kubeflow Pipelines SDK v1.8.9 or later and thus, you can use a variety of open source features available in Kubeflow in Vertex AI Pipelines, too.

While some Kubeflow features are commonly used in Vertex AI Pipelines, such as breaking your pipeline into individual tasks or controlling your workflow using conditions, some often overlooked Kubeflow concepts are crucial for creating robust production-ready ML pipelines. One of these features is certainly the Exit Handler and as such, this post explains its usage in the context of deploying GPT-2 from HuggingFace and testing its performance using Vertex AI Pipelines and Locust.

Introducing Exit Handlers

When creating a robust product, error handling in your code must be considered. Python offers “try-except” statements for catching errors encountered at run-time. Additionally, Python’s “try” statement has another optional clause “finally” which is intended to define clean-up actions that must be executed under all circumstances.

Kubeflow offers a similar control flow by introducing Exit Handlers. The example below (see Figure 1) ensures that you clean up resources during model training in a Kubeflow pipeline similar to what you would achieve with a “finally” statement locally.

Figure 1: Comparing clean-up actions between Python and Kubeflow

You might notice that Kubeflow requires you to define your clean-up task before creating the dataset and training the model. Nevertheless, your exit handler will still run after your business logic. Consider the following scenarios in which your pipeline will execute Kubeflow’s exit handler:

  • Pipeline tasks complete successfully
  • Any of the pipeline tasks within the exit handler’s block fail
  • The pipeline is cancelled
Figure 2: A Kubeflow pipeline with an Exit Handler

Creating Robust ML Pipelines

Common AI +ML use cases require a training and prediction pipeline to serve predictions offline. However, some use cases (for example in the context of a chatbot) will often require an API to provide ad-hoc predictions such as answers, summaries or recommendations based on user input. In Vertex AI, you can deploy model endpoints to wrap your model in an API which can be queried upon user requests. As an example, we’ll deploy the open-source Large-Language Model (LLM) GPT-2 from HuggingFace to provide predictions based on a user prompt.

Regardless of how you automate the deployment of your model, it’s important to ensure that your API serves predictions correctly and satisfies latency requirements to keep your end users happy! Based on what we know about Kubeflow and exit handlers, let’s implement a robust deployment workflow in a Vertex AI pipeline:

Figure 3: A robust deployment pipeline

The deployment pipeline is broken down into three parts:

  1. Import model and tests: Import your pre-trained open-source model and tests from Vertex AI Model Registry and performance tests from Cloud Storage.
  2. Deploy + test model: Deploy a Vertex AI endpoint + model and run performance tests against the deployed model API.
  3. Clean up resources: Delete the deployed model and endpoint.

This pipeline structure allows us to declare the clean-up operation as an exit handler which runs even if any of the tasks fail or the pipeline is cancelled. This is especially important to keep your costs low if you deploy a model with GPU acceleration:

Figure 4: The exit handler executes regardless of task failures

The following code shows a low-code implementation of the ML pipeline which uses Google Cloud Pipeline Components and Locust:

from kfp.dsl import pipeline, importer, ExitHandler
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp
from .locust_component import LocustTestsOp

def deploy_and_test_model_pipeline():
  # 1. Import model and tests
  import_model_op = importer(...).set_display_name("Import model")
  import_tests_op = importer(...).set_display_name("Import tests")

  # 3. Clean up resources
  exit_handler_op = EndpointDeleteOp(...).set_display_name("Clean up")
  with ExitHandler(exit_handler_op, name="Deploy & test model"):

    # 2. Deploy and test models
    create_endpoint_op = EndpointCreateOp(...).set_display_name("Create endpoint")
    deploy_model_op = ModelDeployOp(...).set_display_name("Deploy model")
    locust_tests_op = LocustTestsOp(...).set_display_name("Test model performance")

1. Import model and tests

The pipeline imports a pre-trained model from the Model Registry using an importer node which ensures that the model is available for downstream tasks. Concurrently, tests are imported from Cloud Storage in the form of a as we’ll be testing the model’s performance using Locust. A test in Locust which monitors the latency of your deployed model can be as simple as:

import google.auth
import google.auth.transport.requests
from os import environ as env
from locust import HttpUser, task

class VertexEndpointUser(HttpUser):

  def on_start(self):
    # authenticate using default credentials
    creds, _ = google.auth.default()
    self.client.headers = {"Authorization": f"Bearer {creds.token}"}

  def request_prediction(self):
    # send a prediction request to the Vertex AI endpoint
    prompt = "Write a dialog using Shakespeare language."
    body = {"instances": [{"text": prompt}]}
    with, json=body, catch_response=True) as res:
      if res.status_code == 200:
        res.failure(f"Status code: {res.status_code} Reason: {res.reason}")

2. Deploy and test model

We use the tasks “EndpointCreateOp” and “ModelDeployOp” from Model and Endpoint Components to create a model endpoint and deploy the model as an API. For running performance tests, we implement a custom task which wraps Locust in a Kubeflow component to execute performance tests defined in the importer node previously.

The Locust component provides the test results as an HTML artifact output on Google Cloud Storage. Since Vertex AI Pipelines supports several visualisation types such as HTML or Markdown outputs, you can open the results natively via the UI:

Figure 7: Results of the model performance test

As shown in Figure 7, the Locust test report summarises the latency results of your model over time and the success rate of requests. These results and other useful information provided by Locust will help you to understand the performance of your model before making it available to your end-users in production.

3. Clean up resources

At the end of your pipeline, the deployed endpoint is deleted to ensure that you won’t incur further cloud costs. Going back to the advantages of using an exit handler in Kubeflow, your ML pipeline will be robust to task failures or pipeline cancellations as the exit handler will be executed under all circumstances.

Are you looking for scalable training and prediction pipelines for XGBoost or TensorFlow instead? Check out our MLOps Turbo Template based on Google Cloud and Vertex AI; we appreciate your contribution to our open-source GitHub repository!

Datatonic is Google Cloud’s Machine Learning Partner of the Year with a wealth of experience developing and deploying impactful Machine Learning models and MLOps Platform builds. Need help with developing an ML model, or deploying your Machine Learning models fast? Have a look at our MLOps 101 webinar, where our experts talk you through how to get started with Machine Learning at scale or get in touch to discuss your ML or MLOps requirements!

View all
View all
Prompt Engineering
Prompt Engineering 101: Using GenAI Effectively
Generative AI
Future-Proof Your Data: Overcoming Migration Challenges
Cloud Data Engineering
Access Plan BigQuery
Designing a Secure + Efficient Access Control Plan for BigQuery