Now that all the runtime and operational issues have been addressed, let's turn to code quality. We need to make the code more robust, flexible, and maintainable. In other words, it needs to be brought up to production quality.
Let’s take a look back at the now familiar snippet of key code:
execSQL(preparedSQL, updateCols.map({col => sparkSqlToJdbc(row.getAs(col), updateSchema(col).dataType)}) ++ uniqueKeyCols.map({col => sparkSqlToJdbc(row.getAs(col), updateSchema(col).dataType)}) )
We can transform it to look a bit nicer:
execSQL(preparedSQL, (updateCols ++ uniqueKeyCols).map({col => sparkSqlToJdbc(row.getAs(col), updateSchema(col).dataType)}) )
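This refactoring is safe because map distributes over list concatenation: mapping over the concatenated list yields exactly the same values, in the same order, as concatenating the two mapped lists. A tiny self-contained sketch (with made-up column names and a stand-in for sparkSqlToJdbc()) makes this concrete:

object MapConcatEquivalence extends App {
  // Hypothetical column lists and a stand-in conversion function, for illustration only
  val updateCols = List("name", "score")
  val uniqueKeyCols = List("id")
  val convert = (col: String) => col.toUpperCase

  // map distributes over ++, so both forms produce the same argument list in the same order
  assert(updateCols.map(convert) ++ uniqueKeyCols.map(convert) == (updateCols ++ uniqueKeyCols).map(convert))
}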
Recall that the main activity here is: given a SparkSQL row and a set of columns of interest, retrieve the values of those columns from the row, then convert them so they are ready to be passed to the statement executed on the database server.
This would be better served by a more generic, robust, and understandable function, so we introduce retrieveJdbcValuesFromSparkSqlRow(). It works exactly as described above: get the values, convert them, and return them. For each value, besides the value itself and its SQL type, we now also attach the name of the column it belongs to. Note that the conversion function sparkSqlToJdbc() is moved to become a nested supporting function of this new function. The implementation follows:
package com.myproject.experiment.spark

import java.sql.Types._
import java.sql.{Date, Timestamp}

import org.apache.spark.sql.types._
import org.apache.spark.sql._

object Helpers {
  // Other utilities

  /**
   * Retrieves the values of columns of interest in jdbc types from a SparkSQL row
   * @param row: sparkSql row to retrieve values from
   * @param schema: schema of the dataframe the row belongs to.
   *                Note that given the row, the schema can be inferred within this function,
   *                but passing in the schema available from the client code instead of inferring it
   *                every time for each row is slightly more efficient
   * @param cols: Scala list of columns in the row to retrieve values for
   * @return: List((column, valueOpt, sqlType)): Scala list of triplets representing the values retrieved
   *          - column: column name
   *          - valueOpt: Some(value) where value is the scala value converted from the sparkSql value,
   *                      or None if the sparkSql value is null
   *          - sqlType: the type of the converted value as an int; this is the SQL type code defined in java.sql.Types
   */
  def retrieveJdbcValuesFromSparkSqlRow(row: Row, schema: StructType, cols: List[String]): List[(String, Option[Any], Int)] = {

    /**
     * Retrieves the values of columns of interest in the original SparkSQL types from a SparkSQL row
     * @param row: sparkSql row to retrieve values from
     * @param schema: schema of the dataframe the row belongs to
     * @param cols: Scala list of columns in the row to retrieve values for
     * @return: List((column, sparkSQL value, sparkSQL type)): Scala list of triplets representing the values retrieved
     *          - column: column name
     *          - sparkSQL value: original SparkSQL value, possibly null
     *          - sparkSQL type: type of the SparkSQL value as defined in org.apache.spark.sql.types
     */
    def retrieveValuesFromSparkSqlRow(row: Row, schema: StructType, cols: List[String]): List[(String, Any, DataType)] = {
      cols.map(col => (col, row.getAs(col), schema(col).dataType))
    }

    /**
     * Converts a sparkSql value (as extracted from a dataframe row) to a jdbc value
     * in preparation to write it to a jdbc db via executing a jdbc statement
     * @param sparkSqlValue: sparkSql value to convert
     * @param sparkSqlType: sparkSqlType of the value to convert
     * @return: (valueOpt, sqlType)
     *          - valueOpt: Some(value) where value is the scala value converted from the sparkSql value,
     *                      or None if the sparkSql value is null
     *          - sqlType: the type of the converted value; this is the SQL type code defined in java.sql.Types
     */
    def sparkSqlToJdbc(sparkSqlValue: Any, sparkSqlType: DataType): (Option[Any], Int) = {
      val sparkSqlTypeToJdbcType: Map[DataType, Int] = Map(
        LongType -> BIGINT,
        IntegerType -> INTEGER,
        BooleanType -> BOOLEAN,
        StringType -> VARCHAR,
        TimestampType -> TIMESTAMP,
        DateType -> DATE,
        DoubleType -> DOUBLE,
        FloatType -> FLOAT
      )

      val jdbcValue =
        if (sparkSqlValue != null) {
          Some(sparkSqlType match {
            case IntegerType => sparkSqlValue.asInstanceOf[Int]
            case StringType => sparkSqlValue.asInstanceOf[String]
            case BooleanType => sparkSqlValue.asInstanceOf[Boolean]
            case LongType => sparkSqlValue.asInstanceOf[Long]
            case FloatType => sparkSqlValue.asInstanceOf[Float]
            case DoubleType => sparkSqlValue.asInstanceOf[Double]
            case TimestampType => sparkSqlValue.asInstanceOf[Timestamp]
            case DateType => sparkSqlValue.asInstanceOf[Date]
          })
        } else None

      (jdbcValue, sparkSqlTypeToJdbcType(sparkSqlType))
    }

    retrieveValuesFromSparkSqlRow(row, schema, cols).map(elem => {
      val (jdbcValue, jdbcType) = sparkSqlToJdbc(elem._2, elem._3)
      (elem._1, jdbcValue, jdbcType)
    })
  }
}
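As a quick sanity check, here is a minimal, hypothetical usage sketch. The schema, column names, and values are made up for illustration, and GenericRowWithSchema (a Spark-internal class) is used only so we can build a schema-aware Row by hand without spinning up a SparkSession:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._

import com.myproject.experiment.spark.Helpers.retrieveJdbcValuesFromSparkSqlRow

object RetrieveJdbcValuesExample extends App {
  // Hypothetical schema: a bigint key, a string column, and a nullable double
  val schema = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType),
    StructField("score", DoubleType)
  ))

  // Build a schema-aware row by hand; the null simulates a missing value
  val row: Row = new GenericRowWithSchema(Array(42L, "alice", null), schema)

  val values = retrieveJdbcValuesFromSparkSqlRow(row, schema, List("id", "name", "score"))
  // Expected shape: List((id, Some(42), -5), (name, Some(alice), 12), (score, None, 8))
  // where -5, 12 and 8 are java.sql.Types.BIGINT, VARCHAR and DOUBLE respectively
  values.foreach(println)
}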
With this facility in place, the key statement from the beginning of the post becomes:
execSQL(preparedSQL,
        retrieveJdbcValuesFromSparkSqlRow(row, updateSchema, updateCols ++ uniqueKeyCols)
          .map({ case (col, jdbcValue, jdbcType) => (jdbcValue, jdbcType) }))
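For completeness, here is one possible shape of execSQL() that is compatible with this call. This is only a sketch of my assumption of how the binding could work (JDBC parameters are 1-indexed, and a None is bound as a typed SQL NULL); the actual execSQL() in the project may differ, for instance by taking an already-prepared statement instead of the SQL text, or by handling connections, batching, and errors differently:

import java.sql.Connection

// Hypothetical sketch: bind (Option[value], java.sql.Types code) pairs to a prepared statement
def execSQL(preparedSQL: String, params: List[(Option[Any], Int)])(implicit conn: Connection): Unit = {
  val stmt = conn.prepareStatement(preparedSQL)
  try {
    params.zipWithIndex.foreach {
      case ((Some(value), _), i) => stmt.setObject(i + 1, value)   // JDBC indices start at 1
      case ((None, sqlType), i)  => stmt.setNull(i + 1, sqlType)   // typed NULL for missing values
    }
    stmt.executeUpdate()
  } finally {
    stmt.close()
  }
}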
This new facility will continue to come in handy as our development progresses.