This is a long post. Reading it through and understanding it well will help you follow the progressive development in future posts. However, the essence, the takeaway, as alluded to in the Introduction post, ultimately comes down to how to write data from a SparkSQL dataframe to a relational table. This is summarized in the last section.

The following picture provides a simplified high-level view of the case study’s data pipeline.

 


Overview

The system, within the scope of this series, can be seen as composed of 3 major components. On the left are various front-end systems and data sources where data come in from the outside or originate. In the middle is a large-scale OLTP database, implemented in MySQL, on which numerous transactional applications used by many users operate. On the right is an enterprise data warehouse. Two ETL pipelines connect these 3 major components: data propagate from the front-end systems and data sources to the OLTP database via pipeline 1, then from the OLTP database to the data warehouse via pipeline 2. In the context of this series, we can ignore pipeline 2 and the data warehouse. We’ll only be concerned with a small, albeit crucial, part of pipeline 1, which is implemented as a set of SparkSQL jobs. That will be explained further in this post.

Front-End Systems, Data Sources, and the notion of “DataSet”

The Front-End

The front-end is composed of numerous systems and data sources where data come in from the outside or originate. Examples include, but are not limited to, data collected from the UI, from APIs called by Web or mobile clients, from third-party or external sources, curated from logs, generated from telemetry and monitoring/measurement devices, events generated by apps, and files uploaded by users. The data in these systems are typically stored in a structured format (some kind of SQL database) or a semi-structured format (JSON, XML, CSV, some kind of NoSQL database, and so on) that facilitates deducing their schema.

DataSet

All these data are unified under one overarching concept: dataset. A dataset can be understood as a group of related records or data entries, describing an aspect, a particular entity, or something of meaning to the business. For example, telemetry data reported from sensors within a vehicle in the last hour could form a dataset that might help to evaluate the driver’s performance over time. In this case, the dataset is composed of potentially many data entries reported from different sensors read at different moments, assuming these entries can be “standardized” under a standard data schema of interest. Another example would be a sales rep uploading a month-end sales report in csv format. In this second case, the report file itself becomes a dataset in which each line of the file is a record.

A data entry arriving at, or generated by, these front-end systems can, as mentioned above, be in semi-structured form. However, for the practical purpose of our discussion, it can be considered to have some relational-style schema (or to be reasonably easy to transform into one). Thus, throughout this series, for simplicity of communication, we’ll refer to all datasets as having a schema and columns, and as being composed of records.

Since dataset is a broad notion that covers a wide range of business operations and concerns, a dataset’s size can vary greatly. In practice, it could be anywhere from just a handful of records to perhaps millions or more.

Once a dataset gets created at the front-end, it’ll be assigned a universally unique id, recognized and understood throughout the entire system. For simplicity, in this series, we represent this dataset id as an 8-byte number (Long data type in Scala, or a bigint in MySQL).

In the picture above, datasets are illustrated by the stars.

OLTP MySQL Database

The front-end is not where the data get transacted on. It’s just an initial staging area where data first arrive at the system. A dataset, after being newly created or changed, will be propagated through pipeline 1 to a large-scale replicated MySQL database. This is where data get operated on by various enterprise applications in a true OLTP fashion.

The database consists of multiple tables that house data coming from the front-end. Each table stores datasets of the same business concern or of “similar” type in some way. For example, there’s a table about telemetry data. It houses datasets arriving from multiple different telemetry systems in the front-end, including a system collecting sensor data in real-time and another system collecting log data in batch. Of course, by the time these datasets arrive at this table, they have already been “normalized” to conform to the same schema, namely this table’s schema. A second example would be another table about sales data, and so on.

To illustrate, let’s see what such a table looks like (the following is not a real table; the real ones generally have this shape):

create table transaction_table (
  unique_key_col varchar(1000) not null,
  dataset_id bigint not null,
  string_data_col varchar(50),
  number_data_col int,
  unique key (unique_key_col)
);
  • unique_key_col: as the name suggests, uniquely identifies a record in this table. In the real tables, this could be a single column or a combination of multiple columns (composite unique key). Its data type could be varchar or any other valid MySQL type. Note that right at the point when the data entry that corresponds to a record in this table first arrives at the front-end, it obtains its unique_key_col value that is guaranteed to be unique in this table by some specific business rules.
  • dataset_id: uniquely identifies a dataset in the entire system. Again, a dataset is a group of records, which could be anywhere from just a handful to millions of records.
  • string_data_col, number_data_col: data columns of this record. This example table contains only these 2 columns for illustrative purpose; the real tables have tens to hundreds of columns, and their data type could be any valid MySQL type, not just varchar and int as in this example.
  • Whether the table has a primary key is irrelevant in this discussion.

Suppose only 2 datasets have been propagated into this table: dataset 199, which has 2 records, and dataset 17, which has 1 record. Its data would then look like this:

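| unique_key_col       | dataset_id | string_data_col       | number_data_col |
|----------------------|------------|-----------------------|-----------------|
| 'unique_key_value_1' | 199        | 'string_data_value_1' | 100             |
| 'unique_key_value_2' | 199        | 'string_data_value_2' | 200             |
| 'unique_key_value_3' | 17         | 'string_data_value_3' | 300             |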
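
To make the shape of this data concrete, here is a minimal Scala sketch of the three sample rows. The case class TransactionRow and its field names are hypothetical (they are not part of the actual system); they merely mirror the columns of the example table. Grouping the rows by dataset id recovers the two datasets and their sizes.

```scala
// Hypothetical row model mirroring the example transaction_table (not a real schema).
final case class TransactionRow(
  uniqueKeyCol: String,          // unique_key_col: varchar(1000) not null
  datasetId: Long,               // dataset_id: bigint not null
  stringDataCol: Option[String], // string_data_col: nullable varchar(50)
  numberDataCol: Option[Int]     // number_data_col: nullable int
)

// The three sample rows of the example table.
val sampleRows: Seq[TransactionRow] = Seq(
  TransactionRow("unique_key_value_1", 199L, Some("string_data_value_1"), Some(100)),
  TransactionRow("unique_key_value_2", 199L, Some("string_data_value_2"), Some(200)),
  TransactionRow("unique_key_value_3", 17L, Some("string_data_value_3"), Some(300))
)

// A dataset is simply the group of rows that share one dataset_id.
val recordsPerDataset: Map[Long, Int] =
  sampleRows.groupBy(_.datasetId).map { case (id, rows) => id -> rows.size }
```

Here recordsPerDataset maps 199 to 2 records and 17 to 1 record, matching the description above.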

Triggering Event and Data Propagation through Pipeline 1

The Process

In the front-end, the lifecycle of a dataset starts when it is first created (e.g. a sales rep first uploads a csv report for the current month) and assigned a dataset id. When this happens, the system will send an event to pipeline 1, requesting that this new dataset be propagated to the corresponding target table in the OLTP database.

For example, after dataset 199 first starts its lifecycle in the front-end, an event will be sent to pipeline 1 that would look like this:

{
  "eventType": "insert",
  "dataset": 199,
  "targetTable": "transaction_table"
}

Pipeline 1 will trigger the appropriate SparkSQL job. This job will read dataset 199 from its source in the front-end into a dataframe, perform some transformations, and morph the dataframe into the final dataframe insertDF. The schema of insertDF is compatible with that of the transaction_table: it has the same column set (unique_key_col, dataset_id, string_data_col, number_data_col) with compatible data types. Eventually, the SparkSQL job writes data from insertDF to the given target table.

After this is complete, dataset 199 exists in the transaction_table. Suppose at some point later, there are changes to certain columns of some records in dataset 199 in the front-end (e.g. the sales rep adjusts some data in the report he previously uploaded). When this happens, a new event will be sent to pipeline 1 requesting that the changes be propagated. Suppose further that these changes are made only to column string_data_col and not to number_data_col. The new event would then look like this:

{
  "eventType": "update",
  "dataset": 199,
  "targetTable": "transaction_table",
  "columns": ["unique_key_col", "string_data_col"]
}

Pipeline 1 will trigger another appropriate SparkSQL job. This job will read only the specified columns of dataset 199 from the front-end into a dataframe and, after performing the required processing, arrive at the final dataframe updateDF. The column set of updateDF is (unique_key_col, string_data_col), with data types compatible with the corresponding columns in the transaction_table. Eventually, the SparkSQL job will use updateDF to update the data in that target table (*).
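
At row level, such an update touches only the data columns named in the event and locates each row by its unique key. The Scala sketch below builds a parameterized SQL statement expressing that effect. It is only an illustration under that assumption: buildUpdateSql is a hypothetical helper, not part of the actual jobs, and how the write is really performed is left to the rest of the series.

```scala
// Hypothetical helper: builds a parameterized UPDATE that sets only the
// non-key columns present in updateDF and matches rows on the unique key.
def buildUpdateSql(table: String, dfColumns: Seq[String], keyColumns: Seq[String]): String = {
  // Data columns are whatever remains after removing the unique key columns.
  val dataColumns = dfColumns.filterNot(c => keyColumns.contains(c))
  require(dataColumns.nonEmpty, "updateDF must carry at least one data column")
  val setClause = dataColumns.map(c => s"$c = ?").mkString(", ")
  val whereClause = keyColumns.map(c => s"$c = ?").mkString(" and ")
  s"update $table set $setClause where $whereClause"
}

// The update event for dataset 199 that carries only string_data_col.
val updateSql: String = buildUpdateSql(
  "transaction_table",
  Seq("unique_key_col", "string_data_col"),
  Seq("unique_key_col")
)
// updateSql is:
// update transaction_table set string_data_col = ? where unique_key_col = ?
```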

After this, if there are changes yet again to dataset 199 in the front-end, the update cycle repeats. Say these new changes involve only column number_data_col this time; the new triggering event would then be:

{
  "eventType": "update",
  "dataset": 199,
  "targetTable": "transaction_table",
  "columns": ["unique_key_col", "number_data_col"]
}

The eventual resulting updateDF thus would have a different column set (unique_key_col, number_data_col) to ensure the proper propagation of new data to the table.

This update cycle will repeat any time there are changes to an existing dataset.
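
The two kinds of triggering events can be modeled with a small Scala type, sketched below. The type and function names (PropagationEvent, InsertEvent, UpdateEvent, columnsToRead) are hypothetical and chosen for illustration; only the field names follow the JSON events shown earlier. The function captures the one difference that matters here: an insert job works with the full column set of the target table, while an update job reads only the columns listed in the event.

```scala
// Hypothetical model of the events sent to pipeline 1.
sealed trait PropagationEvent {
  def dataset: Long       // dataset id (bigint in MySQL)
  def targetTable: String // table to write to
}
final case class InsertEvent(dataset: Long, targetTable: String) extends PropagationEvent
final case class UpdateEvent(dataset: Long, targetTable: String, columns: Seq[String])
    extends PropagationEvent

// Columns the triggered job works with: all table columns for an insert,
// only the columns named in the event for an update.
def columnsToRead(event: PropagationEvent, tableColumns: Seq[String]): Seq[String] =
  event match {
    case _: InsertEvent             => tableColumns
    case UpdateEvent(_, _, columns) => columns
  }

// Column set of the example transaction_table.
val tableColumns: Seq[String] =
  Seq("unique_key_col", "dataset_id", "string_data_col", "number_data_col")

val insertColumns: Seq[String] =
  columnsToRead(InsertEvent(199L, "transaction_table"), tableColumns)
val updateColumns: Seq[String] =
  columnsToRead(
    UpdateEvent(199L, "transaction_table", Seq("unique_key_col", "number_data_col")),
    tableColumns
  )
```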

Process-Related Configurations

I’d like to draw your attention to some details relevant to the development of the solution in future posts.

Unique Key

As explained above, a SparkSQL insert or update job, once triggered by an event to propagate data through pipeline 1, is informed of the dataset id and the target table to operate on. Because it knows the target table name, via the system configuration, it can figure out the table’s unique key. The system will provide the unique key information as a string to the job. This string is a comma-separated list of columns composing the unique key.

For example, if a SparkSQL job kicks off to write data to the transaction_table, via the system configuration, it will learn that the name of this table’s unique key column is the string “unique_key_col”.

Suppose there’s a different table with a composite unique key (unique_key_col_1, unique_key_col_2). When the job operates on this table, it’ll be notified of the unique key as the string “unique_key_col_1, unique_key_col_2”.

From this string, the SparkSQL job will figure out the actual list of columns composing the target table’s unique key.
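
Turning that string into a list of column names could be done as in the following Scala sketch. The function name parseUniqueKey is hypothetical; trimming whitespace around each name is an assumption based on the example string above, which has a space after the comma.

```scala
// Hypothetical helper: splits the comma-separated unique key string
// provided by the system configuration into individual column names.
def parseUniqueKey(uniqueKey: String): List[String] =
  uniqueKey
    .split(",")          // one piece per column
    .map(_.trim)         // drop the whitespace around each name
    .filter(_.nonEmpty)  // ignore empty pieces, e.g. from a trailing comma
    .toList

val singleKey: List[String] = parseUniqueKey("unique_key_col")
val compositeKey: List[String] = parseUniqueKey("unique_key_col_1, unique_key_col_2")
// singleKey is List("unique_key_col")
// compositeKey is List("unique_key_col_1", "unique_key_col_2")
```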

insertDF

As you might have noticed, the column set of insertDF always follows this general rule:

all columns of insertDF = (unique key columns) + dataset_id + (data columns)

The columns in this dataframe don’t necessarily have to follow the order as specified in the right-hand side of the above equation.
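
As a quick illustration of this rule, the Scala sketch below checks a list of column names against it while ignoring column order. The function name isValidInsertColumnSet is hypothetical, and the check works on plain column names only, so data types are not covered.

```scala
// Hypothetical check of the insertDF rule, ignoring column order:
// columns = (unique key columns) + dataset_id + (data columns).
def isValidInsertColumnSet(
    dfColumns: Seq[String],
    keyColumns: Seq[String],
    dataColumns: Seq[String]
): Boolean = {
  val expected = (keyColumns ++ Seq("dataset_id") ++ dataColumns).toSet
  val noDuplicates = dfColumns.distinct.size == dfColumns.size
  noDuplicates && dfColumns.toSet == expected
}

// Same columns as transaction_table, listed in a different order.
val shuffledOk: Boolean = isValidInsertColumnSet(
  Seq("string_data_col", "dataset_id", "number_data_col", "unique_key_col"),
  Seq("unique_key_col"),
  Seq("string_data_col", "number_data_col")
)

// dataset_id is missing, so the rule is violated.
val missingDatasetId: Boolean = isValidInsertColumnSet(
  Seq("unique_key_col", "string_data_col", "number_data_col"),
  Seq("unique_key_col"),
  Seq("string_data_col", "number_data_col")
)
```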

updateDF

Similarly, the column set of updateDF always follows this general rule:

all columns of updateDF = (unique key columns) + (data columns)

The columns in this dataframe also don’t necessarily have to follow the order as specified in the right-hand side of the above equation.
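
A corresponding Scala sketch for updateDF is shown below. The function name checkUpdateColumns is hypothetical. Requiring at least one data column is an assumption made for this sketch (an update with nothing to change would be pointless); the rule above does not state it explicitly.

```scala
// Hypothetical check of the updateDF rule, ignoring column order.
// Returns the data columns to update, or a message describing the problem.
def checkUpdateColumns(
    dfColumns: Seq[String],
    keyColumns: Seq[String]
): Either[String, Seq[String]] = {
  val missingKeys = keyColumns.filterNot(k => dfColumns.contains(k))
  val dataColumns = dfColumns.filterNot(c => keyColumns.contains(c))
  if (missingKeys.nonEmpty)
    Left("missing unique key columns: " + missingKeys.mkString(", "))
  else if (dataColumns.isEmpty)
    Left("no data columns to update") // assumption: at least one is required
  else
    Right(dataColumns)
}

// Key column listed last: order does not matter.
val validUpdate: Either[String, Seq[String]] =
  checkUpdateColumns(Seq("string_data_col", "unique_key_col"), Seq("unique_key_col"))

// The unique key column is absent, so rows could not be located.
val invalidUpdate: Either[String, Seq[String]] =
  checkUpdateColumns(Seq("string_data_col"), Seq("unique_key_col"))
```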

The Focus of This Series

As we’ve seen, the SparkSQL jobs in pipeline 1 appear to perform a typical ETL duty. In this series, we are not concerned with their EXTRACT and TRANSFORM steps. That is, we don’t care how the jobs read datasets from the front-end or how they transform and move data along the way. Put another way, we don’t care how the insertDF and updateDF get formed. As far as we’re concerned, they are somehow “magically” handed over to us.

Our only point of interest is the very last step, the LOAD, where data get persisted from the insertDF or updateDF to the target MySQL table. In other words, we only need to care about how to write data from the insertDF and updateDF to the database. The next post will explore the requirements of this operation.


(*) For the purpose of this series’s discussion, we assume it’s fine that this update might overwrite any modifications made directly by the OLTP applications to dataset 199 in the transaction_table prior to this triggering update event.