We now explore the second approach, again using the Update operation as the first test. Its examination follows.
Summary
In a nutshell, this approach writes the update data to a temp table, then uses a database trigger to propagate that data to the target table.
Details
We simulate the steps of this approach in the Spark shell. Recall that we are about to update dataset 199 in the transaction_table, and that we are given the following updateDF dataframe:
```scala
scala> updateDF.printSchema
root
 |-- unique_key_col: string (nullable = false)
 |-- string_data_col: string (nullable = true)

scala> updateDF.take(2)
res1: Array[org.apache.spark.sql.Row] = Array([unique_key_value_1,changed_string_data_value_1], [unique_key_value_2,changed_string_data_value_2])
```
Note that only the column string_data_col holds fresh data, and number_data_col doesn’t exist in this dataframe.
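As a quick illustration (a sketch only, reusing the names from the example above), the set of target-table columns that actually arrived in a given run can be read straight off the dataframe:

```scala
// Columns present in this run's updateDF: always unique_key_col plus
// whatever subset of the transaction_table's columns carries fresh data.
val arrivedColumns: Set[String] = updateDF.columns.toSet
// For the run shown above, this is Set("unique_key_col", "string_data_col").
```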
This approach utilizes a temp table paired with a database trigger:
```sql
create table temp_table (
  unique_key_col varchar(1000) not null,
  dataset_id bigint not null,
  string_data_col varchar(50),
  number_data_col int
);

create trigger update_transaction_table_row
after insert on temp_table
for each row
begin
  update transaction_table
  set string_data_col = new.string_data_col,
      number_data_col = new.number_data_col
  where unique_key_col = new.unique_key_col;
end;
```
The temp_table's schema matches that of the transaction_table, but it declares no unique key. Whenever a record is inserted into the temp_table, the trigger copies all of its data into the corresponding record in the transaction_table.
Given this setup, we derive the finalUpdateDF dataframe in the same way as in approach I:
```scala
scala> val currentDF = spark.read.jdbc(jdbcUrl, "(select unique_key_col, dataset_id, number_data_col from transaction_table where dataset_id = 199) transaction_table", connProperties)
currentDF: org.apache.spark.sql.DataFrame = [unique_key_col: string, dataset_id: bigint ... 1 more field]

scala> currentDF.take(2)
res2: Array[org.apache.spark.sql.Row] = Array([unique_key_value_1,199,100], [unique_key_value_2,199,200])

scala> val finalUpdateDF = updateDF.join(currentDF, updateDF("unique_key_col") === currentDF("unique_key_col")).
     |   drop(currentDF("unique_key_col"))
finalUpdateDF: org.apache.spark.sql.DataFrame = [unique_key_col: string, string_data_col: string ... 2 more fields]

scala> finalUpdateDF.printSchema
root
 |-- unique_key_col: string (nullable = false)
 |-- string_data_col: string (nullable = true)
 |-- dataset_id: long (nullable = false)
 |-- number_data_col: integer (nullable = true)

scala> finalUpdateDF.take(2)
res4: Array[org.apache.spark.sql.Row] = Array([unique_key_value_2,changed_string_data_value_2,199,200], [unique_key_value_1,changed_string_data_value_1,199,100])
```
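Before writing, it can be worth checking that finalUpdateDF carries exactly the columns the temp_table expects, since the trigger blindly copies every one of them. A minimal guard might look like this (a sketch; it reuses the spark, jdbcUrl, and connProperties values from the session above):

```scala
// Optional sanity check (illustrative): the trigger assigns every column of
// temp_table, so finalUpdateDF must supply a value for each of them.
val tempTableColumns = spark.read
  .jdbc(jdbcUrl, "temp_table", connProperties)
  .columns
  .toSet

require(finalUpdateDF.columns.toSet == tempTableColumns,
  s"column mismatch: ${finalUpdateDF.columns.toSet} vs $tempTableColumns")
```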
Now, we truncate the temp_table (*), then dump the finalUpdateDF to it:
```scala
scala> finalUpdateDF.write.option("truncate", "true").mode(SaveMode.Overwrite).jdbc(jdbcUrl, "temp_table", connProperties)
```
At this point, the temp_table’s trigger will propagate the fresh data to the transaction_table, and we are set.
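To confirm the propagation, we can read the affected dataset back from the transaction_table, reusing the same JDBC read pattern as before (a quick manual check, not part of the job itself):

```scala
// Read dataset 199 back from the target table to verify the trigger fired.
val verifyDF = spark.read.jdbc(
  jdbcUrl,
  "(select unique_key_col, string_data_col, number_data_col from transaction_table where dataset_id = 199) transaction_table",
  connProperties)

// The rows should now show the changed_string_data_value_* values.
verifyDF.show(2, false)
```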
Discussion
This approach suffers from several drawbacks, summarized as follows.
First, notice that in this particular example we only need to refresh the column string_data_col of dataset 199 (the updateDF doesn't carry the column number_data_col). Yet, as the trigger's SET clause shows, it still has to update every column of the record. Why?
Remember the Schema Change Requirement? Each run of the SparkSQL job might bring over an updateDF with a different set of columns, always some subset of the transaction_table's columns. The trigger, however, cannot know or anticipate in advance which columns are present in a given run. It therefore has to make a safe bet and pay the price of blindly updating all columns, even those that carry no new data in the current run, because MySQL doesn't allow dynamic SQL inside a trigger that could handle an unknown set of columns.
With this implementation, imagine a table with hundreds of columns where, in a given run, only a small fraction of them carries fresh data. What wasteful work (**)!
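To make this concrete, here is a sketch of what a later run might look like if the fresh data arrived only in number_data_col (the input dataframe here is hypothetical; everything else mirrors the steps shown above). Even though string_data_col is untouched, it still has to be fetched from the current table contents and written back, because the trigger's SET clause always assigns every column:

```scala
import org.apache.spark.sql.SaveMode
import spark.implicits._

// Hypothetical later run: only number_data_col carries fresh data this time.
val updateDF2 = Seq(
  ("unique_key_value_1", 150),
  ("unique_key_value_2", 250)
).toDF("unique_key_col", "number_data_col")

// Pull the columns missing from updateDF2 (here, string_data_col) so that
// every temp_table column can be populated.
val currentDF2 = spark.read.jdbc(
  jdbcUrl,
  "(select unique_key_col, dataset_id, string_data_col from transaction_table where dataset_id = 199) transaction_table",
  connProperties)

val finalUpdateDF2 = updateDF2
  .join(currentDF2, updateDF2("unique_key_col") === currentDF2("unique_key_col"))
  .drop(currentDF2("unique_key_col"))

// The trigger will then rewrite both string_data_col and number_data_col,
// even though only number_data_col actually changed.
finalUpdateDF2.write
  .option("truncate", "true")
  .mode(SaveMode.Overwrite)
  .jdbc(jdbcUrl, "temp_table", connProperties)
```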
Second, database triggers in RDBMS systems tend to be inflexible pieces of code that are hard to maintain and evolve. As large, busy systems scale along multiple dimensions, trigger maintenance tends to cause a lot of pain over the long run. Generally, moving code and logic from the database side to the application side is the better practice whenever possible.
Lastly, if a failure occurs anywhere during trigger execution for any reason, the entire job fails and no records get updated. The Partial Success Requirement is not fulfilled.
Conclusion
These drawbacks eliminate this approach from the race.
(*) This temp table is entirely dedicated to the processing of this Spark job for this specific target table; in other words, the OLTP apps don't access it. Furthermore, assume the ETL pipeline is designed such that at most one update job instance for a given target table is active at a time. This guarantees there's no race condition, so the truncation is safe to perform.
(**) In fact, the MySQL engine is smart enough not to update a column when it is set to the same value it already holds, so the actual performance penalty might not be that bad.