Class JdbcRecordUpdateFromSparkSqlRow

Now it’s time to work on the Update facility. The implementation follows:

package com.myproject.experiment.jdbc

import java.sql.Connection
/** Logic for updating a target record in a JDBC table.
  * This class enables retrieving values from a Spark SQL row, or just taking a set of values from somewhere (not necessarily Spark),
  * and updating the corresponding target JDBC record with them.
  *
  *  @param conn a JDBC connection
  *  @param table name of the target JDBC table
  *  @param setClauseColumns list of columns to be updated in the target record
  *  @param whereClauseColumns list of columns uniquely identifying the target record to be updated
  */
class JdbcRecordUpdateFromSparkSqlRow(conn: Connection,
                                      val table: String,
                                      val setClauseColumns: List[(String, Option[Any])],
                                      val whereClauseColumns: List[(String, Option[Any])])
extends JdbcRecordWriteFromSparkSqlRow(conn) with SqlStringBuildingSupporter {

  /**
    * Return list of all table columns accessed by this Update sql statement
    */
  override protected def getAllPreparedColumns(): List[(String, Option[Any])] = setClauseColumns ++ whereClauseColumns

  /**
    * Generate the string representing a given column. This string will be plugged in the Update sql statement template
    *
    * @param col: name of the given column
    * @param valueOpt: instantiating value of the given column, which is either:
    *                  - Some(value): represents a literal value to bake in the Update sql string template
    *                  - None: represents a placeholder
    * @return the string representing the given column in the Update sql statement template.
    *         This string would be of the form "col = literal value" or "col = ?"
    */
  private def getOneColumnForUpdateSqlString(col: String, valueOpt: Option[Any]): String = {
    col + " = " + {
      valueOpt match {
        case Some(value) => getLiteralValueForSqlString(value)
        case None => "?"
      }
    }
  }

  /**
    * Construct the Update sql string template
    * This template potentially has placeholders ? and literal values
    *
    * @return Update sql string template, e.g.
    * update transaction_table set string_data_col = 'changed_string_data_value_1',
    *                              number_data_col = null
    *                          where unique_key_col_1 = ? and
    *                                unique_key_col_2 = 127
    */
  override protected def getSqlStmtString(): String = {
    val updateClause = "update " + table
    val setClause = "set " + setClauseColumns.map({ case(col, valueOpt) => getOneColumnForUpdateSqlString(col, valueOpt) }).mkString(", ")
    val whereClause = "where " + whereClauseColumns.map({ case(col, valueOpt) => getOneColumnForUpdateSqlString(col, valueOpt) }).mkString(" and ")

    updateClause + " " + setClause + " " + whereClause
  }

}

Instantiation

To perform an Update operation for a Spark partition, the client code needs to instantiate this class. In doing so, it specifies the target table (the table parameter) and all the columns accessed by the update statement, for both the set and the where clauses (the setClauseColumns and whereClauseColumns parameters). Note that these columns are specified according to the specification chosen as discussed in a previous post. This specification allows designating whether a given column is a placeholder or carries a literal value.

The function getAllPreparedColumns() simply returns all the columns accessed by this statement: the set clause columns followed by the where clause columns.

Building the Update SQL Template

The construction of the template is handled by the function getSqlStmtString(). It generates a template of the following form:

update transaction_table set string_data_col = 'changed_string_data_value_1',
                             number_data_col = null,
                             timestamp_data_col = ?
                         where unique_key_col_1 = ? and
                               unique_key_col_2 = 127

The point is that each column can be either a placeholder or a literal value, depending on how the client code specifies it during instantiation. Either way, its representation is always of the form col = ? or col = literal value, whether it appears in the set or the where clause. This string representation of a column in the template is generated by the function getOneColumnForUpdateSqlString(). That function, in turn, relies on getLiteralValueForSqlString() to treat string literals properly (wrapping them in single quotes).

getSqlStmtString() then uses getOneColumnForUpdateSqlString() to generate the set clause and the where clause of the template.
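getLiteralValueForSqlString() comes from the SqlStringBuildingSupporter trait covered in a previous post, so its actual implementation isn't repeated here. A minimal sketch of the behavior assumed above (single quotes around strings, the keyword null for null values, everything else rendered as-is) might look like this:

trait SqlStringBuildingSupporter {
  // Render a literal value as it should appear inside a SQL string (sketch only).
  protected def getLiteralValueForSqlString(value: Any): String = value match {
    case null      => "null"              // a literal null becomes the SQL keyword
    case s: String => "'" + s + "'"       // strings get wrapped in single quotes
    case other     => other.toString      // numbers, timestamps, etc. rendered as-is
  }
}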

One thing is worth noting here. Given this string representation of a column in the where clause, what happens when we deal with a null value (whether a literal null hard-coded at template construction time, or a binding value of null at execution time)? In that case, the where condition effectively becomes something like where some_unique_key_col = null. Such a comparison never matches any row, since comparing against null evaluates to unknown, which the where clause treats as false (the more “proper” condition would usually be where some_unique_key_col is null). Is this the correct behavior?

Recall that here, in the where clause, we are only dealing with unique key columns, and those columns are defined as non-nullable (just as MySQL requires for primary key columns). Thus, in practice, a search for a record whose unique key column has the null value is guaranteed to fail anyway. In other words, our implementation yielding the condition where some_unique_key_col = null produces the correct behavior.

That’s about it. To construct a template that looks like the example above, the client code might instantiate JdbcRecordUpdateFromSparkSqlRow like so:

new JdbcRecordUpdateFromSparkSqlRow(conn,
                                    "transaction_table",
                                    List(
                                         ("string_data_col", Some("changed_string_data_value_1")),
                                         ("number_data_col", Some(null)),
                                         ("timestamp_data_col", None)
                                        ),
                                    List(
                                         ("unique_key_col_1", None),
                                         ("unique_key_col_2", Some(127))
                                        )
                                   )

In practice, when this Update facility is put to use for an update operation from a Spark partition, the instantiation work by the client code tends to be much simpler. That’s because the lists of columns for the set and where clauses can usually be inferred programmatically from the updateDF. You’ll see examples in a future post.
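As a rough illustration only (the column names and the key-column assumption here are hypothetical, not the actual derivation from the future post), inferring the two lists from the updateDF could look like this:

// Hypothetical: the unique key columns are known up front; every other column goes in the set clause.
val keyColumns = List("unique_key_col_1", "unique_key_col_2")
val valueColumns = updateDF.columns.toList.filterNot(keyColumns.contains)

// All columns become placeholders (None); values are bound at execution time for each row.
val setClauseColumns   = valueColumns.map(col => (col, Option.empty[Any]))
val whereClauseColumns = keyColumns.map(col => (col, Option.empty[Any]))

new JdbcRecordUpdateFromSparkSqlRow(conn, "transaction_table", setClauseColumns, whereClauseColumns)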

Given the template and, consequently, the prepared statement that results from this instantiation, the actual Update operation for a given record becomes trivial. Again, you’ll see examples in a future post.
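Just to make the idea concrete, here is a plain-JDBC sketch (not the facility’s actual code, and with made-up binding values) of what executing one update against the example template boils down to: bind the two placeholders, then execute.

// Hypothetical values; in the real facility these come from the Spark SQL row being processed.
val sql = "update transaction_table set string_data_col = 'changed_string_data_value_1', " +
          "number_data_col = null, timestamp_data_col = ? " +
          "where unique_key_col_1 = ? and unique_key_col_2 = 127"

val stmt = conn.prepareStatement(sql)
stmt.setTimestamp(1, java.sql.Timestamp.valueOf("2020-01-01 00:00:00"))  // timestamp_data_col
stmt.setString(2, "some_key_value")                                      // unique_key_col_1
stmt.executeUpdate()
stmt.close()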