我用Java幾分鐘處理完30億個數據...
來源:https://c1n.cn/GM8hb
目錄
場景說明
模擬數據
場景分析
讀取數據
處理數據
遇到的問題
場景說明
現有一個約 10G 的數據文件,裡面是以逗號分隔的 18~70 之間的整數(代表用戶的年齡),例如:
23,31,42,19,60,30,36,........
要求找出出現次數最多的那個年齡。
模擬數據
package?bigdata;
import?java.io.*;
import?java.util.Random;
/**
 * Generates the test data set: random ages in [18, 70], each written as "NN," with a line
 * break after every {@value #VALUES_PER_LINE} values (about 10 GB in total by default).
 *
 * @Author: bingbing
 * @Date: 2022/5/4 19:05
 */
public class GenerateData {
    /** Default output file. */
    public static final String FILE_NAME = "D:\\User.dat";

    /**
     * Number of values written per line. Keeps each line at a few MB so that a reader using
     * readLine() only ever holds one such line in memory.
     */
    private static final int VALUES_PER_LINE = 1_000_000;

    /** Default value count: Integer.MAX_VALUE * 1.7, about 3.65 billion "NN," entries (~10 GB). */
    private static final long DEFAULT_COUNT = (long) (Integer.MAX_VALUE * 1.7);

    private static final Random random = new Random();

    /**
     * Returns a uniformly distributed random integer in the closed range [start, end].
     *
     * @param start lower bound, inclusive
     * @param end   upper bound, inclusive; must not be smaller than {@code start}
     * @return a value v with start <= v <= end
     */
    public static int generateRandomData(int start, int end) {
        return random.nextInt(end - start + 1) + start;
    }

    /**
     * Writes the default data set (about 10 GB of ages in [18, 70]) to {@value #FILE_NAME},
     * replacing any existing content.
     *
     * @throws IOException if the file cannot be created or written
     */
    public void generateData() throws IOException {
        generateData(new File(FILE_NAME), DEFAULT_COUNT, 18, 70);
    }

    /**
     * Writes {@code count} random integers in [start, end] to {@code file}. Each value is
     * followed by a comma, and a newline is inserted after every {@value #VALUES_PER_LINE}
     * values. Existing content is overwritten.
     *
     * @param file  target file; created if missing
     * @param count number of values to write (0 writes an empty file)
     * @param start lower bound of the values, inclusive
     * @param end   upper bound of the values, inclusive
     * @throws IOException if the file cannot be created or written
     */
    public void generateData(File file, long count, int start, int end) throws IOException {
        long startTime = System.currentTimeMillis();
        // Overwrite (append = false): re-running must not stack a second data set onto the old one.
        // try-with-resources closes (and flushes) the writer even if a write fails.
        try (BufferedWriter bos = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(file, false), "utf-8"))) {
            for (long i = 1; i <= count; i++) {
                bos.write(generateRandomData(start, end) + ",");
                if (i % VALUES_PER_LINE == 0) {
                    bos.write("\n");
                }
            }
        }
        System.out.println("寫入完成! 共花費時間:" + (System.currentTimeMillis() - startTime) / 1000 + " s");
    }

    public static void main(String[] args) {
        GenerateData generateData = new GenerateData();
        try {
            generateData.generateData();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

準備好 10G 數據後,接著寫如何處理這些數據。
場景分析
10G 的數據比當前擁有的運行內存大得多,不能全量加載到內存中讀取——如果全量加載,內存會直接爆掉。因此只能按行讀取:使用 Java 中 BufferedReader 的 readLine() 逐行讀取文件內容。生成數據時每 100 萬條記錄換一行(每行只有幾 MB),所以單行可以放進內存。
讀取數據
????private?static?void?readData()?throws?IOException?{
????????BufferedReader?br?=?new?BufferedReader(new?InputStreamReader(new?FileInputStream(FILE_NAME),?"utf-8"));
????????String?line;
????????long?start?=?System.currentTimeMillis();
????????int?count?=?1;
????????while?((line?=?br.readLine())?!=?null)?{
????????????//?按行讀取
//????????????SplitData.splitLine(line);
????????????if?(count?%?100?==?0)?{
????????????????System.out.println("讀取100行,總耗時(shí)間:?"?+?(System.currentTimeMillis()?-?start)?/?1000?+?"?s");
????????????????System.gc();
????????????}
????????????count++;
????????}
????????running?=?false;
????????br.close();
????}

處理數據
| 思路一:通過單線程處理
// Create one output file per age (<dir>/<age>.dat) and register every age with a zero count.
for (int i = start; i <= end; i++) {
    File subFile = new File(dir, i + ".dat");
    try {
        // Check the sub-file itself; checking the (already existing) directory would
        // skip creating every file.
        if (!subFile.exists()) {
            subFile.createNewFile();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    countMap.computeIfAbsent(i + "", age -> new AtomicInteger(0));
}
?????public?static?void?splitLine(String?lineData)?{
????????????String[]?arr?=?lineData.split(",");
????????????for?(String?str?:?arr)?{
????????????????if?(StringUtils.isEmpty(str))?{
????????????????????continue;
????????????????}
????????????????countMap.computeIfAbsent(str,?s?->?new?AtomicInteger(0)).getAndIncrement();
????????????}
????????}
??private?static?void?findMostAge()?{
????????Integer?targetValue?=?0;
????????String?targetKey?=?null;
????????Iterator<Map.Entry<String,?AtomicInteger>>?entrySetIterator?=?countMap.entrySet().iterator();
????????while?(entrySetIterator.hasNext())?{
????????????Map.Entry<String,?AtomicInteger>?entry?=?entrySetIterator.next();
????????????Integer?value?=?entry.getValue().get();
????????????String?key?=?entry.getKey();
????????????if?(value?>?targetValue)?{
????????????????targetValue?=?value;
????????????????targetKey?=?key;
????????????}
????????}
????????System.out.println("數(shù)量最多的年齡為:"?+?targetKey?+?"數(shù)量為:"?+?targetValue);
????}
完整代碼:
package?bigdata;
import?org.apache.commons.lang3.StringUtils;
import?java.io.*;
import?java.util.*;
import?java.util.concurrent.ConcurrentHashMap;
import?java.util.concurrent.atomic.AtomicInteger;
/**
 * Single-threaded solution: reads the data file line by line, splits every line on ",",
 * counts each age in {@link #countMap} and finally prints the most frequent age.
 *
 * @Author: bingbing
 * @Date: 2022/5/4 19:19
 */
public class HandleMaxRepeatProblem_v0 {
    public static final int start = 18;
    public static final int end = 70;
    /** Directory holding one {@code <age>.dat} file per age (created but not used by this version). */
    public static final String dir = "D:\\dataDir";
    /** Input file produced by GenerateData. */
    public static final String FILE_NAME = "D:\\User.dat";

    /** Occurrence count per age; keys are the ages as decimal strings. */
    private static Map<String, AtomicInteger> countMap = new ConcurrentHashMap<>();

    /** Consumer start flag (not used by this single-threaded version). */
    private static volatile boolean startConsumer = false;

    /** Consumer keep-running flag (not used by this single-threaded version). */
    private static volatile boolean consumerRunning = true;

    /** Splits lines on "," and accumulates the counts in {@link #countMap}. */
    static class SplitData {
        /**
         * Increments the count of every non-empty comma-separated value in {@code lineData}.
         *
         * @param lineData one line of the data file, e.g. "23,31,42,"
         */
        public static void splitLine(String lineData) {
            for (String str : lineData.split(",")) {
                // A leading or doubled comma yields empty tokens; they are not ages.
                if (str.isEmpty()) {
                    continue;
                }
                countMap.computeIfAbsent(str, s -> new AtomicInteger(0)).getAndIncrement();
            }
        }
    }

    /** Creates the per-age files and registers every age in [start, end] with count 0. */
    static {
        File file = new File(dir);
        if (!file.exists()) {
            file.mkdir();
        }
        for (int i = start; i <= end; i++) {
            File subFile = new File(file, i + ".dat");
            try {
                // Check the sub-file, not the directory: the directory exists at this point,
                // so checking it would skip creating every file.
                if (!subFile.exists()) {
                    subFile.createNewFile();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            // Register the age even if its file could not be created.
            countMap.computeIfAbsent(i + "", age -> new AtomicInteger(0));
        }
    }

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                readData();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

    /**
     * Reads {@link #FILE_NAME} line by line, counts every line, then prints the most frequent age.
     *
     * @throws IOException if the file cannot be opened or read
     */
    private static void readData() throws IOException {
        long startTime = System.currentTimeMillis();
        // try-with-resources: the reader is closed even if reading or counting fails.
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream(FILE_NAME), "utf-8"))) {
            String line;
            int lineCount = 1;
            while ((line = br.readLine()) != null) {
                SplitData.splitLine(line);
                if (lineCount % 100 == 0) {
                    System.out.println("讀取100行,總耗時間: " + (System.currentTimeMillis() - startTime) / 1000 + " s");
                    // Deliberate 1 s throttle every 100 lines, as in the article.
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible; reading continues without further pauses.
                        Thread.currentThread().interrupt();
                    }
                }
                lineCount++;
            }
        }
        findMostAge();
    }

    /**
     * Prints the age with the highest count and returns it.
     *
     * @return the most frequent age, or null if no value was counted; on ties, the first entry
     *     in the map's iteration order wins
     */
    static String findMostAge() {
        String targetKey = null;
        int targetValue = 0;
        for (Map.Entry<String, AtomicInteger> entry : countMap.entrySet()) {
            int value = entry.getValue().get();
            if (value > targetValue) {
                targetValue = value;
                targetKey = entry.getKey();
            }
        }
        System.out.println(" 數量最多的年齡為:" + targetKey + "數量為:" + targetValue);
        return targetKey;
    }

    /** Prints the result and terminates the JVM with a success status. */
    private static void clearTask() {
        findMostAge();
        System.exit(0);
    }
}
測試結果:總共花了 3 分鐘讀取並統計完所有數據。


單線程版本中,讀取、分割和計數都在同一個線程裡完成,只能用到一個 CPU 核心,CPU 利用率很低。要想提高 CPU 的利用率,可以使用多線程去處理。下面我們使用多線程來解決這個 CPU 利用率低的問題。
| 思路二:分治法
使用多線程去消費讀取到的數據,採用生產者、消費者模式:讀取數據比較快,而單線程處理數據的能力比較差,因此思路一的性能瓶頸在處理數據的一方;又由於讀取和處理是同步進行的,整個鏈路的性能都會變得很差。
所謂分治法就是分而治之,也就是將海量數據分割處理。根據 CPU 的能力初始化 n 個線程(代碼中 threadNums = 20),每一個線程消費一個隊列,這樣線程在消費的時候不會出現搶佔隊列的問題。

①初始化阻塞隊列
????private?static?List>?blockQueueLists?=?new?LinkedList<>();
????//每個(gè)隊(duì)列容量為256
????????for?(int?i?=?0;?i?????????????blockQueueLists.add(new?LinkedBlockingQueue<>(256));
????????}
②生產者
????private?static?AtomicLong?count?=?new?AtomicLong(0);
按照已放入隊列的行數輪詢計算隊列的下標:index = count % threadNums(count 為已入隊的行數)。
???static?class?SplitData?{
????????public?static?void?splitLine(String?lineData)?{
//????????????System.out.println(lineData.length());
????????????String[]?arr?=?lineData.split("\n");
????????????for?(String?str?:?arr)?{
????????????????if?(StringUtils.isEmpty(str))?{
????????????????????continue;
????????????????}
????????????????long?index?=?count.get()?%?threadNums;
????????????????try?{
????????????????????//?如果滿了就阻塞
????????????????????blockQueueLists.get((int)?index).put(str);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????????count.getAndIncrement();
????????????}
????????}
③消費者
隊列線程私有化:消費方在啟動線程的時候根據 index 獲取指定的隊列,每個隊列只被一個線程消費,這樣就實現了隊列的線程私有化。
????private?static?void?startConsumer()?throws?FileNotFoundException,?UnsupportedEncodingException?{
????????//如果共用一個(gè)隊(duì)列,那么線程不宜過(guò)多,容易出現(xiàn)搶占現(xiàn)象
????????System.out.println("開(kāi)始消費(fèi)...");
????????for?(int?i?=?0;?i?????????????final?int?index?=?i;
????????????//?每一個(gè)線程負(fù)責(zé)一個(gè)queue,這樣不會(huì)出現(xiàn)線程搶占隊(duì)列的情況。
????????????new?Thread(()?->?{
????????????????while?(consumerRunning)?{
????????????????????startConsumer?=?true;
????????????????????try?{
????????????????????????String?str?=?blockQueueLists.get(index).take();
????????????????????????countNum(str);
????????????????????}?catch?(InterruptedException?e)?{
????????????????????????e.printStackTrace();
????????????????????}
????????????????}
????????????}).start();
????????}
????}
多子線程分割字符串:由於從隊列中取到的字符串非常龐大(一行約 100 萬個數),如果仍用單線程調用 split(",") 去分割,性能同樣會阻塞在這個地方。
????//?按照arr的大小,運(yùn)用多線程分割字符串
????private?static?void?countNum(String?str)?{
????????int[]?arr?=?new?int[2];
????????arr[1]?=?str.length()?/?3;
//????????System.out.println("分割的字符串為start位置為:"?+?arr[0]?+?",end位置為:"?+?arr[1]);
????????for?(int?i?=?0;?i?3;?i++)?{
????????????final?String?innerStr?=?SplitData.splitStr(str,?arr);
//????????????System.out.println("分割的字符串為start位置為:"?+?arr[0]?+?",end位置為:"?+?arr[1]);
????????????new?Thread(()?->?{
????????????????String[]?strArray?=?innerStr.split(",");
????????????????for?(String?s?:?strArray)?{
????????????????????countMap.computeIfAbsent(s,?s1?->?new?AtomicInteger(0)).getAndIncrement();
????????????????}
????????????}).start();
????????}
????}
分割字符串算法:分割時從 0 開始,按照等分的原則將字符串分成 n 份(代碼中 n = 3),每一個線程分到一份。
用一個 arr 數組記錄分割位置:arr[0] 記錄每次分割的開始位置,arr[1] 記錄每次分割的結束位置。如果開始位置的字符不為 ",",就把 startIndex 減 1;如果結束位置的字符不為 ",",就把 endIndex 向後移一位,直到兩端都落在 "," 上(或開始位置為 0),這樣就不會把一個數字切成兩半。
如果 endIndex 超過了字符串的最大長度,就把最後一個字符的下標(line.length() - 1)賦值給 arr[1]。
????????/**
?????????*?按照 x坐標(biāo)?來(lái)分割?字符串,如果切到的字符不為“,”,?那么把坐標(biāo)向前或者向后移動(dòng)一位。
?????????*
?????????*?@param?line
?????????*?@param?arr??存放x1,x2坐標(biāo)
?????????*?@return
?????????*/
????????public?static?String?splitStr(String?line,?int[]?arr)?{
????????????int?startIndex?=?arr[0];
????????????int?endIndex?=?arr[1];
????????????char?start?=?line.charAt(startIndex);
????????????char?end?=?line.charAt(endIndex);
????????????if?((startIndex?==?0?||?start?==?',')?&&?end?==?',')?{
????????????????arr[0]?=?endIndex?+?1;
????????????????arr[1]?=?arr[0]?+?line.length()?/?3;
????????????????if?(arr[1]?>=?line.length())?{
????????????????????arr[1]?=?line.length()?-?1;
????????????????}
????????????????return?line.substring(startIndex,?endIndex);
????????????}
????????????if?(startIndex?!=?0?&&?start?!=?',')?{
????????????????startIndex?=?startIndex?-?1;
????????????}
????????????if?(end?!=?',')?{
????????????????endIndex?=?endIndex?+?1;
????????????}
????????????arr[0]?=?startIndex;
????????????arr[1]?=?endIndex;
????????????if?(arr[1]?>=?line.length())?{
????????????????arr[1]?=?line.length()?-?1;
????????????}
????????????return?splitStr(line,?arr);
????????}
完整代碼:
package?bigdata;
import?cn.hutool.core.collection.CollectionUtil;
import?org.apache.commons.lang3.StringUtils;
import?java.io.*;
import?java.util.*;
import?java.util.concurrent.ConcurrentHashMap;
import?java.util.concurrent.LinkedBlockingQueue;
import?java.util.concurrent.atomic.AtomicInteger;
import?java.util.concurrent.atomic.AtomicLong;
import?java.util.concurrent.locks.ReentrantLock;
/**
 * Multi-threaded solution built on the producer/consumer pattern.
 *
 * <ul>
 *   <li>A reader thread reads {@link #FILE_NAME} line by line and hands the lines out
 *       round-robin to {@link #threadNums} bounded queues.</li>
 *   <li>One consumer thread per queue takes lines and counts their values in
 *       {@link #countMap}, using 3 helper threads per line.</li>
 *   <li>A monitor thread checks the queues every 10 s. Once reading has finished and all
 *       queues have been empty for more than 12 consecutive checks, it prints the most
 *       frequent age and exits the JVM.</li>
 * </ul>
 *
 * <p>{@link SplitData#splitLine0} and {@link #startConsumer0} form an alternative pipeline
 * (bucket the values per age and write them to one file per age); {@link #main} does not use it.
 *
 * @Author: bingbing
 * @Date: 2022/5/4 19:19
 */
public class HandleMaxRepeatProblem {
    public static final int start = 18;
    public static final int end = 70;
    /** Directory holding one {@code <age>.dat} file per age. */
    public static final String dir = "D:\\dataDir";
    /** Input file produced by GenerateData. */
    public static final String FILE_NAME = "D:\\User.dat";
    private static final int threadNums = 20;

    /**
     * Capacity of each queue, in lines. Every queued element is a whole input line (about
     * 1,000,000 values with GenerateData's output), so up to threadNums * QUEUE_CAPACITY lines
     * can be buffered in memory at once. Lower this on memory-constrained machines.
     */
    private static final int QUEUE_CAPACITY = 256;

    /** Age -> buffered values; used only by the splitLine0/startConsumer0 pipeline. */
    private static Map<Integer, Vector<String>> valueMap = new ConcurrentHashMap<>();

    /** One queue per consumer thread; accessed by index. */
    private static List<LinkedBlockingQueue<String>> blockQueueLists = new ArrayList<>();

    /** Occurrence count per age; keys are the ages as decimal strings. */
    private static Map<String, AtomicInteger> countMap = new ConcurrentHashMap<>();

    /** Per-age locks guarding valueMap in the splitLine0/startConsumer0 pipeline. */
    private static Map<Integer, ReentrantLock> lockMap = new ConcurrentHashMap<>();

    /** Number of lines enqueued so far; drives the round-robin choice of queue. */
    private static AtomicLong count = new AtomicLong(0);

    /** Set once a consumer thread has started. */
    private static volatile boolean startConsumer = false;

    /** Consumer threads keep taking lines while this is true. */
    private static volatile boolean consumerRunning = true;

    /** Set by the reader when it stops reading (end of file, I/O error or interrupt). */
    private static volatile boolean readFinished = false;

    /** Splitting helpers: routes lines to queues and cuts lines into comma-aligned chunks. */
    static class SplitData {
        /**
         * Producer step: puts every non-empty line of {@code lineData} into one of the queues,
         * chosen round-robin. Blocks while the chosen queue is full.
         *
         * @param lineData text returned by readLine(); normally a single line
         */
        public static void splitLine(String lineData) {
            // readLine() already strips line breaks, so this normally yields just one element.
            for (String str : lineData.split("\n")) {
                if (str.isEmpty()) {
                    continue;
                }
                int index = (int) (count.getAndIncrement() % threadNums);
                try {
                    // Blocks when the queue is full.
                    blockQueueLists.get(index).put(str);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop producing rather than silently continuing.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Cuts the next chunk out of {@code line}. The chunk borders are moved so that no number
         * is cut in half: the start moves back onto the preceding ',' and the end moves forward
         * to the next ',' (or the last character of the line).
         *
         * @param line the full line of comma-separated values
         * @param arr  in/out cursor: arr[0] is the requested start, arr[1] the requested end. On
         *             return it holds the bounds for the next call (next start, next start +
         *             line.length() / 3, clamped to the last index).
         * @return the chunk without its terminating comma. Chunks after the first begin with ','.
         *     Returns "" once the whole line has been handed out.
         */
        public static String splitStr(String line, int[] arr) {
            int length = line.length();
            int requestedStart = arr[0];
            if (requestedStart >= length) {
                // An earlier chunk already reached the end of the line (or the line is empty).
                return "";
            }
            int startIndex = requestedStart;
            while (startIndex > 0 && line.charAt(startIndex) != ',') {
                startIndex--;
            }
            // The end never lies before the requested start and never past the last character.
            int endIndex = Math.min(Math.max(arr[1], requestedStart), length - 1);
            while (endIndex < length - 1 && line.charAt(endIndex) != ',') {
                endIndex++;
            }
            // Exclude the terminating comma; a line without a trailing comma keeps its last character.
            int chunkEnd = line.charAt(endIndex) == ',' ? endIndex : length;
            arr[0] = chunkEnd + 1;
            arr[1] = Math.min(arr[0] + length / 3, length - 1);
            return line.substring(startIndex, chunkEnd);
        }

        /**
         * Alternative producer (unused by main): appends every value to valueMap under the lock
         * of its age.
         *
         * @param lineData one line of comma-separated ages
         * @throws NumberFormatException if a token is not an integer
         */
        public static void splitLine0(String lineData) {
            for (String str : lineData.split(",")) {
                if (str.isEmpty()) {
                    continue;
                }
                int keyIndex = Integer.parseInt(str);
                ReentrantLock lock = lockMap.computeIfAbsent(keyIndex, key -> new ReentrantLock());
                lock.lock();
                try {
                    // computeIfAbsent: a plain get() returned null for ages not seen before.
                    valueMap.computeIfAbsent(keyIndex, key -> new Vector<>()).add(str);
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /** Creates the queues and the per-age files, and registers every age with count 0. */
    static {
        File file = new File(dir);
        if (!file.exists()) {
            file.mkdir();
        }
        for (int i = 0; i < threadNums; i++) {
            blockQueueLists.add(new LinkedBlockingQueue<>(QUEUE_CAPACITY));
        }
        for (int i = start; i <= end; i++) {
            File subFile = new File(file, i + ".dat");
            try {
                // Check the sub-file, not the directory: the directory exists at this point,
                // so checking it would skip creating every file.
                if (!subFile.exists()) {
                    subFile.createNewFile();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            // Register the age even if its file could not be created.
            countMap.computeIfAbsent(i + "", age -> new AtomicInteger(0));
        }
    }

    /** Starts the reader, the consumers and the monitor, each on its own thread. */
    public static void main(String[] args) {
        new Thread(() -> {
            try {
                readData();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                startConsumer();
            } catch (FileNotFoundException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(HandleMaxRepeatProblem::monitor).start();
    }

    /**
     * Every 10 s checks whether all queues are empty. Once the reader has finished and the queues
     * have been empty for more than 12 consecutive checks, stops the consumers, prints the result
     * and exits the JVM.
     */
    private static void monitor() {
        int emptyNum = 0;
        while (consumerRunning) {
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (!startConsumer) {
                continue;
            }
            boolean allEmpty = true;
            for (LinkedBlockingQueue<String> queue : blockQueueLists) {
                if (!queue.isEmpty()) {
                    allEmpty = false;
                    break;
                }
            }
            // Only consecutive idle checks after reading has finished count; any activity
            // (or a reader that is still running) resets the streak.
            emptyNum = (readFinished && allEmpty) ? emptyNum + 1 : 0;
            if (emptyNum > 12) {
                consumerRunning = false;
                System.out.println("消費結束...");
                try {
                    // Exits with status 0 on success.
                    clearTask();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // Reached only if clearTask failed before exiting.
                    System.exit(-1);
                }
            }
        }
    }

    /**
     * Reads {@link #FILE_NAME} line by line and enqueues each line. Every 100 lines it prints the
     * elapsed time, pauses 1 s and hints a GC so the consumers can catch up.
     *
     * @throws IOException if the file cannot be opened or read
     */
    private static void readData() throws IOException {
        long startTime = System.currentTimeMillis();
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream(FILE_NAME), "utf-8"))) {
            String line;
            int lineCount = 1;
            while (!Thread.currentThread().isInterrupted() && (line = br.readLine()) != null) {
                SplitData.splitLine(line);
                if (lineCount % 100 == 0) {
                    System.out.println("讀取100行,總耗時間: " + (System.currentTimeMillis() - startTime) / 1000 + " s");
                    try {
                        Thread.sleep(1000L);
                        System.gc();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                lineCount++;
            }
        } finally {
            // Lets the monitor know that no more lines will arrive.
            readFinished = true;
        }
    }

    /**
     * Prints the age with the highest count and exits the JVM with status 0. On ties, the first
     * entry in the map's iteration order wins.
     */
    private static void clearTask() {
        String targetKey = null;
        int targetValue = 0;
        for (Map.Entry<String, AtomicInteger> entry : countMap.entrySet()) {
            int value = entry.getValue().get();
            if (value > targetValue) {
                targetValue = value;
                targetKey = entry.getKey();
            }
        }
        System.out.println(" 數量最多的年齡為:" + targetKey + "數量為:" + targetValue);
        System.exit(0);
    }

    /**
     * Starts one consumer thread per queue. Each thread repeatedly takes a line from its own queue
     * and counts it, until consumerRunning becomes false or the thread is interrupted.
     */
    private static void startConsumer() throws FileNotFoundException, UnsupportedEncodingException {
        // With one shared queue, many threads would contend for it; here each thread owns one queue.
        System.out.println("開始消費...");
        for (int i = 0; i < threadNums; i++) {
            final int index = i;
            new Thread(() -> {
                startConsumer = true;
                LinkedBlockingQueue<String> queue = blockQueueLists.get(index);
                while (consumerRunning) {
                    try {
                        countNum(queue.take());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }).start();
        }
    }

    /**
     * Counts the values of one line with 3 helper threads, each handling about a third of the line.
     * Waits for the helpers, so each consumer has at most 3 helper threads alive at a time instead
     * of spawning threads for every queued line.
     */
    private static void countNum(String str) {
        int[] arr = new int[2];
        arr[1] = str.length() / 3;
        List<Thread> workers = new ArrayList<>(3);
        for (int i = 0; i < 3; i++) {
            final String innerStr = SplitData.splitStr(str, arr);
            Thread worker = new Thread(() -> {
                for (String s : innerStr.split(",")) {
                    // Chunks after the first start with ',', so split() yields an empty first token.
                    if (!s.isEmpty()) {
                        countMap.computeIfAbsent(s, key -> new AtomicInteger(0)).getAndIncrement();
                    }
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Alternative consumer (unused by main): one background thread per age drains that age's
     * values from valueMap in batches of at least 1000 and appends them, comma-separated, to
     * {@code <dir>/<age>.dat}. Without draining, valueMap would grow until memory runs out. A
     * thread stops after more than 1000 polls that found no data.
     */
    private static void startConsumer0() throws FileNotFoundException, UnsupportedEncodingException {
        for (int i = start; i <= end; i++) {
            final int index = i;
            // countMap is keyed by the age as a string; an Integer key would always miss.
            final String ageKey = String.valueOf(i);
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(
                    new FileOutputStream(new File(dir, i + ".dat"), false), "utf-8"));
            new Thread(() -> {
                int miss = 0;
                int countIndex = 0;
                try {
                    while (miss <= 1000) {
                        // Progress log every 1,000,000 values.
                        int current = countMap.get(ageKey).get();
                        if (current > 1000000 * countIndex) {
                            System.out.println(index + "歲年齡的個數為:" + current);
                            countIndex += 1;
                        }
                        Vector<String> lines = valueMap.computeIfAbsent(index, k -> new Vector<>());
                        if (lines.isEmpty()) {
                            miss++;
                            Thread.sleep(1000);
                            continue;
                        }
                        // Wait until a batch of 1000 values has accumulated.
                        if (lines.size() < 1000) {
                            Thread.sleep(1000);
                            continue;
                        }
                        ReentrantLock lock = lockMap.computeIfAbsent(index, k -> new ReentrantLock());
                        lock.lock();
                        try {
                            StringBuilder sb = new StringBuilder();
                            for (String value : lines) {
                                // Keep the comma separator; otherwise the written values run together.
                                sb.append(value).append(',');
                                countMap.get(ageKey).addAndGet(1);
                            }
                            try {
                                bw.write(sb.toString());
                                bw.flush();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            // Start a fresh batch; splitLine0 adds under the same lock.
                            valueMap.put(index, new Vector<>());
                        } finally {
                            lock.unlock();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    try {
                        bw.close();
                    } catch (IOException ignored) {
                        // Best effort: nothing useful can be done if closing fails at shutdown.
                    }
                }
            }).start();
        }
    }
}
測試結果:



遇到的問題
讀取的速度比處理的速度快得多:每個阻塞隊列最多可緩存 256 行,而每行有幾 MB,隊列中堆積的數據容易讓內存佔用迅速上漲,GC 壓力很大。
解決方法:在讀取一定數量(代碼中為每 100 行)後,讓讀取線程暫停 1 秒,並手動調用 System.gc()(它只是建議 JVM 進行垃圾回收)。
最近熱文閱讀:
1、一款自動生成單元測試的 IDEA 插件 2、微軟 10 大最受歡迎 GitHub 項目,最高 Star 數量 13 萬 3、Spring Boot 中實現跨域的 5 種方式,你一定要知道! 4、Mybatis批處理踩坑,糾正一些錯誤寫法 5、Java 8?Java之父都不用! 6、生產環境,清理大文件不生效?應該這樣做! 7、Nginx面試40問 8、如何設計 API 接口,實現統一格式返回? 9、竟然有一半的人不知道 for 與 foreach 的區別??? 10、5 款頂級 Docker GUI 工具!免費又好用 關注公眾號,你想要的Java都在這里
