我用Java幾分鐘處理完30億個數據...
點擊上方藍色字體,選擇“設為星標”

目錄
場景說明
模擬數據
場景分析
讀取數據
處理數據
遇到的問題
場景說明
有一個約 10G 的數據文件,內容是以逗號分隔、取值在 18~70 之間的年齡,需要找出其中出現次數最多的年齡。數據格式如下:
        23,31,42,19,60,30,36,........
模擬數據
package bigdata;
import java.io.*;
import java.util.Random;
/**
 * Generates the sample data set used by the article: one very large text file
 * of comma-separated random ages between 18 and 70.
 *
 * @author bingbing (2022/5/4)
 */
public class GenerateData {

    /**
     * Number of records written by {@link #generateData()}: about 3.65 billion.
     * Every record is two digits plus a comma, so the file ends up around 10 GB.
     */
    private static final long TOTAL_RECORDS = (long) (Integer.MAX_VALUE * 1.7);

    /** A line break is written after this many records (roughly 3 MB per line). */
    private static final int RECORDS_PER_LINE = 1000000;

    private static Random random = new Random();

    /**
     * Returns a random integer in the closed range {@code [start, end]}.
     *
     * @param start smallest value that may be returned
     * @param end   largest value that may be returned; must not be less than {@code start}
     * @return a uniformly distributed value between {@code start} and {@code end}, inclusive
     */
    public static int generateRandomData(int start, int end) {
        return random.nextInt(end - start + 1) + start;
    }

    /**
     * Appends about 10 GB of random ages (18-70) to the data file on drive D.
     *
     * <p>The file is opened in append mode, so running the generator twice
     * doubles the amount of data.
     *
     * @throws IOException if the file cannot be opened or written
     */
    public void generateData() throws IOException {
        File file = new File("D:\\ User.dat");
        int start = 18;
        int end = 70;
        long startTime = System.currentTimeMillis();
        // FileOutputStream creates the file when it is missing, and
        // try-with-resources closes (and flushes) the writer even on failure.
        try (BufferedWriter writer =
                new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, true)))) {
            writeRecords(writer, TOTAL_RECORDS, start, end);
        }
        // Reported after close() so the final flush is part of the measured time.
        System.out.println("寫入完成! 共花費時間:" + (System.currentTimeMillis() - startTime) / 1000 + " s");
    }

    /**
     * Writes {@code count} random values, each followed by a comma, and starts a
     * new line after every {@link #RECORDS_PER_LINE} values.
     *
     * @param out   destination writer; not closed by this method
     * @param count number of values to write
     * @param start smallest value to generate
     * @param end   largest value to generate
     * @throws IOException if writing fails
     */
    static void writeRecords(Writer out, long count, int start, int end) throws IOException {
        for (long i = 1; i <= count; i++) {
            out.write(generateRandomData(start, end) + ",");
            if (i % RECORDS_PER_LINE == 0) {
                out.write("\n");
            }
        }
    }

    public static void main(String[] args) {
        GenerateData generateData = new GenerateData();
        try {
            generateData.generateData();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

準備好 10G 數據后,接著寫如何處理這些數據。
場景分析
10G 的數據比當前擁有的運行內存大的多,不能全量加載到內存中讀取,如果采用全量加載,那么內存會直接爆掉,只能按行讀取,Java 中 BufferedReader 的 readLine() 可以按行讀取文件里的內容。
讀取數據
????private?static?void?readData()?throws?IOException?{
????????BufferedReader?br?=?new?BufferedReader(new?InputStreamReader(new?FileInputStream(FILE_NAME),?"utf-8"));
????????String?line;
????????long?start?=?System.currentTimeMillis();
????????int?count?=?1;
????????while?((line?=?br.readLine())?!=?null)?{
????????????//?按行讀取
//????????????SplitData.splitLine(line);
????????????if?(count?%?100?==?0)?{
????????????????System.out.println("讀取100行,總耗時間:?"?+?(System.currentTimeMillis()?-?start)?/?1000?+?"?s");
????????????????System.gc();
????????????}
????????????count++;
????????}
????????running?=?false;
????????br.close();
????}

處理數據
| 思路一:通過單線程處理
// Register a zero counter for every age in [start, end] and make sure the
// per-age data file exists inside `dir`.
for (int i = start; i <= end; i++) {
    // Registered first so a file-system failure cannot leave an age without a counter.
    countMap.computeIfAbsent(i + "", integer -> new AtomicInteger(0));
    try {
        File subFile = new File(dir, i + ".dat");
        // Test the per-age file itself; the earlier check looked at the
        // directory, which always exists by now, so no file was ever created.
        if (!subFile.exists()) {
            subFile.createNewFile();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
?????public?static?void?splitLine(String?lineData)?{
????????????String[]?arr?=?lineData.split(",");
????????????for?(String?str?:?arr)?{
????????????????if?(StringUtils.isEmpty(str))?{
????????????????????continue;
????????????????}
????????????????countMap.computeIfAbsent(str,?s?->?new?AtomicInteger(0)).getAndIncrement();
????????????}
????????}
??private?static?void?findMostAge()?{
????????Integer?targetValue?=?0;
????????String?targetKey?=?null;
????????Iterator<Map.Entry<String,?AtomicInteger>>?entrySetIterator?=?countMap.entrySet().iterator();
????????while?(entrySetIterator.hasNext())?{
????????????Map.Entry<String,?AtomicInteger>?entry?=?entrySetIterator.next();
????????????Integer?value?=?entry.getValue().get();
????????????String?key?=?entry.getKey();
????????????if?(value?>?targetValue)?{
????????????????targetValue?=?value;
????????????????targetKey?=?key;
????????????}
????????}
????????System.out.println("數(shù)量最多的年齡為:"?+?targetKey?+?"數(shù)量為:"?+?targetValue);
????}
完整代碼:
package bigdata;
import org.apache.commons.lang3.StringUtils;
import java.io.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Single-threaded solution: reads the data file line by line, counts how often
 * each age occurs and prints the most frequent one.
 *
 * @author bingbing (2022/5/4)
 */
public class HandleMaxRepeatProblem_v0 {
    public static final int start = 18;
    public static final int end = 70;
    public static final String dir = "D:\\dataDir";
    public static final String FILE_NAME = "D:\\ User.dat";

    /** Occurrence counter per age, keyed by the age exactly as it appears in the file. */
    private static final Map<String, AtomicInteger> countMap = new ConcurrentHashMap<>();

    /**
     * Splits lines on "," and records the tokens in {@code countMap}.
     */
    static class SplitData {
        /**
         * Increments the counter of every age found in one comma-separated line.
         *
         * @param lineData one line of the data file, e.g. {@code "23,31,42,"}
         */
        public static void splitLine(String lineData) {
            for (String str : lineData.split(",")) {
                // split(",") never yields null, so an emptiness check is sufficient.
                if (str.isEmpty()) {
                    continue;
                }
                countMap.computeIfAbsent(str, s -> new AtomicInteger(0)).getAndIncrement();
            }
        }
    }

    /*
     * Creates the working directory, registers a zero counter for every age in
     * [start, end] and makes sure the per-age data file exists.
     */
    static {
        File file = new File(dir);
        if (!file.exists()) {
            file.mkdir();
        }
        for (int i = start; i <= end; i++) {
            // Registered first so a file-system failure cannot leave an age without a counter.
            countMap.computeIfAbsent(i + "", integer -> new AtomicInteger(0));
            try {
                File subFile = new File(file, i + ".dat");
                // Test the per-age file itself; the earlier check looked at the
                // directory, which always exists by now, so no file was ever created.
                if (!subFile.exists()) {
                    subFile.createNewFile();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        try {
            readData();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads the data file line by line, counts every age and finally prints the
     * most frequent one. Progress is reported every 100 lines, followed by a
     * one-second pause.
     *
     * @throws IOException if the file cannot be opened or read
     */
    private static void readData() throws IOException {
        long startTime = System.currentTimeMillis();
        // try-with-resources closes the reader even when reading fails.
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream(FILE_NAME), "utf-8"))) {
            String line;
            int lineNo = 1;
            while ((line = br.readLine()) != null) {
                SplitData.splitLine(line);
                if (lineNo % 100 == 0) {
                    System.out.println("讀取100行,總耗時間: " + (System.currentTimeMillis() - startTime) / 1000 + " s");
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to callers instead of swallowing it.
                        Thread.currentThread().interrupt();
                    }
                }
                lineNo++;
            }
        }
        findMostAge();
    }

    /**
     * Returns the age with the highest count.
     *
     * @return the most frequent age, or {@code null} when nothing has been counted yet
     */
    static String mostFrequentAge() {
        int best = 0;
        String bestKey = null;
        for (Map.Entry<String, AtomicInteger> entry : countMap.entrySet()) {
            int value = entry.getValue().get();
            // Strictly greater: of several equally frequent ages the first one met wins.
            if (value > best) {
                best = value;
                bestKey = entry.getKey();
            }
        }
        return bestKey;
    }

    /** Prints the most frequent age and how often it occurred. */
    private static void findMostAge() {
        String targetKey = mostFrequentAge();
        int targetValue = targetKey == null ? 0 : countMap.get(targetKey).get();
        System.out.println(" 數量最多的年齡為:" + targetKey + "數量為:" + targetValue);
    }

    /** Prints the result and terminates the JVM normally. */
    private static void clearTask() {
        findMostAge();
        // Exit status 0: finishing the count is the success path.
        System.exit(0);
    }
}
測試結果:總共花了 3 分鐘讀取完并統計完所有數據。


要想提高 CPU 的利用率,那么可以使用多線程去處理。下面我們使用多線程去解決這個 CPU 利用率低的問題。
| 思路二:分治法
使用多線程去消費讀取到的數據。采用生產者、消費者模式去消費數據,因為在讀取的時候是比較快的,單線程的數據處理能力比較差,因此思路一的性能阻塞在取數據方,又是同步的,所以導致整個鏈路的性能會變的很差。
所謂分治法就是分而治之,也就是說將海量數據分割處理。根據 CPU 的能力初始化 n 個線程,每一個線程去消費一個隊列,這樣線程在消費的時候不會出現搶占隊列的問題。

①初始化阻塞隊列
????private?static?List>?blockQueueLists?=?new?LinkedList<>();
????//每個隊列容量為256
????????for?(int?i?=?0;?i?????????????blockQueueLists.add(new?LinkedBlockingQueue<>(256));
????????}
②生產者
????private?static?AtomicLong?count?=?new?AtomicLong(0);
按照行數來計算隊列的下標:long index = count.get() % threadNums。
???static?class?SplitData?{
????????public?static?void?splitLine(String?lineData)?{
//????????????System.out.println(lineData.length());
????????????String[]?arr?=?lineData.split("\n");
????????????for?(String?str?:?arr)?{
????????????????if?(StringUtils.isEmpty(str))?{
????????????????????continue;
????????????????}
????????????????long?index?=?count.get()?%?threadNums;
????????????????try?{
????????????????????//?如果滿了就阻塞
????????????????????blockQueueLists.get((int)?index).put(str);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????????count.getAndIncrement();
????????????}
????????}
③消費者
隊列線程私有化:消費方在啟動線程的時候根據 index 去獲取到指定的隊列,這樣就實現了隊列的線程私有化。
????private?static?void?startConsumer()?throws?FileNotFoundException,?UnsupportedEncodingException?{
????????//如果共用一個隊列,那么線程不宜過多,容易出現(xiàn)搶占現(xiàn)象
????????System.out.println("開始消費...");
????????for?(int?i?=?0;?i?????????????final?int?index?=?i;
????????????//?每一個線程負責一個queue,這樣不會出現(xiàn)線程搶占隊列的情況。
????????????new?Thread(()?->?{
????????????????while?(consumerRunning)?{
????????????????????startConsumer?=?true;
????????????????????try?{
????????????????????????String?str?=?blockQueueLists.get(index).take();
????????????????????????countNum(str);
????????????????????}?catch?(InterruptedException?e)?{
????????????????????????e.printStackTrace();
????????????????????}
????????????????}
????????????}).start();
????????}
????}
多子線程分割字符串:由于從隊列中取到的字符串非常的龐大,如果又是用單線程調用 split(",") 去分割,那么性能同樣會阻塞在這個地方。
????//?按照arr的大小,運用多線程分割字符串
????private?static?void?countNum(String?str)?{
????????int[]?arr?=?new?int[2];
????????arr[1]?=?str.length()?/?3;
//????????System.out.println("分割的字符串為start位置為:"?+?arr[0]?+?",end位置為:"?+?arr[1]);
????????for?(int?i?=?0;?i?3;?i++)?{
????????????final?String?innerStr?=?SplitData.splitStr(str,?arr);
//????????????System.out.println("分割的字符串為start位置為:"?+?arr[0]?+?",end位置為:"?+?arr[1]);
????????????new?Thread(()?->?{
????????????????String[]?strArray?=?innerStr.split(",");
????????????????for?(String?s?:?strArray)?{
????????????????????countMap.computeIfAbsent(s,?s1?->?new?AtomicInteger(0)).getAndIncrement();
????????????????}
????????????}).start();
????????}
????}
分割字符串算法:分割時從 0 開始,按照等分的原則,將字符串 n 等份,每一個線程分到一份。
用一個 arr 數組的 arr[0] 記錄每次的分割開始位置,arr[1] 記錄每次分割的結束位置,如果遇到的開始的字符不為 ",",那么就 startIndex-1,如果結束的位置不為 ",",那么將 endIndex 向后移一位。
如果 endIndex 超過了字符串的最大長度,那么就把最后一個字符的下標賦值給 arr[1]。
????????/**
?????????*?按照 x坐標?來分割?字符串,如果切到的字符不為“,”,?那么把坐標向前或者向后移動一位。
?????????*
?????????*?@param?line
?????????*?@param?arr??存放x1,x2坐標
?????????*?@return
?????????*/
????????public?static?String?splitStr(String?line,?int[]?arr)?{
????????????int?startIndex?=?arr[0];
????????????int?endIndex?=?arr[1];
????????????char?start?=?line.charAt(startIndex);
????????????char?end?=?line.charAt(endIndex);
????????????if?((startIndex?==?0?||?start?==?',')?&&?end?==?',')?{
????????????????arr[0]?=?endIndex?+?1;
????????????????arr[1]?=?arr[0]?+?line.length()?/?3;
????????????????if?(arr[1]?>=?line.length())?{
????????????????????arr[1]?=?line.length()?-?1;
????????????????}
????????????????return?line.substring(startIndex,?endIndex);
????????????}
????????????if?(startIndex?!=?0?&&?start?!=?',')?{
????????????????startIndex?=?startIndex?-?1;
????????????}
????????????if?(end?!=?',')?{
????????????????endIndex?=?endIndex?+?1;
????????????}
????????????arr[0]?=?startIndex;
????????????arr[1]?=?endIndex;
????????????if?(arr[1]?>=?line.length())?{
????????????????arr[1]?=?line.length()?-?1;
????????????}
????????????return?splitStr(line,?arr);
????????}
完整代碼:
package bigdata;
import cn.hutool.core.collection.CollectionUtil;
import org.apache.commons.lang3.StringUtils;
import java.io.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Multi-threaded solution: one reader thread feeds the lines of the data file
 * into per-consumer queues, consumer threads cut every line into chunks and
 * count the ages in parallel, and a monitor thread prints the most frequent
 * age once all work is done.
 *
 * @author bingbing (2022/5/4)
 */
public class HandleMaxRepeatProblem {
    public static final int start = 18;
    public static final int end = 70;
    public static final String dir = "D:\\dataDir";
    public static final String FILE_NAME = "D:\\ User.dat";
    private static final int threadNums = 20;

    /** Number of chunks every line is cut into before counting. */
    private static final int CHUNKS_PER_LINE = 3;

    /** The monitor stops the program once it has seen more than this many idle checks in a row. */
    private static final int MAX_EMPTY_CHECKS = 12;

    /** Key: age; value: tokens of that age waiting to be written to the per-age file. */
    private static final Map<Integer, Vector<String>> valueMap = new ConcurrentHashMap<>();

    /** One bounded queue per consumer thread; the list index identifies the owning consumer. */
    private static final List<LinkedBlockingQueue<String>> blockQueueLists = new ArrayList<>();

    /** Occurrence counter per age, keyed by the age exactly as it appears in the file. */
    private static final Map<String, AtomicInteger> countMap = new ConcurrentHashMap<>();

    /** One lock per age, guarding the matching entry of {@code valueMap}. */
    private static final Map<Integer, ReentrantLock> lockMap = new ConcurrentHashMap<>();

    /** Number of lines queued so far; used to spread lines over the queues round-robin. */
    private static final AtomicLong count = new AtomicLong(0);

    /** Set once the first consumer thread is running. */
    private static volatile boolean startConsumer = false;

    /** Consumers and the monitor keep running while this is true. */
    private static volatile boolean consumerRunning = true;

    /** Set by the reader when it has queued the last line (or failed). */
    private static volatile boolean readFinished = false;

    /**
     * Line distribution and splitting helpers.
     */
    static class SplitData {
        /**
         * Hands every non-empty line to one of the consumer queues in
         * round-robin order. The call blocks while the selected queue is full.
         *
         * @param lineData one or more lines separated by {@code "\n"}
         */
        public static void splitLine(String lineData) {
            for (String str : lineData.split("\n")) {
                if (str.isEmpty()) {
                    continue;
                }
                int index = (int) (count.get() % threadNums);
                try {
                    // put() blocks while the queue is full, which throttles the reader.
                    blockQueueLists.get(index).put(str);
                } catch (InterruptedException e) {
                    // Stop producing and keep the interrupt visible to the caller.
                    Thread.currentThread().interrupt();
                    return;
                }
                count.getAndIncrement();
            }
        }

        /**
         * Returns the next comma-aligned chunk of {@code line} and advances the
         * cursor in {@code arr} to the chunk after it.
         *
         * <p>The start position is moved back to the beginning of the token it
         * falls into, and the end position is moved forward to the next
         * {@code ','} (or to the end of the line when no comma follows), so no
         * number is ever cut in half. The returned chunk neither starts nor
         * ends with the separating comma.
         *
         * @param line the full line of comma-separated values
         * @param arr  in/out cursor: {@code arr[0]} is the requested start index
         *             and {@code arr[1]} the requested end index; on return they
         *             describe the next chunk
         * @return the chunk, or an empty string once the line is used up
         */
        public static String splitStr(String line, int[] arr) {
            final int length = line.length();
            int startIndex = Math.max(arr[0], 0);
            // A start that sits on a separator belongs to the token after it.
            if (startIndex < length && line.charAt(startIndex) == ',') {
                startIndex++;
            }
            if (startIndex >= length) {
                // Nothing left: report an exhausted cursor instead of reading past the end.
                arr[0] = length;
                arr[1] = length - 1;
                return "";
            }
            // Snap back to the first character of the token that contains startIndex.
            while (startIndex > 0 && line.charAt(startIndex - 1) != ',') {
                startIndex--;
            }
            // Snap forward to the next separator; without one the chunk runs to the end of the line.
            int comma = line.indexOf(',', Math.max(arr[1], startIndex));
            int endIndex = comma >= 0 ? comma : length;
            arr[0] = Math.min(endIndex + 1, length);
            arr[1] = Math.min(arr[0] + length / CHUNKS_PER_LINE, length - 1);
            return line.substring(startIndex, endIndex);
        }

        /**
         * Alternative to {@link #splitLine(String)}: sorts every age token of a
         * line into the per-age buffer in {@code valueMap}.
         *
         * @param lineData one line of comma-separated ages
         * @throws NumberFormatException if a token is not an integer
         */
        public static void splitLine0(String lineData) {
            for (String str : lineData.split(",")) {
                if (str.isEmpty()) {
                    continue;
                }
                int keyIndex = Integer.parseInt(str);
                ReentrantLock lock = lockMap.computeIfAbsent(keyIndex, k -> new ReentrantLock());
                lock.lock();
                try {
                    // Create the buffer on first use instead of dereferencing a missing entry.
                    valueMap.computeIfAbsent(keyIndex, k -> new Vector<>()).add(str);
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /*
     * Creates the working directory and the consumer queues, registers a zero
     * counter for every age in [start, end] and makes sure the per-age data
     * file exists.
     */
    static {
        File file = new File(dir);
        if (!file.exists()) {
            file.mkdir();
        }
        // Each queue holds at most 256 lines.
        for (int i = 0; i < threadNums; i++) {
            blockQueueLists.add(new LinkedBlockingQueue<>(256));
        }
        for (int i = start; i <= end; i++) {
            // Registered first so a file-system failure cannot leave an age without a counter.
            countMap.computeIfAbsent(i + "", integer -> new AtomicInteger(0));
            try {
                File subFile = new File(file, i + ".dat");
                // Test the per-age file itself; the earlier check looked at the
                // directory, which always exists by now, so no file was ever created.
                if (!subFile.exists()) {
                    subFile.createNewFile();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        // Reader: feeds the queues.
        new Thread(() -> {
            try {
                readData();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
        // Consumers: one thread per queue.
        new Thread(() -> {
            try {
                startConsumer();
            } catch (FileNotFoundException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }).start();
        // Monitor: detects the end of the work and prints the result.
        new Thread(HandleMaxRepeatProblem::monitor).start();
    }

    /**
     * Checks every 10 seconds whether all queues are empty. Once the reader has
     * finished and the queues have been empty for more than
     * {@link #MAX_EMPTY_CHECKS} checks in a row, the result is printed and the
     * JVM exits.
     */
    private static void monitor() {
        int emptyChecks = 0;
        while (consumerRunning) {
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // Empty queues mean nothing while the reader may still deliver more lines.
            if (!startConsumer || !readFinished) {
                continue;
            }
            boolean allEmpty = true;
            for (LinkedBlockingQueue<String> queue : blockQueueLists) {
                if (!queue.isEmpty()) {
                    allEmpty = false;
                    break;
                }
            }
            if (!allEmpty) {
                // "In a row" has to start over as soon as work shows up again.
                emptyChecks = 0;
                continue;
            }
            emptyChecks++;
            if (emptyChecks > MAX_EMPTY_CHECKS) {
                consumerRunning = false;
                System.out.println("消費結束...");
                try {
                    clearTask();
                } catch (Exception e) {
                    System.out.println(e.getCause());
                } finally {
                    // Only reached when clearTask() failed before its own exit call.
                    System.exit(-1);
                }
            }
        }
    }

    /**
     * Reads the data file line by line and queues every line for the consumers.
     * After every 100 lines the reader pauses for a second and requests a
     * garbage collection so the heap is not flooded with large line strings.
     *
     * @throws IOException if the file cannot be opened or read
     */
    private static void readData() throws IOException {
        long startTime = System.currentTimeMillis();
        // try-with-resources closes the reader even when reading fails.
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream(FILE_NAME), "utf-8"))) {
            String line;
            int lineNo = 1;
            while ((line = br.readLine()) != null) {
                SplitData.splitLine(line);
                if (lineNo % 100 == 0) {
                    System.out.println("讀取100行,總耗時間: " + (System.currentTimeMillis() - startTime) / 1000 + " s");
                    try {
                        Thread.sleep(1000L);
                        System.gc();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                lineNo++;
            }
        } finally {
            // Also set on failure so the monitor can still shut the program down.
            readFinished = true;
        }
    }

    /**
     * Prints the age with the highest count and terminates the JVM. If every
     * counter is zero, {@code null} and 0 are printed.
     */
    private static void clearTask() {
        int targetValue = 0;
        String targetKey = null;
        for (Map.Entry<String, AtomicInteger> entry : countMap.entrySet()) {
            int value = entry.getValue().get();
            // Strictly greater: of several equally frequent ages the first one met wins.
            if (value > targetValue) {
                targetValue = value;
                targetKey = entry.getKey();
            }
        }
        System.out.println(" 數量最多的年齡為:" + targetKey + "數量為:" + targetValue);
        // Exit status 0: printing the result is the success path.
        System.exit(0);
    }

    /**
     * Starts one consumer thread per queue. Each thread takes lines only from
     * the queue at its own index, so consumers never compete for the same queue.
     *
     * @throws FileNotFoundException        declared for compatibility; not thrown by this body
     * @throws UnsupportedEncodingException declared for compatibility; not thrown by this body
     */
    private static void startConsumer() throws FileNotFoundException, UnsupportedEncodingException {
        // With a single shared queue many threads would contend for it; hence one queue per thread.
        System.out.println("開始消費...");
        for (int i = 0; i < threadNums; i++) {
            final int index = i;
            new Thread(() -> {
                while (consumerRunning) {
                    startConsumer = true;
                    try {
                        String str = blockQueueLists.get(index).take();
                        countNum(str);
                    } catch (InterruptedException e) {
                        // Stop this consumer and keep the interrupt status set.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }).start();
        }
    }

    /**
     * Cuts one line into {@link #CHUNKS_PER_LINE} comma-aligned chunks and
     * counts each chunk on its own thread.
     *
     * @param str one line of comma-separated ages
     */
    private static void countNum(String str) {
        if (str.isEmpty()) {
            return;
        }
        // arr[0] / arr[1] hold the start / end cursor that splitStr advances on every call.
        int[] arr = new int[2];
        arr[1] = str.length() / CHUNKS_PER_LINE;
        for (int i = 0; i < CHUNKS_PER_LINE; i++) {
            final String innerStr = SplitData.splitStr(str, arr);
            // Short lines can be used up before the last chunk.
            if (innerStr.isEmpty()) {
                continue;
            }
            new Thread(() -> {
                // Tally locally first so the shared counters are touched once per age, not once per token.
                Map<String, Integer> local = new HashMap<>();
                for (String s : innerStr.split(",")) {
                    if (s.isEmpty()) {
                        continue;
                    }
                    local.merge(s, 1, Integer::sum);
                }
                local.forEach((age, n) ->
                        countMap.computeIfAbsent(age, s1 -> new AtomicInteger(0)).addAndGet(n));
            }).start();
        }
    }

    /**
     * Alternative consumer: starts one background thread per age that drains the
     * matching buffer in {@code valueMap} into the per-age file. Without such a
     * consumer the buffers filled by {@code splitLine0} would exhaust the heap.
     *
     * @throws FileNotFoundException        if a per-age file cannot be opened for writing
     * @throws UnsupportedEncodingException if the "utf-8" charset is not available
     */
    private static void startConsumer0() throws FileNotFoundException, UnsupportedEncodingException {
        for (int i = start; i <= end; i++) {
            final int index = i;
            final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(
                    new FileOutputStream(new File(dir, i + ".dat"), false), "utf-8"));
            new Thread(() -> drainToFile(index, bw)).start();
        }
    }

    /**
     * Worker loop of {@link #startConsumer0()}: writes the buffered tokens of one
     * age to its file in batches of at least 1000, and stops after more than
     * 1000 polls that found the buffer empty. The writer is always closed.
     */
    private static void drainToFile(int index, BufferedWriter bw) {
        // countMap is keyed by the age as text, not by Integer.
        final AtomicInteger counter =
                countMap.computeIfAbsent(String.valueOf(index), k -> new AtomicInteger(0));
        int miss = 0;
        int countIndex = 0;
        try {
            while (!Thread.currentThread().isInterrupted() && miss <= 1000) {
                // Report progress each time the count passes another million.
                int counted = counter.get();
                if (counted > 1000000 * countIndex) {
                    System.out.println(index + "歲年齡的個數為:" + counted);
                    countIndex += 1;
                }
                Vector<String> lines = valueMap.computeIfAbsent(index, k -> new Vector<>());
                if (lines.isEmpty()) {
                    miss++;
                    Thread.sleep(1000);
                    continue;
                }
                // Wait until a batch of at least 1000 tokens has built up.
                if (lines.size() < 1000) {
                    Thread.sleep(1000);
                    continue;
                }
                ReentrantLock lock = lockMap.computeIfAbsent(index, k -> new ReentrantLock());
                lock.lock();
                try {
                    StringBuilder sb = new StringBuilder();
                    for (String token : lines) {
                        sb.append(token);
                        counter.addAndGet(1);
                    }
                    try {
                        bw.write(sb.toString());
                        bw.flush();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    // Start a fresh buffer for this age.
                    valueMap.put(index, new Vector<>());
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            try {
                bw.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
測試結果:



遇到的問題

解決方法:在讀取一定數量后,可以讓主線程暫停幾秒,手動調用 GC。
------------- END ------------- 掃描下方二維碼,加入技術群。暗號:加群
評論
圖片
表情
