美文网首页大数据工具包
Structured Streaming自定义MySQLSink

Structured Streaming自定义MySQLSink

作者: 0o青团o0 | 来源:发表于2019-12-11 13:52 被阅读0次

之前使用过foreach单条处理的MySQLSink，每条记录都要打开和关闭一次连接，性能较差，故改用prepareStatement的addBatch批量处理数据。

/**
 * A batching MySQL sink for Structured Streaming, implemented as a [[ForeachWriter]].
 *
 * Instead of opening a connection and issuing one statement per row (as a naive
 * per-row foreach sink does), this writer opens one connection per partition/epoch
 * in `open`, accumulates rows with `PreparedStatement.addBatch` in `process`, and
 * executes + commits the whole batch exactly once in `close`.
 *
 * @param urlName    key in `properties` under which the JDBC URL is stored
 * @param properties JDBC properties; must contain the `driver` class name and the URL under `urlName`
 * @param tableName  target MySQL table
 * @param fieldNames column names, in the same order as the fields of each incoming Row
 * @param sqlType    which statement flavor to generate (Replace / Update / Upsert)
 */
class MySQLBatchSink(urlName: String, properties: Properties,
                     tableName: String,
                     fieldNames: Array[String],
                     sqlType: SqlType.Value) extends ForeachWriter[Row]() with Logging {
  // Eagerly load the JDBC driver so a misconfigured driver fails at construction time.
  Class.forName(properties.getProperty("driver"))
  var conn: Connection = _
  var ps: PreparedStatement = _

  // One "?" placeholder per column, e.g. "?,?,?".
  val values: String = fieldNames.map(_ => "?").mkString(",")
  val sqlStr: String = sqlType match {
    case SqlType.Replace => s"REPLACE INTO `$tableName` (`${fieldNames.mkString("`,`")}`) VALUES ($values)"
    // NOTE(review): no WHERE clause — this statement updates EVERY row in the table.
    // Confirm callers really intend a table-wide update, or extend the class with
    // key columns so a WHERE clause can be generated.
    case SqlType.Update => s"UPDATE `$tableName` SET `${fieldNames.mkString("`=?, `")}`=?"
    case SqlType.Upsert =>
      s"""
         |INSERT INTO `$tableName` (`${fieldNames.mkString("`,`")}`) VALUES ($values)
         |ON DUPLICATE KEY UPDATE `${fieldNames.mkString("`=?, `")}`=?
         |""".stripMargin
  }

  /**
   * Called once per partition/epoch. Opens the connection, disables auto-commit so
   * the whole batch commits atomically in `close`, and prepares the statement.
   *
   * @return true to signal Spark that this partition should be processed
   */
  override def open(partitionId: Long, epochId: Long): Boolean = {
    conn = DriverManager.getConnection(properties.getProperty(urlName), properties)
    conn.setAutoCommit(false)
    ps = conn.prepareStatement(sqlStr)
    true
  }

  /**
   * Binds one row's values to the prepared statement and queues it with addBatch.
   * For Upsert, each value is bound twice: once for the INSERT placeholders
   * (positions 1..n) and once for the ON DUPLICATE KEY UPDATE placeholders
   * (positions n+1..2n).
   */
  override def process(value: Row): Unit = {
    for (i <- 0 until value.size) {
      val insertIndex: Int = i + 1 // JDBC parameter indices are 1-based
      bindParameter(value, i, insertIndex)

      if (sqlType == SqlType.Upsert) {
        val updateIndex: Int = insertIndex + value.size
        bindParameter(value, i, updateIndex)
      }
    }
    ps.addBatch()
  }

  /**
   * Executes and commits the accumulated batch, then releases JDBC resources.
   * Spark calls this even when `open` threw, so every step is null-guarded to
   * avoid an NPE that would mask the original failure.
   */
  override def close(errorOrNull: Throwable): Unit = {
    try {
      // Only flush if open() succeeded and the statement was actually prepared.
      if (ps != null) {
        ps.executeBatch()
        conn.commit()
      }
    } catch {
      // NonFatal: let OOM / InterruptedException etc. propagate.
      case NonFatal(e) => logWarning("Exception committing transaction", e)
    } finally {
      try {
        if (ps != null) ps.close()
      } catch {
        case e: Exception => logWarning("Exception closing prepareStatement", e)
      }

      try {
        if (conn != null) conn.close()
      } catch {
        case e: Exception => logWarning("Exception closing connection", e)
      }
    }
  }

  /**
   * Binds a single Row field to the given 1-based JDBC parameter index,
   * dispatching on the field's runtime type.
   *
   * @param value the source row
   * @param i     0-based field index within the row
   * @param index 1-based JDBC parameter index to bind
   */
  private def bindParameter(value: Row, i: Int, index: Int): Unit = {
    value.get(i) match {
      case v: Int => ps.setInt(index, v)
      case v: Long => ps.setLong(index, v)
      case v: String => ps.setString(index, v)
      case v: Timestamp => ps.setTimestamp(index, v)
      case v: Float => ps.setFloat(index, v)
      case v: Double => ps.setDouble(index, v)
      case v: java.math.BigDecimal => ps.setBigDecimal(index, v)
      case v: Boolean => ps.setBoolean(index, v)
      case v: Byte => ps.setByte(index, v)
      case v: Short => ps.setShort(index, v)
      case null => ps.setNull(index, SparkUtil.sparkTypeToSqlType(value.schema.fields(i).dataType))
      case _ => throw new IllegalArgumentException(s"No support for Spark SQL type ${value.schema.fields(i).dataType}")
    }
  }
}

这样在process按条处理时只是addBatch，真正的执行与提交发生在close时（即当前分区在本批次结束时），随后才关闭连接。

相关文章

网友评论

    本文标题:Structured Streaming自定义MySQLSink

    本文链接:https://www.haomeiwen.com/subject/htmogctx.html