Incremental dbt models using Adobe Analytics Clickstream

In the previous article (Adobe Analytics Clickstream data with dbt & SQL) I discussed how to get started with dbt and build basic models using Adobe Analytics Clickstream data. This time we will dive into a more useful feature: creating incremental dbt models using the same Adobe Analytics data. The motivation is to make your data processing pipeline robust enough to:

  • handle re-uploaded files with modified data. Data Feed exports might fail or be delayed, and you need to work on that assumption, so being able to cater for delayed data delivery is important
  • avoid double-counting. Sometimes mistakes happen and the same data might be ingested more than once. It is better to prepare for that scenario and avoid duplicate entries
  • be idempotent. Every time a data processing step executes for a particular time frame, it should generate the same results. That way you can break the pipeline execution down into daily steps: no matter how many times you run the step for a particular day, the end result in the data should always be the same

The above assumes you already have a process to load your Data Feeds into a base table. If you want to set up a data loading process in BigQuery, feel free to contact me at info@analyticsmayhem.com! Let’s get into it!

Merge Statement

Incremental models in dbt rely on the MERGE statement. For the purposes of this article, I am using BigQuery (MERGE documentation). MERGE is a DML (Data Manipulation Language) statement that allows you to update, insert and delete data in a BigQuery table.

Even though MERGE can perform multiple operations, dbt uses it in a narrower way. The samples below will help you better understand the actual process: “createTable.sql” creates a table with a single entry and “merge.sql” inserts 2 new rows while modifying the existing one based on the unique id.

The result is that the row with id = ‘a’ has its value updated from 1 to 2.

[Figure: merge functionality demo (merge on column “id” as unique id)]
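
To make the demo concrete, here is a rough sketch of what the two samples do in BigQuery. This is only an illustration, assuming a hypothetical table demo.merge_demo; the actual gists may differ slightly.

-- createTable.sql: create a table with a single entry
CREATE OR REPLACE TABLE demo.merge_demo AS
SELECT 'a' AS id, 1 AS value;

-- merge.sql: update the matching row and insert the new ones, using "id" as the unique id
MERGE demo.merge_demo AS destination_table
USING (
  SELECT 'a' AS id, 2 AS value UNION ALL
  SELECT 'b' AS id, 3 AS value UNION ALL
  SELECT 'c' AS id, 4 AS value
) AS source_table
ON destination_table.id = source_table.id
WHEN MATCHED THEN
  UPDATE SET value = source_table.value
WHEN NOT MATCHED THEN
  INSERT (id, value) VALUES (source_table.id, source_table.value);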

In short, if we re-run a model configured as incremental, existing rows will be updated where needed and new ones will be inserted.

How to implement an incremental model

When you configure a dbt model as incremental, you can specify the conditions under which rows are inserted or modified. These are usually date/time constraints. Let’s examine the basic modifications compared to the previous article.

We will need a column that serves as a unique identifier. Adobe provides the columns hitid_low and hitid_high in the Data Feeds, and their concatenation generates a unique hit value. Let’s add this to our root clickstream model.

[Figure: Generating unique_hit_id]
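
A minimal sketch of that addition, assuming the root model selects from a base table of raw Data Feed rows (the ref name clickstream_base and the separator are illustrative):

SELECT
  *,
  -- concatenate the two hit id columns into a single unique identifier
  CONCAT(CAST(hitid_high AS STRING), '-', CAST(hitid_low AS STRING)) AS unique_hit_id
FROM {{ ref('clickstream_base') }}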

Next we need to modify the clean and the discarded models accordingly. The additions below (lines 3 and 4) change the models to incremental, using the column unique_hit_id as the unique key.

[Figure: Model materialisation set to incremental]
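
In dbt terms, this amounts to a config block along the following lines at the top of each model, with unique_key pointing at the column we just created:

{{
  config(
    materialized = 'incremental',
    unique_key = 'unique_hit_id'
  )
}}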

In practice this is the equivalent of the clause destination_table.id = source_table.id (merge.sql – line 11) from the SQL samples.

We will also need to apply a condition to filter the rows. Since we are working with tables partitioned on the date_time column, we need to filter the rows (and decrease processing costs). To do so we will use a macro. Macros allow us to re-use code and enforce the DRY principle. Injecting a macro is easy!

[Figure: incremental_filter() macro placement]
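
As a rough illustration of the placement, the macro is called inside the model’s WHERE clause; the exclude_hit condition below is just a placeholder for the model’s own filtering logic:

SELECT *
FROM {{ ref('clickstream') }}
WHERE exclude_hit = 0
  AND {{ incremental_filter() }}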

This macro will filter rows based on the start_date and end_date of our execution period. We will define these parameters later on, at run-time.

[Figure: incremental_filter() macro definition]
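
A minimal sketch of such a macro, assuming date_time is a TIMESTAMP column and the start_date/end_date variables described below:

{% macro incremental_filter() %}
  -- keep only rows inside the execution window supplied via --vars
  date_time >= TIMESTAMP('{{ var("start_date") }}')
  AND date_time < TIMESTAMP('{{ var("end_date") }}')
{% endmacro %}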

In a simplified example, this is the behaviour of the macro.

[Figure: Macro result]
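
For example, combining the sketches above with start_date = '2020-02-15' and end_date = '2020-02-16', the compiled filter would effectively read:

WHERE exclude_hit = 0
  AND date_time >= TIMESTAMP('2020-02-15')
  AND date_time < TIMESTAMP('2020-02-16')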

Running incremental models

How do we actually run all this? Previously we defined two variables in the dbt_project.yml file: start_date and end_date. Those are referenced in the incremental_filter() macro.

[Figure: dbt_project.yml variables]
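
A sketch of how these variables can be declared (in recent dbt versions, a top-level vars block in dbt_project.yml; the default values shown are only placeholders that get overwritten at run-time):

vars:
  start_date: '2020-02-15'
  end_date: '2020-02-16'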

We can overwrite their values during dbt execution using the “--vars” argument. This way we can programmatically control which dates the process runs for, or build a pipeline that runs daily and processes only data from the previous day.

dbt run --models clickstream_clean --vars '{"start_date":"2020-02-15","end_date":"2020-02-16"}'

Final notes

  • Running dbt in full-refresh mode will essentially drop the destination table and recreate it (see the example command after this list).
  • You can create any kind of condition when applying the incremental filtering. In the most basic examples, the only check is that the event_time of the incoming rows is greater than the maximum value in the existing rows.
  • If you had to re-upload data for a particular period in your clickstream table, you would need to update all downstream dependent models. To do so, run dbt run --models clickstream+ --vars '{"start_date":"<period start>","end_date":"<period end>"}'. Notice the plus sign “+” after clickstream!
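
For reference, a full refresh of, say, the clean model would be triggered with a command along these lines:

dbt run --models clickstream_clean --full-refresh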

You can see the changes relative to the previous article in this pull request: https://github.com/konosp/adobe-clickstream-dbt/pull/2/files and the full repository at https://github.com/konosp/adobe-clickstream-dbt
