I may or may not push the code to GitHub; we'll see. Until then, here are the listings of the essential code referenced in this series. They are generally heavily commented to aid your understanding.
SparkSQL Values Retrieval Facility
package com.myproject.experiment.spark

import java.sql.Types._
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.types._
import org.apache.spark.sql._

object Helpers {
  /**
    * Get information from the dataframe relevant to the update operation
    *
    * @param updateDF: dataframe that holds data to update
    * @param uniqueKey: comma separated list of columns composing the unique key, e.g. "unique_key_col_1, unique_key_col_2"
    * @return tuple (schema, uniqueKeyColList, updateColList)
    *         - schema: schema of updateDF
    *         - uniqueKeyColList: scala list of columns making up the unique key, e.g. List(unique_key_col_1, unique_key_col_2)
    *         - updateColList: scala list of data columns that will be updated, e.g. List(string_data_col, number_data_col)
    */
  def getUpdateInfo(updateDF: DataFrame, uniqueKey: String): (StructType, List[String], List[String]) = {
    val uniqueKeyCols = uniqueKey.split(",\\W*")
    val updateCols = updateDF.columns diff uniqueKeyCols
    (updateDF.schema, uniqueKeyCols.toList, updateCols.toList)
  }

  /**
    * Retrieves the values of columns of interest in jdbc types from a SparkSQL row
    *
    * @param row: sparkSql row to retrieve values from
    * @param schema: schema of the dataframe the row belongs to
    *                Note that given the row, the schema can be inferred within this function,
    *                but passing in the schema available from the client code instead of inferring it every time for each row is slightly more efficient
    * @param cols: Scala list of columns in the row to retrieve values for
    * @return: List((column, valueOpt, sqlType)): Scala list of triplets representing the values retrieved
    *          - column: column name
    *          - valueOpt: Some(value) where value is the scala value converted from the sparkSql value, or None if the sparkSql value is null
    *          - sqlType: the type of the converted value as an int; this is the SQL type code defined in java.sql.Types
    */
  def retrieveJdbcValuesFromSparkSqlRow(row: Row, schema: StructType, cols: List[String]): List[(String, Option[Any], Int)] = {
    /**
      * Retrieves the values of columns of interest in the original SparkSQL types from a SparkSQL row
      *
      * @param row: sparkSql row to retrieve values from
      * @param schema: schema of the dataframe the row belongs to
      * @param cols: Scala list of columns in the row to retrieve values for
      * @return: List((column, sparkSQL value, sparkSQL type)): Scala list of triplets representing the values retrieved
      *          - column: column name
      *          - sparkSQL value: original SparkSQL value, possibly null
      *          - sparkSQL type: type of the SparkSQL value as defined in org.apache.spark.sql.types
      */
    def retrieveValuesFromSparkSqlRow(row: Row, schema: StructType, cols: List[String]): List[(String, Any, DataType)] = {
      cols.map(col => (col, row.getAs(col), schema(col).dataType))
    }

    /**
      * Converts a sparkSql value (as extracted from a dataframe row) to a jdbc value
      * in preparation to write it to a jdbc db via executing a jdbc statement
      *
      * @param sparkSqlValue: sparkSql value to convert
      * @param sparkSqlType: sparkSql type of the value to convert
      * @return: (valueOpt, sqlType)
      *          - valueOpt: Some(value) where value is the scala value converted from the sparkSql value, or None if the sparkSql value is null
      *          - sqlType: the type of the converted value; this is the SQL type code defined in java.sql.Types
      */
    def sparkSqlToJdbc(sparkSqlValue: Any, sparkSqlType: DataType): (Option[Any], Int) = {
      val sparkSqlTypeToJdbcType: Map[DataType, Int] = Map(
        LongType -> BIGINT, IntegerType -> INTEGER, BooleanType -> BOOLEAN, StringType -> VARCHAR,
        TimestampType -> TIMESTAMP, DateType -> DATE, DoubleType -> DOUBLE, FloatType -> FLOAT)

      val jdbcValue =
        if (sparkSqlValue != null) {
          Some(sparkSqlType match {
            case IntegerType => sparkSqlValue.asInstanceOf[Int]
            case StringType => sparkSqlValue.asInstanceOf[String]
            case BooleanType => sparkSqlValue.asInstanceOf[Boolean]
            case LongType => sparkSqlValue.asInstanceOf[Long]
            case FloatType => sparkSqlValue.asInstanceOf[Float]
            case DoubleType => sparkSqlValue.asInstanceOf[Double]
            case TimestampType => sparkSqlValue.asInstanceOf[Timestamp]
            case DateType => sparkSqlValue.asInstanceOf[Date]
          })
        } else None

      (jdbcValue, sparkSqlTypeToJdbcType(sparkSqlType))
    }

    retrieveValuesFromSparkSqlRow(row, schema, cols).map(elem => {
      val (jdbcValue, jdbcType) = sparkSqlToJdbc(elem._2, elem._3)
      (elem._1, jdbcValue, jdbcType)
    })
  }
}
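For illustration, here is a minimal sketch of how these helpers might be exercised. It is not part of the series' code: it assumes a local SparkSession and the illustrative unique key and data columns used in the comments above.

// Minimal sketch, assuming a local SparkSession and the illustrative transaction_table columns
import org.apache.spark.sql.SparkSession
import com.myproject.experiment.spark.Helpers._

object HelpersDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("helpers-demo").getOrCreate()
    import spark.implicits._

    // Toy dataframe carrying the two-column unique key plus the data columns to update
    val updateDF = Seq(
      (1L, 100L, "changed_string_data_value_1", 10.5),
      (2L, 127L, null.asInstanceOf[String], 20.0)
    ).toDF("unique_key_col_1", "unique_key_col_2", "string_data_col", "number_data_col")

    val (schema, uniqueKeyCols, updateCols) = getUpdateInfo(updateDF, "unique_key_col_1, unique_key_col_2")
    // uniqueKeyCols: List(unique_key_col_1, unique_key_col_2)
    // updateCols:    List(string_data_col, number_data_col)

    updateDF.collect().foreach { row =>
      // e.g. List((string_data_col, Some(changed_string_data_value_1), 12), (number_data_col, Some(10.5), 8))
      // where 12 and 8 are java.sql.Types.VARCHAR and java.sql.Types.DOUBLE
      println(retrieveJdbcValuesFromSparkSqlRow(row, schema, updateCols))
    }

    spark.stop()
  }
}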
Utility to execute SQL statement in the DB via JDBC
package com.myproject.experiment.jdbc

import java.sql.{Date, PreparedStatement, Timestamp}

object Helpers {
  /**
    * Execute an arbitrary non-query SQL statement
    *
    * @param preparedSql: a precompiled SQL statement, which may have placeholders ? to be plugged in with real binding values at execution time
    * @param boundValues: list of binding values (boundValueOpt, sqlType) for the prepared statement
    *                     - boundValueOpt: Some(boundValue) or None, which represents null
    *                     - sqlType: the SQL type code defined in java.sql.Types; only used when the boundValue is null
    */
  def execSQL(preparedSql: PreparedStatement, boundValues: List[(Option[Any], Int)]) = {
    for (((boundValueOpt, sqlType), boundPosition) <- boundValues zip (1 to boundValues.size)) {
      if (boundValueOpt != None) {
        boundValueOpt.get match {
          case i: Int => preparedSql.setInt(boundPosition, i)
          case s: String => preparedSql.setString(boundPosition, s)
          case l: Long => preparedSql.setLong(boundPosition, l)
          case b: Boolean => preparedSql.setBoolean(boundPosition, b)
          case t: Timestamp => preparedSql.setTimestamp(boundPosition, t)
          case d: Date => preparedSql.setDate(boundPosition, d)
          case db: Double => preparedSql.setDouble(boundPosition, db)
          case f: Float => preparedSql.setFloat(boundPosition, f)
        }
      } else {
        preparedSql.setNull(boundPosition, sqlType)
      }
    }
    val ret = preparedSql.execute
  }
}
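A quick, hedged usage sketch: it assumes an open JDBC connection `conn` to the database holding the transaction_table used as the running example in this series, and simply shows how the bound values line up with the placeholders.

import java.sql.Connection
import java.sql.Types.{BIGINT, DOUBLE, VARCHAR}
import com.myproject.experiment.jdbc.Helpers.execSQL

object ExecSqlDemo {
  // `conn` is assumed to be an open connection to the database holding transaction_table
  def demo(conn: Connection): Unit = {
    val preparedSql = conn.prepareStatement(
      "update transaction_table set string_data_col = ?, number_data_col = ? where unique_key_col_1 = ?")

    // Bind a string, a SQL NULL (the type code matters only for the null), and a key value
    execSQL(preparedSql, List(
      (Some("changed_string_data_value_1"), VARCHAR),
      (None, DOUBLE),
      (Some(1L), BIGINT)))
  }
}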
Generic JDBC Write Facility
package com.myproject.experiment.jdbc

import java.sql.{Connection, PreparedStatement}
import com.myproject.experiment.jdbc.Helpers.execSQL

/** Common logic for writing (encompassing both update and insert) to a target record in a jdbc table
  * This class is strictly concerned with the business of writing to a jdbc record,
  * and has nothing to do with Spark
  *
  * @param conn a jdbc connection
  */
abstract class JdbcRecordWrite(val conn: Connection) {
  private var preparedSql: PreparedStatement = null
  private var placeHolderCols: List[String] = null

  init()

  /**
    * Initializes state, including precompiling the sql write string template,
    * and determining which columns in that template are placeholders
    */
  private def init() = {
    val sqlStmtString = getSqlStmtString()
    preparedSql = conn.prepareStatement(sqlStmtString)
    placeHolderCols = getPlaceHolderCols(getAllPreparedColumns())
  }

  /**
    * Determine which columns of the sql statement are placeholders
    *
    * @param allPreparedColumns: List of all table columns accessed by this sql statement.
    *                            This is a list of tuples (column name, valueOpt), where valueOpt is either:
    *                            - Some(value): represents a literal value to bake into the sql string template
    *                            - None: represents a placeholder
    * @return list of names of placeholder columns
    */
  private def getPlaceHolderCols(allPreparedColumns: List[(String, Option[Any])]): List[String] = {
    allPreparedColumns.filter({ case (col, valueOpt) => valueOpt == None }).map(_._1)
  }

  /**
    * Return list of all table columns accessed by this sql statement; abstract method to be implemented by subclass
    */
  protected def getAllPreparedColumns(): List[(String, Option[Any])]

  /**
    * Return the sql write (update or insert) string template; abstract method to be implemented by subclass
    */
  protected def getSqlStmtString(): String

  /**
    * Make sure the binding columns at runtime when writing a given record match the placeholder columns prepared at compile time
    * If they do not match, an exception gets thrown
    *
    * @param boundCols: List of binding columns when writing a given record.
    */
  private def checkBoundCols(boundCols: List[String]) = {
    if (placeHolderCols != boundCols)
      throw new IllegalArgumentException("Invalid request to execute JDBC Statement: placeholder columns and bound columns are incompatible!")
  }

  /**
    * Execute the write operation to the target record
    *
    * @param boundValues: List((column, valueOpt, sqlType)): Scala list of triplets representing the runtime binding values
    *                     - column: binding column name
    *                     - valueOpt: Some(value) where value is the non-null binding value, or None if null
    *                     - sqlType: the type of the binding value as an int; this is the SQL type code defined in java.sql.Types
    */
  def execute(boundValues: List[(String, Option[Any], Int)]) = {
    checkBoundCols(boundValues.map(_._1))
    execSQL(preparedSql, boundValues.map({ case (_, jdbcValue, jdbcType) => (jdbcValue, jdbcType) }))
  }
}
package com.myproject.experiment.jdbc

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

/** Defines behavior for retrieving values from a SparkSql row and writing to a target jdbc record
  */
trait WritingFromSparkSqlRow {
  def execute(row: Row, schema: StructType, boundCols: List[String])
}
package com.myproject.experiment.jdbc

import java.sql.Connection
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import com.myproject.experiment.spark.Helpers.retrieveJdbcValuesFromSparkSqlRow

/** Common logic for writing (encompassing both update and insert) to a target record in a jdbc table
  * This class enables retrieving values from a SparkSql row and writing them to the corresponding target jdbc record
  *
  * @param conn a jdbc connection
  */
abstract class JdbcRecordWriteFromSparkSqlRow(conn: Connection) extends JdbcRecordWrite(conn) with WritingFromSparkSqlRow {
  /**
    * Execute the write operation to the target record
    *
    * @param row: sparkSql row to retrieve values from
    * @param schema: schema of the dataframe the row belongs to
    * @param boundCols: Scala list of columns in the row to retrieve values for
    */
  def execute(row: Row, schema: StructType, boundCols: List[String]) = {
    val boundValues = retrieveJdbcValuesFromSparkSqlRow(row, schema, boundCols)
    execute(boundValues)
  }
}
Utility to support building SQL String Template
package com.myproject.experiment.jdbc

/** Provides utility to support building the sql string template that will compile to the prepared statement
  */
trait SqlStringBuildingSupporter {
  /**
    * Generate the proper presentation format for a literal value in the template
    *
    * @param value: the literal value of concern
    * @return the literal value in the presentation format acceptable by the string template
    */
  protected def getLiteralValueForSqlString(value: Any): Any = {
    if (value == null) "null"
    else {
      value match {
        case s: String => "'" + s + "'"
        case v: Any => v
      }
    }
  }
}
The Update Facility
package com.myproject.experiment.jdbc

import java.sql.Connection

/** Logic for updating a target record in a jdbc table
  * This class enables retrieving values from a SparkSql row or just taking a set of values from somewhere (not necessarily Spark),
  * then updating them to the corresponding target jdbc record
  *
  * @param conn a jdbc connection
  * @param table name of the target jdbc table
  * @param setClauseColumns list of columns to be updated in the target record
  * @param whereClauseColumns list of columns uniquely identifying the target record to be updated
  */
class JdbcRecordUpdateFromSparkSqlRow(conn: Connection,
                                      val table: String,
                                      val setClauseColumns: List[(String, Option[Any])],
                                      val whereClauseColumns: List[(String, Option[Any])])
  extends JdbcRecordWriteFromSparkSqlRow(conn) with SqlStringBuildingSupporter {

  /**
    * Return list of all table columns accessed by this Update sql statement
    */
  override protected def getAllPreparedColumns(): List[(String, Option[Any])] = setClauseColumns ++ whereClauseColumns

  /**
    * Generate the string representing a given column. This string will be plugged into the Update sql statement template
    *
    * @param col: name of the given column
    * @param valueOpt: instantiating value of the given column, which is either:
    *                  - Some(value): represents a literal value to bake into the Update sql string template
    *                  - None: represents a placeholder
    * @return the string representing the given column in the Update sql statement template.
    *         This string would be of the form "col = literal value" or "col = ?"
    */
  private def getOneColumnForUpdateSqlString(col: String, valueOpt: Option[Any]): String = {
    col + " = " + {
      valueOpt match {
        case Some(value) => getLiteralValueForSqlString(value)
        case None => "?"
      }
    }
  }

  /**
    * Construct the Update sql string template
    * This template potentially has placeholders ? and literal values
    *
    * @return Update sql string template, e.g.
    *         update transaction_table set string_data_col = 'changed_string_data_value_1',
    *                                      number_data_col = null
    *         where unique_key_col_1 = ? and
    *               unique_key_col_2 = 127
    */
  override protected def getSqlStmtString(): String = {
    val updateClause = "update " + table
    val setClause = "set " + setClauseColumns.map({ case (col, valueOpt) => getOneColumnForUpdateSqlString(col, valueOpt) }).mkString(", ")
    val whereClause = "where " + whereClauseColumns.map({ case (col, valueOpt) => getOneColumnForUpdateSqlString(col, valueOpt) }).mkString(" and ")
    updateClause + " " + setClause + " " + whereClause
  }
}
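To make the Some/None convention concrete, here is a small sketch (again assuming an open connection `conn` and the illustrative transaction_table) showing how a mix of literal values and placeholders turns into an update template, and how only the placeholder columns are bound at execution time.

import java.sql.Connection
import java.sql.Types.{BIGINT, VARCHAR}
import com.myproject.experiment.jdbc.JdbcRecordUpdateFromSparkSqlRow

object UpdateFacilityDemo {
  // `conn` is assumed to be an open connection to the database holding transaction_table
  def demo(conn: Connection): Unit = {
    // Compiles roughly to:
    //   update transaction_table set string_data_col = ?, number_data_col = null
    //   where unique_key_col_1 = ? and unique_key_col_2 = 127
    val update = new JdbcRecordUpdateFromSparkSqlRow(conn, "transaction_table",
      List(("string_data_col", None), ("number_data_col", Some(null))),
      List(("unique_key_col_1", None), ("unique_key_col_2", Some(127))))

    // Only the placeholder columns are bound, in template order
    update.execute(List(
      ("string_data_col", Some("changed_string_data_value_1"), VARCHAR),
      ("unique_key_col_1", Some(42L), BIGINT)))
  }
}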
The Insert Facility
package com.myproject.experiment.jdbc

import java.sql.Connection

/** Logic for inserting a record into a jdbc table
  * This class enables retrieving values from a SparkSql row or just taking a set of values from somewhere (not necessarily Spark),
  * then using them to create the corresponding new target jdbc record
  *
  * @param conn a jdbc connection
  * @param table name of the target jdbc table
  * @param insertColumns list of columns to be inserted with the target record
  */
class JdbcRecordInsertFromSparkSqlRow(conn: Connection,
                                      val table: String,
                                      val insertColumns: List[(String, Option[Any])])
  extends JdbcRecordWriteFromSparkSqlRow(conn) with SqlStringBuildingSupporter {

  /**
    * Return list of all columns inserted with the record by this Insert sql statement
    */
  override protected def getAllPreparedColumns(): List[(String, Option[Any])] = insertColumns

  /**
    * Construct the Insert sql string template
    * This template potentially has placeholders ? and literal values
    *
    * @return Insert sql string template, e.g.
    *         insert into transaction_table
    *         (unique_key_col_1, unique_key_col_2, dataset_id, string_data_col, number_data_col)
    *         values (?, ?, 199, null, ?)
    */
  override protected def getSqlStmtString(): String = {
    val insTableClause = "insert into " + table
    val colNamesClause = "(" + insertColumns.map(_._1).mkString(", ") + ")"
    val valuesClause = "(" + insertColumns.map({ case (_, valueOpt) =>
      if (valueOpt != None) getLiteralValueForSqlString(valueOpt.get) else "?"
    }).mkString(", ") + ")"
    insTableClause + " " + colNamesClause + " values " + valuesClause
  }
}
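And the mirror-image sketch for inserts, under the same assumptions (open connection `conn`, illustrative transaction_table), reproducing the template shown in the comment above.

import java.sql.Connection
import java.sql.Types.{BIGINT, DOUBLE, VARCHAR}
import com.myproject.experiment.jdbc.JdbcRecordInsertFromSparkSqlRow

object InsertFacilityDemo {
  // `conn` is assumed to be an open connection to the database holding transaction_table
  def demo(conn: Connection): Unit = {
    // Compiles roughly to:
    //   insert into transaction_table (unique_key_col_1, unique_key_col_2, dataset_id, string_data_col, number_data_col)
    //   values (?, ?, 199, null, ?)
    val insert = new JdbcRecordInsertFromSparkSqlRow(conn, "transaction_table", List(
      ("unique_key_col_1", None),
      ("unique_key_col_2", None),
      ("dataset_id", Some(199L)),
      ("string_data_col", Some(null)),
      ("number_data_col", None)))

    // Bind the placeholder columns, in template order
    insert.execute(List(
      ("unique_key_col_1", Some(42L), BIGINT),
      ("unique_key_col_2", Some(127L), BIGINT),
      ("number_data_col", Some(10.5), DOUBLE)))
  }
}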
The Failure Reporting Facility
create table error_log (
    dataset_id      bigint not null,
    k               text not null,
    v               mediumtext,
    error_message   text,
    write_timestamp datetime
);
package com.myproject.experiment.errorhandling

import java.sql.{Connection, Timestamp}
import java.sql.Types._
import java.util.Calendar
import com.myproject.experiment.jdbc.JdbcRecordInsertFromSparkSqlRow
import com.myproject.experiment.spark.Helpers.retrieveJdbcValuesFromSparkSqlRow
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row

/** Log a failed record write to the error_log table
  *
  * @param conn a jdbc connection
  * @param datasetId id of the dataset the failed record belongs to
  * @param uniqueKeyColumns list of unique key columns of the record
  * @param dataColumns list of data columns of the record
  */
class FailedRecordHandler(conn: Connection, datasetId: Long, uniqueKeyColumns: List[String], dataColumns: List[String]) {
  private val uniqueKeyColsSet = uniqueKeyColumns.toSet
  private val dataColsSet = dataColumns.toSet

  // Initialize the object to insert into the error_log table
  private val errorLogInsert = new JdbcRecordInsertFromSparkSqlRow(conn, "error_log", List(
    ("dataset_id", Some(datasetId)),
    ("k", None),
    ("v", None),
    ("error_message", None),
    ("write_timestamp", None)
  ))

  /**
    * Concatenate a list of triplet values into a single triplet value
    *
    * @param combinedColName: name of the column component of the result triplet value. It represents a column in the error_log table
    * @param columns: List of triplet values to concatenate. A triplet (column name, valueOpt, jdbcType) is typically obtained from a SparkSQL row
    * @return the single concatenated triplet. Its value would then be inserted to the corresponding column of this failed record in error_log table
    */
  private def getCombinedColumn(combinedColName: String, columns: List[(String, Option[Any], Int)]): (String, Option[Any], Int) = {
    def combineCol(accuCol: Tuple3[String, Option[Any], Int], col: Tuple3[String, Option[Any], Int]): (String, Option[Any], Int) = {
      val colValue = col._2 match {
        case Some(v) => col._1 + ":::: " + v.toString
        case None => col._1 + ":::: "
      }
      var accuColValue = accuCol._2.get
      accuColValue = if (accuColValue == "") colValue else accuColValue + " | " + colValue
      (accuCol._1, Some(accuColValue), accuCol._3)
    }

    columns.foldLeft((combinedColName, Some("").asInstanceOf[Option[Any]], VARCHAR))(combineCol)
  }

  /**
    * Insert the values of the failed record to the error_log table
    *
    * @param e: Exception that got raised when the record failed to write to the target table
    * @param uniqueKeyValues: Values of the unique key columns of the failed record
    * @param dataValues: Values of the data columns of the failed record
    */
  def handle(e: Throwable, uniqueKeyValues: List[(String, Option[Any], Int)], dataValues: List[(String, Option[Any], Int)]): Unit = {
    if (uniqueKeyColsSet != uniqueKeyValues.map(_._1).toSet)
      throw new Exception("FailedRecordHandler: invalid set of unique key columns!")
    if (dataColsSet != dataValues.map(_._1).toSet)
      throw new Exception("FailedRecordHandler: invalid set of data columns!")

    val kColBoundValue = getCombinedColumn("k", uniqueKeyValues)
    val vColBoundValue = getCombinedColumn("v", dataValues)
    val now = new Timestamp(Calendar.getInstance().getTime().getTime)

    errorLogInsert.execute(List(kColBoundValue, vColBoundValue,
      ("error_message", Some(e.toString), VARCHAR),
      ("write_timestamp", Some(now), TIMESTAMP)))
  }

  /**
    * Insert the values of the failed record to the error_log table
    *
    * @param e: Exception that got raised when the record failed to write to the target table
    * @param row: The SparkSQL row from which the failed record is derived
    * @param schema: Schema of the SparkSQL row from which the failed record is derived
    */
  def handle(e: Throwable, row: Row, schema: StructType): Unit = {
    val uniqueKeyValues = retrieveJdbcValuesFromSparkSqlRow(row, schema, uniqueKeyColumns)
    val dataValues = retrieveJdbcValuesFromSparkSqlRow(row, schema, dataColumns)
    handle(e, uniqueKeyValues, dataValues)
  }
}
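To see what actually lands in error_log, here is a hedged sketch (assuming an open connection `conn` to the database holding error_log and the illustrative column names) of a simulated failure and the roughly resulting k/v concatenation.

import java.sql.Connection
import java.sql.Types.{BIGINT, DOUBLE, VARCHAR}
import com.myproject.experiment.errorhandling.FailedRecordHandler

object FailedRecordHandlerDemo {
  // `conn` is assumed to be an open connection to the database holding error_log
  def demo(conn: Connection): Unit = {
    val handler = new FailedRecordHandler(conn, 199L,
      List("unique_key_col_1", "unique_key_col_2"),
      List("string_data_col", "number_data_col"))

    // Produces an error_log row roughly like:
    //   k: "unique_key_col_1:::: 42 | unique_key_col_2:::: 127"
    //   v: "string_data_col:::: changed_string_data_value_1 | number_data_col:::: "
    //   error_message: "java.lang.RuntimeException: simulated write failure"
    handler.handle(new RuntimeException("simulated write failure"),
      List(("unique_key_col_1", Some(42L), BIGINT), ("unique_key_col_2", Some(127L), BIGINT)),
      List(("string_data_col", Some("changed_string_data_value_1"), VARCHAR), ("number_data_col", None, DOUBLE)))
  }
}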
The Update Operation
package com.myproject.experiment

import java.util.Calendar
import java.sql.{DriverManager, Timestamp}
import java.sql.Types.TIMESTAMP
import com.myproject.experiment.errorhandling.FailedRecordHandler
import com.myproject.experiment.jdbc.JdbcRecordUpdateFromSparkSqlRow
import com.myproject.experiment.spark.Helpers._
import com.myproject.jdbc.utils.SQLHelpers.getConnProperties
import org.apache.spark.sql.DataFrame

object UpdateOperation {
  /**
    * Update the target table row by row individually from each partition of the updateDF
    *
    * @param updateDF: dataframe that holds data to update
    * @param uniqueKey: comma separated list of columns composing the unique key, e.g. "unique_key_col_1, unique_key_col_2"
    * @param targetTable: target table name, e.g. transaction_table
    * @param datasetId: id of the original dataset from which updateDF is derived, e.g. 199
    */
  def updateTableV1(updateDF: DataFrame, uniqueKey: String, targetTable: String, datasetId: Long) = {
    Class.forName("com.mysql.jdbc.Driver")

    // Get from system configuration the url and properties to connect to the database
    val (jdbcUrl, connProperties) = getConnProperties()

    val (updateSchema, uniqueKeyCols, updateCols) = getUpdateInfo(updateDF, uniqueKey)

    updateDF.foreachPartition({ rows =>
      val conn = DriverManager.getConnection(jdbcUrl, connProperties)
      try {
        val targetRecordUpdate = new JdbcRecordUpdateFromSparkSqlRow(conn, targetTable,
          updateCols.map((_, None)), uniqueKeyCols.map((_, None)))
        val failedRecordHandler = new FailedRecordHandler(conn, datasetId, uniqueKeyCols, updateCols)

        rows.foreach({ row =>
          try {
            targetRecordUpdate.execute(row, updateSchema, updateCols ++ uniqueKeyCols)
          } catch {
            case e: Throwable => failedRecordHandler.handle(e, row, updateSchema)
          }
        })
      } finally {
        conn.close()
      }
    })
  }

  def updateTableV2(updateDF: DataFrame, uniqueKey: String, targetTable: String, datasetId: Long) = {
    Class.forName("com.mysql.jdbc.Driver")

    // Get from system configuration the url and properties to connect to the database
    val (jdbcUrl, connProperties) = getConnProperties()

    val (updateSchema, uniqueKeyCols, updateCols) = getUpdateInfo(updateDF, uniqueKey)

    updateDF.foreachPartition({ rows =>
      val conn = DriverManager.getConnection(jdbcUrl, connProperties)
      try {
        val targetRecordUpdate = new JdbcRecordUpdateFromSparkSqlRow(conn, targetTable,
          (updateCols :+ "write_timestamp").map((_, None)),
          uniqueKeyCols.map((_, None)) :+ ("dataset_id", Some(datasetId)))
        val failedRecordHandler = new FailedRecordHandler(conn, datasetId, uniqueKeyCols, updateCols)

        var updateColsBoundValues: List[(String, Option[Any], Int)] = null
        var uniqueKeyColsBoundValues: List[(String, Option[Any], Int)] = null

        rows.foreach({ row =>
          try {
            val now = new Timestamp(Calendar.getInstance().getTime().getTime)
            updateColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, updateSchema, updateCols)
            uniqueKeyColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, updateSchema, uniqueKeyCols)
            targetRecordUpdate.execute((updateColsBoundValues :+ ("write_timestamp", Some(now), TIMESTAMP)) ++ uniqueKeyColsBoundValues)
          } catch {
            case e: Throwable => failedRecordHandler.handle(e, uniqueKeyColsBoundValues, updateColsBoundValues)
          }
        })
      } finally {
        conn.close()
      }
    })
  }
}
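For completeness, a minimal driver sketch that wires it all together. It assumes a SparkSession, a reachable MySQL instance configured via getConnProperties, the MySQL JDBC driver on the classpath, and the illustrative transaction_table columns; it is not part of the series' code.

import org.apache.spark.sql.SparkSession
import com.myproject.experiment.UpdateOperation

object UpdateDriverDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("update-demo").getOrCreate()
    import spark.implicits._

    // A toy updateDF carrying the unique key plus the data columns to update
    val updateDF = Seq(
      (1L, 100L, "changed_string_data_value_1", 10.5),
      (2L, 127L, "changed_string_data_value_2", 20.0)
    ).toDF("unique_key_col_1", "unique_key_col_2", "string_data_col", "number_data_col")

    UpdateOperation.updateTableV1(updateDF, "unique_key_col_1, unique_key_col_2", "transaction_table", 199L)

    spark.stop()
  }
}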
The Insert Operation
package com.myproject.experiment

import java.sql.Types._
import com.myproject.jdbc.utils.SQLHelpers.getConnProperties
import com.myproject.experiment.jdbc.JdbcRecordInsertFromSparkSqlRow
import java.sql.{DriverManager, Timestamp}
import java.util.Calendar
import com.myproject.experiment.errorhandling.FailedRecordHandler
import com.myproject.experiment.spark.Helpers._
import org.apache.spark.sql._

object InsertOperation {
  /**
    * Insert into the target table row by row individually from each partition of the insertDF
    *
    * @param insertDF: dataframe that holds data to insert
    * @param uniqueKey: comma separated list of columns composing the unique key, e.g. "unique_key_col_1, unique_key_col_2"
    * @param targetTable: target table name, e.g. transaction_table
    * @param datasetId: id of the original dataset from which insertDF is derived, e.g. 261
    */
  def insertTableV1(insertDF: DataFrame, uniqueKey: String, targetTable: String, datasetId: Long) = {
    Class.forName("com.mysql.jdbc.Driver")

    // Get from system configuration the url and properties to connect to the database
    val (jdbcUrl, connProperties) = getConnProperties()

    val (insertSchema, insertCols) = (insertDF.schema, insertDF.columns.toList)
    val uniqueKeyCols = uniqueKey.split(",\\W*").toList
    val dataCols = insertCols diff uniqueKeyCols

    insertDF.foreachPartition({ rows =>
      val conn = DriverManager.getConnection(jdbcUrl, connProperties)
      try {
        val targetRecordInsert = new JdbcRecordInsertFromSparkSqlRow(conn, targetTable, insertCols.map((_, None)))
        val failedRecordHandler = new FailedRecordHandler(conn, datasetId, uniqueKeyCols, dataCols)

        rows.foreach({ row =>
          try {
            targetRecordInsert.execute(row, insertSchema, insertCols)
          } catch {
            case e: Throwable => failedRecordHandler.handle(e, row, insertSchema)
          }
        })
      } finally {
        conn.close()
      }
    })
  }

  def insertTableV2(insertDF: DataFrame, uniqueKey: String, targetTable: String, datasetId: Long) = {
    Class.forName("com.mysql.jdbc.Driver")

    // Get from system configuration the url and properties to connect to the database
    val (jdbcUrl, connProperties) = getConnProperties()

    val insertDF2 = insertDF.drop("dataset_id")
    val (insertSchema, insertCols) = (insertDF2.schema, insertDF2.columns.toList)
    val uniqueKeyCols = uniqueKey.split(",\\W*").toList
    val dataCols = insertCols diff uniqueKeyCols

    insertDF2.foreachPartition({ rows =>
      val conn = DriverManager.getConnection(jdbcUrl, connProperties)
      try {
        val targetRecordInsert = new JdbcRecordInsertFromSparkSqlRow(conn, targetTable,
          (uniqueKeyCols ++ dataCols :+ "write_timestamp").map((_, None)) :+ ("dataset_id", Some(datasetId)))
        val failedRecordHandler = new FailedRecordHandler(conn, datasetId, uniqueKeyCols, dataCols)

        var uniqueKeyColsBoundValues: List[(String, Option[Any], Int)] = null
        var dataColsBoundValues: List[(String, Option[Any], Int)] = null

        rows.foreach({ row =>
          try {
            val now = new Timestamp(Calendar.getInstance().getTime().getTime)
            uniqueKeyColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, insertSchema, uniqueKeyCols)
            dataColsBoundValues = retrieveJdbcValuesFromSparkSqlRow(row, insertSchema, dataCols)
            targetRecordInsert.execute(uniqueKeyColsBoundValues ++ dataColsBoundValues :+ ("write_timestamp", Some(now), TIMESTAMP))
          } catch {
            case e: Throwable => failedRecordHandler.handle(e, uniqueKeyColsBoundValues, dataColsBoundValues)
          }
        })
      } finally {
        conn.close()
      }
    })
  }
}
Enabling Precompilation of SQL String Template for Prepared Statement in JDBC Connection
package com.myproject.jdbc.utils

import java.util.Properties

object SQLHelpers {
  // Other utilities

  def getConnProperties(): (String, Properties) = {
    // Other setup, including reading jdbcHostName, jdbcPort and jdbcDatabase from system configuration

    val connProperties = new Properties()
    // Set up other connection properties

    // Enable server-side prepared statements
    connProperties.put("useServerPrepStmts", "true")

    // Set up connection string
    val jdbcUrl = s"jdbc:mysql://${jdbcHostName}:${jdbcPort}/${jdbcDatabase}"

    (jdbcUrl, connProperties)
  }
}