The first two approaches don’t cut it. We now move on to the third one, in search of a solution that can reasonably perform the Update operation.
Summary
We dump the updateDF dataframe to a temp table, then issue a SQL Update statement that joins that temp table with the target table and updates the target records at the same time.
Details
Let's simulate the steps of this approach in the Spark shell. Recall that we are about to update dataset 199 in the transaction_table, and that we are given the updateDF dataframe as follows:
scala> updateDF.printSchema
root
 |-- unique_key_col: string (nullable = false)
 |-- string_data_col: string (nullable = true)

scala> updateDF.take(2)
res1: Array[org.apache.spark.sql.Row] = Array([unique_key_value_1,changed_string_data_value_1], [unique_key_value_2,changed_string_data_value_2])
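If you want to follow along in the shell, a dataframe shaped like updateDF can be mocked up along these lines. This is just a sketch with the hypothetical values shown above; the nullability flags (and, in the real job, the column set) may differ from the actual updateDF produced by the pipeline.

// Hypothetical shell setup producing a dataframe shaped like updateDF
import spark.implicits._

val updateDF = Seq(
  ("unique_key_value_1", "changed_string_data_value_1"),
  ("unique_key_value_2", "changed_string_data_value_2")
).toDF("unique_key_col", "string_data_col")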
Now, we dump this dataframe to a temp table:
scala> updateDF.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, "transaction_table_update_temp", connProperties)
Note that this operation will first drop the transaction_table_update_temp if it exists (*). After that, it will recreate the table with the same schema as updateDF. Finally, it will dump data from the dataframe to the newly recreated table.
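For completeness, the jdbcUrl and connProperties referenced in the shell snippet above might be set up roughly as follows. The host, database name, and credentials are placeholders, not values from the actual system.

// Hypothetical connection setup behind jdbcUrl and connProperties
// (host, database, user, and password are placeholders)
import java.util.Properties

val jdbcUrl = "jdbc:mysql://mysql-host:3306/my_database"

val connProperties = new Properties()
connProperties.setProperty("user", "my_user")
connProperties.setProperty("password", "my_password")
connProperties.setProperty("driver", "com.mysql.jdbc.Driver")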
Then, we’d issue the following SQL statement that performs the final action. Just as mentioned in step 2 of approach I, in the real SparkSQL job, we’d execute this from the driver program via a jdbc connection:
update transaction_table target, transaction_table_update_temp temp
set target.string_data_col = temp.string_data_col
where target.unique_key_col = temp.unique_key_col
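If you prefer to stay in the Spark shell rather than switch to a MySQL client, the statement can be executed through a plain JDBC connection, reusing the jdbcUrl and connProperties from above. This is only a sketch of the shell experiment, not the job code itself (which comes later in this post).

// Hypothetical shell execution of the update statement via plain JDBC
import java.sql.DriverManager

val updateSql =
  """update transaction_table target, transaction_table_update_temp temp
    |set target.string_data_col = temp.string_data_col
    |where target.unique_key_col = temp.unique_key_col""".stripMargin

val conn = DriverManager.getConnection(jdbcUrl, connProperties)
try {
  val rowCount = conn.createStatement().executeUpdate(updateSql)  // row count reported by the driver
  println(s"Update row count: $rowCount")
} finally {
  conn.close()
}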
Discussion
Essentially, this approach relies on a single update statement wrapped in a single database transaction. If something fails during the execution of the update, the entire batch fails and nothing gets updated. There's also no way to capture the failed records (everything fails anyway!). In other words, the Partial Success Requirement and the Failure Reporting Requirement are not fulfilled.
Conclusion
This prompts us to continue our search, and we arrive at approach IV next.
Implementation
Motivation
This approach is nevertheless somewhat promising. It doesn't suffer from the severe limitations of the previous two approaches. It does the job: it successfully performs the Update operation when things go well, and it handles schema flexibly. The only concern is that it doesn't support the other two capabilities (for when not everything goes well).
Recall that in the discussion about the sufficiency (or lack thereof) of the DataFrameWriter API for the Insert operation, we mentioned that there might be a way to minimize the chance of failure of the write step to MySQL. That argument applies to this Update operation, too. If we implement validations in the TRANSFORM step of the SparkSQL job (sketched below), records that could cause the update to fail might already have been filtered out before they reach updateDF. In that case, both the Partial Success Requirement and the Failure Reporting Requirement might no longer be that critical.
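As a rough illustration of such validations, the TRANSFORM step might filter out records with missing keys or oversized values before they reach updateDF. The dataframe transformedDF and the 255-character column width below are purely hypothetical.

// Hypothetical validation at the end of the TRANSFORM step:
// drop records that would likely make the MySQL update fail
import org.apache.spark.sql.functions.{col, length}

val maxStringLen = 255  // assumed width of string_data_col in MySQL

val updateDF = transformedDF
  .filter(col("unique_key_col").isNotNull)                 // keys must be present
  .filter(length(col("string_data_col")) <= maxStringLen)  // values must fit the target column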
Given that, the implementation of this third approach might be worth a try. What follows is the essence of that implementation, meant to be incorporated into the Update operation of the SparkSQL job of Pipeline 1.
Implementing Code
For the implementation, we have to move beyond the specifics of the illustrative example presented earlier. In that example, we deal with a transaction_table that has a single-column unique key (unique_key_col) and only one data column to update (string_data_col). The implementation, on the other hand, has to handle the general case: it has to deal with a target table having a composite unique key, and it has to smoothly handle any number of data columns that need updating (Schema Change Requirement). That means our code needs to be parameterized appropriately to stay generic.
With that, let's first look at a couple of noteworthy small supporting utilities. They are used not just in this approach but throughout the series. They are heavily commented to aid your understanding; in fact, throughout the series, the Scala code presented is generally comprehensively commented.
Function getUpdateInfo()
Recall that the SparkSQL job that carries out this Update operation gets triggered by an event and obtains the unique key information of the target table as a string. The simple function getUpdateInfo() takes this uniqueKey string as an argument. From it, the function derives the actual list of unique key columns and, as a result, the list of data columns to update (following the formula for updateDF's column set: the data columns to update are all of updateDF's columns minus the unique key columns).
Following is the code of this function. Together with comments, it should be self-explanatory:
package com.myproject.experiment.spark

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame

object Helpers {
  /**
   * Get from the dataframe information relevant to the update operation
   *
   * @param updateDF: dataframe that holds data to update
   * @param uniqueKey: comma separated list of columns composing the unique key, e.g. "unique_key_col_1, unique_key_col_2"
   * @return tuple (schema, uniqueKeyColList, updateColList)
   *         - schema: schema of updateDF
   *         - uniqueKeyColList: scala list of columns making up the unique key, e.g. List(unique_key_col_1, unique_key_col_2)
   *         - updateColList: scala list of data columns that will be updated, e.g. List(string_data_col, number_data_col)
   */
  def getUpdateInfo(updateDF: DataFrame, uniqueKey: String): (StructType, List[String], List[String]) = {
    val uniqueKeyCols = uniqueKey.split(",\\W*")
    val updateCols = updateDF.columns diff uniqueKeyCols

    (updateDF.schema, uniqueKeyCols.toList, updateCols.toList)
  }
}
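For a quick sense of what it returns, here is a hypothetical call matching the two-column example from the comments:

// Hypothetical usage, assuming updateDF has columns
// unique_key_col_1, unique_key_col_2, string_data_col, number_data_col
val (updateSchema, uniqueKeyCols, updateCols) =
  getUpdateInfo(updateDF, "unique_key_col_1, unique_key_col_2")

// uniqueKeyCols: List(unique_key_col_1, unique_key_col_2)
// updateCols:    List(string_data_col, number_data_col)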
Function execSQL()
We also need a utility to perform the last action: executing the final SQL Update statement from the driver program via a jdbc connection. The implementation uses the Java JDBC API. This is an important function; it will be referenced multiple times throughout the rest of the series.
This function is based on the concept of the Prepared Statement in the Java JDBC API. This concept covers notions like placeholders ? in the SQL statement string, binding values at runtime, precompilation of the SQL string template, server-side prepared statements, and so on. These notions will become crucial later in the series, so if you are not yet familiar with them, you might want to read up on them a bit.
For now, within the scope of this post, don't worry: we largely won't touch those concepts, because the SQL Update statement string used in this approach has no placeholders and involves no binding values.
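For readers who'd like a minimal reference anyway, this is roughly what a placeholder-based prepared statement looks like; conn is assumed to be an open java.sql.Connection, and this approach does not actually need any of it.

// For reference only: a placeholder-based prepared statement (not needed in this approach)
val ps = conn.prepareStatement(
  "update transaction_table set string_data_col = ? where unique_key_col = ?")
ps.setString(1, "changed_string_data_value_1") // bind the first placeholder
ps.setString(2, "unique_key_value_1")          // bind the second placeholder
ps.executeUpdate()
ps.close()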
Alright, here’s the implementation:
package com.myproject.experiment.jdbc

import java.sql.{Connection, Date, Timestamp}

object Helpers {
  /**
   * Prepare then execute an arbitrary non-query SQL statement
   *
   * @param conn: jdbc connection
   * @param sql: SQL statement string template, which may have placeholders ? to be plugged in with real binding values at execution time
   * @param boundValues: list of binding values for the prepared statement
   */
  def execSQL(conn: Connection, sql: String, boundValues: List[Any]) = {
    val preparedSql = conn.prepareStatement(sql)

    for ((boundValue, boundPosition) <- boundValues zip (1 to boundValues.size)) {
      boundValue match {
        case i: Int => preparedSql.setInt(boundPosition, i)
        case s: String => preparedSql.setString(boundPosition, s)
        case l: Long => preparedSql.setLong(boundPosition, l)
        case b: Boolean => preparedSql.setBoolean(boundPosition, b)
        case t: Timestamp => preparedSql.setTimestamp(boundPosition, t)
        case d: Date => preparedSql.setDate(boundPosition, d)
        case db: Double => preparedSql.setDouble(boundPosition, db)
        case f: Float => preparedSql.setFloat(boundPosition, f)
      }
    }

    val ret = preparedSql.execute
  }
}
A few quick further notes. This function takes a SQL string template and a list of runtime binding values. First, it calls prepareStatement() on the jdbc connection to create the prepared statement. Then, it binds each value to its respective placeholder position in the statement. Finally, it executes the "fully prepared" statement against the database.
As you'll see in the next section, the SQL statement string passed to this function is already final. It has no placeholders because it's of the form shown earlier. Thus, in this approach's implementation, executing this function effectively skips the binding for loop.
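For contrast, a call that does exercise the binding loop, of the kind that becomes relevant later in the series, might look like this (the statement and values here are hypothetical):

// Hypothetical call with placeholders and binding values (not used in this approach)
execSQL(
  conn,
  "update transaction_table set string_data_col = ? where unique_key_col = ?",
  List("changed_string_data_value_1", "unique_key_value_1"))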
The Main Logic
With the supporting utilities in place, we are now ready to tackle the main logic.
First, we need to build the SQL Update statement string used in the final step. This is taken care of by the function getUpdateSql().
The actual main sequence of actions takes place in the function updateTable(). As you can see, it follows the steps laid out in the Details section.
package com.myproject.experiment

import java.sql.DriverManager

import com.myproject.experiment.jdbc.Helpers.execSQL
import com.myproject.experiment.spark.Helpers.getUpdateInfo
import com.myproject.jdbc.utils.SQLHelpers.getConnProperties
import org.apache.spark.sql.{DataFrame, SaveMode}

object BatchUpdateViaTableJoin {
  /**
   * Construct the SQL Update statement string that joins the temp table with the target table and updates the target records
   *
   * @param targetTable: target table name, e.g. transaction_table
   * @param tempTable: temp table name, e.g. transaction_table_update_temp
   * @param updateCols: scala list of data columns that will be updated, e.g. List(string_data_col, number_data_col)
   * @param uniqueKeyCols: scala list of columns making up the unique key, e.g. List(unique_key_col_1, unique_key_col_2)
   * @return SQL Update Statement string, e.g.
   *         update transaction_table target, transaction_table_update_temp temp
   *         set target.string_data_col = temp.string_data_col,
   *             target.number_data_col = temp.number_data_col
   *         where target.unique_key_col_1 = temp.unique_key_col_1 and
   *               target.unique_key_col_2 = temp.unique_key_col_2
   */
  def getUpdateSql(targetTable: String, tempTable: String, updateCols: List[String], uniqueKeyCols: List[String]): String = {
    val (targetTableAlias, tempTableAlias) = ("target", "temp")

    def getColMapping(col: String) = targetTableAlias + "." + col + " = " + tempTableAlias + "." + col

    val updateClause = "update " + targetTable + " " + targetTableAlias + ", " + tempTable + " " + tempTableAlias
    val setClause = "set " + updateCols.map(getColMapping(_)).mkString(", ")
    val whereClause = "where " + uniqueKeyCols.map(getColMapping(_)).mkString(" and ")

    updateClause + " " + setClause + " " + whereClause
  }

  /**
   * Main function:
   * - Dumps data from the updateDF to temp table
   * - Propagates fresh data from the temp table to the target table
   *
   * @param updateDF: dataframe that holds data to update
   * @param uniqueKey: comma separated list of columns composing the unique key, e.g. "unique_key_col_1, unique_key_col_2"
   * @param targetTable: target table name, e.g. transaction_table
   * @param tempTable: temp table name, e.g. transaction_table_update_temp
   */
  def updateTable(updateDF: DataFrame, uniqueKey: String, targetTable: String, tempTable: String) = {
    Class.forName("com.mysql.jdbc.Driver")

    // Get from system configuration the url and properties to connect to the database
    val (jdbcUrl, connProperties) = getConnProperties()

    val (updateSchema, uniqueKeyCols, updateCols) = getUpdateInfo(updateDF, uniqueKey)
    val updateSql = getUpdateSql(targetTable, tempTable, updateCols, uniqueKeyCols)

    // Write to temp table
    updateDF
      .write
      .mode(SaveMode.Overwrite)
      .jdbc(jdbcUrl, tempTable, connProperties)

    val conn = DriverManager.getConnection(jdbcUrl, connProperties)

    try {
      // Perform the main action: update the target table
      execSQL(conn, updateSql, Nil)
      conn.close()
    } catch {
      case e: Throwable => {
        if (conn != null) conn.close()
        throw e
      }
    }
  }
}
The updateTable() function obtains the SQL Update statement string from getUpdateSql(), then passes that string to the call to execSQL(). As you can see, it also passes an empty list of binding values (Nil) to that call: the Update statement string is complete and executable on its own, so it can update the target records without any further runtime values being bound.
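If you want to sanity-check getUpdateSql() on its own, a direct call for the two-column example from the doc comment yields the expected statement. The returned string is a single line; it's wrapped in the comment below only for readability.

// Hypothetical sanity check of getUpdateSql()
val sql = getUpdateSql(
  "transaction_table",
  "transaction_table_update_temp",
  List("string_data_col", "number_data_col"),
  List("unique_key_col_1", "unique_key_col_2"))

// sql: update transaction_table target, transaction_table_update_temp temp
//      set target.string_data_col = temp.string_data_col, target.number_data_col = temp.number_data_col
//      where target.unique_key_col_1 = temp.unique_key_col_1 and target.unique_key_col_2 = temp.unique_key_col_2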
Recall that the Spark job for the Update operation obtains the unique key information from the system configuration. In this approach, besides the unique key information, the system configuration also maps each target table to its temp table. Thus, once the Spark job kicks off, it also figures out the name of the temp table corresponding to the target table. This temp table name gets passed to the call to updateTable().
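The exact configuration mechanism is outside the scope of this post, but one purely illustrative shape for such a per-table entry could be:

// One possible (purely illustrative) shape of a per-table configuration entry
case class UpdateTableConfig(
  targetTable: String, // e.g. "transaction_table"
  tempTable: String,   // e.g. "transaction_table_update_temp"
  uniqueKey: String    // e.g. "unique_key_col_1, unique_key_col_2"
)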
In summary, to perform the Update operation, the Spark job will invoke the function updateTable() with a call that might look like this:
updateTable(updateDF, "unique_key_col_1, unique_key_col_2", "transaction_table", "transaction_table_update_temp")
(*) As in approach II, we assume there's no race condition, so dropping the temp table is safe to do.