The Problem

The previous post suggested a fix to execSQL() so that it handles null values. But we are not done yet. Let’s take a look at the main function updateTable() from approach IV:

def updateTable(updateDF: DataFrame, uniqueKey: String, targetTable: String) = {
  Class.forName("com.mysql.jdbc.Driver")

  // Get from system configuration the url and properties to connect to the database
  val (jdbcUrl, connProperties) = getConnProperties()

  val (updateSchema, uniqueKeyCols, updateCols) = getUpdateInfo(updateDF, uniqueKey)

  val updateSql = getUpdateSql(targetTable, updateCols, uniqueKeyCols)

  updateDF.foreachPartition({rows =>
    val conn = DriverManager.getConnection(jdbcUrl, connProperties)

    rows.foreach({row =>
      try {
        execSQL(conn, updateSql, updateCols.map({col => sparkSqlToJdbc(row.getAs(col), updateSchema(col).dataType)}) ++
          uniqueKeyCols.map({col => sparkSqlToJdbc(row.getAs(col), updateSchema(col).dataType)}))
      } catch {
        case _: Throwable => // Ignore the failed row and continue with the rest of the partition
      }
    })

    conn.close()
  })
}

Inside the call to updateDF.foreachPartition(), the invocation of execSQL() gets executed once for each row. Recall that the update statement string template updateSql passed to it has the following general form:

update transaction_table set string_data_col = ? where unique_key_col = ?
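
With several columns to update, the template just grows more placeholders; for instance (long_data_col here is only illustrative):

update transaction_table set string_data_col = ?, long_data_col = ? where unique_key_col = ?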

How does execSQL() treat that template? Let’s take a look at the implementation of execSQL() we came up with by the end of the previous post:

def execSQL(conn: Connection, sql: String, boundValues: List[(Option[Any], Int)]) = {
  val preparedSql = conn.prepareStatement(sql)
  for (((boundValueOpt, sqlType), boundPosition) <- boundValues zip (1 to boundValues.size)) {
    if (boundValueOpt != None) {
      boundValueOpt.get match {
        case i: Int => preparedSql.setInt(boundPosition, i)
        case s: String => preparedSql.setString(boundPosition, s)
        case l: Long => preparedSql.setLong(boundPosition, l)
        case b: Boolean => preparedSql.setBoolean(boundPosition, b)
        case t: Timestamp => preparedSql.setTimestamp(boundPosition, t)
        case d: Date => preparedSql.setDate(boundPosition, d)
        case db: Double => preparedSql.setDouble(boundPosition, db)
        case f: Float => preparedSql.setFloat(boundPosition, f)
      }
    } else {
      preparedSql.setNull(boundPosition, sqlType)
    }
  }
  preparedSql.execute()
}

So execSQL() calls conn.prepareStatement() on the updateSql statement string template once for every single row, even though the template remains unchanged across rows. This defeats the purpose of prepareStatement(), which is to precompile a constant statement string template once on the database server and then reuse it many times with different sets of binding values sent from the client. The current implementation thus suffers an avoidable performance penalty on partitions with a non-trivial number of rows (*).
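
For contrast, the intended usage pattern of prepareStatement() in plain JDBC looks roughly like this (a minimal sketch against a hypothetical names table, not code from this series):

import java.sql.DriverManager
import java.util.Properties

val jdbcUrl = "jdbc:mysql://localhost:3306/testdb" // placeholder URL
val connProperties = new Properties() // placeholder properties

val conn = DriverManager.getConnection(jdbcUrl, connProperties)
try {
  // Prepare (and, with server-side preparation enabled, pre-compile) the template once
  val stmt = conn.prepareStatement("update names set name = ? where id = ?")
  for ((name, id) <- Seq(("alice", 1), ("bob", 2))) {
    // Reuse the same template with a fresh set of binding values
    stmt.setString(1, name)
    stmt.setInt(2, id)
    stmt.execute()
  }
} finally {
  conn.close()
}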

The Fix

The fix is relatively simple: we call conn.prepareStatement(updateSql) only once for the entire partition. The resulting server-side prepared (in other words, pre-compiled) statement is then shared across all rows of the partition.

Pre-compilation is an option set as a property of the database connection. By default, however, it is disabled. So first, we need to enable it.

In the updateTable() implementation, we establish the connection for the partition at the top of the foreachPartition() body, using the jdbcUrl and connProperties obtained from the earlier call to getConnProperties(). Note that connProperties is the set of properties of the connection, including the pre-compilation option. Enabling this option is thus done by changing the code of the getConnProperties() function, like so:

def getConnProperties(): (String, Properties) = {
  // Other setup

  val connProperties = new Properties()
  // Set up other connection properties

  // Enable server-side prepared statements
  connProperties.put("useServerPrepStmts", "true")

  // Set up connection string
  val jdbcUrl = s"jdbc:mysql://${jdbcHostName}:${jdbcPort}/${jdbcDatabase}"

  (jdbcUrl, connProperties)
}

The change is the single added line that puts useServerPrepStmts into connProperties. With that, database connections established from this set of properties will pre-compile SQL statement templates on the server; without it, Connector/J only emulates prepared statements on the client side.
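
Connector/J also accepts connection properties as query parameters on the JDBC URL, so the same option could equally be enabled there, often paired with cachePrepStmts so the driver reuses prepared statements per connection (a sketch; exact behavior depends on the driver version):

val jdbcUrl = s"jdbc:mysql://${jdbcHostName}:${jdbcPort}/${jdbcDatabase}" +
  "?useServerPrepStmts=true&cachePrepStmts=true"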

Now, we only need to make a minor change to execSQL(). From now on, it takes a reference to an already server-side prepared statement:

def execSQL(preparedSql: PreparedStatement, boundValues: List[(Option[Any], Int)]) = {
  for (((boundValueOpt, sqlType), boundPosition) <- boundValues zip (1 to boundValues.size)) {
    if (boundValueOpt != None) {
      boundValueOpt.get match {
        case i: Int => preparedSql.setInt(boundPosition, i)
        case s: String => preparedSql.setString(boundPosition, s)
        case l: Long => preparedSql.setLong(boundPosition, l)
        case b: Boolean => preparedSql.setBoolean(boundPosition, b)
        case t: Timestamp => preparedSql.setTimestamp(boundPosition, t)
        case d: Date => preparedSql.setDate(boundPosition, d)
        case db: Double => preparedSql.setDouble(boundPosition, db)
        case f: Float => preparedSql.setFloat(boundPosition, f)
      }
    } else {
      preparedSql.setNull(boundPosition, sqlType)
    }
  }
  preparedSql.execute()
}

Finally, we change the client code accordingly:

def updateTable(updateDF: DataFrame, uniqueKey: String, targetTable: String) = {
  Class.forName("com.mysql.jdbc.Driver")

  // Get from system configuration the url and properties to connect to the database
  val (jdbcUrl, connProperties) = getConnProperties()

  val (updateSchema, uniqueKeyCols, updateCols) = getUpdateInfo(updateDF, uniqueKey)

  updateDF.foreachPartition({rows =>
    val conn = DriverManager.getConnection(jdbcUrl, connProperties)
    val updateSql = getUpdateSql(targetTable, updateCols, uniqueKeyCols)

    try {
      val preparedSql = conn.prepareStatement(updateSql)

      rows.foreach({row =>
        try {
          execSQL(preparedSql, updateCols.map({col => sparkSqlToJdbc(row.getAs(col), updateSchema(col).dataType)}) ++
            uniqueKeyCols.map({col => sparkSqlToJdbc(row.getAs(col), updateSchema(col).dataType)}))
        } catch {
          case _: Throwable => // Ignore the failed row and continue with the rest of the partition
        }
      })
    } finally {
      conn.close()
    }
  })
}

At the top of each partition, we create the server-side prepared statement with conn.prepareStatement(updateSql), pre-compiled once for the entire partition. Then, for every row, we pass it to the newly modified execSQL() function. And we are set.

Now, our implementation should achieve better performance on non-trivial workloads.
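
To gauge the gain on a given setup, one can time the per-partition loop before and after the change. A minimal sketch (the timed() helper is ours, not part of the series):

def timed[A](label: String)(body: => A): A = {
  val start = System.nanoTime()
  val result = body
  val elapsedMs = (System.nanoTime() - start) / 1e6
  println(f"$label took $elapsedMs%.1f ms")
  result
}

// Inside foreachPartition(), wrap the row loop:
//   timed("partition update") {
//     rows.foreach({row => ...})
//   }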


(*) Older versions of the MySQL server and the MySQL Connector/J driver don’t support pre-compilation. In that case, the original implementation doesn’t suffer this particular penalty.