TrafficTeach大數(shù)據(jù)項目:車流量監(jiān)控
車流量監(jiān)控
前言
目的 1.對業(yè)務(wù)場景抽象,熟練Spark編碼 2.增加自定義累加器,自定義UDF 3.Spark 優(yōu)化方式
項目 數(shù)據(jù)處理架構(gòu)
模塊介紹
-
卡扣流量分析 Spark Core
-
卡扣車流量轉(zhuǎn)化率 Spark Core
-
各區(qū)域車流量最高top5的道路統(tǒng)計 SparkSQL
-
稽查布控,道路實時擁堵統(tǒng)計 SparkStreaming
hive表
monitor_flow_action表 – date 日期 天 – monitor_id 卡口號 – camera_id 攝像頭編號 – car 車牌 – action_time 某個攝像頭拍攝時間 s – speed 通過卡扣的速度 – road_id 道路id – area_id 區(qū)域ID
monitor_camera_info表 某一個卡扣對應(yīng)的攝像頭編號 – monitor_id:卡扣編號 – camera_id:攝像頭編號
數(shù)據(jù)加載hive中 1). 創(chuàng)建表,加載數(shù)據(jù)load data Data2File
hive -f createHiveTab.sql
2). 集群中提交spark作業(yè),使用代碼生成到hive Data2Hive
大數(shù)據(jù)開發(fā)流程
- 數(shù)據(jù)調(diào)研(對底層的數(shù)據(jù)的表結(jié)構(gòu)進(jìn)行調(diào)研,分析和研究)
- 需求分析(與PM討論需求,畫原型圖 axure)
- 基于討論出來的結(jié)果做出技術(shù)方案(某個難點用什么技術(shù),數(shù)據(jù)庫選型)
- 具體實施
功能點
-
根據(jù)使用者(平臺使用者)指定的某些條件,篩選出指定的一批卡扣信息(比如根據(jù)區(qū)域、時間篩選)
-
檢測卡扣狀態(tài),對于篩選出來的所有的卡口(不代表一個攝像頭)信息統(tǒng)計 ? 卡口正常數(shù) ? 異常數(shù) ? camera的正常數(shù) ? camera的異常數(shù) ? camera的詳細(xì)信息( monitor_id:camera_id)
-
車流量最多的TonN卡扣號 ? 獲取每一個卡扣的詳細(xì)信息( Top5 )
-
隨機(jī)抽取N個車輛信息,對這些數(shù)據(jù)可以進(jìn)行多維度分析(因為隨機(jī)抽取出來的N個車輛信息可以很權(quán)威的代表整個 區(qū)域的車輛)
-
計算出經(jīng)常高速通過的TopN卡口 (查看哪些卡扣經(jīng)常被高速通過,高速,中速,正常,低速 根據(jù)三個速度段進(jìn)行四 次排序,高速通過的車輛數(shù)相同就比較中速通過的車輛數(shù),以此來推)
-
跟車分析
需求分析
按條件篩選卡扣信息
? 可以指定 不同的條件,時間范圍、區(qū)域范圍、卡扣號等 可以靈活的分析不同區(qū)域的卡扣信息
監(jiān)測卡扣狀態(tài)
? 對符合條件的卡扣信息,可以動態(tài)的檢查每一個卡扣的狀態(tài),查看卡扣是否正常工作,也可以查看攝像頭
車流量最多的TonN卡扣
? 查看哪些卡扣的車流量最高,為什么會出現(xiàn)這么高的車流量。分析原因,例如今天出城的車輛非常多,啥原因,今天進(jìn) 城的車輛非常多,啥原因? 要造反? 這個功能點里面也會拿到具體的車輛的信息,分析一下是京牌車造成的還是外地 車牌?
在符合條件的卡扣信息中隨機(jī)抽取N個車輛信息
? 隨機(jī)抽取N輛車的信息,可以權(quán)威的代表整個區(qū)域的車輛,這時候可以分析這些車的軌跡,看一下在不同的時間點車輛 的流動方向。以便于道路的規(guī)劃。
計算出經(jīng)常高速通過的TopN卡口
? 統(tǒng)計出是否存在飆車現(xiàn)象,或者經(jīng)常進(jìn)行超速行駛,可以在此處安裝違章拍攝設(shè)備
跟車分析
? 計算出所有車是否被跟蹤過,然后將結(jié)果存儲在MySQL中,以便后期進(jìn)行查詢
項目分析
monitor_flow_action 監(jiān)控數(shù)據(jù)表
monitor_camera_info 卡扣與攝像頭基本關(guān)系表
1.卡扣監(jiān)控
統(tǒng)計: 正常的卡扣個數(shù),異常的卡扣個數(shù),正常的攝像頭個數(shù),異常的攝像頭個數(shù),異常的攝像頭詳細(xì)信息
正??蹅€數(shù):
monitor_camera_info 基本關(guān)系表中卡扣與攝像頭的關(guān)系與在monitor_flow_action 監(jiān)控數(shù)據(jù)表 中,卡扣與攝像頭的關(guān)系完全對應(yīng)上
0001:11111,22222
0001 11111 xxx
0001 22222 xxx
RDD思路-正常的卡扣數(shù)為例:
monitor_flow_action表 -> RDD<Monitor_id,Camera_id> -> RDD<Monitor_id,[camera_ids]> - RDD<Monitor_id,camera_ids>
monitor_camera_info表 -> RDD<Monitor_id,Camera_id> -> RDD<Monitor_id,[camera_ids]>
異常的卡扣個數(shù):
1.monitor_camera_info 基本關(guān)系表中 卡扣 與攝像頭的關(guān)系,在監(jiān)控的數(shù)據(jù)表中 一條都沒有對應(yīng)。
2.monitor_camera_info 基本關(guān)系表中 卡扣 與攝像頭的關(guān)系,在監(jiān)控的數(shù)據(jù)表中 部分?jǐn)?shù)據(jù)有對應(yīng)。
正常的攝像頭個數(shù):
異常的攝像頭個數(shù):
異常的攝像頭詳細(xì)信息:0001:11111,22222,33333
~0004:76789,27449,87911,61106,45624,37726,09506
~0001:70037,23828,34361,92206,76657,26608
~0003:36687,99260,49613,97165
~0006:82302,11645,73565,36440
~0002:60478,07738,53139,75127,16494,48312
~0008:34144,27504,83395,62222,49656,18640
~0007:19179,72906,55656,60720,74161,85939,51743,40565,13972,79216,35128,27369,84616,09553
~0000:67157,85327,08658,57407,64297,15568,31898,36621
~0005:09761,12853,91031,33015,52841,15425,45548,36528
注意:
求個數(shù): 累加器實現(xiàn)(并行 分布式)
異常的攝像頭信息,用累加器實現(xiàn),無非拼的是字符串
更新累加器與take使用時,take算子可以觸發(fā)多個job執(zhí)行,可以造成累加器重復(fù)計算。
./spark-submit --master spark://node1:7077,node2:7077 --jars ../lib/fastjson-1.2.11.jar,../lib/mysql-connector-java-5.1.6.jar --class MonitorFlowAnalyze ../lib/Test.jar 1
~0001:13846,54785,51995,64341,45994,32228,82054,87746
~0003:38780,08844,03281,07183,50318,87000,16722,11604,26508,45523,46380
~0007:61833,19140,38387
~0005:63920,23464,37389,01219,96765,24844,32101,24141~
~0004:60778,35444,35403,68811,73819,81893
~0006:09621,67028,96375,60036,91237,53743,10305
~0002:24694,01172,25945,79625,83215,72235,26855
~0008:24630,40432,96808,78708,28294
~0000:68070,12865,49505,26035,36931,38053,91868
2.通過車輛數(shù)最多的topN卡扣
3.統(tǒng)計topN卡扣下經(jīng)過的所有車輛詳細(xì)信息
4.車輛通過速度相對比較快的topN卡扣
車速: 120=<speed 高速 90<=speed<120 中速 60<=speed<90 正常 0<speed<60 低速
5.卡扣“0001”下所有車輛軌跡
1.過濾日期范圍內(nèi) 卡扣“0001”下 有哪些車輛?
2.過濾日期范圍內(nèi) 這些車輛經(jīng)過卡扣的時間,按照時間升序排序
6.車輛碰撞
01,02中這一天同時出現(xiàn)的車輛
01:(car,row) 02:(car,row)
(car,row).join(car,row)
01:car 02:car
car02.intersection(car02)
7.隨機(jī)抽取車輛
在一天中要隨機(jī)抽取100輛車,抽取的車輛可以權(quán)威代表當(dāng)天交通運行情況。
假如這天一共有10000輛車,要隨機(jī)抽取100輛車:
sample(true,0.1,seed)
00~01 100 100/10000*100 = 1
01~02 100 1
02~03 100 1
04~05 200 2
05~06 200 2
06~07 300 3
08~09 500 5
09~10 200 2
10~11 200 2
11~12 300 3
12~13 500 5
13~14 700 7
。。
。。
。。
8.卡扣流量轉(zhuǎn)換率
卡扣流量轉(zhuǎn)換率,是指車輛運行連續(xù)卡口的轉(zhuǎn)換率 如:一車輛經(jīng)過卡口的軌跡為:0001,0002,0004,0001,0002,0003,0001,0002。 0001,0002:卡扣0001到卡扣0002的轉(zhuǎn)換率為:經(jīng)過卡扣0001,又經(jīng)過卡扣0002的次數(shù)/經(jīng)過卡扣0001的次數(shù),針對上面的例子, 經(jīng)過卡口0001的次數(shù)為:3次,經(jīng)過卡口0001,又經(jīng)過卡口0002的次數(shù)為:3次,那么卡扣0001到卡扣0002的轉(zhuǎn)換率為:3/3=100%,同理: 0001,0002,0003 代表卡扣0001,0002到卡扣0003的轉(zhuǎn)換率。 0001,0002,0003,0004代表卡扣0001,0002,0003到卡扣00004的轉(zhuǎn)換率。 MonitorOneStepConvertRateAnalyze.java
一輛車的軌跡: 0001->0002->0003->0001->0002->0004->0005->0001 0001,0002----卡扣0001到卡扣0002 的車流量轉(zhuǎn)化率:通過卡扣0001又通過卡扣0002的次數(shù)/通過卡扣0001的次數(shù) 2/3 0001,0002,0003 ---- 卡扣0001,0002到0003的車輛轉(zhuǎn)換率:通過卡扣0001,0002,0003的次數(shù) /通過卡扣0001,0002 0001,0002,0003,0004 -----卡扣0001,0002,0003到0004的車輛轉(zhuǎn)換率:通過卡扣0001,0002,0003,0004的次數(shù) /通過卡扣0001,0002,0003 0001,0002,0003,0004,0005 -----卡扣0001,0002,0003,0004到0005的車輛轉(zhuǎn)換率:通過卡扣0001,0002,0003,0004,0005的次數(shù) /通過卡扣0001,0002,0003,0004的次數(shù) 手動輸入卡扣號: 0001,0002,0003,0004,0005 求: 0001,0002 0001,0002,0003 0001,0002,0003,0004 0001,0002,0003,0004,0005
粵A11111: ("0001",100) ("0001,0002",30) ("0001,0002,0003",10) 粵B22222: ("0001",200) ("0001,0002",100) ("0001,0002,0003",70) ("0001,0002,0003,0004",10)
9.實時道路擁堵情況
計算一段時間內(nèi)卡扣下通過的車輛的平均速度。
這段時間不能太短,也不能太長。就計算當(dāng)前時間的前五分鐘 當(dāng)前卡扣下通過所有車輛的平均速度。
每隔5s 計算一次當(dāng)前卡扣過去5分鐘 所有車輛的平均速度。
SparkStreaming 窗口函數(shù)
window lenth:5min
slide interval:5s
10.動態(tài)改變廣播變量
`transform` `foreachRDD`
11.統(tǒng)計每個區(qū)域中車輛最多的前3道路
道路車輛:道路中的每個卡扣經(jīng)過的車輛累加
天河區(qū) 元崗路1 0001=30,0002=50,0003=100,0004=20 200
天河區(qū) 元崗路2 0005=50,0006=100 150
天河區(qū) 元崗路3 100
越秀區(qū) xxx1 200
越秀區(qū) xxx2 150
越秀區(qū) xxx3 100
SparkSQL
Hive 表 --t1 :
monitor_id car road_id area_id
-----
areaId area_name road_id monitor_id car ------ tmp_car_flow_basic
sql:
select area_name,road_id,count(car) as car_count,UDAF(monitor_id) as monitor_infos from t1 group by area_name,road_id ---- tmp_area_road_flow_count
開窗函數(shù):row_number() over (partition by xxx order by xxx ) rank
select area_name,road_id,car_count,monitor_infos, row_number() over (partition by area_name order by car_count desc ) rank from tmp_area_road_flow_count ---- tmp
select area_name,road_id,car_count,monitor_infos from tmp where rank <=3
-----------------------------------------------------------------------
總sql:
select
area_name,road_id,car_count,monitor_infos
from
(
select
area_name,road_id,car_count,monitor_infos, row_number() over (partition by area_id order by carCount desc ) rank
from
(
select
area_name,road_id,count(car) as car_count ,UDAF(monitor_id) as monitor_infos
from
t1
group by area_name,road_id
) t2
) t3
where rank <=3
=================================================================================================================== sql: select prefix_area_name_road_id,count(car) as car_count,UDAF(monitor_id) as monitor_infos from t1 group by prefix_area_name_road_id ---- tmp_area_road_flow_count
select area_name,road_id,car_count,monitor_infos, row_number() over (partition by area_name order by car_count desc ) rank from tmp_area_road_flow_count ---- tmp
select area_name,road_id,car_count,monitor_infos from tmp where rank <=3
-----------------------------------------------------------------------
總sql:
select
area_name,road_id,car_count,monitor_infos
from
(
select
area_name,road_id,car_count,monitor_infos, row_number() over (partition by area_id order by carCount desc ) rank
from
(
select
area_name,road_id,count(car) as car_count ,UDAF(monitor_id) as monitor_infos
from
t1
group by area_name,road_id
) t2
) t3
where rank <=3
車輛軌跡
統(tǒng)計卡扣0001下所有車輛的軌跡 -- take(20)
各區(qū)域車流量最高topN的道路統(tǒng)計 1.會將小于spark.sql.autoBroadcastJoinThreshold值(默認(rèn)為10M)的表廣播到executor節(jié)點,不走shuffle過程,更加高效。 sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "20971520"); //單位:字節(jié) 2.在Hive中執(zhí)行sql文件: hive –f sql.sql 3.提交命令: --master spark://node1:7077,node2:7077 --jars ../lib/mysql-connector-java-5.1.6.jar,../lib/fastjson-1.2.11.jar --driver-class-path ../lib/mysql-connector-java-5.1.6.jar:../lib/fastjson-1.2.11.jar ../lib/Test.jar 4
3.緝查布控,道路實時擁堵統(tǒng)計 動態(tài)改變廣播變量的值:可以通過transform和foreachRDD
屏蔽過多黃色警告,忽略java類方法的參數(shù) 與注釋; File -> Settings -> Editor -> Inspections -> java ->javadoc: 參數(shù)不一致的屏蔽: Declaration has problems in Javadoc refere 紅色 改成 waring黃色 參數(shù)沒有注釋: Dangling Javadoc comment 去掉勾選 Declaration has Javadoc problems 去掉勾選
