Load Data Feeds to Google BigQuery

Adobe Analytics reporting solutions (i.e. Reports & Analytics, Analysis Workspace, Report Builder or Data Warehouse) offer a wide range of options to view, understand and analyse your data. Admittedly, this covers the majority of most people's data needs. However, there comes a time when you need something more: the extra granular detail that is otherwise too complicated to get to, or simply a different approach to analysing your corporate data. There are a number of reasons and they really depend on the particular use case. But even if there is no specific case and it is for exploration's sake, Adobe Data Feeds is a really useful data extraction solution from Adobe.

However, their use can be a little intimidating for first-time users. This can be due to a number of reasons:

  1. The format of the data: each row represents an individual event taking place on the site, instead of the traditional dimension/metric approach.
  2. Too many technical columns that are not really relevant to, or even known by, most people.
  3. Availability of columns that contain data before and after additional data processing steps.

Combining the above with a potentially huge volume of traffic can easily lead to unmanageable situations, making data exploration and analysis almost unfeasible (remember: each page view generates a row in the final extract). Most people work on laptops; does everybody have a laptop that can handle 20-30 GB worth of data in memory without significantly limiting other functionality? What about constant queries against a local database? Would you be patient enough to wait 15 minutes for your queries to return some basic results? In those cases a good cloud-based data processing and warehousing solution really helps. It gives the Analytics team the flexibility to perform custom analysis, run ETL jobs on demand or on a schedule, and scale up or down as they see fit within their own "Analytics Lab"; true flexibility that empowers innovation!

This leads to this walkthrough: the key steps required to set up an on-going data pipeline from Adobe Analytics to Google BigQuery.

Let’s get started!

Pre-requisites

The examples shown below will be in Python. Furthermore, you will need a Google Cloud Platform account with billing enabled and at minimum owner-level access to Storage, Compute and BigQuery. For first-time users there is a trial version with a few hundred pounds' worth of credit to spend any way you like. This is ideal for experimentation.

Data Feed configuration & FTP setup

Once you get to the Data Feed configuration, you will notice two sections: the general delivery settings and the columns needed for the extract.

General delivery includes the report suite(s), feed interval, date range and delivery option. The simplest option is FTP. If you do not have an FTP server available, Adobe offers this service free of charge. If you use an Adobe-provided FTP server, keep in mind that there is an upper limit of 50 MB. If this is exceeded (depending on the traffic of your report suite and the columns selected), the delivery will fail and you can lose data. In that case you might want to go with an hourly feed interval so the data is split into smaller parts and the said risk is avoided.

In the columns section, you need to specify which fields you want the feed to include. There is a huge list of options, ranging from standard technology and geolocation attributes to all the custom eVars, props and events. The documentation also elaborates on the concept of the "post_" columns versus the regular ones. This will require some additional investment from your end to understand what works best for you. If you are not certain, you can always select both versions of a variable (i.e. both evar1 and post_evar1) and query the appropriate one within BigQuery. One drawback of the data feeds is that, if you want to adjust the feed by adding/removing columns and apply this change to historical data, you need to re-configure and re-run the feed.

Note: Adobe natively supports delivery to Microsoft Azure or Amazon S3. In theory you can send the files to AWS and then pull them into Google Cloud Storage using a transfer job, but this implies maintaining active accounts with, and paying, two cloud providers. This is not my personal favourite, so the approach explored here is restricted to GCP-only resources.

Key settings:

  • Remove Escaped Characters: Do check this! It removes from the feeds the escaped characters that cause problems down the line. For example, if your data collection process includes the tab character "\t", this is automatically escaped as "\\t". However, as the extracts are tab-delimited files, the BigQuery loading process is very easy to break. It is safer to remove them altogether and make your process more reliable.
  • Compression format: Gzip. Data loading into BigQuery is flexible but not with regard to compression. Out of the two options Adobe offers (zip or gzip), BigQuery supports only gzip, so stick with it. This way the .tsv files can be loaded directly, and archived in Storage, without decompressing them. This can have a big impact on long-term billing.
  • Packaging type: Multiple files. This is a personal preference, but I like multiple files. This way the only content that needs decompression is the lookup files. Those contain information like the click stream headers (configured below) and look-ups for browser details, operating systems etc.; the actual hit-level data include only a mapping reference to those values, not the actual text. If you select Single File, then everything is compressed under a single .zip file.
  • Manifest: Manifest File. It is usually a good idea to include the manifest file. This way you can include an automated step to parse the file, extract the data files' names and validate the MD5 check-sums before pushing the files to the next step of the process. Depending on the Feed Interval (hourly or daily), each manifest can contain multiple entries.

The settings mentioned can be found in the middle section of the New Data Feed page, but sometimes it is easy not to pay attention to them.

Job scheduling

The extract is scheduled! Now for the complicated part: you need to orchestrate the job. With some basic Linux skills, everything can be done within a small Virtual Machine.

Create a small VM; the machine type can even be the smallest one, f1-micro (1 vCPU, 0.6 GB memory). The process is not memory or CPU intensive by itself and the VM is only for orchestration purposes. The cost is low and, if you use the same machine for scheduling additional jobs, the cost per job drops even further.

After the VM is initiated, you will first need to make sure Python and pip are installed. Usually the VMs come with Python/pip pre-installed but, if for any reason they are not, you can get this done easily by following these instructions. Once you are done, you can install all the Python requirements through pip (full list):

pip install -r requirements.txt

The actual scheduling operation can be done using cron jobs. In Linux you can execute scripts hourly if you place them under /etc/cron.hourly. The file needs to be executable and not have any file extension (i.e. it needs to be named “execute_pipeline”):

#!/bin/sh
cd /project-path/ &&
python main.py

Data transfer (1/2) – Download through FTP

In order to fetch the data from the FTP, the easiest way is to install pysftp through pip. pysftp provides an easy utility to download entire folders from a remote path:

import pysftp

# Create the connection
ftp_connection = pysftp.Connection(
    host=ftp_host,
    username=ftp_username,
    password=ftp_password
)

# Copy the contents of "feeds_remote_path" into "feeds_local_folder"
ftp_connection.get_d(remotedir=feeds_remote_path,
                     localdir=feeds_local_folder)
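
If you prefer more control over which files are transferred, you can also list the remote directory and download the files one by one. Below is a minimal sketch under the assumption that only the gzipped feed files and the manifest files (ending in .txt) are of interest; the filtering pattern is illustrative:

import os
import posixpath

# Download only the compressed feed files and the manifest files
for remote_file in ftp_connection.listdir(feeds_remote_path):
    if remote_file.endswith('.gz') or remote_file.endswith('.txt'):
        ftp_connection.get(posixpath.join(feeds_remote_path, remote_file),
                           localpath=os.path.join(feeds_local_folder, remote_file))

# Close the connection once everything is transferred
ftp_connection.close()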

Data pre-processing (optional)

If you want to be extra cautious about the files you process, this is where the manifest files come into play. They provide a summary of the completed files along with a way to validate them. The most useful pieces of information are the name of each file and its MD5 checksum. You can parse the manifest files, extract the file names and MD5 values and validate them against the actual files.

Datafeed-Manifest-Version: 1.0
Lookup-Files: 1
Data-Files: 1
Total-Records: 611

Lookup-File: bugzilla_2012-09-09-lookup_data.tar.gz
MD5-Digest: af6de42d8b945d4ec1cf28360085308
File-Size: 63750

Data-File: 01-bugzilla_2012-09-09.tsv.gz
MD5-Digest: 9c70bf783cb3d0095a4836904b72c991
File-Size: 122534
Record-Count: 611

Only then can a file continue to the next step of the processing. The hashlib package offers a nice way to calculate MD5 check-sums.

Note: Because we are parsing click stream data, it might be the case that the files are in the range of 20-30 MB (or even more). You can read each file in chunks and gradually calculate the overall MD5 check-sum per file, to make sure you do not run the Virtual Machine out of memory and crash it (we are using the smallest machine type after all, and GCP VMs do not come with swap space for some reason):

import hashlib

hash_md5 = hashlib.md5()

# Read the file in 4 KB chunks so the whole file never sits in memory
with open(file_name_with_path, 'rb') as f:
    for chunk in iter(lambda: f.read(4096), b''):
        hash_md5.update(chunk)

final_hash_md5 = hash_md5.hexdigest()
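
Putting the two together, here is a minimal sketch of the validation step; it assumes the manifest follows the key/value layout shown above, wraps the chunked MD5 calculation in a helper, and uses illustrative variable names for the manifest path and the local folder:

import hashlib
import os

def md5_of_file(file_name_with_path):
    """Chunked MD5 calculation, exactly as above."""
    hash_md5 = hashlib.md5()
    with open(file_name_with_path, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b''):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

def parse_manifest(manifest_path):
    """Return {file name: expected MD5} for every file listed in the manifest."""
    expected = {}
    current_file = None
    with open(manifest_path) as f:
        for line in f:
            key, _, value = line.strip().partition(': ')
            if key in ('Lookup-File', 'Data-File'):
                current_file = value
            elif key == 'MD5-Digest' and current_file:
                expected[current_file] = value
                current_file = None
    return expected

# Only files whose checksum matches the manifest move to the next step
for file_name, expected_md5 in parse_manifest(manifest_file_path).items():
    actual_md5 = md5_of_file(os.path.join(feeds_local_folder, file_name))
    if actual_md5 != expected_md5:
        raise ValueError('Checksum mismatch for {}'.format(file_name))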

Data transfer (2/2) – Upload to Google Storage

Data files can be easily uploaded using the Cloud Storage Python SDK. In particular, you can iterate through all the files and upload them one by one using the very convenient upload_from_filename method.

from google.cloud import storage

# Initiate the GCP client and bucket client
client = storage.Client(project='')
bucket = client.get_bucket('')

# Create the blob object
blob_name = "path/to/blob/file-1.tsv.gz"
blob = bucket.blob(blob_name)

# Upload the local file to the blob object on GCP
blob.upload_from_filename(local_file_name)

At random times I have noticed that the uploaded files cannot be properly loaded into BigQuery due to the presence of the null ASCII character ("Bad character (ASCII 0) encountered"). Fortunately, setting the content type properly resolves this issue.

blob.upload_from_filename(file_name, content_type='application/gzip')

You can also retrospectively modify the content type of any given file. The utility tool gsutil that comes with the Google Cloud SDK provides a flexible set of commands. Simply run this in your local command line or through Google Cloud Shell:

gsutil -m setmeta -h "Content-Type: application/gzip" \
    gs://bucket-name/path/to/blobs/*.tsv.gz

Make sure the blob path contains the file extension as well, i.e. "path/to/blobs/*.tsv.gz" instead of "path/to/blobs/*". This prevents the processing of temporary files that might have been created during the data transfer process.
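
To tie the upload step together, here is a minimal sketch that pushes every gzipped feed file from the local folder to the bucket with the content type set up front; the project, bucket and path names are placeholders:

import os
from google.cloud import storage

client = storage.Client(project='project-id')
bucket = client.get_bucket('bucket-name')

# Upload every gzipped feed file, setting the content type as we go
for file_name in os.listdir(feeds_local_folder):
    if not file_name.endswith('.tsv.gz'):
        continue
    blob = bucket.blob('path/to/blobs/' + file_name)
    blob.upload_from_filename(os.path.join(feeds_local_folder, file_name),
                              content_type='application/gzip')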

Data Loading into BigQuery

The generic instructions to load a CSV/TSV file into a table are easy to follow and are a great starting point. A few changes are required to make Adobe Data Feed imports more reliable and less prone to breaking. The CSV options provide a range of configurations (some of them are not available in the BigQuery console):

from google.cloud import bigquery

job_config = bigquery.LoadJobConfig()
job_config.allow_jagged_rows = True
job_config.ignore_unknown_values = True
job_config.field_delimiter = '\t'
job_config.null_marker = ''
job_config.write_disposition = 'WRITE_APPEND'
job_config.skip_leading_rows = 0
job_config.source_format = bigquery.SourceFormat.CSV

# Configure time-based partitioning on the date_time column
partitioning = bigquery.table.TimePartitioning(field='date_time',
                                               require_partition_filter=True)
job_config.time_partitioning = partitioning

# The following are not available in the console but are important!
job_config.encoding = 'ISO-8859-1'
job_config.quote_character = '^'

All the options are added to a job configuration object which is later used during the load operation. The most important ones are:

Partition: Using time partitioning to limit your queries (and your costs) is a good long-term strategy. Thankfully Adobe provides a field (date_time) that is compatible with the format expected by BigQuery. Make sure you include it when you configure your feed. The attribute "require_partition_filter" makes the usage of the field mandatory at query time and therefore makes your users explicitly aware of it.

Encoding: Data Feeds are automatically encoded in ISO-8859-1 and there is no option to change it.

Quote Character: By default BigQuery uses the double quote (") to identify quoted sections. In the case of Data Feeds, however, the double quote does not have any special meaning. Instead, tabs, new lines and backslashes have already been removed (see Table Special Characters – 1) via the "Remove Escaped Characters" option in the feed creation process, and the caret symbol (^) is used to escape a set of characters (see Table Special Characters – 2).

So why is this important? If your data collection process contains double quotes, they will not be escaped and the loading process will fail. My experimentation so far has shown that setting the quote character to the caret symbol always works (even though this might not be the case in all implementations).

Table Special Characters – 1:

Table Special Characters – 2:

Adobe Data Feeds – Special Characters reference

Another aspect of the loading process is the table schema. The quick and easy way is to set all columns to STRING and, at query time, cast the types to the most appropriate one (i.e. INT64). The only exception is the 'date_time' column which is used for partitioning; it needs to be set to 'TIMESTAMP'. To get the correct column names you have two options:

  1. Manually download the headers from the Data Feeds console. This gives you a CSV file containing all the headers, one per line. You can then store the file locally and reference it every time the loading process is executed.
  2. Extract and parse on every execution the file named "<report suite>_<date>_<hour>-lookup_data.tar.gz". It is a tarball with all the lookup values and the hit data headers. The tarfile Python module helps us extract them into a destination folder:

import tarfile

tar_file = tarfile.open(lookup_data_tar)
tar_file.extractall(path = destination_folder)
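
Whichever option you pick, you end up with a small text file of column names; in the lookup archive it is typically a single tab-delimited line (usually named column_headers.tsv), while the manual download gives one header per line. Here is a short sketch that reads either layout into a flat list (the file path variable is a placeholder):

import csv

# Flatten the headers file into a list, whether it is a single
# tab-delimited line or one column name per line
with open(headers_file_path) as f:
    headers = [name.strip()
               for row in csv.reader(f, delimiter='\t')
               for name in row
               if name.strip()]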

No matter which method you follow, you need to make sure the column names meet the requirements set by BigQuery. The following snippet uses a regular expression while iterating through all the column headers (assuming you have already read them into a list) and keeps only valid characters. Eventually the schema is added to the job configuration. Additionally, the type of the "date_time" field is set to TIMESTAMP for partitioning purposes.

import re

headers = ['evar1', 'evar2', ... , 'visid_high', 'visid_low']

headers_regex_pattern = '[^a-zA-Z0-9_]'
regex = re.compile(headers_regex_pattern)

schema_fields = []

for header in headers:
    '''Fields must contain only letters, numbers, and underscores,
    start with a letter or underscore,
    and be at most 128 characters'''

    column_name = regex.sub('', header)
    column_type = 'STRING'

    # This is required for the partitioning
    if header == 'date_time':
        column_type = 'TIMESTAMP'

    column = bigquery.SchemaField(column_name, column_type)
    schema_fields.append(column)

job_config.schema = schema_fields

The actual load operation is straight-forward.

from google.cloud import bigquery

uri = 'gs://bucket-name/path/to/blobs/*.tsv.gz'
dataset = 'dataset_name'
destination_table = 'click_stream'

client = bigquery.Client(project='project-id')
dataset_ref = client.dataset(dataset)

load_job = client.load_table_from_uri(uri,
                                      dataset_ref.table(destination_table),
                                      job_config=job_config)  # job_config is defined earlier

print('Starting job {}'.format(load_job.job_id))
load_job.result()  # Waits for table load to complete.

print('Job finished.')

If for any reason the job fails, the initial error message might not be the most explanatory. In those cases it is always helpful to investigate the load job error stream in Google Cloud Shell:

bq --format=prettyjson show -j job-id
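
If you prefer to stay within Python, the load job object exposes the same error stream; here is a small sketch that surfaces it when the result() call shown earlier fails:

try:
    load_job.result()  # Waits for the table load to complete
except Exception:
    # The error stream usually pinpoints the offending file and row
    for error in load_job.errors or []:
        print(error)
    raise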

On my last run, it took around 20 minutes to load around 150 GB of compressed tsv files. The volumes will vary depending on the traffic of your site, the number of columns, the information you collect etc.

Once the table is available, you can see all the created columns in the Schema tab. Personally, what fascinates me most is the amount of processing Adobe performs in the background. It is not easy to grasp until you start exploring the feeds and realise the amount of work that is hidden from the end users.

150 GB of compressed tsv files with 500 columns explodes into more than 2 TB and 650 million rows of data. That’s just for a couple of months.

Next time you ask for the last two years of product views broken down by device, marketing channel and product classification, performed by visitors that placed an order with an AOV higher than £20 last Black Friday, take a moment to appreciate the work that takes place in the background!

Querying time

Basic queries are easy. However, you want to be cost efficient as well. This is where time partitioning becomes useful (BigQuery Pricing Best Practices). BigQuery charges based on the amount of data you process per query. There is a free tier for the first TB processed, but that is easy to exceed.

One way to limit costs is to select only the relevant columns. However, if you have billions of rows, you are still going to process a lot of data. For example, if your table contains the last two years' worth of data but you want to analyse just two months, all the rows will still be processed (and billed) even if you use a WHERE or LIMIT clause. This additional billing is avoided with time partitioning during the table creation step. And by enforcing the partition filter as required, you are essentially telling your users to actively think about the date range and its potential implications.

-- Number of page views by day (excluding discarded hits)

SELECT
  EXTRACT(DATE FROM date_time) AS day,
  COUNT(date_time) AS hits_volume
FROM
  `[dataset name].click_stream`
WHERE
  date_time > '2018-08-01'
  AND date_time < '2018-08-10'
  AND page_event = '0'
  AND exclude_hit = '0'
GROUP BY
  day
ORDER BY
  day ASC

This creates a big saving in the long term, which is probably hard to appreciate initially. It comes at the expense of always requiring a WHERE clause on the partition column for all queries against this table, which is negligible.

What about the products variable? Admittedly, the raw data is not in the most practical format, due to merchandising eVars and success events as well as the presence of multiple products in the same hit.

But this is where SQL becomes interesting! The following calculates the top-3 most viewed products per day.

-- Create temporary table with products viewed
WITH product_info AS (
  SELECT
    EXTRACT(DATE FROM date_time) AS day,
    SPLIT(product_list,',') AS products
  FROM
    `[dataset name].click_stream`
  WHERE
    date_time >= '2018-08-02'
    AND date_time < '2018-08-05'
    AND page_event = '0'
    AND exclude_hit = '0'
    AND product_list IS NOT NULL)


SELECT
  day,
  t.product_id,
  t.daily_rank,
  t.instances
FROM (

  -- Extracts the product id from the product string and calculates the daily rank
  SELECT
    day,
    SPLIT(prd,';')[OFFSET(1)] AS product_id,
    COUNT(prd) AS instances,
    RANK() OVER(PARTITION BY day ORDER BY COUNT(prd) DESC) AS daily_rank
  FROM
    product_info,
    UNNEST(products) AS prd
  WHERE
    prd != ''
  GROUP BY
    day,
    product_id
  ORDER BY
    day ASC,
    instances DESC 
   -- End of nested statement

) as t
WHERE t.daily_rank <= 3

And you get this nice and clean view.

There is a lot to go through above, but in order to process and analyse the products variable you need to experiment with arrays in BigQuery, especially the UNNEST operator and its related functionality. In a similar fashion you can extract and format merchandising eVars and metrics. There are powerful functions that will help you greatly in your in-depth analysis!

Additional points

There are a few aspects not really explored in the above walkthrough. I will just summarise them:

  • It is assumed that you are authenticated using your personal account. The Google Cloud SDKs offer the option to authenticate with a service account as well (highly recommended). The Python packages for the different GCP solutions offer an authentication method through a service account key; see the sketch after this list.
  • The lookup files contain information about browser versions, location etc. They act as lookup tables and can be loaded in a similar manner. The difference is that their schema is just two columns, "key" and "value", both of type STRING.
  • Instead of a VM in Compute Engine, a local machine can be used. You save the cost of the VM, but you will have to maintain and operate the machine yourself.
  • Alternative solutions, like Google Cloud Composer, can be used for scheduling, but for the intents and purposes of this walkthrough a simple VM is more than enough.
  • Alternative approaches can be followed in the data processing. Storing compressed data in Storage is much cheaper than storing decompressed data in BigQuery. For this reason, you can always write Dataflow or PySpark jobs to parse your files and load into BigQuery only the necessary columns.
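
Regarding the first point, the clients used throughout this walkthrough can be built directly from a service account key file; a minimal sketch, assuming the key has been downloaded to the VM (the path is a placeholder):

from google.cloud import bigquery, storage

# Build the clients from a service account key instead of personal credentials
key_path = '/path/to/service-account-key.json'

storage_client = storage.Client.from_service_account_json(key_path)
bigquery_client = bigquery.Client.from_service_account_json(key_path)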

I am offering an out of the box solution that implements the process described. Check https://analyticsmayhem.com/consulting/adobe-analytics-clickstream-bigquery-connector-beta/ for more details!

PS: If you are interested in cross-device reporting, check my previous article.