<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          百萬級數(shù)據(jù)批量讀寫入MySQL

          共 2766字,需瀏覽 6分鐘

           ·

          2020-11-04 09:11

          Spark SQL讀取MySQL的方式

          Spark SQL還包括一個可以使用JDBC從其他數(shù)據(jù)庫讀取數(shù)據(jù)的數(shù)據(jù)源。與使用JdbcRDD相比,應(yīng)優(yōu)先使用此功能。這是因為結(jié)果作為DataFrame返回,它們可以在Spark SQL中輕松處理或與其他數(shù)據(jù)源連接。JDBC數(shù)據(jù)源也更易于使用Java或Python,因為它不需要用戶提供ClassTag。

          可以使用Data Sources API將遠程數(shù)據(jù)庫中的表加載為DataFrame或Spark SQL臨時視圖。用戶可以在數(shù)據(jù)源選項中指定JDBC連接屬性。user和password通常作為用于登錄數(shù)據(jù)源的連接屬性。除連接屬性外,Spark還支持以下不區(qū)分大小寫的選項:

          屬性名稱解釋
          url要連接的JDBC URL
          dbtable讀取或?qū)懭氲腏DBC表
          query指定查詢語句
          driver用于連接到該URL的JDBC驅(qū)動類名
          partitionColumn, lowerBound, upperBound如果指定了這些選項,則必須全部指定。另外, numPartitions必須指定
          numPartitions表讀寫中可用于并行處理的最大分區(qū)數(shù)。這也確定了并發(fā)JDBC連接的最大數(shù)量。如果要寫入的分區(qū)數(shù)超過此限制,我們可以通過coalesce(numPartitions)在寫入之前進行調(diào)用將其降低到此限制
          queryTimeout默認為0,查詢超時時間
          fetchsizeJDBC的獲取大小,它確定每次要獲取多少行。這可以幫助提高JDBC驅(qū)動程序的性能
          batchsize默認為1000,JDBC批處理大小,這可以幫助提高JDBC驅(qū)動程序的性能。
          isolationLevel事務(wù)隔離級別,適用于當前連接。它可以是一個NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ,或SERIALIZABLE,對應(yīng)于由JDBC的連接對象定義,缺省值為標準事務(wù)隔離級別READ_UNCOMMITTED。此選項僅適用于寫作。
          sessionInitStatement在向遠程數(shù)據(jù)庫打開每個數(shù)據(jù)庫會話之后,在開始讀取數(shù)據(jù)之前,此選項將執(zhí)行自定義SQL語句,使用它來實現(xiàn)會話初始化代碼。
          truncate這是與JDBC writer相關(guān)的選項。當SaveMode.Overwrite啟用時,就會清空目標表的內(nèi)容,而不是刪除和重建其現(xiàn)有的表。默認為false
          pushDownPredicate用于啟用或禁用謂詞下推到JDBC數(shù)據(jù)源的選項。默認值為true,在這種情況下,Spark將盡可能將過濾器下推到JDBC數(shù)據(jù)源。

          源碼

          • SparkSession
          /**
          ???*?Returns?a?[[DataFrameReader]]?that?can?be?used?to?read?non-streaming?data?in?as?a
          ???*?`DataFrame`.
          ???*?{{{
          ???*???sparkSession.read.parquet("/path/to/file.parquet")
          ???*???sparkSession.read.schema(schema).json("/path/to/file.json")
          ???*?}}}
          ???*
          ???*?@since?2.0.0
          ???*/

          ??def?read:?DataFrameReader?=?new?DataFrameReader(self)
          • DataFrameReader
          ??//?...省略代碼...
          ??/**
          ???*所有的數(shù)據(jù)由RDD的一個分區(qū)處理,如果你這個表很大,很可能會出現(xiàn)OOM
          ???*可以使用DataFrameDF.rdd.partitions.size方法查看
          ???*/

          ??def?jdbc(url:?String,?table:?String,?properties:?Properties):?DataFrame?=?{
          ????assertNoSpecifiedSchema("jdbc")
          ????this.extraOptions?++=?properties.asScala
          ????this.extraOptions?+=?(JDBCOptions.JDBC_URL?->?url,?JDBCOptions.JDBC_TABLE_NAME?->?table)
          ????format("jdbc").load()
          ??}
          /**
          ???*?@param?url?數(shù)據(jù)庫url
          ???*?@param?table?表名
          ???*?@param?columnName?分區(qū)字段名
          ???*?@param?lowerBound??`columnName`的最小值,用于分區(qū)步長
          ???*?@param?upperBound??`columnName`的最大值,用于分區(qū)步長.
          ???*?@param?numPartitions?分區(qū)數(shù)量?
          ???*?@param?connectionProperties?其他參數(shù)
          ???*?@since?1.4.0
          ???*/

          ??def?jdbc(
          ??????url:?String,
          ??????table:?String,
          ??????columnName:?String,
          ??????lowerBound:?Long,
          ??????upperBound:?Long,
          ??????numPartitions:?Int,
          ??????connectionProperties:?Properties):?DataFrame?=?{
          ????this.extraOptions?++=?Map(
          ??????JDBCOptions.JDBC_PARTITION_COLUMN?->?columnName,
          ??????JDBCOptions.JDBC_LOWER_BOUND?->?lowerBound.toString,
          ??????JDBCOptions.JDBC_UPPER_BOUND?->?upperBound.toString,
          ??????JDBCOptions.JDBC_NUM_PARTITIONS?->?numPartitions.toString)
          ????jdbc(url,?table,?connectionProperties)
          ??}

          ??/**
          ???*?@param?predicates?每個分區(qū)的where條件
          ???*?比如:"id <= 1000", "score > 1000 and score <= 2000"
          ???*?將會分成兩個分區(qū)
          ???*?@since?1.4.0
          ???*/

          ??def?jdbc(
          ??????url:?String,
          ??????table:?String,
          ??????predicates:?Array[String],
          ??????connectionProperties:?Properties):?DataFrame?=?{
          ????assertNoSpecifiedSchema("jdbc")
          ????val?params?=?extraOptions.toMap?++?connectionProperties.asScala.toMap
          ????val?options?=?new?JDBCOptions(url,?table,?params)
          ????val?parts:?Array[Partition]?=?predicates.zipWithIndex.map?{?case?(part,?i)?=>
          ??????JDBCPartition(part,?i)?:?Partition
          ????}
          ????val?relation?=?JDBCRelation(parts,?options)(sparkSession)
          ????sparkSession.baseRelationToDataFrame(relation)
          ??}

          示例

          ?private?def?runJdbcDatasetExample(spark:?SparkSession):?Unit?=?{
          ????
          ????//?從JDBC?source加載數(shù)據(jù)(load)
          ????val?jdbcDF?=?spark.read
          ??????.format("jdbc")
          ??????.option("url",?"jdbc:mysql://127.0.0.1:3306/test")
          ??????.option("dbtable",?"mytable")
          ??????.option("user",?"root")
          ??????.option("password",?"root")
          ??????.load()

          ????val?connectionProperties?=?new?Properties()
          ????connectionProperties.put("user",?"root")
          ????connectionProperties.put("password",?"root")
          ????val?jdbcDF2?=?spark.read
          ??????.jdbc("jdbc:mysql://127.0.0.1:3306/test",?"mytable",?connectionProperties)
          ????//?指定讀取schema的數(shù)據(jù)類型
          ????connectionProperties.put("customSchema",?"id?DECIMAL(38,?0),?name?STRING")
          ????val?jdbcDF3?=?spark.read
          ??????.jdbc("jdbc:mysql://127.0.0.1:3306/test",?"mytable",?connectionProperties)

          ??}

          值得注意的是,上面的方式如果不指定分區(qū)的話,Spark默認會使用一個分區(qū)讀取數(shù)據(jù),這樣在數(shù)據(jù)量特別大的情況下,會出現(xiàn)OOM。在讀取數(shù)據(jù)之后,調(diào)用DataFrameDF.rdd.partitions.size方法可以查看分區(qū)數(shù)。

          Spark SQL批量寫入MySQL

          代碼示例如下:

          object?BatchInsertMySQL?{
          ??case?class?Person(name:?String,?age:?Int)
          ??def?main(args:?Array[String]):?Unit?=?{

          ????//?創(chuàng)建sparkSession對象
          ????val?conf?=?new?SparkConf()
          ??????.setAppName("BatchInsertMySQL")
          ????val?spark:?SparkSession?=??SparkSession.builder()
          ??????.config(conf)
          ??????.getOrCreate()
          ????import?spark.implicits._
          ????//?MySQL連接參數(shù)
          ????val?url?=?JDBCUtils.url
          ????val?user?=?JDBCUtils.user
          ????val?pwd?=?JDBCUtils.password

          ????//?創(chuàng)建Properties對象,設(shè)置連接mysql的用戶名和密碼
          ????val?properties:?Properties?=?new?Properties()

          ????properties.setProperty("user",?user)?//?用戶名
          ????properties.setProperty("password",?pwd)?//?密碼
          ????properties.setProperty("driver",?"com.mysql.jdbc.Driver")
          ????properties.setProperty("numPartitions","10")

          ????//?讀取mysql中的表數(shù)據(jù)
          ????val?testDF:?DataFrame?=?spark.read.jdbc(url,?"test",?properties)
          ?????println("testDF的分區(qū)數(shù):??"?+?testDF.rdd.partitions.size)
          ???testDF.createOrReplaceTempView("test")
          ???testDF.persist(StorageLevel.MEMORY_AND_DISK)
          ???testDF.printSchema()

          ????val?result?=
          ??????s"""--?SQL代碼
          ???????????????"
          "".stripMargin

          ????val?resultBatch?=?spark.sql(result).as[Person]
          ????println("resultBatch的分區(qū)數(shù):?"?+?resultBatch.rdd.partitions.size)

          ????//?批量寫入MySQL
          ????//?此處最好對處理的結(jié)果進行一次重分區(qū)
          ????//?由于數(shù)據(jù)量特別大,會造成每個分區(qū)數(shù)據(jù)特別多
          ????resultBatch.repartition(500).foreachPartition(record?=>?{

          ??????val?list?=?new?ListBuffer[Person]
          ??????record.foreach(person?=>?{
          ????????val?name?=?Person.name
          ????????val?age?=?Person.age
          ????????list.append(Person(name,age))
          ??????})
          ??????upsertDateMatch(list)?//執(zhí)行批量插入數(shù)據(jù)
          ????})
          ????//?批量插入MySQL的方法
          ????def?upsertPerson(list:?ListBuffer[Person]):?Unit?=?{

          ??????var?connect:?Connection?=?null
          ??????var?pstmt:?PreparedStatement?=?null

          ??????try?{
          ????????connect?=?JDBCUtils.getConnection()
          ????????//?禁用自動提交
          ????????connect.setAutoCommit(false)

          ????????val?sql?=?"REPLACE?INTO?`person`(name,?age)"?+
          ??????????"?VALUES(?,??)"

          ????????pstmt?=?connect.prepareStatement(sql)

          ????????var?batchIndex?=?0
          ????????for?(person?<-?list)?{
          ??????????pstmt.setString(1,?person.name)
          ??????????pstmt.setString(2,?person.age)
          ??????????//?加入批次
          ??????????pstmt.addBatch()
          ??????????batchIndex?+=1
          ??????????//?控制提交的數(shù)量,
          ??????????// MySQL的批量寫入盡量限制提交批次的數(shù)據(jù)量,否則會把MySQL寫掛!!!
          ??????????if(batchIndex?%?1000?==?0?&&?batchIndex?!=0){
          ????????????pstmt.executeBatch()
          ????????????pstmt.clearBatch()
          ??????????}

          ????????}
          ????????//?提交批次
          ????????pstmt.executeBatch()
          ????????connect.commit()
          ??????}?catch?{
          ????????case?e:?Exception?=>
          ??????????e.printStackTrace()
          ??????}?finally?{
          ????????JDBCUtils.closeConnection(connect,?pstmt)
          ??????}
          ????}

          ????spark.close()
          ??}
          }

          JDBC連接工具類:

          object?JDBCUtils?{
          ??val?user?=?"root"
          ??val?password?=?"root"
          ??val?url?=?"jdbc:mysql://localhost:3306/mydb"
          ??Class.forName("com.mysql.jdbc.Driver")
          ??//?獲取連接
          ??def?getConnection()?=?{
          ????DriverManager.getConnection(url,user,password)
          ??}
          //?釋放連接
          ??def?closeConnection(connection:?Connection,?pstmt:?PreparedStatement):?Unit?=?{
          ????try?{
          ??????if?(pstmt?!=?null)?{
          ????????pstmt.close()
          ??????}
          ????}?catch?{
          ??????case?e:?Exception?=>?e.printStackTrace()
          ????}?finally?{
          ??????if?(connection?!=?null)?{
          ????????connection.close()
          ??????}
          ????}
          ??}
          }

          總結(jié)

          Spark寫入大量數(shù)據(jù)到MySQL時,在寫入之前盡量對寫入的DF進行重分區(qū)處理,避免分區(qū)內(nèi)數(shù)據(jù)過多。在寫入時,要注意使用foreachPartition來進行寫入,這樣可以為每一個分區(qū)獲取一個連接,在分區(qū)內(nèi)部設(shè)定批次提交,提交的批次不易過大,以免將數(shù)據(jù)庫寫掛。

          瀏覽 67
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  成人依人网 | 亚洲无码手机在线观看 | 五月月色色网网 | 亚洲人成人片77777 | 无码影视在线观看 |