#

stream(流)是Node.js提供的又一个仅在服务区端可用的模块,流是一种抽象的数据结构。Stream 是一个抽象接口,Node 中有很多对象实现了这个接口。例如,对http 服务器发起请求的request 对象就是一个Stream,还有stdout(标准输出流)。

# 为什么要在node中使用流

假设目前需要一个读写文件的操作,不用流的实现方式:

var water = fs.readFileSync('a.txt', {encoding: 'utf8'});
fs.writeFileSync('b.txt', water);

但使用这种方法有几个问题:

  • 占用内存,因为使用读写方式是把文件内容全部读入内存,然后再写入文件,对于小型的文本文件问题不大,但是遇到较大的比如音频、视频文件,动辄几个GB大的文件就非常吃内存了

  • 处理数据量较大的文件时不能分块处理,导致速度慢

而流可以把文件资源拆分成小块,一块一块的运输,资源就像水流一样进行传输,使用流的话上述功能可以这样写:

var fs = require('fs');
var readStream = fs.createReadStream('a.mp4'); // 创建可读流
var writeStream = fs.createWriteStream('b.mp4'); // 创建可写流

readStream.on('data', function(chunk) { // 当有数据流出时,写入数据
    writeStream.write(chunk);
});

readStream.on('end', function() { // 当没有数据时,关闭数据流
    writeStream.end();
});

但这样写还是有一些问题的,如果说写入的速度跟不上读取的速度,有可能导致数据丢失。正常的情况应该是,写完一段,再读取下一段,如果没有写完的话,就让读取流先暂停,等写完再继续.

所以为了让可读流和可写流速度一致,就要用到流中必不可少的属性pipe了,pipe翻译过来意思是管道,顾名思义,就想上面的倒水一样,如果不用一根管子相连,A桶倒进B桶的水不会均速传输,可能会导致水的浪费,用pipe可以这样解决上述问题:

fs.createReadStream('a.mp4').pipe(fs.createWriteStream('b.mp4))

需要特别注意的是,pipe() 只是可读流的方法,也就是说只能从可读流中通过pipe方法拷贝数据到可写流,反之则不行,写的时候要注意顺序

# 流的四种类型

Stream提供了以下四种类型的流:

  • Readable 可读流:可读流是对一个可以读取数据的源的抽象。fs.createReadStream 方法是一个可读流的例子。

  • Writable 可写流:可写流是对一个可以写入数据的目标的抽象。fs.createWriteStream 方法是一个可写流的例子。

  • Duplex 可读可写流,TCP socket 就属于这种

  • Transform 在读写过程中可以修改和变换数据的Duplex流,变换流是一种特殊的双向流,它会基于写入的数据生成可供读取的数据。例如使用 zlib.createGzip 来压缩数据。你可以把一个变换流想象成一个函数,这个函数的输入部分对应可写流,输出部分对应可读流

# 可读流

创建可读流的方式:

  • 使用 stream 模块的 Readable 创建一个可读流

  • 使用 fs.createReadStream 创建可读文件流

使用 fs.createReadStream 创建可读文件流

  • 第一个参数是读取文件的路径

  • 第二个参数为 options 选项,其中有八个参数:

    • flags:标识位,默认为 r

    • encoding:字符编码,默认为 null

    • fd:文件描述符,默认为 null

    • mode:权限位,默认为 0o666

    • autoClose:是否自动关闭文件,默认为 true

    • start:读取文件的起始位置

    • end:读取文件的(包含)结束位置

    • highWaterMark:最大读取文件的字节数,默认 64 * 1024, 每次读多少字节的数据

let rs = fs.createReadStream('./files/big.file', {
    //文件系统标志
    flags: 'r',
    //数据编码,如果调置了该参数,则读取的数据会自动解析
    //如果没调置,则读取的数据会是 Buffer
    //也可以通过 rs.setEncoding() 进行设置
    encoding: 'utf8',
    //文件描述符,默认为null
    fd: null,
    //文件权限,
    mode: 0o666,
    //文件读取的开始位置
    start: 0,
    //文件读取的结束位置(包括结束位置)
    end: Infinity,
    //读取缓冲区的大小,默认64K
    // highWaterMark: 3
});

使用 stream 模块的 Readable 创建一个可读流

const { Readable } = require('stream');  

const inStream = new Readable(); // 创建一个可读流

inStream.push('ABCDEFGHIJKLM'); // 向这个可读流输入数据
inStream.push('NOPQRSTUVWXYZ');

inStream.push(null); // 没有更多数据了

inStream.pipe(process.stdout); // 输出到一个可写流

上面的例子中在把该流连接到 process.stdout 之前,我们就已经推送了所有数据。接下可以试着推送数据。 我们可以通过在可读流配置中实现 read() 方法来达成这一目的:

const inStream = new Readable({
    read(size) {
        console.log('size', size)
        this.push(String.fromCharCode(this.currentCharCode++));
        if (this.currentCharCode > 90) {
            this.push(null);
        }
    }
});

inStream.currentCharCode = 65

inStream.pipe(process.stdout);

当使用者读取该可读流时,read 方法会持续被触发,我们不断推送字母。我们需要在某处停止该循环, 这就是为何我们放置了一个 if 语句以便在 currentCharCode 大于 90(代表 Z) 时推送一个 null

const startTime = new Date()
//创建一个文件可读流
let rs = fs.createReadStream('./files/big.file', {
    //文件系统标志
    flags: 'r',
    //数据编码,如果调置了该参数,则读取的数据会自动解析
    //如果没调置,则读取的数据会是 Buffer
    //也可以通过 rs.setEncoding() 进行设置
    encoding: 'utf8',
    //文件描述符,默认为null
    fd: null,
    //文件权限,
    mode: 0o666,
    //文件读取的开始位置
    start: 0,
    //文件读取的结束位置(包括结束位置)
    end: Infinity,
    //读取缓冲区的大小,默认64K
    // highWaterMark: 3
});
    
//文件被打开时触发
rs.on('open', function () {
    console.log('文件打开');
});
//监听data事件,会让当前流切换到流动模式
//当流中将数据传给消费者后触发
//由于我们在上面配置了 highWaterMark 为 3字节,所以下面会打印多次。
rs.on('data', function (data) {
    console.log(data);
});
//流中没有数据可供消费者时触发
rs.on('end', function () {
    console.log('数据读取完毕', new Date() - startTime);
});
//读取数据出错时触发
rs.on('error', function () {
    console.log('读取错误');
});
//当文件被关闭时触发
rs.on('close', function () {
    console.log('文件关闭');
});
// 1秒后暂停读取, 再过2秒继续读以数据
setTimeout(function () {
    rs.pause()
    setTimeout(function(){
        rs.resume()
    }, 2000)
}, 1000);
  

可读流中的两种模式:

  • 流动模式 ( flowing ) :数据自动从系统底层读取,并通过事件,尽可能快地提供给应用程序

  • 暂停模式 ( paused ),必须显式的调用 read() 读取数据

暂停模式 切换到 流动模式的方式:

  • 添加 data 事件回调

  • 调用 resume()

  • 调用 pipe()

流动模式 切换到 暂停模式:

  • 如果没有管道目标,调用 pause()

  • 如果有管道目标,移除所有管道目标,调用 unpipe() 移除多个管道目标

# 可写流

  1. 使用 stream 模块的 Writable 类创建可写流
const { Writable } = require('stream');
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});
  
process.stdin.pipe(outStream);

write 方法接受三个参数

  • chunk 通常是一个 buffer,除非我们对流进行了特殊配置

  • encoding 通常可以忽略。除非 chunk 被配置为不是 buffer

  • callback 方法是一个在我们完成数据处理后要执行的回调函数。它用来表示数据是否成功写入。若是写入失败,在执行该回调函数时需要传入一个错误对象

  1. 使用 fs.createWriteStream 方法创建可写的文流
const outFile = fs.createWriteStream('./files/test.txt')
process.stdin.pipe(outFile);

# 双工流(可读可写流)

通过 streamDuplex 创建一个双工流

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;

process.stdin.pipe(inoutStream).pipe(process.stdout);

# 变换流

通过streamTransform创建一个双工流

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

在这个变换流中,我们只实现了 transform() 方法,却达到了前面双向流例子的效果。在该方法中,我们把 chunk 转换为大写然后通过 push 方法传递给下游。

# 流事件和方法

以下是一些使用可读流或可写流时用到的事件和方法:

这些事件和函数是相关的,因为我们总是把它们组合在一起使用

# 可写流的一些方法:

  • write 方法

    ws.write(chunk, [encoding], [callback]);

    • chunk 写入的数据 buffer/string

    • encoding 编码格式,chunk 为字符串时有用,是个可选参数

    • callback 写入成功后的回调

    返回值为布尔值,系统缓存区满时为 false,未满时为 true

  • end 方法

ws.end(chunk, [encoding], [callback]);

表明接下来没有数据要被写入 Writable 通过传入可选的 chunkencoding 参数,可以在关闭流之前再写入一段数据 如果传入了可选的 callback 函数,它将作为 finish 事件的回调函数

可写流的一些事件:

  • drain 事件

    ws.on('drain',function(){
        console.log('drain')
    })
    
    • 当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false

    • 当前所有缓存的数据块满了,满了之后情况才会触发 drain

    • 一旦 write() 返回 false, 在 drain 事件触发前, 不能写入任何数据块

  • finish 事件

    ws.end('结束');
    ws.on('finish',function(){
        console.log('drain')
    });
    

    在调用 end 方法,且缓冲区数据都已经传给底层系统之后, finish 事件将被触发

# 一个可读流文件上的方法和事件:

一个可读流上的事件:

  • open事件上,当创建一个可读流时,会触发此事件

  • data 事件,任何时候当可读流发送数据给它的消费者时,会触发此事件

  • end 事件,当可读流没有更多的数据要发送给消费者时,会触发此事件

  • close事件上,文件关闭时,触发

一个可读流上的方法:

  • pipe:传送数据

  • unpipe: 方法解绑之前使用 pipe() 方法绑定的可写流

  • pause: 暂信传送数据

  • resume: 继传传送数据

  • unshift: 将 chunk 作为 null 传递信号表示流的末尾(EOF),其行为与 readable.push(null) 相同

# 可读流的暂停和流动模式

可读流有两种主要的模式,影响我们使用它的方式:

  • 它要么处于暂停模式

  • 要么就是处于流动模式

这些模式有时也被成为拉取和推送模式

所有的可读流默认都处于暂停模式。但它们可以按需在流动模式和暂停模式间切换。这种切换有时会自动发生。

当一个可读流处于暂停模式时,我们可以使用 read() 方法按需的读取数据。而对于一个处于流动模式的可读流,数据会源源不断的流动,我们需要通过事件监听来处理数据

在流动模式中,如果没有消费者监听事件那么数据就会丢失。这就是为何在处理流动模式的可读流时我们需要一个 data事件回调函数。事实上,通过增加一个 data 事件回调就可以把处于暂停模式的流切换到流动模式;同样的,移除 data事件回调会把流切回到暂停模式。这么做的一部分原因是为了和旧的 Node 流接口兼容。

要手动在这两个模式间切换,你可以使用 resume()pause() 方法

当使用 pipe 方法时,它会自动帮你处理好这些模式之间的切换,因此你无须关心这些细节

(opens new window)