Given the background of the case study explained in detail in the previous post, we now focus on the requirements of the LOAD step of the SparkSQL jobs in pipeline 1.

The 5 Requirements

This step, particularly the write operation to the MySQL table, needs to satisfy the following high-level requirements.

Update Requirement


The updateDF needs to be able to update the given existing dataset in the target table. It needs to precisely update all the selected columns of all the dataset’s records. This is a self-evident requirement.
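
Spark’s JDBC data source has no built-in update mode, so one common way to meet this requirement is to open a connection per partition and issue plain SQL UPDATE statements. The sketch below is an illustration under assumptions, not the pipeline’s actual implementation: it uses the transaction_table columns from this case study, a single updated column for brevity, and placeholder connection details.

```scala
import java.sql.DriverManager
import org.apache.spark.sql.{DataFrame, Row}

// Sketch only: Spark's JDBC writer cannot UPDATE rows, so each executor opens
// its own connection and issues one UPDATE per record. The connection string,
// credentials, and the fixed SET clause are placeholders for illustration.
def updateDataset(updateDF: DataFrame): Unit = {
  updateDF.foreachPartition { rows: Iterator[Row] =>
    val conn = DriverManager.getConnection(
      "jdbc:mysql://mysql-host:3306/app_db", "app_user", "app_password")
    val stmt = conn.prepareStatement(
      "UPDATE transaction_table SET string_data_col = ? WHERE unique_key_col = ?")
    try {
      rows.foreach { row =>
        stmt.setString(1, row.getAs[String]("string_data_col"))
        stmt.setString(2, row.getAs[String]("unique_key_col"))
        stmt.executeUpdate()
      }
    } finally {
      stmt.close()
      conn.close()
    }
  }
}
```

In a real job the SET clause would be built from the columns actually present in the updateDF, which is exactly what the Schema Change Requirement below is about.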

Insert Requirement


The insertDF needs to be able to insert all records of the given new dataset into the target table. Another self-evident requirement.
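
For inserts, Spark’s built-in JDBC data source can append the dataframe directly. A minimal sketch, assuming placeholder connection details and the transaction_table from the case study:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Minimal sketch: append all records of the new dataset into the MySQL table
// via Spark's JDBC data source. URL and credentials are placeholders.
def insertDataset(insertDF: DataFrame): Unit = {
  insertDF.write
    .format("jdbc")
    .option("url", "jdbc:mysql://mysql-host:3306/app_db")
    .option("dbtable", "transaction_table")
    .option("user", "app_user")
    .option("password", "app_password")
    .mode(SaveMode.Append) // insert only; never overwrite the table
    .save()
}
```

Note that this one-shot append treats the dataframe as a single batch per task, which is not enough on its own to satisfy the next two requirements.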

Partial Success Requirement


This operation needs to support partial job success. Whether it’s a dataset insert or update job, it is typically a multi-record write operation. If a subset of records fails to write to the table for some reason, that failure should not affect the remaining records in the same dataset. In other words, failing to write some records should not prevent the other records from being persisted.
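
One way to get this record-level isolation (a sketch under assumptions, not necessarily how the pipeline does it) is to wrap each single-record statement in its own try/catch inside the per-partition loop, so a failing record is skipped and remembered instead of aborting the whole write. The writeOneRecord helper below is hypothetical and stands for either a single-row INSERT or UPDATE.

```scala
import java.sql.Connection
import org.apache.spark.sql.Row
import scala.collection.mutable.ArrayBuffer

// Sketch: write records one by one so that a failure (constraint violation,
// bad data, ...) affects only that record. `writeOneRecord` is a hypothetical
// helper that executes a single-row INSERT or UPDATE on the open connection.
def writePartition(rows: Iterator[Row], conn: Connection)
                  (writeOneRecord: (Connection, Row) => Unit): Seq[Row] = {
  val failed = ArrayBuffer.empty[Row]
  rows.foreach { row =>
    try {
      writeOneRecord(conn, row)       // one statement per record (assuming auto-commit)
    } catch {
      case _: java.sql.SQLException =>
        failed += row                 // keep going; remember the record for reporting
    }
  }
  failed.toSeq                        // handed off to the failure-reporting mechanism
}
```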

Failure Reporting Requirement


When failures happen, there should be some mechanism that handles them. In particular, when records in the given dataset (whether it’s an insert or update operation) fail to write to the table, they should be logged or captured somewhere so they can be reported later.
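
Continuing the sketch above, the failed records collected per partition could, for example, be written to a side table or a log. Everything here (the load_failures table, its columns, the connection options) is an assumption made for illustration:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.lit

// Sketch: persist the keys of failed records into a hypothetical side table
// (load_failures) so they can be reported later. Table name, columns, and
// connection options are assumptions, not part of the case study.
def reportFailures(spark: SparkSession, failedKeys: Seq[String], operation: String): Unit = {
  import spark.implicits._
  if (failedKeys.nonEmpty) {
    failedKeys.toDF("unique_key_col")
      .withColumn("operation", lit(operation)) // "insert" or "update"
      .write
      .format("jdbc")
      .option("url", "jdbc:mysql://mysql-host:3306/app_db")
      .option("dbtable", "load_failures")
      .option("user", "app_user")
      .option("password", "app_password")
      .mode(SaveMode.Append)
      .save()
  }
}
```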

Schema Change Requirement


Different executions of the SparkSQL jobs against the same table could result in different schemas for the insertDF or updateDF.

Take the update operation against the transaction_table as an example. The first triggering event might generate the updateDF with the schema (unique_key_col, string_data_col), because string_data_col is the only column that changed for the given dataset in the front-end. However, an hour later, the second triggering event for the same dataset might generate the updateDF with a different schema (unique_key_col, number_data_col), because within that hour the dataset changed once again, but this time only the number_data_col column was affected.

Similarly, the insertDF generated by the Spark job across different executions for different new datasets against the same transaction_table might have different schemas. For example, when inserting dataset 199, the insertDF might carry the schema (unique_key_col, dataset_id, string_data_col, number_data_col). But when inserting dataset 261 into the same table in a different run, it might carry a different schema (unique_key_col, dataset_id, string_data_col). This is because, upon being created in the front-end, dataset 261 happens to be missing the column number_data_col, which is fine. As long as the column set of the insertDF is a subset of that of the target table, the schema is considered valid in this regard.

This requirement dictates that the LOAD step, whether it performs an update or an insert operation, must be able to flexibly and adequately handle any schema presented to it at runtime. Specifically:

  1. For a given insertDF, if it’s missing columns from the target table, the new records it inserts into the table should have null values in the missing columns. If those columns are defined as non-nullable in the table, the entire dataframe will fail to insert and be subject to the Failure Reporting Requirement.
  2. For a given updateDF, if it’s missing data columns from the target table, it should update only the data columns present in its schema and leave the missing columns untouched in the table. In practice, this is typically the case.
  3. If the insertDF or updateDF has columns that are not present in the target table’s schema, obviously the write operation is invalid and the entire dataframe will be subject to the Failure Reporting Requirement.

This Schema Change Requirement, although tedious to explain, is pretty straightforward.
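
A small sketch of how the three rules above could be enforced at runtime. The tableColumns list would normally be fetched from the table metadata (for instance via information_schema); it is hard-coded here for illustration, and planWrite and setClause are hypothetical helpers:

```scala
import org.apache.spark.sql.DataFrame

// The target table's columns; in a real job these would be read from the
// table metadata rather than hard-coded.
val tableColumns = Seq("unique_key_col", "dataset_id", "string_data_col", "number_data_col")

// Rule 3: any column unknown to the table invalidates the whole dataframe,
// which is then routed to the failure-reporting path (Left). Otherwise only
// the columns actually present are written (Right), which covers rules 1 and 2:
// omitted columns default to NULL on insert (or fail on NOT NULL) and are
// left untouched on update.
def planWrite(df: DataFrame): Either[String, Seq[String]] = {
  val dfColumns = df.columns.toSeq
  val unknown   = dfColumns.diff(tableColumns)
  if (unknown.nonEmpty) Left(s"Unknown columns: ${unknown.mkString(", ")}")
  else Right(dfColumns)
}

// For updates, build the SET clause from whatever data columns are present.
def setClause(columns: Seq[String]): String =
  columns.filterNot(_ == "unique_key_col").map(c => s"$c = ?").mkString(", ")
```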

Summary

We can look at these 5 requirements from a different angle: the LOAD step of the SparkSQL jobs in pipeline 1 has to fulfill two specific operations (update and insert) while supporting three cross-cutting capabilities (partial success, failure reporting, and schema change).