Vertex AI Tips + Tricks: Data Validation in Vertex Pipelines using Great Expectations
Building reliable and reproducible Machine Learning systems in production is a tough challenge that involves many facets beyond just the development of the ML model. Among these, data quality controls are an essential component for achieving high-quality models and predictions.
Data quality or data validation analysis includes identifying data inconsistencies and errors (such as missing fields or values), inconsistent formatting, and incorrect values (for example, negative incomes). If some of these erroneous data points are included when training the model, this can lead to incorrect predictions, lowering the performance of the model. Similarly, if the data used at prediction time does not have the same characteristics as the data used to train the model, the predictions can often be unreliable.
In this blog post, we will show how to implement data validation using Great Expectations on Google Cloud’s Vertex AI Pipelines. Vertex AI Pipelines is based on KubeFlow Pipelines (KFP), an open-source platform for building Machine Learning pipelines using containers, and provides a great way to orchestrate your Machine Learning workflow on Google Cloud.
Every ML pipeline involves several steps of accessing or manipulating data, such as data ingestion or feature engineering. In this post, we explain how to develop KFP components that can validate the data within any of these steps.
We will create KFP components to perform two different tasks:
- Creating a Great Expectations Datasource.
- Data validation, which will output a report and optionally include logging & alerting.
What is Great Expectations?
Great Expectations is an open-source Python tool for data validation, profiling and documentation. With Great Expectations, we can assert what we expect from a dataset through the use of “Expectations”, which are essentially assertions or unit tests applied to data. Great Expectations offers a great way to mitigate data quality issues that persist through machine learning pipelines.
Before we jump into building components, we need to familiarise ourselves with key terminology and concepts within the Great Expectations workflow:
- Data Context — A Data Context represents a Great Expectations project. It organises storage and access for Expectation Suites, Datasources, notification settings and data fixtures. The Data Context is created by initialising Great Expectations via the CLI and is configured and maintained via the resulting great_expectations.yml file.
- Datasources — A Datasource is a configuration that describes where the data is, how to connect to it, and which execution engine to use when running a Checkpoint. Therefore, Datasources have two components: storage (e.g. BigQuery tables) and compute (e.g. SQLAlchemy). One of the main advantages of Great Expectations is the ability to validate data across various data sources, such as in-memory Pandas, Spark data frames, CSV files stored on the cloud, or relational databases such as BigQuery and PostgreSQL.
- Expectations — Expectations provide a language for describing the expected behaviour of data. They are stored within Expectation Suites, which are a collection of Expectations in JSON format. Expectation Suites and Datasets have a many-to-many relationship; an Expectation Suite can be used to validate data across one or more datasets, and one dataset can be validated using one or more Expectation Suites. Some commonly used Expectations include checking for missing values, validating data types, and validating whether a value is found in a predefined set, or is between a min and a max value. The full list of available Expectations can be found in the Expectations Gallery.
- Batch Request — A Batch Request defines a batch of data to run Expectations on from a given data source. They can describe data in a file, database, or dataframe.
- Checkpoint Config — Checkpoint Configs describe which Expectation Suite should be run against which data, and what actions to take during and after a run (e.g., sending Slack notifications informing of failed validations).
- Data Docs — Data Docs are clean, human-readable documents that contain information about a validation, such as the Expectation Suite used and the result of each Expectation assertion. Data Docs are rendered as HTML files inside the Data Context.
In order to perform a validation, we need to have a Datasource and an Expectation Suite already defined. Then, using a Checkpoint that combines a Datasource with an Expectation Suite, we can verify that all the Expectations have been successfully satisfied.
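To make these concepts concrete, here is a minimal sketch of what an Expectation Suite JSON file might look like (the suite name is illustrative; the column names are the ones used in the examples later in this post):
{
  "expectation_suite_name": "taxi_trips_suite",
  "expectations": [
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {"column": "customer_id"},
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": {"column": "taxi_trip_in_seconds", "min_value": 0},
      "meta": {}
    }
  ],
  "meta": {"great_expectations_version": "0.15.32"}
}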
How can we integrate Great Expectations in Vertex AI?
Before we create a Data Validation component using Great Expectations, we first need to set up our Google Cloud project to be able to run Great Expectations with a Data Context located in Google Cloud Storage.
Data Context creation in GCS
First, we need to create the Data Context. At the time of writing, Great Expectations does not allow us to create a Data Context directly on a remote bucket. Therefore, we need to create the Data Context locally and then copy the resulting directory to a GCS bucket.
To start, open your terminal and run these commands:
pip install great_expectations
great_expectations init
This will install the Great Expectations library and initialise it. In the terminal, we will see a message from Great Expectations saying that a new Data Context will be created, and with this, a new directory that contains the following structure:
great_expectations
|-- great_expectations.yml
|-- expectations
|-- checkpoints
|-- plugins
|-- .gitignore
|-- uncommitted
|-- config_variables.yml
|-- data_docs
|-- validations
Selecting yes when prompted will allow Great Expectations to create the folder structure. The resulting folders and files are self-explanatory: great_expectations.yml contains the information on the Data Context and Datasources, the expectations folder contains all the JSON Expectation Suite files, the checkpoints folder contains all the YAML Checkpoint files, and so on. The uncommitted/data_docs/local_site/validations folder will contain the Data Docs in HTML format, which are the outputs of the applied validations.
Once we create the great_expectations directory, we then need to copy it into a GCS bucket by running the following:
gsutil cp -r great_expectations gs://<BUCKET NAME>/great_expectations
Note that Great Expectations only works with a local Data Context. Luckily for us, Vertex Pipelines uses GCSFuse to make Google Cloud Storage buckets available in the filesystem under /gcs/<BUCKET NAME>.
From now on, whenever we need to point to this GE backend, whether from a Vertex pipeline component or a notebook in Workbench, we will define the Data Context as follows:
import great_expectations as gx
context = gx.data_context.DataContext(context_root_dir="/gcs/<BUCKET NAME>/great_expectations")
This Data Context definition is what allows us to work with a Great Expectations backend located in GCS, which can then be used from any part of the project.
The workflow to create components which will run validations using Great Expectations is as follows:
- Connect to Datasources
- Create Expectation Suites
- Create a validation component, which will include creating Checkpoints
- Create Logging & Alerting logic
1. Connect to Datasources
The first step of the workflow is to define the connections to all Datasources we want to validate. For this blog post, we will only focus on creating a Datasource that connects to a GCS bucket that contains CSV files. Defining the Datasource connections is done by configuring the great_expectations.yml file. There are a few ways to do this, and the Great Expectations documentation contains guides that cover most use cases.
Depending on the use case, the files we need to validate may stay static over time (i.e. the location of the files/tables remains unchanged), or the data may be generated as a Dataset Artifact and therefore be located in a particular subfolder of the pipeline_root bucket of the Vertex Pipeline.
1.1. Static Datasource
If we expect our CSV files to always be stored in the same GCS bucket, we only need to create the Datasource once, as Great Expectations will link that Datasource with that particular bucket. The code below shows how to create a new Datasource:
from ruamel import yaml
import great_expectations as gx
from great_expectations.core.batch import Batch, BatchRequest, RuntimeBatchRequest
context = gx.get_context()
datasource_yaml = rf"""
name: {DATASOURCE_NAME}
class_name: Datasource
execution_engine:
class_name: PandasExecutionEngine
data_connectors:
default_runtime_data_connector_name:
class_name: RuntimeDataConnector
batch_identifiers:
- default_identifier_name
default_inferred_data_connector_name:
class_name: InferredAssetGCSDataConnector
bucket_or_name: {BUCKET_NAME}
prefix: {PREFIX_NAME}
default_regex:
pattern: (.*).csv
group_names:
- data_asset_name
"""
context.add_datasource(**yaml.load(datasource_yaml))
Make sure to edit the Datasource name and the GCS bucket and prefix name for where your CSV file(s) are located. For validation purposes, we recommend following the Test your new Datasource section of the Great Expectations documentation.
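As a quick sanity check, a sketch like the following (reusing the context and datasource_yaml defined above) can be run in a notebook before committing the configuration:
# Validate the Datasource YAML and attempt to connect to GCS
context.test_yaml_config(yaml_config=datasource_yaml)

# List the data assets that the new Datasource can see in the bucket
context.get_available_data_asset_names(datasource_names=DATASOURCE_NAME)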
NOTE: The line of code context = gx.get_context() works if you are running Great Expectations locally. However, as we have already mentioned, we aim to use Great Expectations in a production environment, so we recommend working in a Workbench notebook and pointing the Data Context to the GCS backend that we uploaded previously, by defining the context as follows:
context = gx.data_context.DataContext(context_root_dir="/gcs/<BUCKET NAME>/great_expectations")
1.2. Dynamic Datasource
When dealing with Vertex AI Pipelines, it is very common to work with intermediate Artifacts that sit between different components. Amongst these Artifacts, we will find Datasets, which sometimes can be outputs of pre-processing or feature engineering steps generated by a particular component. These Artifacts are usually stored in a sub-path that is automatically generated by Vertex inside the pipeline_root bucket that we define when building the pipeline. This means that the location of the Dataset changes every time a new pipeline run takes place. Therefore, for this particular use case, we need to create a new Vertex AI Pipelines component that automatically updates the location of the Datasource in the Great Expectations Data Context.
The component that works for this use case is introduced below. The steps followed by the component are:
- Firstly, we have the full path of the Dataset Artifact as an Input, but we need to split that path into the bucket name and the prefix (the subpath inside the bucket where the Dataset is located) to add it to the Datasource definition.
- We define the Datasource and add it to the Great Expectations Data Context as we did in the Static Datasource.
- We want to validate that the Datasource connection is established and working properly. This step is not mandatory, but it is recommended. For this, we need to have an Expectation Suite already loaded into our GCS backend. We also need to create a BatchRequest from the Datasource. We finally validate the data through a validator object using the BatchRequest and the Expectation Suite. If the validator fails, the component fails.
from kfp.v2 import dsl
from kfp.v2.dsl import Dataset, Input


@dsl.component(base_image="python:3.9", packages_to_install=["great_expectations==0.15.32"])
def create_gcs_great_expectations_datasource(
    dataset: Input[Dataset],
    datasource_name: str,
    ge_context_root_dir: str,
    expectation_suite_validation: str,
    data_connector_name: str,
):
    import logging
    from pathlib import Path

    import great_expectations as gx
    from great_expectations.core.batch import BatchRequest
    from ruamel import yaml

    context = gx.data_context.DataContext(context_root_dir=ge_context_root_dir)

    # Split the GCSFuse path (/gcs/<bucket>/<prefix>/<file>.csv) into bucket and prefix
    bucket = Path(dataset.path).parts[2]
    datasource_prefix = "/".join(Path(dataset.path).parts[3:-1])

    datasource_yaml = rf"""
name: {datasource_name}
class_name: Datasource
execution_engine:
  class_name: PandasExecutionEngine
data_connectors:
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - default_identifier_name
  {data_connector_name}:
    class_name: InferredAssetGCSDataConnector
    bucket_or_name: {bucket}
    prefix: {datasource_prefix}
    default_regex:
      pattern: (.*).csv
      group_names:
        - data_asset_name
"""
    context.add_datasource(**yaml.load(datasource_yaml))

    # Test the connection
    context.test_yaml_config(yaml_config=datasource_yaml)
    logging.info("Data Source updated")

    # The data asset name is the object key relative to the bucket, without the .csv suffix
    data_asset_name = "/".join(Path(dataset.path).parts[3:]).replace(".csv", "")
    logging.info(f"Data Asset Name: {data_asset_name}")

    batch_request = BatchRequest(
        datasource_name=datasource_name,
        data_connector_name=data_connector_name,
        data_asset_name=data_asset_name,
    )
    context.get_expectation_suite(expectation_suite_name=expectation_suite_validation)
    context.get_validator(
        batch_request=batch_request,
        expectation_suite_name=expectation_suite_validation,
    )
    logging.info("Data Source successfully created")
After creating our GCS datasource, the datasources section of the great_expectations.yml file looks like this:
datasources:
  my_datasource:
    data_connectors:
      default_runtime_data_connector_name:
        module_name: great_expectations.datasource.data_connector
        batch_identifiers:
          - default_identifier_name
        class_name: RuntimeDataConnector
        name: default_runtime_data_connector_name
      default_inferred_data_connector_name:
        module_name: great_expectations.datasource.data_connector
        default_regex:
          pattern: (.*).csv
          group_names:
            - data_asset_name
        prefix: my_prefix
        class_name: InferredAssetGCSDataConnector
        name: default_inferred_data_connector_name
        bucket_or_name: dataset_bucket
    module_name: great_expectations.datasource
    execution_engine:
      module_name: great_expectations.execution_engine
      class_name: PandasExecutionEngine
    class_name: Datasource
2. Create Expectation Suites
There are four workflows that can be used to create an Expectation Suite (in-depth instructions here), but in this blog, we will focus on the interactive workflow with Python, writing code in a Jupyter notebook. This method requires the Datasource we want to validate against to be configured in the great_expectations.yml file, as we did in the 1. Connect to Datasources section.
This method has the following workflow:
- Create a Batch Request, which specifies a batch of data to be used to create Expectations against.
- Specify the name of our Expectation Suite.
- Instantiate a validator object responsible for running our Expectation Suite against the data.
- Create Expectations using the validator object and save them in the Expectation Suite.
The code below shows how to create an Expectation Suite and add a few Expectations. As usual, remember to edit the variable names.
import datetime

import great_expectations as gx
from great_expectations.core.batch import BatchRequest

DATASOURCE_NAME = "<DATASOURCE_NAME>"
DATA_CONNECTOR_NAME = "<DATA_CONNECTOR_NAME>"
GCS_PREFIX = "<GCS_PREFIX>"
FILE_NAME = "<FILE_NAME>.csv"
EXPECTATION_SUITE_NAME = "<EXPECTATION_SUITE_NAME>"

context = gx.data_context.DataContext(
    context_root_dir="/gcs/<BUCKET_NAME>/great_expectations"
)

batch_request = BatchRequest(
    datasource_name=DATASOURCE_NAME,
    data_connector_name=DATA_CONNECTOR_NAME,
    data_asset_name=f"{GCS_PREFIX}/{FILE_NAME}",
)

suite = context.create_expectation_suite(
    expectation_suite_name=EXPECTATION_SUITE_NAME, overwrite_existing=True
)
validator = context.get_validator(
    batch_request=batch_request, expectation_suite_name=EXPECTATION_SUITE_NAME
)

# Expect all `customer_id` values to not be null
validator.expect_column_values_to_not_be_null(column="customer_id")

# Expect all `unique_key` values to be unique
validator.expect_column_values_to_be_unique(column="unique_key")

# Expect all `unique_key` values to be 40 characters long
validator.expect_column_value_lengths_to_equal(column="unique_key", value=40)

# Expect `taxi_trip_in_seconds` values to be greater than 0
validator.expect_column_values_to_be_between(column="taxi_trip_in_seconds", min_value=0)

# Expect mean of all `taxi_trip_fare` values to be between 20 and 30 dollars
validator.expect_column_mean_to_be_between(column="taxi_trip_fare", min_value=20, max_value=30)

# Expect `trip_start_timestamp` dates to be between 01/01/2000 and today
validator.expect_column_values_to_be_between(
    column="trip_start_timestamp",
    min_value=datetime.date.fromisoformat("2000-01-01"),
    max_value=datetime.date.today().strftime("%Y-%m-%d"),
)

# Expect `payment_type` values to be in set {'Cash', 'Credit Card'}
validator.expect_column_values_to_be_in_set(column="payment_type", value_set=["Cash", "Credit Card"])

validator.save_expectation_suite(discard_failed_expectations=False)
After creating the Expectation Suite, remember to use gsutil to copy the resulting JSON file into the expectations folder in GCS.
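If the suite was created against a local Data Context, copying it to the GCS backend might look like this (the suite file name below is a placeholder):
gsutil cp great_expectations/expectations/<EXPECTATION_SUITE_NAME>.json gs://<BUCKET NAME>/great_expectations/expectations/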
NOTE: The Expectation Suites are stored automatically in the great_expectations/expectations folder. Alternatively, we can also define any other folder or bucket to host our Expectation Suites. Great Expectations calls this an Expectation Store. We just need to update the great_expectations.yml file accordingly; for more information, refer here.
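For reference, a GCS-backed Expectation Store is configured in great_expectations.yml roughly as follows (project, bucket and prefix are placeholders, not values from this post):
stores:
  expectations_GCS_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleGCSStoreBackend
      project: <PROJECT ID>
      bucket: <BUCKET NAME>
      prefix: great_expectations/expectations

expectations_store_name: expectations_GCS_store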
Considerations
The list of all available Expectations can be found here. However, there are a few things we need to consider:
- Among all of the Expectations present in the Expectations Gallery, only the ones labelled as Production are supported and available through the great_expectations Python library.
- Not all Expectations are available across all types of Datasources. Before using an Expectation, please check whether it can be integrated with your required Datasource.
- Custom Expectations can be also created. For more information refer here.
3. Data Validation Component
We can now create the data validation step as a Vertex AI Pipelines component, which will allow us to validate our connected Datasource using the Expectation Suite we've just created. The recommended method for validating data is through the use of Checkpoints. Checkpoints will validate data, save validation results, run any actions we have specified, and finally create Data Docs with the results.
The logic is simple. We need to provide all the information we mentioned before: everything related to the Data Context, the Datasource and the Expectation Suite. In addition, we have an Output[HTML] Artifact that will be used to output the results of the validation in HTML format, making use of Great Expectations' Data Docs.
The component consists of the following steps:
- Definition of the Data Context (remember that we use a Data Context located in GCS).
- As mentioned before, to perform a validation we need to create a Checkpoint where we define the combination of Expectation Suite and Dataset that needs to be validated.
- We will run the newly created and saved Checkpoint, resulting in a CheckpointResult object that contains appropriate metadata.
- Finally, we parse the CheckpointResult into an ExpectationSuiteValidationResult object so that we can render it as HTML and show it as an output.
from kfp.v2 import dsl
from kfp.v2.dsl import HTML, Output


@dsl.component(
    base_image="python:3.9",
    packages_to_install=["great_expectations==0.15.32", "google-cloud-logging==3.2.5"],
)
def data_validation(
    ge_context_root_dir: str,
    checkpoint_name: str,
    datasource_name: str,
    data_connector_name: str,
    data_asset_name: str,
    expectation_suite_validation: str,
    output_html: Output[HTML],
    project_id: str,
):
    import logging

    import great_expectations as gx
    from great_expectations.render.renderer import ValidationResultsPageRenderer
    from great_expectations.render.view import DefaultJinjaPageView
    from ruamel import yaml

    context = gx.data_context.DataContext(context_root_dir=ge_context_root_dir)

    # In order to create a validation step, a checkpoint needs to be created.
    # Checkpoints combine the data_asset_name with its corresponding expectation suite.
    yaml_config = f"""
name: {checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S"
validations:
  - batch_request:
      datasource_name: {datasource_name}
      data_connector_name: {data_connector_name}
      data_asset_name: {data_asset_name}
      data_connector_query:
        index: -1
    expectation_suite_name: {expectation_suite_validation}
"""
    context.add_checkpoint(**yaml.load(yaml_config))

    # Run checkpoint to validate the data
    checkpoint_result = context.run_checkpoint(checkpoint_name=checkpoint_name)

    # Validation results are rendered as HTML
    document_model = ValidationResultsPageRenderer().render(
        list(checkpoint_result.run_results.values())[0]["validation_result"]
    )

    if not output_html.path.endswith(".html"):
        output_html.path += ".html"

    # Write validation results as output HTML
    with open(output_html.path, "w") as writer:
        writer.write(DefaultJinjaPageView().render(document_model))
The final HTML output Artifact shows the results of the validations.
NOTE: In the example code here we have defined the configuration using YAML in a string to be consistent with the Great Expectations documentation. However, you could also define the configuration directly in a Python dictionary.
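As a rough equivalent, the same Checkpoint could be added with a Python dictionary (a sketch using the same component parameters as above):
checkpoint_config = {
    "name": checkpoint_name,
    "config_version": 1.0,
    "class_name": "SimpleCheckpoint",
    "run_name_template": "%Y%m%d-%H%M%S",
    "validations": [
        {
            "batch_request": {
                "datasource_name": datasource_name,
                "data_connector_name": data_connector_name,
                "data_asset_name": data_asset_name,
                "data_connector_query": {"index": -1},
            },
            "expectation_suite_name": expectation_suite_validation,
        }
    ],
}
context.add_checkpoint(**checkpoint_config)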
4. Logging & Alerting
We now have a pipeline component that reads a Datasource, loads an Expectation Suite and validates our data. But what if our Expectations fail? Should the pipeline fail? If we want the component (and therefore the pipeline) to fail when validations fail, we add the following lines of code to the component:
if not checkpoint_result["success"]:
    raise RuntimeError("Validations failed")
Or perhaps we only want the component to send a log to Google Cloud Logging, which we can later use to generate custom alerts or e-mails. If we want to be more granular, we can generate dedicated logs for any monitoring and alerting tool. The code snippet below can be added to the existing component; it parses the validation results and generates the corresponding log entries.
These are the steps followed:
- Get a dictionary with all the validation results.
- Create a Google Logger. If we want to search for a particular logging message in the future, it will be easier if this logging message has a particular name, shape or message.
- Iterate over all the validation results and check the columns and Expectations that have failed.
- Log those particular failures.
# Extract the individual validation results from the CheckpointResult
run_results = checkpoint_result.to_json_dict()["run_results"]
validation_results = list(run_results.values())[0]["validation_result"]["results"]

import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler
from google.logging.type import log_severity_pb2 as severity

# Create a handler for Google Cloud Logging.
log_name = "great_expectations_logger"
gcloud_logging_client = google.cloud.logging.Client(project=project_id)
gcloud_logging_handler = CloudLoggingHandler(gcloud_logging_client, name=log_name)
gcs_logger = gcloud_logging_client.logger(log_name)

# Collect the column and Expectation type of every failed validation
errors = []
for validation in validation_results:
    if not validation["success"]:
        column_failed = validation["expectation_config"]["kwargs"].get("column")
        expectation_failed = validation["expectation_config"]["expectation_type"]
        errors.append((column_failed, expectation_failed))

if errors:
    json_payload = {"message": f"Great Expectations Errors: {errors}"}
    gcs_logger.log_struct(json_payload, severity=severity.ERROR)
This piece of code will generate specific logs if validations fail. We could then create an alert by querying those logs. For further information about creating alerts on Vertex Pipelines, refer to this blog post.
In our case, we could find the logs generated by this component with the following query:
logName="projects/{my_project}/logs/great_expectations_logger"
resource.type="k8s_container"
severity="ERROR"
jsonPayload.message:"Great Expectations Errors"
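As a quick way to check these logs from the command line, a sketch using the gcloud CLI (assuming it is authenticated against your project) could look like this:
gcloud logging read 'logName="projects/{my_project}/logs/great_expectations_logger" AND severity="ERROR" AND jsonPayload.message:"Great Expectations Errors"' --project={my_project} --limit=10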
Conclusion
Poor data quality is one of the biggest challenges in Machine Learning, and it often leads to silent failures or poor performance in production. Now you can use Great Expectations to ensure data quality in your production ML pipelines on Vertex AI! In this post, we have shown only a simple example of data validation using Great Expectations, but it is a very powerful tool, so be sure to check out the Great Expectations documentation for more details. Finally, don't forget to follow us on Medium for more Vertex AI Tips and Tricks and much more!
Datatonic is Google Cloud’s Machine Learning Partner of the Year with a wealth of experience developing and deploying impactful Machine Learning models and MLOps platform builds. Need help with developing an ML model, or deploying your Machine Learning models fast? Have a look at our MLOps101 webinar, where our experts talk you through how to get started with Machine Learning at scale or get in touch to discuss your ML or MLOps requirements!