# 流
~~~
穩(wěn)定度: 2 - 不穩(wěn)定
~~~
流是一個(gè)抽象接口,被 Node 中的很多對(duì)象所實(shí)現(xiàn)。比如[對(duì)一個(gè) HTTP 服務(wù)器的請(qǐng)求](#)是一個(gè)流,[stdout](#) 也是一個(gè)流。流是可讀、可寫(xiě)或兼具兩者的。所有流都是 [EventEmitter](#) 的實(shí)例。
您可以通過(guò) `require('stream')` 加載 Stream 基類,其中包括了 [Readable](#) 流、[Writable](#) 流、[Duplex](#) 流和 [Transform](#) 流的基類。
本文檔分為三個(gè)章節(jié)。第一章節(jié)解釋了您在您的程序中使用流時(shí)需要了解的那部分 API,如果您不打算自己實(shí)現(xiàn)一個(gè)流式 API,您可以只閱讀這一章節(jié)。
第二章節(jié)解釋了當(dāng)您自己實(shí)現(xiàn)一個(gè)流時(shí)需要用到的那部分 API,這些 API 是為了方便您這么做而設(shè)計(jì)的。
第三章節(jié)深入講解了流的工作方式,包括一些內(nèi)部機(jī)制和函數(shù),除非您明確知道您在做什么,否則盡量不要改動(dòng)它們。
### 面向流消費(fèi)者的 API
流可以是可讀([Readable](#))或可寫(xiě)([Writable](#)),或者兼具兩者([Duplex](#),雙工)的。
所有流都是 EventEmitter,但它們也具有其它自定義方法和屬性,取決于它們是 Readable、Writable 或 Duplex。
如果一個(gè)流既可讀(Readable)也可寫(xiě)(Writable),則它實(shí)現(xiàn)了下文所述的所有方法和事件。因此,這些 API 同時(shí)也涵蓋了 [Duplex](#) 或 [Transform](#) 流,即便它們的實(shí)現(xiàn)可能有點(diǎn)不同。
為了消費(fèi)流而在您的程序中自己實(shí)現(xiàn) Stream 接口是沒(méi)有必要的。如果您**確實(shí)**正在您自己的程序中實(shí)現(xiàn)流式接口,請(qǐng)同時(shí)參考下文[面向流實(shí)現(xiàn)者的 API](#)。
幾乎所有 Node 程序,無(wú)論多簡(jiǎn)單,都在某種途徑用到了流。這里有一個(gè)使用流的 Node 程序的例子:
~~~
var http = require('http');
<!-- endsection -->
<!-- section:5dd53fb86ef5aa2fb0a6e831e46cc135 -->
var server = http.createServer(function (req, res) {
// req 為 http.IncomingMessage,是一個(gè)可讀流(Readable Stream)
// res 為 http.ServerResponse,是一個(gè)可寫(xiě)流(Writable Stream)
<!-- endsection -->
<!-- section:fd5e086becb475ded97300c6e8b1f889 -->
var body = '';
// 我們打算以 UTF-8 字符串的形式獲取數(shù)據(jù)
// 如果您不設(shè)置編碼,您將得到一個(gè) Buffer 對(duì)象
req.setEncoding('utf8');
<!-- endsection -->
<!-- section:bb5a4bf69e5c71de2331fe85918ed96b -->
// 一旦監(jiān)聽(tīng)器被添加,可讀流會(huì)觸發(fā) 'data' 事件
req.on('data', function (chunk) {
body += chunk;
})
<!-- endsection -->
<!-- section:5768f3afd395c860ba272f79026a6799 -->
// 'end' 事件表明您已經(jīng)得到了完整的 body
req.on('end', function () {
try {
var data = JSON.parse(body);
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end('錯(cuò)誤: ' + er.message);
}
<!-- endsection -->
<!-- section:812496c72ef4682c63a7ba8837f9610a -->
// 向用戶回寫(xiě)一些有趣的信息
res.write(typeof data);
res.end();
})
})
<!-- endsection -->
<!-- section:3bbc30d951532659ecc70a505ea1e985 -->
server.listen(1337);
<!-- endsection -->
<!-- section:f0dea661693acf21ed203ec804a4f05a -->
// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// 錯(cuò)誤: Unexpected token o
~~~
### 類: stream.Readable
Readable(可讀)流接口是對(duì)您正在讀取的數(shù)據(jù)的*來(lái)源*的抽象。換言之,數(shù)據(jù)*出自*一個(gè) Readable 流。
在您表明您就緒接收之前,Readable 流并不會(huì)開(kāi)始發(fā)生數(shù)據(jù)。
Readable 流有兩種“模式”:**流動(dòng)模式**和**暫停模式**。當(dāng)處于流動(dòng)模式時(shí),數(shù)據(jù)由底層系統(tǒng)讀出,并盡可能快地提供給您的程序;當(dāng)處于暫停模式時(shí),您必須明確地調(diào)用 `stream.read()` 來(lái)取出若干數(shù)據(jù)塊。流默認(rèn)處于暫停模式。
**注意**:如果沒(méi)有綁定 data 事件處理器,并且沒(méi)有 [`pipe()`](#) 目標(biāo),同時(shí)流被切換到流動(dòng)模式,那么數(shù)據(jù)會(huì)流失。
您可以通過(guò)下面幾種做法切換到流動(dòng)模式:
- 添加一個(gè) [`'data'` 事件](#)處理器來(lái)監(jiān)聽(tīng)數(shù)據(jù)。
- 調(diào)用 [`resume()`](#) 方法來(lái)明確開(kāi)啟數(shù)據(jù)流。
- 調(diào)用 [`pipe()`](#) 方法將數(shù)據(jù)發(fā)送到一個(gè) [Writable](#)。
您可以通過(guò)下面其中一種做法切換回暫停模式:
- 如果沒(méi)有導(dǎo)流目標(biāo),調(diào)用 [`pause()`](#) 方法。
- 如果有導(dǎo)流目標(biāo),移除所有 [`'data'` 事件][] 處理器、調(diào)用 [`unpipe()`](#) 方法移除所有導(dǎo)流目標(biāo)。
請(qǐng)注意,為了向后兼容考慮,移除 `'data'` 事件監(jiān)聽(tīng)器并**不會(huì)**自動(dòng)暫停流。同樣的,當(dāng)有導(dǎo)流目標(biāo)時(shí),調(diào)用 `pause()` 并不能保證流在那些目標(biāo)排空并請(qǐng)求更多數(shù)據(jù)時(shí)*維持*暫停狀態(tài)。
一些可讀流的例子:
- [客戶端上的 HTTP 響應(yīng)](#)
- [服務(wù)器上的 HTTP 請(qǐng)求](#)
- [fs 讀取流](#)
- [zlib 流](#)
- [crypto 流](#)
- [TCP 嵌套字](#)
- [子進(jìn)程的 stdout 和 stderr](#)
- [process.stdin](#)
#### 事件: 'readable'
當(dāng)一個(gè)數(shù)據(jù)塊可以從流中被讀出時(shí),它會(huì)觸發(fā)一個(gè) `'readable'` 事件。
在某些情況下,假如未準(zhǔn)備好,監(jiān)聽(tīng)一個(gè) `'readable'` 事件會(huì)使得一些數(shù)據(jù)從底層系統(tǒng)被讀出到內(nèi)部緩沖區(qū)中。
~~~
var readable = getReadableStreamSomehow();
readable.on('readable', function() {
// 現(xiàn)在有數(shù)據(jù)可以讀了
})
~~~
當(dāng)內(nèi)部緩沖區(qū)被排空后,一旦更多數(shù)據(jù)時(shí),一個(gè) `readable` 事件會(huì)被再次觸發(fā)。
#### 事件: 'data'
- `chunk` {Buffer | String} 數(shù)據(jù)塊。
綁定一個(gè) `data` 事件監(jiān)聽(tīng)器到一個(gè)未被明確暫停的流會(huì)將流切換到流動(dòng)模式,數(shù)據(jù)會(huì)被盡可能地傳遞。
如果您想從流盡快取出所有數(shù)據(jù),這是最理想的方式。
~~~
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('得到了 %d 字節(jié)的數(shù)據(jù)', chunk.length);
})
~~~
#### 事件: 'end'
該事件會(huì)在沒(méi)有更多數(shù)據(jù)能夠提供時(shí)被觸發(fā)。
請(qǐng)注意,`end` 事件在數(shù)據(jù)被完全消費(fèi)之前**不會(huì)被觸發(fā)**。這可通過(guò)切換到流動(dòng)模式,或者在到達(dá)末端前不斷調(diào)用 `read()` 來(lái)實(shí)現(xiàn)。
~~~
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('得到了 %d 字節(jié)的數(shù)據(jù)', chunk.length);
})
readable.on('end', function() {
console.log('讀取完畢。');
});
~~~
#### 事件: 'close'
當(dāng)?shù)讓訑?shù)據(jù)源(比如,源頭的文件描述符)被關(guān)閉時(shí)觸發(fā)。并不是所有流都會(huì)觸發(fā)這個(gè)事件。
#### 事件: 'error'
當(dāng)數(shù)據(jù)接收時(shí)發(fā)生錯(cuò)誤時(shí)觸發(fā)。
#### readable.read([size])
- `size` {Number} 可選參數(shù),指定要讀取多少數(shù)據(jù)。
- 返回 {String | Buffer | null}
`read()` 方法從內(nèi)部緩沖區(qū)中拉取并返回若干數(shù)據(jù)。當(dāng)沒(méi)有更多數(shù)據(jù)可用時(shí),它會(huì)返回 `null`。
若您傳入了一個(gè) `size` 參數(shù),那么它會(huì)返回相當(dāng)字節(jié)的數(shù)據(jù);當(dāng) `size` 字節(jié)不可用時(shí),它則返回 `null`。
若您沒(méi)有指定 `size` 參數(shù),那么它會(huì)返回內(nèi)部緩沖區(qū)中的所有數(shù)據(jù)。
該方法僅應(yīng)在暫停模式時(shí)被調(diào)用。在流動(dòng)模式中,該方法會(huì)被自動(dòng)調(diào)用直到內(nèi)部緩沖區(qū)排空。
~~~
var readable = getReadableStreamSomehow();
readable.on('readable', function() {
var chunk;
while (null !== (chunk = readable.read())) {
console.log('得到了 %d 字節(jié)的數(shù)據(jù)', chunk.length);
}
});
~~~
當(dāng)該方法返回了一個(gè)數(shù)據(jù)塊,它同時(shí)也會(huì)觸發(fā) [`'data'` 事件](#)。
#### readable.setEncoding(encoding)
- `encoding` {String} 要使用的編碼。
- 返回: `this`
調(diào)用此函數(shù)會(huì)使得流返回指定編碼的字符串而不是 Buffer 對(duì)象。比如,當(dāng)您 `readable.setEncoding('utf8')`,那么輸出數(shù)據(jù)會(huì)被作為 UTF-8 數(shù)據(jù)解析,并以字符串返回。如果您 `readable.setEncoding('hex')`,那么數(shù)據(jù)會(huì)被編碼成十六進(jìn)制字符串格式。
該方法能正確處理多字節(jié)字符。假如您不這么做,僅僅直接取出 Buffer 并對(duì)它們調(diào)用 `buf.toString(encoding)`,很可能會(huì)導(dǎo)致字節(jié)錯(cuò)位。因此如果您打算以字符串讀取數(shù)據(jù),請(qǐng)總是使用這個(gè)方法。
~~~
var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
assert.equal(typeof chunk, 'string');
console.log('得到了 %d 個(gè)字符的字符串?dāng)?shù)據(jù)', chunk.length);
})
~~~
#### readable.resume()
- 返回: `this`
該方法讓可讀流繼續(xù)觸發(fā) `data` 事件。
該方法會(huì)將流切換到流動(dòng)模式。如果您*不想*從流中消費(fèi)數(shù)據(jù),但您*想*得到它的 `end` 事件,您可以調(diào)用 [`readable.resume()`](#) 來(lái)啟動(dòng)數(shù)據(jù)流。
~~~
var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', function(chunk) {
console.log('到達(dá)末端,但并未讀取任何東西');
})
~~~
#### readable.pause()
- 返回: `this`
該方法會(huì)使一個(gè)處于流動(dòng)模式的流停止觸發(fā) `data` 事件,切換到非流動(dòng)模式,并讓后續(xù)可用數(shù)據(jù)留在內(nèi)部緩沖區(qū)中。
~~~
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('取得 %d 字節(jié)數(shù)據(jù)', chunk.length);
readable.pause();
console.log('接下來(lái) 1 秒內(nèi)不會(huì)有數(shù)據(jù)');
setTimeout(function() {
console.log('現(xiàn)在數(shù)據(jù)會(huì)再次開(kāi)始流動(dòng)');
readable.resume();
}, 1000);
})
~~~
#### readable.pipe(destination, [options])
- `destination` {[Writable](#) Stream} 寫(xiě)入數(shù)據(jù)的目標(biāo)
- `options` {Object} 導(dǎo)流選項(xiàng)
- `end` {Boolean} 在讀取者結(jié)束時(shí)結(jié)束寫(xiě)入者。缺省為 `true`
該方法從可讀流中拉取所有數(shù)據(jù),并寫(xiě)入到所提供的目標(biāo)。該方法能自動(dòng)控制流量以避免目標(biāo)被快速讀取的可讀流所淹沒(méi)。
可以導(dǎo)流到多個(gè)目標(biāo)。
~~~
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// 所有來(lái)自 readable 的數(shù)據(jù)會(huì)被寫(xiě)入到 'file.txt'
readable.pipe(writable);
~~~
該函數(shù)返回目標(biāo)流,因此您可以建立導(dǎo)流鏈:
~~~
var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
~~~
例如,模擬 Unix 的 `cat` 命令:
~~~
process.stdin.pipe(process.stdout);
~~~
缺省情況下當(dāng)來(lái)源流觸發(fā) `end` 時(shí)目標(biāo)的 [`end()`](#) 會(huì)被調(diào)用,所以此時(shí) `destination` 不再可寫(xiě)。傳入 `{ end: false }` 作為 `options` 可以讓目標(biāo)流保持開(kāi)啟狀態(tài)。
這將讓 `writer` 保持開(kāi)啟,因此最后可以寫(xiě)入 "Goodbye"。
~~~
reader.pipe(writer, { end: false });
reader.on('end', function() {
writer.end('Goodbye\n');
});
~~~
請(qǐng)注意 `process.stderr` 和 `process.stdout` 在進(jìn)程結(jié)束前都不會(huì)被關(guān)閉,無(wú)論是否指定選項(xiàng)。
#### readable.unpipe([destination])
- `destination` {[Writable](#) Stream} 可選,指定解除導(dǎo)流的流
該方法會(huì)移除之前調(diào)用 `pipe()` 所設(shè)定的鉤子。
如果不指定目標(biāo),所有導(dǎo)流都會(huì)被移除。
如果指定了目標(biāo),但并沒(méi)有與之建立導(dǎo)流,則什么事都不會(huì)發(fā)生。
~~~
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// 來(lái)自 readable 的所有數(shù)據(jù)都會(huì)被寫(xiě)入 'file.txt',
// 但僅發(fā)生在第 1 秒
readable.pipe(writable);
setTimeout(function() {
console.log('停止寫(xiě)入到 file.txt');
readable.unpipe(writable);
console.log('自行關(guān)閉文件流');
writable.end();
}, 1000);
~~~
#### readable.unshift(chunk)
- `chunk` {Buffer | String} 要插回讀取隊(duì)列開(kāi)頭的數(shù)據(jù)塊
該方法在許多場(chǎng)景中都很有用,比如一個(gè)流正在被一個(gè)解析器消費(fèi),解析器可能需要將某些剛拉取出的數(shù)據(jù)“逆消費(fèi)”回來(lái)源,以便流能將它傳遞給其它消費(fèi)者。
如果您發(fā)現(xiàn)您需要在您的程序中頻繁調(diào)用 `stream.unshift(chunk)`,請(qǐng)考慮實(shí)現(xiàn)一個(gè) [Transform](#) 流。(詳見(jiàn)下文面向流實(shí)現(xiàn)者的 API。)
~~~
// 取出以 \n\n 分割的頭部并將多余部分 unshift() 回去
// callback 以 (error, header, stream) 形式調(diào)用
var StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
var decoder = new StringDecoder('utf8');
var header = '';
function onReadable() {
var chunk;
while (null !== (chunk = stream.read())) {
var str = decoder.write(chunk);
if (str.match(/\n\n/)) {
// 找到頭部邊界
var split = str.split(/\n\n/);
header += split.shift();
var remaining = split.join('\n\n');
var buf = new Buffer(remaining, 'utf8');
if (buf.length)
stream.unshift(buf);
stream.removeListener('error', callback);
stream.removeListener('readable', onReadable);
// 現(xiàn)在可以從流中讀取消息的主體了
callback(null, header, stream);
} else {
// 仍在讀取頭部
header += str;
}
}
}
}
~~~
#### readable.wrap(stream)
- `stream` {Stream} 一個(gè)“舊式”可讀流
Node v0.10 版本之前的流并未實(shí)現(xiàn)現(xiàn)今所有流 API。(更多信息詳見(jiàn)下文“兼容性”章節(jié)。)
如果您正在使用早前版本的 Node 庫(kù),它觸發(fā) `'data'` 事件并且有一個(gè)
- 關(guān)于本文檔
- 概述
- 斷言 (assert)
- Buffer
- Addons插件
- 子進(jìn)程
- 集群
- 控制臺(tái)
- 加密(Crypto)
- 調(diào)試器
- DNS
- 域
- 事件 (Events)
- File System
- 全局對(duì)象
- HTTP
- HTTPS
- Modules
- net
- 操作系統(tǒng)
- 路徑 (Path)
- process
- punycode
- Query String
- Readline
- REPL
- Smalloc
- 流
- 字符串解碼器
- 定時(shí)器
- TLS (SSL)
- TTY
- UDP / 數(shù)據(jù)報(bào)套接字
- URL
- utils
- 執(zhí)行 JavaScript
- Zlib
- 進(jìn)度
- 感謝
