<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          英雄惜英雄-當(dāng)Spark遇上Zeppelin之實(shí)戰(zhàn)案例

          共 6281字,需瀏覽 13分鐘

           ·

          2021-01-07 12:16

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)回復(fù)”資源“獲取更多資源f0680bc7519c83f752496af954af0206.webp

          我們?cè)谥暗奈恼?span style="color:rgb(0,82,255);">《大數(shù)據(jù)可視化從未如此簡單 - Apache Zepplien全面介紹》中提到過一文中介紹了 Zeppelin 的主要功能和特點(diǎn),并且最后還用一個(gè)案例介紹了這個(gè)框架的使用。這節(jié)課我們用兩個(gè)直觀的小案例來介紹 Zepplin 和 Spark 如何配合使用。

          到目前為止,Apache Spark 已經(jīng)支持三種集群管理器類型(Standalone,Apache Mesos 和 Hadoop YARN )。本文中我們根據(jù)官網(wǎng)文檔使用 Docker 腳本構(gòu)建一個(gè)Spark standalone mode ( Spark獨(dú)立模式 )的環(huán)境來使用。

          Spark獨(dú)立模式環(huán)境搭建

          Spark standalone 是Spark附帶的簡單集群管理器,可以輕松設(shè)置集群。您可以通過以下步驟簡單地設(shè)置 Spark獨(dú)立環(huán)境。

          注意

          由于 Apache Zeppelin 和 Spark 為其 Web UI 使用相同的 8080 端口,因此您可能需要在 conf / zeppelin-site.xml 中更改 zeppelin.server.port 。

          1. 構(gòu)建 Docker 文件

          您可以在腳本 / docker / spark-cluster-managers 下找到 docker 腳本文件。

          cd $ZEPPELIN_HOME/scripts/docker/spark-cluster-managers/spark_standalone
          docker build -t "spark_standalone" .
          2. 啟動(dòng)Docker
          docker run -it \
          -p 8080:8080 \
          -p 7077:7077 \
          -p 8888:8888 \
          -p 8081:8081 \
          -h sparkmaster \
          --name spark_standalone \
          spark_standalone bash;

          在這里運(yùn)行 docker 容器的 sparkmaster 主機(jī)名應(yīng)該在 /etc/hosts 中綁定映射關(guān)系。

          3. 在Zeppelin中配置Spark解釋器

          將 Spark master 設(shè)置為 spark://< hostname >:7077 在 Zeppelin 的解釋器設(shè)置頁面上。5018592da93cdacfeb4ac6e077fc5758.webp

          4. 用Spark解釋器運(yùn)行Zeppelin

          在 Zeppelin 中運(yùn)行帶有 Spark 解釋器的單個(gè)段落后,瀏覽 https://< hostname>:8080,并檢查 Spark 集群是否運(yùn)行正常。

          f914906301161f641e5d20e3d281802e.webp

          然后我們可以用以下命令簡單地驗(yàn)證 Spark 在 Docker 中是否運(yùn)行良好。

          ps -ef | grep spark

          Spark on Zepplin讀取本地文件

          假設(shè)我們本地有一個(gè)名為bank.csv的文件,樣例數(shù)據(jù)如下:

          age:Integer, job:String, marital : String, education : String, balance : Integer
          20;teacher;single;本科;20000
          25;plumber;single;本科;10000
          21;doctor;single;本科;25000
          23;singer;single;本科;20000
          ...

          首先,將csv格式的數(shù)據(jù)轉(zhuǎn)換成RDD Bank對(duì)象,運(yùn)行以下腳本。這也將使用filter功能過濾掉一些數(shù)據(jù)。

          val bankText = sc.textFile("yourPath/bank/bank-full.csv")
          case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)

          // split each line, filter out header (starts with "age"), and map it into Bank case class
          val bank = bankText.map(s=>s.split(";")).filter(s=>s(0)!="\"age\"").map(
          s=>Bank(s(0).toInt,
          s(1).replaceAll("\"", ""),
          s(2).replaceAll("\"", ""),
          s(3).replaceAll("\"", ""),
          s(5).replaceAll("\"", "").toInt
          )
          )
          // convert to DataFrame and create temporal table
          bank.toDF().registerTempTable("bank")

          如果想使用圖形化看到年齡分布,可以運(yùn)行如下sql:%sql select age, count(1) from bank where age < 30 group by age order by age

          48553e625fc9b133439450e70616fb3d.webp

          您可以輸入框通過更換設(shè)置年齡條件30用${maxAge=30}。%sql select age, count(1) from bank where age < ${maxAge=30} group by age order by age

          如果希望看到有一定婚姻狀況的年齡分布,并添加組合框來選擇婚姻狀況:%sql select age, count(1) from bank where marital="${marital=single,single|divorced|married}" group by age order by aged950fffa3006ea153887e8c685b136d2.webp

          Zeppelin支持畫圖,功能簡單但強(qiáng)大,可同時(shí)輸出表格、柱狀圖、折線圖、餅狀圖、折線圖、點(diǎn)圖。下面將各年齡的用戶數(shù)用畫出來,畫圖的實(shí)現(xiàn)可以將結(jié)果組織成下面這種格式:

          println(“%table column_1\tcolumn_2\n”+value_1\tvalue_2\n+…)

          cfa77b7bf62c93159de2573d5237f97c.webp

          最后,我們甚至可以直接將運(yùn)算結(jié)果存入 Mysql 中:

          %spark
          df3.write.mode("overwrite").format("jdbc").option("driver","com.mysql.jdbc.Driver").option("user","root").option("password","password").option("url","jdbc:mysql://localhost:3306/spark_demo").option("dbtable","record").save()

          Spark on Zepplin讀取HDFS文件

          首先我們需要配置HDFS文件系統(tǒng)解釋器,我們需要進(jìn)行如下的配置。在筆記本中,要啟用HDFS解釋器,可以單擊齒輪圖標(biāo)并選擇HDFS。

          593557f4c1c9203741c30a236755b883.webp

          然后我們就可以愉快的使用Zepplin讀取HDFS文件了:

          例如:下面先讀取HDFS文件,該文件為JSON文件,讀取出來之后取出第一列然后以Parquet的格式保存到HDFS上:

          184d1f9ac58faba4ada1ebd21820f9cd.webp

          Spark on Zepplin讀取流數(shù)據(jù)

          我們可以參考官網(wǎng)中,讀取Twitter實(shí)時(shí)流的案例:

          import org.apache.spark.streaming._
          import org.apache.spark.streaming.twitter._
          import org.apache.spark.storage.StorageLevel
          import scala.io.Source
          import scala.collection.mutable.HashMap
          import java.io.File
          import org.apache.log4j.Logger
          import org.apache.log4j.Level
          import sys.process.stringSeqToProcess

          /** Configures the Oauth Credentials for accessing Twitter */
          def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
          val configs = new HashMap[String, String] ++= Seq(
          "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
          println("Configuring Twitter OAuth")
          configs.foreach{ case(key, value) =>
          if (value.trim.isEmpty) {
          throw new Exception("Error setting authentication - value for " + key + " not set")
          }
          val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
          System.setProperty(fullKey, value.trim)
          println("\tProperty " + fullKey + " set as [" + value.trim + "]")
          }
          println()
          }

          // Configure Twitter credentials
          val apiKey = "xxxxxxxxxxxxxxxxxxxxxxxxx"
          val apiSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
          val accessToken = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
          val accessTokenSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
          configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)

          import org.apache.spark.streaming.twitter._
          val ssc = new StreamingContext(sc, Seconds(2))
          val tweets = TwitterUtils.createStream(ssc, None)
          val twt = tweets.window(Seconds(60))

          case class Tweet(createdAt:Long, text:String)
          twt.map(status=>
          Tweet(status.getCreatedAt().getTime()/1000, status.getText())
          ).foreachRDD(rdd=>
          // Below line works only in spark 1.3.0.
          // For spark 1.1.x and spark 1.2.x,
          // use rdd.registerTempTable("tweets") instead.
          rdd.toDF().registerAsTable("tweets")
          )

          twt.print

          ssc.start()

          同理,Zepplin也可以讀取Kafka中的數(shù)據(jù),注冊(cè)成表然后進(jìn)行各種運(yùn)算。我們參考一個(gè)Zepplin版本的WordCount實(shí)現(xiàn):

          %spark
          import _root_.kafka.serializer.DefaultDecoder
          import _root_.kafka.serializer.StringDecoder
          import org.apache.spark.streaming.kafka.KafkaUtils
          import org.apache.spark.storage.StorageLevel
          import org.apache.spark.streaming._

          // prevent INFO logging from pollution output
          sc.setLogLevel("INFO")

          // creating the StreamingContext with 5 seconds interval
          val ssc = new StreamingContext(sc, Seconds(5))

          val kafkaConf = Map(
          "metadata.broker.list" -> "localhost:9092",
          "zookeeper.connect" -> "localhost:2181",
          "group.id" -> "kafka-streaming-example",
          "zookeeper.connection.timeout.ms" -> "1000"
          )

          val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
          ssc,
          kafkaConf,
          Map("test" -> 1), // subscripe to topic and partition 1
          StorageLevel.MEMORY_ONLY
          )

          val words = lines.flatMap{ case(x, y) => y.split(" ")}

          import spark.implicits._

          val w=words.map(x=> (x,1L)).reduceByKey(_+_)
          w.foreachRDD(rdd => rdd.toDF.registerTempTable("counts"))
          ssc.start()

          從上面的temporary table counts 中查詢每小批量的數(shù)據(jù)中top 10 的單詞值。

          select * from counts order by _2 desc limit 10

          b02c01d0eb91b0fa60504dc3bdc6c967.webp

          怎么樣?是不是很強(qiáng)大?推薦大家可以自己試試看。


          59af4b016e231d61fdbfed6f9528c05c.webp
          84ac4079d49fe70490456537984a403e.webp

          大數(shù)據(jù)可視化從未如此簡單 - Apache Zepplien全面介紹

          593d798c5cc056ae205361c09eeb2657.webp

          Flink SQL 1.11 on Zeppelin集成指南

          c2d97a6f8ac476edba1cb093c65390e6.webp

          你不可不知的任務(wù)調(diào)度神器-AirFlow

          fb9da58d62c0d16e0bfabafc700c26a3.webp94e3868890e7c822c3d3dd7984635a6e.webp
          版權(quán)聲明:本文為大數(shù)據(jù)技術(shù)與架構(gòu)原創(chuàng)整理,轉(zhuǎn)載需作者授權(quán)。未經(jīng)作者允許轉(zhuǎn)載追究侵權(quán)責(zé)任。微信公眾號(hào)|import_bigdata編輯 |?《大數(shù)據(jù)技術(shù)與架構(gòu)》
          歡迎點(diǎn)贊+收藏+轉(zhuǎn)發(fā)朋友圈素質(zhì)三連

          文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!??
          瀏覽 22
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美色逼综合 | 亚洲视频网站在线 | 日韩操B 免费在线观看黄色视频 | 在线无码视频观看 | 国产精品偷窥熟女精品视 |