中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Node.js pipe實現源碼解析

發布時間:2020-08-28 22:28:38 來源:腳本之家 閱讀:215 作者:xiedacon 欄目:web開發

從前面兩篇文章,我們了解到。想要把 Readable 的數據寫到 Writable,就必須先手動的將數據讀入內存,然后寫入 Writable。換句話說,每次傳遞數據時,都需要寫如下的模板代碼

readable.on('readable', (err) => {
 if(err) throw err

 writable.write(readable.read())
})

為了方便使用,Node.js 提供了 pipe() 方法,讓我們可以優雅的傳遞數據

readable.pipe(writable)

現在,就讓我們來看看它是如何實現的吧

pipe

首先需要先調用 Readable 的 pipe() 方法

// lib/_stream_readable.js

Readable.prototype.pipe = function(dest, pipeOpts) {
 var src = this;
 var state = this._readableState;

 // 記錄 Writable
 switch (state.pipesCount) {
  case 0:
   state.pipes = dest;
   break;
  case 1:
   state.pipes = [state.pipes, dest];
   break;
  default:
   state.pipes.push(dest);
   break;
 }
 state.pipesCount += 1;

 // ...

  src.once('end', endFn);

 dest.on('unpipe', onunpipe);
 
 // ...

 dest.on('drain', ondrain);

 // ...

 src.on('data', ondata);

 // ...

 // 保證 error 事件觸發時,onerror 首先被執行
 prependListener(dest, 'error', onerror);

 // ...

 dest.once('close', onclose);
 
 // ...

 dest.once('finish', onfinish);

 // ...

 // 觸發 Writable 的 pipe 事件
 dest.emit('pipe', src);

 // 將 Readable 改為 flow 模式
 if (!state.flowing) {
  debug('pipe resume');
  src.resume();
 }

 return dest;
};

執行 pipe() 函數時,首先將 Writable 記錄到 state.pipes 中,然后綁定相關事件,最后如果 Readable 不是 flow 模式,就調用 resume() 將 Readable 改為 flow 模式

傳遞數據

Readable 從數據源獲取到數據后,觸發 data 事件,執行 ondata()

ondata() 相關代碼:

// lib/_stream_readable.js

 // 防止在 dest.write(chunk) 內調用 src.push(chunk) 造成 awaitDrain 重復增加,awaitDrain 不能清零,Readable 卡住的情況
 // 詳情見 https://github.com/nodejs/node/issues/7278
 var increasedAwaitDrain = false;
 function ondata(chunk) {
  debug('ondata');
  increasedAwaitDrain = false;
  var ret = dest.write(chunk);
  if (false === ret && !increasedAwaitDrain) {
   // 防止在 dest.write() 內調用 src.unpipe(dest),導致 awaitDrain 不能清零,Readable 卡住的情況
   if (((state.pipesCount === 1 && state.pipes === dest) ||
      (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)
     ) && 
     !cleanedUp) {
    debug('false write response, pause', src._readableState.awaitDrain);
    src._readableState.awaitDrain++;
    increasedAwaitDrain = true;
   }
   // 進入 pause 模式
   src.pause();
  }
 }

在 ondata(chunk) 函數內,通過 dest.write(chunk) 將數據寫入 Writable

此時,在 _write() 內部可能會調用 src.push(chunk) 或使其 unpipe,這會導致 awaitDrain 多次增加,不能清零,Readable 卡住

當不能再向 Writable 寫入數據時,Readable 會進入 pause 模式,直到所有的 drain 事件觸發

觸發 drain 事件,執行 ondrain()

// lib/_stream_readable.js

 var ondrain = pipeOnDrain(src);

 function pipeOnDrain(src) {
  return function() {
   var state = src._readableState;
   debug('pipeOnDrain', state.awaitDrain);
   if (state.awaitDrain)
    state.awaitDrain--;
   // awaitDrain === 0,且有 data 監聽器
   if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
    state.flowing = true;
    flow(src);
   }
  };
 }

每個 drain 事件觸發時,都會減少 awaitDrain,直到 awaitDrain 為 0。此時,調用 flow(src),使 Readable 進入 flow 模式

到這里,整個數據傳遞循環已經建立,數據會順著循環源源不斷的流入 Writable,直到所有數據寫入完成

unpipe

不管寫入過程中是否出現錯誤,最后都會執行 unpipe()

// lib/_stream_readable.js

// ...

 function unpipe() {
  debug('unpipe');
  src.unpipe(dest);
 }

// ...

Readable.prototype.unpipe = function(dest) {
 var state = this._readableState;
 var unpipeInfo = { hasUnpiped: false };

 // 啥也沒有
 if (state.pipesCount === 0)
  return this;

 // 只有一個
 if (state.pipesCount === 1) {
  if (dest && dest !== state.pipes)
   return this;
  // 沒有指定就 unpipe 所有
  if (!dest)
   dest = state.pipes;

  state.pipes = null;
  state.pipesCount = 0;
  state.flowing = false;
  if (dest)
   dest.emit('unpipe', this, unpipeInfo);
  return this;
 }

 // 沒有指定就 unpipe 所有
 if (!dest) {
  var dests = state.pipes;
  var len = state.pipesCount;
  state.pipes = null;
  state.pipesCount = 0;
  state.flowing = false;

  for (var i = 0; i < len; i++)
   dests[i].emit('unpipe', this, unpipeInfo);
  return this;
 }

 // 找到指定 Writable,并 unpipe
 var index = state.pipes.indexOf(dest);
 if (index === -1)
  return this;

 state.pipes.splice(index, 1);
 state.pipesCount -= 1;
 if (state.pipesCount === 1)
  state.pipes = state.pipes[0];

 dest.emit('unpipe', this, unpipeInfo);

 return this;
};

Readable.prototype.unpipe() 函數會根據 state.pipes 屬性和 dest 參數,選擇執行策略。最后會觸發 dest 的 unpipe 事件

unpipe 事件觸發后,調用 onunpipe(),清理相關數據

// lib/_stream_readable.js

 function onunpipe(readable, unpipeInfo) {
  debug('onunpipe');
  if (readable === src) {
   if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
    unpipeInfo.hasUnpiped = true;
    // 清理相關數據
    cleanup();
   }
  }
 }

End

在整個 pipe 的過程中,Readable 是主動方 ( 負責整個 pipe 過程:包括數據傳遞、unpipe 與異常處理 ),Writable 是被動方 ( 只需要觸發 drain 事件 )

總結一下 pipe 的過程:

  • 首先執行 readbable.pipe(writable),將 readable 與 writable 對接上
  • 當 readable 中有數據時,readable.emit('data'),將數據寫入 writable
  • 如果 writable.write(chunk) 返回 false,則進入 pause 模式,等待 drain 事件觸發
  • drain 事件全部觸發后,再次進入 flow 模式,寫入數據
  • 不管數據寫入完成或發生中斷,最后都會調用 unpipe()
  • unpipe() 調用 Readable.prototype.unpipe(),觸發 dest 的 unpipe 事件,清理相關數據

參考:

https://github.com/nodejs/node/blob/master/lib/_stream_readable.js

https://github.com/nodejs/node/blob/master/lib/_stream_writable.js

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

文成县| 鲜城| 潜山县| 藁城市| 三河市| 金塔县| 汽车| 靖远县| 手机| 清远市| 杭锦旗| 黔江区| 滕州市| 特克斯县| 建瓯市| 含山县| 湛江市| 丰原市| 高邮市| 峨眉山市| 扎兰屯市| 休宁县| 桦甸市| 重庆市| 大厂| 蓬溪县| 都江堰市| 海南省| 阳新县| 南召县| 峡江县| 安义县| 花莲县| 金平| 钟山县| 禹州市| 同江市| 彰化县| 安陆市| 威宁| 乐清市|