<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spark讀寫Elasticsearch

          共 13341字,需瀏覽 27分鐘

           ·

          2024-04-11 18:25

          Apache Spark 與 Elasticsearch 集成提供了一個(gè)強(qiáng)大的組合,用于處理大數(shù)據(jù)搜索、分析和存儲。通過使用Elasticsearch Hadoop項(xiàng)目(現(xiàn)在通常稱為Elasticsearch for Apache Hadoop,簡稱ES-Hadoop),你可以輕松地在Spark和Elasticsearch之間讀寫數(shù)據(jù)。ES-Hadoop允許Spark通過Elasticsearch REST接口高效地訪問Elasticsearch集群。


          1 Scala環(huán)境下Spark讀寫Elasticsearch

          1.1 依賴包

          1.1.1 Spark依賴
                
                <dependency>
              <groupId>org.apache.spark</groupId>
              <artifactId>spark-core_2.11</artifactId>
              <version>${spark.version}</version>
              <exclusions>
                  <exclusion>
                      <groupId>com.google.guava</groupId>
                      <artifactId>guava</artifactId>
                  </exclusion>
              </exclusions>
          </dependency>

          1.1.2 Elasticeach依賴
                
                <!--elasticsearch-->
          <dependency>
              <groupId>org.elasticsearch</groupId>
              <artifactId>elasticsearch-hadoop</artifactId>
              <version>6.4.0</version>
          </dependency>

          1.2 RDD讀寫ES

          使用RDD讀寫ES,主要是使用了SparkContext()的esRDD()和saveToEs()兩個(gè)方法。但是這個(gè)兩個(gè)方法需要引入es的包之后才有

                
                import org.elasticsearch.spark._

          1.2.1 寫數(shù)據(jù)到ES

          在這之前先寫一個(gè)case class 用于創(chuàng)建RDD

                
                case class Course(name: String, credit: Int)

                
                 val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local")
              conf.set("es.nodes""192.168.1.188")
              conf.set("es.port""9200")
              conf.set("es.index.auto.create""true")
              val sc = new SparkContext(conf)
            //rdd寫es
              val courseRdd = sc.makeRDD(Seq(Course("Hadoop", 4), Course("Spark", 3), Course("Kafka", 4)))
              courseRdd.saveToEs("/course/rdd")
              

          1.2.2 從ES讀數(shù)據(jù)
                
                 //rdd讀es
              val esCourseRdd = sc.esRDD("/course/rdd")
              esCourseRdd.foreach(c => {
                println(c.toString())
              })
              /**
                * (vNHejmUBdoXqZPoSocAx,Map(name -> Kafka, credit -> 4))
                * (u9HejmUBdoXqZPoSocAx,Map(name -> Spark, credit -> 3))
                * (utHejmUBdoXqZPoSocAx,Map(name -> Hadoop, credit -> 4))
                */


          1.3 DataFrame讀寫ES

          如果想使用spark sql讀寫ES同樣需要引入es的包

                
                    import org.elasticsearch.spark.sql._

          1.3.1 DataFrame寫數(shù)據(jù)到ES
                
                  //dataframe寫es
              val df = spark.read.format("csv").option("header"true).option("inferSchema"true).load("hdfs://192.168.1.188:9000/data/Beijing_2017_HourlyPM25_created20170803.csv")
              df.select("Year""Month""Day""Hour""Value").saveToEs("/csv/dataframe")
            

          1.3.2 DataFrame從ES讀數(shù)據(jù)
                
                //dataframe讀es
              val esDf = spark.esDF("/csv/dataframe")
              esDf.show()

              /**
                * +---+----+-----+-----+----+
                * |Day|Hour|Month|Value|Year|
                * +---+----+-----+-----+----+
                * |  1|   0|    1|  505|2017|
                * |  1|   2|    1|  466|2017|
                * |  1|  14|    1|  596|2017|
                * |  1|  17|    1|  522|2017|
                * |  1|  21|    1|  452|2017|
                * |  2|   1|    1|  466|2017|
                * |  2|   7|    1|   93|2017|
                * |  2|   8|    1|   27|2017|
                * |  2|   9|    1|   17|2017|
                * |  2|  13|    1|  251|2017|
                * |  2|  16|    1|  251|2017|
                * |  3|   2|    1|  341|2017|
                * |  3|   8|    1|  365|2017|
                * |  3|   9|    1|  361|2017|
                * |  3|  21|    1|  542|2017|
                * |  3|  22|    1|  548|2017|
                * |  4|   3|    1|  590|2017|
                * |  4|   6|    1|  482|2017|
                * |  4|  17|    1|  323|2017|
                * |  4|  22|    1|  369|2017|
                * +---+----+-----+-----+----+
                */

          1.4 完整代碼

                
                package com.hollysys.spark.elasticsearch

          import org.apache.spark.{SparkConf, SparkContext}
          import org.apache.spark.sql.SparkSession

          /**
            * Created by shirukai on 2018/8/31
            * spark 讀寫ES
            */
          object EsTest {
            def main(args: Array[String]): Unit = {
              import org.elasticsearch.spark._
              import org.elasticsearch.spark.sql._
              val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local")
              conf.set("es.nodes""192.168.1.188")
              conf.set("es.port""9200")
              conf.set("es.index.auto.create""true")
              val sc = new SparkContext(conf)
              val spark = SparkSession.builder.config(conf).getOrCreate()


              //rdd寫es
              val courseRdd = sc.makeRDD(Seq(Course("Hadoop", 4), Course("Spark", 3), Course("Kafka", 4)))
              courseRdd.saveToEs("/course/rdd")

              //rdd讀es
              val esCourseRdd = sc.esRDD("/course/rdd")
              esCourseRdd.foreach(c => {
                println(c.toString())
              })
              /**
                * (vNHejmUBdoXqZPoSocAx,Map(name -> Kafka, credit -> 4))
                * (u9HejmUBdoXqZPoSocAx,Map(name -> Spark, credit -> 3))
                * (utHejmUBdoXqZPoSocAx,Map(name -> Hadoop, credit -> 4))
                */

              //dataframe寫es
              val df = spark.read.format("csv").option("header"true).option("inferSchema"true).load("hdfs://192.168.1.188:9000/data/Beijing_2017_HourlyPM25_created20170803.csv")
              df.select("Year""Month""Day""Hour""Value").saveToEs("/csv/dataframe")
              //dataframe讀es
              val esDf = spark.esDF("/csv/dataframe")
              esDf.show()

              /**
                * +---+----+-----+-----+----+
                * |Day|Hour|Month|Value|Year|
                * +---+----+-----+-----+----+
                * |  1|   0|    1|  505|2017|
                * |  1|   2|    1|  466|2017|
                * |  1|  14|    1|  596|2017|
                * |  1|  17|    1|  522|2017|
                * |  1|  21|    1|  452|2017|
                * |  2|   1|    1|  466|2017|
                * |  2|   7|    1|   93|2017|
                * |  2|   8|    1|   27|2017|
                * |  2|   9|    1|   17|2017|
                * |  2|  13|    1|  251|2017|
                * |  2|  16|    1|  251|2017|
                * |  3|   2|    1|  341|2017|
                * |  3|   8|    1|  365|2017|
                * |  3|   9|    1|  361|2017|
                * |  3|  21|    1|  542|2017|
                * |  3|  22|    1|  548|2017|
                * |  4|   3|    1|  590|2017|
                * |  4|   6|    1|  482|2017|
                * |  4|  17|    1|  323|2017|
                * |  4|  22|    1|  369|2017|
                * +---+----+-----+-----+----+
                */
            }

          }

          case class Course(name: String, credit: Int)


          2 pyspark寫數(shù)據(jù)到Elasticsearch

          使用pyspark寫數(shù)據(jù)到Elasticsearch主要是利用的寫入外部數(shù)據(jù)源,需要org.elasticsearch.spark.sql相關(guān)的jar包

          2.1 下載相關(guān)jar包

          使用pyspark寫數(shù)據(jù)到es需要依賴elasticsearch-spark-20_2.11.jar包,可以到maven倉庫下載。

          下載地址:

          2.2 pyspark中使用

                
                # encoding: utf-8
          """
          Created by shirukai on 2018/8/31
          "
          ""
          from pyspark.sql import SparkSession

          if __name__ == '__main__':
              spark = SparkSession.builder.appName("pyspark2es").getOrCreate()
              data = [
                  (1, "Kafka"),
                  (2, "Spark"),
                  (3, "Hadoop")
              ]
              df = spark.createDataFrame(data, schema=['id''name'])
              df.write.format("org.elasticsearch.spark.sql").option("es.nodes""192.168.1.196:9200").mode("overwrite").save(
                  "pyspark2es/python")
              spark.stop()


          使用spark-submit提交任務(wù)

                
                bin/spark-submit --master local --jars /Users/shirukai/apps/spark-2.3.1/jars/elasticsearch-spark-20_2.11-6.4.0.jar  /Users/shirukai/apps/spark-2.3.1/script/pyspark2es.py


          瀏覽 59
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  年轻人在线毛片免费看视频在线 | 亚洲视频日韩精彩动漫一区二区 | 日韩无码中文字幕电影 | 人体一级A片-国产日日爱-成人AV | 婷婷人人爽|