Approach III, although the best so far, still falls short. Hence, we now move on to the next approach.

Summary

Each row in the updateDF dataframe is extracted individually and used to explicitly update the corresponding target record.

Details

We no longer rely on the DataFrameWriter API to perform the write operation in a coarse-grained manner at the dataframe level. Instead, we execute at the partition level using the foreachPartition() method provided by the dataframe. At this level, we first open and manage one database connection per partition ourselves. Then, for each row in that partition, we manually issue a separate JDBC statement over that connection, updating the single corresponding target record.

Since we don’t operate at the dataframe level, there is no series of dataframe manipulations to walk through in the Spark shell, as there was in the previous approaches. Instead, we go straight to the implementation.

Implementation

The Code

Here’s the implementing code:

package com.myproject.experiment

import com.myproject.jdbc.utils.SQLHelpers.getConnProperties
import com.myproject.experiment.spark.Helpers.getUpdateInfo
import org.apache.spark.sql.types._
import java.sql.{Date, DriverManager, Timestamp}

import com.myproject.experiment.jdbc.Helpers.execSQL
import org.apache.spark.sql._

object GranularLevelRecordUpdate {
  /**
    * Converts a sparkSql value (as extracted from a dataframe row) to a jdbc value
    * in preparation to write it to a jdbc db via executing a jdbc statement
    * @param sparkSqlValue: sparkSql value to convert
    * @param sparkSqlType: sparkSqlType of the value to convert
    * @return: the scala value converted from the sparkSql value
    */
  def sparkSqlToJdbc(sparkSqlValue: Any, sparkSqlType: DataType): Any = {
    sparkSqlType match {
      case IntegerType => sparkSqlValue.asInstanceOf[Int]
      case StringType => sparkSqlValue.asInstanceOf[String]
      case BooleanType => sparkSqlValue.asInstanceOf[Boolean]
      case LongType => sparkSqlValue.asInstanceOf[Long]
      case FloatType => sparkSqlValue.asInstanceOf[Float]
      case DoubleType => sparkSqlValue.asInstanceOf[Double]
      case TimestampType => sparkSqlValue.asInstanceOf[Timestamp]
      case DateType => sparkSqlValue.asInstanceOf[Date]
    }
  }

  /**
    * Construct the SQL Update statement string template that updates an individual target record
    * This template has placeholders ? to be bound to real values extracted from the updateDF row at runtime
    *
    * @param targetTable: target table name, e.g. transaction_table
    * @param updateCols: scala list of data columns that will be updated, e.g. List(string_data_col, number_data_col)
    * @param uniqueKeyCols: scala list of columns making up the unique key, e.g. List(unique_key_col_1, unique_key_col_2)
    * @return SQL Update Statement string, e.g.
    *         update transaction_table
    *         set string_data_col = ?,
    *             number_data_col = ?
    *         where unique_key_col_1 = ? and
    *               unique_key_col_2 = ?
    */
  def getUpdateSql(targetTable: String, updateCols: List[String], uniqueKeyCols: List[String]): String = {
    def getColBinding(col: String) = col + " = ?"

    val updateClause = "update " + targetTable
    val setClause = "set " + updateCols.map(getColBinding(_)).mkString(", ")
    val whereClause = "where " + uniqueKeyCols.map(getColBinding(_)).mkString(" and ")

    updateClause + " " + setClause + " " + whereClause
  }

  /**
    * Main function:
    *   - Per-partition processing: in each partition of the updateDF, update the target table row by row individually
    *
    * @param updateDF: dataframe that holds data to update
    * @param uniqueKey: comma separated list of columns composing the unique key, e.g. "unique_key_col_1, unique_key_col_2"
    * @param targetTable: target table name, e.g. transaction_table
    */
  def updateTable(updateDF: DataFrame, uniqueKey: String, targetTable: String) = {
    Class.forName("com.mysql.jdbc.Driver")

    // Get from system configuration the url and properties to connect to the database
    val (jdbcUrl, connProperties) = getConnProperties()

    val (updateSchema, uniqueKeyCols, updateCols) = getUpdateInfo(updateDF, uniqueKey)

    val updateSql = getUpdateSql(targetTable, updateCols, uniqueKeyCols)

    updateDF.foreachPartition({rows =>
      val conn = DriverManager.getConnection(jdbcUrl, connProperties)

      rows.foreach({row =>
        try {
          // Perform the main action: update the target record
          execSQL(conn, updateSql,  updateCols.map({col => sparkSqlToJdbc(row.getAs(col), updateSchema(col).dataType)}) ++
            uniqueKeyCols.map({col => sparkSqlToJdbc(row.getAs(col), updateSchema(col).dataType)}) )
        } catch {
          case _: Throwable => // failure deliberately ignored for now; see the Discussion section
        }
      })

      conn.close()
    })
  }
}

Explanation

Suppose, as an example, that for an existing dataset in the table transaction_table, the Update operation is going to update the column string_data_col. Then, to update a specific target record from within a partition, we’d need to issue a SQL statement of the following form over a JDBC connection:

update transaction_table set string_data_col = ? where unique_key_col = ?

Note that this is a SQL string template with two ? placeholders. In the implementation, this template is constructed by the function getUpdateSql() (line 46).
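
To make the template concrete, here is a small self-contained sketch that repeats the string-building logic of getUpdateSql() and applies it to the two-column example from that function’s doc comment. The name buildUpdateSql is only used here so the snippet can be pasted into a Scala or Spark shell on its own; it is not a separate function in the implementation.

```scala
// Same string-building logic as getUpdateSql() in the listing above,
// repeated under a different name so this snippet is self-contained.
def buildUpdateSql(targetTable: String, updateCols: List[String], uniqueKeyCols: List[String]): String = {
  val setClause = "set " + updateCols.map(_ + " = ?").mkString(", ")
  val whereClause = "where " + uniqueKeyCols.map(_ + " = ?").mkString(" and ")
  "update " + targetTable + " " + setClause + " " + whereClause
}

val updateTemplate = buildUpdateSql(
  "transaction_table",
  List("string_data_col", "number_data_col"),
  List("unique_key_col_1", "unique_key_col_2"))

// Prints one placeholder per update column, followed by one per unique key column:
// update transaction_table set string_data_col = ?, number_data_col = ? where unique_key_col_1 = ? and unique_key_col_2 = ?
println(updateTemplate)
```

The template is built once per job, outside the partition loop, and only the bound values change from row to row.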

The main actions take place in the function updateTable() (line 64). Notably, this function invokes getUpdateSql() to obtain the needed SQL string template (line 72). Then, it passes this template to execSQL() for execution (line 80; note that execSQL() is presented in the previous post).

For this template to become executable, it needs runtime binding values. These are the values of the data columns as well as the unique key columns, which are retrieved from the current row in the partition. After retrieval, these values are bound to the placeholders in the template, creating a fully executable SQL statement. This statement then gets executed, updating the target record. All of this happens within lines 80 – 81, with support from the implementation of execSQL() (refer back to this function’s code for further clarification).
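
execSQL() itself lives in the previous post and is not reproduced here. Purely to illustrate what binding values to the placeholders means in JDBC terms, the sketch below shows one way such a function could look using java.sql.PreparedStatement. The name execSqlSketch and its signature are assumptions made for this illustration, and the actual execSQL() may well be implemented differently.

```scala
import java.sql.{Connection, PreparedStatement}

// Hypothetical stand-in for execSQL(); the real function is in the previous post
// and may be implemented differently.
def execSqlSketch(conn: Connection, sqlTemplate: String, values: List[Any]): Int = {
  val stmt: PreparedStatement = conn.prepareStatement(sqlTemplate)
  try {
    // JDBC placeholders are numbered from 1, in the order they appear in the template
    values.zipWithIndex.foreach { case (value, index) =>
      stmt.setObject(index + 1, value.asInstanceOf[AnyRef])
    }
    // Runs the fully bound statement and returns the number of affected rows
    stmt.executeUpdate()
  } finally {
    stmt.close()
  }
}
```

With a function of this shape, the i-th element of the value list is bound to the i-th ? in the template, which is why the order of the values matters.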

To retrieve the value of a given column from a SparkSQL row, we call row.getAs(columnName) (lines 80 – 81). However, this alone is insufficient because Scala doesn’t know what data type to use to interpret the value retrieved.

Fortunately, the type information is available from the schema of the dataframe. In particular, to retrieve the type of the given column, we call schema(columnName).dataType (lines 80 – 81). However, what this gets us is a SparkSQL data type, such as IntegerType, StringType, … We are now back in the regular Scala space, where the execSQL() function expects a list of regular Scala values for binding. Thus, we need one more step to convert the value retrieved from the row to a value of a regular Scala type (Int, String, …). The function sparkSqlToJdbc() (line 19) supports this conversion.

Note that the two lists, the data columns’ values and the unique key columns’ values, need to be concatenated in that order, because that is the order in which their placeholders appear in the update statement string template. If you follow how the lists of unique key columns and data columns are generated by the function getUpdateInfo() (presented in the previous post), how they are used to build the update template in getUpdateSql() (line 46), and how everything converges in the call to execSQL() (lines 80 – 81), you’ll see that this is the case.
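
The ordering can be checked in isolation with a small sketch. Here a plain Scala Map stands in for the SparkSQL row, and the helper name bindingValues is made up for this illustration; both are assumptions, not part of the implementation above.

```scala
// A Map stands in for the SparkSQL Row (an assumption for illustration only).
// Values of the update columns come first, then values of the unique key columns.
def bindingValues(row: Map[String, Any], updateCols: List[String], uniqueKeyCols: List[String]): List[Any] =
  updateCols.map(row) ++ uniqueKeyCols.map(row)

val exampleRow: Map[String, Any] = Map(
  "unique_key_col_1" -> 1001,
  "unique_key_col_2" -> "A",
  "string_data_col" -> "new text",
  "number_data_col" -> 42)

val orderedValues = bindingValues(
  exampleRow,
  List("string_data_col", "number_data_col"),
  List("unique_key_col_1", "unique_key_col_2"))

// orderedValues is List("new text", 42, 1001, "A"), which lines up with the placeholders in
// update ... set string_data_col = ?, number_data_col = ? where unique_key_col_1 = ? and unique_key_col_2 = ?
println(orderedValues)
```

Swapping the two halves of the concatenation would bind the key values into the set clause, which would either fail or, worse, silently update the wrong data.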

Alright, with this implementation, to perform the Update operation, the Spark job would invoke the function updateTable() with a call that might look like this:

updateTable(updateDF, "unique_key_col_1, unique_key_col_2", "transaction_table")
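
The second argument is a single comma-separated string, so somewhere it has to be split into the individual key columns. In the implementation this is the job of getUpdateInfo() from the previous post. The sketch below is not that function: splitColumns is a hypothetical helper that shows one plausible way to do the split, under the assumption that every column that is not part of the unique key is an update column. With a real dataframe, the column list could come from updateDF.columns.

```scala
// Hypothetical helper, not the getUpdateInfo() from the previous post.
// Assumption: every column that is not part of the unique key gets updated.
def splitColumns(allCols: List[String], uniqueKey: String): (List[String], List[String]) = {
  // "unique_key_col_1, unique_key_col_2" -> List("unique_key_col_1", "unique_key_col_2")
  val uniqueKeyCols = uniqueKey.split(",").map(_.trim).filter(_.nonEmpty).toList
  val updateCols = allCols.filterNot(col => uniqueKeyCols.contains(col))
  (uniqueKeyCols, updateCols)
}

val (keyCols, dataCols) = splitColumns(
  List("unique_key_col_1", "unique_key_col_2", "string_data_col", "number_data_col"),
  "unique_key_col_1, unique_key_col_2")

// keyCols:  List(unique_key_col_1, unique_key_col_2)
// dataCols: List(string_data_col, number_data_col)
println(keyCols)
println(dataCols)
```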

Discussion

This approach is the most different so far. It gives us full control of processing at the record level, instead of merely relying on the DataFrameWriter API. Moreover, it fulfills all three capabilities. In particular:

  • Partial Success Requirement: Each record write to the database proceeds in a separate transaction (assuming the connection stays in JDBC’s default auto-commit mode), isolated from other records. If it fails, it has no impact on the others. Partial job success is guaranteed.
  • Failure Reporting Requirement: In the implementation above, the catch block of the try/catch is empty (lines 82 – 84), so a failed record update is currently ignored. If we instead implement something in the catch block that captures the failed record and places it in storage somewhere, this requirement will be fulfilled. We’ll explore this further in a future post.
  • Schema Change Requirement: Note that the structure of the SQL statement to update the target record is inferred programmatically from the schema of updateDF. That means, whatever schema is presented in a given run, it’ll be handled properly.
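
As a rough illustration of the first two points, the sketch below processes records one at a time, lets the healthy ones go through, and collects the failed ones instead of ignoring them. It is plain Scala with no Spark or JDBC involved; FailedRecord and updateEach are hypothetical names. Note also that Try only catches non-fatal exceptions, which is narrower than the case _: Throwable in the implementation above.

```scala
import scala.util.{Failure, Try}

// Hypothetical container for a record whose update failed
final case class FailedRecord[A](record: A, reason: String)

// Applies `update` to every record independently and returns the ones that failed.
// A failure on one record does not stop the processing of the others.
def updateEach[A](records: Iterator[A])(update: A => Unit): List[FailedRecord[A]] =
  records
    .map(record => (record, Try(update(record))))
    .collect { case (record, Failure(e)) => FailedRecord(record, String.valueOf(e.getMessage)) }
    .toList

// Simulate three record updates, the second of which fails
val failures = updateEach(Iterator(1, 2, 3)) { n =>
  if (n == 2) throw new IllegalStateException("simulated write failure")
}

// failures is List(FailedRecord(2, "simulated write failure"))
println(failures)
```

In the Spark job, the collected failures would still have to be written somewhere from inside the partition processing; where and how is left open here.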

Would this general approach work for the Insert operation? You bet.

Imagine that, instead of the SQL update statement, we issue a SQL insert statement that takes the row from the partition and inserts it into the table. This will work. It will also support all three required capabilities. We’ll explore this in future posts.
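
For a first impression, an insert template can be built in much the same way as the update template. This is only a sketch: buildInsertSql is a hypothetical counterpart of getUpdateSql(), and the column list is made up for the example.

```scala
// Hypothetical counterpart of getUpdateSql() for the Insert operation
def buildInsertSql(targetTable: String, cols: List[String]): String = {
  val colList = cols.mkString(", ")
  val placeholders = cols.map(_ => "?").mkString(", ")
  "insert into " + targetTable + " (" + colList + ") values (" + placeholders + ")"
}

val insertTemplate = buildInsertSql(
  "transaction_table",
  List("unique_key_col_1", "unique_key_col_2", "string_data_col"))

// insert into transaction_table (unique_key_col_1, unique_key_col_2, string_data_col) values (?, ?, ?)
println(insertTemplate)
```

Here the values would be bound in the same order as the column list, since there is no separate where clause.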

Conclusion

This approach is the only one that meets all 5 requirements. Hence, we’ll adopt it for our solution.

However, the implementation presented is only experimental. Take the main code, the try/catch block in the updateTable() function (lines 78 – 84): what do you think? I’d argue that the way the list of values is compiled from the row could be improved in terms of understandability, usability, flexibility, maintainability, and robustness. In fact, the same argument also applies to how we build the SQL string template (function getUpdateSql(), line 46).

Moreover, and crucially, the astute reader might have noticed that this implementation would not work on real production data due to a somewhat subtle issue. Furthermore, it’ll likely exhibit sub-optimal performance on non-trivial data sizes due to another issue. Can you guess what these two issues are (*)? We’ll address them in future posts.



(*) Hint: Both of these issues lie in the function execSQL(), and one also manifests itself in sparkSqlToJdbc()