<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spark SQL知識點與實戰(zhàn)

          共 25112字,需瀏覽 51分鐘

           ·

          2021-11-27 13:10

          本文目錄

          • Spark SQL概述

            • 1、什么是Spark SQL

            • 2、Spark SQL的特點

            • 3、什么的DataFrame

            • 4、什么是DataSet

          • Spark SQL編程

            • 1、Spark Session新的起始點

            • 2、DataFrame

            • 3、DataSet

            • 4、DataFrame與DataSet的互操作

          • Spark SQL數(shù)據(jù)的加載與保存

            • 1、通用的加載與保存方式

            • 2、JSON文件

            • 3、MySQL

            • 4、Hive

          • Spark SQL實戰(zhàn)

            • 1、數(shù)據(jù)準備

            • 2、需求

            • 知識星球

          Spark SQL概述

          1、什么是Spark SQL

          Spark SQL是Spark用于結構化數(shù)據(jù)(structured data)處理的Spark模塊。
          與基本的Spark RDD API不同,Spark SQL的抽象數(shù)據(jù)類型為Spark提供了關于數(shù)據(jù)結構和正在執(zhí)行的計算的更多信息。
          在內部,Spark SQL使用這些額外的信息去做一些額外的優(yōu)化,有多種方式與Spark SQL進行交互,比如: SQL和DatasetAPI。
          當計算結果的時候,使用的是相同的執(zhí)行引擎,不依賴你正在使用哪種API或者語言。這種統(tǒng)一也就意味著開發(fā)者可以很容易在不同的API之間進行切換,這些API提供了最自然的方式來表達給定的轉換。
          Hive是將Hive SQL轉換成 MapReduce然后提交到集群上執(zhí)行,大大簡化了編寫MapReduce的程序的復雜性,由于MapReduce這種計算模型執(zhí)行效率比較慢。所以Spark SQL的應運而生,它是將Spark SQL轉換成RDD,然后提交到集群執(zhí)行,執(zhí)行效率非常快!
          Spark SQL它提供了2個編程抽象,類似Spark Core中的RDD
          (1)DataFrame
          (2)Dataset

          2、Spark SQL的特點

          1)易整合

          無縫的整合了SQL查詢和Spark編程

          2)統(tǒng)一的數(shù)據(jù)訪問方式

          使用相同的方式連接不同的數(shù)據(jù)源

          3)兼容Hive

          在已有的倉庫上直接運行SQL或者HiveQL

          4)標準的數(shù)據(jù)連接

          通過JDBC或者ODBC來連接

          3、什么的DataFrame

          在Spark中,DataFrame是一種以RDD為基礎的分布式數(shù)據(jù)集,類似于傳統(tǒng)數(shù)據(jù)庫中的二維表格。DataFrame與RDD的主要區(qū)別在于,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構信息,從而對藏于DataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進行了針對性的優(yōu)化,最終達到大幅提升運行時效率的目標。反觀RDD,由于無從得知所存數(shù)據(jù)元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優(yōu)化。
          同時,與Hive類似,DataFrame也支持嵌套數(shù)據(jù)類型(struct、array和map)。從API易用性的角度上看,DataFrame API提供的是一套高層的關系操作,比函數(shù)式的RDD API要更加友好,門檻更低。

          圖片1

          上圖直觀地體現(xiàn)了DataFrame和RDD的區(qū)別。
          左側的RDD[Person]雖然以Person為類型參數(shù),但Spark框架本身不了解Person類的內部結構。而右側的DataFrame卻提供了詳細的結構信息,使得 Spark SQL 可以清楚地知道該數(shù)據(jù)集中包含哪些列,每列的名稱和類型各是什么。
          DataFrame是為數(shù)據(jù)提供了Schema的視圖。可以把它當做數(shù)據(jù)庫中的一張表來對待,DataFrame也是懶執(zhí)行的,但性能上比RDD要高,主要原因:優(yōu)化的執(zhí)行計劃,即查詢計劃通過Spark catalyst optimiser進行優(yōu)化。比如下面一個例子:

          圖2

          為了說明查詢優(yōu)化,我們來看上圖展示的人口數(shù)據(jù)分析的示例。圖中構造了兩個DataFrame,將它們join之后又做了一次filter操作。
          如果原封不動地執(zhí)行這個執(zhí)行計劃,最終的執(zhí)行效率是不高的。因為join是一個代價較大的操作,也可能會產(chǎn)生一個較大的數(shù)據(jù)集。如果我們能將filter下推到 join下方,先對DataFrame進行過濾,再join過濾后的較小的結果集,便可以有效縮短執(zhí)行時間。而Spark SQL的查詢優(yōu)化器正是這樣做的。簡而言之,邏輯查詢計劃優(yōu)化就是一個利用基于關系代數(shù)的等價變換,將高成本的操作替換為低成本操作的過程。

          圖片3

          4、什么是DataSet

          DataSet是分布式數(shù)據(jù)集合。DataSet是Spark 1.6中添加的一個新抽象,是DataFrame的一個擴展。它提供了RDD的優(yōu)勢(強類型,使用強大的lambda函數(shù)的能力)以及Spark SQL優(yōu)化執(zhí)行引擎的優(yōu)點。DataSet也可以使用功能性的轉換(操作map,flatMap,filter等等)。
          1)是DataFrame API的一個擴展,是SparkSQL最新的數(shù)據(jù)抽象;
          2)用戶友好的API風格,既具有類型安全檢查也具有DataFrame的查詢優(yōu)化特性;
          3)用樣例類來定義DataSet中數(shù)據(jù)的結構信息,樣例類中每個屬性的名稱直接映射到DataSet中的字段名稱;
          4)DataSet是強類型的。比如可以有DataSet[Car],DataSet[Person]。
          5)DataFrame是DataSet的特列,DataFrame=DataSet[Row]?,所以可以通過as方法將DataFrame轉換為DataSet。Row是一個類型,跟Car、Person這些的類型一樣,所有的表結構信息都用Row來表示。

          Spark SQL編程

          1、Spark Session新的起始點

          在老的版本中,SparkSQL提供兩種SQL查詢起始點:一個叫SQLContext,用于Spark自己提供的SQL查詢;一個叫HiveContext,用于連接Hive的查詢。
          SparkSession是Spark最新的SQL查詢起始點,實質上是SQLContext和HiveContext的組合,所以在SQLContex和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了sparkContext,所以計算實際上是由sparkContext完成的。當我們使用 spark-shell 的時候, spark 會自動的創(chuàng)建一個叫做spark的SparkSession, 就像我們以前可以自動獲取到一個sc來表示SparkContext

          圖片1

          2、DataFrame

          Spark SQL的DataFrame API 允許我們使用 DataFrame 而不用必須去注冊臨時表或者生成SQL表達式。DataFrame API 既有transformation操作也有action操作,DataFrame的轉換從本質上來說更具有關系, 而 DataSet API 提供了更加函數(shù)式的 API。

          2.1 創(chuàng)建DataFrame

          在Spark SQL中SparkSession是創(chuàng)建DataFrame和執(zhí)行SQL的入口,創(chuàng)建DataFrame有三種方式:通過Spark的數(shù)據(jù)源進行創(chuàng)建從一個存在的RDD進行轉換還可以從Hive Table進行查詢返回

          2.2 SQL風格語法

          SQL語法風格是指我們查詢數(shù)據(jù)的時候使用SQL語句來查詢,這種風格的查詢必須要有臨時視圖或者全局視圖來輔助
          1)創(chuàng)建一個DataFrame

          scala>?val?df?=?spark.read.json("/opt/module/spark-local/people.json")
          df:?org.apache.spark.sql.DataFrame?=?[age:?bigint,?name:?string]

          2)對DataFrame創(chuàng)建一個臨時表

          scala>?df.createOrReplaceTempView("people")

          3)通過SQL語句實現(xiàn)查詢全表

          scala>?val?sqlDF?=?spark.sql("SELECT?*?FROM?people")
          sqlDF:?org.apache.spark.sql.DataFrame?=?[age:?bigint,?name:?string]

          4)結果展示

          scala>?sqlDF.show
          +---+--------+
          |age|????name|
          +---+--------+
          |?18|qiaofeng|
          |?19|??duanyu|
          |?20|???xuzhu|
          +---+--------+

          ?注意:普通臨時表是Session范圍內的,如果想應用范圍內有效,可以使用全局臨時表。使用全局臨時表時需要全路徑訪問,如:global_temp.people

          5)對于DataFrame創(chuàng)建一個全局表

          scala>?df.createGlobalTempView("people")

          6)通過SQL語句實現(xiàn)查詢全表

          scala>?spark.sql("SELECT?*?FROM?global_temp.people").show()
          +---+--------+
          |age|????name|
          +---+--------+
          |?18|qiaofeng|
          |?19|??duanyu|
          |?20|???xuzhu|
          +---+--------+

          scala>?spark.newSession().sql("SELECT?*?FROM?global_temp.people").show()
          +---+--------+
          |age|????name|
          +---+--------+
          |?18|qiaofeng|
          |?19|??duanyu|
          |?20|???xuzhu|
          +---+--------+

          2.3 DSL風格語法

          DataFrame提供一個特定領域語言(domain-specific language, DSL)去管理結構化的數(shù)據(jù),可以在Scala, Java, Python和R中使用DSL,使用DSL語法風格不必去創(chuàng)建臨時視圖了。
          1)創(chuàng)建一個DataFrame

          scala>?val?df?=?spark.read.json("/opt/module/spark-local?/people.json")
          df:?org.apache.spark.sql.DataFrame?=?[age:?bigint,?name:?string]

          2)查看DataFrame的Schema信息

          scala>?df.printSchema
          root
          ?|--?age:?Long?(nullable?=?true)
          ?|--?name:?string?(nullable?=?true)

          3)只查看”name”列數(shù)據(jù)

          scala>?df.select("name").show()
          +--------+
          |????name|
          +--------+
          |qiaofeng|
          |??duanyu|
          |???xuzhu|
          +--------+

          4)查看所有列

          scala>?df.select("*").show
          +--------+---------+
          |????name?|age|
          +--------+---------+
          |qiaofeng|???????18|
          |??duanyu|???????19|
          |???xuzhu|???????20|
          +--------+---------+

          5)查看”name”列數(shù)據(jù)以及”age+1”數(shù)據(jù)
          注意:涉及到運算的時候, 每列都必須使用$

          scala>?df.select($"name",$"age"?+?1).show
          +--------+---------+
          |????name|(age?+?1)|
          +--------+---------+
          |qiaofeng|???????19|
          |??duanyu|???????20|
          |???xuzhu|???????21|
          +--------+---------+

          6)查看”age”大于”19”的數(shù)據(jù)

          scala>?df.filter($"age">19).show
          +---+-----+
          |age|?name|
          +---+-----+
          |?20|xuzhu|
          +---+-----+

          7)按照”age”分組,查看數(shù)據(jù)條數(shù)

          scala>?df.groupBy("age").count.show
          +---+-----+
          |age|count|
          +---+-----+
          |?19|????1|
          |?18|????1|
          |?20|????1|
          +---+-----+

          2.4 RDD轉換為DataFrame

          在 IDEA 中開發(fā)程序時,如果需要RDD 與DF 或者DS 之間互相操作,那么需要引入import spark.implicits._。
          這里的spark不是Scala中的包名,而是創(chuàng)建的sparkSession 對象的變量名稱,所以必須先創(chuàng)建 SparkSession 對象再導入。這里的 spark 對象不能使用var 聲明,因為 Scala 只支持val 修飾的對象的引入。
          spark-shell 中無需導入,自動完成此操作。

          scala>?val?idRDD?=?sc.textFile("data/id.txt")?scala>?idRDD.toDF("id").show
          +---+
          |?id|
          +---+
          |?1|
          |?2|
          |?3|
          |?4|
          +---+

          實際開發(fā)中,一般通過樣例類將RDD轉換為DataFrame。

          scala>?case?class?User(name:String,?age:Int)?defined?class?User
          scala>?sc.makeRDD(List(("zhangsan",30),?("lisi",40))).map(t=>User(t._1,?t._2)).toDF.show
          +--------+---+
          |?name|age|
          +--------+---+

          2.5 DataFrame轉換為RDD

          DataFrame其實就是對RDD的封裝,所以可以直接獲取內部的RDD

          scala>?val?df?=?sc.makeRDD(List(("zhangsan",30),?("lisi",40))).map(t=>User(t._1,?t._2)).toDF
          df:?org.apache.spark.sql.DataFrame?=?[name:?string,?age:?int]

          scala>?val?rdd?=?df.rdd
          rdd:?org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]?=?MapPartitionsRDD[46]?at?rdd?at?:25

          scala>?val?array?=?rdd.collect
          array:?Array[org.apache.spark.sql.Row]?=?Array([zhangsan,30],?[lisi,40])

          注意:此時得到的RDD存儲類型為Row

          scala>?array(0)
          res28:?org.apache.spark.sql.Row?=?[zhangsan,30]?scala>?array(0)(0)
          res29:?Any?=?zhangsan
          scala>?array(0).getAs[String]("name")?res30:?String?=?zhangsan

          3、DataSet

          DataSet是具有強類型的數(shù)據(jù)集合,需要提供對應的類型信息。

          3.1 創(chuàng)建DataSet

          1)使用樣例類序列創(chuàng)建DataSet

          scala>?case?class?Person(name:?String,?age:?Long)
          defined?class?Person

          scala>?val?caseClassDS?=?Seq(Person("wangyuyan",2)).toDS()

          caseClassDS:?org.apache.spark.sql.Dataset[Person]?=?[name:?string,?age:?Long]

          scala>?caseClassDS.show
          +---------+---+
          |?????name|age|
          +---------+---+
          |wangyuyan|??2|
          +---------+---+

          2)使用基本類型的序列創(chuàng)建DataSet

          scala>?val?ds?=?Seq(1,2,3,4,5,6).toDS
          ds:?org.apache.spark.sql.Dataset[Int]?=?[value:?int]

          scala>?ds.show
          +-----+
          |value|
          +-----+
          |????1|
          |????2|
          |????3|
          |????4|
          |????5|
          |????6|
          +-----+

          注意:在實際使用的時候,很少用到把序列轉換成DataSet,更多是通過RDD來得到DataSet。

          3.2 RDD轉換為DataSet

          SparkSQL能夠自動將包含有樣例類的RDD轉換成DataSet,樣例類定義了table的結構,樣例類屬性通過反射變成了表的列名。樣例類可以包含諸如Seq或者Array等復雜的結構。

          1)創(chuàng)建一個RDD

          scala>?val?peopleRDD?=?sc.textFile("/opt/module/spark-local/people.txt")

          peopleRDD:?org.apache.spark.rdd.RDD[String]?=?/opt/module/spark-local/people.txt?MapPartitionsRDD[19]?at?textFile?at?:24

          2)創(chuàng)建一個樣例類

          scala>?case?class?Person(name:String,age:Int)
          defined?class?Person
          3)將RDD轉化為DataSet??
          scala>?peopleRDD.map(line?=>?{val?fields?=?line.split(",");Person(fields(0),fields(1).?toInt)}).toDS

          res0:?org.apache.spark.sql.Dataset[Person]?=?[name:?string,?age:?Long]

          3.3DataSet轉換為RDD

          調用rdd方法即可。
          1)創(chuàng)建一個DataSet

          scala>?val?DS?=?Seq(Person("zhangcuishan",?32)).toDS()

          DS:?org.apache.spark.sql.Dataset[Person]?=?[name:?string,?age:?Long]

          2)將DataSet轉換為RDD

          scala>?DS.rdd

          res1:?org.apache.spark.rdd.RDD[Person]?=?MapPartitionsRDD[6]?at?rdd?at?:28

          4、DataFrame與DataSet的互操作

          4.1 DataFrame轉為DataSet

          1)創(chuàng)建一個DateFrame

          scala>?val?df?=?spark.read.json("/opt/module/spark-local/people.json")

          df:?org.apache.spark.sql.DataFrame?=?[age:?bigint,?name:?string]

          2)創(chuàng)建一個樣例類

          scala>?case?class?Person(name:?String,age:?Long)
          defined?class?Person

          3)將DataFrame轉化為DataSet

          scala>?df.as[Person]

          res5:?org.apache.spark.sql.Dataset[Person]?=?[age:?bigint,?name:?string]

          這種方法就是在給出每一列的類型后,使用as方法,轉成Dataset,這在數(shù)據(jù)類型是DataFrame又需要針對各個字段處理時極為方便。在使用一些特殊的操作時,一定要加上 import spark.implicits._?不然toDF、toDS無法使用。

          4.2Dataset轉為DataFrame

          1)創(chuàng)建一個樣例類

          scala>?case?class?Person(name:?String,age:?Long)
          defined?class?Person

          2)創(chuàng)建DataSet

          scala>?val?ds?=?Seq(Person("zhangwuji",32)).toDS()

          ds:?org.apache.spark.sql.Dataset[Person]?=?[name:?string,?age:?bigint]

          3)將DataSet轉化為DataFrame

          scala>?var?df?=?ds.toDF
          df:?org.apache.spark.sql.DataFrame?=?[name:?string,?age:?bigint]

          4)展示

          scala>?df.show
          +---------+---+
          |?????name|age|
          +---------+---+
          |zhangwuji|?32|
          +---------+---+

          5、IDEA實踐

          1)Maven工程添加依賴

          <dependency>
          ?<groupId>org.apache.sparkgroupId>
          ?<artifactId>spark-sql_2.11artifactId>
          ?<version>2.1.1version>
          dependency>

          2)代碼實現(xiàn)

          object?SparkSQL01_Demo?{
          ??def?main(args:?Array[String]):?Unit?=?{
          ????//創(chuàng)建上下文環(huán)境配置對象
          ????val?conf:?SparkConf?=?new?SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")

          ????//創(chuàng)建SparkSession對象
          ????val?spark:?SparkSession?=?SparkSession.builder().config(conf).getOrCreate()
          ????//RDD=>DataFrame=>DataSet轉換需要引入隱式轉換規(guī)則,否則無法轉換
          ????//spark不是包名,是上下文環(huán)境對象名
          ????import?spark.implicits._

          ????//讀取json文件?創(chuàng)建DataFrame??{"username":?"lisi","age":?18}
          ????val?df:?DataFrame?=?spark.read.json("D:\\dev\\workspace\\spark-bak\\spark-bak-00\\input\\test.json")
          ????//df.show()

          ????//SQL風格語法
          ????df.createOrReplaceTempView("user")
          ????//spark.sql("select?avg(age)?from?user").show

          ????//DSL風格語法
          ????//df.select("username","age").show()

          ????//*****RDD=>DataFrame=>DataSet*****
          ????//RDD
          ????val?rdd1:?RDD[(Int,?String,?Int)]?=?spark.sparkContext.makeRDD(List((1,"qiaofeng",30),(2,"xuzhu",28),(3,"duanyu",20)))

          ????//DataFrame
          ????val?df1:?DataFrame?=?rdd1.toDF("id","name","age")
          ????//df1.show()

          ????//DateSet
          ????val?ds1:?Dataset[User]?=?df1.as[User]
          ????//ds1.show()

          ????//*****DataSet=>DataFrame=>RDD*****
          ????//DataFrame
          ????val?df2:?DataFrame?=?ds1.toDF()

          ????//RDD??返回的RDD類型為Row,里面提供的getXXX方法可以獲取字段值,類似jdbc處理結果集,但是索引從0開始
          ????val?rdd2:?RDD[Row]?=?df2.rdd
          ????//rdd2.foreach(a=>println(a.getString(1)))

          ????//*****RDD=>DataSe*****
          ????rdd1.map{
          ??????case?(id,name,age)=>User(id,name,age)
          ????}.toDS()

          ????//*****DataSet=>=>RDD*****
          ????ds1.rdd

          ????//釋放資源
          ????spark.stop()
          ??}
          }
          case?class?User(id:Int,name:String,age:Int)

          Spark SQL數(shù)據(jù)的加載與保存

          1、通用的加載與保存方式

          1)spark.read.load是加載數(shù)據(jù)的通用方法
          2)df.write.save 是保存數(shù)據(jù)的通用方法

          1.1 數(shù)據(jù)加載

          1)read直接加載數(shù)據(jù)

          scala>?spark.read.

          csv???format???jdbc???json???load???option???options???orc???parquet???schema???table???text???textFile

          注意:加載數(shù)據(jù)的相關參數(shù)需寫到上述方法中,如:textFile需傳入加載數(shù)據(jù)的路徑,jdbc需傳入JDBC相關參數(shù)。
          例如:直接加載Json數(shù)據(jù)

          scala>?spark.read.json("/opt/module/spark-local/people.json").show
          +---+--------+
          |age|????name|
          +---+--------+
          |?18|qiaofeng|
          |?19|??duanyu|
          |?20|???xuzhu|

          2)format指定加載數(shù)據(jù)類型

          scala>?spark.read.format("…")[.option("…")].load("…")

          用法詳解:
          (1)format("…"):指定加載的數(shù)據(jù)類型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"
          (2)load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要傳入加載數(shù)據(jù)的路徑
          (3)option("…"):在"jdbc"格式下需要傳入JDBC相應參數(shù),url、user、password和dbtable
          例如:使用format指定加載Json類型數(shù)據(jù)

          scala>?spark.read.format("json").load?("/opt/module/spark-local/people.json").show
          +---+--------+
          |age|????name|
          +---+--------+
          |?18|qiaofeng|
          |?19|??duanyu|
          |?20|???xuzhu|

          3)在文件上直接運行SQL
          前面的是使用read API先把文件加載到DataFrame然后再查詢,也可以直接在文件上進行查詢。

          scala>??spark.sql("select?*?from?json.`/opt/module/spark-local/people.json`").show

          +---+--------+
          |age|????name|
          +---+--------+
          |?18|qiaofeng|
          |?19|??duanyu|
          |?20|???xuzhu|
          +---+--------+|

          說明:json表示文件的格式. 后面的文件具體路徑需要用反引號括起來。

          1.2 保存數(shù)據(jù)

          1)write直接保存數(shù)據(jù)

          scala>?df.write.
          csv??jdbc???json??orc???parquet?textFile…?…

          注意:保存數(shù)據(jù)的相關參數(shù)需寫到上述方法中。如:textFile需傳入加載數(shù)據(jù)的路徑,jdbc需傳入JDBC相關參數(shù)。
          例如:直接將df中數(shù)據(jù)保存到指定目錄

          //默認保存格式為parquet
          scala>?df.write.save("/opt/module/spark-local/output")
          //可以指定為保存格式,直接保存,不需要再調用save了
          scala>?df.write.json("/opt/module/spark-local/output")

          2)format指定保存數(shù)據(jù)類型

          scala>?df.write.format("…")[.option("…")].save("…")

          用法詳解:
          (1)format("…"):指定保存的數(shù)據(jù)類型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
          (2)save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要傳入保存數(shù)據(jù)的路徑。
          (3)option("…"):在"jdbc"格式下需要傳入JDBC相應參數(shù),url、user、password和dbtable

          3)文件保存選項
          保存操作可以使用 SaveMode, 用來指明如何處理數(shù)據(jù),使用mode()方法來設置。有一點很重要: 這些 SaveMode 都是沒有加鎖的, 也不是原子操作。
          SaveMode是一個枚舉類,其中的常量包括:

          2021-05-13_183502

          例如:使用指定format指定保存類型進行保存

          df.write.mode("append").json("/opt/module/spark-local/output")??

          1.3 默認數(shù)據(jù)源

          Spark SQL的默認數(shù)據(jù)源為Parquet格式。數(shù)據(jù)源為Parquet文件時,Spark SQL可以方便的執(zhí)行所有的操作,不需要使用format。修改配置項spark.sql.sources.default,可修改默認數(shù)據(jù)源格式。
          1)加載數(shù)據(jù)

          val?df?=?spark.read.load("/opt/module/spark-local/examples/src/main/resources/users.parquet").show

          +------+--------------+----------------+
          |??name|favorite_color|favorite_numbers|
          +------+--------------+----------------+
          |Alyssa|??????????null|??[3,?9,?15,?20]|
          |???Ben|???????????red|??????????????[]|
          +------+--------------+----------------+

          df:?Unit?=?()

          2)保存數(shù)據(jù)

          scala>?var?df?=?spark.read.json("/opt/module/spark-local/people.json")
          //保存為parquet格式
          scala>?df.write.mode("append").save("/opt/module/spark-local/output")

          2、JSON文件

          Spark SQL能夠自動推測JSON數(shù)據(jù)集的結構,并將它加載為一個Dataset[Row]。可以通過SparkSession.read.json()去加載一個一個JSON文件。
          注意:這個JSON文件不是一個傳統(tǒng)的JSON文件,每一行都得是一個JSON串。格式如下:

          {"name":"Michael"}
          {"name":"Andy","age":30}
          {"name":"Justin","age":19}

          1)導入隱式轉換

          import?spark.implicits._

          2)加載JSON文件

          val?path?=?"/opt/module/spark-local/people.json"
          val?peopleDF?=?spark.read.json(path)

          3)創(chuàng)建臨時表

          peopleDF.createOrReplaceTempView("people")

          4)數(shù)據(jù)查詢

          val?teenagerNamesDF?=?spark.sql("SELECT?name?FROM?people?WHERE?age?BETWEEN?13?AND?19")
          teenagerNamesDF.show()
          +------+
          |??name|
          +------+
          |Justin|
          +------+

          3、MySQL

          Spark SQL可以通過JDBC從關系型數(shù)據(jù)庫中讀取數(shù)據(jù)的方式創(chuàng)建DataFrame,通過對DataFrame一系列的計算后,還可以將數(shù)據(jù)再寫回關系型數(shù)據(jù)庫中。
          **如果使用spark-shell操作,可在啟動shell時指定相關的數(shù)據(jù)庫驅動路徑或者將相關的數(shù)據(jù)庫驅動放到spark的類路徑下。**

          bin/spark-shell?
          --jars?mysql-connector-java-5.1.27-bin.jar

          這里演示在Idea中通過JDBC對Mysql進行操作

          3.1 導入依賴

          <dependency>
          ????<groupId>mysqlgroupId>
          ????<artifactId>mysql-connector-javaartifactId>
          ????<version>5.1.27version>
          dependency>

          3.2 從JDBC讀數(shù)據(jù)

          object?SparkSQL02_Datasource?{
          ??def?main(args:?Array[String]):?Unit?=?{
          ????//創(chuàng)建上下文環(huán)境配置對象
          ????val?conf:?SparkConf?=?new?SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")

          ????//創(chuàng)建SparkSession對象
          ????val?spark:?SparkSession?=?SparkSession.builder().config(conf).getOrCreate()

          ????import?spark.implicits._

          ????//方式1:通用的load方法讀取
          ????spark.read.format("jdbc")
          ??????.option("url",?"jdbc:mysql://hadoop202:3306/test")
          ??????.option("driver",?"com.mysql.jdbc.Driver")
          ??????.option("user",?"root")
          ??????.option("password",?"123456")
          ??????.option("dbtable",?"user")
          ??????.load().show

          ????
          ????//方式2:通用的load方法讀取?參數(shù)另一種形式
          ????spark.read.format("jdbc")
          ??????.options(Map("url"->"jdbc:mysql://hadoop202:3306/test?user=root&password=123456",
          ????????"dbtable"->"user","driver"->"com.mysql.jdbc.Driver")).load().show

          ????//方式3:使用jdbc方法讀取
          ????val?props:?Properties?=?new?Properties()
          ????props.setProperty("user",?"root")
          ????props.setProperty("password",?"123456")
          ????val?df:?DataFrame?=?spark.read.jdbc("jdbc:mysql://hadoop202:3306/test",?"user",?props)
          ????df.show

          ????//釋放資源
          ????spark.stop()
          ??}
          }

          3.3 向JDBC寫數(shù)據(jù)

          object?SparkSQL03_Datasource?{
          ??def?main(args:?Array[String]):?Unit?=?{
          ????//創(chuàng)建上下文環(huán)境配置對象
          ????val?conf:?SparkConf?=?new?SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")

          ????//創(chuàng)建SparkSession對象
          ????val?spark:?SparkSession?=?SparkSession.builder().config(conf).getOrCreate()
          ????import?spark.implicits._

          ????val?rdd:?RDD[User2]?=?spark.sparkContext.makeRDD(List(User2("lisi",?20),?User2("zs",?30)))
          ????val?ds:?Dataset[User2]?=?rdd.toDS
          ????//方式1:通用的方式? format指定寫出類型
          ????ds.write
          ??????.format("jdbc")
          ??????.option("url",?"jdbc:mysql://hadoop202:3306/test")
          ??????.option("user",?"root")
          ??????.option("password",?"123456")
          ??????.option("dbtable",?"user")
          ??????.mode(SaveMode.Append)
          ??????.save()

          ????//方式2:通過jdbc方法
          ????val?props:?Properties?=?new?Properties()
          ????props.setProperty("user",?"root")
          ????props.setProperty("password",?"123456")
          ????ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop202:3306/test",?"user",?props)

          ????//釋放資源
          ????spark.stop()
          ??}
          }

          case?class?User2(name:?String,?age:?Long)

          4、Hive

          Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL編譯時可以包含 Hive 支持,也可以不包含。
          包含 Hive 支持的 Spark SQL 可以支持 Hive 表訪問、UDF (用戶自定義函數(shù))以及 Hive 查詢語言(HiveQL/HQL)等。需要強調的一點是,如果要在 Spark SQL 中包含Hive 的庫,并不需要事先安裝 Hive。一般來說,最好還是在編譯Spark SQL時引入Hive支持,這樣就可以使用這些特性了。如果你下載的是二進制版本的 Spark,它應該已經(jīng)在編譯時添加了 Hive 支持。
          若要把 Spark SQL 連接到一個部署好的 Hive 上,你必須把 hive-site.xml 復制到 Spark的配置文件目錄中($SPARK_HOME/conf)。即使沒有部署好 Hive,Spark SQL 也可以運行,需要注意的是,如果你沒有部署好Hive,Spark SQL 會在當前的工作目錄中創(chuàng)建出自己的 Hive 元數(shù)據(jù)倉庫,叫作 metastore_db。此外,對于使用部署好的Hive,如果你嘗試使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)語句來創(chuàng)建表,這些表會被放在你默認的文件系統(tǒng)中的 /user/hive/warehouse 目錄中(如果你的 classpath 中有配好的 hdfs-site.xml,默認的文件系統(tǒng)就是 HDFS,否則就是本地文件系統(tǒng))。
          spark-shell默認是Hive支持的;代碼中是默認不支持的,需要手動指定(加一個參數(shù)即可)。

          4.1 使用內嵌Hive

          如果使用 Spark 內嵌的 Hive, 則什么都不用做, 直接使用即可。
          Hive 的元數(shù)據(jù)存儲在 derby 中, 倉庫地址:$SPARK_HOME/spark-warehouse。

          scala>?spark.sql("show?tables").show
          +--------+---------+-----------+
          |database|tableName|isTemporary|
          +--------+---------+-----------+
          +--------+---------+-----------+

          scala>?spark.sql("create?table?aa(id?int)")
          19/02/09?18:36:10?WARN?HiveMetaStore:?Location:?file:/opt/module/spark-local/spark-warehouse/aa?specified?for?non-external?table:aa
          res2:?org.apache.spark.sql.DataFrame?=?[]

          scala>?spark.sql("show?tables").show
          +--------+---------+-----------+
          |database|tableName|isTemporary|
          +--------+---------+-----------+
          |?default|???????aa|??????false|
          +--------+---------+-----------+

          向表中加載本地數(shù)據(jù)數(shù)據(jù)

          scala>?spark.sql("load?data?local?inpath?'./ids.txt'?into?table?aa")
          res8:?org.apache.spark.sql.DataFrame?=?[]

          scala>?spark.sql("select?*?from?aa").show
          +---+
          |?id|
          +---+
          |100|
          |101|
          |102|
          |103|
          |104|
          |105|
          |106|
          +---+

          在實際使用中, 幾乎沒有任何人會使用內置的 Hive。

          4.2 外部Hive應用

          如果Spark要接管Hive外部已經(jīng)部署好的Hive,需要通過以下幾個步驟。
          (1)確定原有Hive是正常工作的
          (2)需要把hive-site.xml拷貝到spark的conf/目錄下
          (3)如果以前hive-site.xml文件中,配置過Tez相關信息,注釋掉
          (4)把Mysql的驅動copy到Spark的jars/目錄下
          (5)需要提前啟動hive服務,hive/bin/hiveservices.sh start
          (6)如果訪問不到hdfs,則需把core-site.xml和hdfs-site.xml拷貝到conf/目錄
          啟動 spark-shell

          scala>?spark.sql("show?tables").show
          +--------+---------+-----------+
          |database|tableName|isTemporary|
          +--------+---------+-----------+
          |?default|??????emp|??????false|
          +--------+---------+-----------+

          scala>?spark.sql("select?*?from?emp").show
          19/02/09?19:40:28?WARN?LazyStruct:?Extra?bytes?detected?at?the?end?of?the?row!?Ignoring?similar?problems.
          +-----+-------+---------+----+----------+------+------+------+
          |empno|??ename|??????job|?mgr|??hiredate|???sal|??comm|deptno|
          +-----+-------+---------+----+----------+------+------+------+
          |?7369|??SMITH|????CLERK|7902|1980-12-17|?800.0|??null|????20|
          |?7499|??ALLEN|?SALESMAN|7698|?1981-2-20|1600.0|?300.0|????30|
          |?7521|???WARD|?SALESMAN|7698|?1981-2-22|1250.0|?500.0|????30|
          |?7566|??JONES|??MANAGER|7839|??1981-4-2|2975.0|??null|????20|
          |?7654|?MARTIN|?SALESMAN|7698|?1981-9-28|1250.0|1400.0|????30|
          |?7698|??BLAKE|??MANAGER|7839|??1981-5-1|2850.0|??null|????30|
          |?7782|??CLARK|??MANAGER|7839|??1981-6-9|2450.0|??null|????10|
          |?7788|??SCOTT|??ANALYST|7566|?1987-4-19|3000.0|??null|????20|
          |?7839|???KING|PRESIDENT|null|1981-11-17|5000.0|??null|????10|
          |?7844|?TURNER|?SALESMAN|7698|??1981-9-8|1500.0|???0.0|????30|
          |?7876|??ADAMS|????CLERK|7788|?1987-5-23|1100.0|??null|????20|
          |?7900|??JAMES|????CLERK|7698|?1981-12-3|?950.0|??null|????30|
          |?7902|???FORD|??ANALYST|7566|?1981-12-3|3000.0|??null|????20|
          |?7934|?MILLER|????CLERK|7782|?1982-1-23|1300.0|??null|????10|
          |?7944|zhiling|????CLERK|7782|?1982-1-23|1300.0|??null|????50|
          +-----+-------+---------+----+----------+------+------+------+

          4.3 運行Spark SQL CLI

          Spark SQLCLI可以很方便的在本地運行Hive元數(shù)據(jù)服務以及從命令行執(zhí)行查詢任務。在Spark目錄下執(zhí)行如下命令啟動Spark SQ LCLI,直接執(zhí)行SQL語句,類似Hive窗口。

          bin/spark-sql

          4.4 代碼中操作Hive

          1)添加依賴

          <dependency>
          ????<groupId>org.apache.sparkgroupId>
          ????<artifactId>spark-hive_2.11artifactId>
          ????<version>2.1.1version>
          dependency>
          <dependency>
          ????<groupId>org.apache.hivegroupId>
          ????<artifactId>hive-execartifactId>
          ????<version>1.2.1version>
          dependency>

          2)拷貝hive-site.xml到resources目錄
          3)代碼實現(xiàn)

          object?SparkSQL08_Hive{
          ?def?main(args:?Array[String]):?Unit?=?{
          ????//創(chuàng)建上下文環(huán)境配置對象
          ????val?conf:?SparkConf?=?new?SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")
          ????val?spark:?SparkSession?=?SparkSession
          ??????.builder()
          ??????.enableHiveSupport()
          ??????.master("local[*]")
          ??????.appName("SQLTest")
          ??????.getOrCreate()
          ????spark.sql("show?tables").show()
          ????//釋放資源
          ????spark.stop()
          ??}
          }

          Spark SQL實戰(zhàn)

          1、數(shù)據(jù)準備

          Spark-sql操作所有的數(shù)據(jù)均來自Hive,首先在Hive中創(chuàng)建表,并導入數(shù)據(jù)。一共有3張表:1張用戶行為表,1張城市表,1張產(chǎn)品表。

          CREATE?TABLE?`user_visit_action`(
          ??`date`?string,
          ??`user_id`?bigint,
          ??`session_id`?string,
          ??`page_id`?bigint,
          ??`action_time`?string,
          ??`search_keyword`?string,
          ??`click_category_id`?bigint,
          ??`click_product_id`?bigint,
          ??`order_category_ids`?string,
          ??`order_product_ids`?string,
          ??`pay_category_ids`?string,
          ??`pay_product_ids`?string,
          ??`city_id`?bigint)
          row?format?delimited?fields?terminated?by?'\t';
          load?data?local?inpath?'/opt/module/data/user_visit_action.txt'?into?table?sparkpractice.user_visit_action;

          CREATE?TABLE?`product_info`(
          ??`product_id`?bigint,
          ??`product_name`?string,
          ??`extend_info`?string)
          row?format?delimited?fields?terminated?by?'\t';
          load?data?local?inpath?'/opt/module/data/product_info.txt'?into?table?sparkpractice.product_info;

          CREATE?TABLE?`city_info`(
          ??`city_id`?bigint,
          ??`city_name`?string,
          ??`area`?string)
          row?format?delimited?fields?terminated?by?'\t';
          load?data?local?inpath?'/opt/module/data/city_info.txt'?into?table?sparkpractice.city_info;

          2、需求

          2.1 需求簡介

          這里的熱門商品是從點擊量的維度來看的,計算各個區(qū)域前三大熱門商品,并備注上每個商品在主要城市中的分布比例,超過兩個城市用其他顯示。
          例如:

          2021-05-13_184130

          2.2 思路分析

          1)使用sql來完成,碰到復雜的需求,可以使用udf或udaf
          2)查詢出來所有的點擊記錄,并與city_info表連接,得到每個城市所在的地區(qū),與Product_info表連接得到產(chǎn)品名稱
          3)按照地區(qū)和商品名稱分組,統(tǒng)計出每個商品在每個地區(qū)的總點擊次數(shù)
          4)每個地區(qū)內按照點擊次數(shù)降序排列
          5)只取前三名,并把結果保存在數(shù)據(jù)庫中
          6)城市備注需要自定義UDAF函數(shù)

          2.3 代碼實現(xiàn)

          1)UDAF函數(shù)定義

          class?AreaClickUDAF?extends?UserDefinedAggregateFunction?{
          ??//?輸入數(shù)據(jù)的類型:??北京??String
          ??override?def?inputSchema:?StructType?=?{
          ????StructType(StructField("city_name",?StringType)?::?Nil)
          ????//????????StructType(Array(StructField("city_name",?StringType)))
          ??}

          ??//?緩存的數(shù)據(jù)的類型:?北京->1000,?天津->5000??Map,??總的點擊量??1000/?
          ??override?def?bufferSchema:?StructType?=?{
          ????//?MapType(StringType,?LongType)?還需要標注?map的key的類型和value的類型
          ????StructType(StructField("city_count",?MapType(StringType,?LongType))?::?StructField("total_count",?LongType)?::?Nil)
          ??}

          ??//?輸出的數(shù)據(jù)類型??"北京21.2%,天津13.2%,其他65.6%"??String
          ??override?def?dataType:?DataType?=?StringType

          ??//?相同的輸入是否應用有相同的輸出.
          ??override?def?deterministic:?Boolean?=?true

          ??//?給存儲數(shù)據(jù)初始化
          ??override?def?initialize(buffer:?MutableAggregationBuffer):?Unit?=?{
          ????//初始化map緩存
          ????buffer(0)?=?Map[String,?Long]()
          ????//?初始化總的點擊量
          ????buffer(1)?=?0L
          ??}

          ??//?分區(qū)內合并?Map[城市名,?點擊量]
          ??override?def?update(buffer:?MutableAggregationBuffer,?input:?Row):?Unit?=?{
          ????//?首先拿到城市名,?然后把城市名作為key去查看map中是否存在,?如果存在就把對應的值?+1,?如果不存在,?則直接0+1
          ????val?cityName?=?input.getString(0)
          ????//????????val?map:?collection.Map[String,?Long]?=?buffer.getMap[String,?Long](0)
          ????val?map:?Map[String,?Long]?=?buffer.getAs[Map[String,?Long]](0)
          ????buffer(0)?=?map?+?(cityName?->?(map.getOrElse(cityName,?0L)?+?1L))
          ????//?碰到一個城市,?則總的點擊量要+1
          ????buffer(1)?=?buffer.getLong(1)?+?1L
          ??}

          ??//?分區(qū)間的合并
          ??override?def?merge(buffer1:?MutableAggregationBuffer,?buffer2:?Row):?Unit?=?{
          ????val?map1?=?buffer1.getAs[Map[String,?Long]](0)
          ????val?map2?=?buffer2.getAs[Map[String,?Long]](0)

          ????//?把map1的鍵值對與map2中的累積,?最后賦值給buffer1
          ????buffer1(0)?=?map1.foldLeft(map2)?{
          ??????case?(map,?(k,?v))?=>
          ????????map?+?(k?->?(map.getOrElse(k,?0L)?+?v))
          ????}

          ????buffer1(1)?=?buffer1.getLong(1)?+?buffer2.getLong(1)
          ??}

          ??//?最終的輸出.?"北京21.2%,天津13.2%,其他65.6%"
          ??override?def?evaluate(buffer:?Row):?Any?=?{
          ????val?cityCountMap?=?buffer.getAs[Map[String,?Long]](0)
          ????val?totalCount?=?buffer.getLong(1)

          ????var?citysRatio:?List[CityRemark]?=?cityCountMap.toList.sortBy(-_._2).take(2).map?{
          ??????case?(cityName,?count)?=>?{
          ????????CityRemark(cityName,?count.toDouble?/?totalCount)
          ??????}
          ????}
          ????//?如果城市的個數(shù)超過2才顯示其他
          ????if?(cityCountMap.size?>?2)?{
          ??????citysRatio?=?citysRatio?:+?CityRemark("其他",?citysRatio.foldLeft(1D)(_?-?_.cityRatio))
          ????}
          ????citysRatio.mkString(",?")
          ??}
          }


          case?class?CityRemark(cityName:?String,?cityRatio:?Double)?{
          ??val?formatter?=?new?DecimalFormat("0.00%")
          ??override?def?toString:?String?=?s"$cityName:${formatter.format(cityRatio)}"
          }

          2)具體實現(xiàn)

          object?SparkSQL04_TopN?{
          ??def?main(args:?Array[String]):?Unit?=?{
          ????val?spark:?SparkSession?=?SparkSession
          ??????.builder()
          ??????.master("local[2]")
          ??????.appName("AreaClickApp")
          ??????.enableHiveSupport()
          ??????.getOrCreate()
          ????spark.sql("use?sparkpractice")
          ????//?0?注冊自定義聚合函數(shù)
          ????spark.udf.register("city_remark",?new?AreaClickUDAF)
          ????//?1.?查詢出所有的點擊記錄,并和城市表產(chǎn)品表做內連接
          ????spark.sql(
          ??????"""
          ????????|select
          ????????|????c.*,
          ????????|????v.click_product_id,
          ????????|????p.product_name
          ????????|from?user_visit_action?v?join?city_info?c?join?product_info?p?on?v.city_id=c.city_id?and?v.click_product_id=p.product_id
          ????????|where?click_product_id>-1
          ??????"
          "".stripMargin).createOrReplaceTempView("t1")

          ????//?2.?計算每個區(qū)域,?每個產(chǎn)品的點擊量
          ????spark.sql(
          ??????"""
          ????????|select
          ????????|????t1.area,
          ????????|????t1.product_name,
          ????????|????count(*)?click_count,
          ????????|????city_remark(t1.city_name)
          ????????|from?t1
          ????????|group?by?t1.area,?t1.product_name
          ??????"
          "".stripMargin).createOrReplaceTempView("t2")

          ????//?3.?對每個區(qū)域內產(chǎn)品的點擊量進行倒序排列
          ????spark.sql(
          ??????"""
          ????????|select
          ????????|????*,
          ????????|????rank()?over(partition?by?t2.area?order?by?t2.click_count?desc)?rank
          ????????|from?t2
          ??????"
          "".stripMargin).createOrReplaceTempView("t3")

          ????//?4.?每個區(qū)域取top3

          ????spark.sql(
          ??????"""
          ????????|select
          ????????|????*
          ????????|from?t3
          ????????|where?rank<=3
          ??????"
          "".stripMargin).show

          ????//釋放資源
          ????spark.stop()

          ??}
          }

          知識星球

          歡迎加入我的知識星球,提供技術答疑,資料分享,模擬面試等服務。識別下方二維碼或者點擊閱讀原文,都可以加入。


          猜你喜歡
          Hive計算最大連續(xù)登陸天數(shù)
          Hadoop 數(shù)據(jù)遷移用法詳解
          Hbase修復工具Hbck
          數(shù)倉建模分層理論
          一文搞懂Hive的數(shù)據(jù)存儲與壓縮
          大數(shù)據(jù)組件重點學習這幾個

          瀏覽 141
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  牛影视AV| 国产1区2区3区 | 亚洲人成电影网网站 | 成人午夜av | 久热精品免费 |