中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Nodejs中的可讀流的作用和實現方法

發布時間:2021-06-17 10:39:48 來源:億速云 閱讀:366 作者:chen 欄目:web開發

這篇文章主要介紹“Nodejs中的可讀流的作用和實現方法”,在日常操作中,相信很多人在Nodejs中的可讀流的作用和實現方法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Nodejs中的可讀流的作用和實現方法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

stream的概念

流(stream)是 Node.js 中處理流式數據的抽象接口。 stream 模塊用于構建實現了流接口的對象。【推薦學習:《nodejs 教程》】

stream的作用

讀寫大文件的過程中,不會一次性的讀寫到內存中。可以控制每次讀寫的個數

stream的分類

1、可讀流-Readable

例:fs.createReadStream;

源碼位置:lib/_stream_readable.js

2、可寫流-Writable

例:fs.createWriteStream;

源碼位置:lib/_stream_writable.js

3、雙工流-Duplex:滿足讀寫的功能

例:net.Socket();

源碼位置:lib/_stream_duplex.js

4、轉化流-Transform:用途:壓縮,轉碼

例:

const { Transform } = require('stream');
Transform.call(this, '要轉換的數據');//具體的使用詳情 見node官網

-源碼位置:lib/_stream_tranform.js

可讀流讀取文件的過程

  • 讀取文件代碼過程

const path = require("path");
const aPath = path.join(__dirname, "a.txt");//需要讀取的文件
const fs = require("fs");
let rs = fs.createReadStream(aPath, {
  flags: "r",
  encoding: null,//默認編碼格式是buffer,深挖buffer又要學習字符編碼,留個坑 到時候寫一個編碼規范的學習整理
  autoClose: true,//相當于需要調用close方法,如果為false  文件讀取end的時候 就不會執行 close
  start: 0,
  highWaterMark: 3,//每次讀取的個數 默認是64*1024個字節
});

rs.on("open", function (fd) {
  // fd  number類型
  console.log("fd", fd);
});
// 他會監聽用戶,綁定了data事件,就會觸發對應的回調,不停的觸發
rs.on("data", function (chunk) {
//這里會打印的是ascII 值 ,所以可以toString查看詳情自己看得懂的樣子
  console.log({ chunk }, "chunk.toString", chunk.toString()); 
  //如果想每一段事件 讀一點 可以用rs.pause() 做暫停,然后計時器 里rs.resume()再次觸發data事件
  rs.pause();//暫停讀取
});
rs.on("close", function () {
  //當文件讀取完畢后 會 觸發 end事件
  console.log("close");
});
setInterval(() => {
  rs.resume(); //再次觸發data,直到讀完數據為止
}, 1000);
  • 題外話:想說下 文件流和普通可讀流的區別

1、open 和close是文件流獨有,支持open和close便是文件流

2、可讀流都具備 (on('data'),on('end'),on('error'),resume,pause;所以只要支持這些方法就是可讀流

可寫流寫入文件的過程

  • 寫入文件代碼過程

const fs = require("fs");
const path = require("path");
const bPath = path.join(__dirname, "b.txt");
let ws = fs.createWriteStream(bPath, {
//參數和可讀流的類似
  flags: "w",
  encoding: "utf-8",
  autoClose: true,
  start: 0,
  highWaterMark: 3,
});
ws.on("open", function (fd) {
  console.log("open", fd);
});
ws.on("close", function () {
  console.log("close");
});

//write的參數string 或者buffer,ws.write 還有一個boolea的返回值表示是真實寫入文件還是放入緩存中
ws.write("1");
let flag = ws.write("1");
console.log({ flag });//true
flag = ws.write("1");
console.log({ flag });//true
flag = ws.write("1");
console.log({ flag });//false

雙工流的寫入和讀取過程

  • 寫一個本地服務 做例子

1、server(服務器代碼)實現

const net = require("net"); //net 模塊是 node自己封裝的tcp層
//socket 就是雙工流 能讀能寫  http源碼就是用net模塊寫的 基于tcp
const server = net.createServer(function (socket) {
  socket.on("data", function (data) {//監聽客戶端發來的消息
    console.log(data.toString)
    socket.write("server:hello");//寫入server:hello
  });
  socket.on("end", function () {
    console.log("客戶端關閉");
  });
});
server.on("err", function (err) {
  console.log(err);
});
server.listen(8080);//服務端監聽8080端口

2、client(客戶端) 實現

const net = require("net"); //net 模塊是 node自己封裝的tcp層
const socket = new net.Socket(); //
socket.connect(8080, "localhost"); //  表示鏈接服務器本地8080端口
socket.on("connect", function (data) {
  //和服務器建立鏈接后
  socket.write("connect server");
});
socket.on("data", function (data) {
  //監聽數據,讀取服務器傳來的數據
  console.log(data.toString());
  socket.destroy()
});
socket.write('ok')
socket.on("error", function (err) {
  console.log(err);
});

3.題外話 如果想看tcp的三次握手和四次揮手 可以 通過我上述代碼 用wireshark(一個抓包工具)看實際過程

轉化流 transform過程

轉化流是雙工流的一種, 允許實現輸入,并在對數據執行某些操作后返回輸出,兩者有依賴關系

  • 代碼過程(這個例子我的參考來處)

const stream = require('stream')
let c = 0;
const readable = stream.Readable({
  highWaterMark: 2,
  read: function () {
    let data = c < 26 ? Number(c++ + 97) : null;
    console.log('push', data);
    this.push( String.fromCharCode(data));
}
})

const transform = stream.Transform({
  highWaterMark: 2,
  transform: function (buf, enc, next) {
    console.log('transform', buf.toString());
    next(null, buf);
  }
})

readable.pipe(transform);
  • 打印結果

Nodejs中的可讀流的作用和實現方法

可讀流的實現

跟著斷點先了解 可讀流的調用過程

就前面可讀流文件的讀取過程的代碼為例子 打斷點

rs.on('open')

rs.on('open')為斷點入口進入

Nodejs中的可讀流的作用和實現方法

1、通過Stream.prototype.on.call 繼承Stream類

源文件位置:no dlib/_stream_readable.js(我是通過斷點點到這里 直接找,我也沒找到)

Nodejs中的可讀流的作用和實現方法

  • 再點進去 發現 Stream 是EventEmitter的子類 那么 可讀流也可以支持發布訂閱

Nodejs中的可讀流的作用和實現方法

2、監聽的事件類型是否是data和readable任意一個 不是 繼續 下一個事件的監聽

Nodejs中的可讀流的作用和實現方法

rs.on('data')

Nodejs中的可讀流的作用和實現方法

  • data的部分做兩件事

    1、判斷flowing(默認值是null)不為false 就自動resume方法執行繼續 文件讀取(這里我的案例是rs.pause();手動將flowing 值為false了所以不會繼續調用)

    2、那如果我沒有調用rs.pause() 會繼續調用resume 看看resume里做了什么

Nodejs中的可讀流的作用和實現方法

2.1 最終調用了 stream.read()繼續讀取文件;直到文件讀取結束依次去emit end 和close事件

小結:所以data默認是會不斷的讀取文件直到文件讀取完畢 ,如果想要文件讀取變可控可以和我一樣用rs.pause()

自己實現

實現思路

繼承EventEmitter發布訂閱管理我們的事件

const fs = require("fs");
const EventEmitter = require("events");
class ReadStream extends EventEmitter {

}
module.exports = ReadStream;

數據初始化

constructor(path, options = {}) {
    super();
    //參考fs 寫實例需要用到的參數
    this.path = path;
    this.flags = options.flags || "r";
    this.encoding - options.encoding || null;//默認編碼格式是buffer
    this.autoClose = options.autoClose || true;//相當于需要調用close方法,如果為false  文件讀取end的時候 就不會執行 close
    this.start = options.start || 0;//數據讀取的開始位置
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;//默認一次讀取64個字節的數據 
    this.offset = this.start;//fs.read的偏移量
    this.fd = undefined; //初始化fd 用于 open成功后的fd做賦值  供 read里使用
    this.flowing = false;//實現pause和resume備用,設置flag,當監聽到data事件的時候 改 flowing為true,
    this.open(); //初始化的時候就要調用open
    this.on("readStreamListener", function (type) {
      // console.log(type)//這里打印就能看到 實例上所有 通過on 綁定的事件名稱
      if (type === "data") {
      //監聽到data事件的時候 改 flowing為true
        this.flowing = true;
        this.read();
      }
    });
    }

文件讀取方法read,pause,resume,open和destroy的實現

open()
 open() {
 // 調用fs.open 讀取目標文件 
    fs.open(this.path, this.flags, (err, fd) => { 
      this.fd = fd; //賦值一個fd 供后面的 read()方式使用,文件讀取成功,fd是返回一個數字
      this.emit("open", fd);
    });
read()
 read() {
   // console.log("一開始read里的", this.fd); //但是這樣依舊拿不到 open后的fd,用 發布訂閱 通過on來獲取 綁定的事件type
    //這里要做一個容錯處理 ,因為open是異步讀取文件,read里無法馬上拿到open結果
  if (typeof this.fd !== "number") {
      //訂閱open,給綁定一個回調事件read 直到this.fd有值
      return this.once("open", () => this.read());
    }
 }
  //fd打開后 調用fs.read
  //實例上的start值是未知number,存在實際剩余的可讀的文件大小<highWaterMar的情況 ,用howMuchToRead 替換highWaterMark 去做fs.read的每次讀取buffer的大小
    let howMuchToRead = this.end
      ? Math.min(this.end - this.offset + 1, this.highWaterMark)
      : this.highWaterMark;
  //定義一個用戶 傳進來的highWaterMark 大小的buffer對象
    const buffer = Buffer.alloc(this.highWaterMark);
       //讀取文件中的內容fd給buffer 從0位置開始,每次讀取howMuchToRead個。插入數據,同時更新偏移量
    fs.read(
      this.fd,
      buffer,
      0,
      howMuchToRead,
      this.offset,
      (err, bytesRead) => {
        if (bytesRead) {
          // 每讀完一次,偏移量=已經讀到的數量
          this.offset += bytesRead;
          this.emit("data", buffer.slice(0, bytesRead));
          //寫到這里實例上的data 已經可以打印出數據了 但是 繼續讀取 調用this.read() 直到bytesRead不存在 說明數據讀取完畢了 走else
          //回調 this.read();時候判斷 this.flowing 是否為true
          //pause調用后this.flowing將為false
          if (this.flowing) {
            this.read();
          }
        } else {
          // 執行到這 bytesRead不存在說明  文件數據讀取完畢了已經 觸發end
          this.emit("end");//emit 實例上綁定的end事件
          //destroy 還沒寫到 稍等 馬上后面就實現...
          this.destroy();
        }
      }
    );
resume()

文件讀取不去data事件,會觸發對應的回調,不停的觸發 所以想要變可控可以手動調用 resume()& pause()

  • pause的實現,調用的時候設置 this.flowing=false,打斷 read()

  pause() {
    this.flowing = false;
  }
pause()
  • pause 打斷 read()多次讀取,可以使用resume 打開 this.flowing=true 并調用read

resume() {
    if (!this.flowing) {
      this.flowing = true;
      this.read();
    }
  }
destroy()
  • 文件open不成功時候拋錯時調用

  • 文件讀取完畢后&&this.autoClose===true ,read()里文件讀取end的時候 就執行close

  destroy(err) {
    if (err) {
      this.emit("error");
    }
    // 把close放destroy里 并 在read里調用
    if (this.autoClose) {
      fs.close(this.fd, () => {
        this.emit("close");
      });
    }
  }
完整代碼
  • 實現代碼

/**
 *實現簡單的可讀流
 */

const fs = require("fs");
const EventEmitter = require("events");
class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    //參考fs 寫實例需要用到的參數
    this.path = path;
    this.flags = options.flags || "r";
    this.encoding - options.encoding || null;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.fd = undefined;
    this.offset = this.start;
    this.flowing = false;
    this.open(); 
    this.on("newListener", function (type) {
      if (type === "data") {
        this.flowing = true;
        this.read();
      }
    });
  }
  destroy(err) {
    if (err) {
      this.emit("error");
    }
    if (this.autoClose) {
      fs.close(this.fd, () => {
        this.emit("close");
      });
    }
  }
  open() {
    fs.open(this.path, this.flags, (err, fd) => {
      if (err) {
        return this.destroy(err);
      }
      this.fd = fd;
      this.emit("open", fd);
    });
  }
  resume() {
    if (!this.flowing) {
      this.flowing = true;
      this.read();
    }
  }
  pause() {
    this.flowing = false;
  }

  read() {
    if (typeof this.fd !== "number") {
      return this.once("open", () => this.read());
    }
    let howMuchToRead = this.end
      ? Math.min(this.end - this.offset + 1, this.highWaterMark)
      : this.highWaterMark;
    const buffer = Buffer.alloc(this.highWaterMark);
    fs.read(
      this.fd,
      buffer,
      0,
      howMuchToRead,
      this.offset,
      (err, bytesRead) => {
        if (bytesRead) {
          this.offset += bytesRead;
          this.emit("data", buffer.slice(0, bytesRead));
          if (this.flowing) {
            this.read();
          }
        } else {
          this.emit("end");
          this.destroy();
        }
      }
    );
  }
}

module.exports = ReadStream;
  • 調用代碼

const ReadStream = require("./initReadStream");
let rs = new ReadStream(aPath, {
  flags: "r",
  encoding: null, //默認編碼格式是buffer
  autoClose: true, //相當于需要調用close方法,如果為false  文件讀取end的時候 就不會執行 close
  start: 0,
  highWaterMark: 3, //每次讀取的個數 默認是64*1024個字節
});

到此,關于“Nodejs中的可讀流的作用和實現方法”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

进贤县| 兴仁县| 通江县| 恩施市| 游戏| 德安县| 晴隆县| 浠水县| 吉水县| 宜都市| 嘉荫县| 棋牌| 南雄市| 大安市| 深圳市| 阳曲县| 逊克县| 河北区| 黄石市| 普陀区| 资中县| 读书| 大余县| 临武县| 西城区| 宝清县| 海安县| 奉节县| 大连市| 瓦房店市| 新泰市| 通江县| 大方县| 东乡| 巴楚县| 酒泉市| 若羌县| 蓝田县| 会东县| 鱼台县| 巩义市|