
Implementing a Hybrid Type 1 and 2 Slowly Changing Dimension in Fabric

History will be kind to me for I intend to write it.

– Winston Churchill


Most analytical databases contain fact and dimension tables organized into star schema data models. Fact tables are updated frequently, whereas dimension tables usually change less often. Analysts call these slowly changing dimensions (SCDs) and classify them into several types based on how they handle updates. In some cases keeping the 'old' data is unimportant, but in others it needs to be retained.

A Type 1 SCD does not retain historical information; it overwrites old values with new, current ones. A Type 2 SCD adds a new row when data changes, and each row carries metadata that tracks which row is current and the date range for which it was current. A combination of Type 1 and Type 2 SCDs is often called a Type 7 SCD. A full Type 7 implementation also includes a surrogate key, which is omitted here for simplicity; this blog shows how to add one.

Analysts use these SCD patterns on many analytical platforms, and Microsoft recently introduced a new one: Fabric. Fabric is an evolution of Power BI that now includes many of the features available in Azure Synapse. The upgrades relevant to this post are the ability to store data in a delta lake and to work with that data using Spark notebooks.

Spark notebooks let a developer work with data using Apache Spark, which provides a powerful and flexible platform. In this blog, I will show an example of using PySpark to create and update a table containing both Type 1 and Type 2 SCD attributes.

The Code

I will begin by writing the necessary code to make a hybrid Type 1 and 2 SCD work. Then, I will run some data through it as an example.
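The snippets that follow assume they run in a Fabric notebook where the spark session already exists. As a minimal sketch, the functions and classes they rely on come from the usual PySpark and Delta Lake modules:

from pyspark.sql.functions import sha2, concat_ws, lit, current_timestamp, to_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from delta.tables import DeltaTable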

hash1Col = ['EmailAddress','LastOrderDate'] # List of Type 1 columns
hash2Col = ['CustomerName','IndustrySector'] # List of Type 2 columns
 
source_df = source_df.withColumn("Hash1", sha2(concat_ws("~", *hash1Col), 256)) \
   .withColumn("Hash2", sha2(concat_ws("~", *hash2Col), 256)) \
   .withColumn("EffectiveFromDate", current_timestamp()) \
   .withColumn("EffectiveToDate", to_timestamp(lit('2999-12-31T00:00:00.000'))) \
   .withColumn("UpdatedDate", to_timestamp(lit('2999-12-31T00:00:00.000'))) \
   .withColumn("CurrentFlag", lit("Y")) \
   .withColumn("DeletedFlag", lit("N"))

Slowly changing dimensions often contain several metadata columns which record and help control their updates. Above is the code needed to add the metadata to a data frame called source_df. A Type 2 SCD usually has a hash column created by hashing together all the values of the columns where changes are being tracked. In this case, we need two hashes because we are tracking two types of updates.

Lists contain the column names for each type, and a hash column is created from each list. There are three date columns. The first two, EffectiveFromDate and EffectiveToDate, record the lifetime of each row for the Type 2 portion. UpdatedDate records the last time a Type 1 update was made to the row. Finally, the current and deleted flags track the Type 2 versions of the data.
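To see why two separate hashes are needed, here is a small, purely illustrative sanity check (the sample values and the demo data frame are made up for this sketch): only the Type 1 column LastOrderDate differs between the two rows, so Hash1 changes while Hash2 stays the same.

# Hypothetical demo: a Type 1 change alters Hash1 but leaves Hash2 untouched
demo = spark.createDataFrame(
    [("Acme Ltd", "Automotive", "sales@acme.example", "2024-03-01"),
     ("Acme Ltd", "Automotive", "sales@acme.example", "2024-03-11")],
    ["CustomerName", "IndustrySector", "EmailAddress", "LastOrderDate"])

demo.select(
    sha2(concat_ws("~", *hash1Col), 256).alias("Hash1"),
    sha2(concat_ws("~", *hash2Col), 256).alias("Hash2")
).show(truncate=False)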

# Establish connection to delta table
target_dt = DeltaTable.forPath(spark, delta_path)
 
# Collecting the rows which need to be inserted into the delta table
newRowsToInsert = source_df \
   .alias("updates") \
   .join(target_dt.toDF().alias("target"), ['CustomerID'], how = 'inner') \
   .where("target.CurrentFlag = 'Y' AND (updates.Hash2 <> target.Hash2 OR target.DeletedFlag = 'Y')") \
   .selectExpr("updates.*")
 
# Creating a data frame of all rows before the merge
staged_updates = (
   newRowsToInsert
   .selectExpr("'Y' as isInsert", # Rows are flagged as inserts
       'updates.CustomerID',
        'updates.CustomerName',
        'updates.EmailAddress',
        'updates.IndustrySector',
        'updates.LastOrderDate',
        'updates.Hash1',
        'updates.Hash2',
        'updates.EffectiveFromDate',
        'updates.EffectiveToDate',
        'updates.UpdatedDate',
        'updates.CurrentFlag',
        'updates.DeletedFlag')
   .union(source_df.selectExpr("'N' as isInsert", *source_df.columns))
   )

The core of this example comes from the Delta Lake documentation for a Type 2 SCD. In the first step, I join source_df to the existing delta table on CustomerID. Note that this syntax allows a join on multiple columns by adding column names to the list. The WHERE clause of the join restricts the match to current rows of the delta table, and then keeps only rows where either the Type 2 hashes differ or the target row is flagged as deleted.

The result is the set of source rows that represent Type 2 updates, plus rows that reappear in the source after previously being deleted. In the second step, the newRowsToInsert data frame is unioned with source_df, and a new column labels each row as an insert or not. This data frame is then fed to a merge statement, which applies the updates.
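Before running the merge, an optional sanity check (not part of the pipeline itself) can confirm the staging step produced what we expect: every row flagged 'Y' is a new current version waiting to be inserted, while the 'N' rows drive the matched updates.

# Optional check: count staged rows by their insert flag
staged_updates.groupBy("isInsert").count().show()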

target_dt.alias("target").merge(
staged_updates.alias("updates"),
"updates.CustomerID = target.CustomerID AND isInsert = 'N'") \
.whenMatchedUpdate( # Type 1 update. Type 2 updates skip this even if they contain a Type 1
condition = "target.CurrentFlag = 'Y' AND target.Hash1 <> updates.Hash1 AND target.Hash2 = updates.Hash2",
   set = {'EmailAddress': 'updates.EmailAddress',
                'LastOrderDate': 'updates.LastOrderDate',
                'UpdatedDate': lit(current_timestamp())}
).whenMatchedUpdate( # Type 2 update
condition = "target.CurrentFlag = 'Y' AND (updates.Hash2 <> target.Hash2 OR target.DeletedFlag = 'Y')",
   set = {                                    
        "CurrentFlag": "'N'",
       "EffectiveToDate": lit(current_timestamp())
   }
).whenNotMatchedInsert( # Type 2 Insert and new rows
values = {'CustomerID': 'updates.CustomerID',
            'CustomerName': 'updates.CustomerName',
            'EmailAddress': 'updates.EmailAddress',
            'IndustrySector': 'updates.IndustrySector',
            'LastOrderDate': 'updates.LastOrderDate',
            'Hash1': 'updates.Hash1',
            'Hash2': 'updates.Hash2',
            'EffectiveFromDate': 'updates.EffectiveFromDate',
            'EffectiveToDate': 'updates.EffectiveToDate',
            'UpdatedDate': 'updates.UpdatedDate',
            'CurrentFlag': 'updates.CurrentFlag',
            'DeletedFlag': 'updates.DeletedFlag'}
).execute() 

Most of the updates occur within a single merge statement. A merge statement is Delta Lake's mechanism for making complex updates to a table: the delta table is specified and joined to a data frame containing the updates.

The merge can take multiple actions depending on whether rows match and whether additional conditions are met. In this case, the merge matches on CustomerID, and only for staged_updates rows labelled as not being inserts (isInsert = 'N').

The first outcome is a Type 1 update: the join matches and the additional condition shows a change in the Type 1 hash but no change in the Type 2 hash. In this case, the Type 1 columns are updated and the UpdatedDate is refreshed.

The second case is a Type 2 update: the row keys match but the Type 2 hashes do not, so the existing row is labelled as not current and its EffectiveToDate is set. Rows previously flagged as deleted are handled here as well; if a deleted row reappears in the source, its old version is expired in the same way.

Finally, the new current versions of Type 2 rows and any brand-new rows are inserted.

All this handles the updates to the data. Next, we need to handle deletes.

# Creates a data frame of all the rows in the delta table but not in the source
deleted_df = target_dt.toDF().alias("target") \
       .join(source_df.alias("updates"), ['CustomerID'], "leftanti") \
       .where("target.CurrentFlag = 'Y' AND target.DeletedFlag = 'N'")
 
# Updating metadata
deleted_df = deleted_df.withColumn("EffectiveFromDate", lit(current_timestamp())) \
   .withColumn("EffectiveToDate", to_timestamp(lit('2999-12-31T00:00:00.000'))) \
   .withColumn("CurrentFlag", lit("Y")) \
   .withColumn("DeletedFlag", lit("Y")).cache()
 
# Appending new deleted rows
deleted_df.write.format("delta").mode("append").save(delta_path)
 
# Merge to update current rows to not current
target_dt.alias("target").merge(
   source = deleted_df.alias("updates"),
   condition = "updates.CustomerID = target.CustomerID AND target.CurrentFlag = 'Y' AND target.DeletedFlag = 'N'"
).whenMatchedUpdate(
   set = {                                  
        "CurrentFlag": "'N'",
       "EffectiveToDate": "updates.EffectiveFromDate"
   }
).execute()

Rows that no longer appear in the source are collected into a deleted_df data frame, and its metadata columns are then updated. Caching materializes these results before the next steps run; this is essential because the underlying delta table is about to change, and without the cache the data frame would be re-evaluated against the modified table.

The deleted rows are appended to the delta table as deleted. The final step is to update the relevant rows in the delta table to not current.
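As a quick, optional check (not part of the original flow), the dimension members that disappeared from the source should now show up as current rows flagged as deleted:

# Optional check: current rows flagged as deleted are the members missing from the source
spark.read.format("delta").load(delta_path) \
   .filter("CurrentFlag = 'Y' AND DeletedFlag = 'Y'") \
   .show(truncate=False)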

An Example

To see this in action, let's walk through an example. We will create a small table with sample data, then make some updates and watch the delta table change accordingly.

schema = StructType([ \
   StructField("CustomerID", IntegerType(), True), \
   StructField("CustomerName", StringType(), True), \
   StructField("EmailAddress", StringType(), True), \
   StructField("IndustrySector", StringType(), True), \
   StructField("LastOrderDate", StringType(), True) \
])
 
hash1Col = ['EmailAddress','LastOrderDate']
hash2Col = ['CustomerName','IndustrySector']
 
data1 = [(101,"Acme Ltd","[email protected]","Automotive","2024-03-01"),
   (102,"Mineral Corp","[email protected]","Mining","2024-02-15"),
   (103,"123 Alberta Ltd", "[email protected]","Forestry","2024-03-05"),
   (104,"Big Rig Industries","[email protected]","Oil and Gas","2024-01-10")]
 
df = spark.createDataFrame(data=data1,schema=schema)

The data is the customer dimension table of a star schema. Running it through the code above produces a delta table like the one shown below.
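One detail to note: the merge logic assumes a delta table already exists at delta_path, and that first load is not shown above. A minimal sketch of one way to create it, assuming delta_path points at the table's Lakehouse location and reusing the hash lists and metadata expressions from earlier, might look like this.

# Hypothetical initial load: add the SCD metadata to the first batch and write it
# out as the delta table that later runs will merge into (delta_path is assumed)
first_load = df.withColumn("Hash1", sha2(concat_ws("~", *hash1Col), 256)) \
   .withColumn("Hash2", sha2(concat_ws("~", *hash2Col), 256)) \
   .withColumn("EffectiveFromDate", current_timestamp()) \
   .withColumn("EffectiveToDate", to_timestamp(lit('2999-12-31T00:00:00.000'))) \
   .withColumn("UpdatedDate", to_timestamp(lit('2999-12-31T00:00:00.000'))) \
   .withColumn("CurrentFlag", lit("Y")) \
   .withColumn("DeletedFlag", lit("N"))
first_load.write.format("delta").mode("overwrite").save(delta_path)

With the table in place, the first batch produces the following rows.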

CustomerID | CustomerName | EmailAddress | IndustrySector | LastOrderDate | Hash1 | Hash2 | EffectiveFromDate | EffectiveToDate | UpdatedDate | CurrentFlag | DeletedFlag
---|---|---|---|---|---|---|---|---|---|---|---
101 | Acme Ltd | [email protected] | Automotive | 2024-03-01 | efc03… | 7083c… | 2024-03-08 21:41:53.606 | 2999-12-31 00:00:00 | 2999-12-31 00:00:00 | Y | N
102 | Mineral Corp | [email protected] | Mining | 2024-02-15 | 9f214… | 76dc2… | 2024-03-08 21:41:53.606 | 2999-12-31 00:00:00 | 2999-12-31 00:00:00 | Y | N
103 | 123 Alberta Ltd | [email protected] | Forestry | 2024-03-05 | 60937… | 36b55… | 2024-03-08 21:41:53.606 | 2999-12-31 00:00:00 | 2999-12-31 00:00:00 | Y | N
104 | Big Rig Industries | [email protected] | Oil and Gas | 2024-01-10 | 487c5… | b4a06… | 2024-03-08 21:41:53.606 | 2999-12-31 00:00:00 | 2999-12-31 00:00:00 | Y | N

Now, let's update the data. The type of change made to each row is noted in a comment at the end of the row, and one row (customer 102) was deleted.

data2 = [(101,"Acme Ltd","[email protected]","Automotive","2024-03-11"), # Type 1
   (103,"Rocky Mtd Pines Ltd", "[email protected]","Forestry","2024-03-05"), # Type 1 and 2
   (104,"Big Rig Industries","[email protected]","Oil Drilling","2024-01-10"), # Type 2
   (105,"Combine Corp","[email protected]","Agriculture","2024-03-15") # New Row
 ]
# (102,"Mineral Corp","[email protected]","Mining","2024-02-15"), # Deleted
dfNew = spark.createDataFrame(data=data2,schema=schema)
After running dfNew through the same code, the delta table looks like this.

CustomerID | CustomerName | EmailAddress | IndustrySector | LastOrderDate | Hash1 | Hash2 | EffectiveFromDate | EffectiveToDate | UpdatedDate | CurrentFlag | DeletedFlag
---|---|---|---|---|---|---|---|---|---|---|---
101 | Acme Ltd | [email protected] | Automotive | 2024-03-11 | efc03… | 7083c… | 2024-03-08 21:41:53.606 | 2999-12-31 00:00:00 | 2024-03-08 21:41:59.265 | Y | N
102 | Mineral Corp | [email protected] | Mining | 2024-02-15 | 9f214… | 76dc2… | 2024-03-08 21:41:53.606 | 2024-03-08 21:42:10.379 | 2999-12-31 00:00:00 | N | N
102 | Mineral Corp | [email protected] | Mining | 2024-02-15 | 9f214… | 76dc2… | 2024-03-08 21:42:06.005 | 2999-12-31 00:00:00 | 2999-12-31 00:00:00 | Y | Y
103 | 123 Alberta Ltd | [email protected] | Forestry | 2024-03-05 | 60937… | 36b55… | 2024-03-08 21:41:53.606 | 2024-03-08 21:41:59.265 | 2999-12-31 00:00:00 | N | N
103 | Rocky Mtd Pines Ltd | [email protected] | Forestry | 2024-03-05 | 25ee6… | 6649e… | 2024-03-08 21:41:59.265 | 2999-12-31 00:00:00 | 2999-12-31 00:00:00 | Y | N
104 | Big Rig Industries | [email protected] | Oil and Gas | 2024-01-10 | 487c5… | b4a06… | 2024-03-08 21:41:53.606 | 2024-03-08 21:41:59.265 | 2999-12-31 00:00:00 | N | N
104 | Big Rig Industries | [email protected] | Oil Drilling | 2024-01-10 | 487c5… | 0554a… | 2024-03-08 21:41:59.265 | 2999-12-31 00:00:00 | 2999-12-31 00:00:00 | Y | N
105 | Combine Corp | [email protected] | Agriculture | 2024-03-15 | 1e3dd… | 97d4e… | 2024-03-08 21:41:59.265 | 2999-12-31 00:00:00 | 2999-12-31 00:00:00 | Y | N

Here’s one more update. Note that I am also including a scenario in which data was deleted but then came back.

data3 = [(101,"Acme Inc","[email protected]","Automotive","2024-03-11"), # Type 2
   (102,"Mineral Corp","[email protected]","Mining","2024-02-15"), # Row returned
   (103,"Rocky Mtd Pines Ltd", "[email protected]","Forestry","2024-03-05"), # No change
   (104,"Big Rig Industries","[email protected]","Oil Drilling","2024-01-10"), # No change
   (105,"Combine Corp","[email protected]","Agriculture","2024-03-15") # New Row
 ]
 
dfNew = spark.createDataFrame(data=data3,schema=schema)
After this third update, the delta table looks like this. Note that customer 102 now has three rows: the original version, the soft-deleted version, and a new current row created when it reappeared in the source.

CustomerID | CustomerName | EmailAddress | IndustrySector | LastOrderDate | Hash1 | Hash2 | EffectiveFromDate | EffectiveToDate | UpdatedDate | CurrentFlag | DeletedFlag
---|---|---|---|---|---|---|---|---|---|---|---
101 | Acme Ltd | [email protected] | Automotive | 2024-03-11 | efc03… | 7083c… | 2024-03-08 21:41:53.606 | 2024-03-08 21:42:18.308 | 2024-03-08 21:41:59.265 | N | N
101 | Acme Inc | [email protected] | Automotive | 2024-03-11 | ef88d… | 75a10… | 2024-03-08 21:42:18.308 | 2999-12-31 00:00:00 | 2999-12-31 00:00:00 | Y | N
102 | Mineral Corp | [email protected] | Mining | 2024-02-15 | 9f214… | 76dc2… | 2024-03-08 21:41:53.606 | 2024-03-08 21:42:10.379 | 2999-12-31 00:00:00 | N | N
102 | Mineral Corp | [email protected] | Mining | 2024-02-15 | 9f214… | 76dc2… | 2024-03-08 21:42:06.005 | 2024-03-08 21:42:18.308 | 2999-12-31 00:00:00 | N | Y
102 | Mineral Corp | [email protected] | Mining | 2024-02-15 | 9f214… | 76dc2… | 2024-03-08 21:42:18.308 | 2999-12-31 00:00:00 | 2999-12-31 00:00:00 | Y | N
103 | 123 Alberta Ltd | [email protected] | Forestry | 2024-03-05 | 60937… | 36b55… | 2024-03-08 21:41:53.606 | 2024-03-08 21:41:59.265 | 2999-12-31 00:00:00 | N | N
103 | Rocky Mtd Pines Ltd | [email protected] | Forestry | 2024-03-05 | 25ee6… | 6649e… | 2024-03-08 21:41:59.265 | 2999-12-31 00:00:00 | 2999-12-31 00:00:00 | Y | N
104 | Big Rig Industries | [email protected] | Oil and Gas | 2024-01-10 | 487c5… | b4a06… | 2024-03-08 21:41:53.606 | 2024-03-08 21:41:59.265 | 2999-12-31 00:00:00 | N | N
104 | Big Rig Industries | [email protected] | Oil Drilling | 2024-01-10 | 487c5… | 0554a… | 2024-03-08 21:41:59.265 | 2999-12-31 00:00:00 | 2999-12-31 00:00:00 | Y | N
105 | Combine Corp | [email protected] | Agriculture | 2024-03-15 | 1e3dd… | 97d4e… | 2024-03-08 21:41:59.265 | 2999-12-31 00:00:00 | 2999-12-31 00:00:00 | Y | N

Conclusion

There are many ways to track changes in dimension tables. Analysts use several standard methods, but there are cases where one needs to combine the approaches. In the case I have shown, some columns act as a Type 2 SCD, while others do not need to keep history.

I have demonstrated this using PySpark in a Microsoft Fabric notebook. The example shown in this blog does not include a surrogate key, a common addition to dimension tables that would turn this into a full Type 7 SCD. If you want to add a surrogate key, this blog demonstrates how to do that in a notebook.
