Class JdbcRecordInsertFromSparkSqlRow
We are now ready to move on to implement the Insert facility. It’s similar to the Update one. In fact, slightly simpler. Here it is:
package com.myproject.experiment.jdbc import java.sql.Connection /** Logic for inserting a record into a jdbc table * This class enables retrieving values from a SparkSql row or just taking a set of values from somewhere (not necessarily Spark) * then using them to create the corresponding new target jdbc record * * @param conn a jdbc connection * @param table name of the target jdbc table * @param insertColumns list of columns to be inserted with the target record */ class JdbcRecordInsertFromSparkSqlRow(conn: Connection, val table: String, val insertColumns: List[(String, Option[Any])]) extends JdbcRecordWriteFromSparkSqlRow(conn) with SqlStringBuildingSupporter { /** * Return list of all columns inserted with the record by this Insert sql statement */ override protected def getAllPreparedColumns(): List[(String, Option[Any])] = insertColumns /** * Construct the Insert sql string template * This template potentially has placeholders ? and literal values * * @return Insert sql string template, e.g. * insert into transaction_table * (unique_key_col_1, unique_key_col_2, dataset_id, string_data_col, number_data_col) * values (?, ?, 199, null, ?) */ override protected def getSqlStmtString(): String = { val insTableClause = "insert into " + table val colNamesClause = "(" + insertColumns.map(_._1).mkString(", ") + ")" val valuesClause = "(" + insertColumns.map({ case (_, valueOpt) => if (valueOpt != None) getLiteralValueForSqlString(valueOpt.get) else "?" }).mkString(", ") + ")" insTableClause + " " + colNamesClause + " values " + valuesClause } }
Instantiation
To perform an insert operation for a Spark partition, the client code needs to instantiate an instance of this class. In doing so, it needs to specify what the target table is (line 14), and all the columns inserted for a given record (line 15). Of course, these columns are specified according to the same now-familiar specification, which allows designating whether a given column is a placeholder or containing a literal value.
The function getAllPreparedColumns() (line 21), again, just like in the Update counterpart, simply returns all the columns of this statement.
Building the Insert SQL Template
The function getSqlStmtString() (line 32) takes care of constructing the Insert SQL Template, whose an example might look like the following:
insert into transaction_table (unique_key_col_1, unique_key_col_2, dataset_id, string_data_col, number_data_col) values (?, ?, 199, null, ?)
This function, in turn, relies on getLiteralValueForSqlString() (line 35)) to have the proper treatment of string literals (putting a single quote around them).
That’s it. To construct a template that looks like the example above, the client code might instantiate a JdbcRecordInsertFromSparkSqlRow instance like so:
new JdbcRecordInsertFromSparkSqlRow(conn, “transaction_table”, List( (“unique_key_col_1”, None), (“unique_key_col_2”, None), (“dataset_id”, Some(199)), (“string_data_col”, Some(null)), (“number_data_col”, None) ) )
Just like with Update counterpart, in practice, when this Insert facility is put to use for an Insert operation from a Spark partition, the instantiation work by the client code tends to be much simpler. Again, it’s because the list of columns can usually be inferred programmatically from the insertDF. Examples will be presented in a future post.
Given the template, and as a consequence, the prepared statement, resulted from this instantiation, the actual Insert operation for a given record becomes trivial. Again, you’ll see examples in a future post.