With the Failure Handling facility in place, we are now ready to complete the case study. As you’ll see, it’s pretty straightforward and elegant.
The Update Operation
Failure Handling when Updating the target record with values purely obtained from SparkSQL row
Adding failure handling to this standard case is simple. We only need to make a few changes within the outer try block of the updateDF.foreachPartition() call. Recall its current state:
    try {
      val targetRecordUpdate = new JdbcRecordUpdateFromSparkSqlRow(conn, targetTable, updateCols.map((_, None)), uniqueKeyCols.map((_, None)))
      rows.foreach({row =>
        try {
          targetRecordUpdate.execute(row, updateSchema, updateCols ++ uniqueKeyCols)
        }
        catch {
          case _: Throwable =>
        }
      })
    }
    finally {
      conn.close()
    }
Only 2 lines need to change: one is added and one is modified. This snippet will become:
    try {
      val targetRecordUpdate = new JdbcRecordUpdateFromSparkSqlRow(conn, targetTable, updateCols.map((_, None)), uniqueKeyCols.map((_, None)))
      val failedRecordHandler = new FailedRecordHandler(conn, datasetId, uniqueKeyCols, updateCols)
      rows.foreach({row =>
        try {
          targetRecordUpdate.execute(row, updateSchema, updateCols ++ uniqueKeyCols)
        }
        catch {
          case e: Throwable => failedRecordHandler.handle(e, row, updateSchema)
        }
      })
    }
    finally {
      conn.close()
    }
We first instantiate the failedRecordHandler object (line 3). Then, in the catch block, we call its handle() method, passing in the exception, the failed row, and the row's schema (line 9). The row and the exception are then logged into the error_log table, where they are available for reporting later.
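To make the control flow concrete, here is a minimal, self-contained sketch of the same per-row pattern with no Spark or JDBC involved. Everything in it is an assumption for illustration: InMemoryFailureLog stands in for FailedRecordHandler (it collects failures in a buffer instead of writing to error_log), write() stands in for targetRecordUpdate.execute(), and the sketch catches NonFatal rather than Throwable as the snippet above does.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

object PerRowFailureSketch {
  // Hypothetical stand-in for FailedRecordHandler: keeps failures in memory
  // instead of writing them to the error_log table.
  final class InMemoryFailureLog {
    val entries = ListBuffer.empty[(String, String)]
    def handle(e: Throwable, row: String): Unit =
      entries += ((row, e.getMessage))
  }

  // Hypothetical stand-in for targetRecordUpdate.execute(): rejects blank rows.
  def write(row: String): Unit =
    if (row.trim.isEmpty) throw new IllegalArgumentException("blank row")

  // Processes every row; a failing row is logged and the loop moves on.
  def processPartition(rows: Iterator[String], log: InMemoryFailureLog): Int = {
    var written = 0
    rows.foreach { row =>
      try {
        write(row)
        written += 1
      } catch {
        case NonFatal(e) => log.handle(e, row)
      }
    }
    written
  }

  def main(args: Array[String]): Unit = {
    val log = new InMemoryFailureLog
    val written = processPartition(Iterator("a", " ", "b"), log)
    println(s"written=$written, failed=${log.entries.size}") // written=2, failed=1
  }
}
```

The point of the sketch is only that one bad row does not stop the rows after it: the handler records it and the loop continues.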
Failure Handling when Updating the target record with all different types of values
Adding failure handling to this case is also fairly simple. Recall the current state of the outer try block of the updateDF.foreachPartition() call:
    try {
      val targetRecordUpdate = new JdbcRecordUpdateFromSparkSqlRow(conn, targetTable, (updateCols :+ "write_timestamp").map((_, None)), uniqueKeyCols.map((_, None)) :+ ("dataset_id", Some(datasetId)))
      rows.foreach({row =>
        try {
          val now = new Timestamp(Calendar.getInstance().getTime().getTime)
          val updateColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, updateSchema, updateCols)
          val uniqueKeyColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, updateSchema, uniqueKeyCols)
          targetRecordUpdate.execute((updateColsBoundValues :+ ("write_timestamp", Some(now), TIMESTAMP)) ++ uniqueKeyColsBoundValues)
        }
        catch {
          case _: Throwable =>
        }
      })
    }
    finally {
      conn.close()
    }
With the changes, this snippet will become:
    try {
      val targetRecordUpdate = new JdbcRecordUpdateFromSparkSqlRow(conn, targetTable, (updateCols :+ "write_timestamp").map((_, None)), uniqueKeyCols.map((_, None)) :+ ("dataset_id", Some(datasetId)))
      val failedRecordHandler = new FailedRecordHandler(conn, datasetId, uniqueKeyCols, updateCols)

      var updateColsBoundValues: List[(String, Option[Any], Int)] = null
      var uniqueKeyColsBoundValues: List[(String, Option[Any], Int)] = null

      rows.foreach({row =>
        try {
          val now = new Timestamp(Calendar.getInstance().getTime().getTime)
          updateColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, updateSchema, updateCols)
          uniqueKeyColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, updateSchema, uniqueKeyCols)
          targetRecordUpdate.execute((updateColsBoundValues :+ ("write_timestamp", Some(now), TIMESTAMP)) ++ uniqueKeyColsBoundValues)
        }
        catch {
          case e: Throwable => failedRecordHandler.handle(e, uniqueKeyColsBoundValues, updateColsBoundValues)
        }
      })
    }
    finally {
      conn.close()
    }
Just like the standard case, we only need to instantiate the failedRecordHandler object (line 3) and then, in the catch block, call its handle() method (line 16), passing in the exception and the bound values already retrieved from the failed row in the try block (lines 11 – 12). Note that the 2 variables updateColsBoundValues and uniqueKeyColsBoundValues must be declared at the partition level, outside the try block (lines 5 – 6), so that they are accessible from within the catch block.
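One consequence of declaring these variables outside the try block is worth keeping in mind: if the retrieval step itself throws, the catch block sees whatever the variables held before, which is null for the first row and the previous row's values afterwards. The sketch below shows one possible refinement, which is not part of the original implementation: use an Option and reset it at the start of every row. The retrieve() function and the integer rows are hypothetical stand-ins for retrieveJdbcValuesFromSparkSqlRow() and SparkSQL rows.

```scala
object BoundValuesScopeSketch {
  // Hypothetical stand-in for retrieveJdbcValuesFromSparkSqlRow():
  // fails on negative input, otherwise returns the "bound values".
  def retrieve(row: Int): List[Int] =
    if (row < 0) throw new IllegalArgumentException(s"bad row $row")
    else List(row)

  // Returns what the catch block would see for each failed row.
  def failedSnapshots(rows: List[Int]): List[Option[List[Int]]] = {
    // Declared outside the try block so that the catch block can read it.
    var bound: Option[List[Int]] = None
    var seen = List.empty[Option[List[Int]]]
    rows.foreach { row =>
      // Reset per row so a failure never reports the previous row's values.
      bound = None
      try {
        bound = Some(retrieve(row))
        // Simulated write failure for odd rows, after retrieval succeeded.
        if (row % 2 == 1) throw new IllegalStateException("write failed")
      } catch {
        case _: Exception => seen = seen :+ bound
      }
    }
    seen
  }

  def main(args: Array[String]): Unit = {
    // Row 2 succeeds, row 3 fails on write, row -1 fails on retrieval.
    println(failedSnapshots(List(2, 3, -1))) // List(Some(List(3)), None)
  }
}
```

With the reset in place, a None in the catch block unambiguously means that the values could not be retrieved for the current row.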
The Insert Operation
Changes to the Insert operation are similar to those for its Update counterpart.
One note, though. So far, this operation has not needed to know the unique key columns of the target table and the insertDF. That information is now required for failure handling. Recall that the SparkSQL Insert job in Pipeline 1, triggered by the original event, receives the list of unique key columns of the target table via system config. As part of adding failure handling, this comma-separated list of columns now gets passed to the insertTable() function of the Insert operation as a string argument. The Update operation has received it this way all along.
Failure Handling when Inserting the target record with values purely obtained from SparkSQL row
Recall the current state of this operation:
    def insertTable(insertDF: DataFrame, targetTable: String, datasetId: Long) = {
      Class.forName("com.mysql.jdbc.Driver")

      // Get from system configuration the url and properties to connect to the database
      val (jdbcUrl, connProperties) = getConnProperties()

      val (insertSchema, insertCols) = (insertDF.schema, insertDF.columns.toList)

      insertDF.foreachPartition({rows =>
        val conn = DriverManager.getConnection(jdbcUrl, connProperties)

        try {
          val targetRecordInsert = new JdbcRecordInsertFromSparkSqlRow(conn, targetTable, insertCols.map((_, None)))
          rows.foreach({row =>
            try {
              targetRecordInsert.execute(row, insertSchema, insertCols)
            }
            catch {
              case _: Throwable =>
            }
          })
        }
        finally {
          conn.close()
        }
      })
    }
With failure handling, this insertTable() function now becomes:
    def insertTable(insertDF: DataFrame, uniqueKey: String, targetTable: String, datasetId: Long) = {
      Class.forName("com.mysql.jdbc.Driver")

      // Get from system configuration the url and properties to connect to the database
      val (jdbcUrl, connProperties) = getConnProperties()

      val (insertSchema, insertCols) = (insertDF.schema, insertDF.columns.toList)
      val uniqueKeyCols = uniqueKey.split(",\\W*").toList
      val dataCols = insertCols diff uniqueKeyCols

      insertDF.foreachPartition({rows =>
        val conn = DriverManager.getConnection(jdbcUrl, connProperties)

        try {
          val targetRecordInsert = new JdbcRecordInsertFromSparkSqlRow(conn, targetTable, insertCols.map((_, None)))
          val failedRecordHandler = new FailedRecordHandler(conn, datasetId, uniqueKeyCols, dataCols)
          rows.foreach({row =>
            try {
              targetRecordInsert.execute(row, insertSchema, insertCols)
            }
            catch {
              case e: Throwable => failedRecordHandler.handle(e, row, insertSchema)
            }
          })
        }
        finally {
          conn.close()
        }
      })
    }
- As mentioned earlier, this function now has the additional uniqueKey argument (line 1).
- From there, we derive the 2 Scala lists uniqueKeyCols (line 8) and dataCols (line 9). This is trivial since their union results in all the columns of the insertDF. Note that the column dataset_id belongs to the dataCols list. This column has a constant value across all rows of the dataframe. If you remember, we optimized this earlier (and we’ll add the failure handling capability to that optimized version in the next section).
- From here, the changes are almost identical to the Update counterpart. We instantiate the failedRecordHandler object for the partition (line 16) and, in the catch block, call its handle() method, passing in the exception, the failed row, and the row's schema (line 22).
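The derivation of the two lists can be tried in isolation. The following is a small sketch that uses the same split pattern and diff call as insertTable(); the column names are hypothetical and chosen only for illustration.

```scala
object KeyAndDataColsSketch {
  // Splits the comma-separated unique key string the same way insertTable() does.
  def splitUniqueKey(uniqueKey: String): List[String] =
    uniqueKey.split(",\\W*").toList

  // Every column of the dataframe that is not part of the unique key.
  def dataColsOf(insertCols: List[String], uniqueKeyCols: List[String]): List[String] =
    insertCols diff uniqueKeyCols

  def main(args: Array[String]): Unit = {
    // Hypothetical column names, for illustration only.
    val insertCols = List("customer_id", "order_id", "amount", "dataset_id")
    val uniqueKeyCols = splitUniqueKey("customer_id, order_id")
    val dataCols = dataColsOf(insertCols, uniqueKeyCols)
    println(uniqueKeyCols) // List(customer_id, order_id)
    println(dataCols)      // List(amount, dataset_id)
  }
}
```

Note that the pattern only removes non-word characters that follow a comma, so a key string with leading whitespace would keep that whitespace in its first column name.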
Failure Handling when Inserting the target record with all different types of values
Recall the current state of this operation:
    def insertTable(insertDF: DataFrame, targetTable: String, datasetId: Long) = {
      Class.forName("com.mysql.jdbc.Driver")

      // Get from system configuration the url and properties to connect to the database
      val (jdbcUrl, connProperties) = getConnProperties()

      val insertDF2 = insertDF.drop("dataset_id")

      val (insertSchema, insertCols) = (insertDF2.schema, insertDF2.columns.toList)

      insertDF2.foreachPartition({rows =>
        val conn = DriverManager.getConnection(jdbcUrl, connProperties)

        try {
          val targetRecordInsert = new JdbcRecordInsertFromSparkSqlRow(conn, targetTable, (insertCols :+ "write_timestamp").map((_, None)) :+ ("dataset_id", Some(datasetId)))
          rows.foreach({row =>
            try {
              val now = new Timestamp(Calendar.getInstance().getTime().getTime)
              val insertColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, insertSchema, insertCols)
              targetRecordInsert.execute(insertColsBoundValues :+ ("write_timestamp", Some(now), TIMESTAMP))
            }
            catch {
              case _: Throwable =>
            }
          })
        }
        finally {
          conn.close()
        }
      })
    }
With failure handling, this insertTable() function now becomes:
    def insertTable(insertDF: DataFrame, uniqueKey: String, targetTable: String, datasetId: Long) = {
      Class.forName("com.mysql.jdbc.Driver")

      // Get from system configuration the url and properties to connect to the database
      val (jdbcUrl, connProperties) = getConnProperties()

      val insertDF2 = insertDF.drop("dataset_id")

      val (insertSchema, insertCols) = (insertDF2.schema, insertDF2.columns.toList)
      val uniqueKeyCols = uniqueKey.split(",\\W*").toList
      val dataCols = insertCols diff uniqueKeyCols

      insertDF2.foreachPartition({rows =>
        val conn = DriverManager.getConnection(jdbcUrl, connProperties)

        try {
          val targetRecordInsert = new JdbcRecordInsertFromSparkSqlRow(conn, targetTable, (uniqueKeyCols ++ dataCols :+ "write_timestamp").map((_, None)) :+ ("dataset_id", Some(datasetId)))
          val failedRecordHandler = new FailedRecordHandler(conn, datasetId, uniqueKeyCols, dataCols)

          var uniqueKeyColsBoundValues: List[(String, Option[Any], Int)] = null
          var dataColsBoundValues: List[(String, Option[Any], Int)] = null

          rows.foreach({row =>
            try {
              val now = new Timestamp(Calendar.getInstance().getTime().getTime)
              uniqueKeyColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, insertSchema, uniqueKeyCols)
              dataColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, insertSchema, dataCols)
              targetRecordInsert.execute(uniqueKeyColsBoundValues ++ dataColsBoundValues :+ ("write_timestamp", Some(now), TIMESTAMP))
            }
            catch {
              case e: Throwable => failedRecordHandler.handle(e, uniqueKeyColsBoundValues, dataColsBoundValues)
            }
          })
        }
        finally {
          conn.close()
        }
      })
    }
- Just like the standard case, insertTable() now has the additional uniqueKey argument (line 1). From there, we derive the 2 lists uniqueKeyCols (line 10) and dataCols (line 11).
- From here on, these 2 lists, in a sense, drive the implementation forward. In particular, wherever insertCols was used in the previous version, it gets replaced by these 2 lists. That ensures the order of placeholder columns when building the Insert template always matches that of the binding values supplied to the prepared statement at execution time.
- Other than that, the implementation of failure handling is almost identical to the Update counterpart. First, we instantiate the failedRecordHandler object for the partition (line 18). Then, in the catch block, we call this object’s handle() method, passing in the exception and the required values (line 31).
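The ordering argument can be illustrated with a small sketch. It assumes a hypothetical insertTemplate() helper that emits one placeholder per column in list order; how JdbcRecordInsertFromSparkSqlRow builds its template internally is not shown here, and the table and column names are made up.

```scala
object InsertTemplateOrderSketch {
  // Hypothetical helper: builds a parameterized INSERT statement whose
  // placeholders follow the order of the given column list.
  def insertTemplate(table: String, cols: List[String]): String = {
    val colList = cols.mkString(", ")
    val placeholders = List.fill(cols.size)("?").mkString(", ")
    s"INSERT INTO $table ($colList) VALUES ($placeholders)"
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical column names, for illustration only.
    val uniqueKeyCols = List("customer_id", "order_id")
    val dataCols = List("amount")

    // Column order used when building the template.
    val templateCols = (uniqueKeyCols ++ dataCols) :+ "write_timestamp"

    // Names of the values bound at execution time, built from the same two lists.
    val boundNames = (uniqueKeyCols ++ dataCols) :+ "write_timestamp"

    println(insertTemplate("orders", templateCols))
    // INSERT INTO orders (customer_id, order_id, amount, write_timestamp) VALUES (?, ?, ?, ?)
    println(templateCols == boundNames) // true
  }
}
```

Because both sequences are assembled from uniqueKeyCols and dataCols in the same order, the n-th placeholder always receives the value of the n-th column.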
What if the Failure Handling itself fails?
Whether it’s the Update or the Insert operation, what if the call to failedRecordHandler.handle() in the catch block itself fails for some failed row?
Perhaps the failed row is a “massive” item whose data is too large to fit in the error_log table. There might also be a variety of other operational problems.
Regardless, when this happens, an exception will be thrown from the catch block unhandled. The Spark job will crash. We might be left with an inconsistent state. The job is half done. Some data might have been persisted to the target table. Some other “good” data which otherwise would have been successfully written to MySQL is left untouched. The error_log table might have caught some “bad” data, while some other “bad” data, at least the very row that caused this whole situation, falls through the cracks. It might be hard to investigate which data is which. Recoverability is challenging. It’s not pretty. It’s no fun.
It’s best to prevent this scenario right from the start with a good architecture, or somehow minimize it. Choosing a robust logging system helps in this regard.
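As one possible way to minimize the impact, and purely as an illustration rather than the approach taken in this case study, the handler call could itself be guarded so that a failure inside it is reported to a last-resort sink instead of escaping the catch block. The handleSafely() function and its parameters are hypothetical names introduced for this sketch.

```scala
import scala.util.control.NonFatal

object GuardedHandlerSketch {
  // Calls the primary handler; if that call itself fails, reports the problem
  // to a last-resort sink and returns false instead of letting the exception escape.
  def handleSafely(primary: () => Unit, lastResort: String => Unit): Boolean =
    try {
      primary()
      true
    } catch {
      case NonFatal(handlerError) =>
        lastResort(s"failure handler failed: ${handlerError.getMessage}")
        false
    }

  def main(args: Array[String]): Unit = {
    // Simulates a handler that cannot persist an oversized row.
    val ok = handleSafely(
      () => throw new RuntimeException("row too large for error_log"),
      msg => System.err.println(msg)
    )
    println(ok) // false
  }
}
```

In the catch blocks above, this would wrap the existing call, for example handleSafely(() => failedRecordHandler.handle(e, row, updateSchema), msg => System.err.println(msg)). Whether a row that reaches the last-resort sink should be retried, counted, or should fail the job is a design decision this sketch does not settle.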