您好,登錄后才能下訂單哦!
這篇文章主要介紹“Nodejs中的可讀流的作用和實現方法”,在日常操作中,相信很多人在Nodejs中的可讀流的作用和實現方法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Nodejs中的可讀流的作用和實現方法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
流(stream)是 Node.js 中處理流式數據的抽象接口。 stream 模塊用于構建實現了流接口的對象。【推薦學習:《nodejs 教程》】
讀寫大文件的過程中,不會一次性的讀寫到內存中。可以控制每次讀寫的個數
1、可讀流-Readable
例:fs.createReadStream;
源碼位置:lib/_stream_readable.js
2、可寫流-Writable
例:fs.createWriteStream;
源碼位置:lib/_stream_writable.js
3、雙工流-Duplex:滿足讀寫的功能
例:net.Socket();
源碼位置:lib/_stream_duplex.js
4、轉化流-Transform:用途:壓縮,轉碼
例:
const { Transform } = require('stream'); Transform.call(this, '要轉換的數據');//具體的使用詳情 見node官網
-源碼位置:lib/_stream_tranform.js
讀取文件代碼過程
const path = require("path"); const aPath = path.join(__dirname, "a.txt");//需要讀取的文件 const fs = require("fs"); let rs = fs.createReadStream(aPath, { flags: "r", encoding: null,//默認編碼格式是buffer,深挖buffer又要學習字符編碼,留個坑 到時候寫一個編碼規范的學習整理 autoClose: true,//相當于需要調用close方法,如果為false 文件讀取end的時候 就不會執行 close start: 0, highWaterMark: 3,//每次讀取的個數 默認是64*1024個字節 }); rs.on("open", function (fd) { // fd number類型 console.log("fd", fd); }); // 他會監聽用戶,綁定了data事件,就會觸發對應的回調,不停的觸發 rs.on("data", function (chunk) { //這里會打印的是ascII 值 ,所以可以toString查看詳情自己看得懂的樣子 console.log({ chunk }, "chunk.toString", chunk.toString()); //如果想每一段事件 讀一點 可以用rs.pause() 做暫停,然后計時器 里rs.resume()再次觸發data事件 rs.pause();//暫停讀取 }); rs.on("close", function () { //當文件讀取完畢后 會 觸發 end事件 console.log("close"); }); setInterval(() => { rs.resume(); //再次觸發data,直到讀完數據為止 }, 1000);
題外話:想說下 文件流和普通可讀流的區別
1、open 和close是文件流獨有,支持open和close便是文件流
2、可讀流都具備 (on('data'),on('end'),on('error'),resume,pause;所以只要支持這些方法就是可讀流
寫入文件代碼過程
const fs = require("fs"); const path = require("path"); const bPath = path.join(__dirname, "b.txt"); let ws = fs.createWriteStream(bPath, { //參數和可讀流的類似 flags: "w", encoding: "utf-8", autoClose: true, start: 0, highWaterMark: 3, }); ws.on("open", function (fd) { console.log("open", fd); }); ws.on("close", function () { console.log("close"); }); //write的參數string 或者buffer,ws.write 還有一個boolea的返回值表示是真實寫入文件還是放入緩存中 ws.write("1"); let flag = ws.write("1"); console.log({ flag });//true flag = ws.write("1"); console.log({ flag });//true flag = ws.write("1"); console.log({ flag });//false
寫一個本地服務 做例子
1、server(服務器代碼)實現
const net = require("net"); //net 模塊是 node自己封裝的tcp層 //socket 就是雙工流 能讀能寫 http源碼就是用net模塊寫的 基于tcp const server = net.createServer(function (socket) { socket.on("data", function (data) {//監聽客戶端發來的消息 console.log(data.toString) socket.write("server:hello");//寫入server:hello }); socket.on("end", function () { console.log("客戶端關閉"); }); }); server.on("err", function (err) { console.log(err); }); server.listen(8080);//服務端監聽8080端口
2、client(客戶端) 實現
const net = require("net"); //net 模塊是 node自己封裝的tcp層 const socket = new net.Socket(); // socket.connect(8080, "localhost"); // 表示鏈接服務器本地8080端口 socket.on("connect", function (data) { //和服務器建立鏈接后 socket.write("connect server"); }); socket.on("data", function (data) { //監聽數據,讀取服務器傳來的數據 console.log(data.toString()); socket.destroy() }); socket.write('ok') socket.on("error", function (err) { console.log(err); });
3.題外話 如果想看tcp的三次握手和四次揮手 可以 通過我上述代碼 用wireshark(一個抓包工具)看實際過程
轉化流是雙工流的一種, 允許實現輸入,并在對數據執行某些操作后返回輸出,兩者有依賴關系
代碼過程(這個例子我的參考來處)
const stream = require('stream') let c = 0; const readable = stream.Readable({ highWaterMark: 2, read: function () { let data = c < 26 ? Number(c++ + 97) : null; console.log('push', data); this.push( String.fromCharCode(data)); } }) const transform = stream.Transform({ highWaterMark: 2, transform: function (buf, enc, next) { console.log('transform', buf.toString()); next(null, buf); } }) readable.pipe(transform);
打印結果
跟著斷點先了解 可讀流的調用過程
就前面可讀流文件的讀取過程的代碼為例子 打斷點
rs.on('open')
rs.on('open')為斷點入口進入
1、通過Stream.prototype.on.call 繼承Stream類
源文件位置:no dlib/_stream_readable.js(我是通過斷點點到這里 直接找,我也沒找到)
再點進去 發現 Stream 是EventEmitter的子類 那么 可讀流也可以支持發布訂閱
2、監聽的事件類型是否是data和readable任意一個 不是 繼續 下一個事件的監聽
rs.on('data')
data的部分做兩件事
1、判斷flowing(默認值是null)不為false 就自動resume方法執行繼續 文件讀取(這里我的案例是rs.pause();手動將flowing 值為false了所以不會繼續調用)
2、那如果我沒有調用rs.pause() 會繼續調用resume 看看resume里做了什么
2.1 最終調用了 stream.read()繼續讀取文件;直到文件讀取結束依次去emit end 和close事件
小結:所以data默認是會不斷的讀取文件直到文件讀取完畢 ,如果想要文件讀取變可控可以和我一樣用rs.pause()
自己實現
實現思路
const fs = require("fs"); const EventEmitter = require("events"); class ReadStream extends EventEmitter { } module.exports = ReadStream;
constructor(path, options = {}) { super(); //參考fs 寫實例需要用到的參數 this.path = path; this.flags = options.flags || "r"; this.encoding - options.encoding || null;//默認編碼格式是buffer this.autoClose = options.autoClose || true;//相當于需要調用close方法,如果為false 文件讀取end的時候 就不會執行 close this.start = options.start || 0;//數據讀取的開始位置 this.end = options.end; this.highWaterMark = options.highWaterMark || 64 * 1024;//默認一次讀取64個字節的數據 this.offset = this.start;//fs.read的偏移量 this.fd = undefined; //初始化fd 用于 open成功后的fd做賦值 供 read里使用 this.flowing = false;//實現pause和resume備用,設置flag,當監聽到data事件的時候 改 flowing為true, this.open(); //初始化的時候就要調用open this.on("readStreamListener", function (type) { // console.log(type)//這里打印就能看到 實例上所有 通過on 綁定的事件名稱 if (type === "data") { //監聽到data事件的時候 改 flowing為true this.flowing = true; this.read(); } }); }
open() { // 調用fs.open 讀取目標文件 fs.open(this.path, this.flags, (err, fd) => { this.fd = fd; //賦值一個fd 供后面的 read()方式使用,文件讀取成功,fd是返回一個數字 this.emit("open", fd); });
read() { // console.log("一開始read里的", this.fd); //但是這樣依舊拿不到 open后的fd,用 發布訂閱 通過on來獲取 綁定的事件type //這里要做一個容錯處理 ,因為open是異步讀取文件,read里無法馬上拿到open結果 if (typeof this.fd !== "number") { //訂閱open,給綁定一個回調事件read 直到this.fd有值 return this.once("open", () => this.read()); } } //fd打開后 調用fs.read //實例上的start值是未知number,存在實際剩余的可讀的文件大小<highWaterMar的情況 ,用howMuchToRead 替換highWaterMark 去做fs.read的每次讀取buffer的大小 let howMuchToRead = this.end ? Math.min(this.end - this.offset + 1, this.highWaterMark) : this.highWaterMark; //定義一個用戶 傳進來的highWaterMark 大小的buffer對象 const buffer = Buffer.alloc(this.highWaterMark); //讀取文件中的內容fd給buffer 從0位置開始,每次讀取howMuchToRead個。插入數據,同時更新偏移量 fs.read( this.fd, buffer, 0, howMuchToRead, this.offset, (err, bytesRead) => { if (bytesRead) { // 每讀完一次,偏移量=已經讀到的數量 this.offset += bytesRead; this.emit("data", buffer.slice(0, bytesRead)); //寫到這里實例上的data 已經可以打印出數據了 但是 繼續讀取 調用this.read() 直到bytesRead不存在 說明數據讀取完畢了 走else //回調 this.read();時候判斷 this.flowing 是否為true //pause調用后this.flowing將為false if (this.flowing) { this.read(); } } else { // 執行到這 bytesRead不存在說明 文件數據讀取完畢了已經 觸發end this.emit("end");//emit 實例上綁定的end事件 //destroy 還沒寫到 稍等 馬上后面就實現... this.destroy(); } } );
文件讀取不去data事件,會觸發對應的回調,不停的觸發 所以想要變可控可以手動調用 resume()& pause()
pause的實現,調用的時候設置 this.flowing=false,打斷 read()
pause() { this.flowing = false; }
pause 打斷 read()多次讀取,可以使用resume 打開 this.flowing=true 并調用read
resume() { if (!this.flowing) { this.flowing = true; this.read(); } }
文件open不成功時候拋錯時調用
文件讀取完畢后&&this.autoClose===true ,read()里文件讀取end的時候 就執行close
destroy(err) { if (err) { this.emit("error"); } // 把close放destroy里 并 在read里調用 if (this.autoClose) { fs.close(this.fd, () => { this.emit("close"); }); } }
實現代碼
/** *實現簡單的可讀流 */ const fs = require("fs"); const EventEmitter = require("events"); class ReadStream extends EventEmitter { constructor(path, options = {}) { super(); //參考fs 寫實例需要用到的參數 this.path = path; this.flags = options.flags || "r"; this.encoding - options.encoding || null; this.autoClose = options.autoClose || true; this.start = options.start || 0; this.end = options.end; this.highWaterMark = options.highWaterMark || 64 * 1024; this.fd = undefined; this.offset = this.start; this.flowing = false; this.open(); this.on("newListener", function (type) { if (type === "data") { this.flowing = true; this.read(); } }); } destroy(err) { if (err) { this.emit("error"); } if (this.autoClose) { fs.close(this.fd, () => { this.emit("close"); }); } } open() { fs.open(this.path, this.flags, (err, fd) => { if (err) { return this.destroy(err); } this.fd = fd; this.emit("open", fd); }); } resume() { if (!this.flowing) { this.flowing = true; this.read(); } } pause() { this.flowing = false; } read() { if (typeof this.fd !== "number") { return this.once("open", () => this.read()); } let howMuchToRead = this.end ? Math.min(this.end - this.offset + 1, this.highWaterMark) : this.highWaterMark; const buffer = Buffer.alloc(this.highWaterMark); fs.read( this.fd, buffer, 0, howMuchToRead, this.offset, (err, bytesRead) => { if (bytesRead) { this.offset += bytesRead; this.emit("data", buffer.slice(0, bytesRead)); if (this.flowing) { this.read(); } } else { this.emit("end"); this.destroy(); } } ); } } module.exports = ReadStream;
調用代碼
const ReadStream = require("./initReadStream"); let rs = new ReadStream(aPath, { flags: "r", encoding: null, //默認編碼格式是buffer autoClose: true, //相當于需要調用close方法,如果為false 文件讀取end的時候 就不會執行 close start: 0, highWaterMark: 3, //每次讀取的個數 默認是64*1024個字節 });
到此,關于“Nodejs中的可讀流的作用和實現方法”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。