
[Big Data in Practice] Flume + Kafka + Spark + Spring Boot: Counting Web Page Views


          2020-11-21 11:33


1. Requirements

1.1 Requirements

Cumulative page views for the site, up to the present moment

Cumulative page views referred from search engines, up to the present moment

Overall pipeline: a Python script generates access logs, Flume collects them in real time and publishes them to Kafka, Spark Streaming consumes and cleans the data and writes the counts to HBase, and a Spring Boot + ECharts frontend visualizes the results.

1.2 User behavior log format
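Each record is a single tab-separated line with five fields: client IP, access time, request line, HTTP status code, and referer. A representative line (this is the exact format produced by the generator script in the next section; the values are illustrative):

30.163.55.7	2020-03-10 14:32:01	"GET /class/112.html HTTP/1.1"	404	http://www.baidu.com/s?wd=Hadoop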

2. Generating mock log data

Use Python to generate mock data containing:

• Different URL paths -> url_paths

• Different referer addresses -> http_refers

• Different search keywords -> search_keyword

• Different status codes -> status_codes

• Different IP segments -> ip_slices

#coding=UTF-8
import random
import time

url_paths = [
    "class/112.html", "class/128.html", "class/145.html", "class/146.html",
    "class/131.html", "class/130.html", "class/145.html",
    "learn/821.html", "learn/825.html", "course/list"
]

http_refers = [
    "http://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?query={query}",
    "http://cn.bing.com/search?q={query}",
    "http://search.yahoo.com/search?p={query}",
]

search_keyword = ["Spark+Sql", "Hadoop", "Storm", "Spark+Streaming", "大数据", "面试"]

status_codes = ["200", "404", "500"]

ip_slices = [132,156,132,10,29,145,44,30,21,43,1,7,9,23,55,56,241,134,155,163,172,144,158]

def sample_url():
    return random.sample(url_paths, 1)[0]

def sample_ip():
    slice = random.sample(ip_slices, 4)
    return ".".join([str(item) for item in slice])

def sample_refer():
    if random.uniform(0, 1) > 0.2:
        return "-"
    refer_str = random.sample(http_refers, 1)
    query_str = random.sample(search_keyword, 1)
    return refer_str[0].format(query=query_str[0])

def sample_status():
    return random.sample(status_codes, 1)[0]

def generate_log(count=10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    f = open("/home/hadoop/tpdata/project/logs/access.log", "w+")
    while count >= 1:
        query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status}\t{refer}".format(
            local_time=time_str,
            url=sample_url(),
            ip=sample_ip(),
            refer=sample_refer(),
            status=sample_status())
        print(query_log)
        f.write(query_log + "\n")
        count = count - 1

if __name__ == '__main__':
    generate_log(100)

Use the Linux crontab scheduler so that a new batch of data is produced every minute.

Cron expression:

*/1 * * * *

Create a shell script that runs the Python generator:

vi log_generator.sh
    python /home/hadoop/tpdata/log.py
chmod u+x log_generator.sh

Configure crontab:

crontab -e
*/1 * * * * /home/hadoop/tpdata/project/log_generator.sh
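To confirm that the job is actually producing data, check the installed crontab and watch the log file (a quick sanity check; the path is the one used by the generator script):

crontab -l
tail -F /home/hadoop/tpdata/project/logs/access.log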

3. Collecting log data in real time with Flume

Agent selection for development: exec source -> memory channel -> logger sink.

Create streaming_project.conf:

vi streaming_project.conf
exec-memory-logger.sources = exec-source
exec-memory-logger.sinks = logger-sink
exec-memory-logger.channels = memory-channel

exec-memory-logger.sources.exec-source.type = exec
exec-memory-logger.sources.exec-source.command = tail -F /home/hadoop/tpdata/project/logs/access.log
exec-memory-logger.sources.exec-source.shell = /bin/sh -c

exec-memory-logger.channels.memory-channel.type = memory

exec-memory-logger.sinks.logger-sink.type = logger

exec-memory-logger.sources.exec-source.channels = memory-channel
exec-memory-logger.sinks.logger-sink.channel = memory-channel
Start Flume to test:

flume-ng agent \
--name exec-memory-logger \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/tpdata/project/streaming_project.conf \
-Dflume.root.logger=INFO,console

Start ZooKeeper:

./zkServer.sh start

Start the Kafka server:

./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

The relevant settings in server.properties:
broker.id=0
############################# Socket Server Settings #############################
listeners=PLAINTEXT://:9092
host.name=hadoop000
advertised.host.name=192.168.1.9
advertised.port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/home/hadoop/app/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Log Retention Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
############################# Zookeeper #############################
zookeeper.connect=hadoop000:2181
zookeeper.connection.timeout.ms=6000
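To confirm that ZooKeeper and the Kafka broker are both up, listing the JVM processes is usually enough (a minimal check; the process names are the standard ones for these daemons):

jps
# expected entries: QuorumPeerMain (ZooKeeper) and Kafka (the broker)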
Start a Kafka console consumer (this reuses a topic from earlier; if it does not exist yet, it can be created first, as sketched below the consumer command):

kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic streamingtopic
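A minimal sketch for creating the topic, assuming a single-broker setup (one partition, replication factor 1):

kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic streamingtopic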
Modify the Flume configuration so that the sink writes to Kafka:
          vi streaming_project2.conf
exec-memory-kafka.sources = exec-source
exec-memory-kafka.sinks = kafka-sink
exec-memory-kafka.channels = memory-channel

exec-memory-kafka.sources.exec-source.type = exec
exec-memory-kafka.sources.exec-source.command = tail -F /home/hadoop/tpdata/project/logs/access.log
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c

exec-memory-kafka.channels.memory-channel.type = memory

exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092
exec-memory-kafka.sinks.kafka-sink.topic = streamingtopic
exec-memory-kafka.sinks.kafka-sink.batchSize = 5
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1

exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel
Start Flume:

flume-ng agent \
--name exec-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/tpdata/project/streaming_project2.conf \
-Dflume.root.logger=INFO,console

The Kafka consumer now receives the data:

4. Consuming the Kafka data with Spark Streaming

          4.1 pom.xml:

Project coordinates: com.taipark.spark:sparktrain:1.0 (modelVersion 4.0.0, inceptionYear 2008)

Properties:
    scala.version  = 2.11.8
    kafka.version  = 0.9.0.0
    spark.version  = 2.2.0
    hadoop.version = 2.6.0-cdh5.7.0
    hbase.version  = 1.2.0-cdh5.7.0

Repository:
    cloudera  https://repository.cloudera.com/artifactory/cloudera-repos

Dependencies:
    org.scala-lang:scala-library:${scala.version}
    org.apache.hadoop:hadoop-client:${hadoop.version}
    org.apache.hbase:hbase-client:${hbase.version}
    org.apache.hbase:hbase-server:${hbase.version}
    org.apache.spark:spark-streaming_2.11:${spark.version}
    org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0
    org.apache.spark:spark-streaming-flume_2.11:${spark.version}
    org.apache.spark:spark-streaming-flume-sink_2.11:${spark.version}
    org.apache.commons:commons-lang3:3.5
    org.apache.spark:spark-sql_2.11:${spark.version}
    mysql:mysql-connector-java:8.0.13
    com.fasterxml.jackson.module:jackson-module-scala_2.11:2.6.5
    net.jpountz.lz4:lz4:1.3.0
    org.apache.flume.flume-ng-clients:flume-ng-log4jappender:1.6.0

Build:
    sourceDirectory src/main/scala, testSourceDirectory src/test/scala
    maven-scala-plugin (org.scala-tools; goals compile, testCompile; scalaVersion ${scala.version}; arg -target:jvm-1.5)
    maven-eclipse-plugin (Scala builder and nature, JRE and Scala classpath containers)
    reporting: maven-scala-plugin with scalaVersion ${scala.version}

4.2 Connecting to Kafka

Create a new Scala file, WebStatStreamingApp.scala, and first connect to Kafka using the direct approach:

package com.taipark.spark.project

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Consume Kafka data with Spark Streaming
 */
object WebStatStreamingApp {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: WebStatStreamingApp <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args

    val sparkConf = new SparkConf()
      .setAppName("WebStatStreamingApp")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicSet = topics.split(",").toSet
    val messages = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicSet)

    messages.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Set the program arguments:

hadoop000:9092 streamingtopic

Run it locally to test the connection:

The connection works, so the business logic for data cleansing (ETL) can be written next.

          4.3 ETL

Create a utility class, DateUtils.scala:

package com.taipark.spark.project.utils

import java.util.Date

import org.apache.commons.lang3.time.FastDateFormat

/**
 * Date/time utility class
 */
object DateUtils {
  val YYYYMMDDHHMMSS_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  val TARGET_FORMAT = FastDateFormat.getInstance("yyyyMMddHHmmss")

  def getTime(time: String) = {
    YYYYMMDDHHMMSS_FORMAT.parse(time).getTime
  }

  def parseToMinute(time: String) = {
    TARGET_FORMAT.format(new Date(getTime(time)))
  }

  def main(args: Array[String]): Unit = {
//    println(parseToMinute("2020-03-10 15:00:05"))
  }
}
Create ClickLog.scala:
package com.taipark.spark.project.domian

/**
 * A cleaned log record
 */
case class ClickLog(ip: String, time: String, courseId: Int, statusCode: Int, referer: String)

Modify WebStatStreamingApp.scala:

package com.taipark.spark.project.spark

import com.taipark.spark.project.domian.ClickLog
import com.taipark.spark.project.utils.DateUtils
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Consume Kafka data with Spark Streaming
 */
object WebStatStreamingApp {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: WebStatStreamingApp <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args

    val sparkConf = new SparkConf()
      .setAppName("WebStatStreamingApp")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicSet = topics.split(",").toSet
    val messages = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicSet)

    //messages.map(_._2).count().print()

    // ETL
    // 30.163.55.7  2020-03-10 14:32:01  "GET /class/112.html HTTP/1.1"  404  http://www.baidu.com/s?wd=Hadoop
    val logs = messages.map(_._2)
    val cleanData = logs.map(line => {
      val infos = line.split("\t")
      // infos(2) = "GET /class/112.html HTTP/1.1"
      val url = infos(2).split(" ")(1)
      var courseId = 0

      // extract the course id
      if (url.startsWith("/class")) {
        val courseIdHTML = url.split("/")(2)
        courseId = courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
      }

      ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
    }).filter(clicklog => clicklog.courseId != 0)

    cleanData.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Run it to test:

ETL is done.

4.4 Feature 1: cumulative page views for the site

Store the statistics in a database; the visualization frontend looks the results up by yyyyMMdd and course id and renders them.

HBase is used as the database. HDFS and ZooKeeper must be running.

Start HDFS:

          ./start-dfs.sh
Start HBase:

./start-hbase.sh
./hbase shell
list

HBase table design:

create 'web_course_clickcount','info'

hbase(main):008:0> desc 'web_course_clickcount'
Table web_course_clickcount is ENABLED
web_course_clickcount
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
1 row(s) in 0.1650 seconds

Rowkey design:

day_courseid (for example, 20200311_112 for course page 112 on 2020-03-11)

Operate on HBase from Scala.

Create the page click-count entity class, CourseClickCount.scala:

package com.taipark.spark.project.domian

/**
 * Click count for a course page
 * @param day_course the rowkey in HBase
 * @param click_count the corresponding total click count
 */
case class CourseClickCount(day_course: String, click_count: Long)
Create the data access layer, CourseClickCountDAO.scala:
package com.taipark.spark.project.dao

import com.taipark.spark.project.domian.CourseClickCount

import scala.collection.mutable.ListBuffer

object CourseClickCountDAO {
  val tableName = "web_course_clickcount"
  val cf = "info"
  val qualifer = "click_count"

  /**
   * Save data to HBase
   * @param list
   */
  def save(list: ListBuffer[CourseClickCount]): Unit = {

  }

  /**
   * Query the value for a rowkey
   * @param day_course
   * @return
   */
  def count(day_course: String): Long = {
    0L
  }
}

Implement HBaseUtils in Java to bridge the project and HBase:

package com.taipark.spark.project.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * HBase utility class (Java, singleton)
 */
public class HBaseUtils {
    HBaseAdmin admin = null;
    Configuration configuration = null;

    // Private constructor (singleton)
    private HBaseUtils(){
        configuration = new Configuration();
        configuration.set("hbase.zookeeper.quorum", "hadoop000:2181");
        configuration.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase");

        try {
            admin = new HBaseAdmin(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static HBaseUtils instance = null;

    public static synchronized HBaseUtils getInstance(){
        if(instance == null){
            instance = new HBaseUtils();
        }
        return instance;
    }

    // Get an HTable instance by table name
    public HTable getTable(String tableName){
        HTable table = null;
        try {
            table = new HTable(configuration, tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return table;
    }

    /**
     * Add one record to an HBase table
     * @param tableName table name
     * @param rowkey    rowkey
     * @param cf        column family
     * @param column    column
     * @param value     value to write
     */
    public void put(String tableName, String rowkey, String cf, String column, String value){
        HTable table = getTable(tableName);
        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
        try {
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
//        HTable hTable = HBaseUtils.getInstance().getTable("web_course_clickcount");
//        System.out.println(hTable.getName().getNameAsString());
        String tableName = "web_course_clickcount";
        String rowkey = "20200310_88";
        String cf = "info";
        String column = "click_count";
        String value = "2";
        HBaseUtils.getInstance().put(tableName, rowkey, cf, column, value);
    }
}

Test run:

With the utility class working, continue with the DAO code:

package com.taipark.spark.project.dao

import com.taipark.spark.project.domian.CourseClickCount
import com.taipark.spark.project.utils.HBaseUtils
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.mutable.ListBuffer

object CourseClickCountDAO {
  val tableName = "web_course_clickcount"
  val cf = "info"
  val qualifer = "click_count"

  /**
   * Save data to HBase
   * @param list
   */
  def save(list: ListBuffer[CourseClickCount]): Unit = {
    val table = HBaseUtils.getInstance().getTable(tableName)

    for (ele <- list) {
      table.incrementColumnValue(
        Bytes.toBytes(ele.day_course),
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifer),
        ele.click_count)
    }
  }

  /**
   * Query the value for a rowkey
   * @param day_course
   * @return
   */
  def count(day_course: String): Long = {
    val table = HBaseUtils.getInstance().getTable(tableName)

    val get = new Get(Bytes.toBytes(day_course))
    val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)

    if (value == null) {
      0L
    } else {
      Bytes.toLong(value)
    }
  }

  def main(args: Array[String]): Unit = {
    val list = new ListBuffer[CourseClickCount]
    list.append(CourseClickCount("2020311_8", 8))
    list.append(CourseClickCount("2020311_9", 9))
    list.append(CourseClickCount("2020311_10", 1))
    list.append(CourseClickCount("2020311_2", 15))

    save(list)
  }
}
Run a test, then check the result in the hbase shell:

scan 'web_course_clickcount'

Write the Spark Streaming results to HBase:
package com.taipark.spark.project.spark

import com.taipark.spark.project.dao.CourseClickCountDAO
import com.taipark.spark.project.domian.{ClickLog, CourseClickCount}
import com.taipark.spark.project.utils.DateUtils
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer

/**
 * Consume Kafka data with Spark Streaming
 */
object WebStatStreamingApp {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: WebStatStreamingApp <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args

    val sparkConf = new SparkConf()
      .setAppName("WebStatStreamingApp")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicSet = topics.split(",").toSet
    val messages = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicSet)

    //messages.map(_._2).count().print()

    // ETL
    // 30.163.55.7  2020-03-10 14:32:01  "GET /class/112.html HTTP/1.1"  404  http://www.baidu.com/s?wd=Hadoop
    val logs = messages.map(_._2)
    val cleanData = logs.map(line => {
      val infos = line.split("\t")
      // infos(2) = "GET /class/112.html HTTP/1.1"
      val url = infos(2).split(" ")(1)
      var courseId = 0

      // extract the course id
      if (url.startsWith("/class")) {
        val courseIdHTML = url.split("/")(2)
        courseId = courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
      }

      ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
    }).filter(clicklog => clicklog.courseId != 0)

    // cleanData.print()

    cleanData.map(x => {
      // HBase rowkey design: 20200311_9
      (x.time.substring(0, 8) + "_" + x.courseId, 1)
    }).reduceByKey(_ + _).foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {
        val list = new ListBuffer[CourseClickCount]

        partitionRecords.foreach(pair => {
          list.append(CourseClickCount(pair._1, pair._2))
        })

        CourseClickCountDAO.save(list)
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

Test:

4.5 Feature 2: cumulative page views referred from search engines

HBase table design:

create 'web_course_search_clickcount','info'

Rowkey design:

day_search_courseid (for example, 20200311_www.baidu.com_112)

Define the entity class:
package com.taipark.spark.project.domian

/**
 * Entity for page views referred from a search engine
 * @param day_search_course
 * @param click_count
 */
case class CourseSearchClickCount(day_search_course: String, click_count: Long)
Develop the DAO, CourseSearchClickCountDAO.scala:
package com.taipark.spark.project.dao

import com.taipark.spark.project.domian.{CourseClickCount, CourseSearchClickCount}
import com.taipark.spark.project.utils.HBaseUtils
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.mutable.ListBuffer

object CourseSearchClickCountDAO {
  val tableName = "web_course_search_clickcount"
  val cf = "info"
  val qualifer = "click_count"

  /**
   * Save data to HBase
   * @param list
   */
  def save(list: ListBuffer[CourseSearchClickCount]): Unit = {
    val table = HBaseUtils.getInstance().getTable(tableName)

    for (ele <- list) {
      table.incrementColumnValue(
        Bytes.toBytes(ele.day_search_course),
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifer),
        ele.click_count)
    }
  }

  /**
   * Query the value for a rowkey
   * @param day_search_course
   * @return
   */
  def count(day_search_course: String): Long = {
    val table = HBaseUtils.getInstance().getTable(tableName)

    val get = new Get(Bytes.toBytes(day_search_course))
    val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)

    if (value == null) {
      0L
    } else {
      Bytes.toLong(value)
    }
  }

  def main(args: Array[String]): Unit = {
    val list = new ListBuffer[CourseSearchClickCount]
    list.append(CourseSearchClickCount("2020311_www.baidu.com_8", 8))
    list.append(CourseSearchClickCount("2020311_cn.bing.com_9", 9))

    save(list)
    println(count("2020311_www.baidu.com_8"))
  }
}

Test:

Write to HBase from within Spark Streaming:

package com.taipark.spark.project.spark

import com.taipark.spark.project.dao.{CourseClickCountDAO, CourseSearchClickCountDAO}
import com.taipark.spark.project.domian.{ClickLog, CourseClickCount, CourseSearchClickCount}
import com.taipark.spark.project.utils.DateUtils
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer

/**
 * Consume Kafka data with Spark Streaming
 */
object WebStatStreamingApp {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: WebStatStreamingApp <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args

    val sparkConf = new SparkConf()
      .setAppName("WebStatStreamingApp")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicSet = topics.split(",").toSet
    val messages = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicSet)

    //messages.map(_._2).count().print()

    // ETL
    // 30.163.55.7  2020-03-10 14:32:01  "GET /class/112.html HTTP/1.1"  404  http://www.baidu.com/s?wd=Hadoop
    val logs = messages.map(_._2)
    val cleanData = logs.map(line => {
      val infos = line.split("\t")
      // infos(2) = "GET /class/112.html HTTP/1.1"
      val url = infos(2).split(" ")(1)
      var courseId = 0

      // extract the course id
      if (url.startsWith("/class")) {
        val courseIdHTML = url.split("/")(2)
        courseId = courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
      }

      ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
    }).filter(clicklog => clicklog.courseId != 0)

    // cleanData.print()

    // Feature 1: cumulative page views per course page
    cleanData.map(x => {
      // HBase rowkey design: 20200311_9
      (x.time.substring(0, 8) + "_" + x.courseId, 1)
    }).reduceByKey(_ + _).foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {
        val list = new ListBuffer[CourseClickCount]

        partitionRecords.foreach(pair => {
          list.append(CourseClickCount(pair._1, pair._2))
        })

        CourseClickCountDAO.save(list)
      })
    })

    // Feature 2: cumulative page views referred from search engines
    cleanData.map(x => {
      // http://www.baidu.com/s?wd=Spark+Streaming
      val referer = x.referer.replaceAll("http://", "/")
      // http:/www.baidu.com/s?wd=Spark+Streaming
      val splits = referer.split("/")
      var host = ""
      // splits.length == 1 => -
      if (splits.length > 2) {
        host = splits(1)
      }

      (host, x.courseId, x.time)
    }).filter(_._1 != "").map(x => {
      (x._3.substring(0, 8) + "_" + x._1 + "_" + x._2, 1)
    }).reduceByKey(_ + _).foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {
        val list = new ListBuffer[CourseSearchClickCount]

        partitionRecords.foreach(pair => {
          list.append(CourseSearchClickCount(pair._1, pair._2))
        })

        CourseSearchClickCountDAO.save(list)
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

Test:

5. Production deployment

Do not hard-code the configuration; comment out setAppName and setMaster:

val sparkConf = new SparkConf()
//      .setAppName("WebStatStreamingApp")
//      .setMaster("local[2]")
Before packaging with Maven, comment out the two lines in the pom that set the Scala source directories, otherwise the build will fail:

<!--<sourceDirectory>src/main/scala</sourceDirectory>-->
<!--<testSourceDirectory>src/test/scala</testSourceDirectory>-->
Package with Maven and upload the jar to the server:
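One typical way to do this (a sketch; the remote user and target directory are assumptions, and the jar name follows the artifactId and version in the pom):

mvn clean package -DskipTests
scp target/sparktrain-1.0.jar hadoop@hadoop000:/home/hadoop/tplib/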
Submit it with spark-submit:
./spark-submit \
--master local[5] \
--name WebStatStreamingApp \
--class com.taipark.spark.project.spark.WebStatStreamingApp \
/home/hadoop/tplib/sparktrain-1.0.jar \
hadoop000:9092 streamingtopic

This fails with:

          Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$

Fix: add the spark-streaming-kafka-0-8_2.11 package:
./spark-submit \
--master local[5] \
--name WebStatStreamingApp \
--class com.taipark.spark.project.spark.WebStatStreamingApp \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/home/hadoop/tplib/sparktrain-1.0.jar \
hadoop000:9092 streamingtopic

Another error follows:

          java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/client/HBaseAdmin

Fix: also add the HBase jars:

./spark-submit \
--master local[5] \
--name WebStatStreamingApp \
--class com.taipark.spark.project.spark.WebStatStreamingApp \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
--jars $(echo /home/hadoop/app/hbase-1.2.0-cdh5.7.0/lib/*.jar | tr ' ' ',') \
/home/hadoop/tplib/sparktrain-1.0.jar \
hadoop000:9092 streamingtopic

Run it:

The job now runs successfully in the background.

6. Spring Boot development

6.1 Testing ECharts

Create a new Spring Boot project. Download ECharts (its online builder produces echarts.min.js) and place it under resources/static/js.

Add a dependency to pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
Create test.html under resources/templates:
(test.html — page title: test)
Create a Java controller:
package com.taipark.spark.web;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;

/**
 * Test controller
 */
@RestController
public class HelloBoot {

    @RequestMapping(value = "/hello", method = RequestMethod.GET)
    public String sayHello(){
        return "HelloWorld!";
    }

    @RequestMapping(value = "/first", method = RequestMethod.GET)
    public ModelAndView firstDemo(){
        return new ModelAndView("test");
    }
}

Test it:

It works.

6.2 Making the ECharts chart dynamic

Add a repository:

<repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
Add the dependency:
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.7.0</version>
</dependency>
Create HBaseUtils.java:
package com.taipark.spark.web.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class HBaseUtils {
    HBaseAdmin admin = null;
    Configuration configuration = null;

    // Private constructor (singleton)
    private HBaseUtils(){
        configuration = new Configuration();
        configuration.set("hbase.zookeeper.quorum", "hadoop000:2181");
        configuration.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase");

        try {
            admin = new HBaseAdmin(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static HBaseUtils instance = null;

    public static synchronized HBaseUtils getInstance(){
        if(instance == null){
            instance = new HBaseUtils();
        }
        return instance;
    }

    // Get an HTable instance by table name
    public HTable getTable(String tableName){
        HTable table = null;
        try {
            table = new HTable(configuration, tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return table;
    }

    /**
     * Fetch records from an HBase table whose rowkeys match the given prefix
     * @param tableName table name
     * @param condition rowkey prefix, e.g. the day
     * @return rowkey -> click count
     */
    public Map<String, Long> query(String tableName, String condition) throws Exception {
        Map<String, Long> map = new HashMap<>();

        HTable table = getTable(tableName);
        String cf = "info";
        String qualifier = "click_count";

        Scan scan = new Scan();
        Filter filter = new PrefixFilter(Bytes.toBytes(condition));
        scan.setFilter(filter);
        ResultScanner rs = table.getScanner(scan);
        for (Result result : rs) {
            String row = Bytes.toString(result.getRow());
            long clickCount = Bytes.toLong(result.getValue(cf.getBytes(), qualifier.getBytes()));
            map.put(row, clickCount);
        }
        return map;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Long> map = HBaseUtils.getInstance().query("web_course_clickcount", "20200311");

        for (Map.Entry<String, Long> entry : map.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
        }
    }
}

The test passes:

Define the page view count bean:

package com.taipark.spark.web.domain;

import org.springframework.stereotype.Component;

/**
 * Page view count entity
 */
@Component
public class CourseClickCount {

    private String name;
    private long value;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}
The DAO layer:
package com.taipark.spark.web.dao;

import com.taipark.spark.web.domain.CourseClickCount;
import com.taipark.spark.web.utils.HBaseUtils;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Page view count data access layer
 */
@Component
public class CourseClickDAO {

    /**
     * Query by day
     * @param day
     * @return
     * @throws Exception
     */
    public List<CourseClickCount> query(String day) throws Exception {
        List<CourseClickCount> list = new ArrayList<>();

        // Fetch the page view counts for the given day from HBase
        Map<String, Long> map = HBaseUtils.getInstance().query("web_course_clickcount", day);
        for (Map.Entry<String, Long> entry : map.entrySet()) {
            CourseClickCount model = new CourseClickCount();
            model.setName(entry.getKey());
            model.setValue(entry.getValue());

            list.add(model);
        }
        return list;
    }

    public static void main(String[] args) throws Exception {
        CourseClickDAO dao = new CourseClickDAO();
        List<CourseClickCount> list = dao.query("20200311");

        for (CourseClickCount model : list) {
            System.out.println(model.getName() + ":" + model.getValue());
        }
    }
}
Working with JSON requires this dependency:
<dependency>
    <groupId>net.sf.json-lib</groupId>
    <artifactId>json-lib</artifactId>
    <version>2.4</version>
    <classifier>jdk15</classifier>
</dependency>
The web layer:
package com.taipark.spark.web.spark;

import com.taipark.spark.web.dao.CourseClickDAO;
import com.taipark.spark.web.domain.CourseClickCount;
import net.sf.json.JSONArray;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Web layer
 */
@RestController
public class WebStatApp {

    private static Map<String, String> courses = new HashMap<>();
    static {
        courses.put("112","某些外国人对中国有多不了解?");
        courses.put("128","你认为有哪些失败的建筑?");
        courses.put("145","为什么人类想象不出四维空间?");
        courses.put("146","有什么一眼看上去很舒服的头像?");
        courses.put("131","男朋友心情不好时女朋友该怎么办?");
        courses.put("130","小白如何从零开始运营一个微信公众号?");
        courses.put("821","为什么有人不喜欢极简主义?");
        courses.put("825","有哪些书看完后会让人很后悔没有早看到?");
    }

//    @Autowired
//    CourseClickDAO courseClickDAO;
//
//    @RequestMapping(value = "/course_clickcount_dynamic", method = RequestMethod.GET)
//    public ModelAndView courseClickCount() throws Exception {
//        ModelAndView view = new ModelAndView("index");
//        List<CourseClickCount> list = courseClickDAO.query("20200311");
//
//        for (CourseClickCount model : list) {
//            model.setName(courses.get(model.getName().substring(9)));
//        }
//        JSONArray json = JSONArray.fromObject(list);
//
//        view.addObject("data_json", json);
//
//        return view;
//    }

    @Autowired
    CourseClickDAO courseClickDAO;

    // Returns the day's counts as JSON; the "yyyyMMdd_" rowkey prefix (9 characters)
    // is stripped so that only the course id remains, then mapped to a readable name.
    @RequestMapping(value = "/course_clickcount_dynamic", method = RequestMethod.POST)
    @ResponseBody
    public List<CourseClickCount> courseClickCount() throws Exception {
        List<CourseClickCount> list = courseClickDAO.query("20200311");
        for (CourseClickCount model : list) {
            model.setName(courses.get(model.getName().substring(9)));
        }

        return list;
    }

    @RequestMapping(value = "/echarts", method = RequestMethod.GET)
    public ModelAndView echarts(){
        return new ModelAndView("echarts");
    }
}
Download jQuery, put it under static/js, and create echarts.html:
(echarts.html — page title: web_stat)

Test it:

6.3 Deploying the Spring Boot application to the server

Package it with Maven, upload it to the server, and run it:

java -jar web-0.0.1.jar
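To keep the web application running after the SSH session ends, a common pattern is to run it with nohup (a sketch; the log file name is arbitrary):

nohup java -jar web-0.0.1.jar > web.log 2>&1 &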

