We now start exploring the different approaches, using the Update operation as the first evaluation criterion. The first potential solution follows.
Summary
This first approach is the most naive one; it relies most heavily on the readily available dataframe operations. Essentially, we try to force the DataFrameWriter API to fit our use case.
The high-level idea is: from updateDF, which carries the new data, and the existing target dataset in the target table, which may carry data that is still current (besides the out-of-date data to be updated), we build a full replacement for the existing data in Spark. Then we swap it with the current dataset in the table.
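Putting the whole flow together, here is a minimal sketch of the approach. It assumes the spark session, jdbcUrl, connProperties, and updateDF from the shell examples below, and a hypothetical deleteDataset helper standing in for the driver-side DELETE of step 2:

import org.apache.spark.sql.SaveMode

// 1. Read the still-current columns of dataset 199 into Spark.
val currentDF = spark.read.jdbc(jdbcUrl,
  "(select unique_key_col, dataset_id, number_data_col from transaction_table where dataset_id = 199) t",
  connProperties)

// 2. Delete the whole dataset from the table (hypothetical helper: a plain
//    JDBC DELETE issued from the driver, see step 2 in the Details section).
deleteDataset(199)

// 3. Build the full replacement: new string_data_col values from updateDF,
//    everything else from currentDF.
val finalUpdateDF = updateDF
  .join(currentDF, updateDF("unique_key_col") === currentDF("unique_key_col"))
  .drop(currentDF("unique_key_col"))

// 4. Re-insert the rebuilt dataset.
finalUpdateDF.write.mode(SaveMode.Append).jdbc(jdbcUrl, "transaction_table", connProperties)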
Details
We simulate the steps of this approach in the Spark shell. Recall that we are about to update dataset 199 in the transaction_table, and that we are given the updateDF dataframe as follows:
scala> updateDF.printSchema
root
 |-- unique_key_col: string (nullable = false)
 |-- string_data_col: string (nullable = true)
As can be seen, updateDF has the columns (unique_key_col, string_data_col) because string_data_col is the only column that has fresh data.
scala> updateDF.take(2)
res1: Array[org.apache.spark.sql.Row] = Array([unique_key_value_1,changed_string_data_value_1], [unique_key_value_2,changed_string_data_value_2])
This approach proceeds as follows:
- First, we read dataset 199 from transaction_table into a dataframe currentDF:
scala> val currentDF = spark.read.jdbc(jdbcUrl, "(select unique_key_col, dataset_id, number_data_col from transaction_table where dataset_id = 199) transaction_table", connProperties)
currentDF: org.apache.spark.sql.DataFrame = [unique_key_col: string, dataset_id: bigint ... 1 more field]

scala> currentDF.take(2)
res2: Array[org.apache.spark.sql.Row] = Array([unique_key_value_1,199,100], [unique_key_value_2,199,200])
Note that we pick number_data_col, which is not out-of-date, instead of string_data_col. We also pick dataset_id.
- Then, we issue a SQL DELETE statement that deletes the entire dataset 199 from the transaction_table. In the real SparkSQL job of Pipeline 1, we could accomplish this from the driver program in two steps: first, establish a JDBC connection to the database; then, issue a SQL statement like the following via that connection (a sketch of this driver-side code follows the list of steps):
delete from transaction_table where dataset_id = 199
- Then, we join updateDF and currentDF on unique_key_col. After that, we morph the join result into a schema that matches the transaction_table, arriving at the dataframe finalUpdateDF:
scala> val finalUpdateDF = updateDF.join(currentDF, updateDF("unique_key_col") === currentDF("unique_key_col")).
     | drop(currentDF("unique_key_col"))
finalUpdateDF: org.apache.spark.sql.DataFrame = [unique_key_col: string, string_data_col: string ... 2 more fields]

scala> finalUpdateDF.printSchema
root
 |-- unique_key_col: string (nullable = false)
 |-- string_data_col: string (nullable = true)
 |-- dataset_id: long (nullable = false)
 |-- number_data_col: integer (nullable = true)

scala> finalUpdateDF.take(2)
res4: Array[org.apache.spark.sql.Row] = Array([unique_key_value_2,changed_string_data_value_2,199,200], [unique_key_value_1,changed_string_data_value_1,199,100])
- Finally, we insert the data in finalUpdateDF back into transaction_table using SaveMode.Append:
scala> finalUpdateDF.write.mode(SaveMode.Append).jdbc(jdbcUrl, "transaction_table", connProperties)
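As referenced in step 2, here is a minimal sketch of issuing the DELETE from the driver program over a plain JDBC connection. It assumes jdbcUrl and connProperties (a java.util.Properties) are the same values passed to spark.read.jdbc above:

import java.sql.{Connection, DriverManager}

// Step 2 from the driver: open a JDBC connection and delete dataset 199.
val conn: Connection = DriverManager.getConnection(jdbcUrl, connProperties)
try {
  val stmt = conn.createStatement()
  try {
    val deleted = stmt.executeUpdate("delete from transaction_table where dataset_id = 199")
    println(s"deleted $deleted rows")
  } finally {
    stmt.close()
  }
} finally {
  conn.close()
}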
During this process, while the dataset of interest is absent from the transaction_table, some mechanism might need to be in place so that if other OLTP applications access its records, they are informed of the situation and can react accordingly.
Discussion
The major drawback of this approach is that if the transaction_table has dependent tables, e.g. a transaction_table_2 that references it via a foreign key, then in step 2 we also need to delete the dependent records from transaction_table_2. Worse, if transaction_table_2, in turn, has a transaction_table_3 that depends on it via a foreign key, then we need to recursively delete yet another set of dependent records from transaction_table_3, and this situation could go on and on. Basically, in the general case this approach requires cascading deletes, which quickly becomes very complex and messy.
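To make the cascading concrete, here is a purely illustrative sketch; it assumes (hypothetically, since the foreign-key layout is not specified here) that the dependent tables also carry a dataset_id column:

import java.sql.DriverManager

// Purely illustrative: children must be cleared before the parent, otherwise
// the foreign-key constraints reject the parent delete. The dataset_id column
// on the dependent tables is an assumption, not something given in this text.
val conn = DriverManager.getConnection(jdbcUrl, connProperties)
try {
  val stmt = conn.createStatement()
  stmt.executeUpdate("delete from transaction_table_3 where dataset_id = 199")
  stmt.executeUpdate("delete from transaction_table_2 where dataset_id = 199")
  stmt.executeUpdate("delete from transaction_table   where dataset_id = 199")
  stmt.close()
} finally {
  conn.close()
}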
That is not to mention that if for some reason step 4 fails (even if just one record fails to insert), not only does no new data get updated, but we also lose the existing data. The only remedy is to do some extra work, perhaps restoring the existing data from currentDF back into transaction_table in the case of failure (*), and of course doing the same for transaction_table_2, transaction_table_3, and so on. Too cumbersome. Not worth it.
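As a rough illustration of that remedy, here is a sketch of wrapping step 4 in a recovery path; it assumes currentDF was read with all of its columns up front (see the footnote below) so that it can serve as a backup:

import org.apache.spark.sql.SaveMode
import scala.util.{Failure, Success, Try}

// Illustrative only: if the append of finalUpdateDF fails, try to put the
// original rows (currentDF, read with select * beforehand) back. A real
// recovery would first have to clean up any partially inserted rows, and the
// dependent tables would need the same treatment.
Try(finalUpdateDF.write.mode(SaveMode.Append).jdbc(jdbcUrl, "transaction_table", connProperties)) match {
  case Success(_) => ()
  case Failure(e) =>
    currentDF.write.mode(SaveMode.Append).jdbc(jdbcUrl, "transaction_table", connProperties)
    throw e
}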
Conclusion
These drawbacks rule this approach out of further consideration.
(*) For such a restore to be possible, currentDF would have to be read with all of its columns in the first place:

val currentDF = spark.read.jdbc(jdbcUrl, "(select * from transaction_table where dataset_id = 199) transaction_table", connProperties)