I’m a Solution Architect and Data Engineer in our Integrations team at work, and I own one of our API products, a Data Lake GraphQL API. In this article, I wanted to share with you the process of how I built it — starting from the data side.
This article assumes you’re familiar with Azure services such as Data Lake Storage, Data Factory, and Databricks, and have some general knowledge of GraphQL, Python/PySpark, and SQL.
Problem
Data lakes are designed to store large amounts of data, usually for analytics and reporting, but one of our use cases at work was to turn our data lake into the source for a GraphQL API. It's a bit like Reverse ETL, except that applications (both internal and SaaS) call our API to get the data they need.
Why? Well, it's a central data source that contains everything our users need, and the data is highly relational, which is perfect for GraphQL.
There are a few issues though, primarily with the data:
- We have no control over the data lake or the data that goes into it, since another group owns it. We take what we can get.
- Data is stored as parquet files in the lake. We need to move that to an SQL database to be able to use an ORM and take advantage of that relational goodness.
- In our Data Lake, there’s no enforcement of primary key uniqueness, meaning that data is sometimes ingested without deduplication.
- There are no fields that indicate whether a record is new or updated (such as an updated_at column), so there was no easy way to get the delta.
- Our data lake uses the Star Schema model, and some fact tables can get really huge (~70 million rows), so ingesting them from scratch every time would take almost 8 hours.
Design
Our API’s high-level architecture looks like this:
In Part 1 of this series, I’ll discuss the first three stages and tackle the data engineering aspect first.
Let’s go through every issue above and see what we can do about it.
- Data Quality Control - we can create our own pipelines to have more control over the data we get from the lake.
- Parquet to SQL Transformation - we can do this in Databricks.
- Deduplication - we can also do this in Databricks.
- Change Data Capture - I needed to implement our own CDC mechanism because our source has no versioning, update timestamps, or change logs. What worked for us was hashing all the columns of a row and storing the result as a separate column, essentially a checksum: if any column's value changes, the hash changes too (see the sketch after this list).
- Large Data Volume - I needed to create a delta load pipeline that can detect and extract only the new and updated data since the last load to minimize the volume of data written to our database. This pipeline should include a mechanism for identifying changes in the data (inserts, updates, and deletes) and reflect that in our SQL database.
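To illustrate the checksum idea on its own, here's a minimal sketch using PySpark's built-in hash function over every column of a DataFrame. The DataFrame and column names are made up for the example; the actual pipeline generates the hash in SQL, as shown later in this article.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data standing in for a lake table
df = spark.createDataFrame(
    [("A1", "Open"), ("A2", "Closed")],
    ["workflow_status", "workflow_status_description"],
)

# Hash every column into a single check_sum column.
# If any column's value changes, the hash changes too,
# which is what lets us detect updated rows later on.
df_with_checksum = df.withColumn("check_sum", F.hash(*df.columns))
df_with_checksum.show(truncate=False)
```

Comparing check_sum values between loads is what drives the update detection in the merge step described below.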
After months of testing and development, we boiled the above down to three concrete steps for our pipeline:
- Sanitation
- Deduplication
- Delta Loading
Sanitation
We need to sanitize and modify the source tables to ensure data quality before ingesting it into the SQL database. This includes filtering records, filling in null values, and removing data quality issues.
Below are snippets of code from our Databricks notebook (which runs on Apache Spark):
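These snippets reference notebook parameters (pSchemaName, pTableName, adls2sourcepath) and database connection details (jdbcUrl, dbUser, dbPass) that are defined elsewhere in the notebook. Here's a minimal sketch of how such parameters might be wired up with Databricks widgets and secrets; the names, paths, and scopes below are placeholders, not our actual setup.

```python
# Hypothetical parameter setup -- names, scopes, and paths are placeholders
pSchemaName = dbutils.widgets.get("pSchemaName")  # e.g. passed in by Data Factory
pTableName = dbutils.widgets.get("pTableName")    # e.g. "workflow_status_dim"

# Path of the source parquet table in ADLS2
adls2sourcepath = f"abfss://<container>@<storageaccount>.dfs.core.windows.net/<path>/{pTableName}"

# SQL database connection details, with credentials pulled from a secret scope
jdbcUrl = "jdbc:sqlserver://<server>.database.windows.net:1433;database=<database>"
dbUser = dbutils.secrets.get(scope="<scope>", key="<user-key>")
dbPass = dbutils.secrets.get(scope="<scope>", key="<password-key>")
```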
The source table is read from ADLS2 and returned as a DataFrame.
```python
# Read the source table
dfSource = (spark.read
    .format('parquet')
    .option("mode", "FAILFAST")
    .load(adls2sourcepath))
```
The DataFrame is sanitized depending on what table it is.
```python
import datetime

# Do source table modifications
if pTableName == 'fact_table_1':
    dfSource = dfSource.filter(f"fiscal_year_num >= '{datetime.datetime.now().year - 1}'")  # filter records to current year - 1
    dfSource = dfSource.fillna(value='', subset=["tdcval_code"])  # tdcval_code is part of the primary key and cannot be null
elif pTableName == 'fact_table_2':
    dfSource = dfSource.filter("contract_id is not null")  # remove empty contract_id's that were data issues
    dfSource = dfSource.fillna(value=0, subset=["allocation_id"])  # allocation_id is part of the primary key and cannot be null
elif pTableName == 'fact_table_3':
    dfSource = dfSource.filter(f"doc_post_date >= '{datetime.datetime.now().year - 1}'")  # filter records to current year - 1
elif pTableName == 'fact_table_4':
    dfSource = dfSource.filter("partition_key > '201712'")  # filter out data before 201712, which has data quality issues
```
Deduplication
To deduplicate, we need to know the primary keys. To do this dynamically, I created a function in SQL Server that outputs the table definition.
This returns each column's name, type, length, precision, and scale, and whether it's nullable or part of the primary key.
```sql
CREATE FUNCTION [config].[func_get_table_definition]
(
    @SchemaName [NVARCHAR](128), @TableName [NVARCHAR](225)
)
RETURNS TABLE
AS
RETURN
(
    SELECT
        c.name 'column_name',
        t.Name 'data_type',
        c.max_length/2 'length',
        c.precision,
        c.scale,
        c.is_nullable,
        ISNULL(i.is_primary_key, 0) 'is_pk'
    FROM
        sys.columns c
    INNER JOIN
        sys.types t ON c.user_type_id = t.user_type_id
    -- OUTER APPLY selects a single row that matches each row from the left table.
    OUTER APPLY
    (
        SELECT TOP 1 *
        FROM sys.index_columns M
        WHERE M.object_id = c.object_id AND M.column_id = c.column_id
    ) m
    LEFT OUTER JOIN
        sys.indexes i ON m.object_id = i.object_id AND m.index_id = i.index_id
    WHERE
        c.object_id = OBJECT_ID(@SchemaName + '.' + @TableName)
)
```
Example output for a workflow_status_dim table, which I’ll be using in the rest of the examples here:
| column_name | data_type | length | precision | scale | is_nullable | is_pk |
| --- | --- | --- | --- | --- | --- | --- |
| workflow_status | nvarchar | 10 | 0 | 0 | FALSE | TRUE |
| workflow_status_description | nvarchar | 35 | 0 | 0 | TRUE | FALSE |
To run the function in Databricks, we just read it like any other table via a SELECT statement:
```python
# Read target table definition
dfTargetDef = spark.read \
    .format("jdbc") \
    .option("url", jdbcUrl) \
    .option("query", "select * from config.func_get_table_definition('{0}', '{1}')".format(pSchemaName, pTableName)) \
    .option("user", dbUser) \
    .option("password", dbPass).load()
```
Once we have the primary keys, we can deduplicate using PySpark's dropDuplicates method:
```python
from pyspark.sql.functions import col, count, trim

# Dedup source data according to primary key columns
pkColumns = []

pkColumnsDf = dfTargetDef.filter(dfTargetDef.is_pk == 'true')
for row in pkColumnsDf.collect():
    pkColumns.append(f"{row.column_name}")
    if 'char' in row.data_type:
        # Trim string primary key columns; leading and trailing spaces cause issues in merging/primary keys
        dfSource = dfSource.withColumn(row.column_name, trim(col(row.column_name)))

# Display the duplicate records
display(dfSource.groupBy(pkColumns)
    .agg(count('*').alias("count_duplicates"))
    .filter(col('count_duplicates') >= 2))

dfSourceCount = dfSource.count()
dfSource = dfSource.dropDuplicates(pkColumns)
dfSourceDedupCount = dfSource.count()
dupCount = dfSourceCount - dfSourceDedupCount

print(dupCount)

# Create a temp view of the source table
dfSource.createOrReplaceTempView("SourceTable")
```
Aside from deduplication, we also do a minor but important sanitation step here: trimming the char-type primary key columns. Rogue spaces in keys can cause issues when merging the data (unfortunately, we learned this from experience).
Delta Loading
Generating the Checksum
The first step in the delta-loading process is creating the checksum column in the source.
The code below dynamically generates an SQL statement that creates a delta table named {pTableName}_src within the Databricks HMS (Hive Metastore). This table contains the source columns plus a few additional ones:

- check_sum, which contains the hash of all columns, created using Spark's built-in HASH function
- date columns (created_date and modified_date)
- flag columns (is_modified and is_deleted) so we can filter out the changes
```python
# Create source table with check_sum hash
spark.sql("REFRESH TABLE SourceTable")  # invalidates the cached entries
spark.sql(f"DROP TABLE IF EXISTS {pTableName}_src")

entries = [f"CREATE TABLE {pTableName}_src AS SELECT"]
for row in dfTargetDef.collect():
    entries.append(f"SourceTable.{row.column_name},")

entries.append("HASH (")
for idx, row in dfTargetDef.toPandas().iterrows():
    entries.append(f"SourceTable.{row.column_name}{'' if idx == len(dfTargetDef.toPandas().index) - 1 else ','}")

entries.append(") as check_sum,")
entries.append("current_timestamp() as created_date,\ncurrent_timestamp() as modified_date,\nfalse as is_modified,\nfalse as is_deleted")
entries.append("FROM SourceTable;")

sql_comm = "\n".join(str(x) for x in entries)
spark.sql(sql_comm)
```
This generates an SQL statement like below:
```sql
CREATE TABLE workflow_status_dim_src AS SELECT
SourceTable.workflow_status,
SourceTable.workflow_status_description,
HASH (
SourceTable.workflow_status,
SourceTable.workflow_status_description
) as check_sum,
current_timestamp() as created_date,
current_timestamp() as modified_date,
false as is_modified,
false as is_deleted
FROM SourceTable;
```
Merging the Data
We first read the target table in our SQL database (our working tables are in the api schema). We store this as {pTableName}_dlt in Databricks.
```python
dfTarget = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", jdbcUrl) \
    .option("dbtable", "api." + pTableName) \
    .option("user", dbUser) \
    .option("password", dbPass).load()

dfTarget.createOrReplaceTempView("TargetTable")

# Create the delta/managed tables. This initially loads the rows from the source and target tables.
spark.sql(f"CREATE TABLE IF NOT EXISTS {pTableName}_dlt USING delta AS SELECT * FROM TargetTable;")
```
Next is to merge the source with the target table. The code below dynamically generates a MERGE statement to merge {pTableName}_src into {pTableName}_dlt.
```python
# Merge source into target table
entries = [f"MERGE INTO {pTableName}_dlt tgt USING {pTableName}_src src ON ("]

# Get primary keys
df_pk = dfTargetDef.filter(dfTargetDef.is_pk == 'true').toPandas()
for idx, row in df_pk.iterrows():
    entries.append(f"src.{row.column_name} = tgt.{row.column_name}{'' if idx == len(df_pk.index) - 1 else ' AND'}")

entries.append(') WHEN MATCHED AND tgt.check_sum <> src.check_sum THEN UPDATE SET')

# https://www.geeksforgeeks.org/how-to-iterate-over-rows-and-columns-in-pyspark-dataframe/
for row in dfTargetDef.collect():
    entries.append(f"tgt.{row.column_name} = src.{row.column_name},")

entries.append('tgt.check_sum = src.check_sum,\ntgt.modified_date = src.modified_date,\ntgt.is_modified = true,\ntgt.is_deleted = false')

entries.append('WHEN NOT MATCHED BY TARGET THEN INSERT (')
for row in dfTargetDef.collect():
    entries.append(f"{row.column_name},")

entries.append('check_sum,\ncreated_date,\nmodified_date,\nis_modified,\nis_deleted')
entries.append(') VALUES (')

for row in dfTargetDef.collect():
    entries.append(f"src.{row.column_name},")

entries.append('src.check_sum,\nsrc.created_date,\nsrc.modified_date,\ntrue,\nfalse')
entries.append(') WHEN NOT MATCHED BY SOURCE THEN UPDATE SET tgt.is_deleted = true;')

sql_comm = "\n".join(str(x) for x in entries)
spark.sql(sql_comm)
```
This generates an SQL statement like below:
```sql
MERGE INTO workflow_status_dim_dlt tgt USING workflow_status_dim_src src ON (
src.workflow_status = tgt.workflow_status
) WHEN MATCHED AND tgt.check_sum <> src.check_sum THEN UPDATE SET
tgt.workflow_status = COALESCE(src.workflow_status, tgt.workflow_status),
tgt.workflow_status_description = COALESCE(src.workflow_status_description, tgt.workflow_status_description),
tgt.check_sum = src.check_sum,
tgt.modified_date = src.modified_date,
tgt.is_modified = true,
tgt.is_deleted = false
WHEN NOT MATCHED BY TARGET THEN INSERT (
workflow_status,
workflow_status_description,
check_sum,
created_date,
modified_date,
is_modified,
is_deleted
) VALUES (
src.workflow_status,
src.workflow_status_description,
src.check_sum,
src.created_date,
src.modified_date,
true,
false
) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET tgt.is_deleted = true;
```
- If the primary keys match and the checksums don't, the record has been updated: set is_modified to true.
- If no primary keys match, it's a new record: insert it with is_modified set to true.
- If the primary key isn't present in the source, it's a deleted record: set is_deleted to true.
Writing the Changes
Once the delta tables have been merged, we write the changes (rows where is_modified or is_deleted is true) to another schema in our SQL database (we use delta).
We use Microsoft's custom Spark connector for SQL Server to take advantage of its BULK INSERT functionality and make our writes a lot quicker.
```python
dfWrite = spark.sql(f"SELECT * FROM {pTableName}_dlt where is_modified=True OR is_deleted=True;")

dfWrite.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", jdbcUrl) \
    .option("dbtable", "delta." + pTableName) \
    .option("reliabilityLevel", "BEST_EFFORT") \
    .option("batchsize", 100000) \
    .option("tableLock", "true") \
    .option("user", dbUser) \
    .option("password", dbPass) \
    .option("schemaCheckEnabled", "false") \
    .save()
```
Since we write the changes to another table, we have to merge that into our main table in the api schema. This time, the MERGE statement runs in a stored procedure in the SQL database, not in Databricks.
Since columns in our data lake tables change from time to time and there are multiple tables in the lake, I had to make the stored procedure dynamic. To do that, I created the procedure via Databricks.
I tapped directly into the database connection object to be able to create a stored procedure in the SQL database.
```python
# Create stored procedure code to merge delta table to api table in database
driver_manager = spark._sc._gateway.jvm.java.sql.DriverManager
con = driver_manager.getConnection(jdbcUrl, dbUser, dbPass)

exec_statement = con.prepareCall(f"IF (OBJECT_ID('config.delta_load_{pTableName}', 'P') IS NOT NULL) DROP PROCEDURE [config].[delta_load_{pTableName}];")
exec_statement.execute()

entries = [f"CREATE PROCEDURE [config].[delta_load_{pTableName}] AS BEGIN"]

# https://stackoverflow.com/questions/10724348/merge-violation-of-primary-key-constraint
entries.append(f"MERGE INTO [api].[{pTableName}] AS tgt USING [delta].[{pTableName}] src ON (")

# Get primary keys
df_pk = dfTargetDef.filter(dfTargetDef.is_pk == 'true').toPandas()
for idx, row in df_pk.iterrows():
    entries.append(f"src.{row.column_name} = tgt.{row.column_name}{'' if idx == len(df_pk.index) - 1 else ' AND'}")

entries.append(') WHEN MATCHED AND src.is_modified = 1 THEN UPDATE SET')

for row in dfTargetDef.collect():
    entries.append(f"tgt.{row.column_name} = src.{row.column_name},")

entries.append('tgt.check_sum = src.check_sum,\ntgt.modified_date = src.modified_date')
entries.append('WHEN MATCHED AND src.is_deleted = 1 THEN DELETE WHEN NOT MATCHED THEN INSERT (')

for row in dfTargetDef.collect():
    entries.append(f"{row.column_name},")

entries.append('check_sum,\ncreated_date,\nmodified_date')
entries.append(') VALUES (')

for row in dfTargetDef.collect():
    entries.append(f"src.{row.column_name},")

entries.append('src.check_sum,\nsrc.created_date,\nsrc.modified_date')
entries.append(f'); DROP TABLE [delta].{pTableName}; END')

sql_proc = "\n".join(str(x) for x in entries)

# Create the stored procedure in the database
exec_statement = con.prepareCall(sql_proc)
exec_statement.execute()

# Close connections
exec_statement.close()
con.close()
```
This generates an SQL statement like below:
```sql
CREATE PROCEDURE [config].[delta_load_workflow_status_dim] AS BEGIN
MERGE INTO [api].[workflow_status_dim] tgt USING [delta].[workflow_status_dim] src ON (
src.workflow_status = tgt.workflow_status
) WHEN MATCHED AND src.is_modified = 1 THEN UPDATE SET
tgt.workflow_status = COALESCE(src.workflow_status, tgt.workflow_status),
tgt.workflow_status_description = COALESCE(src.workflow_status_description, tgt.workflow_status_description),
tgt.check_sum = src.check_sum,
tgt.modified_date = src.modified_date
WHEN MATCHED AND src.is_deleted = 1 THEN DELETE WHEN NOT MATCHED THEN INSERT (
workflow_status,
workflow_status_description,
check_sum,
created_date,
modified_date
) VALUES (
src.workflow_status,
src.workflow_status_description,
src.check_sum,
src.created_date,
src.modified_date
); DROP TABLE [delta].workflow_status_dim; END
```
- If the primary keys match and is_modified is true, the record has been updated → UPDATE.
- If the primary keys match and is_deleted is true, the record has been deleted → DELETE.
- If no primary keys match, it's a new record → INSERT.
- Once the merge is done, we drop the table in the delta schema.
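For illustration only, here's how the generated procedure could be invoked over a JDBC connection from the notebook, mirroring the DriverManager pattern above; this is a sketch, since in our setup the procedure is actually run by the orchestration pipeline described next.

```python
# Hypothetical sketch: invoke the generated procedure over a JDBC connection.
# In practice, Data Factory runs this procedure as a separate activity.
driver_manager = spark._sc._gateway.jvm.java.sql.DriverManager
con = driver_manager.getConnection(jdbcUrl, dbUser, dbPass)

exec_statement = con.prepareCall(f"EXEC [config].[delta_load_{pTableName}];")
exec_statement.execute()

exec_statement.close()
con.close()
```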
Orchestration
Last but not least, we orchestrate the whole process. For this, we use Data Factory.
The flow above is what we run for each table in the data lake.
1. Record the start date.
2. Run the Databricks notebook to load the changes into the SQL DB.
3. If there are changes, run the stored procedure to merge the changes in the SQL DB.
4. If step 2 or 3 fails, roll back the tables in Databricks using RESTORE TABLE to make sure their states are the same as the tables in the SQL DB (see the sketch after this list).
5. Finally, run a stored procedure to save metadata for debugging and analytics purposes (errors, number of rows added/deleted/changed, etc.). We maintain this in another table in the SQL DB.
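Here's a minimal sketch of what that rollback could look like inside the notebook, using Delta Lake's DESCRIBE HISTORY and RESTORE TABLE; the version-selection logic is illustrative, not our exact implementation.

```python
# Illustrative rollback sketch (not our exact implementation):
# restore the managed delta table to its previous version so it stays
# in sync with the corresponding table in the SQL database.
previous_version = (
    spark.sql(f"DESCRIBE HISTORY {pTableName}_dlt")
    .orderBy("version", ascending=False)
    .collect()[1]["version"]  # the version just before the latest operation
)

spark.sql(f"RESTORE TABLE {pTableName}_dlt TO VERSION AS OF {previous_version}")
```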
Results
With our delta load implementation, we've seen a 70% decrease in data load times compared to loading each table from the lake into our SQL database from scratch every time (which was our very first iteration).
Before, our data pipelines took 7-10 hours to complete each day; now they take 1-2 hours. This depends heavily on your compute capacity, but in our case (a Standard_DS3_v2 Databricks cluster with 2-8 workers and a GP_S_Gen5_12 SQL database), and assuming 10% or less of a dataset has changed:
- A table in the tens of thousands of rows just takes a minute to load the new data.
- A table with a million rows takes about 10 minutes or less.
- A table with ~80 million rows takes around an hour.
Let me know if you have comments, questions, or suggestions below, or connect with me at hello@kenzojrc.com or on my socials. If this article helped you in any way, I'd love to hear about it!
Stay tuned for the next one.