With both the Update and Insert facilities now in place, we are ready to implement the Update and Insert operations. We tackle the Update operation first in this post.
Update target record with values purely obtained from SparkSQL row
This is the simplest and most common case. All values involved in the operation, both those used in the where clause to identify the record (the unique key values) and those written to the record by the set clause, are retrieved from the SparkSQL row.
Recall that at this stage in the SparkSQL job of pipeline 1, we are handed the updateDF and told which table to update, what its unique key (potentially a composite one) is, in the form of a string holding a comma-separated list of unique key columns, and what the id of the dataset the updateDF originates from is. Given that, the implementation of the Update operation is fairly straightforward:
    package com.myproject.experiment

    import java.sql.DriverManager

    import com.myproject.experiment.jdbc.JdbcRecordUpdateFromSparkSqlRow
    import com.myproject.experiment.spark.Helpers._
    import com.myproject.jdbc.utils.SQLHelpers.getConnProperties
    import org.apache.spark.sql.DataFrame

    object UpdateOperation {
      /**
        * Update the target table row by row individually from each partition of the updateDF
        *
        * @param updateDF: dataframe that holds data to update
        * @param uniqueKey: comma separated list of columns composing the unique key, e.g. "unique_key_col_1, unique_key_col_2"
        * @param targetTable: target table name, e.g. transaction_table
        * @param datasetId: id of the original dataset from which updateDF is derived, e.g. 199
        */
      def updateTable(updateDF: DataFrame, uniqueKey: String, targetTable: String, datasetId: Long) = {
        Class.forName("com.mysql.jdbc.Driver")

        // Get from system configuration the url and properties to connect to the database
        val (jdbcUrl, connProperties) = getConnProperties()

        val (updateSchema, uniqueKeyCols, updateCols) = getUpdateInfo(updateDF, uniqueKey)

        updateDF.foreachPartition({rows =>
          val conn = DriverManager.getConnection(jdbcUrl, connProperties)

          try {
            val targetRecordUpdate = new JdbcRecordUpdateFromSparkSqlRow(conn, targetTable, updateCols.map((_, None)), uniqueKeyCols.map((_, None)))

            rows.foreach({row =>
              try {
                targetRecordUpdate.execute(row, updateSchema, updateCols ++ uniqueKeyCols)
              } catch {
                case _: Throwable =>
              }
            })
          } finally {
            conn.close()
          }
        })
      }
    }
As you can see, when initializing the targetRecordUpdate object (line 31), we pass in both column lists, updateCols and uniqueKeyCols, through the transformation map((_, None)). As a result, all these columns become placeholders in the Update string template.
With that, during the actual execution of the update (line 35), all these columns’ values are first retrieved from the current row. After that, they are bound in the right order to their placeholders, effectively performing the desired update to the target record.
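To make the "right order" concrete, here is a minimal sketch of the idea, not the actual implementation of execute() or getUpdateInfo(), which are not shown in this post. It assumes a row can be modeled as a plain Map, and that the update columns are simply all columns that are not part of the unique key. The names BindingOrderSketch, FakeRow, parseUniqueKey, updateColsOf and orderedBindings are all hypothetical.

```scala
object BindingOrderSketch {
  // Hypothetical stand-in for a SparkSQL row: column name -> value.
  type FakeRow = Map[String, Any]

  // Split "k1, k2" into trimmed column names, dropping empty entries.
  def parseUniqueKey(uniqueKey: String): List[String] =
    uniqueKey.split(",").map(_.trim).filter(_.nonEmpty).toList

  // Assumption: every column outside the unique key belongs to the set clause.
  def updateColsOf(allCols: List[String], uniqueKeyCols: List[String]): List[String] =
    allCols.filterNot(c => uniqueKeyCols.contains(c))

  // Pair each value with its 1-based JDBC parameter index,
  // following the order of boundCols (set-clause columns first, then key columns).
  def orderedBindings(row: FakeRow, boundCols: List[String]): List[(Int, Any)] =
    boundCols.zipWithIndex.map { case (col, i) => (i + 1, row(col)) }
}
```

Calling orderedBindings(row, updateCols ++ uniqueKeyCols) then yields the values in the same order in which the placeholders appear in the template: the set clause first, the where clause second.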
The implementation of all the record write facilities so far enables us to write arguably pretty concise, understandable, and elegant code for this standard case.
Building the Update string template with literal values
In the above case, the Update string template contains only placeholders. When the need arises, we can include in it literal values, too. This section gives such an example.
Given this example table:
    create table transaction_table (
      unique_key_col varchar(1000) not null,
      dataset_id bigint not null,
      string_data_col varchar(50),
      number_data_col int,
      unique key (unique_key_col)
    );
Uniquely identifying a single target record to update requires only the unique_key_col column. But suppose that, for some reason, we’d also like to use the dataset_id column in the where clause (at least, it doesn’t hurt!). That is, we’d like to issue a template that might look like this:
update transaction_table set string_data_col = ?, number_data_col = ? where unique_key_col = ? and dataset_id = 199
Note that in our context here, the updateDF from which to update the target table contains data for a specific dataset whose dataset_id is known. In other words, dataset_id is a literal value that gets passed into the function updateTable() via the argument datasetId (line 19). Thus, all the information needed to generate the above template is readily available.
So, handling the new requirement is straightforward. The only small change to the code presented in the previous section is made to the initialization of the targetRecordUpdate object at line 31:
val targetRecordUpdate = new JdbcRecordUpdateFromSparkSqlRow(conn, targetTable, updateCols.map((_, None)), uniqueKeyCols.map((_, None)) :+ ("dataset_id", Some(datasetId)))
That’s it. We only need to append to the transformed uniqueKeyCols list an element that represents the dataset_id column having the specified literal value. All the rest of the code stays the same.
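To illustrate how a list of (column, Option[value]) pairs could map to a template, here is a minimal sketch. It is not the actual code inside JdbcRecordUpdateFromSparkSqlRow, which is not shown in this post; the names UpdateTemplateSketch, render and buildUpdateTemplate are hypothetical, and the quoting of string literals is a simplifying assumption.

```scala
object UpdateTemplateSketch {
  // None renders as a placeholder; Some(value) renders as a literal.
  private def render(col: String, literal: Option[Any]): String = literal match {
    case None => s"$col = ?"
    case Some(s: String) =>
      // Assumption: string literals are single-quoted, with embedded quotes doubled.
      val escaped = s.replace("'", "''")
      s"$col = '$escaped'"
    case Some(v) => s"$col = $v"
  }

  def buildUpdateTemplate(table: String,
                          setCols: Seq[(String, Option[Any])],
                          whereCols: Seq[(String, Option[Any])]): String = {
    val setClause = setCols.map { case (c, v) => render(c, v) }.mkString(", ")
    val whereClause = whereCols.map { case (c, v) => render(c, v) }.mkString(" and ")
    s"update $table set $setClause where $whereClause"
  }
}
```

With set columns List("string_data_col", "number_data_col").map((_, None)) and where columns List("unique_key_col").map((_, None)) :+ (("dataset_id", Some(199L))), this sketch produces the template shown earlier in this section, with dataset_id = 199 as the only literal.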
Update target record with dynamic values computed on the fly
So far, the values involved in the operation have been either binding values from the SparkSQL row or literal values that stay constant across rows. There are situations where a third type of value emerges. Let’s suppose the transaction table has one more column, like so:
    create table transaction_table (
      unique_key_col varchar(1000) not null,
      dataset_id bigint not null,
      string_data_col varchar(50),
      number_data_col int,
      write_timestamp datetime,
      unique key (unique_key_col)
    );
Whenever a new record first gets created, or an existing record gets updated, the write_timestamp column (line 6) will note the timestamp of that operation. MySQL provides a mechanism to handle this at the database level (for example, declaring the column with default current_timestamp on update current_timestamp), but for the sake of our example, suppose it’s the job of the application to do that. Then, continuing from our example above, the Update string template now needs to become:
update transaction_table set string_data_col = ?, number_data_col = ?, write_timestamp = ? where unique_key_col = ? and dataset_id = 199
All placeholder columns except write_timestamp obtain their binding values from the current SparkSQL row. The current row has no value for write_timestamp. This value, to be precise, is the timestamp of the very moment when the update operation for the current row takes place. In other words, our program has to “capture” this value from the system, then feed it as a binding value to the write_timestamp placeholder when updating the current record.
How do we fulfill this new requirement of updating write_timestamp? We can achieve it with the following modification to the implementation. First, some additional imports:
    import java.util.Calendar
    import java.sql.{DriverManager, Timestamp}
    import java.sql.Types.TIMESTAMP
Then, in the updateTable() function (line 19), the changes take place only within the outer try block (lines 30 – 42) of the updateDF.foreachPartition() call. This block will become:
    try {
      val targetRecordUpdate = new JdbcRecordUpdateFromSparkSqlRow(conn, targetTable, (updateCols :+ "write_timestamp").map((_, None)), uniqueKeyCols.map((_, None)) :+ ("dataset_id", Some(datasetId)))

      rows.foreach({row =>
        try {
          val now = new Timestamp(Calendar.getInstance().getTime().getTime)
          val updateColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, updateSchema, updateCols)
          val uniqueKeyColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, updateSchema, uniqueKeyCols)

          targetRecordUpdate.execute((updateColsBoundValues :+ ("write_timestamp", Some(now), TIMESTAMP)) ++ uniqueKeyColsBoundValues)
        } catch {
          case _: Throwable =>
        }
      })
    } finally {
      conn.close()
    }
- First, write_timestamp is added as a placeholder column to the set clause of the Update template. This is done in the initialization of the targetRecordUpdate object (line 2).
- Because the current SparkSQL row alone no longer provides all the values required by the target record, we can no longer rely on the version of the execute() method that takes the row directly. Instead, we need the other version, which takes the binding values themselves (line 10).
- That requires us to “manually” build the binding values to feed to the placeholders. First, we compute the now variable for write_timestamp (line 6).
- Then, we extract the values for the required columns from the current row, using retrieveJdbcValuesFromSparkSqlRow() (lines 7 – 8).
- Finally, we concatenate all these values, honoring the order specified in the targetRecordUpdate initialization (line 2), and feed them to the execute() call (line 10).
Note that in this last step, we need to construct the value for write_timestamp according to the format required by execute(). Namely, it has to be the triplet (column name, Some(value), jdbc type).
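The concatenation step can be sketched in isolation as follows. This is only an illustration under stated assumptions: the triplet shape is taken from the description above, while the names DynamicBindingSketch, JdbcValue, now and bindingsFor are hypothetical and not part of the project's code. It also uses System.currentTimeMillis(), which gives the same instant as going through Calendar but is a little shorter.

```scala
import java.sql.{Timestamp, Types}

object DynamicBindingSketch {
  // (column name, optional value, java.sql.Types code): the triplet shape described above.
  type JdbcValue = (String, Option[Any], Int)

  // Capture the current moment as a JDBC timestamp.
  def now(): Timestamp = new Timestamp(System.currentTimeMillis())

  // Concatenate in template order: set-clause values from the row,
  // then write_timestamp, then the where-clause (unique key) values.
  def bindingsFor(updateValues: List[JdbcValue],
                  uniqueKeyValues: List[JdbcValue],
                  writeTime: Timestamp): List[JdbcValue] =
    (updateValues :+ (("write_timestamp", Some(writeTime), Types.TIMESTAMP))) ++ uniqueKeyValues
}
```

Passing the timestamp in as an argument, rather than computing it inside bindingsFor, keeps the ordering logic deterministic and easy to check; the caller decides when "now" is captured for each row.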
This demonstrates the general case in which the current row doesn’t provide all the values needed to write the target record. Some values that change across rows (in other words, binding values) have to be obtained from other sources or computed on the fly. In such a case, we need to resort to the more “basic” version of the execute() method, and will likely have to construct the required values “manually”.