The previous post introduced the commonalities in implementing the update and insert operations. In this post, we translate that insight into a small set of classes. First, let’s look at the class sitting at the top of the chain.

Class JdbcRecordWrite

This class is strictly concerned with the jdbc-based write operation (updating or inserting a single target record) and has nothing to do with Spark. It encapsulates the following logic: building a SQL string template (an update or insert statement), precompiling that template into a cached prepared statement, determining the placeholder columns of that statement, and executing that statement given a set of binding values.

The Code

Its implementation follows:

package com.myproject.experiment.jdbc

import java.sql.{Connection, PreparedStatement}

import com.myproject.experiment.jdbc.Helpers.execSQL

/** Common logic for writing (encompassing both update and insert) to a target record in a jdbc table
  * This class is strictly concerned with the business of writing to a jdbc record,
  * and has nothing to do with Spark
  *
  *  @param conn a jdbc connection
  */
abstract class JdbcRecordWrite(val conn: Connection) {
  private var preparedSql: PreparedStatement = _
  private var placeHolderCols: List[String] = _

  init()

  /**
    * Initializes state, including precompiling the sql write string template, and determining which columns in that template are placeholders
    */
  private def init() = {
    val sqlStmtString = getSqlStmtString()
    preparedSql = conn.prepareStatement(sqlStmtString)
    placeHolderCols = getPlaceHolderCols(getAllPreparedColumns())
  }

  /**
    * Determine which columns of the sql statement are placeholders
    *
    * @param allPreparedColumns: List of all table columns accessed by this sql statement.
    *                            This is a list of tuple (column name, valueOpt), where valueOpt is either:
    *                            - Some(value): represents a literal value to bake in the sql string template
    *                            - None: represents a placeholder
    * @return list of names of placeholder columns
    */
  private def getPlaceHolderCols(allPreparedColumns: List[(String, Option[Any])]): List[String] = {
    allPreparedColumns.collect { case (col, None) => col }
  }

  /**
    * Return list of all table columns accessed by this sql statement; abstract method to be implemented by subclass
    */
  protected def getAllPreparedColumns(): List[(String, Option[Any])]

  /**
    * Return the sql write (update or insert) string template; abstract method to be implemented by subclass
    */
  protected def getSqlStmtString(): String

  /**
    * Make sure the binding columns supplied at runtime when writing a given record match the placeholder columns determined when the statement was prepared.
    * If they don't match, an exception is thrown.
    *
    * @param boundCols: List of binding columns when writing a given record.
    */
  private def checkBoundCols(boundCols: List[String]) = {
    if (placeHolderCols != boundCols)
      throw new IllegalArgumentException("Invalid request to execute JDBC Statement: placeholder columns and bound columns are incompatible!")
  }

  /**
    * Execute the write operation to the target record
    *
    * @param boundValues: List((column, valueOpt, sqlType)): Scala list of triplets representing the runtime binding values
    *         - column: binding column name
    *         - valueOpt: Some(value) where value is the non-null binding value, or None if null
    *         - sqlType: the type of the binding value as an int; this is the SQL type code defined in java.sql.Types
    */
  def execute(boundValues: List[(String, Option[Any], Int)]) = {
    checkBoundCols(boundValues.map(_._1))

    execSQL(preparedSql, boundValues.map({ case (_, jdbcValue, jdbcType) => (jdbcValue, jdbcType) }))
  }
}

Explanation

Some further notes about this implementation are in order:

  • getSqlStmtString(): an abstract method that produces the SQL string template (either an update or an insert statement). It’s implemented in the concrete subclasses for the Update and Insert operations (detailed in a future post).

  • As mentioned in the previous post, when instantiating the SQL string template that is compiled into the prepared statement used by a write operation, we need a mechanism for specifying whether a given column in the template is a placeholder or carries a literal value. We represent a column specification as a tuple of type (String, Option[Any]). The first element of the tuple is the column name. The second element takes one of two forms:
    • If it’s Some(x), then x is interpreted as the literal value to be baked into the template for this column.
    • If it’s None, then the column is treated as a placeholder. In this case, a question mark ? is placed in its value’s position in the template; the question mark is bound to a real runtime value before the fully-prepared statement is executed on the database server.
  • Given this column specification, when instantiating a write operation (Update or Insert), the client code needs to provide the list of target table columns accessed by the operation as a List[(String, Option[Any])]. You’ll see examples of this in future posts (*); a small illustrative sketch also follows this list.
  • getAllPreparedColumns(): returns the list of all columns accessed by this write operation, as described in the previous point. It’s an abstract method to be implemented by the subclasses for the Update and Insert operations.
  • getPlaceHolderCols(): from the list of all columns returned by the previous method, infers the placeholder columns used in the template.
  • checkBoundCols(): when writing to a target record, a set of binding values is supplied, and each value must be bound to the right placeholder column. For this reason, the client code is required to provide the column name together with each value. This method checks that the list of columns provided by the caller matches the list of placeholder columns, name for name and in the same order.
  • execute(): performs the write operation; it first validates the binding columns via checkBoundCols(), then calls execSQL() with the values and their SQL types.
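
To make this concrete, here is a small illustrative sketch. The orders table, its columns, and the values are hypothetical; only the tuple shapes, List[(String, Option[Any])] for the column specification and List[(String, Option[Any], Int)] for the binding values, come from the class above.

import java.sql.Types

// Hypothetical column specification for an insert into an "orders" table:
// order_id and amount are placeholders (None); status gets the literal value "NEW" baked in.
val allPreparedColumns: List[(String, Option[Any])] =
  List(("order_id", None), ("amount", None), ("status", Some("NEW")))

// The SQL string template a concrete subclass could derive from that specification:
//   INSERT INTO orders (order_id, amount, status) VALUES (?, ?, 'NEW')

// Runtime binding values passed to execute(): (column, valueOpt, sqlType).
// Here amount happens to be null in the source record, hence None.
val boundValues: List[(String, Option[Any], Int)] =
  List(("order_id", Some(42), Types.INTEGER), ("amount", None, Types.DECIMAL))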

Trait WritingFromSparkSqlRow

The class JdbcRecordWrite has nothing to do with Spark. Here, we declare a trait that brings Spark into the picture. It defines the signature of the operation that retrieves the values of specified columns from a given SparkSQL row and writes them to the target database record. It’s a short and straightforward trait:

package com.myproject.experiment.jdbc

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

/** Defines behavior for retrieving values from a SparkSql row and writing to a target jdbc record
  *
  */
trait WritingFromSparkSqlRow {
  def execute(row: Row, schema: StructType, boundCols: List[String]): Unit
}

Class JdbcRecordWriteFromSparkSqlRow

Now we integrate the above two artifacts into a class that enables writing to a target record from a SparkSQL row. The execute() method declared by the trait is implemented in terms of the overloaded counterpart defined by the “root” class JdbcRecordWrite. It should be pretty self-explanatory:

package com.myproject.experiment.jdbc

import java.sql.Connection

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import com.myproject.experiment.spark.Helpers.retrieveJdbcValuesFromSparkSqlRow

/** Common logic for writing (encompassing both update and insert) to a target record in a jdbc table
  * This class enables retrieving values from a SparkSql row and writing them to the corresponding target jdbc record
  *
  *  @param conn a jdbc connection
  */
abstract class JdbcRecordWriteFromSparkSqlRow(conn: Connection) extends JdbcRecordWrite(conn) with WritingFromSparkSqlRow {
  /**
    * Execute the write operation to the target record
    *
    * @param row: sparkSql row to retrieve values from
    * @param schema: schema of the dataframe the row belongs to
    * @param boundCols: Scala list of columns in the row to retrieve values for
    */
  def execute(row: Row, schema: StructType, boundCols: List[String]) = {
    val boundValues = retrieveJdbcValuesFromSparkSqlRow(row, schema, boundCols)

    execute(boundValues)
  }
}

One note here: we can now see the benefit and convenience of retrieveJdbcValuesFromSparkSqlRow() returning the column name alongside each retrieved value. These column names are matched against the placeholder column names when binding the values to the prepared statement for execution.
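
To round things off, here is a hedged sketch of how this class might be driven from Spark. The concrete subclass is simulated with an anonymous class, and the table, columns, and connection details are made up for illustration; the real Update and Insert subclasses arrive in the upcoming posts.

import java.sql.DriverManager

import org.apache.spark.sql.{DataFrame, Row}

def insertOrders(df: DataFrame): Unit = {
  val schema = df.schema                      // capture the schema on the driver
  val boundCols = List("order_id", "amount")  // columns to bind, in placeholder order

  df.rdd.foreachPartition { rows: Iterator[Row] =>
    // One connection, and hence one cached prepared statement, per partition
    val conn = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "password")
    try {
      // Stand-in for a concrete subclass; getSqlStmtString() and getAllPreparedColumns()
      // are hand-written here rather than derived the way a real subclass would do it
      val write = new JdbcRecordWriteFromSparkSqlRow(conn) {
        protected def getSqlStmtString(): String =
          "INSERT INTO orders (order_id, amount, status) VALUES (?, ?, 'NEW')"
        protected def getAllPreparedColumns(): List[(String, Option[Any])] =
          List(("order_id", None), ("amount", None), ("status", Some("NEW")))
      }
      rows.foreach(row => write.execute(row, schema, boundCols))
    } finally {
      conn.close()
    }
  }
}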


(*) Examples of the column list used when instantiating the update and the insert operations.