Now, we move on to implementing the Insert operation. As you’ll see, it’s pretty similar to the Update operation laid out in the last post.

Insert target record with values purely obtained from SparkSQL row

First, just like for the Update, we tackle the standard case: all values needed for the target record are available from the current SparkSQL row.

Recall that at this stage in the pipeline’s SparkSQL job, we are handed the insertDF, told which table to insert into, and given the id of the dataset the insertDF originates from. Given that, the following is the implementation of the Insert operation:

package com.myproject.experiment

import com.myproject.jdbc.utils.SQLHelpers.getConnProperties
import com.myproject.experiment.jdbc.JdbcRecordInsertFromSparkSqlRow
import java.sql.DriverManager

import com.myproject.experiment.spark.Helpers._
import org.apache.spark.sql._

object InsertOperation {
  /**
    * Insert into the target table row by row individually from each partition of the insertDF
    *
    * @param insertDF: dataframe that holds data to insert
    * @param targetTable: target table name, e.g. transaction_table
    * @param datasetId: id of the original dataset from which insertDF is derived, e.g. 261
    */
  def insertTable(insertDF: DataFrame, targetTable: String, datasetId: Long) = {
    Class.forName("com.mysql.jdbc.Driver")

    // Get from system configuration the url and properties to connect to the database
    val (jdbcUrl, connProperties) = getConnProperties()

    val (insertSchema, insertCols) = (insertDF.schema, insertDF.columns.toList)

    insertDF.foreachPartition({rows =>
      // One JDBC connection per partition
      val conn = DriverManager.getConnection(jdbcUrl, connProperties)

      try {
        val targetRecordInsert = new JdbcRecordInsertFromSparkSqlRow(conn, targetTable, insertCols.map((_, None)))

        rows.foreach({row =>
          try {
            targetRecordInsert.execute(row, insertSchema, insertCols)
          } catch {
            // Swallow per-row failures so one bad row doesn't abort the whole partition
            case _: Throwable =>
          }
        })
      } finally {
        conn.close()
      }
    })
  }
}
Hopefully, by now, what this code does is pretty obvious:
  • The insertCols list holds all the columns of the insertDF
  • The way the targetRecordInsert object is initialized, with insertCols.map((_, None)), indicates that every column of this dataframe will be inserted into the target table as a value bound to a placeholder (see the template sketch after this list)
  • Passing insertCols to targetRecordInsert.execute() simply follows through on this commitment.
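
For concreteness: with a dataframe whose columns are unique_key_col, dataset_id, string_data_col and number_data_col (the example schema we’ll use below), the template this targetRecordInsert would presumably issue looks something like the following sketch, mirroring the helper class introduced in the last post:

insert into transaction_table
       (unique_key_col, dataset_id, string_data_col, number_data_col)
values (?, ?, ?, ?)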

I guess you’d agree this implementation is as simple and elegant as the Update counterpart.
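
For completeness, invoking this function from the driver would look something like the following; the table name and dataset id are just the example values from the scaladoc above:

insertTable(insertDF, "transaction_table", 261L)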

Insert target record with literal values and with binding values both from the SparkSQL row and computed on the fly

Next, just like with the Update operation, we tackle the more general case. As with the Update counterpart, assume we work with a target table of the following structure:

create table transaction_table (
    unique_key_col varchar(1000) not null,
    dataset_id bigint not null,
    string_data_col varchar(50),
    number_data_col int,
    write_timestamp datetime,
    unique key (unique_key_col)
);

The insertDF at this point would have a schema like this:

scala> insertDF.printSchema
root
 |-- unique_key_col: string (nullable = false)
 |-- dataset_id: long (nullable = false)
 |-- string_data_col: string (nullable = true)
 |-- number_data_col: integer (nullable = true)

Since each run of the Insert operation works with only one dataset, all rows in the insertDF share the same value for the column dataset_id. This is a known value that gets passed into the function insertTable() via the argument datasetId.
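
If you want to guard that assumption at runtime, an optional sanity check before the write could look like this sketch (note that it triggers an extra Spark job, so it’s mostly useful as a debugging aid):

// Optional sketch: verify that insertDF really carries a single dataset_id
// before baking datasetId into the statement template as a literal
val distinctDatasetIds = insertDF.select("dataset_id").distinct().count()
require(distinctDatasetIds == 1, s"expected exactly one dataset_id, found $distinctDatasetIds")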

In the implementation of the previous section, this value is bound dynamically at execution time, over and over for each row. We can do better than that: for optimized performance, it should instead be baked directly into the Insert statement template as a literal. The performance benefit becomes considerable in the more general case where there are several columns like dataset_id and the insertDF is non-trivial in size.

As for the column write_timestamp of the target table: as explained in the last post, it obtains its value from the system time, not from the current row. In other words, its binding value gets computed on the fly for each row.

So to handle both columns, dataset_id (for better performance) and write_timestamp (to obtain the right value), an Insert template like the following needs to be issued:

insert into transaction_table
       (unique_key_col, dataset_id, string_data_col, number_data_col, write_timestamp)
values (?, 261, ?, ?, ?)

We can achieve this with some changes to our current implementation. First, some additional imports (note that the existing java.sql.DriverManager import is extended to also pull in Timestamp):

import java.util.Calendar
import java.sql.{DriverManager, Timestamp}
import java.sql.Types.TIMESTAMP

Then, some changes to the insertTable() function:

def insertTable(insertDF: DataFrame, targetTable: String, datasetId: Long) = {
  Class.forName("com.mysql.jdbc.Driver")

  // Get from system configuration the url and properties to connect to the database
  val (jdbcUrl, connProperties) = getConnProperties()

  val insertDF2 = insertDF.drop("dataset_id")

  val (insertSchema, insertCols) = (insertDF2.schema, insertDF2.columns.toList)

  insertDF2.foreachPartition({rows =>
    val conn = DriverManager.getConnection(jdbcUrl, connProperties)

    try {
      val targetRecordInsert = new JdbcRecordInsertFromSparkSqlRow(
        conn,
        targetTable,
        (insertCols :+ "write_timestamp").map((_, None)) :+ ("dataset_id", Some(datasetId))
      )

      rows.foreach({row =>
        try {
          // write_timestamp is computed on the fly from the system time
          val now = new Timestamp(Calendar.getInstance().getTime().getTime)
          // Binding values for the placeholder columns, pulled from the current row
          val insertColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, insertSchema, insertCols)

          targetRecordInsert.execute(insertColsBoundValues :+ ("write_timestamp", Some(now), TIMESTAMP))
        } catch {
          // Swallow per-row failures so one bad row doesn't abort the whole partition
          case _: Throwable =>
        }
      })
    } finally {
      conn.close()
    }
  })
}

First, we need to drop the dataset_id column from insertDF to arrive at insertDF2, because later on we build this column “manually” into the template with the specified literal value. From now on, we work with insertDF2.
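
After the drop, the schema of insertDF2 would simply be the original one minus dataset_id:

scala> insertDF2.printSchema
root
 |-- unique_key_col: string (nullable = false)
 |-- string_data_col: string (nullable = true)
 |-- number_data_col: integer (nullable = true)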

From here, we proceed in much the same way as with the Update counterpart. All the real changes happen within the outer try block of the insertDF2.foreachPartition() call. In particular:
  • We append write_timestamp as an additional placeholder column to the insertCols list.
  • We extend that list with dataset_id carrying the specified literal value.
  • Then, in executing each row, we manually build the list of binding values:
    • Obtain the values for insertCols using retrieveJdbcValuesFromSparkSqlRow().
    • Compute the current timestamp from the system time, then build the required (column, value, JDBC type) triplet for write_timestamp.
  • Finally, we perform the insert by calling the right version of the execute() method, passing in this list of binding values; a sketch of what that list could look like follows this list.
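
To make the shape of that binding-values list concrete, here is a hypothetical example of what it could hold for a single row. The (column name, optional value, JDBC type) triplet shape is inferred from the write_timestamp entry above; the actual values and JDBC types come from retrieveJdbcValuesFromSparkSqlRow() as introduced in the last post:

import java.sql.Timestamp
import java.sql.Types.{INTEGER, TIMESTAMP, VARCHAR}

// Hypothetical binding values for one row of the example schema
val boundValues = List(
  ("unique_key_col",  Some("key-00042"),                               VARCHAR),
  ("string_data_col", Some("some string data"),                        VARCHAR),
  ("number_data_col", Some(42),                                        INTEGER),
  ("write_timestamp", Some(new Timestamp(System.currentTimeMillis())), TIMESTAMP)
)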

This demonstrates the general case in which, for writing to the target record, we don’t need to dynamically bind values to every column, and in which the current SparkSQL row doesn’t provide all the binding values needed. To handle the latter, we resort to the more “basic” version of the execute() method and “manually” construct the required values ourselves.
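
To recap, here is a rough sketch of the two execute() flavours this post relies on. This is not the actual JdbcRecordInsertFromSparkSqlRow class from the last post, just the shapes implied by the calls above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// A minimal sketch of the two execute() flavours used in this post
trait RecordInsertSketch {
  // Standard case: every binding value is pulled straight from the SparkSQL row
  def execute(row: Row, schema: StructType, cols: List[String]): Unit

  // "Basic" case: the caller supplies (column name, optional value, JDBC type) triplets
  def execute(boundValues: Seq[(String, Option[Any], Int)]): Unit
}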