Author: Ash Sultan, Analytics Engineering Lead
For users of dbt who also work with Google Cloud (GC), dbt's out-of-the-box data lineage graph in dbt docs has long been a godsend. However, users have also long been looking for a more centralised, native solution in GC, ideally in Data Catalog (now part of Dataplex).
Well, Google has definitely listened and granted this wish! Dataplex (Data Catalog) now has a Lineage feature that allows us to track data movement out of the box too!
See: Google Cloud’s official page for Data Lineage (currently in Preview)
For data transformation solutions for Data Warehouses or Data Marts in BigQuery, building transformation pipelines with dbt lets users leverage the power of dbt docs and its lineage graph to build a clear picture of how data flows through the warehouse.
However, since this is a third-party solution that is not native to GC, a degree of "scaffolding" work has to be done to integrate dbt's lineage graph with the wider native GC solutions (e.g., Data Catalog) to build a more complete governance picture. Having lineage capability natively therefore makes it a lot easier to build a full governance view in GC.
In this blog, we will illustrate our example solution for data model building in BigQuery using dbt.
Throughout this process, we used the following tools and made some assumptions about the existing infrastructure:
The table we will focus on is our example dimension table for the analytical layer named dim_another_example_using_staging. This table is materialised using a very simple configuration and source SQL and has upstream dependencies as shown below:
{{
  config(
    materialized = 'table',
    partition_by = {
      "field": "source_partition_timestamp",
      "data_type": "TIMESTAMP",
      "granularity": "day"
    }
  )
}}

SELECT
  message_id,
  message_description,
  source_partition_timestamp,
  CURRENT_TIMESTAMP() AS insertion_timestamp
FROM {{ ref('stg_dwh_example') }}
Yes — we can! All we need to do is enable the API and view the result!
Let’s take the following pre-requisite steps:
That’s it! We should now be able to see Data Lineage for the table built using dbt in BigQuery UI as shown below:
Whilst this is simple, some caveats exist when building tables in dbt, which can produce a confusing picture on the lineage graph in BigQuery UI. A couple of such examples are:
In this section, we will focus on one of the key caveats: working with an incremental, insert-overwrite MERGE strategy.
Please see: dbt’s page on incremental (specifically, insert-overwrite strategy)
At its core, the incremental insert-overwrite strategy takes a write-append approach to MERGE. Under the hood, it works by creating a temp table (named after the target table with the suffix __dbt_tmp), deriving the list of partitions to be replaced on the target table, and then, in the same DML statement, simply replacing those partitions on the target with the data in the temp table.
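To make that mechanism concrete, the script dbt generates for this strategy broadly follows the four-step shape sketched below. This is a simplified, illustrative sketch (the project, dataset, table and column names here are placeholders); the real script dbt produced for our model is shown in full later in this post:

-- Simplified sketch of the script dbt generates for insert_overwrite (illustrative names)
declare dbt_partitions_for_replacement array<TIMESTAMP>;

-- 1. build a temp table containing only the new/changed data
create or replace table `my_project.my_dataset.my_table__dbt_tmp`
partition by timestamp_trunc(partition_ts, day)
as (
  select * from `my_project.my_dataset.my_upstream_table`
  -- the incremental filter on the partition column goes here
);

-- 2. work out which partitions the temp table touches
set (dbt_partitions_for_replacement) = (
  select as struct array_agg(distinct timestamp_trunc(partition_ts, day))
  from `my_project.my_dataset.my_table__dbt_tmp`
);

-- 3. in one MERGE, delete those partitions from the target and insert the temp table's rows
merge into `my_project.my_dataset.my_table` as DBT_INTERNAL_DEST
using (select * from `my_project.my_dataset.my_table__dbt_tmp`) as DBT_INTERNAL_SOURCE
on FALSE
when not matched by source
  and timestamp_trunc(DBT_INTERNAL_DEST.partition_ts, day) in unnest(dbt_partitions_for_replacement)
  then delete
when not matched then insert row;

-- 4. drop the temp table
drop table if exists `my_project.my_dataset.my_table__dbt_tmp`;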
But, this can result in the BigQuery UI lineage graph producing a confusing output.
Look at one of the tables further upstream in the earlier lineage graph (fig 3): stg_example. The config, code and dbt lineage for this table are shown below:
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={
"field": "source_partition_timestamp",
"data_type": "TIMESTAMP",
"granularity": "day"
},
cluster_by=['sender', 'sender_email', 'message_id', 'message_description'],
post_hook="{{ row_access_policy('sender_pii_access') }}",
)
}}
WITH
# Example incremental CTE with partition filter
example AS (
SELECT
message_id,
message_description,
sender,
sender_email,
source_partition_timestamp
FROM {{ ref('extr_example') }}
WHERE source_partition_timestamp >= CAST( {{ load_based_filter_date() }} AS TIMESTAMP)
)
# Incremental Final Data
SELECT
*,
CURRENT_TIMESTAMP() AS insertion_timestamp
FROM example
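Note that load_based_filter_date() and row_access_policy() above are project-specific macros rather than dbt built-ins. Based on the compiled SQL shown later in this post, a minimal, hypothetical sketch of what load_based_filter_date() might expand to could look like this:

{% macro load_based_filter_date() %}
  {# Hypothetical sketch: filter from the latest loaded partition on incremental runs,
     falling back to an early date so that everything is loaded when no partitions exist yet #}
  IFNULL(DATE(_dbt_max_partition), '1990-01-01')
{% endmacro %}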
If you now look at the lineage graph in BigQuery UI for this table, it’ll look quite confusing at first glance:
But a closer look will show you that BigQuery has built the lineage based on the references in the DML generated by dbt for the MERGE statement (insert-overwrite), which is shown below:
-- generated script to merge partitions into `datatonic-<REDACTED>`.`sandbox_ash_medium_stg`.`stg_example`
declare dbt_partitions_for_replacement array<TIMESTAMP>;
declare _dbt_max_partition TIMESTAMP default (
select max(source_partition_timestamp) from `datatonic-<REDACTED>`.`sandbox_ash_medium_stg`.`stg_example`
where source_partition_timestamp is not null
);
-- 1. create a temp table
create or replace table `datatonic-<REDACTED>`.`sandbox_ash_medium_stg`.`stg_example__dbt_tmp`
partition by TIMESTAMP_trunc(source_partition_timestamp, day)
cluster by sender, sender_email, message_id, message_description
OPTIONS(
description="""This table an example table for data staging""",
labels=[('domain', 'data_staging'), ('data_steward', 'ashiq_sultan')],
expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
)
as (
# Example incremental config post_hook=row_access_policy('stg_example'),
# General config for a full table materialisation (example: a type-1 dimension data staging)
WITH
__dbt__cte__extr_example as (
SELECT
message_id,
message_description,
sender,
sender_email,
source_partition_timestamp
FROM `datatonic-<REDACTED>`.`dl_example_for_dbt`.`dl_example_generated`
), # Example incremental CTE with partition filter
example AS (
SELECT
message_id,
message_description,
sender,
sender_email,
source_partition_timestamp
FROM __dbt__cte__extr_example
WHERE source_partition_timestamp >= CAST( IFNULL(DATE(_dbt_max_partition), '1990-01-01') AS TIMESTAMP)
)
# Incremental Final Data
SELECT
*,
CURRENT_TIMESTAMP() AS insertion_timestamp
FROM example
);
-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
select as struct
array_agg(distinct TIMESTAMP_trunc(source_partition_timestamp, day))
from `datatonic-<REDACTED>`.`sandbox_ash_medium_stg`.`stg_example__dbt_tmp`
);
-- 3. run the merge statement
merge into `datatonic-<REDACTED>`.`sandbox_ash_medium_stg`.`stg_example` as DBT_INTERNAL_DEST
using (
select * from `datatonic-<REDACTED>`.`sandbox_ash_medium_stg`.`stg_example__dbt_tmp`
) as DBT_INTERNAL_SOURCE
on FALSE
when not matched by source
and TIMESTAMP_trunc(DBT_INTERNAL_DEST.source_partition_timestamp, day) in unnest(dbt_partitions_for_replacement)
then delete
when not matched then insert
(`message_id`, `message_description`, `sender`, `sender_email`, `source_partition_timestamp`, `insertion_timestamp`)
values
(`message_id`, `message_description`, `sender`, `sender_email`, `source_partition_timestamp`, `insertion_timestamp`)
;
-- 4. clean up the temp table
drop table if exists `datatonic-<REDACTED>`.`sandbox_ash_medium_stg`.`stg_example__dbt_tmp`
While these caveats should be considered, this combination of tools provides an easy and intuitive way to keep on top of Data Lineage when using Google Cloud. Data Lineage is crucial to any business wanting to know where its data comes from and to keep track of how it is processed.
Datatonic, Google Cloud’s 4x Partner of the Year, is the leading cloud data + AI consultancy for the world’s most ambitious businesses, challenging traditions to deliver tangible innovation at the leading edge of Google Cloud. Our team also has a wealth of experience with dbt, running the Expert Lounge at dbt Labs’ recent London event, Coalesce London. Need help with dbt? Get in touch at datatonic.com/contact/.