I might or might not push the code to GitHub. Let me see. If I do, until then, here’s the listing of the essence of the code referenced in this series. They are generally heavily commented to aid your understanding.

SparkSQL Values Retrieval Facility

package com.myproject.experiment.spark

import java.sql.Types._
import java.sql.{Date, Timestamp}

import org.apache.spark.sql.types._
import org.apache.spark.sql._

object Helpers {

  /**
    * Get information from the dataframe relevant to the update operation
    *
    * @param updateDF: dataframe that holds data to update
    * @param uniqueKey: comma separated list of columns composing the unique key, e.g. "unique_key_col_1, unique_key_col_2"
    * @return tuple (schema, uniqueKeyColList, updateColList)
    *               - schema: schema of updateDF
    *               - uniqueKeyColList: scala list of columns making up the unique key, e.g. List(unique_key_col_1, unique_key_col_2)
    *               - updateColList: scala list of data columns that will be updated, e.g. List(string_data_col, number_data_col)
    */
  def getUpdateInfo(updateDF: DataFrame, uniqueKey: String): (StructType, List[String], List[String]) = {
    val uniqueKeyCols = uniqueKey.split(",\\W*")
    val updateCols = updateDF.columns diff uniqueKeyCols

    (updateDF.schema, uniqueKeyCols.toList, updateCols.toList)
  }

  /**
    * Retrieves the values of columns of interest in jdbc types from a SparkSQL row
    * @param row: sparkSql row to retrieve values from
    * @param schema: schema of the dataframe the row belongs to
    *                Note that given the row, the schema can be inferred within this function
    *                But passing in the schema available from the client code instead of inferring it every time for each row is slightly more efficient
    * @param cols: Scala list of columns in the row to retrieve values for
    * @return: List((column, valueOpt, sqlType)): Scala list of triplets representing the values retrieved
    *         - column: column name
    *         - valueOpt: Some(value) where value is the scala value converted from the sparkSql value, or None if the sparkSql value is null
    *         - sqlType: the type of the converted value as an int; this is the SQL type code defined in java.sql.Types
    */
  def retrieveJdbcValuesFromSparkSqlRow(row: Row, schema: StructType, cols: List[String]): List[(String, Option[Any], Int)] = {
    /**
      * Retrieves the values of columns of interest in the original SparkSQL types from a SparkSQL row
      * @param row: sparkSql row to retrieve values from
      * @param schema: schema of the dataframe the row belongs to
      * @param cols: Scala list of columns in the row to retrieve values for
      * @return: List((column, sparkSQL value, sparkSQL type)): Scala list of triplets representing the values retrieved
      *         - column: column name
      *         - sparkSQL value: original SparkSQL value, possibly null
      *         - sparkSQL type: type of the SparkSQL value as defined in org.apache.spark.sql.types
      */
    def retrieveValuesFromSparkSqlRow(row: Row, schema: StructType, cols: List[String]): List[(String, Any, DataType)] = {
      cols.map(col => (col, row.getAs(col), schema(col).dataType))
    }

    /**
      * Converts a sparkSql value (as extracted from a dataframe row) to a jdbc value
      * in preparation to write it to a jdbc db via executing a jdbc statement
      * @param sparkSqlValue: sparkSql value to convert
      * @param sparkSqlType: sparkSqlType of the value to convert
      * @return: (valueOpt, sqlType)
      *         - valueOpt: Some(value) where value is the scala value converted from the sparkSql value, or None if the sparkSql value is null
      *         - sqlType: the type of the converted value; this is the SQL type code defined in java.sql.Types
      */
    def sparkSqlToJdbc(sparkSqlValue: Any, sparkSqlType: DataType): (Option[Any], Int) = {
      val sparkSqlTypeToJdbcType: Map[DataType, Int] = Map(LongType -> BIGINT, IntegerType -> INTEGER, BooleanType -> BOOLEAN, StringType -> VARCHAR, TimestampType -> TIMESTAMP, DateType -> DATE, DoubleType -> DOUBLE, FloatType -> FLOAT)

      val jdbcValue = if (sparkSqlValue != null) {
        Some(sparkSqlType match {
          case IntegerType => sparkSqlValue.asInstanceOf[Int]
          case StringType => sparkSqlValue.asInstanceOf[String]
          case BooleanType => sparkSqlValue.asInstanceOf[Boolean]
          case LongType => sparkSqlValue.asInstanceOf[Long]
          case FloatType => sparkSqlValue.asInstanceOf[Float]
          case DoubleType => sparkSqlValue.asInstanceOf[Double]
          case TimestampType => sparkSqlValue.asInstanceOf[Timestamp]
          case DateType => sparkSqlValue.asInstanceOf[Date]
        })
      } else None

      (jdbcValue, sparkSqlTypeToJdbcType(sparkSqlType))
    }

    retrieveValuesFromSparkSqlRow(row, schema, cols).map(elem => {
      val (jdbcValue, jdbcType) = sparkSqlToJdbc(elem._2, elem._3)
      (elem._1, jdbcValue, jdbcType)
    })
  }
}

Utility to execute SQL statement in the DB via JDBC

package com.myproject.experiment.jdbc

import java.sql.{Date, PreparedStatement, Timestamp}

object Helpers {
  /**
    * Execute an arbitrary non-query SQL statement
    *
    * @param preparedSql: a precompiled SQL statement, which may have placeholders ? to be plugged in with real binding values at execution time
    * @param boundValues: list of binding values (boundValueOpt, sqlType) for the prepared statement
    *                   - boundValueOpt: Some(boundValue) or None, which represents null
    *                   - sqlType: the SQL type code defined in java.sql.Types; only used when the boundValue is null
    */
  def execSQL(preparedSql: PreparedStatement, boundValues: List[(Option[Any], Int)]) = {
    for (((boundValueOpt, sqlType), boundPosition) <- boundValues zip (1 to boundValues.size)) {
      if (boundValueOpt != None) {
        boundValueOpt.get match {
          case i: Int => preparedSql.setInt(boundPosition, i)
          case s: String => preparedSql.setString(boundPosition, s)
          case l: Long => preparedSql.setLong(boundPosition, l)
          case b: Boolean => preparedSql.setBoolean(boundPosition, b)
          case t: Timestamp => preparedSql.setTimestamp(boundPosition, t)
          case d: Date => preparedSql.setDate(boundPosition, d)
          case db: Double => preparedSql.setDouble(boundPosition, db)
          case f: Float => preparedSql.setFloat(boundPosition, f)
        }
      } else {
        preparedSql.setNull(boundPosition, sqlType)
      }
    }
    val ret = preparedSql.execute
  }
}

Generic JDBC Write Facility

package com.myproject.experiment.jdbc

import java.sql.{Connection, PreparedStatement}

import com.myproject.experiment.jdbc.Helpers.execSQL

/** Common logic for writing (encompassing both update and insert) to a target record in a jdbc table
  * This class is strictly concerned with the business of writing to a jdbc record,
  * and has nothing to do with Spark
  *
  *  @param conn a jdbc connection
  */
abstract class JdbcRecordWrite(val conn: Connection) {
  private var preparedSql: PreparedStatement = null
  private var placeHolderCols: List[String] = null

  init()

  /**
    * Initializes state, including precompiling the sql write string template, and determining which columns in that template are placeholders
    */
  private def init() = {
    val sqlStmtString = getSqlStmtString()
    preparedSql = conn.prepareStatement(sqlStmtString)
    placeHolderCols = getPlaceHolderCols(getAllPreparedColumns())
  }

  /**
    * Determine which columns of the sql statement are placeholders
    *
    * @param allPreparedColumns: List of all table columns accessed by this sql statement.
    *                            This is a list of tuple (column name, valueOpt), where valueOpt is either:
    *                            - Some(value): represents a literal value to bake in the sql string template
    *                            - None: represents a placeholder
    * @return list of names of placeholder columns
    */
  private def getPlaceHolderCols(allPreparedColumns: List[(String, Option[Any])]): List[String] = {
    allPreparedColumns.filter({ case(col, valueOpt) => valueOpt == None }).map(_._1)
  }

  /**
    * Return list of all table columns accessed by this sql statement; abstract method to be implemented by subclass
    */
  protected def getAllPreparedColumns(): List[(String, Option[Any])]

  /**
    * Return the sql write (update or insert) string template; abstract method to be implemented by subclass
    */
  protected def getSqlStmtString(): String

  /**
    * Make sure the binding columns at runtime when writing a given record matches with the placeholder columns prepared at compile time
    * If mismatch, an exception gets thrown
    *
    * @param boundCols: List of binding columns when writing a given record.
    */
  private def checkBoundCols(boundCols: List[String]) = {
    if (placeHolderCols != boundCols)
      throw new IllegalArgumentException("Invalid request to execute JDBC Statement: placeholder columns and bound columns are incompatible!")
  }

  /**
    * Execute the write operation to the target record
    *
    * @param boundValues: List((column, valueOpt, sqlType)): Scala list of triplets representing the runtime binding values
    *         - column: binding column name
    *         - valueOpt: Some(value) where value is the non-null binding value, or None if null
    *         - sqlType: the type of the binding value as an int; this is the SQL type code defined in java.sql.Types
    */
  def execute(boundValues: List[(String, Option[Any], Int)]) = {
    checkBoundCols(boundValues.map(_._1))

    execSQL(preparedSql, boundValues.map({ case (_, jdbcValue, jdbcType) => (jdbcValue, jdbcType) }))
  }
}
package com.myproject.experiment.jdbc

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

/** Defines behavior for retrieving values from a SparkSql row and writing to a target jdbc record
  *
  */
trait WritingFromSparkSqlRow {
  def execute(row: Row, schema: StructType, boundCols: List[String])
}
package com.myproject.experiment.jdbc

import java.sql.Connection

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import com.myproject.experiment.spark.Helpers.retrieveJdbcValuesFromSparkSqlRow

/** Common logic for writing (encompassing both update and insert) to a target record in a jdbc table
  * This class enables retrieving values from a SparkSql row and writing them to the corresponding target jdbc record
  *
  *  @param conn a jdbc connection
  */
abstract class JdbcRecordWriteFromSparkSqlRow(conn: Connection) extends JdbcRecordWrite(conn) with WritingFromSparkSqlRow {
  /**
    * Execute the write operation to the target record
    *
    * @param row: sparkSql row to retrieve values from
    * @param schema: schema of the dataframe the row belongs to
    * @param boundCols: Scala list of columns in the row to retrieve values for
    */
  def execute(row: Row, schema: StructType, boundCols: List[String]) = {
    val boundValues = retrieveJdbcValuesFromSparkSqlRow(row, schema, boundCols)

    execute(boundValues)
  }
}

Utility to support building SQL String Template

package com.myproject.experiment.jdbc

/** Provides utility to support building the sql string template that will compile to the prepared statement
  *
  */
trait SqlStringBuildingSupporter {
  /**
    * Generate the proper presentation format for a literal value in the template
    *
    * @param value: the literal value of concern
    * @return the literal value in the presentation format acceptable by the string template
    */
  protected def getLiteralValueForSqlString(value: Any): Any = {
    if (value == null) "null"
    else {
      value match {
        case s: String => "'" + s + "'"
        case v: Any => v
      }
    }
  }
}

The Update Facility

package com.myproject.experiment.jdbc

import java.sql.Connection
/** Logic for updating a target record in a jdbc table
  * This class enables retrieving values from a SparkSql row or just taking a set of values from somewhere (not necessarily Spark)
  * then updating them to the corresponding target jdbc record
  *
  *  @param conn a jdbc connection
  *  @param table name of the target jdbc table
  *  @param setClauseColumns list of columns to be updated in the target record
  *  @param whereClauseColumns list of columns uniquely identifying the target record to be updated
  */
class JdbcRecordUpdateFromSparkSqlRow(conn: Connection,
                                      val table: String,
                                      val setClauseColumns: List[(String, Option[Any])],
                                      val whereClauseColumns: List[(String, Option[Any])])
extends JdbcRecordWriteFromSparkSqlRow(conn) with SqlStringBuildingSupporter {

  /**
    * Return list of all table columns accessed by this Update sql statement
    */
  override protected def getAllPreparedColumns(): List[(String, Option[Any])] = setClauseColumns ++ whereClauseColumns

  /**
    * Generate the string representing a given column. This string will be plugged in the Update sql statement template
    *
    * @param col: name of the given column
    * @param valueOpt: instantiating value of the given column, which is either:
    *                  - Some(value): represents a literal value to bake in the Update sql string template
    *                  - None: represents a placeholder
    * @return the string representing the given column in the Update sql statement template.
    *         This string would be of the form "col = literal value" or "col = ?"
    */
  private def getOneColumnForUpdateSqlString(col: String, valueOpt: Option[Any]): String = {
    col + " = " + {
      valueOpt match {
        case Some(value) => getLiteralValueForSqlString(value)
        case None => "?"
      }
    }
  }

  /**
    * Construct the Update sql string template
    * This template potentially has placeholders ? and literal values
    *
    * @return Update sql string template, e.g.
    * update transaction_table set string_data_col = 'changed_string_data_value_1',
    *                              number_data_col = null
    *                          where unique_key_col_1 = ? and
    *                                unique_key_col_2 = 127
    */
  override protected def getSqlStmtString(): String = {
    val updateClause = "update " + table
    val setClause = "set " + setClauseColumns.map({ case(col, valueOpt) => getOneColumnForUpdateSqlString(col, valueOpt) }).mkString(", ")
    val whereClause = "where " + whereClauseColumns.map({ case(col, valueOpt) => getOneColumnForUpdateSqlString(col, valueOpt) }).mkString(" and ")

    updateClause + " " + setClause + " " + whereClause
  }

}

The Insert Facility

package com.myproject.experiment.jdbc

import java.sql.Connection

/** Logic for inserting a record into a jdbc table
  * This class enables retrieving values from a SparkSql row or just taking a set of values from somewhere (not necessarily Spark)
  * then using them to create the corresponding new target jdbc record
  *
  *  @param conn a jdbc connection
  *  @param table name of the target jdbc table
  *  @param insertColumns list of columns to be inserted with the target record
  */
class JdbcRecordInsertFromSparkSqlRow(conn: Connection,
                                      val table: String,
                                      val insertColumns: List[(String, Option[Any])])
extends JdbcRecordWriteFromSparkSqlRow(conn) with SqlStringBuildingSupporter {

  /**
    * Return list of all columns inserted with the record by this Insert sql statement
    */
  override protected def getAllPreparedColumns(): List[(String, Option[Any])] = insertColumns

  /**
    * Construct the Insert sql string template
    * This template potentially has placeholders ? and literal values
    *
    * @return Insert sql string template, e.g.
    * insert into transaction_table
    *             (unique_key_col_1, unique_key_col_2, dataset_id, string_data_col, number_data_col)
    *             values (?, ?, 199, null, ?)
    */
  override protected def getSqlStmtString(): String = {
    val insTableClause = "insert into " + table
    val colNamesClause = "(" + insertColumns.map(_._1).mkString(", ") + ")"
    val valuesClause = "(" + insertColumns.map({ case (_, valueOpt) => if (valueOpt != None) getLiteralValueForSqlString(valueOpt.get) else "?" }).mkString(", ") + ")"

    insTableClause + " " + colNamesClause + " values " + valuesClause
  }
}

The Failure Reporting Facility

create table error_log (
    dataset_id bigint not null,
    k text not null,
    v mediumtext,
    error_message text,
    write_timestamp datetime
);
package com.myproject.experiment.errorhandling

import java.sql.{Connection, Timestamp}
import java.sql.Types._
import java.util.Calendar

import com.myproject.experiment.jdbc.JdbcRecordInsertFromSparkSqlRow
import com.myproject.experiment.spark.Helpers.retrieveJdbcValuesFromSparkSqlRow

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row

/** Log a failed record write to the error_log table
  *
  *  @param conn a jdbc connection
  *  @param datasetId id of the dataset the failed record belongs to
  *  @param uniqueKeyColumns list of unique key columns of the record
  *  @param dataColumns list of data columns of the record
  */
class FailedRecordHandler(conn: Connection, datasetId: Long, uniqueKeyColumns: List[String], dataColumns: List[String]) {

  private val uniqueKeyColsSet = uniqueKeyColumns.toSet
  private val dataColsSet = dataColumns.toSet

  // Initialize the object to insert into the error_log table
  private val errorLogInsert = new JdbcRecordInsertFromSparkSqlRow(conn, "error_log", List(
    ("dataset_id", Some(datasetId)),
    ("k", None),
    ("v", None),
    ("error_message", None),
    ("write_timestamp", None)
  ))

  /**
    * Concatenate a list of triplet values into a single triplet value
    *
    * @param combinedColName: name of the column component of the result triplet value. It represents a column in the error_log table
    * @param columns: List of triplet values to concatenate. A triplet (column name, valueOpt, jdbcType) is typically obtained from a SparkSQL row
    * @return the single concatenated triplet. Its value would then be inserted to the corresponding column of this failed record in error_log table
    */
  private def getCombinedColumn(combinedColName: String, columns: List[(String, Option[Any], Int)]): (String, Option[Any], Int) = {
    def combineCol(accuCol: Tuple3[String, Option[Any], Int], col: Tuple3[String, Option[Any], Int]): (String, Option[Any], Int) = {
      val colValue = col._2 match {
        case Some(v) => col._1 + ":::: " + v.toString
        case None => col._1 + ":::: "
      }

      var accuColValue = accuCol._2.get
      accuColValue = if (accuColValue == "") colValue else accuColValue + " | " + colValue

      (accuCol._1, Some(accuColValue), accuCol._3)
    }

    columns.foldLeft((combinedColName, Some("").asInstanceOf[Option[Any]], VARCHAR))(combineCol)
  }

  /**
    * Insert the values of the failed record to the error_log table
    *
    * @param e: Exception that got raised when the record failed to write to the target table
    * @param uniqueKeyValues: Values of the unique key columns of the failed record
    * @param dataValues: Values of the data columns of the failed record
    */
  def handle(e: Throwable, uniqueKeyValues: List[(String, Option[Any], Int)], dataValues: List[(String, Option[Any], Int)]): Unit = {
    if (uniqueKeyColsSet != uniqueKeyValues.map(_._1).toSet)
      throw new Exception("FailedRecordHandler: invalid set of unique key columns!")

    if (dataColsSet != dataValues.map(_._1).toSet)
      throw new Exception("FailedRecordHandler: invalid set of data columns!")

    val kColBoundValue = getCombinedColumn("k", uniqueKeyValues)
    val vColBoundValue = getCombinedColumn("v", dataValues)
    val now = new Timestamp(Calendar.getInstance().getTime().getTime)

    errorLogInsert.execute(List(kColBoundValue, vColBoundValue, ("error_message", Some(e.toString), VARCHAR), ("write_timestamp", Some(now), TIMESTAMP)))
  }

  /**
    * Insert the values of the failed record to the error_log table
    *
    * @param e: Exception that got raised when the record failed to write to the target table
    * @param row: The SparkSQL row from which the failed record is derived
    * @param schema: Schema of the SparkSQL row from which the failed record is derived
    */
  def handle(e: Throwable, row: Row, schema: StructType): Unit = {
    val uniqueKeyValues = retrieveJdbcValuesFromSparkSqlRow(row, schema, uniqueKeyColumns)
    val dataValues = retrieveJdbcValuesFromSparkSqlRow(row, schema, dataColumns)

    handle(e, uniqueKeyValues, dataValues)
  }
}

The Update Operation

package com.myproject.experiment

import java.util.Calendar
import java.sql.{DriverManager, Timestamp}
import java.sql.Types.TIMESTAMP

import com.myproject.experiment.errorhandling.FailedRecordHandler
import com.myproject.experiment.jdbc.JdbcRecordUpdateFromSparkSqlRow
import com.myproject.experiment.spark.Helpers._
import com.myproject.jdbc.utils.SQLHelpers.getConnProperties
import org.apache.spark.sql.DataFrame

object UpdateOperation {
  /**
    * Update the target table row by row individually from each partition of the updateDF
    *
    * @param updateDF: dataframe that holds data to update
    * @param uniqueKey: comma separated list of columns composing the unique key, e.g. "unique_key_col_1, unique_key_col_2"
    * @param targetTable: target table name, e.g. transaction_table
    * @param datasetId: id of the original dataset from which updateDF is derived, e.g. 199
    */
  def updateTableV1(updateDF: DataFrame, uniqueKey: String, targetTable: String, datasetId: Long) = {
    Class.forName("com.mysql.jdbc.Driver")

    // Get from system configuration the url and properties to connect to the database
    val (jdbcUrl, connProperties) = getConnProperties()

    val (updateSchema, uniqueKeyCols, updateCols) = getUpdateInfo(updateDF, uniqueKey)

    updateDF.foreachPartition({rows =>
      val conn = DriverManager.getConnection(jdbcUrl, connProperties)

      try {
        val targetRecordUpdate = new JdbcRecordUpdateFromSparkSqlRow(conn, targetTable, updateCols.map((_, None)), uniqueKeyCols.map((_, None)))
        val failedRecordHandler = new FailedRecordHandler(conn, datasetId, uniqueKeyCols, updateCols)

        rows.foreach({row =>
          try {
            targetRecordUpdate.execute(row, updateSchema, updateCols ++ uniqueKeyCols)
          } catch {
            case e: Throwable => failedRecordHandler.handle(e, row, updateSchema)
          }
        })
      } finally {
        conn.close()
      }
    })
  }

  def updateTableV2(updateDF: DataFrame, uniqueKey: String, targetTable: String, datasetId: Long) = {
    Class.forName("com.mysql.jdbc.Driver")

    // Get from system configuration the url and properties to connect to the database
    val (jdbcUrl, connProperties) = getConnProperties()

    val (updateSchema, uniqueKeyCols, updateCols) = getUpdateInfo(updateDF, uniqueKey)

    updateDF.foreachPartition({rows =>
      val conn = DriverManager.getConnection(jdbcUrl, connProperties)

      try {
        val targetRecordUpdate = new JdbcRecordUpdateFromSparkSqlRow(conn, targetTable, (updateCols :+ "write_timestamp").map((_, None)), uniqueKeyCols.map((_, None)) :+ ("dataset_id", Some(datasetId)))
        val failedRecordHandler = new FailedRecordHandler(conn, datasetId, uniqueKeyCols, updateCols)

        var updateColsBoundValues: List[(String, Option[Any], Int)] = null
        var uniqueKeyColsBoundValues: List[(String, Option[Any], Int)] = null

        rows.foreach({row =>
          try {
            val now = new Timestamp(Calendar.getInstance().getTime().getTime)
            updateColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, updateSchema, updateCols)
            uniqueKeyColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, updateSchema, uniqueKeyCols)

            targetRecordUpdate.execute((updateColsBoundValues :+ ("write_timestamp", Some(now), TIMESTAMP)) ++ uniqueKeyColsBoundValues)
          } catch {
            case e: Throwable => failedRecordHandler.handle(e, uniqueKeyColsBoundValues, updateColsBoundValues)
          }
        })
      } finally {
        conn.close()
      }
    })
  }

}

The Insert Operation

package com.myproject.experiment

import java.sql.Types._

import com.myproject.jdbc.utils.SQLHelpers.getConnProperties
import com.myproject.experiment.jdbc.JdbcRecordInsertFromSparkSqlRow
import java.sql.{DriverManager, Timestamp}
import java.util.Calendar

import com.myproject.experiment.errorhandling.FailedRecordHandler
import com.myproject.experiment.spark.Helpers._
import org.apache.spark.sql._

object InsertOperation {
  /**
    * Insert into the target table row by row individually from each partition of the insertDF
    *
    * @param insertDF: dataframe that holds data to insert
    * @param targetTable: target table name, e.g. transaction_table
    * @param datasetId: id of the original dataset from which insertDF is derived, e.g. 261
    */
  def insertTableV1(insertDF: DataFrame, uniqueKey: String, targetTable: String, datasetId: Long) = {
    Class.forName("com.mysql.jdbc.Driver")

    // Get from system configuration the url and properties to connect to the database
    val (jdbcUrl, connProperties) = getConnProperties()

    val (insertSchema, insertCols) = (insertDF.schema, insertDF.columns.toList)
    val uniqueKeyCols = uniqueKey.split(",\\W*").toList
    val dataCols = insertCols diff uniqueKeyCols

    insertDF.foreachPartition({rows =>
      val conn = DriverManager.getConnection(jdbcUrl, connProperties)

      try {
        val targetRecordInsert = new JdbcRecordInsertFromSparkSqlRow(conn, targetTable, insertCols.map((_, None)))
        val failedRecordHandler = new FailedRecordHandler(conn, datasetId, uniqueKeyCols, dataCols)

        rows.foreach({row =>
          try {
            targetRecordInsert.execute(row, insertSchema, insertCols)
          } catch {
            case e: Throwable => failedRecordHandler.handle(e, row, insertSchema)
          }
        })
      } finally {
        conn.close()
      }
    })
  }

  def insertTableV2(insertDF: DataFrame, uniqueKey: String, targetTable: String, datasetId: Long) = {
    Class.forName("com.mysql.jdbc.Driver")

    // Get from system configuration the url and properties to connect to the database
    val (jdbcUrl, connProperties) = getConnProperties()

    val insertDF2 = insertDF.drop("dataset_id")

    val (insertSchema, insertCols) = (insertDF2.schema, insertDF2.columns.toList)
    val uniqueKeyCols = uniqueKey.split(",\\W*").toList
    val dataCols = insertCols diff uniqueKeyCols

    insertDF2.foreachPartition({rows =>
      val conn = DriverManager.getConnection(jdbcUrl, connProperties)

      try {
        val targetRecordInsert = new JdbcRecordInsertFromSparkSqlRow(conn, targetTable, (uniqueKeyCols ++ dataCols :+ "write_timestamp").map((_, None)) :+ ("dataset_id", Some(datasetId)))
        val failedRecordHandler = new FailedRecordHandler(conn, datasetId, uniqueKeyCols, dataCols)

        var uniqueKeyColsBoundValues: List[(String, Option[Any], Int)] = null
        var dataColsBoundValues: List[(String, Option[Any], Int)] = null

        rows.foreach({row =>
          try {
            val now = new Timestamp(Calendar.getInstance().getTime().getTime)
            uniqueKeyColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, insertSchema, uniqueKeyCols)
            dataColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, insertSchema, dataCols)

            targetRecordInsert.execute(uniqueKeyColsBoundValues ++ dataColsBoundValues :+ ("write_timestamp", Some(now), TIMESTAMP))
          } catch {
            case e: Throwable => failedRecordHandler.handle(e, uniqueKeyColsBoundValues, dataColsBoundValues)
          }
        })
      } finally {
        conn.close()
      }
    })
  }

}

Enabling Precompilation of SQL String Template for Prepared Statement in JDBC Connection

package com.myproject.jdbc.utils

import java.util.Properties

object SQLHelpers {
  // Other utilities

  def getConnProperties(): (String, Properties) = {
    // Other setup

    val connProperties = new Properties()
    // Set up other connection properties

    // Enable server-side prepared statements
    connProperties.put("useServerPrepStmts", "true")

    // Set up connection string
    val jdbcUrl = s"jdbc:mysql://${jdbcHostName}:${jdbcPort}/${jdbcDatabase}"

    (jdbcUrl, connProperties)
  }
}