中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Node Stream中運行機制的示例分析

發布時間:2021-01-30 13:42:11 來源:億速云 閱讀:164 作者:小新 欄目:web開發

這篇文章將為大家詳細講解有關Node Stream中運行機制的示例分析,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

流是什么?

你可以把流理解成一種傳輸的能力。通過流,可以以平緩的方式,無副作用的將數據傳輸到目的地。在Node中,Node Stream創建的流都是專用于String和Buffer上的,一般情況下使用Buffer。Stream表示的是一種傳輸能力,Buffer是傳輸內容的載體 (可以這樣理解,Stream:外賣小哥哥, Buffer:你的外賣)。創建流的時候將ObjectMode設置true ,Stream同樣可以傳輸任意類型的JS對象(除了null,null在流中有特殊用途)。

為什么要使用流?

現在有個需求,我們要向客戶端傳輸一個大文件。如果采用下面的方式

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;
    res.end(data);
  });
});

server.listen(8000);

每次接收一個請求,就要把這個大文件讀入內存,然后再傳輸給客戶端。通過這種方式可能會產生以下三種后果:

  • 內存耗盡

  • 拖慢其他進程

  • 增加垃圾回收器的負載

所以這種方式在傳輸大文件的情況下,不是一個好的方案。并發量一大,幾百個請求過來很容易就將內存耗盡。

如果采用流呢?

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

采用這種方式,不會占用太多內存,讀取一點就傳輸一點,整個過程平緩進行,非常優雅。如果想在傳輸的過程中,想對文件進行處理,比如壓縮、加密等等,也很好擴展(后面會具體介紹)。

流在Node中無處不在。從下圖中可以看出:

Node Stream中運行機制的示例分析

Stream分類

Stream分為四大類:

  • Readable(可讀流)

  • Writable (可寫流)

  • Duplex (雙工流)

  • Transform (轉換流)

Readable

可讀流中的數據,在以下兩種模式下都能產生數據。

  • Flowing Mode

  • Non-Flowing Mode

兩種模式下,觸發的方式以及消耗的方式不一樣。

Flowing Mode:數據會源源不斷地生產出來,形成“流動”現象。監聽流的data事件便可進入該模式。

Non-Flowing Mode下:需要顯示地調用read()方法,才能獲取數據。

兩種模式可以互相轉換

Node Stream中運行機制的示例分析

流的初始狀態是Null,通過監聽data事件,或者pipe方法,調用resume方法,將流轉為Flowing Mode狀態。Flowing Mode狀態下調用pause方法,將流置為Non-Flowing Mode狀態。Non-Flowing Mode狀態下調用resume方法,同樣可以將流置為Flowing Mode狀態。

下面詳細介紹下兩種模式下,Readable流的運行機制。

Flowing Mode

在Flowing Mode狀態下,創建的myReadable讀流,直接監聽data事件,數據就源源不斷的流出來進行消費了。

myReadable.on('data',function(chunk){
      consume(chunk);//消費流
})

一旦監聽data事件之后,Readable內部的流程如下圖所示

Node Stream中運行機制的示例分析

核心的方法是流內部的read方法,它在參數n為不同值時,分別觸發不同的操作。下面描述中的hightwatermark表示的是流內部的緩沖池的大小。

  • n=undefined(消費數據,并觸發一次可讀流)

  • n=0(觸發一次可讀流,但是不會消費)

  • n>hightwatermark(修改hightwatermark的值)

  • n<buffer的總數據數(直接返回n個字節的數據)

  • n>buffer (可以返回null,也可以返回buffer所有的數據(當時最后一次讀取))

圖中黃色標識的_read(),是用戶實現流所需要自己實現的方法,這個方法就是實際讀取流的方式(可以這樣理解,外賣平臺給你提供外賣的能力,那_read()方法就相當于你下單點外賣)。后面會詳細介紹如何實現_read方法。

以上的流程可以描述為:監聽data方法,Readable內部就會調用read方法,來進行觸發讀流操作,通過判斷是同步還是異步讀取,來決定讀取的數據是否放入緩沖區。如果為異步的,那么就要調用flow方法,來繼續觸發read方法,來讀取流,同時根據size參數判定是否emit('data')來消費流,循環讀取。如果是同步的,那就emit('data')來消費流,同時繼續觸發read方法,來讀取流。一旦push方法傳入的是null,整個流就結束了。

從使用者的角度來看,在這種模式下,你可以通過下面的方式來使用流

const fs = require('./fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');
readFile.on('data',function(chunk){
      writeFile1.write(chunk);
})
Non-Flowing Mode

相對于Flowing mode,Non-Flowing Mode要相對簡單很多。

消費該模式下的流,需要使用下面的方式

myReadable.on(‘readable’,function(){
     const chunk = myReadable.read()
     consume(chunk);//消費流
})

在Non-Flowing Mode下,Readable內部的流程如下圖:

Node Stream中運行機制的示例分析

從這個圖上看出,你要實現該模式的讀流,同樣要實現一個_read方法。

整個流程如下:監聽readable方法,Readable內部就會調用read方法。調用用戶實現的_read方法,來push數據到緩沖池,然后發送emit readable事件,通知用戶端消費。

從使用者的角度來看,你可以通過下面的方式來使用該模式下的流

const fs = require('fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');

readFile.on('readable',function(chunk) {
    while (null !== (chunk = myReadable.read())) {
        writeFile.write(chunk);
    }
});
Writable

相對于讀流,寫流的機制就更容易理解了。

寫流使用下面的方式進行數據寫入

myWrite.write(chunk);

調用write后,內部Writable的流程如下圖所示

Node Stream中運行機制的示例分析

類似于讀流,實現一個寫流,同樣需要用戶實現一個_write方法。

整個流程是這樣的:調用write之后,會首先判定是否要寫入緩沖區。如果不需要,那就調用用戶實現的_write方法,將流寫入到相應的地方,_write會調用一個writeable內部的一個回調函數。

從使用者的角度來看,使用一個寫流,采用下面的代碼所示的方式。

const fs = require('fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');

readFile.on('data',function(chunk) {
    writeFile.write(chunk);
})

可以看到,使用寫流是非常簡單的。

我們先講解一下如何實現一個讀流和寫流,再來看Duplex和Transform是什么,因為了解了如何實現一個讀流和寫流,再來理解Duplex和Transform就非常簡單了。

實現自定義的Readable

實現自定義的Readable,只需要實現一個_read方法即可,需要在_read方法中調用push方法來實現數據的生產。如下面的代碼所示:

const Readable = require('stream').Readable;

class MyReadable extends Readable {
    constructor(dataSource, options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        });
    }
}

// 模擬資源池
const dataSource = {
    data: new Array(10).fill('-'),
    makeData() {
        if (!dataSource.data.length) return null;
        return dataSource.data.pop();
    }
};

const myReadable = new MyReadable(dataSource,);

myReadable.on('readable', () => {
    let chunk;
    while (null !== (chunk = myReadable.read())) {
        console.log(chunk);
    }
});
實現自定義的writable

實現自定義的writable,只需要實現一個_write方法即可。在_write中消費chunk寫入到相應地方,并且調用callback回調。如下面代碼所示:

const Writable = require('stream').Writable;
class Mywritable extends  Writable{
    constuctor(options){
        super(options);
    }
    _write(chunk,endcoding,callback){
        console.log(chunk);
        callback && callback();
    }
}
const myWritable = new Mywritable();
Duplex

雙工流:簡單理解,就是講一個Readable流和一個Writable流綁定到一起,它既可以用來做讀流,又可以用來做寫流。

實現一個Duplex流,你需要同時實現_read_write方法。

有一點需要注意的是:它所包含的 Readable流和Writable流是完全獨立,互不影響的兩個流,兩個流使用的不是同一個緩沖區。通過下面的代碼可以驗證

// 模擬資源池1
const dataSource1 = {
    data: new Array(10).fill('a'),
    makeData() {
        if (!dataSource1.data.length) return null;
        return dataSource1.data.pop();
    }
};
// 模擬資源池2
const dataSource2 = {
    data: new Array(10).fill('b'),
    makeData() {
        if (!dataSource2.data.length) return null;
        return dataSource2.data.pop();
    }
};

const Readable = require('stream').Readable;
class MyReadable extends Readable {
    constructor(dataSource, options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        })

    }
}

const Writable = require('stream').Writable;
class MyWritable extends Writable{
    constructor(options){
        super(options);
    }
    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback && callback();
    }
}

const Duplex = require('stream').Duplex;
class MyDuplex extends Duplex{
    constructor(dataSource,options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        })
    }
    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback && callback();
    }
}

const myWritable = new MyWritable();
const myReadable = new MyReadable(dataSource1);
const myDuplex = new MyDuplex(dataSource1);
myReadable.pipe(myDuplex).pipe(myWritable);

打印的結果是

abababababababababab

從這個結果可以看出,myReadable.pipe(myDuplex),myDuplex充當的是寫流,寫入的內容是a;myDuplex.pipe(myWritable),myDuplex充當的是讀流,往myWritable寫的卻是b;所以說它所包含的 Readable流和Writable流是完全獨立的。

Transform

理解了Duplex,就更好理解Transform了。Transform是一個轉換流,它既有讀的功能又有寫的功能,但是它和Duplex不同的是,它的讀流和寫流共用同一個緩沖區;也就是說,通過它讀入什么,那它就能寫入什么。

實現一個Transform,你只需要實現一個_transform方法。比如最簡單的Transform:PassThrough,其源代碼如下所示

Node Stream中運行機制的示例分析

PassThrough就是一個Transform,但是這個轉換流,什么也沒做,相當于一個透明的轉換流。可以看到_transform中什么都沒有,只是簡單的將數據進行回調。

如果我們在這個環節做些擴展,只需要在_transform中直接擴展就行了。比如我們可以對流進行壓縮,加密,混淆等等操作。

BackPress

最后介紹一個流中非常重要的一個概念:背壓。要了解這個,我們首先來看下pipehighWaterMaker是什么。

pipe

首先看下下面的代碼

const fs = require('./fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');
readFile.pipe(writeFile);

上面的代碼和下面是等價的

const fs = require('./fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');
readFile.on('data',function(data){
    var flag = ws.write(data);
    if(!flag){ // 當前寫流緩沖區已滿,暫停讀數據
        readFile.pause();
    }
})
writeFile.on('drain',function()){
    readFile.resume();// 當前寫流緩沖區已清空,重新開始讀流
}
readFile.on('end',function(data){
    writeFile.end();//將寫流緩沖區的數據全部寫入,并且關閉寫入的文件
})

pipe所做的操作就是相當于為寫流和讀流自動做了速度的匹配。

讀寫流速度不匹配的情況下,一般情況下不會造成什么問題,但是會造成內存增加。內存消耗增加,就有可能會帶來一系列的問題。所以在使用的流的時候,強烈推薦使用pipe

highWaterMaker

highWaterMaker說白了,就是定義緩沖區的大小。

  • 默認16Kb(Readable最大8M)

  • 可以自定義

背壓的概念可以理解為:為了防止讀寫流速度不匹配而產生的一種調整機制;背壓該調整機制的觸發時機,受限于highWaterMaker設置的大小。

如上面的代碼 var flag = ws.write(data);,一旦寫流的緩沖區滿了,那flag就會置為false,反向促進讀流的速度調整。

Stream的應用場景

主要有以下場景

  1. 文件操作(復制,壓縮,解壓,加密等)

下面的就很容易就實現了文件復制的功能。

const fs = require('fs');
const readFile = fs.createReadStream('big.file');
const writeFile = fs.createWriteStream('big_copy.file');
readFile.pipe(writeFile);

那我們想在復制的過程中對文件進行壓縮呢?

const fs = require('fs');
const readFile = fs.createReadStream('big.file');
const writeFile = fs.createWriteStream('big.gz');
const zlib = require('zlib');
readFile.pipe(zlib.createGzip()).pipe(writeFile);

實現解壓、加密也是類似的。

  1. 靜態文件服務器

比如需要返回一個html,可以使用如下代碼。

var http = require('http');
var fs = require('fs');
http.createServer(function(req,res){
    fs.createReadStream('./a.html').pipe(res);
}).listen(8000);

關于“Node Stream中運行機制的示例分析”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

宜都市| 洛宁县| 古田县| 富民县| 嘉义县| 民乐县| 南丰县| 喀什市| 建湖县| 靖远县| 吴堡县| 屯留县| 浮梁县| 济宁市| 汽车| 滁州市| 天等县| 宜川县| 山东省| 曲水县| 敖汉旗| 赞皇县| 阳城县| 扎囊县| 淮阳县| 奈曼旗| 石棉县| 信阳市| 广灵县| 蒲江县| 望都县| 会同县| 杭锦旗| 淮滨县| 荃湾区| 巨野县| 高雄县| 蛟河市| 甘孜县| 沁源县| 滨海县|