中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Node.js中如何解決“背壓”問題

發布時間:2021-09-13 11:07:55 來源:億速云 閱讀:195 作者:柒染 欄目:web開發

Node.js中如何解決“背壓”問題,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

Node.js的4種stream是什么?什么是背壓問題?把一個東西從 A 搬到 B 該怎么搬呢?

抬起來,移動到目的地,放下不就行了么。

那如果這個東西有一噸重呢?

那就一部分一部分的搬。

其實 IO 也就是搬東西,包括網絡的 IO、文件的 IO,如果數據量少,那么直接傳送全部內容就行了,但如果內容特別多,一次性加載到內存會崩潰,而且速度也慢,這時候就可以一部分一部分的處理,這就是流的思想。

各種語言基本都實現了 stream 的 api,Node.js 也是,stream api 是比較常用的

流的直觀感受

從一個地方流到另一個地方,顯然有流出的一方和流入的一方,流出的一方就是可讀流(readable),而流入的一方就是可寫流(writable)。

Node.js中如何解決“背壓”問題

當然,也有的流既可以流入又可以流出,這種叫做雙工流(duplex)

Node.js中如何解決“背壓”問題

既然可以流入又可以流出,那么是不是可以對流入的內容做下轉換再流出呢,這種流叫做轉換流(transform)

Node.js中如何解決“背壓”問題

duplex 流的流入和流出內容不需要相關,而 transform 流的流入和流出是相關的,這是兩者的區別。

流的 api

Node.js 提供的 stream 就是上面介紹的那 4 種:

const stream = require('stream');

// 可讀流
const Readable = stream.Readable;
// 可寫流
const Writable = stream.Writable;
// 雙工流
const Duplex = stream.Duplex;
// 轉換流
const Transform = stream.Transform;

它們都有要實現的方法:

  • Readable 需要實現 _read 方法來返回內容

  • Writable 需要實現 _write 方法來接受內容

  • Duplex 需要實現 _read 和 _write 方法來接受和返回內容

  • Transform 需要實現 _transform 方法來把接受的內容轉換之后返回

我們分別來看一下:

Readable

Readable 要實現 _read 方法,通過 push 返回具體的數據。

const Stream = require('stream');
const readableStream = Stream.Readable();
readableStream._read = function() {
    this.push('阿門阿前一棵葡萄樹,');
    this.push('阿東阿東綠的剛發芽,');
    this.push('阿東背著那重重的的殼呀,');
    this.push('一步一步地往上爬。')
    this.push(null);
}
readableStream.on('data', (data)=> {
    console.log(data.toString())
});
readableStream.on('end', () => {
    console.log('done~');
});

當 push 一個 null 時,就代表結束流。

執行效果如下:

Node.js中如何解決“背壓”問題

創建 Readable 也可以通過繼承的方式:

const Stream = require('stream');

class ReadableDong extends Stream.Readable {
    constructor() {
        super();
    }
    _read() {
        this.push('阿門阿前一棵葡萄樹,');
        this.push('阿東阿東綠的剛發芽,');
        this.push('阿東背著那重重的的殼呀,');
        this.push('一步一步地往上爬。')
        this.push(null);
    }
}

const readableStream = new ReadableDong();

readableStream.on('data', (data)=> {
    console.log(data.toString())
});
readableStream.on('end', () => {
    console.log('done~');
});

可讀流是生成內容的,那么很自然可以和生成器結合:

const Stream = require('stream');

class ReadableDong extends Stream.Readable {
    constructor(iterator) {
        super();
        this.iterator = iterator;
    }
    _read() {
        const next = this.iterator.next();
        if(next.done) {
            return this.push(null);
        } else {
            this.push(next.value)
        }
    }
}
function *songGenerator() {
    yield '阿門阿前一棵葡萄樹,';
    yield '阿東阿東綠的剛發芽,';
    yield '阿東背著那重重的的殼呀,';
    yield '一步一步地往上爬。';
}
const songIterator = songGenerator();
const readableStream = new ReadableDong(songIterator);
readableStream.on('data', (data)=> {
    console.log(data.toString())
});
readableStream.on('end', () => {
    console.log('done~');
});

這就是可讀流,通過實現 _read 方法來返回內容。

Writable

Writable 要實現 _write 方法,接收寫入的內容。

const Stream = require('stream');
const writableStream = Stream.Writable();
writableStream._write = function (data, enc, next) {
   console.log(data.toString());
   // 每秒寫一次
   setTimeout(() => {
       next();
   }, 1000);
}
writableStream.on('finish', () => console.log('done~'));
writableStream.write('阿門阿前一棵葡萄樹,');
writableStream.write('阿東阿東綠的剛發芽,');
writableStream.write('阿東背著那重重的的殼呀,');
writableStream.write('一步一步地往上爬。');
writableStream.end();

接收寫入的內容,打印出來,并且調用 next 來處理下一個寫入的內容,這里調用 next 是異步的,可以控制頻率。

跑了一下,確實可以正常的處理寫入的內容:

Node.js中如何解決“背壓”問題

這就是可寫流,通過實現 _write 方法來處理寫入的內容。

Duplex

Duplex 是可讀可寫,同時實現 _read 和 _write 就可以了

const Stream = require('stream');
var duplexStream = Stream.Duplex();
duplexStream._read = function () {
    this.push('阿門阿前一棵葡萄樹,');
    this.push('阿東阿東綠的剛發芽,');
    this.push('阿東背著那重重的的殼呀,');
    this.push('一步一步地往上爬。')
    this.push(null);
}
duplexStream._write = function (data, enc, next) {
    console.log(data.toString());
    next();
}
duplexStream.on('data', data => console.log(data.toString()));
duplexStream.on('end', data => console.log('read done~'));
duplexStream.write('阿門阿前一棵葡萄樹,');
duplexStream.write('阿東阿東綠的剛發芽,');
duplexStream.write('阿東背著那重重的的殼呀,');
duplexStream.write('一步一步地往上爬。');
duplexStream.end();
duplexStream.on('finish', data => console.log('write done~'));

整合了 Readable 流和 Writable 流的功能,這就是雙工流 Duplex。

Node.js中如何解決“背壓”問題

Transform

Duplex 流雖然可讀可寫,但是兩者之間沒啥關聯,而有的時候需要對流入的內容做轉換之后流出,這時候就需要轉換流 Transform。

Transform 流要實現 _transform 的 api,我們實現下對內容做反轉的轉換流:

const Stream = require('stream');
class TransformReverse extends Stream.Transform {
  constructor() {
    super()
  }
  _transform(buf, enc, next) {
    const res = buf.toString().split('').reverse().join('');
    this.push(res)
    next()
  }
}
var transformStream = new TransformReverse();
transformStream.on('data', data => console.log(data.toString()))
transformStream.on('end', data => console.log('read done~'));
transformStream.write('阿門阿前一棵葡萄樹');
transformStream.write('阿東阿東綠的剛發芽');
transformStream.write('阿東背著那重重的的殼呀');
transformStream.write('一步一步地往上爬');
transformStream.end()
transformStream.on('finish', data => console.log('write done~'));

跑了一下,效果如下:

Node.js中如何解決“背壓”問題

流的暫停和流動

我們從 Readable 流中獲取內容,然后流入 Writable 流,兩邊分別做 _read 和 _write 的實現,就實現了流動。

Node.js中如何解決“背壓”問題

背壓

但是 read 和 write 都是異步的,如果兩者速率不一致呢?

如果 Readable 讀入數據的速率大于 Writable 寫入速度的速率,這樣就會積累一些數據在緩沖區,如果緩沖的數據過多,就會爆掉,會丟失數據。

而如果 Readable 讀入數據的速率小于 Writable 寫入速度的速率呢?那沒關系,最多就是中間有段空閑時期。

這種讀入速率大于寫入速率的現象叫做“背壓”,或者“負壓”。也很好理解,寫入段壓力比較大,寫不進去了,會爆緩沖區,導致數據丟失。

這個緩沖區大小可以通過 readableHighWaterMark 和 writableHightWaterMark 來查看,是 16k。

Node.js中如何解決“背壓”問題

解決背壓

怎么解決這種讀寫速率不一致的問題呢?

當沒寫完的時候,暫停讀就行了。這樣就不會讀入的數據越來越多,駐留在緩沖區。

readable stream 有個 readableFlowing 的屬性,代表是否自動讀入數據,默認為 true,也就是自動讀入數據,然后監聽 data 事件就可以拿到了。

當 readableFlowing 設置為 false 就不會自動讀了,需要手動通過 read 來讀入。

readableStream.readableFlowing = false;

let data;
while((data = readableStream.read()) != null) {
    console.log(data.toString());
}

但自己手動 read 比較麻煩,我們依然可以用自動流入的方式,調用 pause 和 resume 來暫停和恢復就行了。

當調用 writable stream 的 write 方法的時候會返回一個 boolean 值代表是寫入了目標還是放在了緩沖區:

  • true: 數據已經寫入目標

  • false:目標不可寫入,暫時放在緩沖區

我們可以判斷返回 false 的時候就 pause,然后等緩沖區清空了就 resume:

const rs = fs.createReadStream(src);
const ws = fs.createWriteStream(dst);
rs.on('data', function (chunk) {
    if (ws.write(chunk) === false) {
        rs.pause();
    }
});
rs.on('end', function () {
    ws.end();
});
ws.on('drain', function () {
    rs.resume();
});

這樣就能達到根據寫入速率暫停和恢復讀入速率的功能,解決了背壓問題。

pipe 有背壓問題么?

平時我們經常會用 pipe 來直接把 Readable 流對接到 Writable 流,但是好像也沒遇到過背壓問題,其實是 pipe 內部已經做了讀入速率的動態調節了。

const rs = fs.createReadStream(src);
const ws = fs.createWriteStream(dst);

rs.pipe(ws);

總結

流是傳輸數據時常見的思想,就是一部分一部分的傳輸內容,是文件讀寫、網絡通信的基礎概念。

Node.js 也提供了 stream 的 api,包括 Readable 可讀流、Writable 可寫流、Duplex 雙工流、Transform 轉換流。它們分別實現 _read、_write、_read + _write、_transform 方法,來做數據的返回和處理。

創建 Readable 對象既可以直接調用 Readable api 創建,然后重寫 _read 方法,也可以繼承 Readable 實現一個子類,之后實例化。其他流同理。(Readable 可以很容易的和 generator 結合)

當讀入的速率大于寫入速率的時候就會出現“背壓”現象,會爆緩沖區導致數據丟失,解決的方式是根據 write 的速率來動態 pause 和 resume 可讀流的速率。pipe 就沒有這個問題,因為內部做了處理。

流是掌握 IO 繞不過去的一個概念,而背壓問題也是流很常見的問題,遇到了數據丟失可以考慮是否發生了背壓。希望這篇文章能夠幫大家理清思路,真正掌握 stream!

看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

乌拉特前旗| 呼伦贝尔市| 汾西县| 丰顺县| 大城县| 衢州市| 陈巴尔虎旗| 齐齐哈尔市| 丰台区| 昌都县| 阳山县| 集贤县| 德惠市| 石嘴山市| 荃湾区| 绥芬河市| 芜湖县| 客服| 太和县| 哈密市| 德庆县| 南郑县| 平度市| 富民县| 临沧市| 安庆市| 当涂县| 琼海市| 大足县| 高陵县| 高台县| 新源县| 孝感市| 镇雄县| 玉环县| 抚宁县| 志丹县| 资溪县| 满城县| 兴安盟| 凤翔县|