So far, we have formulated the problem we are facing: how can we have Spark write data from a dataframe to a relational table in a way that satisfies our set of requirements? Put another way: how can we have Spark perform the Insert or Update operation against a relational table while supporting the three capabilities (Partial Success, Failure Reporting, Schema Change)?
SparkSQL provides the DataFrameWriter API for writing to an external system. Let’s consider its native support for writing data from a dataframe to a MySQL table, or more precisely, a JDBC table. Using this API, a statement of the following form can potentially do the job:
df.write.options(opts).mode("SomeSaveMode").jdbc(jdbcUrl, "transaction_table", connProperties)
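For context, jdbcUrl and connProperties might be set up along the following lines. This is just a minimal sketch: the host, database name, and credentials are placeholders, not values from the actual setup.

import java.util.Properties

// Placeholder JDBC endpoint for a MySQL database
val jdbcUrl = "jdbc:mysql://db-host:3306/some_database"

// Placeholder connection credentials and driver class
val connProperties = new Properties()
connProperties.setProperty("user", "some_user")
connProperties.setProperty("password", "some_password")
connProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")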
Several SaveMode options might potentially be helpful. We’ll explore them next.
Considering the Insert operation
Concerning the Insert operation, the only applicable mode is SaveMode.Append, possibly used as follows:
insertDF.write.mode(SaveMode.Append).jdbc(jdbcUrl, "transaction_table", connProperties)
This works. That is, the dataset will get inserted. If all goes well.
But what if not everything goes well?
It turns out this fails to meet the Partial Success Requirement: it’s an all-or-nothing operation. If even one record fails to insert for some reason, the entire job fails and no records are persisted to the table.
Consequently, the Failure Reporting Requirement becomes moot as well. Not only is there no mechanism to capture failed records; when any record fails, every record fails with it. There is no differentiation between the “good” and the “bad”.
There might be a workaround. Usually, a record fails to insert for one or more of a handful of known reasons. These include, but are not limited to, the record violating a database constraint, or the record containing incompatible data. An example of the latter is a string field whose value is too long to fit in the target varchar column.
We might be able to encode all these known causes in pipeline 1 during the TRANSFORM step. In that case, the insertDF would go through a series of validations that filter out most or all of the “bad” records before they reach the LOAD step. If we can do that, in practice the failures at this final write step should be statistically insignificant.
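As a rough sketch of such a validation, suppose (purely for illustration) that unique_key_col must be non-null and that string_data_col maps to a VARCHAR(255) column. The TRANSFORM step could then split out the violating records before the write:

import org.apache.spark.sql.functions.{col, length}

// Assumed constraints for this sketch: non-null key, VARCHAR(255) size limit
val maxVarcharLength = 255

// Records that satisfy the known constraints and are safe to send to the LOAD step
val validDF = insertDF
  .filter(col("unique_key_col").isNotNull)
  .filter(col("string_data_col").isNull || length(col("string_data_col")) <= maxVarcharLength)

// The complement can be reported or quarantined rather than breaking the whole write
val rejectedDF = insertDF.exceptAll(validDF)

Note that this doesn’t eliminate the all-or-nothing behavior of the write itself; it only reduces how often we run into it.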
Regarding the Schema Change Requirement alone, this native SparkSQL API does fulfill it: as long as insertDF’s schema is a subset of the target table’s, records will be inserted successfully.
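For instance, even if transaction_table carries additional columns (say, an auto-populated timestamp column, assumed here purely for illustration), writing a dataframe that holds only a subset of the table’s columns still goes through:

// insertDF carries only (unique_key_col, string_data_col); the table's remaining
// columns are left for MySQL to fill via defaults, auto-increment, or NULL
insertDF
  .select("unique_key_col", "string_data_col")
  .write
  .mode(SaveMode.Append)
  .jdbc(jdbcUrl, "transaction_table", connProperties)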
In summary, the DataFrameWriter API doesn’t support all three capabilities required for the Insert operation. Nevertheless, it’s still somewhat acceptable if we can employ the workaround above to compensate for its weaknesses.
Considering the Update operation
Concerning the Update operation, the mode closest in semantics is SaveMode.Overwrite. For our purposes, it can be used in one of only two ways.
Either we issue a statement of the following form:
updateDF.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, "transaction_table", connProperties)
Or we pass the truncate option in, like so:
updateDF.write.option("truncate", "true").mode(SaveMode.Overwrite).jdbc(jdbcUrl, "transaction_table", connProperties)
In the former, Spark first drops the target table, then recreates it, and finally dumps all of updateDF’s data into this newly created table. Along the way, all existing data, as well as table metadata such as keys, indexes, and constraints, is lost.
In the latter, Spark truncates the target table, then dumps updateDF’s data into it. Although the table’s metadata is retained, all existing data is still lost.
Neither behavior is acceptable. Remember, we are interacting with a transactional target table full of existing data that this particular Spark job is not supposed to touch. That data is shared with, and potentially accessed concurrently by, other busy OLTP applications.
So, never mind the three capabilities the Update operation needs to support; the DataFrameWriter API cannot fulfill even the Update operation itself.
How to proceed next
The conclusion is that SparkSQL’s DataFrameWriter API is insufficient for our requirements. While we might be able to stretch it to support the Insert operation to some extent, the Update operation is entirely out of reach. This prompts an exploration of alternatives.
When considering a potential solution, we’ll first evaluate whether it can fulfill the Update operation together with the three required capabilities. If so, and if it can do the same for the Insert operation, that would be ideal.
Four general approaches are identified, and the next post discusses the first one.
In examining these four approaches, for illustrative purposes, we take the example of attempting to update the dataset 199 to the transaction_table. Thus, we assume we have already been given an updateDF with the column set (unique_key_col, string_data_col) and 2 rows.
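For concreteness, such an updateDF could be sketched as follows; the key and string values are made up solely for illustration:

import spark.implicits._

// Two hypothetical rows whose string_data_col values need to be updated in transaction_table
val updateDF = Seq(
  ("key-001", "updated value one"),
  ("key-002", "updated value two")
).toDF("unique_key_col", "string_data_col")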