Node.js pipe实现源码解析
从前面两篇文章,我们了解到。想要把 readable 的数据写到 writable,就必须先手动的将数据读入内存,然后写入 writable。换句话说,每次传递数据时,都需要写如下的模板代码
readable.on('readable', (err) => { if(err) throw err writable.write(readable.read()) })
为了方便使用,node.js 提供了 pipe() 方法,让我们可以优雅的传递数据
readable.pipe(writable)
现在,就让我们来看看它是如何实现的吧
pipe
首先需要先调用 readable 的 pipe() 方法
// lib/_stream_readable.js readable.prototype.pipe = function(dest, pipeopts) { var src = this; var state = this._readablestate; // 记录 writable switch (state.pipescount) { case 0: state.pipes = dest; break; case 1: state.pipes = [state.pipes, dest]; break; default: state.pipes.push(dest); break; } state.pipescount += 1; // ... src.once('end', endfn); dest.on('unpipe', onunpipe); // ... dest.on('drain', ondrain); // ... src.on('data', ondata); // ... // 保证 error 事件触发时,onerror 首先被执行 prependlistener(dest, 'error', onerror); // ... dest.once('close', onclose); // ... dest.once('finish', onfinish); // ... // 触发 writable 的 pipe 事件 dest.emit('pipe', src); // 将 readable 改为 flow 模式 if (!state.flowing) { debug('pipe resume'); src.resume(); } return dest; };
执行 pipe() 函数时,首先将 writable 记录到 state.pipes 中,然后绑定相关事件,最后如果 readable 不是 flow 模式,就调用 resume() 将 readable 改为 flow 模式
传递数据
readable 从数据源获取到数据后,触发 data 事件,执行 ondata()
ondata() 相关代码:
// lib/_stream_readable.js // 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitdrain 重复增加,awaitdrain 不能清零,readable 卡住的情况 // 详情见 https://github.com/nodejs/node/issues/7278 var increasedawaitdrain = false; function ondata(chunk) { debug('ondata'); increasedawaitdrain = false; var ret = dest.write(chunk); if (false === ret && !increasedawaitdrain) { // 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitdrain 不能清零,readable 卡住的情况 if (((state.pipescount === 1 && state.pipes === dest) || (state.pipescount > 1 && state.pipes.indexof(dest) !== -1) ) && !cleanedup) { debug('false write response, pause', src._readablestate.awaitdrain); src._readablestate.awaitdrain++; increasedawaitdrain = true; } // 进入 pause 模式 src.pause(); } }
在 ondata(chunk) 函数内,通过 dest.write(chunk) 将数据写入 writable
此时,在 _write() 内部可能会调用 src.push(chunk) 或使其 unpipe,这会导致 awaitdrain 多次增加,不能清零,readable 卡住
当不能再向 writable 写入数据时,readable 会进入 pause 模式,直到所有的 drain 事件触发
触发 drain 事件,执行 ondrain()
// lib/_stream_readable.js var ondrain = pipeondrain(src); function pipeondrain(src) { return function() { var state = src._readablestate; debug('pipeondrain', state.awaitdrain); if (state.awaitdrain) state.awaitdrain--; // awaitdrain === 0,且有 data 监听器 if (state.awaitdrain === 0 && ee.listenercount(src, 'data')) { state.flowing = true; flow(src); } }; }
每个 drain 事件触发时,都会减少 awaitdrain,直到 awaitdrain 为 0。此时,调用 flow(src),使 readable 进入 flow 模式
到这里,整个数据传递循环已经建立,数据会顺着循环源源不断的流入 writable,直到所有数据写入完成
unpipe
不管写入过程中是否出现错误,最后都会执行 unpipe()
// lib/_stream_readable.js // ... function unpipe() { debug('unpipe'); src.unpipe(dest); } // ... readable.prototype.unpipe = function(dest) { var state = this._readablestate; var unpipeinfo = { hasunpiped: false }; // 啥也没有 if (state.pipescount === 0) return this; // 只有一个 if (state.pipescount === 1) { if (dest && dest !== state.pipes) return this; // 没有指定就 unpipe 所有 if (!dest) dest = state.pipes; state.pipes = null; state.pipescount = 0; state.flowing = false; if (dest) dest.emit('unpipe', this, unpipeinfo); return this; } // 没有指定就 unpipe 所有 if (!dest) { var dests = state.pipes; var len = state.pipescount; state.pipes = null; state.pipescount = 0; state.flowing = false; for (var i = 0; i < len; i++) dests[i].emit('unpipe', this, unpipeinfo); return this; } // 找到指定 writable,并 unpipe var index = state.pipes.indexof(dest); if (index === -1) return this; state.pipes.splice(index, 1); state.pipescount -= 1; if (state.pipescount === 1) state.pipes = state.pipes[0]; dest.emit('unpipe', this, unpipeinfo); return this; };
readable.prototype.unpipe() 函数会根据 state.pipes 属性和 dest 参数,选择执行策略。最后会触发 dest 的 unpipe 事件
unpipe 事件触发后,调用 onunpipe(),清理相关数据
// lib/_stream_readable.js function onunpipe(readable, unpipeinfo) { debug('onunpipe'); if (readable === src) { if (unpipeinfo && unpipeinfo.hasunpiped === false) { unpipeinfo.hasunpiped = true; // 清理相关数据 cleanup(); } } }
end
在整个 pipe 的过程中,readable 是主动方 ( 负责整个 pipe 过程:包括数据传递、unpipe 与异常处理 ),writable 是被动方 ( 只需要触发 drain 事件 )
总结一下 pipe 的过程:
- 首先执行 readbable.pipe(writable),将 readable 与 writable 对接上
- 当 readable 中有数据时,readable.emit('data'),将数据写入 writable
- 如果 writable.write(chunk) 返回 false,则进入 pause 模式,等待 drain 事件触发
- drain 事件全部触发后,再次进入 flow 模式,写入数据
- 不管数据写入完成或发生中断,最后都会调用 unpipe()
- unpipe() 调用 readable.prototype.unpipe(),触发 dest 的 unpipe 事件,清理相关数据
参考:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: 详解使用vscode+es6写nodejs服务端调试配置
下一篇: thrift通讯协议
推荐阅读
-
SpringBoot 源码解析 (七)----- Spring Boot的核心能力 - SpringBoot如何实现SpringMvc的?
-
PHP中实现汉字转区位码应用源码实例解析
-
Vue源码解析之Template转化为AST的实现方法
-
Mybaits 源码解析 (五)----- 面试源码系列:Mapper接口底层原理(为什么Mapper不用写实现类就能访问到数据库?)
-
jsp基于XML实现用户登录与注册的实例解析(附源码)
-
微信小程序之侧边栏滑动实现过程解析(附完整源码)
-
微信小程序之下拉列表实现方法解析(附完整源码)
-
Mybaits 源码解析 (三)----- Mapper接口底层原理(为什么Mapper不用写实现类就能访问到数据库?)
-
基于数组实现的列表ArrayList源码解析
-
Android实现3D推拉门式滑动菜单源码解析