<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          如何用 Java 幾分鐘處理完 30 億個數(shù)據(jù)?

          共 75088字,需瀏覽 151分鐘

           ·

          2024-04-18 10:10

          來源:c1n.cn/GM8hb

          ?? 歡迎加入小哈的星球 ,你將獲得: 專屬的項目實戰(zhàn) / Java 學(xué)習(xí)路線 / 一對一提問 / 學(xué)習(xí)打卡 /  贈書福利


          全棧前后端分離博客項目 2.0 版本完結(jié)啦, 演示鏈接http://116.62.199.48/ ,新項目正在醞釀中。全程手摸手,后端 + 前端全棧開發(fā),從 0 到 1 講解每個功能點開發(fā)步驟,1v1 答疑,直到項目上線。目前已更新了239小節(jié),累計38w+字,講解圖:1645張,還在持續(xù)爆肝中.. 后續(xù)還會上新更多項目,目標(biāo)是將Java領(lǐng)域典型的項目都整一波,如秒殺系統(tǒng), 在線商城, IM即時通訊,Spring Cloud Alibaba 等等,戳我加入學(xué)習(xí),已有1200+小伙伴加入(早鳥價超低)


          • 場景說明
          • 模擬數(shù)據(jù)
          • 場景分析
          • 讀取數(shù)據(jù)
          • 處理數(shù)據(jù)
            • 思路一:通過單線程處理
            • 思路二:分治法
          • 遇到的問題

          場景說明

          現(xiàn)有一個 10G 文件的數(shù)據(jù),里面包含了 18-70 之間的整數(shù),分別表示 18-70 歲的人群數(shù)量統(tǒng)計,假設(shè)年齡范圍分布均勻,分別表示系統(tǒng)中所有用戶的年齡數(shù),找出重復(fù)次數(shù)最多的那個數(shù),現(xiàn)有一臺內(nèi)存為 4G、2 核 CPU 的電腦,請寫一個算法實現(xiàn)。

                  23,31,42,19,60,30,36,........

          模擬數(shù)據(jù)

          Java 中一個整數(shù)占 4 個字節(jié),模擬 10G 為 30 億左右個數(shù)據(jù), 采用追加模式寫入 10G 數(shù)據(jù)到硬盤里。每 100 萬個記錄寫一行,大概 4M 一行,10G 大概 2500 行數(shù)據(jù)。

          package bigdata;

          import java.io.*;
          import java.util.Random;

          /**
           * @Desc:
           * @Author: bingbing
           * @Date: 2022/5/4 0004 19:05
           */
          public class GenerateData {
              private static Random random = new Random();


              public static int generateRandomData(int start, int end) {
                  return random.nextInt(end - start + 1) + start;
              }


              /**
               * 產(chǎn)生10G的 1-1000的數(shù)據(jù)在D盤
               */
              public void generateData() throws IOException {
                  File file = new File("D:\ User.dat");
                  if (!file.exists()) {
                      try {
                          file.createNewFile();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }
                  }

                  int start = 18;
                  int end = 70;
                  long startTime = System.currentTimeMillis();
                  BufferedWriter bos = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, true)));
                  for (long i = 1; i < Integer.MAX_VALUE * 1.7; i++) {
                      String data = generateRandomData(start, end) + ",";
                      bos.write(data);
                      // 每100萬條記錄成一行,100萬條數(shù)據(jù)大概4M
                      if (i % 1000000 == 0) {
                          bos.write("\n");
                      }
                  }
                  System.out.println("寫入完成! 共花費時間:" + (System.currentTimeMillis() - startTime) / 1000 + " s");
                  bos.close();
              }


              public static void main(String[] args) {
                  GenerateData generateData = new GenerateData();
                  try {
                      generateData.generateData();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }

              }
          }

          上述代碼調(diào)整參數(shù)執(zhí)行 2 次,湊 10 個 G 的數(shù)據(jù)在 D 盤的 User.dat 文件里。

          圖片

          準(zhǔn)備好 10G 數(shù)據(jù)后,接著寫如何處理這些數(shù)據(jù)。

          場景分析

          10G 的數(shù)據(jù)比當(dāng)前擁有的運行內(nèi)存大的多,不能全量加載到內(nèi)存中讀取,如果采用全量加載,那么內(nèi)存會直接爆掉,只能按行讀取,Java 中的 bufferedReader 的 readLine() 按行讀取文件里的內(nèi)容。

          讀取數(shù)據(jù)

          首先我們寫一個方法單線程讀完這 30E 數(shù)據(jù)需要多少時間,每讀 100 行打印一次:

              private static void readData() throws IOException {

                  BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), "utf-8"));
                  String line;
                  long start = System.currentTimeMillis();
                  int count = 1;
                  while ((line = br.readLine()) != null) {
                      // 按行讀取
          //            SplitData.splitLine(line);
                      if (count % 100 == 0) {
                          System.out.println("讀取100行,總耗時間: " + (System.currentTimeMillis() - start) / 1000 + " s");
                          System.gc();
                      }
                      count++;
                  }
                  running = false;
                  br.close();

              }

          按行讀完 10G 的數(shù)據(jù)大概 20 秒,基本每 100 行,1E 多數(shù)據(jù)花 1S,速度還挺快:

          圖片

          處理數(shù)據(jù)

          思路一:通過單線程處理

          通過單線程處理,初始化一個 countMap,key 為年齡,value 為出現(xiàn)的次數(shù),將每行讀取到的數(shù)據(jù)按照 "," 進行分割,然后獲取到的每一項進行保存到 countMap 里,如果存在,那么值 key 的 value+1。

              for (int i = start; i <= end; i++) {
                      try {
                          File subFile = new File(dir + "\" + i + ".dat");
                          if (!file.exists()) {
                              subFile.createNewFile();
                          }
                          countMap.computeIfAbsent(i + "
          ", integer -> new AtomicInteger(0));
                      } catch (FileNotFoundException e) {
                          e.printStackTrace();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }
                  }

          單線程讀取并統(tǒng)計 countMap:

               public static void splitLine(String lineData) {
                      String[] arr = lineData.split(",");
                      for (String str : arr) {
                          if (StringUtils.isEmpty(str)) {
                              continue;
                          }
                          countMap.computeIfAbsent(str, s -> new AtomicInteger(0)).getAndIncrement();
                      }
                  }

          通過比較找出年齡數(shù)最多的年齡并打印出來:

            private static void findMostAge() {
                  Integer targetValue = 0;
                  String targetKey = null;
                  Iterator<Map.Entry<String, AtomicInteger>> entrySetIterator = countMap.entrySet().iterator();
                  while (entrySetIterator.hasNext()) {
                      Map.Entry<String, AtomicInteger> entry = entrySetIterator.next();
                      Integer value = entry.getValue().get();
                      String key = entry.getKey();
                      if (value > targetValue) {
                          targetValue = value;
                          targetKey = key;
                      }
                  }
                  System.out.println("數(shù)量最多的年齡為:" + targetKey + "數(shù)量為:" + targetValue);
              }

          完整代碼:

          package bigdata;

          import org.apache.commons.lang3.StringUtils;

          import java.io.*;
          import java.util.*;
          import java.util.concurrent.ConcurrentHashMap;
          import java.util.concurrent.atomic.AtomicInteger;


          /**
           * @Desc:
           * @Author: bingbing
           * @Date: 2022/5/4 0004 19:19
           * 單線程處理
           */
          public class HandleMaxRepeatProblem_v0 {

              public static final int start = 18;
              public static final int end = 70;

              public static final String dir = "D:\dataDir";

              public static final String FILE_NAME = "D:\ User.dat";


              /**
               * 統(tǒng)計數(shù)量
               */
              private static Map<String, AtomicInteger> countMap = new ConcurrentHashMap<>();


              /**
               * 開啟消費的標(biāo)志
               */
              private static volatile boolean startConsumer = false;

              /**
               * 消費者運行保證
               */
              private static volatile boolean consumerRunning = true;


              /**
               * 按照 "," 分割數(shù)據(jù),并寫入到countMap里
               */
              static class SplitData {

                  public static void splitLine(String lineData) {
                      String[] arr = lineData.split(",");
                      for (String str : arr) {
                          if (StringUtils.isEmpty(str)) {
                              continue;
                          }
                          countMap.computeIfAbsent(str, s -> new AtomicInteger(0)).getAndIncrement();
                      }
                  }


              }

              /**
               *  init map
               */

              static {
                  File file = new File(dir);
                  if (!file.exists()) {
                      file.mkdir();
                  }


                  for (int i = start; i <= end; i++) {
                      try {
                          File subFile = new File(dir + "\" + i + ".dat");
                          if (!file.exists()) {
                              subFile.createNewFile();
                          }
                          countMap.computeIfAbsent(i + "
          ", integer -> new AtomicInteger(0));
                      } catch (FileNotFoundException e) {
                          e.printStackTrace();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }
                  }
              }

              public static void main(String[] args) {


                  new Thread(() -> {
                      try {
                          readData();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }

                  }).start();


              }


              private static void readData() throws IOException {

                  BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), "
          utf-8"));
                  String line;
                  long start = System.currentTimeMillis();
                  int count = 1;
                  while ((line = br.readLine()) != null) {
                      // 按行讀取,并向map里寫入數(shù)據(jù)
                      SplitData.splitLine(line);
                      if (count % 100 == 0) {
                          System.out.println("
          讀取100行,總耗時間: " + (System.currentTimeMillis() - start) / 1000 + " s");
                          try {
                              Thread.sleep(1000L);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                      count++;
                  }
                  findMostAge();

                  br.close();
              }

              private static void findMostAge() {
                  Integer targetValue = 0;
                  String targetKey = null;
                  Iterator<Map.Entry<String, AtomicInteger>> entrySetIterator = countMap.entrySet().iterator();
                  while (entrySetIterator.hasNext()) {
                      Map.Entry<String, AtomicInteger> entry = entrySetIterator.next();
                      Integer value = entry.getValue().get();
                      String key = entry.getKey();
                      if (value > targetValue) {
                          targetValue = value;
                          targetKey = key;
                      }
                  }
                  System.out.println("
          數(shù)量最多的年齡為:" + targetKey + "數(shù)量為:" + targetValue);
              }

              private static void clearTask() {
                  // 清理,同時找出出現(xiàn)字符最大的數(shù)
                  findMostAge();
                  System.exit(-1);
              }


          }

          測試結(jié)果: 總共花了 3 分鐘讀取完并統(tǒng)計完所有數(shù)據(jù)。

          圖片

          內(nèi)存消耗為 2G-2.5G,CPU 利用率太低,只向上浮動了 20%-25% 之間:

          圖片

          要想提高 CPU 的利用率,那么可以使用多線程去處理。下面我們使用多線程去解決這個 CPU 利用率低的問題。

          思路二:分治法

          使用多線程去消費讀取到的數(shù)據(jù)。采用生產(chǎn)者、消費者模式去消費數(shù)據(jù),因為在讀取的時候是比較快的,單線程的數(shù)據(jù)處理能力比較差,因此思路一的性能阻塞在取數(shù)據(jù)方,又是同步的,所以導(dǎo)致整個鏈路的性能會變的很差。

          所謂分治法就是分而治之,也就是說將海量數(shù)據(jù)分割處理。根據(jù) CPU 的能力初始化 n 個線程,每一個線程去消費一個隊列,這樣線程在消費的時候不會出現(xiàn)搶占隊列的問題。

          同時為了保證線程安全和生產(chǎn)者消費者模式的完整,采用阻塞隊列,Java 中提供了 LinkedBlockingQueue 就是一個阻塞隊列。

          圖片

          ①初始化阻塞隊列

          使用 linkedList 創(chuàng)建一個阻塞隊列列表:

              private static List<LinkedBlockingQueue<String>> blockQueueLists = new LinkedList<>();

          在 static 塊里初始化阻塞隊列的數(shù)量和單個阻塞隊列的容量為 256,上面講到了 30E 數(shù)據(jù)大概 2500 行,按行塞到隊列里,20 個隊列,那么每個隊列 125 個,因此可以容量可以設(shè)計為 256 即可:

              //每個隊列容量為256
                  for (int i = 0; i < threadNums; i++) {
                      blockQueueLists.add(new LinkedBlockingQueue<>(256));
                  }

          ②生產(chǎn)者

          為了實現(xiàn)負(fù)載的功能, 首先定義一個 count 計數(shù)器,用來記錄行數(shù):

              private static AtomicLong count = new AtomicLong(0);

          按照行數(shù)來計算隊列的下標(biāo):long index=count.get()%threadNums。

          下面算法就實現(xiàn)了對隊列列表中的隊列進行輪詢的投放:

             static class SplitData {

                  public static void splitLine(String lineData) {
          //            System.out.println(lineData.length());
                      String[] arr = lineData.split("\n");
                      for (String str : arr) {
                          if (StringUtils.isEmpty(str)) {
                              continue;
                          }
                          long index = count.get() % threadNums;
                          try {
                              // 如果滿了就阻塞
                              blockQueueLists.get((int) index).put(str);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          count.getAndIncrement();

                      }
                  }

          ③消費者

          隊列線程私有化: 消費方在啟動線程的時候根據(jù) index 去獲取到指定的隊列,這樣就實現(xiàn)了隊列的線程私有化。

              private static void startConsumer() throws FileNotFoundException, UnsupportedEncodingException {
                  //如果共用一個隊列,那么線程不宜過多,容易出現(xiàn)搶占現(xiàn)象
                  System.out.println("開始消費...");
                  for (int i = 0; i < threadNums; i++) {
                      final int index = i;
                      // 每一個線程負(fù)責(zé)一個queue,這樣不會出現(xiàn)線程搶占隊列的情況。
                      new Thread(() -> {
                          while (consumerRunning) {
                              startConsumer = true;
                              try {
                                  String str = blockQueueLists.get(index).take();
                                  countNum(str);
                              } catch (InterruptedException e) {
                                  e.printStackTrace();
                              }
                          }
                      }).start();
                  }


              }

          多子線程分割字符串: 由于從隊列中多到的字符串非常的龐大,如果又是用單線程調(diào)用 split(",") 去分割,那么性能同樣會阻塞在這個地方。

              // 按照arr的大小,運用多線程分割字符串
              private static void countNum(String str) {
                  int[] arr = new int[2];
                  arr[1] = str.length() / 3;
          //        System.out.println("分割的字符串為start位置為:" + arr[0] + ",end位置為:" + arr[1]);
                  for (int i = 0; i < 3; i++) {
                      final String innerStr = SplitData.splitStr(str, arr);
          //            System.out.println("分割的字符串為start位置為:" + arr[0] + ",end位置為:" + arr[1]);
                      new Thread(() -> {
                          String[] strArray = innerStr.split(",");
                          for (String s : strArray) {
                              countMap.computeIfAbsent(s, s1 -> new AtomicInteger(0)).getAndIncrement();
                          }
                      }).start();
                  }
              }

          分割字符串算法: 分割時從 0 開始,按照等分的原則,將字符串 n 等份,每一個線程分到一份。

          用一個 arr 數(shù)組的 arr[0] 記錄每次的分割開始位置,arr[1] 記錄每次分割的結(jié)束位置,如果遇到的開始的字符不為 ",",那么就 startIndex-1,如果結(jié)束的位置不為 ",",那么將 endIndex 向后移一位。

          如果 endIndex 超過了字符串的最大長度,那么就把最后一個字符賦值給 arr[1]。

                  /**
                   * 按照 x坐標(biāo) 來分割 字符串,如果切到的字符不為“,”, 那么把坐標(biāo)向前或者向后移動一位。
                   *
                   * @param line
                   * @param arr  存放x1,x2坐標(biāo)
                   * @return
                   */
                  public static String splitStr(String line, int[] arr) {

                      int startIndex = arr[0];
                      int endIndex = arr[1];
                      char start = line.charAt(startIndex);
                      char end = line.charAt(endIndex);
                      if ((startIndex == 0 || start == ',') && end == ',') {
                          arr[0] = endIndex + 1;
                          arr[1] = arr[0] + line.length() / 3;
                          if (arr[1] >= line.length()) {
                              arr[1] = line.length() - 1;
                          }
                          return line.substring(startIndex, endIndex);
                      }

                      if (startIndex != 0 && start != ',') {
                          startIndex = startIndex - 1;
                      }

                      if (end != ',') {
                          endIndex = endIndex + 1;
                      }

                      arr[0] = startIndex;
                      arr[1] = endIndex;
                      if (arr[1] >= line.length()) {
                          arr[1] = line.length() - 1;
                      }
                      return splitStr(line, arr);
                  }

          完整代碼:

          package bigdata;

          import cn.hutool.core.collection.CollectionUtil;
          import org.apache.commons.lang3.StringUtils;

          import java.io.*;
          import java.util.*;
          import java.util.concurrent.ConcurrentHashMap;
          import java.util.concurrent.LinkedBlockingQueue;
          import java.util.concurrent.atomic.AtomicInteger;
          import java.util.concurrent.atomic.AtomicLong;
          import java.util.concurrent.locks.ReentrantLock;

          /**
           * @Desc:
           * @Author: bingbing
           * @Date: 2022/5/4 0004 19:19
           * 多線程處理
           */
          public class HandleMaxRepeatProblem {

              public static final int start = 18;
              public static final int end = 70;

              public static final String dir = "D:\dataDir";

              public static final String FILE_NAME = "D:\ User.dat";

              private static final int threadNums = 20;


              /**
               * key 為年齡,  value為所有的行列表,使用隊列
               */
              private static Map<Integer, Vector<String>> valueMap = new ConcurrentHashMap<>();


              /**
               * 存放數(shù)據(jù)的隊列
               */
              private static List<LinkedBlockingQueue<String>> blockQueueLists = new LinkedList<>();


              /**
               * 統(tǒng)計數(shù)量
               */
              private static Map<String, AtomicInteger> countMap = new ConcurrentHashMap<>();


              private static Map<Integer, ReentrantLock> lockMap = new ConcurrentHashMap<>();

              // 隊列負(fù)載均衡
              private static AtomicLong count = new AtomicLong(0);

              /**
               * 開啟消費的標(biāo)志
               */
              private static volatile boolean startConsumer = false;

              /**
               * 消費者運行保證
               */
              private static volatile boolean consumerRunning = true;


              /**
               * 按照 "," 分割數(shù)據(jù),并寫入到文件里
               */
              static class SplitData {

                  public static void splitLine(String lineData) {
          //            System.out.println(lineData.length());
                      String[] arr = lineData.split("\n");
                      for (String str : arr) {
                          if (StringUtils.isEmpty(str)) {
                              continue;
                          }
                          long index = count.get() % threadNums;
                          try {
                              // 如果滿了就阻塞
                              blockQueueLists.get((int) index).put(str);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          count.getAndIncrement();

                      }
                  }

                  /**
                   * 按照 x坐標(biāo) 來分割 字符串,如果切到的字符不為“,”, 那么把坐標(biāo)向前或者向后移動一位。
                   *
                   * @param line
                   * @param arr  存放x1,x2坐標(biāo)
                   * @return
                   */
                  public static String splitStr(String line, int[] arr) {

                      int startIndex = arr[0];
                      int endIndex = arr[1];
                      char start = line.charAt(startIndex);
                      char end = line.charAt(endIndex);
                      if ((startIndex == 0 || start == ',') && end == ',') {
                          arr[0] = endIndex + 1;
                          arr[1] = arr[0] + line.length() / 3;
                          if (arr[1] >= line.length()) {
                              arr[1] = line.length() - 1;
                          }
                          return line.substring(startIndex, endIndex);
                      }

                      if (startIndex != 0 && start != ',') {
                          startIndex = startIndex - 1;
                      }

                      if (end != ',') {
                          endIndex = endIndex + 1;
                      }

                      arr[0] = startIndex;
                      arr[1] = endIndex;
                      if (arr[1] >= line.length()) {
                          arr[1] = line.length() - 1;
                      }
                      return splitStr(line, arr);
                  }


                  public static void splitLine0(String lineData) {
                      String[] arr = lineData.split(",");
                      for (String str : arr) {
                          if (StringUtils.isEmpty(str)) {
                              continue;
                          }
                          int keyIndex = Integer.parseInt(str);
                          ReentrantLock lock = lockMap.computeIfAbsent(keyIndex, lockMap -> new ReentrantLock());
                          lock.lock();
                          try {
                              valueMap.get(keyIndex).add(str);
                          } finally {
                              lock.unlock();
                          }

          //                boolean wait = true;
          //                for (; ; ) {
          //                    if (!lockMap.get(Integer.parseInt(str)).isLocked()) {
          //                        wait = false;
          //                        valueMap.computeIfAbsent(Integer.parseInt(str), integer -> new Vector<>()).add(str);
          //                    }
          //                    // 當(dāng)前阻塞,直到釋放鎖
          //                    if (!wait) {
          //                        break;
          //                    }
          //                }

                      }
                  }

              }

              /**
               *  init map
               */

              static {
                  File file = new File(dir);
                  if (!file.exists()) {
                      file.mkdir();
                  }

                  //每個隊列容量為256
                  for (int i = 0; i < threadNums; i++) {
                      blockQueueLists.add(new LinkedBlockingQueue<>(256));
                  }


                  for (int i = start; i <= end; i++) {
                      try {
                          File subFile = new File(dir + "\" + i + ".dat");
                          if (!file.exists()) {
                              subFile.createNewFile();
                          }
                          countMap.computeIfAbsent(i + "
          ", integer -> new AtomicInteger(0));
          //                lockMap.computeIfAbsent(i, lock -> new ReentrantLock());
                      } catch (FileNotFoundException e) {
                          e.printStackTrace();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }
                  }
              }

              public static void main(String[] args) {


                  new Thread(() -> {
                      try {
                          // 讀取數(shù)據(jù)
                          readData();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }


                  }).start();

                  new Thread(() -> {
                      try {
                          // 開始消費
                          startConsumer();
                      } catch (FileNotFoundException e) {
                          e.printStackTrace();
                      } catch (UnsupportedEncodingException e) {
                          e.printStackTrace();
                      }
                  }).start();

                  new Thread(() -> {
                      // 監(jiān)控
                      monitor();
                  }).start();


              }


              /**
               * 每隔60s去檢查棧是否為空
               */
              private static void monitor() {
                  AtomicInteger emptyNum = new AtomicInteger(0);
                  while (consumerRunning) {
                      try {
                          Thread.sleep(10 * 1000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      if (startConsumer) {
                          // 如果所有棧的大小都為0,那么終止進程
                          AtomicInteger emptyCount = new AtomicInteger(0);
                          for (int i = 0; i < threadNums; i++) {
                              if (blockQueueLists.get(i).size() == 0) {
                                  emptyCount.getAndIncrement();
                              }
                          }
                          if (emptyCount.get() == threadNums) {
                              emptyNum.getAndIncrement();
                              // 如果連續(xù)檢查指定次數(shù)都為空,那么就停止消費
                              if (emptyNum.get() > 12) {
                                  consumerRunning = false;
                                  System.out.println("
          消費結(jié)束...");
                                  try {
                                      clearTask();
                                  } catch (Exception e) {
                                      System.out.println(e.getCause());
                                  } finally {
                                      System.exit(-1);
                                  }
                              }
                          }
                      }

                  }
              }


              private static void readData() throws IOException {

                  BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), "
          utf-8"));
                  String line;
                  long start = System.currentTimeMillis();
                  int count = 1;
                  while ((line = br.readLine()) != null) {
                      // 按行讀取,并向隊列寫入數(shù)據(jù)
                      SplitData.splitLine(line);
                      if (count % 100 == 0) {
                          System.out.println("
          讀取100行,總耗時間: " + (System.currentTimeMillis() - start) / 1000 + " s");
                          try {
                              Thread.sleep(1000L);
                              System.gc();
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                      count++;
                  }

                  br.close();
              }

              private static void clearTask() {
                  // 清理,同時找出出現(xiàn)字符最大的數(shù)
                  Integer targetValue = 0;
                  String targetKey = null;
                  Iterator<Map.Entry<String, AtomicInteger>> entrySetIterator = countMap.entrySet().iterator();
                  while (entrySetIterator.hasNext()) {
                      Map.Entry<String, AtomicInteger> entry = entrySetIterator.next();
                      Integer value = entry.getValue().get();
                      String key = entry.getKey();
                      if (value > targetValue) {
                          targetValue = value;
                          targetKey = key;
                      }
                  }
                  System.out.println("
          數(shù)量最多的年齡為:" + targetKey + "數(shù)量為:" + targetValue);
                  System.exit(-1);
              }

              /**
               * 使用linkedBlockQueue
               *
               * @throws FileNotFoundException
               * @throws UnsupportedEncodingException
               */
              private static void startConsumer() throws FileNotFoundException, UnsupportedEncodingException {
                  //如果共用一個隊列,那么線程不宜過多,容易出現(xiàn)搶占現(xiàn)象
                  System.out.println("
          開始消費...");
                  for (int i = 0; i < threadNums; i++) {
                      final int index = i;
                      // 每一個線程負(fù)責(zé)一個queue,這樣不會出現(xiàn)線程搶占隊列的情況。
                      new Thread(() -> {
                          while (consumerRunning) {
                              startConsumer = true;
                              try {
                                  String str = blockQueueLists.get(index).take();
                                  countNum(str);
                              } catch (InterruptedException e) {
                                  e.printStackTrace();
                              }
                          }
                      }).start();
                  }


              }

              // 按照arr的大小,運用多線程分割字符串
              private static void countNum(String str) {
                  int[] arr = new int[2];
                  arr[1] = str.length() / 3;
          //        System.out.println("
          分割的字符串為start位置為:" + arr[0] + ",end位置為:" + arr[1]);
                  for (int i = 0; i < 3; i++) {
                      final String innerStr = SplitData.splitStr(str, arr);
          //            System.out.println("
          分割的字符串為start位置為:" + arr[0] + ",end位置為:" + arr[1]);
                      new Thread(() -> {
                          String[] strArray = innerStr.split("
          ,");
                          for (String s : strArray) {
                              countMap.computeIfAbsent(s, s1 -> new AtomicInteger(0)).getAndIncrement();
                          }
                      }).start();
                  }
              }


              /**
               * 后臺線程去消費map里數(shù)據(jù)寫入到各個文件里, 如果不消費,那么會將內(nèi)存程爆
               */
              private static void startConsumer0() throws FileNotFoundException, UnsupportedEncodingException {
                  for (int i = start; i <= end; i++) {
                      final int index = i;
                      BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(dir + "
          \" + i + ".dat"false), "utf-8"));
                      new Thread(() -> {
                          int miss = 0;
                          int countIndex = 0;
                          while (true) {
                              // 每隔100萬打印一次
                              int count = countMap.get(index).get();
                              if (count > 1000000 * countIndex) {
                                  System.out.println(index + "歲年齡的個數(shù)為:" + countMap.get(index).get());
                                  countIndex += 1;
                              }
                              if (miss > 1000) {
                                  // 終止線程
                                  try {
                                      Thread.currentThread().interrupt();
                                      bw.close();
                                  } catch (IOException e) {

                                  }
                              }
                              if (Thread.currentThread().isInterrupted()) {
                                  break;
                              }


                              Vector<String> lines = valueMap.computeIfAbsent(index, vector -> new Vector<>());
                              // 寫入到文件里
                              try {

                                  if (CollectionUtil.isEmpty(lines)) {
                                      miss++;
                                      Thread.sleep(1000);
                                  } else {
                                      // 100個一批
                                      if (lines.size() < 1000) {
                                          Thread.sleep(1000);
                                          continue;
                                      }
                                      // 1000個的時候開始處理
                                      ReentrantLock lock = lockMap.computeIfAbsent(index, lockIndex -> new ReentrantLock());
                                      lock.lock();
                                      try {
                                          Iterator<String> iterator = lines.iterator();
                                          StringBuilder sb = new StringBuilder();
                                          while (iterator.hasNext()) {
                                              sb.append(iterator.next());
                                              countMap.get(index).addAndGet(1);
                                          }
                                          try {
                                              bw.write(sb.toString());
                                              bw.flush();
                                          } catch (IOException e) {
                                              e.printStackTrace();
                                          }
                                          // 清除掉vector
                                          valueMap.put(index, new Vector<>());
                                      } finally {
                                          lock.unlock();
                                      }

                                  }
                              } catch (InterruptedException e) {

                              }
                          }
                      }).start();
                  }

              }
          }

          測試結(jié)果:

          內(nèi)存和 CPU 初始占用大小:

          圖片

          啟動后,運行時穩(wěn)定在 11.7,CPU 穩(wěn)定利用在 90% 以上。

          圖片

          總耗時由 180S 縮減到 103S,效率提升 75%,得到的結(jié)果也與單線程處理的一致!

          圖片

          遇到的問題

          如果在運行了的時候,發(fā)現(xiàn) GC 突然罷工了,開始不工作了,有可能是 JVM 的堆中存在的垃圾太多,沒回收導(dǎo)致內(nèi)存的突增。

          圖片

          解決方法:在讀取一定數(shù)量后,可以讓主線程暫停幾秒,手動調(diào)用 GC。

          提示:本 demo 的線程創(chuàng)建都是手動創(chuàng)建的,實際開發(fā)中使用的是線程池!

          ?? 歡迎加入小哈的星球 ,你將獲得: 專屬的項目實戰(zhàn) / Java 學(xué)習(xí)路線 / 一對一提問 / 學(xué)習(xí)打卡 /  贈書福利


          全棧前后端分離博客項目 2.0 版本完結(jié)啦, 演示鏈接http://116.62.199.48/ ,新項目正在醞釀中。全程手摸手,后端 + 前端全棧開發(fā),從 0 到 1 講解每個功能點開發(fā)步驟,1v1 答疑,直到項目上線。目前已更新了239小節(jié),累計38w+字,講解圖:1645張,還在持續(xù)爆肝中.. 后續(xù)還會上新更多項目,目標(biāo)是將Java領(lǐng)域典型的項目都整一波,如秒殺系統(tǒng), 在線商城, IM即時通訊,Spring Cloud Alibaba 等等,戳我加入學(xué)習(xí),已有1200+小伙伴加入(早鳥價超低)



              
                 

          1. 我的私密學(xué)習(xí)小圈子~

          2. 炫技Groovy!SpringBoot中的動態(tài)編程實戰(zhàn)

          3. 別再分庫分表了,試試TiDB!

          4. 百度又開源一款壓測工具,可模擬幾十億的并發(fā)場景,太強悍了!

          最近面試BAT,整理一份面試資料Java面試BATJ通關(guān)手冊,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。

          獲取方式:點“在看”,關(guān)注公眾號并回復(fù) Java 領(lǐng)取,更多內(nèi)容陸續(xù)奉上。

          PS:因公眾號平臺更改了推送規(guī)則,如果不想錯過內(nèi)容,記得讀完點一下在看,加個星標(biāo),這樣每次新文章推送才會第一時間出現(xiàn)在你的訂閱列表里。

          “在看”支持小哈呀,謝謝啦

          瀏覽 166
          10點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          10點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  人妻A√无码一区三级无套 | 日本AA黄色片网站 | 国产视频swag | 一本色道久久综合狠狠躁小说 | 免费h片在线观看网址 |