百萬級數(shù)據(jù)批量讀寫入MySQL
Spark SQL讀取MySQL的方式
Spark SQL還包括一個可以使用JDBC從其他數(shù)據(jù)庫讀取數(shù)據(jù)的數(shù)據(jù)源。與使用JdbcRDD相比,應(yīng)優(yōu)先使用此功能。這是因為結(jié)果作為DataFrame返回,它們可以在Spark SQL中輕松處理或與其他數(shù)據(jù)源連接。JDBC數(shù)據(jù)源也更易于使用Java或Python,因為它不需要用戶提供ClassTag。
可以使用Data Sources API將遠程數(shù)據(jù)庫中的表加載為DataFrame或Spark SQL臨時視圖。用戶可以在數(shù)據(jù)源選項中指定JDBC連接屬性。user和password通常作為用于登錄數(shù)據(jù)源的連接屬性。除連接屬性外,Spark還支持以下不區(qū)分大小寫的選項:
| 屬性名稱 | 解釋 |
|---|---|
url | 要連接的JDBC URL |
dbtable | 讀取或?qū)懭氲腏DBC表 |
query | 指定查詢語句 |
driver | 用于連接到該URL的JDBC驅(qū)動類名 |
partitionColumn, lowerBound, upperBound | 如果指定了這些選項,則必須全部指定。另外, numPartitions必須指定 |
numPartitions | 表讀寫中可用于并行處理的最大分區(qū)數(shù)。這也確定了并發(fā)JDBC連接的最大數(shù)量。如果要寫入的分區(qū)數(shù)超過此限制,我們可以通過coalesce(numPartitions)在寫入之前進行調(diào)用將其降低到此限制 |
queryTimeout | 默認為0,查詢超時時間 |
fetchsize | JDBC的獲取大小,它確定每次要獲取多少行。這可以幫助提高JDBC驅(qū)動程序的性能 |
batchsize | 默認為1000,JDBC批處理大小,這可以幫助提高JDBC驅(qū)動程序的性能。 |
isolationLevel | 事務(wù)隔離級別,適用于當前連接。它可以是一個NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或SERIALIZABLE,對應(yīng)于由JDBC的連接對象定義,缺省值為標準事務(wù)隔離級別READ_UNCOMMITTED。此選項僅適用于寫作。 |
sessionInitStatement | 在向遠程數(shù)據(jù)庫打開每個數(shù)據(jù)庫會話之后,在開始讀取數(shù)據(jù)之前,此選項將執(zhí)行自定義SQL語句,使用它來實現(xiàn)會話初始化代碼。 |
truncate | 這是與JDBC writer相關(guān)的選項。當SaveMode.Overwrite啟用時,就會清空目標表的內(nèi)容,而不是刪除和重建其現(xiàn)有的表。默認為false |
pushDownPredicate | 用于啟用或禁用謂詞下推到JDBC數(shù)據(jù)源的選項。默認值為true,在這種情況下,Spark將盡可能將過濾器下推到JDBC數(shù)據(jù)源。 |
源碼
SparkSession
/**
???*?Returns?a?[[DataFrameReader]]?that?can?be?used?to?read?non-streaming?data?in?as?a
???*?`DataFrame`.
???*?{{{
???*???sparkSession.read.parquet("/path/to/file.parquet")
???*???sparkSession.read.schema(schema).json("/path/to/file.json")
???*?}}}
???*
???*?@since?2.0.0
???*/
??def?read:?DataFrameReader?=?new?DataFrameReader(self)
DataFrameReader
??//?...省略代碼...
??/**
???*所有的數(shù)據(jù)由RDD的一個分區(qū)處理,如果你這個表很大,很可能會出現(xiàn)OOM
???*可以使用DataFrameDF.rdd.partitions.size方法查看
???*/
??def?jdbc(url:?String,?table:?String,?properties:?Properties):?DataFrame?=?{
????assertNoSpecifiedSchema("jdbc")
????this.extraOptions?++=?properties.asScala
????this.extraOptions?+=?(JDBCOptions.JDBC_URL?->?url,?JDBCOptions.JDBC_TABLE_NAME?->?table)
????format("jdbc").load()
??}
/**
???*?@param?url?數(shù)據(jù)庫url
???*?@param?table?表名
???*?@param?columnName?分區(qū)字段名
???*?@param?lowerBound??`columnName`的最小值,用于分區(qū)步長
???*?@param?upperBound??`columnName`的最大值,用于分區(qū)步長.
???*?@param?numPartitions?分區(qū)數(shù)量?
???*?@param?connectionProperties?其他參數(shù)
???*?@since?1.4.0
???*/
??def?jdbc(
??????url:?String,
??????table:?String,
??????columnName:?String,
??????lowerBound:?Long,
??????upperBound:?Long,
??????numPartitions:?Int,
??????connectionProperties:?Properties):?DataFrame?=?{
????this.extraOptions?++=?Map(
??????JDBCOptions.JDBC_PARTITION_COLUMN?->?columnName,
??????JDBCOptions.JDBC_LOWER_BOUND?->?lowerBound.toString,
??????JDBCOptions.JDBC_UPPER_BOUND?->?upperBound.toString,
??????JDBCOptions.JDBC_NUM_PARTITIONS?->?numPartitions.toString)
????jdbc(url,?table,?connectionProperties)
??}
??/**
???*?@param?predicates?每個分區(qū)的where條件
???*?比如:"id <= 1000", "score > 1000 and score <= 2000"
???*?將會分成兩個分區(qū)
???*?@since?1.4.0
???*/
??def?jdbc(
??????url:?String,
??????table:?String,
??????predicates:?Array[String],
??????connectionProperties:?Properties):?DataFrame?=?{
????assertNoSpecifiedSchema("jdbc")
????val?params?=?extraOptions.toMap?++?connectionProperties.asScala.toMap
????val?options?=?new?JDBCOptions(url,?table,?params)
????val?parts:?Array[Partition]?=?predicates.zipWithIndex.map?{?case?(part,?i)?=>
??????JDBCPartition(part,?i)?:?Partition
????}
????val?relation?=?JDBCRelation(parts,?options)(sparkSession)
????sparkSession.baseRelationToDataFrame(relation)
??}
示例
?private?def?runJdbcDatasetExample(spark:?SparkSession):?Unit?=?{
????
????//?從JDBC?source加載數(shù)據(jù)(load)
????val?jdbcDF?=?spark.read
??????.format("jdbc")
??????.option("url",?"jdbc:mysql://127.0.0.1:3306/test")
??????.option("dbtable",?"mytable")
??????.option("user",?"root")
??????.option("password",?"root")
??????.load()
????val?connectionProperties?=?new?Properties()
????connectionProperties.put("user",?"root")
????connectionProperties.put("password",?"root")
????val?jdbcDF2?=?spark.read
??????.jdbc("jdbc:mysql://127.0.0.1:3306/test",?"mytable",?connectionProperties)
????//?指定讀取schema的數(shù)據(jù)類型
????connectionProperties.put("customSchema",?"id?DECIMAL(38,?0),?name?STRING")
????val?jdbcDF3?=?spark.read
??????.jdbc("jdbc:mysql://127.0.0.1:3306/test",?"mytable",?connectionProperties)
??}
值得注意的是,上面的方式如果不指定分區(qū)的話,Spark默認會使用一個分區(qū)讀取數(shù)據(jù),這樣在數(shù)據(jù)量特別大的情況下,會出現(xiàn)OOM。在讀取數(shù)據(jù)之后,調(diào)用DataFrameDF.rdd.partitions.size方法可以查看分區(qū)數(shù)。
Spark SQL批量寫入MySQL
代碼示例如下:
object?BatchInsertMySQL?{
??case?class?Person(name:?String,?age:?Int)
??def?main(args:?Array[String]):?Unit?=?{
????//?創(chuàng)建sparkSession對象
????val?conf?=?new?SparkConf()
??????.setAppName("BatchInsertMySQL")
????val?spark:?SparkSession?=??SparkSession.builder()
??????.config(conf)
??????.getOrCreate()
????import?spark.implicits._
????//?MySQL連接參數(shù)
????val?url?=?JDBCUtils.url
????val?user?=?JDBCUtils.user
????val?pwd?=?JDBCUtils.password
????//?創(chuàng)建Properties對象,設(shè)置連接mysql的用戶名和密碼
????val?properties:?Properties?=?new?Properties()
????properties.setProperty("user",?user)?//?用戶名
????properties.setProperty("password",?pwd)?//?密碼
????properties.setProperty("driver",?"com.mysql.jdbc.Driver")
????properties.setProperty("numPartitions","10")
????//?讀取mysql中的表數(shù)據(jù)
????val?testDF:?DataFrame?=?spark.read.jdbc(url,?"test",?properties)
?????println("testDF的分區(qū)數(shù):??"?+?testDF.rdd.partitions.size)
???testDF.createOrReplaceTempView("test")
???testDF.persist(StorageLevel.MEMORY_AND_DISK)
???testDF.printSchema()
????val?result?=
??????s"""--?SQL代碼
???????????????""".stripMargin
????val?resultBatch?=?spark.sql(result).as[Person]
????println("resultBatch的分區(qū)數(shù):?"?+?resultBatch.rdd.partitions.size)
????//?批量寫入MySQL
????//?此處最好對處理的結(jié)果進行一次重分區(qū)
????//?由于數(shù)據(jù)量特別大,會造成每個分區(qū)數(shù)據(jù)特別多
????resultBatch.repartition(500).foreachPartition(record?=>?{
??????val?list?=?new?ListBuffer[Person]
??????record.foreach(person?=>?{
????????val?name?=?Person.name
????????val?age?=?Person.age
????????list.append(Person(name,age))
??????})
??????upsertDateMatch(list)?//執(zhí)行批量插入數(shù)據(jù)
????})
????//?批量插入MySQL的方法
????def?upsertPerson(list:?ListBuffer[Person]):?Unit?=?{
??????var?connect:?Connection?=?null
??????var?pstmt:?PreparedStatement?=?null
??????try?{
????????connect?=?JDBCUtils.getConnection()
????????//?禁用自動提交
????????connect.setAutoCommit(false)
????????val?sql?=?"REPLACE?INTO?`person`(name,?age)"?+
??????????"?VALUES(?,??)"
????????pstmt?=?connect.prepareStatement(sql)
????????var?batchIndex?=?0
????????for?(person?<-?list)?{
??????????pstmt.setString(1,?person.name)
??????????pstmt.setString(2,?person.age)
??????????//?加入批次
??????????pstmt.addBatch()
??????????batchIndex?+=1
??????????//?控制提交的數(shù)量,
??????????// MySQL的批量寫入盡量限制提交批次的數(shù)據(jù)量,否則會把MySQL寫掛!!!
??????????if(batchIndex?%?1000?==?0?&&?batchIndex?!=0){
????????????pstmt.executeBatch()
????????????pstmt.clearBatch()
??????????}
????????}
????????//?提交批次
????????pstmt.executeBatch()
????????connect.commit()
??????}?catch?{
????????case?e:?Exception?=>
??????????e.printStackTrace()
??????}?finally?{
????????JDBCUtils.closeConnection(connect,?pstmt)
??????}
????}
????spark.close()
??}
}
JDBC連接工具類:
object?JDBCUtils?{
??val?user?=?"root"
??val?password?=?"root"
??val?url?=?"jdbc:mysql://localhost:3306/mydb"
??Class.forName("com.mysql.jdbc.Driver")
??//?獲取連接
??def?getConnection()?=?{
????DriverManager.getConnection(url,user,password)
??}
//?釋放連接
??def?closeConnection(connection:?Connection,?pstmt:?PreparedStatement):?Unit?=?{
????try?{
??????if?(pstmt?!=?null)?{
????????pstmt.close()
??????}
????}?catch?{
??????case?e:?Exception?=>?e.printStackTrace()
????}?finally?{
??????if?(connection?!=?null)?{
????????connection.close()
??????}
????}
??}
}
總結(jié)
Spark寫入大量數(shù)據(jù)到MySQL時,在寫入之前盡量對寫入的DF進行重分區(qū)處理,避免分區(qū)內(nèi)數(shù)據(jù)過多。在寫入時,要注意使用foreachPartition來進行寫入,這樣可以為每一個分區(qū)獲取一個連接,在分區(qū)內(nèi)部設(shè)定批次提交,提交的批次不易過大,以免將數(shù)據(jù)庫寫掛。
