Node.js中的数据处理IO-Stream-2

标签:node javaScript

上次说了在Node中两个基本的流:ReadableWritable流,接下来说说在Node.js中更高级的流。

Duplex 流

Duplex流(双向数据流)结合了可读流和可写的特性,使得Duplex流可以进行读取和写入操作。

当然,Duplex流也可以自定义实现,实现方式和可读流(可写流)的方式差不多,使用util模块提供的inherits()方法:

var util = require('util');  
util.inherits(MyDuplexStream,stream.Duplex);  

然后创建对象时调用

stream.Duplex.call(this,opt);  

创建一个Duplex流的opt参数接受一个叫allowHalfOpen的布尔值属性,若该值为true,则表示可写端已经结束,可读端也能保持打开状态;反之亦然。若为false,则表示结束可写入端可读端也会结束;反之亦然。

当然,既然称Duplex流为可读和可写流的合并,所以一个完整的Duplex流需要同时实现_read(size)_write(data,encoding,callback)方法。同时,Duplex流同时拥有可读和可写流方法和事件。具体的例子如下:

var stream = require('stream');  
var util = require('util');  
util.inherits(Duplexer, stream.Duplex);  
function Duplexer(opt) {  
  stream.Duplex.call(this, opt);
  this.data = [];
}
Duplexer.prototype._read = function readItem(size) {  
  var chunk = this.data.shift();
  if (chunk == "stop"){
    this.push(null);
  } else{
    if(chunk){
      this.push(chunk);
    } else {
      setTimeout(readItem.bind(this), 500, size);
    }
  }
};
Duplexer.prototype._write = function(data, encoding, callback) {  
  this.data.push(data);
  callback();
};
var d = new Duplexer();  
d.on('data', function(chunk){  
  console.log('read: ', chunk.toString());
});
d.on('end', function(){  
  console.log('Message Complete');
});
d.write("I think, ");  
d.write("therefore ");  
d.write("I am.");  
d.write("Rene Descartes");  
d.write("stop");

//控制台的输出
read:  I think,  
read:  therefore  
read:  I am.  
read:  Rene Descartes  
Message Complete  

Transform流

Transform流(转换流)扩展了Deplex流,但它修改了Writable流和Readable流之间的数据。当你需要从一个系统到另一个系统的数据时,这个流就能大显身手。

  • 用于压缩文件的zlib
  • 主要用于加密的crypto

DuplexTransform之间的一个主要区别是,在Transform流中不需要实现_read()_write()。相反的需要实现另外两个方法,_transform(chunk,encoding,callback)_flush(callback)

  • _transform 方法接受来自write()请求的数据,对其修改后,推出修改结果
  • _flush 方法用于刷新数据

下面就自定义实现一个Transform流:

var stream = require("stream");  
var util = require("util");  
util.inherits(JSONObjectStream, stream.Transform);  
function JSONObjectStream (opt) {  
  stream.Transform.call(this, opt);
};
JSONObjectStream.prototype._transform = function (data, encoding, callback) {  
  object = data ? JSON.parse(data.toString()) : "";
  this.emit("object", object);
  object.handled = true;
  this.push(JSON.stringify(object));
  callback();
};
JSONObjectStream.prototype._flush = function(cb) {  
  cb();
};
var tc = new JSONObjectStream();  
tc.on("object", function(object){  
  console.log("Name: %s", object.name);
  console.log("Color: %s", object.color);
});
tc.on("data", function(data){  
  console.log("Data: %s", data.toString());
});
tc.write('{"name":"Carolinus", "color": "Green"}');  
tc.write('{"name":"Solarius", "color": "Blue"}');  
tc.write('{"name":"Lo Tae Zhao", "color": "Gold"}');  
tc.write('{"name":"Ommadon", "color": "Red"}');

//控制台的输出
Name: Carolinus  
Color: Green  
Data: {"name":"Carolinus","color":"Green","handled":true}  
Name: Solarius  
Color: Blue  
Data: {"name":"Solarius","color":"Blue","handled":true}  
Name: Lo Tae Zhao  
Color: Gold  
Data: {"name":"Lo Tae Zhao","color":"Gold","handled":true}  
Name: Ommadon  
Color: Red  
Data: {"name":"Ommadon","color":"Red","handled":true}  

把Readable流用管道输送到Writable流

Readable流的方法描述中,提到一个方法pipe(writableStream,[options]),此方法会把一个Readable流链接到Writable流,会把Readable流中的数据直接输入到Writable流。option参数接受一个end属性设置为truefalse的对象。当endtrue是,Writable流随着Readable流的结束而结束,默认是true

一下例子简要的展示了使用的基本过程。

var stream = require('stream');  
var util = require('util');  
util.inherits(Reader, stream.Readable);  
util.inherits(Writer, stream.Writable);  
function Reader(opt) {  
  stream.Readable.call(this, opt);
  this._index = 1;
}
Reader.prototype._read = function(size) {  
  var i = this._index++;
  if (i > 10){
    this.push(null);
  } else {
    this.push("Item " + i.toString());
  }
};
function Writer(opt) {  
  stream.Writable.call(this, opt);
  this._index = 1;
}
Writer.prototype._write = function(data, encoding, callback) {  
  console.log(data.toString());
  callback();
};
var r = new Reader();  
var w = new Writer();  
r.pipe(w);

//控制台的输出
Item 1  
Item 2  
Item 3  
Item 4  
Item 5  
Item 6  
Item 7  
Item 8  
Item 9  
Item 10

用Zlib压缩和解压数据

在数据量吞吐量大的系统中,数据的存储量将会变得特别的大,这时压缩/解压数据就显的十分的重要。在Node.js中提供了一个Zlib的模块,这样就可以非常方便、高效地解压和压缩在缓冲区的数据了。

但是需要的记住的是,压缩数据和解压数据需要花大量的时间,所以在使用压缩/解压模块时,应该确定压缩能带来的好处。同时,压缩操作也应当考虑在程序空闲时使用。

Zlib支持以下几种压缩方式:
- gzip/gunzip 标准的gzip压缩。 - deflate/inflate 基于Huffman编码的标准deflate压缩算法。 - deflateRaw/inflateRaw 针对原始缓冲区的deflate压缩算法。

这些方法具有通用的调用方法function(buffer,callback),其中function是压缩/解压方法,buffer是被压缩/解压的缓冲区,callback为回掉函数。

以下就是在Zlib下压缩/解压的方法展示:

var zlib = require("zlib");  
var input = '...............text...............';  
zlib.deflate(input, function(err, buffer) {  
  if (!err) {
    console.log("deflate (%s): ", buffer.length, buffer.toString('base64'));
    zlib.inflate(buffer, function(err, buffer) {
      if (!err) {
        console.log("inflate (%s): ", buffer.length, buffer.toString());
      }
    });
    zlib.unzip(buffer, function(err, buffer) {
      if (!err) {
        console.log("unzip deflate (%s): ", buffer.length, buffer.toString());
      }
    });
  }
});

zlib.deflateRaw(input, function(err, buffer) {  
  if (!err) {
    console.log("deflateRaw (%s): ", buffer.length, buffer.toString('base64'));
    zlib.inflateRaw(buffer, function(err, buffer) {
      if (!err) {
        console.log("inflateRaw (%s): ", buffer.length, buffer.toString());
      }
    });
  }
});

zlib.gzip(input, function(err, buffer) {  
  if (!err) {
    console.log("gzip (%s): ", buffer.length, buffer.toString('base64'));
    zlib.gunzip(buffer, function(err, buffer) {
      if (!err) {
        console.log("gunzip (%s): ", buffer.length, buffer.toString());
      }
    });
    zlib.unzip(buffer, function(err, buffer) {
      if (!err) {
        console.log("unzip gzip (%s): ", buffer.length, buffer.toString());
      }
    });
  }
});

//控制台显示
deflate (18):  eJzT00MBJakVJagiegB9Zgcq  
deflateRaw (12):  09NDASWpFSWoInoA  
gzip (30):  H4sIAAAAAAAAC9PTQwElqRUlqCJ6AIq+x+AiAAAA  
inflate (34):  ...............text...............  
unzip deflate (34):  ...............text...............  
inflateRaw (34):  ...............text...............  
gunzip (34):  ...............text...............  
unzip gzip (34):  ...............text...............  

Zlib模块不仅仅提供了对缓冲区的压缩/解压方法,同时也有对流的压缩/解压方法,可以使用pipe()函数,通过压缩/解压对象来吧数据从一个流输出到另一个流。以下就是具体的例子:

var zlib = require("zlib");  
var gzip = zlib.createGzip();  
var fs = require('fs');  
var inFile = fs.createReadStream('zlib_file.js');  
var outFile = fs.createWriteStream('zlib_file.gz');  
inFile.pipe(gzip).pipe(outFile);  
setTimeout(function(){  
  var gunzip = zlib.createUnzip({flush: zlib.Z_FULL_FLUSH});
  var inFile = fs.createReadStream('zlib_file.gz');
  var outFile = fs.createWriteStream('zlib_file.unzipped');
  inFile.pipe(gunzip).pipe(outFile);
}, 3000);

小结:大部分密集的Web应用程序和服务的核心都是从一个系统进入另一个系统的庞大数据流,Node.js中的数据处理IO到这里也就差不多,从处理JSON到二进制的缓冲区,再到数据流的处理都进行了比较深入的了解。