Yay, we are done! Finally, it’s over!!!

Oh, wait! Wait a second! Don’t celebrate prematurely! We’re not done yet, just close!

Look, what about the Failure Reporting Requirement? We haven’t done anything about it yet. Don’t you remember?

General Philosophy

Turns out, the only requirement we haven’t touched upon yet is this elusive Failure Reporting Requirement. What happens when a record for some reason fails to write to the target table?

Let’s take a look back at this key snippet of code in the Update operation:

rows.foreach({row =>
  try {
    targetRecordUpdate.execute(row, updateSchema, updateCols ++ uniqueKeyCols)
  } catch {
    case _: Throwable =>
  }
})

The catch block is empty (lines 4 – 6). If targetRecordUpdate.execute() (line 3) fails for whatever reason, the exception is swallowed and the row is silently skipped. The same goes for the Insert operation.

What needs to be done is to “simply” add exception handling logic to the catch block. It needs to capture the failed row somehow, so that the row can be reported later when required.
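To make the idea concrete before any storage decisions, here is a minimal, self-contained sketch of a loop whose catch block hands the failed row to a callback instead of discarding it. The names CatchBlockSketch, writeAll, write and onFailure are hypothetical and not part of the case study's code. The sketch also matches on scala.util.control.NonFatal rather than Throwable, which is a substitution on my part: it lets fatal JVM errors such as OutOfMemoryError propagate instead of being logged as row failures.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

object CatchBlockSketch {
  /** Hypothetical helper: applies `write` to every row and passes each
    * failed row, together with its exception, to `onFailure`.
    */
  def writeAll[A](rows: Iterator[A])(write: A => Unit)(onFailure: (Throwable, A) => Unit): Unit =
    rows.foreach { row =>
      try {
        write(row)
      } catch {
        // NonFatal does not match fatal errors (e.g. OutOfMemoryError),
        // so those still propagate to the caller.
        case NonFatal(e) => onFailure(e, row)
      }
    }

  /** Small demo: the "write" rejects empty rows, and failures are
    * collected in memory as (row, error message) pairs.
    */
  def demo(): List[(String, String)] = {
    val failures = ListBuffer.empty[(String, String)]
    writeAll(Iterator("a", "", "c")) { row =>
      if (row.isEmpty) throw new IllegalArgumentException("empty row")
    } { (e, row) =>
      failures += ((row, e.getMessage))
    }
    failures.toList
  }
}
```

In this sketch the rows "a" and "c" are written normally, and only the empty row reaches the failure callback. Where the callback puts the failed row is the design question the rest of this section deals with.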

Remember that we have virtually no advance knowledge of this row’s structure. In general, it could have any number of columns, from a few to hundreds, holding data of virtually any type. How do we provide a generic logging mechanism for such a row?

Perhaps a good way is to dump the row to some kind of schemaless mass storage, or to a generic logging system whose metadata design facilitates retrieval for reporting purposes. Some kind of NoSQL storage (Cassandra, HBase, MongoDB, or some key-value store) would be good, too. The options are many.

The design within this case study’s context

In our case study here, to demonstrate the point while keeping things simple, we won’t work with a robust and generic logging system. Instead, we take advantage of the same MySQL database. We’ll design a table to capture all failed rows. Admittedly, in real life, once the workload reaches a certain scale, this would likely run into scalability problems. But until we get there, this should work reasonably well, and it certainly serves our purpose for this case study.

Failure Logging Table

We log any failed row, no matter its structure (what schema it has, how many columns it contains, what data types those columns are), in a key-value format. The key is the concatenation of the row’s unique key columns, each converted to a string, separated by |. The value is the concatenation of the row’s data columns, each converted to a string, separated by |. Whether in the key or the value, each column is presented in the form column name:::: column data (see the example below for further clarification).

There are better, more robust data encoding and serialization formats and standards. But again, within the scope and purposes of our case study here, we stick with this custom format. Feel free to apply your creativity in this area if you choose to adopt this solution to your own situation.

With that, we use the following table for failure logging:

create table error_log (
    dataset_id bigint not null,
    k text not null,
    v mediumtext,
    error_message text,
    write_timestamp datetime
);
  • dataset_id: id of the dataset the failed row belongs to
  • k: key of the failed row
  • v: value of the failed row
  • error_message: message from the exception thrown when the row failed to write to the target table
  • write_timestamp: timestamp of the moment the failed row gets logged to this table

Suppose a row fails to write because one of its string columns contains a value too long to fit in the target table. Its entry in this table might look like this:

  • dataset_id: 261
  • k: unique_key_col:::: unique_key_value_5
  • v: string_data_col:::: string_data_value_too_long_that_causes_failure | number_data_col:::: 500
  • error_message: com.mysql.jdbc.MysqlDataTruncation: Data truncation: Data too long for column 'string_data_col' at row 1
  • write_timestamp: 2018-10-04 10:36:49
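The k and v format can be sketched in isolation, without Spark or JDBC. The object below is a hypothetical illustration (FailureLogFormat, encode and decode are names introduced here, not the case study's API). It works on plain (column name, optional value) pairs rather than the triplets used by the real implementation. The decode function is an assumption about how a reporting job might read the format back. It is deliberately naive and only correct when no column value contains either separator, which is one reason a more robust serialization format may be preferable.

```scala
object FailureLogFormat {
  private val NameValueSep = ":::: " // between a column name and its data
  private val ColumnSep = " | "      // between two columns

  /** Encode (column name, optional value) pairs into the k / v string format.
    * A missing value (None) is written as an empty string after the separator.
    */
  def encode(columns: List[(String, Option[Any])]): String =
    columns
      .map { case (name, valueOpt) =>
        name + NameValueSep + valueOpt.map(_.toString).getOrElse("")
      }
      .mkString(ColumnSep)

  /** Naive decoder (hypothetical): splits an encoded string back into
    * (column name, value as string) pairs. It gives wrong results if a
    * value itself contains " | " or ":::: ".
    */
  def decode(encoded: String): List[(String, String)] =
    if (encoded.isEmpty) Nil
    else
      encoded.split(java.util.regex.Pattern.quote(ColumnSep)).toList.map { part =>
        val idx = part.indexOf(NameValueSep)
        if (idx < 0) (part, "")
        else (part.take(idx), part.drop(idx + NameValueSep.length))
      }
}
```

Encoding the two data columns of the example row, ("string_data_col", Some("string_data_value_too_long_that_causes_failure")) and ("number_data_col", Some(500)), reproduces the v value listed above. Note that type information is lost on the way back: decode returns every value as a string.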

Failure Handling Implementation

At this point, we already have the Insert facility handy, and we can conveniently take advantage of it here. We’ll create a class that wraps this facility (in other words, uses it by composition) and relies on it to insert the failed row into the error_log table.

The implementation follows:

package com.myproject.experiment.errorhandling

import java.sql.{Connection, Timestamp}
import java.sql.Types._
import java.util.Calendar

import com.myproject.experiment.jdbc.JdbcRecordInsertFromSparkSqlRow
import com.myproject.experiment.spark.Helpers.retrieveJdbcValuesFromSparkSqlRow

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row

/** Log a failed record write to the error_log table
  *
  *  @param conn a jdbc connection
  *  @param datasetId id of the dataset the failed record belongs to
  *  @param uniqueKeyColumns list of unique key columns of the record
  *  @param dataColumns list of data columns of the record
  */
class FailedRecordHandler(conn: Connection, datasetId: Long, uniqueKeyColumns: List[String], dataColumns: List[String]) {

  private val uniqueKeyColsSet = uniqueKeyColumns.toSet
  private val dataColsSet = dataColumns.toSet

  // Initialize the object to insert into the error_log table
  private val errorLogInsert = new JdbcRecordInsertFromSparkSqlRow(conn, "error_log", List(
    ("dataset_id", Some(datasetId)),
    ("k", None),
    ("v", None),
    ("error_message", None),
    ("write_timestamp", None)
  ))

  /**
    * Concatenate a list of triplet values into a single triplet value
    *
    * @param combinedColName: name of the column component of the result triplet value. It represents a column in the error_log table
    * @param columns: List of triplet values to concatenate. A triplet (column name, valueOpt, jdbcType) is typically obtained from a SparkSQL row
    * @return the single concatenated triplet. Its value would then be inserted to the corresponding column of this failed record in error_log table
    */
  private def getCombinedColumn(combinedColName: String, columns: List[(String, Option[Any], Int)]): (String, Option[Any], Int) = {
    def combineCol(accuCol: Tuple3[String, Option[Any], Int], col: Tuple3[String, Option[Any], Int]): (String, Option[Any], Int) = {
      val colValue = col._2 match {
        case Some(v) => col._1 + ":::: " + v.toString
        case None => col._1 + ":::: "
      }

      var accuColValue = accuCol._2.get
      accuColValue = if (accuColValue == "") colValue else accuColValue + " | " + colValue

      (accuCol._1, Some(accuColValue), accuCol._3)
    }

    columns.foldLeft((combinedColName, Some("").asInstanceOf[Option[Any]], VARCHAR))(combineCol)
  }

  /**
    * Insert the values of the failed record to the error_log table
    *
    * @param e: Exception that got raised when the record failed to write to the target table
    * @param uniqueKeyValues: Values of the unique key columns of the failed record
    * @param dataValues: Values of the data columns of the failed record
    */
  def handle(e: Throwable, uniqueKeyValues: List[(String, Option[Any], Int)], dataValues: List[(String, Option[Any], Int)]): Unit = {
    if (uniqueKeyColsSet != uniqueKeyValues.map(_._1).toSet)
      throw new Exception("FailedRecordHandler: invalid set of unique key columns!")

    if (dataColsSet != dataValues.map(_._1).toSet)
      throw new Exception("FailedRecordHandler: invalid set of data columns!")

    val kColBoundValue = getCombinedColumn("k", uniqueKeyValues)
    val vColBoundValue = getCombinedColumn("v", dataValues)
    val now = new Timestamp(Calendar.getInstance().getTime().getTime)

    errorLogInsert.execute(List(kColBoundValue, vColBoundValue, ("error_message", Some(e.toString), VARCHAR), ("write_timestamp", Some(now), TIMESTAMP)))
  }

  /**
    * Insert the values of the failed record to the error_log table
    *
    * @param e: Exception that got raised when the record failed to write to the target table
    * @param row: The SparkSQL row from which the failed record is derived
    * @param schema: Schema of the SparkSQL row from which the failed record is derived
    */
  def handle(e: Throwable, row: Row, schema: StructType): Unit = {
    val uniqueKeyValues = retrieveJdbcValuesFromSparkSqlRow(row, schema, uniqueKeyColumns)
    val dataValues = retrieveJdbcValuesFromSparkSqlRow(row, schema, dataColumns)

    handle(e, uniqueKeyValues, dataValues)
  }
}
Some explanations are in order:
  • Besides the database connection and datasetId, instantiating this class requires the lists of unique key columns and data columns of the dataframe being processed (line 20).
  • The errorLogInsert object (initialized in line 26) does the plumbing work of inserting the failed row into the error_log table (line 75). It is based on an Insert template that has the datasetId value baked in (line 27), while all the other columns are bound to values at execution time (lines 28 – 31). This technique should be familiar by now.
  • getCombinedColumn() (line 41) concatenates any number of columns to produce the k or v format mentioned earlier. It does this using Scala’s foldLeft function (line 54).
  • Note that each column passed to getCombinedColumn() is a triplet (column name, valueOpt, jdbcType). This is the format of the column data extracted from a SparkSQL row via the function retrieveJdbcValuesFromSparkSqlRow().
  • The first handle() function (line 64) contains the main logic. It takes in the exception thrown by the failed write operation, along with the lists of unique key values and data values of the failed row. First, it validates that the columns carrying the unique key values and data values exactly match the unique key columns and data columns specified at construction, respectively (lines 65 – 69). Second, it uses getCombinedColumn() to generate values in the desired format (lines 71 – 72). Finally, it persists them to the error_log table (line 75).
  • The second, overloaded handle() function (line 85) is a convenience for operating directly on a SparkSQL row. Given a failed row and the exception from that failure, it extracts the unique key values and data values from the row (lines 86 – 87). Then it persists them to the error_log table by delegating to the first handle() function (line 89).

Now, given this Failure Handling facility, we are ready to fully complete the Update and Insert operations.