
Joining CSV Data In Apache Beam

This article describes how we built a generic solution to perform joins of CSV data in Apache Beam. Typically in Apache Beam, joins are not straightforward. Beam supplies a Join library which is useful, but the data still needs to be prepared before the join, and merged after the join. How then do we perform these actions generically, such that the solution can be reused? Read on to find out!

Apache Beam is a unified framework for batch and streaming data processing that provides intuitive support for ETL (Extract-Transform-Load) pipelines. Abstracting the application code from the execution engine (the runner) means you can port your pipelines between runners. In this article we use the Google Cloud Dataflow runner, a managed service on the Google Cloud Platform for running Beam jobs that provides useful autoscaling capabilities, together with Beam’s Java SDK.

At Datatonic, we recently undertook a project for a client to build a data lake. This article describes our implementation of joins in Apache Beam for this project, allowing joins of generic CSV data.

The Project

This project aimed to build a data lake on GCP, using two of Google’s flagship technologies: Google Cloud Storage and BigQuery. The data lake should be generic enough so that, over time, it can scale up to handle data for all of the client’s business. To start with, the data lake was built for a single part of the client’s business. Once the initial data lake was built, the client would take full ownership of the data lake and expand it to cover their remaining businesses.

If you’ve looked at building a data lake on GCP before, you may recognise the following flow:

  1. Files land in GCS
  2. Dataflow reads the files from GCS and performs necessary processing
  3. Dataflow writes to GCS and BigQuery

The processing step can include joining data. For example, sales data might be split across multiple files. Joining the sales data and writing to a single location (e.g. a BigQuery table) is ideal for analytics. In this project, the files landing in GCS were CSVs. This made the processing simpler than, say, JSON, where we would have to consider nested data structures.

Challenges

To use Beam’s Join library, you have to transform each element in each input collection to a KV object, where the key is the object you would like to join on (let’s call it the “join-key”). When joining, a CoGroupByKey transform is applied, which groups elements from both the left and right collections with the same join-key. The Join library produces a single PCollection, where an element is a KV object with:

  • The join-key as the key
  • Another KV object as the value. This internal KV object contains:
    • The left collection element as key
    • The right collection element as value

The next transform should invoke a DoFn that merges the left and right collection elements in a logical way.
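
As a point of reference, the shape of this flow with the Join library (from org.apache.beam.sdk.extensions.joinlibrary) looks like the following, where the key and value types are purely illustrative:

// Each input has already been mapped to KV elements keyed by the join-key
PCollection<KV<String, String>> leftKeyed = ...;
PCollection<KV<String, String>> rightKeyed = ...;

// The Join library applies CoGroupByKey internally and emits, for each match,
// the join-key plus a KV of (left element, right element)
PCollection<KV<String, KV<String, String>>> joined = Join.innerJoin(leftKeyed, rightKeyed);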

Some of the challenges we faced were:

  1. The solution needed to scale out to the client’s various businesses. How do we build a solution which is generic enough to handle a variety of schemas and data sources, and is simple enough for relative newcomers to Beam to use to build new pipelines?
  2. When joining a left and right collection, the column names may not match.
  3. One requirement was to join on multiple columns (i.e. multiple entries in the Maps).
  4. In an outer join, you may end up with null values from one or both sides (nulls can appear on both sides in a full join). For the join columns, which collection’s values should be used: left or right?
  5. Another challenge when handling null values was ensuring that the resulting PCollection had the same columns for all elements. When no match was found in the right table for a left join, for example, how do we ensure that the joined row has the same columns as a row which did find matches in both left and right?

Solution

Building A Generic Solution

As we were reading and processing CSV data, and we wanted to avoid writing non-generic Java code (e.g. a POJO), we read each line of CSV data as a String and transformed this line (using the CSV header) into a Map<String, String> object, where the keys are the column names, and the values are the column values. The values are all Strings as the CSV is read as a String.

The result was two PCollections, each with Map<String, String> elements:

// pipeline is the Beam Pipeline object; leftHeader and rightHeader are
// PCollectionView<String> side inputs holding each file's header line
PCollection<String> leftInput = pipeline.apply(TextIO.read().from("left.csv"));
PCollection<String> rightInput = pipeline.apply(TextIO.read().from("right.csv"));

PCollection<Map<String, String>> leftMaps = leftInput
    .apply(ParDo.of(new CsvSplitter(leftHeader)).withSideInputs(leftHeader));
PCollection<Map<String, String>> rightMaps = rightInput
    .apply(ParDo.of(new CsvSplitter(rightHeader)).withSideInputs(rightHeader));
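
CsvSplitter here is a DoFn of our own. A minimal sketch, assuming the header line is provided as a PCollectionView<String> side input and that the data contains no quoted or embedded commas, could look like this:

public class CsvSplitter extends DoFn<String, Map<String, String>> {

    private final PCollectionView<String> headerView;

    public CsvSplitter(PCollectionView<String> headerView) {
        this.headerView = headerView;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Split the header and the current line on commas, then zip them into a Map
        String[] columnNames = c.sideInput(headerView).split(",");
        String[] values = c.element().split(",", -1); // -1 keeps trailing empty values
        Map<String, String> row = new HashMap<>();
        for (int i = 0; i < columnNames.length; i++) {
            row.put(columnNames[i], i < values.length ? values[i] : "");
        }
        c.output(row);
    }
}

A real implementation would also need to skip the header line itself and use a proper CSV parser for quoted values.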

To prepare a PCollection for a join, we created a DoFn that would extract the column values to generate a join-key. This could be reused for any PCollection of Maps ahead of a join.

After a join using Beam’s Join library, we were left with two Maps (one for the row from the left collection, and one for the row from the right collection). Merging two Maps in Java is usually easy, except in this case we don’t want to include the same column twice (remembering that by definition a join column appears in both left and right collections). We therefore created a generic DoFn that would merge the two Maps, while taking account of any join columns.

As a small aesthetic change to help readability, the sections of the pipeline where joins occurred were separated out into Java static methods. For a more sophisticated approach to making joins generic, see the Next Steps section.

Matching Column Names In Left And Right Collections

To keep track of mappings between columns, we created a ColumnMapping Java object:

public class ColumnMapping implements Serializable {

    private String alias; // The name of the resulting column of the join (i.e. the "AS" value in SQL)
    private String columnNameInLeftTable;
    private String columnNameInRightTable;

    // Add getter methods here
}

It’s as simple as that. The class implements Serializable as this is necessary in Beam to send data between workers.

Now when we prepared a PCollection for a join by generating a join-key, we could feed in the ColumnMapping object and an indication of whether this is the left or right collection being processed (we used an enum for this):

// These are supplied via the DoFn constructor
ColumnMapping mapping = ...;
JoinCollection isWhichCollection = ...;

// This is supplied as the DoFn ProcessElement method argument
Map<String, String> element = ...;

String joinKey;
if (isWhichCollection.equals(JoinCollection.LEFT)) {
    joinKey = element.get(mapping.getColumnNameInLeftTable());
} else {
    joinKey = element.get(mapping.getColumnNameInRightTable());
}

KV<String, Map<String, String>> output = KV.of(joinKey, element);
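
The JoinCollection type referenced above is simply a small enum of our own, marking which side of the join a DoFn instance is processing:

public enum JoinCollection {
    LEFT,
    RIGHT
}

Passing it through the DoFn constructor means the same key-extraction code can be reused for both sides of the join.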

Joining On Multiple Columns

We created a Java collection (e.g. a List) of ColumnMapping objects and passed this to the DoFn (instead of a single object). Instead of the joinKey being a single String, it can be a List of Strings. Using multiple ColumnMapping objects gives us a simple and readable way of tracking many different mappings.
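
A sketch of this multi-column key extraction (assuming a List<String> join-key with one entry per ColumnMapping) might look like:

// These are supplied via the DoFn constructor
List<ColumnMapping> mappings = ...;
JoinCollection isWhichCollection = ...;

// This is supplied as the DoFn ProcessElement method argument
Map<String, String> element = ...;

List<String> joinKey = new ArrayList<>();
for (ColumnMapping mapping : mappings) {
    String columnName = isWhichCollection.equals(JoinCollection.LEFT)
            ? mapping.getColumnNameInLeftTable()
            : mapping.getColumnNameInRightTable();
    joinKey.add(element.get(columnName));
}

KV<List<String>, Map<String, String>> output = KV.of(joinKey, element);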

Selecting Which Collection Values To Use In Join Columns

In the ColumnMapping object, we added a specifier to indicate which collection to pull values from (again, using an enum):

public class ColumnMapping implements Serializable {

    private String alias;
    private String columnNameInLeftTable;
    private String columnNameInRightTable;
    private JoinCollection useWhichValue;

    // Add getter methods here
}
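
As an illustration, a list of mappings could then be defined as follows (the column names here are invented, and we assume an all-arguments constructor on ColumnMapping):

List<ColumnMapping> mappings = Arrays.asList(
    new ColumnMapping("customer_id", "cust_id", "customerId", JoinCollection.LEFT),
    new ColumnMapping("store_id", "store", "store_number", JoinCollection.RIGHT)
);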

After invoking Beam’s Join library, the DoFn merges the two Maps, using the ColumnMapping objects to ensure the correct values are used:

// This is supplied via the DoFn constructor
List<ColumnMapping> mappings = ...;

// These are supplied in the DoFn's ProcessElement method input argument
Map<String, String> leftRow = ...;
Map<String, String> rightRow = ...;

Map<String, String> output = new HashMap<>();

for (Map.Entry<String, String> leftCell : leftRow.entrySet()) {
    ColumnMapping mapping = mappings.stream()
        .filter(m -> m.getColumnNameInLeftTable().equals(leftCell.getKey()))
        .findFirst().orElse(null);
    if (mapping == null) {
        // Not a join column: copy the left cell straight into the output
        output.put(leftCell.getKey(), leftCell.getValue());
    } else if (mapping.getUseWhichValue().equals(JoinCollection.LEFT)) {
        // A join column whose value comes from the left collection: use the alias as key
        output.put(mapping.getAlias(), leftCell.getValue());
    }
}

for (Map.Entry<String, String> rightCell : rightRow.entrySet()) {
    ColumnMapping mapping = mappings.stream()
        .filter(m -> m.getColumnNameInRightTable().equals(rightCell.getKey()))
        .findFirst().orElse(null);
    if (mapping == null) {
        // Not a join column: copy the right cell straight into the output
        output.put(rightCell.getKey(), rightCell.getValue());
    } else if (mapping.getUseWhichValue().equals(JoinCollection.RIGHT)) {
        // A join column whose value comes from the right collection: use the alias as key
        output.put(mapping.getAlias(), rightCell.getValue());
    }
}

Ensuring All Elements Consist Of The Same Columns

When using Beam’s Join library for outer joins, you have to supply a value to use as the null value (as null cannot be serialized). We wanted all elements in a joined PCollection to have the same schema, to avoid any complications when writing the data*.

To do this, before executing the join, we created a PCollectionView of the nullable collection’s keys and used Java’s Collections class to create an empty Map for nulls:

PCollectionView<List<String>> rightKeys = right
    .apply(Sample.any(1)) // We only need to do this for a single element
    .apply(ParDo.of(new RetrieveKeys()))
    .apply(View.asSingleton());

PCollection<KV<String, KV<Map<String, String>, Map<String, String>>>> joined = Join.leftOuterJoin(left, right, Collections.emptyMap());

In the DoFn that merges the two Maps, we feed in the PCollectionView as a side input, and any time we find an empty Map for the appropriate side, we use the side input to generate a Map with the correct columns and empty values. We then merge the Maps as before.
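
As a sketch, the relevant part of that merging DoFn’s ProcessElement method might look like the following (rightKeys is the PCollectionView created above, the ParDo is applied with .withSideInputs(rightKeys), and only the right side can be empty in a left outer join):

// The joined element is KV<join-key, KV<leftRow, rightRow>>
Map<String, String> leftRow = c.element().getValue().getKey();
Map<String, String> rightRow = c.element().getValue().getValue();

if (rightRow.isEmpty()) {
    // No match was found in the right collection: rebuild the right row with the
    // right collection's column names and empty values, so that every output
    // element has the same set of columns
    rightRow = new HashMap<>();
    for (String columnName : c.sideInput(rightKeys)) {
        rightRow.put(columnName, "");
    }
}

// ...then merge leftRow and rightRow as before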

If that sounds complicated, the important thing to remember is that you are simply using the PCollectionView to keep a record of the nullable side’s keys, and then use these to inform the post-join result.

*BigQuery does not have any issues when uploading rows with missing columns, as long as those columns are defined as nullable in BigQuery. However, this may not be the case with every destination, and some destinations that won’t throw exceptions (e.g. writing to text file) may still benefit from a consistent schema.

Next Steps

Making Joins Generic

Abstracting the join logic to a separate method should allow for simpler reuse. A user would then simply need to define the ColumnMapping objects and feed in the PCollections to be joined, along with some flags to indicate the expected method (e.g. whether to use a left, right, full or inner join). As long as the input PCollections contain Maps, the process of extracting join-keys from the Maps, grouping the elements, merging the Maps, and accounting for nulls could be completely removed from the rest of the pipeline.

Of course, given that joins depend on order (defining the left and right sides is crucial), the arguments to such a method would be very important. The correct order might also be unclear, a common problem with positional arguments in Java. To get around this, we could use a Builder pattern:

CsvJoin csvJoin = new CsvJoin.Builder()
       .leftHandCollection(leftPipeline)
       .rightHandCollection(rightPipeline)
       .joinOnColumnMappings(columnMappings)
       .usingJoinType(JoinType.LEFT)
       .build();

PCollection<Map<String, String>> output = csvJoin.joinCollections();

Expand To JSON

CSV data is quite simple to handle, as it involves no nesting or grouping. Other formats such as JSON present additional challenges. We could expand the solution to also handle JSON data.

If using other data source formats (e.g. Avro), Beam’s new schema feature should help. Keep an eye out for this in upcoming versions of Beam!

Something To Consider

We attempted joining several larger datasets in Beam, but found that after three hours the process was still running. Even using more powerful machine types as Dataflow workers did not significantly improve performance, and we wanted to keep the cost of running the pipeline low. Running the same join in BigQuery using standard SQL, by contrast, took less than a minute. We therefore chose to use BigQuery for the largest joins.

It may sound like a cheat, but it’s always worth bearing in mind whether a technology is appropriate. Beam (like other data engineering frameworks) provides a way to understand, maintain, and debug data processing pipelines that simply isn’t available in SQL systems. A complex SQL query can be incredibly difficult to read, whereas code is generally more intuitive. However, on this occasion the performance benefit of the SQL solution outweighed the costs.

Summary

Joins in Beam can be tricky, as we found. Using a consistent approach allowed us to create generic functions and transforms, which can scale out to the client’s various businesses. For future projects involving processing CSV data, most of the code is reusable – a core principle in all software development.

If you are interested in building a Data Lake for your own business, or generally how we at Datatonic can help you unlock value from your data, don’t hesitate to contact us!
