
          Hadoop Key Points and Pitfalls: The Circular Buffer in the Shuffle Phase


          2021-09-18 19:45


          This article grew out of a question a reader was asked in an interview: during the shuffle phase, Hadoop uses a data structure called the circular buffer.


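          A circular queue is an extremely useful data structure in practice. It is a FIFO structure whose head and tail are joined, backed by the linear space of an array, so the data layout is simple. It can tell quickly whether the queue is full or empty, and it stores and retrieves data quickly. Because it is simple and efficient, circular queues are even implemented in hardware.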

           

          Circular queues are widely used for sending and receiving network data and for exchanging data between programs, for example bulk data exchange between the kernel and applications, or receiving large volumes of data from hardware.
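
The properties described above (array-backed, FIFO, cheap full/empty checks) can be sketched in a few lines of Java. This is a generic illustration, not Hadoop code; the class name RingQueueSketch and its methods are hypothetical.

```java
// Minimal fixed-capacity circular FIFO queue of ints (illustrative only;
// RingQueueSketch is a hypothetical name, not a Hadoop class).
final class RingQueueSketch {
    private final int[] slots;
    private int head;   // index of the oldest element
    private int size;   // number of stored elements

    RingQueueSketch(int capacity) {
        slots = new int[capacity];
    }

    boolean isEmpty() {
        return size == 0;
    }

    boolean isFull() {
        return size == slots.length;
    }

    // Appends a value; returns false instead of overwriting when full.
    boolean offer(int value) {
        if (isFull()) {
            return false;
        }
        slots[(head + size) % slots.length] = value; // tail wraps around
        size++;
        return true;
    }

    // Removes and returns the oldest value.
    int poll() {
        if (isEmpty()) {
            throw new IllegalStateException("queue is empty");
        }
        int value = slots[head];
        head = (head + 1) % slots.length; // head wraps around
        size--;
        return value;
    }
}
```

Tracking head and size, rather than head and tail, keeps the full and empty states distinguishable without sacrificing a slot.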

          The circular buffer data structure

          In the map phase, data that has been processed by map is first placed in memory, and that region of memory is the circular buffer.

          The circular buffer is defined in MapTask.MapOutputBuffer; the relevant fields are:











          // k/v accounting
          // IntBuffer holding the metadata; every entry is an int (4 bytes)
          private IntBuffer kvmeta; // metadata overlay on backing store
          int kvstart; // marks origin of spill metadata
          int kvend; // marks end of spill metadata
          int kvindex; // marks end of fully serialized records
          // marker separating the metadata from the key/value content;
          // both live in the same circular buffer, so they must be kept apart
          int equator; // marks origin of meta/serialization
          int bufstart; // marks beginning of spill
          int bufend; // marks beginning of collectable
          int bufmark; // marks end of record
          int bufindex; // marks end of collected
          int bufvoid; // marks the point where we should stop
          // reading at the end of the buffer
          // byte array holding the serialized key/value data; indexed in bytes, unlike kvmeta (ints)
          byte[] kvbuffer; // main output buffer
          private final byte[] b0 = new byte[0];

          // the key and value start addresses in kvbuffer are stored at these offsets from kvindex
          private static final int VALSTART = 0; // val offset in acct
          private static final int KEYSTART = 1; // key offset in acct
          // the partition is stored in kvmeta at this offset from kvindex
          private static final int PARTITION = 2; // partition offset in acct
          private static final int VALLEN = 3; // length of value
          // number of ints one key/value pair's metadata occupies in kvmeta
          private static final int NMETA = 4; // num meta ints
          // number of bytes one key/value pair's metadata occupies in kvmeta
          private static final int METASIZE = NMETA * 4; // size in bytes

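          The circular buffer is really just an array. It holds the serialized key and value data together with their metadata. The metadata is stored as ints; each key/value pair has one metadata record made up of four ints: the first holds the start position of the value, the second the start position of the key, the third the partition, and the last the length of the value.

          Within the circular buffer, the serialized key/value data and the metadata are separated by equator. Key/value data is stored in the direction of increasing index and metadata in the direction of decreasing index. Picturing the array as a ring with equator as the boundary, key/value data is stored clockwise and metadata counterclockwise.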
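
To make the four-int metadata record concrete, here is a small sketch of an int view laid over a byte array, using the same constants as the listing above. The class MetaOverlaySketch and the method putMeta are hypothetical names for illustration; only the layout (VALSTART, KEYSTART, PARTITION, VALLEN) and the backwards step of kvindex come from the article.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;

// Sketch only: an IntBuffer view over a byte array, with one 4-int metadata
// record per key/value pair. Not the Hadoop class itself.
final class MetaOverlaySketch {
    static final int VALSTART = 0;   // value start offset in the record
    static final int KEYSTART = 1;   // key start offset in the record
    static final int PARTITION = 2;  // partition offset in the record
    static final int VALLEN = 3;     // value length offset in the record
    static final int NMETA = 4;      // ints per metadata record
    static final int METASIZE = NMETA * 4; // bytes per metadata record

    final byte[] kvbuffer;
    final IntBuffer kvmeta;

    MetaOverlaySketch(int bytes) {
        kvbuffer = new byte[bytes];
        // kvmeta shares storage with kvbuffer; it is a view, not a copy
        kvmeta = ByteBuffer.wrap(kvbuffer)
                .order(ByteOrder.nativeOrder())
                .asIntBuffer();
    }

    // Writes one record at int index kvindex and returns the next kvindex,
    // which moves toward lower indices and wraps at the start of the array.
    int putMeta(int kvindex, int partition, int keystart, int valstart, int vallen) {
        kvmeta.put(kvindex + PARTITION, partition);
        kvmeta.put(kvindex + KEYSTART, keystart);
        kvmeta.put(kvindex + VALSTART, valstart);
        kvmeta.put(kvindex + VALLEN, vallen);
        return (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
    }
}
```

With a 64-byte array, kvmeta has 16 int slots; a record written at int index 12 occupies bytes 48 to 63, and the next record goes to index 8.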

          Initialization

          The circular buffer is created in MapOutputBuffer.init.











          public void init(MapOutputCollector.Context context
          ) throws IOException, ClassNotFoundException {
          ...
          //MAP_SORT_SPILL_PERCENT = mapreduce.map.sort.spill.percent
          // fraction of the map-side buffer that may fill before a spill starts
          //sanity checks
          final float spillper =
          job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
          //IO_SORT_MB = "mapreduce.task.io.sort.mb"
          // size of the map-side buffer, in MB
          // mapreduce.task.io.sort.mb * mapreduce.map.sort.spill.percent should ideally be a multiple of 16
          final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
          // memory threshold for all spill index records held in memory
          indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
          INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
          ...
          // sorter implementation; it is pluggable. The default is a modified quicksort
          sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
          QuickSort.class, IndexedSorter.class), job);
          // buffers and accounting
          // IO_SORT_MB is in MB; shifting left by 20 bits converts it to bytes
          int maxMemUsage = sortmb << 20;
          // METASIZE is the size of one metadata record: four ints
          // (VALSTART, KEYSTART, PARTITION, VALLEN) at 4 bytes each, so METASIZE is 16.
          // The next line rounds the buffer size down to a multiple of METASIZE
          maxMemUsage -= maxMemUsage % METASIZE;
          // backing array, in bytes, shared by the serialized data and the metadata
          kvbuffer = new byte[maxMemUsage];
          bufvoid = kvbuffer.length;
          // view kvbuffer as the int-typed kvmeta; each element is an int, i.e. 4 bytes
          kvmeta = ByteBuffer.wrap(kvbuffer)
          .order(ByteOrder.nativeOrder())
          .asIntBuffer();
          // set the boundary between the serialized data and kvmeta
          setEquator(0);
          bufstart = bufend = bufindex = equator;
          kvstart = kvend = kvindex;
          // maximum number of metadata records kvmeta can hold
          maxRec = kvmeta.capacity() / NMETA;
          // spill threshold of the buffer (not simply sortmb*spillper);
          // more precisely it is kvbuffer.length*spillper
          softLimit = (int)(kvbuffer.length * spillper);
          // important: this variable is the running measure that decides when to spill
          bufferRemaining = softLimit;
          ...
          // k/v serialization
          comparator = job.getOutputKeyComparator();
          keyClass = (Class<K>)job.getMapOutputKeyClass();
          valClass = (Class<V>)job.getMapOutputValueClass();
          serializationFactory = new SerializationFactory(job);
          keySerializer = serializationFactory.getSerializer(keyClass);
          // use bb as the output the key serializer writes to
          keySerializer.open(bb);
          valSerializer = serializationFactory.getSerializer(valClass);
          // use bb as the output the value serializer writes to
          valSerializer.open(bb);
          ...
          // combiner
          ...
          spillInProgress = false;
          // in the final merge, a configured combiner runs only if the number of spills reaches this threshold
          minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
          spillThread.setDaemon(true);
          spillThread.setName("SpillThread");
          spillLock.lock();
          try {
          spillThread.start();
          while (!spillThreadRunning) {
          spillDone.await();
          }
          } catch (InterruptedException e) {
          throw new IOException("Spill thread failed to initialize", e);
          } finally {
          spillLock.unlock();
          }
          if (sortSpillException != null) {
          throw new IOException("Spill thread failed to initialize",
          sortSpillException);
          }
          }

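          init builds the circular buffer. mapreduce.task.io.sort.mb determines the size of the map-side circular buffer, sortmb, which defaults to 100 MB.

          The buffer also holds the metadata. One metadata record takes METASIZE (16 bytes), so the size is rounded down to a multiple of METASIZE: maxMemUsage = (sortmb << 20) - (sortmb << 20) % METASIZE. The usual advice is to choose sortmb so that its size in bytes is a multiple of 16; because sortmb << 20 is a whole number of mebibytes, this holds for any integer sortmb. maxMemUsage is then used to allocate the kvbuffer byte array and the kvmeta int view over it, and finally the position markers are set. setEquator(0) sets the boundary between the serialized data and kvmeta. At initialization the boundary is 0 and kvindex is ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4, which on the ring corresponds to stepping METASIZE bytes counterclockwise from the equator. kvstart = kvend = kvindex is set from kvindex, bufstart = bufend = bufindex = equator is set from equator, and bufvoid = kvbuffer.length; bufvoid marks the highest position at which data may be stored.

          For efficiency, a spill is triggered once buffer usage reaches a threshold. The check is done through bufferRemaining, which is initialized by softLimit = (int)(kvbuffer.length * spillper); bufferRemaining = softLimit;. Note that softLimit is kvbuffer.length * spillper rather than sortmb*spillper; the two agree only when sortmb << 20 is a multiple of 16.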
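
The sizing arithmetic can be checked with a short sketch. It assumes the defaults quoted above (sortmb = 100, spillper = 0.8) and mirrors the expressions from init; the class BufferSizingSketch and its method names are hypothetical.

```java
// Sketch of the sizing arithmetic in init; not the Hadoop class itself.
final class BufferSizingSketch {
    static final int NMETA = 4;            // ints per metadata record
    static final int METASIZE = NMETA * 4; // bytes per metadata record

    // Buffer size in bytes, rounded down to a multiple of METASIZE.
    static int bufferBytes(int sortmb) {
        int maxMemUsage = sortmb << 20;    // MB to bytes
        maxMemUsage -= maxMemUsage % METASIZE;
        return maxMemUsage;
    }

    // Spill threshold in bytes; int * float is evaluated in float arithmetic.
    static int softLimit(int bufferBytes, float spillper) {
        return (int) (bufferBytes * spillper);
    }

    // Maximum number of metadata records if the whole buffer held metadata.
    static int maxRec(int bufferBytes) {
        int intCapacity = bufferBytes / 4; // capacity of the int view
        return intCapacity / NMETA;
    }
}
```

For sortmb = 100 this gives a 104857600-byte buffer, a soft limit of 83886080 bytes, and room for at most 6553600 metadata records.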

          Below is the code of setEquator:

          // code behind the setEquator(0) call
          private void setEquator(int pos) {
          equator = pos;
          // set index prior to first entry, aligned at meta boundary
          // end position of the first entry, i.e. the boundary between metadata and key/value data, in bytes
          final int aligned = pos - (pos % METASIZE);
          // Cast one of the operands to long to avoid integer overflow
          // index in kvmeta where the first metadata record will be stored
          kvindex = (int)
          (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
          LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
          "(" + (kvindex * 4) + ")");
          }
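
The kvindex computation in setEquator can be isolated as a pure function to see what it produces. This is a sketch that copies the expression from the listing above; the class EquatorSketch and the method kvindexFor are hypothetical names.

```java
// Sketch: the kvindex arithmetic of setEquator as a side-effect-free function.
final class EquatorSketch {
    static final int METASIZE = 16; // bytes per metadata record

    // Returns the int index into kvmeta of the first metadata slot for an
    // equator at byte position pos in a buffer of bufferLength bytes.
    static int kvindexFor(int pos, int bufferLength) {
        // round the equator down to a metadata boundary
        final int aligned = pos - (pos % METASIZE);
        // step back one record, wrap around the ring, convert bytes to ints;
        // the long cast avoids int overflow for large buffers
        return (int) (((long) aligned - METASIZE + bufferLength) % bufferLength) / 4;
    }
}
```

For the initial call with pos = 0 and a 104857600-byte buffer, kvindex is (104857600 - 16) / 4 = 26214396, the last 16-byte slot of the array.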


          The abstract data structure after buffer initialization is shown in the figure below:

          [Figure: circular buffer data structure]

          Writing to the buffer

          The map writes into the buffer through NewOutputCollector.write, which calls collector.collect. Each record has already been assigned its partition in NewOutputCollector.write before it is written. Here is collect:

          // MapOutputBuffer.collect
          public synchronized void collect(K key, V value, final int partition
          ) throws IOException {
          ...
          // when a new record is collected, first deduct the metadata size from the remaining space, then check
          bufferRemaining -= METASIZE;
          if (bufferRemaining <= 0) {
          // start spill if the thread is not running and the soft limit has been
          // reached
          spillLock.lock();
          try {
          do {
          // on the first spill, spillInProgress is false
          if (!spillInProgress) {
          // byte position of kvindex
          final int kvbidx = 4 * kvindex;
          // byte position of kvend
          final int kvbend = 4 * kvend;
          // serialized, unspilled bytes always lie between kvindex and
          // bufindex, crossing the equator. Note that any void space
          // created by a reset must be included in "used" bytes
          final int bUsed = distanceTo(kvbidx, bufindex);
          final boolean bufsoftlimit = bUsed >= softLimit;
          if ((kvbend + METASIZE) % kvbuffer.length !=
          equator - (equator % METASIZE)) {
          // spill finished, reclaim space
          resetSpill();
          bufferRemaining = Math.min(
          distanceTo(bufindex, kvbidx) - 2 * METASIZE,
          softLimit - bUsed) - METASIZE;
          continue;
          } else if (bufsoftlimit && kvindex != kvend) {
          // spill records, if any collected; check latter, as it may
          // be possible for metadata alignment to hit spill pcnt
          startSpill();
          final int avgRec = (int)
          (mapOutputByteCounter.getCounter() /
          mapOutputRecordCounter.getCounter());
          // leave at least half the split buffer for serialization data
          // ensure that kvindex >= bufindex
          final int distkvi = distanceTo(bufindex, kvbidx);
          final int newPos = (bufindex +
          Math.max(2 * METASIZE - 1,
          Math.min(distkvi / 2,
          distkvi / (METASIZE + avgRec) * METASIZE)))
          % kvbuffer.length;
          setEquator(newPos);
          bufmark = bufindex = newPos;
          final int serBound = 4 * kvend;
          // bytes remaining before the lock must be held and limits
          // checked is the minimum of three arcs: the metadata space, the
          // serialization space, and the soft limit
          bufferRemaining = Math.min(
          // metadata max
          distanceTo(bufend, newPos),
          Math.min(
          // serialization max
          distanceTo(newPos, serBound),
          // soft limit
          softLimit)) - 2 * METASIZE;
          }
          }
          } while (false);
          } finally {
          spillLock.unlock();
          }
          }
          // write the key, the value and the metadata into the buffer
          try {
          // serialize key bytes into buffer
          int keystart = bufindex;
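          // serialize the key into kvbuffer, which advances bufindex
          keySerializer.serialize(key);
          // if the key was split across bufvoid, move it so that it occupies
          // contiguous space, which key comparison during the sort requires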
          if (bufindex < keystart) {
          // wrapped the key; must make contiguous
          bb.shiftBufferedKey();
          keystart = 0;
          }
          // serialize value bytes into buffer
          final int valstart = bufindex;
          valSerializer.serialize(value);
          // It's possible for records to have zero length, i.e. the serializer
          // will perform no writes. To ensure that the boundary conditions are
          // checked and that the kvindex invariant is maintained, perform a
          // zero-length write into the buffer. The logic monitoring this could be
          // moved into collect, but this is cleaner and inexpensive. For now, it
          // is acceptable.
          bb.write(b0, 0, 0);

          // the record must be marked after the preceding write, as the metadata
          // for this record are not yet written
          int valend = bb.markRecord();

          mapOutputRecordCounter.increment(1);
          mapOutputByteCounter.increment(
          distanceTo(keystart, valend, bufvoid));

          // write accounting info
          kvmeta.put(kvindex + PARTITION, partition);
          kvmeta.put(kvindex + KEYSTART, keystart);
          kvmeta.put(kvindex + VALSTART, valstart);
          kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
          // advance kvindex
          kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
          } catch (MapBufferTooSmallException e) {
          LOG.info("Record too large for in-memory buffer: " + e.getMessage());
          spillSingleRecord(key, value, partition);
          mapOutputRecordCounter.increment(1);
          return;
          }
          }

          On every write, bufferRemaining -= METASIZE is executed and bufferRemaining is then checked.

          If it is greater than 0, the serialized key/value pair and its metadata are written straight into the buffer. Keys and values are serialized before being written, through a series of calls: Serializer.serialize(key/value) -> WritableSerializer.serialize(key/value) -> BytesWritable.write(dataOut) -> DataOutputStream.write(bytes, 0, size) -> MapOutputBuffer.Buffer.write(b, off, len). Finally MapOutputBuffer.Buffer.write(b, off, len) writes the data into kvbuffer. The write method is as follows:

          public void write(byte b[], int off, int len)
          throws IOException {
          // must always verify the invariant that at least METASIZE bytes are
          // available beyond kvindex, even when len == 0
          bufferRemaining -= len;
          if (bufferRemaining <= 0) {
          // writing these bytes could exhaust available buffer space or fill
          // the buffer to soft limit. check if spill or blocking are necessary
          boolean blockwrite = false;
          spillLock.lock();
          try {
          do {
          checkSpillException();

          final int kvbidx = 4 * kvindex;
          final int kvbend = 4 * kvend;
          // ser distance to key index
          final int distkvi = distanceTo(bufindex, kvbidx);
          // ser distance to spill end index
          final int distkve = distanceTo(bufindex, kvbend);

          // if kvindex is closer than kvend, then a spill is neither in
          // progress nor complete and reset since the lock was held. The
          // write should block only if there is insufficient space to
          // complete the current write, write the metadata for this record,
          // and write the metadata for the next record. If kvend is closer,
          // then the write should block if there is too little space for
          // either the metadata or the current write. Note that collect
          // ensures its metadata requirement with a zero-length write
          blockwrite = distkvi <= distkve
          ? distkvi <= len + 2 * METASIZE
          : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;

          if (!spillInProgress) {
          if (blockwrite) {
          if ((kvbend + METASIZE) % kvbuffer.length !=
          equator - (equator % METASIZE)) {
          // spill finished, reclaim space
          // need to use meta exclusively; zero-len rec & 100% spill
          // pcnt would fail
          resetSpill(); // resetSpill doesn't move bufindex, kvindex
          bufferRemaining = Math.min(
          distkvi - 2 * METASIZE,
          softLimit - distanceTo(kvbidx, bufindex)) - len;
          continue;
          }
          // we have records we can spill; only spill if blocked
          if (kvindex != kvend) {
          startSpill();
          // Blocked on this write, waiting for the spill just
          // initiated to finish. Instead of repositioning the marker
          // and copying the partial record, we set the record start
          // to be the new equator
          setEquator(bufmark);
          } else {
          // We have no buffered records, and this record is too large
          // to write into kvbuffer. We must spill it directly from
          // collect
          final int size = distanceTo(bufstart, bufindex) + len;
          setEquator(0);
          bufstart = bufend = bufindex = equator;
          kvstart = kvend = kvindex;
          bufvoid = kvbuffer.length;
          throw new MapBufferTooSmallException(size + " bytes");
          }
          }
          }

          if (blockwrite) {
          // wait for spill
          try {
          while (spillInProgress) {
          reporter.progress();
          spillDone.await();
          }
          } catch (InterruptedException e) {
          throw new IOException(
          "Buffer interrupted while waiting for the writer", e);
          }
          }
          } while (blockwrite);
          } finally {
          spillLock.unlock();
          }
          }
          // here, we know that we have sufficient space to write
          if (bufindex + len > bufvoid) {
          final int gaplen = bufvoid - bufindex;
          System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
          len -= gaplen;
          off += gaplen;
          bufindex = 0;
          }
          System.arraycopy(b, off, kvbuffer, bufindex, len);
          bufindex += len;
          }

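          write copies the key/value bytes into kvbuffer. If bufindex + len exceeds bufvoid, the content is stored in two pieces: the first part fills the space between bufindex and bufvoid, bufindex is reset to 0, and the remainder is written from there. write does not distinguish keys from values. After a key is written, collect checks bufindex < keystart; if bufindex is smaller, the key was split, and bb.shiftBufferedKey() is called. Values are written as they are, with no check for splitting. A key must not be split because keys are compared during sorting.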
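
The wrap-around copy at the end of write can be tried in isolation. The sketch below keeps only the last few lines of the method and assumes free space has already been verified; the spill and blocking logic is deliberately left out, and the class WrapWriteSketch is a hypothetical name.

```java
// Sketch of the two-part copy at the end of Buffer.write. It assumes the
// caller has already made sure there is enough free space.
final class WrapWriteSketch {
    final byte[] kvbuffer;
    int bufindex; // next byte position to write
    int bufvoid;  // position at which writing must wrap to 0

    WrapWriteSketch(int size, int startIndex) {
        kvbuffer = new byte[size];
        bufindex = startIndex;
        bufvoid = size;
    }

    void write(byte[] b, int off, int len) {
        if (bufindex + len > bufvoid) {
            // fill the gap up to bufvoid, then continue from the start
            final int gaplen = bufvoid - bufindex;
            System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
            len -= gaplen;
            off += gaplen;
            bufindex = 0;
        }
        System.arraycopy(b, off, kvbuffer, bufindex, len);
        bufindex += len;
    }
}
```

Writing five bytes at position 6 of an 8-byte buffer puts two bytes at positions 6 and 7 and the remaining three at positions 0 to 2, leaving bufindex at 3.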

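          Note that if the data to write is too long and kvindex == kvend, MapBufferTooSmallException is thrown and caught in collect, which spills the record straight to disk with spillSingleRecord. In other words, when a single record is too long it bypasses the buffer and is written directly to disk.

          Next, the code of bb.shiftBufferedKey():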

          // BlockingBuffer.shiftBufferedKey
          protected void shiftBufferedKey() throws IOException {
          // spillLock unnecessary; both kvend and kvindex are current
          int headbytelen = bufvoid - bufmark;
          bufvoid = bufmark;
          final int kvbidx = 4 * kvindex;
          final int kvbend = 4 * kvend;
          final int avail =
          Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
          if (bufindex + headbytelen < avail) {
          System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
          System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
          bufindex += headbytelen;
          bufferRemaining -= kvbuffer.length - bufvoid;
          } else {
          byte[] keytmp = new byte[bufindex];
          System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
          bufindex = 0;
          out.write(kvbuffer, bufmark, headbytelen);
          out.write(keytmp);
          }
          }

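          shiftBufferedKey checks whether there is enough room at the head of the buffer to hold the whole key. If there is not, the part of the key at the head is first copied into keytmp, and the key is then written in two steps by calling Buffer.write again. If there is enough room, two copies are made: the part of the key at the head is moved to offset headbytelen, then the part at the tail is copied to the head; bufindex is advanced and bufferRemaining is adjusted.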
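
The branch with enough room comes down to two array copies. The sketch below reproduces only that branch, under the assumption that the caller has already confirmed the space is free; ShiftKeySketch and shiftToFront are hypothetical names.

```java
// Sketch of the "enough room" branch of shiftBufferedKey. The key's first
// bytes sit in [bufmark, bufvoid) at the end of the array and its last bytes
// in [0, bufindex) at the start.
final class ShiftKeySketch {
    // Makes the key contiguous at the front and returns the new bufindex.
    static int shiftToFront(byte[] kvbuffer, int bufmark, int bufvoid, int bufindex) {
        final int headbytelen = bufvoid - bufmark;
        // move the key's trailing bytes right to make room; arraycopy
        // handles overlapping ranges within one array correctly
        System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
        // bring the key's leading bytes from the end of the array to the front
        System.arraycopy(kvbuffer, bufmark, kvbuffer, 0, headbytelen);
        return bufindex + headbytelen;
    }
}
```

In the real method bufvoid is also lowered to bufmark, so the vacated bytes at the end of the array are skipped by later reads.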

          After the key/value is written, the metadata is written and kvindex is updated.

          Spill

          That completes one write into the buffer. Once enough data has been written that bufferRemaining drops to 0 or below, a spill is prepared. On the first spill spillInProgress is false, so bUsed = distanceTo(kvbidx, bufindex) is examined. If bUsed >= softLimit and (kvbend + METASIZE) % kvbuffer.length == equator - (equator % METASIZE), a spill is performed by calling startSpill:

          private void startSpill() {
          // set the end boundary of the metadata to spill
          kvend = (kvindex + NMETA) % kvmeta.capacity();
          // set the end boundary of the key/value data to spill
          bufend = bufmark;
          // mark a spill as running
          spillInProgress = true;
          ...
          // signal the condition on the reentrant lock to wake the spill thread
          spillReady.signal();
          }

          After startSpill wakes the spill thread, the spill runs, but the map's writes into the buffer are not blocked, so the equator boundary and bufferRemaining must be recomputed. First, how equator and bufferRemaining are set:











          // average record length, derived from the key/value data written so far
          final int avgRec = (int) (mapOutputByteCounter.getCounter() /
          mapOutputRecordCounter.getCounter());
          // leave at least half the split buffer for serialization data
          // ensure that kvindex >= bufindex
          // size of the free space
          final int distkvi = distanceTo(bufindex, kvbidx);
          // position of the new equator
          final int newPos = (bufindex +
          Math.max(2 * METASIZE - 1,
          Math.min(distkvi / 2,
          distkvi / (METASIZE + avgRec) * METASIZE)))
          % kvbuffer.length;
          setEquator(newPos);
          bufmark = bufindex = newPos;
          final int serBound = 4 * kvend;
          // bytes remaining before the lock must be held and limits
          // checked is the minimum of three arcs: the metadata space, the
          // serialization space, and the soft limit
          bufferRemaining = Math.min(
          // metadata max
          distanceTo(bufend, newPos),
          Math.min(
          // serialization max
          distanceTo(newPos, serBound),
          // soft limit
          softLimit)) - 2 * METASIZE;

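          Because equator is the boundary between the serialized key/value data and kvmeta, and more room should go to key/value data, at most half of distkvi is set aside for metadata. avgRec is used to estimate how many record-plus-metadata pairs fit in distkvi, and from that count the space the metadata would occupy; the smaller of distkvi / 2 and this estimate is taken. distkvi must also hold at least one metadata record (METASIZE bytes), and aligning kvindex can lose up to METASIZE - 1 bytes, so the offset is never less than 2 * METASIZE - 1. Putting this together, the new equator is (bufindex + Math.max(2 * METASIZE - 1, Math.min(distkvi / 2, distkvi / (METASIZE + avgRec) * METASIZE))) % kvbuffer.length. Once equator is chosen, bufmark = bufindex = newPos and kvindex are set, but bufstart, bufend, kvstart and kvend are left alone, because those values mark the bounds of the data being spilled.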
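
Plugging numbers into the newPos expression shows how the three terms interact. The sketch copies the expression from the listing above into a pure function; NewEquatorSketch and newEquator are hypothetical names, and the input values in the note below are made up for illustration.

```java
// Sketch: the newPos arithmetic used when a spill starts.
final class NewEquatorSketch {
    static final int METASIZE = 16; // bytes per metadata record

    // bufindex: current end of serialized data (bytes)
    // distkvi:  free bytes between bufindex and kvindex
    // avgRec:   average serialized record length so far (bytes)
    static int newEquator(int bufindex, int distkvi, int avgRec, int bufferLength) {
        // metadata needed if distkvi were filled with average-sized records
        final int metaEstimate = distkvi / (METASIZE + avgRec) * METASIZE;
        // never more than half the free space, never less than 2*METASIZE - 1
        final int metaShare = Math.max(2 * METASIZE - 1,
                Math.min(distkvi / 2, metaEstimate));
        return (bufindex + metaShare) % bufferLength;
    }
}
```

With 200 free bytes and 84-byte records, two records fit, so 32 bytes go to metadata; with 4-byte records the estimate (160) is capped at half the free space (100); with only 40 free bytes the floor of 31 applies and the position may wrap past the end of the array.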

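          Once the spill has started, less space is available, so bufferRemaining, which controls spilling, must be reset as well. It is the minimum of three values minus 2*METASIZE: the space available to metadata, distanceTo(bufend, newPos); the space available to key/value data, distanceTo(newPos, serBound); and softLimit. Why subtract 2*METASIZE? One METASIZE is for the record that is about to be written into the buffer at this moment: the space its metadata takes must be deducted from bufferRemaining. The other covers the METASIZE between kvindex and kvend (before the spill), which was not included when equator was computed.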

          Next, the SpillThread; here is its run method:


          public void run() {
          spillLock.lock();
          spillThreadRunning = true;
          try {
          while (true) {
          spillDone.signal();
          // if no spill has been requested, park the SpillThread until it is woken
          while (!spillInProgress) {
          spillReady.await();
          }
          try {
          spillLock.unlock();
          // once woken, sort and spill to disk
          sortAndSpill();
          } catch (Throwable t) {
          sortSpillException = t;
          } finally {
          spillLock.lock();
          if (bufend < bufstart) {
          bufvoid = kvbuffer.length;
          }
          kvstart = kvend;
          bufstart = bufend;
          spillInProgress = false;
          }
          }
          } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          } finally {
          spillLock.unlock();
          spillThreadRunning = false;
          }
          }

          The main work in run is sortAndSpill:

          private void sortAndSpill() throws IOException, ClassNotFoundException,
          InterruptedException {
          //approximate the length of the output file to be the length of the
          //buffer + header lengths for the partitions
          final long size = distanceTo(bufstart, bufend, bufvoid) +
          partitions * APPROX_HEADER_LENGTH;
          FSDataOutputStream out = null;
          try {
          // create spill file
          // holds the index records of this spill
          final SpillRecord spillRec = new SpillRecord(partitions);
          // pick the spill file to write on disk
          final Path filename =
          mapOutputFile.getSpillFileForWrite(numSpills, size);
          // open the output stream
          out = rfs.create(filename);
          // kvend / NMETA is the record number of the metadata entry at kvend
          final int mstart = kvend / NMETA;
          // likewise, kvstart / NMETA is the record number of the entry at kvstart;
          // the metadata to spill lies between mstart and mend, so (mend - mstart) is the number of records
          final int mend = 1 + // kvend is a valid record
          (kvstart >= kvend
          ? kvstart
          : kvmeta.capacity() + kvstart) / NMETA;
          // sort: only the metadata is sorted, so only the order of entries in kvmeta changes;
          // the ordering is defined by MapOutputBuffer.compare:
          // first by partition, then by key
          sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
          int spindex = mstart;
          // rec holds the position of one partition within the spill data file
          final IndexRecord rec = new IndexRecord();
          final InMemValBytes value = new InMemValBytes();
          for (int i = 0; i < partitions; ++i) {
          // the temporary file is in IFile format
          IFile.Writer<K, V> writer = null;
          try {
          long segmentStart = out.getPos();
          FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
          writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
          spilledRecordsCounter);
          // before writing to disk, check whether a combiner is configured
          if (combinerRunner == null) {
          // spill directly
          DataInputBuffer key = new DataInputBuffer();
          // write the records that belong to the same partition
          while (spindex < mend &&
          kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
          final int kvoff = offsetFor(spindex % maxRec);
          int keystart = kvmeta.get(kvoff + KEYSTART);
          int valstart = kvmeta.get(kvoff + VALSTART);
          key.reset(kvbuffer, keystart, valstart - keystart);
          getVBytesForOffset(kvoff, value);
          writer.append(key, value);
          ++spindex;
          }
          } else {
          int spstart = spindex;
          while (spindex < mend &&
          kvmeta.get(offsetFor(spindex % maxRec)
          + PARTITION) == i) {
          ++spindex;
          }
          // Note: we would like to avoid the combiner if we've fewer
          // than some threshold of records for a partition
          if (spstart != spindex) {
          combineCollector.setWriter(writer);
          RawKeyValueIterator kvIter =
          new MRResultIterator(spstart, spindex);
          combinerRunner.combine(kvIter, combineCollector);
          }
          }

          // close the writer
          writer.close();

          // record offsets
          // store the offsets of partition i in the index record rec
          rec.startOffset = segmentStart;
          rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
          rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
          // spillRec keeps the per-partition info of this spill so that the later heap-based merge can fetch each partition's data
          spillRec.putIndex(rec, i);

          writer = null;
          } finally {
          if (null != writer) writer.close();
          }
          }
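          // check whether the in-memory index cache exceeds the threshold; if so, write the index to disk.
          // once over the threshold, only the current index and later ones are written to disk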
          if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
          // create spill index file
          // create the index file
          Path indexFilename =
          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
          * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          spillRec.writeToFile(indexFilename, job);
          } else {
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
          spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
          }
          LOG.info("Finished spill " + numSpills);
          ++numSpills;
          } finally {
          if (out != null) out.close();
          }
          }

          In sortAndSpill, mstart and mend determine how many records need to be spilled to disk. sorter.sort is then called to sort the metadata, first by partition and then by key; the sort only rearranges the metadata.
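
The idea of ordering records by partition and then by key without touching the serialized bytes can be sketched as follows. This is a simplification under stated assumptions: it sorts an array of record numbers instead of swapping entries inside kvmeta as the article describes, it compares raw key bytes rather than using the job's comparator, and it assumes keys do not wrap. MetaSortSketch and its methods are hypothetical names.

```java
import java.util.Arrays;

// Sketch: sort record numbers by (partition, key bytes); data is never moved.
final class MetaSortSketch {
    static final int VALSTART = 0;
    static final int KEYSTART = 1;
    static final int PARTITION = 2;
    static final int NMETA = 4; // ints per metadata record

    // meta holds NMETA ints per record; data holds the serialized bytes.
    static Integer[] sortedOrder(byte[] data, int[] meta) {
        Integer[] order = new Integer[meta.length / NMETA];
        for (int i = 0; i < order.length; i++) {
            order[i] = i;
        }
        Arrays.sort(order, (a, b) -> compare(data, meta, a, b));
        return order;
    }

    static int compare(byte[] data, int[] meta, int a, int b) {
        final int pa = meta[a * NMETA + PARTITION];
        final int pb = meta[b * NMETA + PARTITION];
        if (pa != pb) {
            return Integer.compare(pa, pb); // partition decides first
        }
        // the key occupies [KEYSTART, VALSTART) in the data array
        final int sa = meta[a * NMETA + KEYSTART];
        final int la = meta[a * NMETA + VALSTART] - sa;
        final int sb = meta[b * NMETA + KEYSTART];
        final int lb = meta[b * NMETA + VALSTART] - sb;
        for (int i = 0; i < Math.min(la, lb); i++) {
            final int x = data[sa + i] & 0xff; // unsigned byte comparison
            final int y = data[sb + i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return la - lb; // shorter key sorts first on a common prefix
    }
}
```

Because only small fixed-size entries are reordered, the cost of the sort does not depend on how large the keys and values are.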

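          After sorting, the code checks whether a combiner is configured. If not, the records are written straight to disk, with one IndexRecord per partition. If there is a combiner, the records of the partition are handed to kvIter and combinerRunner.combine is called to run the combiner.

          After the data is written to disk, the spillRec for spillx.out is kept in memory via indexCacheList.add(spillRec). If the memory used, totalIndexCacheMemory, exceeds indexCacheMemoryLimit, an index file is created and the spillRec of this and later spills is written to that file on disk.

          Finally the spill count is incremented. When sortAndSpill finishes, control returns to run, where the finally block assigns kvstart = kvend and bufstart = bufend and sets spillInProgress to false.

          While the spill is running, the map's writes to the buffer do not stop; collect keeps being called. Back in collect: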

          // MapOutputBuffer.collect
          public synchronized void collect(K key, V value, final int partition
          ) throws IOException {
          ...
          // when a new record is collected, first deduct the metadata size from the remaining space, then check
          bufferRemaining -= METASIZE;
          if (bufferRemaining <= 0) {
          // start spill if the thread is not running and the soft limit has been
          // reached
          spillLock.lock();
          try {
          do {
          // on the first spill, spillInProgress is false
          if (!spillInProgress) {
          // byte position of kvindex
          final int kvbidx = 4 * kvindex;
          // byte position of kvend
          final int kvbend = 4 * kvend;
          // serialized, unspilled bytes always lie between kvindex and
          // bufindex, crossing the equator. Note that any void space
          // created by a reset must be included in "used" bytes
          final int bUsed = distanceTo(kvbidx, bufindex);
          final boolean bufsoftlimit = bUsed >= softLimit;
          if ((kvbend + METASIZE) % kvbuffer.length !=
          equator - (equator % METASIZE)) {
          // spill finished, reclaim space
          resetSpill();
          bufferRemaining = Math.min(
          distanceTo(bufindex, kvbidx) - 2 * METASIZE,
          softLimit - bUsed) - METASIZE;
          continue;
          } else if (bufsoftlimit && kvindex != kvend) {
          ...
          }
          }
          } while (false);
          } finally {
          spillLock.unlock();
          }
          }
          ...
          }

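          When a new record needs to be written into the buffer, bufferRemaining -= METASIZE is evaluated. This bufferRemaining was reset when the spill started, so it should be smaller than the initial softLimit. When bufferRemaining is less than or equal to 0, the if branch is entered. Once the spill has finished, spillInProgress is false, so execution enters if (!spillInProgress). startSpill had reset kvend and bufend, so now (kvbend + METASIZE) % kvbuffer.length != equator - (equator % METASIZE) and resetSpill() is called, which sets kvstart, kvend, bufstart and bufend to the positions established at the last startSpill. By now the buffer has written part of its contents to disk and has plenty of free space, so bufferRemaining is reset and no spill happens this time.

          bufferRemaining is set to Math.min(distanceTo(bufindex, kvbidx) - 2 * METASIZE, softLimit - bUsed) - METASIZE. The final METASIZE is the one deducted from bufferRemaining when the current record entered collect. The code of resetSpill: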











          private void resetSpill() {
          final int e = equator;
          bufstart = bufend = e;
          final int aligned = e - (e % METASIZE);
          // set start/end to point to first meta record
          // Cast one of the operands to long to avoid integer overflow
          kvstart = kvend = (int)
          (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
          LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
          (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
          }
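
The listings rely on distanceTo without showing it. The sketch below assumes it returns the clockwise distance between two positions on the ring, which is consistent with how it is used above, and applies it to the bufferRemaining value recomputed after a finished spill. ReclaimSketch and remainingAfterReset are hypothetical names.

```java
// Sketch: ring distance and the bufferRemaining value after a reset.
final class ReclaimSketch {
    static final int METASIZE = 16; // bytes per metadata record

    // Clockwise distance from i to j on a ring of mod bytes. This is an
    // assumption about what distanceTo computes, inferred from its usage.
    static int distanceTo(int i, int j, int mod) {
        return i <= j ? j - i : mod - i + j;
    }

    // bufindex: end of serialized data; kvbidx: byte position of kvindex.
    static int remainingAfterReset(int bufindex, int kvbidx,
                                   int softLimit, int bufferLength) {
        // bytes in use: from the newest metadata record up to bufindex
        final int bUsed = distanceTo(kvbidx, bufindex, bufferLength);
        // bytes free: from bufindex around the ring to the metadata
        final int free = distanceTo(bufindex, kvbidx, bufferLength);
        return Math.min(free - 2 * METASIZE, softLimit - bUsed) - METASIZE;
    }
}
```

For a 1024-byte ring with bufindex = 300, kvbidx = 200 and softLimit = 819, 100 bytes are in use and 924 are free, so the result is min(892, 719) - 16 = 703.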

          When bufferRemaining next drops to 0 or below, another spill is performed, and from then on the same pattern repeats. That concludes the analysis of the circular buffer.



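          Hello, I'm 王知無, an original author focused on big data.

          I have worked on backend architecture, data middleware, data platforms and architecture, and productionizing algorithms.

          I write about real-time developments in big data, technical growth, personal development and career advancement.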
