Building reliable and reproducible Machine Learning systems in production is a tough challenge that involves many facets beyond just the development of the ML model. Among these, data quality controls are an essential component for achieving high-quality models and predictions.
Data quality (or data validation) analysis involves identifying inconsistencies and errors in data, such as missing fields or values, inconsistent formatting, and incorrect values (for example, negative incomes). If erroneous data points are included when training the model, the resulting predictions can be incorrect, lowering the performance of the model. Similarly, if the data used at prediction time does not have the same characteristics as the data used to train the model, the predictions can often be unreliable.
In this blog post, we will show how to implement data validation using Great Expectations on Google Cloud’s Vertex AI Pipelines. Vertex AI Pipelines is based on KubeFlow Pipelines (KFP), an open-source platform for building Machine Learning pipelines using containers, and provides a great way to orchestrate your Machine Learning workflow on Google Cloud.
Every ML pipeline involves several steps of accessing or manipulating data, such as data ingestion or feature engineering. In this post, we explain how to develop KFP components that can validate the data within any of these steps.
We will create KFP components to perform two different tasks: one that creates (or updates) a Great Expectations Datasource pointing at data in Google Cloud Storage, and one that validates that data against an Expectation Suite using a Checkpoint.
Great Expectations is an open-source Python tool for data validation, profiling and documentation. With Great Expectations, we can assert what we expect from a dataset through the use of “Expectations”, which are essentially assertions or unit tests applied to data. Great Expectations offers a great way to mitigate data quality issues that persist through machine learning pipelines.
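As a quick illustration of the concept (not part of the pipeline we build later), here is a minimal sketch of applying a single Expectation to an in-memory pandas DataFrame using the library's Pandas-dataset style API; the DataFrame and column names are made up for the example:

import great_expectations as gx
import pandas as pd

# A toy DataFrame with one obviously bad value (a negative income)
df = pd.DataFrame({"income": [35_000, 52_000, -100]})

# Wrap the DataFrame so that Expectations can be applied to it directly
dataset = gx.from_pandas(df)

# "Expect all `income` values to be non-negative" - fails because of the -100 row
result = dataset.expect_column_values_to_be_between(column="income", min_value=0)
print(result.success)  # False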
Before we jump into building components, we need to familiarise ourselves with key terminology and concepts within the Great Expectations workflow:
Data Context: the entry point to a Great Expectations project; it holds the project configuration, Datasources, Expectation Suites and Checkpoints.
Datasource: the connection to the data we want to validate (for example, CSV files in a GCS bucket), together with the engine used to read it.
Expectation: an assertion about data, essentially a unit test applied to a dataset (for example, "values in this column must not be null").
Expectation Suite: a collection of Expectations, stored as a JSON file.
Checkpoint: the object that combines a Datasource (via a Batch Request) with an Expectation Suite and runs the validation.
Data Docs: human-readable HTML documentation of Expectations and validation results.
In order to perform a validation, we need to have a Datasource and an Expectation Suite already defined. Then, using a Checkpoint that combines a Datasource with an Expectation Suite, we can check whether all the Expectations have been satisfied.
Before we create a Data Validation component using Great Expectations, we first need to set up our Google Cloud project to be able to run Great Expectations with a Data Context located in Google Cloud Storage.
First, we need to create the Data Context. At the time of writing, Great Expectations does not allow us to create a Data Context directly on a remote bucket. Therefore, we need to create the Data Context locally and then copy the resulting directory to a GCS bucket.
To start, open your terminal and run these commands:
pip install great_expectations
great_expectations init
This will install the Great Expectations library and initialise it. In the terminal, we will see a message from Great Expectations saying that a new Data Context will be created, and with this, a new directory that contains the following structure:
great_expectations
|-- great_expectations.yml
|-- expectations
|-- checkpoints
|-- plugins
|-- .gitignore
|-- uncommitted
    |-- config_variables.yml
    |-- data_docs
    |-- validations
Select yes when prompted to let Great Expectations create this folder structure. The resulting folders and files are largely self-explanatory: great_expectations.yml contains the Data Context and Datasource configuration, the expectations folder contains the Expectation Suite JSON files, the checkpoints folder contains the Checkpoint YAML files, and so on. The uncommitted/data_docs/local_site/validations folder will contain the Data Docs, the HTML output of each validation run.
Once we create the great_expectations directory, we then need to copy it into a GCS bucket by running the following:
gsutil cp -r great_expectations gs://<BUCKET NAME>/great_expectations
Note that Great Expectations expects the Data Context to be available on the local filesystem. Luckily for us, Vertex AI Pipelines uses GCSFuse to mount Google Cloud Storage buckets into the filesystem under /gcs/<BUCKET NAME>.
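For example, a path of the form gs://<BUCKET NAME>/great_expectations becomes /gcs/<BUCKET NAME>/great_expectations inside the pipeline containers. A tiny, purely illustrative helper makes the mapping explicit:

def gcs_uri_to_fuse_path(uri: str) -> str:
    """Translate a gs:// URI into the GCSFuse path mounted by Vertex AI Pipelines."""
    return uri.replace("gs://", "/gcs/", 1)

print(gcs_uri_to_fuse_path("gs://<BUCKET NAME>/great_expectations"))
# /gcs/<BUCKET NAME>/great_expectations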
From now on, whenever we point a Vertex pipeline component or a Workbench notebook at this Great Expectations backend, we will define the Data Context as follows:
import great_expectations as gx
context = gx.data_context.DataContext(context_root_dir="/gcs/<BUCKET NAME>/great_expectations")
This Data Context definition is what allows us to work with a Great Expectations backend located in GCS from any part or tool of the project.
The workflow to create components which will run validations using Great Expectations is as follows:
1. Connect to Datasources
2. Create an Expectation Suite
3. Validate the data with a Checkpoint
The first step of the workflow is to define the connections to all Datasources we want to validate. For this blog post, we will only focus on creating a Datasource that connects to a GCS bucket that contains CSV files. Defining the Datasource connections is done by configuring the great_expectations.yml file. There are a few ways to do this, and the Great Expectations documentation contains guides that cover most use cases.
Depending on the use case, the files we need to validate may stay static over time (i.e. the location of the files/tables remains unchanged), or the data may be generated as a Dataset Artifact and therefore be located in a particular subfolder of the pipeline_root bucket of the Vertex Pipeline.
If we expect our CSV files to always be stored in the same GCS bucket, we only need to create the Datasource once, as Great Expectations will link that Datasource with that particular bucket. The code below shows how to create a new Datasource:
from ruamel import yaml
import great_expectations as gx
from great_expectations.core.batch import Batch, BatchRequest, RuntimeBatchRequest
context = gx.get_context()
datasource_yaml = rf"""
name: {DATASOURCE_NAME}
class_name: Datasource
execution_engine:
  class_name: PandasExecutionEngine
data_connectors:
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - default_identifier_name
  default_inferred_data_connector_name:
    class_name: InferredAssetGCSDataConnector
    bucket_or_name: {BUCKET_NAME}
    prefix: {PREFIX_NAME}
    default_regex:
      pattern: (.*)\.csv
      group_names:
        - data_asset_name
"""
context.add_datasource(**yaml.load(datasource_yaml))
Make sure to edit the Datasource name and the GCS bucket and prefix name for where your CSV file(s) are located. For validation purposes, we recommend following the Test your new Datasource section of the Great Expectations documentation.
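As a rough sketch of that check (the variable names reuse those from the snippet above, and the asset name is a placeholder you would take from the test output), you could run something like:

# Parse the configuration and list the data assets the connector can see
context.test_yaml_config(yaml_config=datasource_yaml)

# Optionally, load a batch of one of the listed assets to confirm the files are readable
from great_expectations.core.batch import BatchRequest

batches = context.get_batch_list(
    batch_request=BatchRequest(
        datasource_name=DATASOURCE_NAME,
        data_connector_name="default_inferred_data_connector_name",
        data_asset_name="<ASSET NAME FROM THE OUTPUT ABOVE>",
    )
)
print(len(batches))  # should be at least 1 if the connection works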
NOTE: The line of code context = gx.get_context() is fine if you are working locally. However, as we have already mentioned, we aim to use Great Expectations in a production environment, so we recommend working in a Workbench notebook and pointing the Data Context to the GCS backend that we uploaded previously, by defining the context as follows:
context = gx.data_context.DataContext(context_root_dir="/gcs/<BUCKET NAME>/great_expectations")
When dealing with Vertex AI Pipelines, it is very common to work with intermediate Artifacts that sit between different components. Amongst these Artifacts, we will find Datasets, which can be the outputs of pre-processing or feature engineering steps generated by a particular component. These Artifacts are usually stored in a sub-path that is automatically generated by Vertex inside the pipeline_root bucket that we define when building the pipeline. This means that the location of the Dataset changes every time a new pipeline run takes place. Therefore, for this particular use case, we need to create a new Vertex AI Pipelines component that automatically updates the location of the Datasource in the Great Expectations Data Context.
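To make the path handling concrete before looking at the component, here is an illustrative example (the artifact path is made up) of how a Dataset Artifact's GCSFuse path decomposes into the bucket and prefix we need for the Datasource configuration:

from pathlib import Path

# Hypothetical artifact path as seen inside a Vertex pipeline container
dataset_path = "/gcs/my-pipeline-root-bucket/456789/preprocess-data/dataset.csv"

parts = Path(dataset_path).parts       # ('/', 'gcs', 'my-pipeline-root-bucket', ...)
bucket = parts[2]                      # 'my-pipeline-root-bucket'
prefix = "/".join(parts[3:-1])         # '456789/preprocess-data'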
The component that handles this use case is introduced below. The steps followed by the component are:
1. Load the Data Context from the GCS backend.
2. Derive the bucket and prefix from the input Dataset Artifact's GCSFuse path.
3. Add (or update) the Datasource in the Data Context and test the connection.
4. Build a BatchRequest for the Dataset and check that the Expectation Suite and a Validator can be loaded.
from kfp.v2 import dsl
from kfp.v2.dsl import Dataset, Input


@dsl.component(
    base_image="python:3.9",
    packages_to_install=["great_expectations==0.15.32"],
)
def create_gcs_great_expectations_datasource(
    dataset: Input[Dataset],
    datasource_name: str,
    ge_context_root_dir: str,
    expectation_suite_validation: str,
    data_connector_name: str,
):
    import logging
    from pathlib import Path

    import great_expectations as gx
    from great_expectations.core.batch import BatchRequest
    from ruamel import yaml

    # 1. Load the Data Context from the GCS backend
    context = gx.data_context.DataContext(context_root_dir=ge_context_root_dir)

    # 2. Derive the bucket and prefix from the Dataset Artifact's GCSFuse path
    #    (/gcs/<bucket>/<prefix>/<file>.csv)
    bucket = Path(dataset.path).parts[2]
    datasource_prefix = "/".join(Path(dataset.path).parts[3:-1])

    # 3. Add (or update) the Datasource in the Data Context
    datasource_yaml = rf"""
name: {datasource_name}
class_name: Datasource
execution_engine:
  class_name: PandasExecutionEngine
data_connectors:
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - default_identifier_name
  {data_connector_name}:
    class_name: InferredAssetGCSDataConnector
    bucket_or_name: {bucket}
    prefix: {datasource_prefix}
    default_regex:
      pattern: (.*)\.csv
      group_names:
        - data_asset_name
"""
    context.add_datasource(**yaml.load(datasource_yaml))

    # Test the connection
    context.test_yaml_config(yaml_config=datasource_yaml)
    logging.info("Data Source updated")

    # 4. Build a BatchRequest for the Dataset: the asset name is the object path
    #    relative to the bucket, without the .csv extension
    data_asset_name = "/".join(Path(dataset.path).parts[3:]).replace(".csv", "")
    logging.info(f"Data Asset Name: {data_asset_name}")

    batch_request = BatchRequest(
        datasource_name=datasource_name,
        data_connector_name=data_connector_name,
        data_asset_name=data_asset_name,
    )

    # Check that the Expectation Suite and a Validator can be loaded for this batch
    context.get_expectation_suite(expectation_suite_name=expectation_suite_validation)
    context.get_validator(
        batch_request=batch_request,
        expectation_suite_name=expectation_suite_validation,
    )
    logging.info("Data Source successfully created")
After creating our GCS datasource, the datasources section of the great_expectations.yml file looks like this:
datasources:
  my_datasource:
    class_name: Datasource
    module_name: great_expectations.datasource
    execution_engine:
      class_name: PandasExecutionEngine
      module_name: great_expectations.execution_engine
    data_connectors:
      default_runtime_data_connector_name:
        class_name: RuntimeDataConnector
        module_name: great_expectations.datasource.data_connector
        name: default_runtime_data_connector_name
        batch_identifiers:
          - default_identifier_name
      default_inferred_data_connector_name:
        class_name: InferredAssetGCSDataConnector
        module_name: great_expectations.datasource.data_connector
        name: default_inferred_data_connector_name
        bucket_or_name: dataset_bucket
        prefix: my_prefix
        default_regex:
          pattern: (.*)\.csv
          group_names:
            - data_asset_name
There are four workflows that can be used to create an Expectation Suite (in-depth instructions here); in this blog, we will focus on the interactive workflow with Python, writing code in a Jupyter notebook. This method requires the Datasource we want to validate to be configured in the great_expectations.yml file, as we did in the 1. Connect to Datasources section.
This method has the following workflow:
1. Build a BatchRequest that points at the data asset we want to validate.
2. Create (or load) an Expectation Suite and get a Validator for that batch.
3. Add Expectations interactively and review their results against the batch.
4. Save the Expectation Suite.
The code below shows how to create an Expectation Suite and add a few Expectations. As usual, remember to edit the variable names.
from datetime import date

import great_expectations as gx
from great_expectations.core.batch import BatchRequest

DATASOURCE_NAME = "<DATASOURCE_NAME>"
DATA_CONNECTOR_NAME = "<DATA_CONNECTOR_NAME>"
GCS_PREFIX = "<GCS_PREFIX>"
FILE_NAME = "<FILE_NAME>"  # file name without the .csv extension
EXPECTATION_SUITE_NAME = "<EXPECTATION_SUITE_NAME>"

context = gx.data_context.DataContext(
    context_root_dir="/gcs/<BUCKET_NAME>/great_expectations"
)

batch_request = BatchRequest(
    datasource_name=DATASOURCE_NAME,
    data_connector_name=DATA_CONNECTOR_NAME,
    data_asset_name=f"{GCS_PREFIX}/{FILE_NAME}",
)

suite = context.create_expectation_suite(
    expectation_suite_name=EXPECTATION_SUITE_NAME, overwrite_existing=True
)

validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=EXPECTATION_SUITE_NAME,
)

# Expect all `customer_id` values to not be null
validator.expect_column_values_to_not_be_null(column="customer_id")

# Expect all `unique_key` values to be unique
validator.expect_column_values_to_be_unique(column="unique_key")

# Expect all `unique_key` values to be 40 characters long
validator.expect_column_value_lengths_to_equal(column="unique_key", value=40)

# Expect `taxi_trip_in_seconds` values to be greater than 0
validator.expect_column_values_to_be_between(column="taxi_trip_in_seconds", min_value=0)

# Expect the mean of all `taxi_trip_fare` values to be between 20 and 30 dollars
validator.expect_column_mean_to_be_between(column="taxi_trip_fare", min_value=20, max_value=30)

# Expect `trip_start_timestamp` dates to be between 01/01/2000 and today
validator.expect_column_values_to_be_between(
    column="trip_start_timestamp",
    min_value=date.fromisoformat("2000-01-01"),
    max_value=date.today().strftime("%Y-%m-%d"),
)

# Expect `payment_type` values to be in the set {'Cash', 'Credit Card'}
validator.expect_column_values_to_be_in_set(column="payment_type", value_set=["Cash", "Credit Card"])

validator.save_expectation_suite(discard_failed_expectations=False)
After creating the Expectation Suite, remember to use gsutil to copy the resulting JSON file into the expectations folder in GCS.
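If the suite was created with a local Data Context, that copy could look like the following (the suite file name matches the Expectation Suite name):

gsutil cp great_expectations/expectations/<EXPECTATION_SUITE_NAME>.json gs://<BUCKET NAME>/great_expectations/expectations/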
NOTE: Expectation Suites are stored automatically in the great_expectations/expectations folder. Alternatively, we can define any other folder or bucket to host our Expectation Suites; Great Expectations calls this an Expectation Store. We just need to update the great_expectations.yml file accordingly; for more information, refer here.
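As a sketch of that configuration (following the pattern in the Great Expectations documentation for GCS-backed stores, with placeholder names), the stores section of great_expectations.yml could be updated along these lines:

stores:
  expectations_GCS_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleGCSStoreBackend
      project: <PROJECT ID>
      bucket: <BUCKET NAME>
      prefix: expectations

expectations_store_name: expectations_GCS_store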
Considerations
The list of all available Expectations can be found here. However, there are a few things we need to consider:
We can now create the Data Validator component inside a Vertex AI Pipelines component that will allow us to validate our connected Datasource using the Expectation Suite we’ve just created. The recommended method for validating data is through the use of Checkpoints. Checkpoints will validate data, save validation results, run any actions we have specified, and finally create Data Docs with their results.
The logic is simple. We need to provide all the information we mentioned before: everything related to the Data Context, Datasource and the Expectation Suite. In addition, we have an Output[HTML] Artifact that will be used to output the results of the validation in HTML format, by making use of Great Expectation’s Data Docs.
The component consists of the following steps:
1. Load the Data Context from the GCS backend.
2. Create a Checkpoint that combines the Datasource (via a batch request) with the Expectation Suite.
3. Run the Checkpoint to validate the data.
4. Render the validation results with Data Docs and write them to the HTML output Artifact.
from kfp.v2 import dsl
from kfp.v2.dsl import HTML, Output


@dsl.component(
    base_image="python:3.9",
    packages_to_install=["great_expectations==0.15.32", "google-cloud-logging==3.2.5"],
)
def data_validation(
    ge_context_root_dir: str,
    checkpoint_name: str,
    datasource_name: str,
    data_connector_name: str,
    data_asset_name: str,
    expectation_suite_validation: str,
    output_html: Output[HTML],
    project_id: str,
) -> list:
    import logging

    import great_expectations as gx
    from great_expectations.render.renderer import ValidationResultsPageRenderer
    from great_expectations.render.view import DefaultJinjaPageView
    from ruamel import yaml

    context = gx.data_context.DataContext(context_root_dir=ge_context_root_dir)

    # In order to create a validation step, a checkpoint needs to be created.
    # Checkpoints combine the data_asset_name with its corresponding expectation suite.
    yaml_config = f"""
name: {checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S"
validations:
  - batch_request:
      datasource_name: {datasource_name}
      data_connector_name: {data_connector_name}
      data_asset_name: {data_asset_name}
      data_connector_query:
        index: -1
    expectation_suite_name: {expectation_suite_validation}
"""
    context.add_checkpoint(**yaml.load(yaml_config))

    # Run checkpoint to validate the data
    checkpoint_result = context.run_checkpoint(checkpoint_name=checkpoint_name)

    # Validation results are rendered as HTML
    document_model = ValidationResultsPageRenderer().render(
        list(checkpoint_result.run_results.values())[0]["validation_result"]
    )
    if not output_html.path.endswith(".html"):
        output_html.path += ".html"

    # Write validation results as output HTML
    with open(output_html.path, "w") as writer:
        writer.write(DefaultJinjaPageView().render(document_model))
The final HTML output Artifact shows the results of the validations:
NOTE: In the example code here we have defined the configuration using YAML in a string to be consistent with the Great Expectations documentation. However, you could also define the configuration directly in a Python dictionary.
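For example, inside the same component, the Checkpoint above could be expressed as a plain Python dictionary (the keys mirror the YAML configuration) and passed to add_checkpoint in the same way:

checkpoint_config = {
    "name": checkpoint_name,
    "config_version": 1.0,
    "class_name": "SimpleCheckpoint",
    "run_name_template": "%Y%m%d-%H%M%S",
    "validations": [
        {
            "batch_request": {
                "datasource_name": datasource_name,
                "data_connector_name": data_connector_name,
                "data_asset_name": data_asset_name,
                "data_connector_query": {"index": -1},
            },
            "expectation_suite_name": expectation_suite_validation,
        }
    ],
}
context.add_checkpoint(**checkpoint_config)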
We now have a pipeline component that reads a Datasource, loads an Expectation Suite and validates our data. But what if our Expectations fail? Should the pipeline fail? If so, we add the following lines of code to the component:
if not checkpoint_result["success"]:
    raise RuntimeError("Validations failed")
Alternatively, we may only want the component to send a log to Google Cloud Logging, which we can later use to generate custom alerts or emails. For example, we can generate specific log entries for our monitoring and alerting tools. The following code snippet, added to the existing component, parses the validation results and generates the appropriate log entries.
These are the steps followed:
1. Extract the individual Expectation results from the Checkpoint result.
2. Create a Google Cloud Logging client and logger.
3. Collect the column and Expectation type of every failed validation.
4. If there are failures, log them as an ERROR entry.
# Extract the individual expectation results from the checkpoint result
run_results = checkpoint_result.to_json_dict().get("run_results")
validation_results = (
    run_results.get(list(run_results.keys())[0])
    .get("validation_result")
    .get("results")
)

import google.cloud.logging
from google.logging.type import log_severity_pb2 as severity

# Create a logger for Google Cloud Logging
log_name = "great_expectations_logger"
gcloud_logging_client = google.cloud.logging.Client(project=project_id)
gcs_logger = gcloud_logging_client.logger(log_name)

# Collect the column and expectation type of every failed validation (ERRORS & WARNINGS)
errors = []
for validation in validation_results:
    if not validation["success"]:
        column_failed = validation["expectation_config"]["kwargs"]["column"]
        expectation_failed = validation["expectation_config"]["expectation_type"]
        errors.append((column_failed, expectation_failed))

# Log an ERROR entry listing the failed expectations
if errors:
    json_payload = {"message": f"Great Expectations Errors: {errors}"}
    gcs_logger.log_struct(json_payload, severity=severity.ERROR)

# Return the failures so the component output matches its `-> list` signature
return errors
This piece of code will generate log entries whenever validations fail. If we want, we can build an alert by querying those logs. For further information about creating alerts on Vertex AI Pipelines, refer to this blog post.
In our particular case, we could find the logs generated by this component with the following query:
logName="projects/{my_project}/logs/great_expectations_logger"
resource.type="k8s_container"
severity="ERROR"
jsonPayload.message:"Great Expectations Errors"
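As one possible next step (this is not covered in the pipeline code above), a log-based metric could be created from that filter and used as the basis for an alerting policy, for example:

gcloud logging metrics create great_expectations_errors \
  --description="Great Expectations validation failures" \
  --log-filter='logName="projects/{my_project}/logs/great_expectations_logger" AND severity="ERROR" AND jsonPayload.message:"Great Expectations Errors"'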
Poor data quality is one of the biggest challenges in Machine Learning, and it often leads to silent failures or poor performance in production. Now you can use Great Expectations to ensure data quality in your production ML pipelines on Vertex AI! In this post, we have shown only a simple example of data validation using Great Expectations, but it is a very powerful tool so be sure to check out the Great Expectations documentation for more details. Finally, don’t forget to follow us on Medium for more Vertex AI Tips and Tricks and much more!
Datatonic is Google Cloud’s Machine Learning Partner of the Year with a wealth of experience developing and deploying impactful Machine Learning models and MLOps platform builds. Need help with developing an ML model, or deploying your Machine Learning models fast? Have a look at our MLOps101 webinar, where our experts talk you through how to get started with Machine Learning at scale or get in touch to discuss your ML or MLOps requirements!