The Problem
As alluded to in the Approach IV post, that implementation would not work on real production data. What's going on? Now is the time to deal with it. Take a look at this key line of code:
execSQL(conn, updateSql,
  updateCols.map({ col => sparkSqlToJdbc(row.getAs(col), updateSchema(col).dataType) }) ++
    uniqueKeyCols.map({ col => sparkSqlToJdbc(row.getAs(col), updateSchema(col).dataType) }))
Here, we retrieve the values of the required columns from the current SparkSQL row. After converting them, we bind them to the SQL statement template and execute it against the database via the call to execSQL().
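For context, here is a rough sketch of what the pieces around this call might look like. The table, the column names, and the exact shape of updateSql are illustrative assumptions, not the actual code from the Approach IV post:

import org.apache.spark.sql.types._

// Illustrative assumptions only: a hypothetical "orders" table and its columns
val updateCols = Seq("amount", "status")    // columns to update
val uniqueKeyCols = Seq("order_id")         // unique key column(s) used in the WHERE clause
val updateSchema = Map(                     // column name -> StructField, derived from the dataframe schema
  "amount"   -> StructField("amount", DoubleType),
  "status"   -> StructField("status", StringType),
  "order_id" -> StructField("order_id", IntegerType))

// One ? placeholder per bound value, in the same order as the values passed to execSQL()
val updateSql =
  s"UPDATE orders SET ${updateCols.map(c => s"$c = ?").mkString(", ")} " +
    s"WHERE ${uniqueKeyCols.map(c => s"$c = ?").mkString(" AND ")}"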
Let’s look back at the current implementation of execSQL():
def execSQL(conn: Connection, sql: String, boundValues: List[Any]) = {
  val preparedSql = conn.prepareStatement(sql)
  for ((boundValue, boundPosition) <- boundValues zip (1 to boundValues.size)) {
    boundValue match {
      case i: Int => preparedSql.setInt(boundPosition, i)
      case s: String => preparedSql.setString(boundPosition, s)
      case l: Long => preparedSql.setLong(boundPosition, l)
      case b: Boolean => preparedSql.setBoolean(boundPosition, b)
      case t: Timestamp => preparedSql.setTimestamp(boundPosition, t)
      case d: Date => preparedSql.setDate(boundPosition, d)
      case db: Double => preparedSql.setDouble(boundPosition, db)
      case f: Float => preparedSql.setFloat(boundPosition, f)
    }
  }
  val ret = preparedSql.execute
}
Do you see any issue here?
Going back to the key line of code above: a given column in a SparkSQL row may contain a null value, and that null value is what gets returned by the call row.getAs(col). Together with the column's SparkSQL type, retrieved by the call updateSchema(col).dataType, it is then passed to the conversion function sparkSqlToJdbc(). Let's look back at the implementation of this conversion function:
def sparkSqlToJdbc(sparkSqlValue: Any, sparkSqlType: DataType): Any = {
  sparkSqlType match {
    case IntegerType => sparkSqlValue.asInstanceOf[Int]
    case StringType => sparkSqlValue.asInstanceOf[String]
    case BooleanType => sparkSqlValue.asInstanceOf[Boolean]
    case LongType => sparkSqlValue.asInstanceOf[Long]
    case FloatType => sparkSqlValue.asInstanceOf[Float]
    case DoubleType => sparkSqlValue.asInstanceOf[Double]
    case TimestampType => sparkSqlValue.asInstanceOf[Timestamp]
    case DateType => sparkSqlValue.asInstanceOf[Date]
  }
}
A null value retrieved from a SparkSQL row should be preserved exactly as null when writing to the database. But how is such a value, passed as the argument sparkSqlValue, treated by this function?
If its type is StringType, TimestampType, or DateType, it gets preserved as null, because casting null to a reference type leaves it untouched. For all the other types, asInstanceOf on a primitive value type silently unboxes null into the "default" value of that type (0 for IntegerType, false for BooleanType, and so on). In that case, the original null value disappears and gets replaced by a different value in MySQL!
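A quick check in the Scala REPL makes the distortion concrete; this sketch is independent of the project code:

// Casting null to a reference type preserves it...
null.asInstanceOf[String]               // null
null.asInstanceOf[java.sql.Timestamp]   // null

// ...but casting null to a primitive value type unboxes it to that type's default value
null.asInstanceOf[Int]                  // 0
null.asInstanceOf[Boolean]              // false
null.asInstanceOf[Double]               // 0.0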
The trouble doesn't end there. If the null value's type is StringType, TimestampType, or DateType, the conversion function does do the right thing. But then, in the next step, how does execSQL() treat that null value? It turns out there is no such treatment. Look closely at the pattern matching block inside the for loop: it assumes boundValue is always one of those specific Scala types. When boundValue is null, none of the typed cases matches, and an exception gets thrown. The record fails to update in MySQL!
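The failure is easy to reproduce in isolation: a match made up only of typed cases has nothing that matches null, so it throws a scala.MatchError. A minimal standalone sketch:

val boundValue: Any = null

boundValue match {
  case i: Int    => println(s"bound int $i")
  case s: String => println(s"bound string $s")
}
// throws: scala.MatchError: null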
The Fix
How do we solve this? The JDBC PreparedStatement API handles null specially when binding:
preparedSql.setNull(boundPosition, sqlType)
So we have to know the sqlType of the null value, an Int code as defined in java.sql.Types. The null value originates from the current SparkSQL row, so we know its sparkSqlType, and from that sparkSqlType we can infer the corresponding sqlType. The function sparkSqlToJdbc() therefore needs to be modified to emit not just the converted value but also its accompanying sqlType.
Here's the change (this function, by the way, also gets moved to the package com.myproject.experiment.spark):
package com.myproject.experiment.spark

import java.sql.Types._
import java.sql.{Date, Timestamp}

import org.apache.spark.sql.types._

// Other imports

object Helpers {
  // Other utilities

  /**
   * Converts a sparkSql value (as extracted from a dataframe row) to a jdbc value
   * in preparation to write it to a jdbc db via executing a jdbc statement
   *
   * @param sparkSqlValue: sparkSql value to convert
   * @param sparkSqlType: sparkSqlType of the value to convert
   * @return: (valueOpt, sqlType)
   *          - valueOpt: Some(value) where value is the scala value converted from the sparkSql value,
   *            or None if the sparkSql value is null
   *          - sqlType: the type of the converted value; this is the SQL type code defined in java.sql.Types
   */
  def sparkSqlToJdbc(sparkSqlValue: Any, sparkSqlType: DataType): (Option[Any], Int) = {
    val sparkSqlTypeToJdbcType: Map[DataType, Int] = Map(
      LongType -> BIGINT, IntegerType -> INTEGER, BooleanType -> BOOLEAN, StringType -> VARCHAR,
      TimestampType -> TIMESTAMP, DateType -> DATE, DoubleType -> DOUBLE, FloatType -> FLOAT)

    val jdbcValue = if (sparkSqlValue != null) {
      Some(sparkSqlType match {
        case IntegerType => sparkSqlValue.asInstanceOf[Int]
        case StringType => sparkSqlValue.asInstanceOf[String]
        case BooleanType => sparkSqlValue.asInstanceOf[Boolean]
        case LongType => sparkSqlValue.asInstanceOf[Long]
        case FloatType => sparkSqlValue.asInstanceOf[Float]
        case DoubleType => sparkSqlValue.asInstanceOf[Double]
        case TimestampType => sparkSqlValue.asInstanceOf[Timestamp]
        case DateType => sparkSqlValue.asInstanceOf[Date]
      })
    } else None

    (jdbcValue, sparkSqlTypeToJdbcType(sparkSqlType))
  }
}
Note that the returned converted value is handled in idiomatic Scala: it is an Option. For a null value it is None; otherwise the real converted value is wrapped in Some(value).
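As a quick illustration of the new return shape (the sample values here are made up):

import java.sql.Types.{INTEGER, VARCHAR}
import org.apache.spark.sql.types.{IntegerType, StringType}
import com.myproject.experiment.spark.Helpers.sparkSqlToJdbc

sparkSqlToJdbc("paid", StringType)  // (Some("paid"), VARCHAR), i.e. (Some("paid"), 12)
sparkSqlToJdbc(null, IntegerType)   // (None, INTEGER), i.e. (None, 4)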
Now that the modified conversion function always provides the sqlType in addition to the converted value (it returns the pair as a tuple), we can fix execSQL() so that it handles null correctly:
/**
 * Prepare then execute an arbitrary non-query SQL statement
 *
 * @param conn: jdbc connection
 * @param sql: SQL statement string, which may have placeholders ? to be plugged in with real binding values at execution time
 * @param boundValues: list of binding values (boundValueOpt, sqlType) for the prepared statement
 *                     - boundValueOpt: Some(boundValue) or None, which represents null
 *                     - sqlType: the SQL type code defined in java.sql.Types; only used when the boundValue is null
 */
def execSQL(conn: Connection, sql: String, boundValues: List[(Option[Any], Int)]) = {
  val preparedSql = conn.prepareStatement(sql)
  for (((boundValueOpt, sqlType), boundPosition) <- boundValues zip (1 to boundValues.size)) {
    if (boundValueOpt != None) {
      boundValueOpt.get match {
        case i: Int => preparedSql.setInt(boundPosition, i)
        case s: String => preparedSql.setString(boundPosition, s)
        case l: Long => preparedSql.setLong(boundPosition, l)
        case b: Boolean => preparedSql.setBoolean(boundPosition, b)
        case t: Timestamp => preparedSql.setTimestamp(boundPosition, t)
        case d: Date => preparedSql.setDate(boundPosition, d)
        case db: Double => preparedSql.setDouble(boundPosition, db)
        case f: Float => preparedSql.setFloat(boundPosition, f)
      }
    } else {
      preparedSql.setNull(boundPosition, sqlType)
    }
  }
  val ret = preparedSql.execute
}
With this corrected version, we can be sure that the key statement at the beginning of the post writes the SparkSQL row to the database correctly, whether or not the values involved are null.
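To tie the two fixed pieces together, here is a small end-to-end sketch for a single row. The table, columns, values, and connection string are illustrative assumptions:

import java.sql.DriverManager
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}
import com.myproject.experiment.spark.Helpers.sparkSqlToJdbc

// Hypothetical row: amount is null in the source data and must stay null in MySQL
val bound = List(
  sparkSqlToJdbc(null, DoubleType),    // amount   -> (None, DOUBLE)
  sparkSqlToJdbc("paid", StringType),  // status   -> (Some("paid"), VARCHAR)
  sparkSqlToJdbc(42, IntegerType))     // order_id -> (Some(42), INTEGER)

val updateSql = "UPDATE orders SET amount = ?, status = ? WHERE order_id = ?"
val conn = DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "password")

execSQL(conn, updateSql, bound)
// amount is bound via setNull(1, DOUBLE) instead of silently becoming 0.0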