前端工程Technical Deep Dive
Node.js 进阶:事件循环、Stream 与集群
发布时间2026/03/29
分类前端工程
预计阅读12 分钟
作者吴长龙
*
Node.js 进阶:事件循环、Stream 与集群
01.内容
# Node.js 进阶:事件循环、Stream 与集群
多数前端工程师会用 Node.js 写脚手架或工具,但真正理解其核心能力的并不多。本文深入解析 Node.js 的事件循环机制、Stream 流编程和集群模式,帮你建立起完整的后端能力体系。
02.一、事件循环:Node.js 的心脏
1.1 什么是事件循环
事件循环是 Node.js 能够实现非阻塞 I/O 的核心机制。它让单线程的 JavaScript 能够在等待 I/O 操作时继续执行其他任务,从而实现高并发。
javascript snippetjavascript
// 这段代码展示了事件循环的基本行为
console.log('1. 开始');
setTimeout(() => {
console.log('2. setTimeout 回调');
}, 0);
Promise.resolve().then(() => {
console.log('3. Promise.then 回调');
});
console.log('4. 结束');
// 输出顺序:1 -> 4 -> 3 -> 21.2 事件循环的六个阶段
Node.js 的事件循环分为六个阶段,每个阶段都有对应的任务队列:
code snippetcode
┌───────────────────────────┐
│ ┌──────────────────┐ │
│ │ timers │ │ ← setTimeout, setInterval
│ └────────┬─────────┘ │
│ ┌────────┴─────────┐ │
│ │ pending callbacks│ │ ← I/O 回调(内部使用)
│ └────────┬─────────┘ │
│ ┌────────┴─────────┐ │
│ │ idle, prepare │ │ ← 内部使用
│ └────────┬─────────┘ │
│ ┌────────┴─────────┐ │
│ │ poll │ │ ← I/O 回调、大部分代码
│ └────────┬─────────┘ │
│ ┌────────┴─────────┐ │
│ │ check │ │ ← setImmediate 回调
│ └────────┬─────────┘ │
│ ┌────────┴─────────┐ │
│ │ close callbacks│ │ ← socket.on('close')
│ └──────────────────┘ │
└───────────────────────────┘1.3 微任务与宏任务的优先级
在 Node.js 中,微任务的执行时机非常重要:
javascript snippetjavascript
setTimeout(() => {
console.log('1. 宏任务 - setTimeout');
}, 0);
setImmediate(() => {
console.log('2. 宏任务 - setImmediate');
});
new Promise((resolve) => {
resolve('3. 同步Promise');
}).then((val) => {
console.log(val); // 3. 微任务 - Promise.then
});
process.nextTick(() => {
console.log('4. 微任务 - nextTick(最高优先级)');
});
// 在 Node.js 中输出顺序可能是:
// 3. 同步Promise -> 4. 微任务 - nextTick -> 1. 宏任务 - setTimeout -> 2. 宏任务 - setImmediate关键点:
- •
process.nextTick()的优先级高于所有微任务 - •微任务在每个阶段之间执行
- •
setImmediate和setTimeout(0)的执行顺序不确定,取决于系统状态
1.4 常见的陷阱与解决方案
#### 陷阱一:CPU 密集型任务阻塞事件循环
javascript snippetjavascript
// ❌ 错误示例:CPU 密集型任务会阻塞事件循环
app.get('/heavy-computation', (req, res) => {
let result = 0;
for (let i = 0; i < 1000000000; i++) {
result += i; // 这会阻塞整个事件循环
}
res.json({ result });
});
// ✅ 正确做法:使用 Worker Threads
const { Worker } = require('worker_threads');
app.get('/heavy-computation', (req, res) => {
const worker = new Worker('./heavy-worker.js', {
workerData: { iterations: 1000000000 }
});
worker.on('message', (result) => {
res.json({ result });
});
worker.on('error', (err) => {
res.status(500).json({ error: err.message });
});
});#### 陷阱二:微任务堆积
javascript snippetjavascript
// ❌ 错误示例:递归 Promise 会导致微任务堆积
async function processAll(items) {
for (const item of items) {
await processItem(item); // 每次 await 都会创建微任务
}
}
// ✅ 正确做法:分批处理
async function processAllBatched(items, batchSize = 100) {
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
await Promise.all(batch.map(processItem));
// 让出事件循环
await new Promise(resolve => setImmediate(resolve));
}
}03.二、Stream:Node.js 的灵魂
2.1 为什么要用 Stream
Stream 是 Node.js 处理大规模数据的核心方式。想象一下读取一个 5GB 的视频文件:
javascript snippetjavascript
const fs = require('fs');
// ❌ 错误做法:一次性加载到内存
const data = fs.readFileSync('./large-file.mp4'); // 内存爆炸
// ✅ 正确做法:使用 Stream 边读边处理
const readStream = fs.createReadStream('./large-file.mp4');
readStream.on('data', (chunk) => {
// 每次只读取一小块(默认 64KB)
processChunk(chunk);
});2.2 四种基本 Stream 类型
javascript snippetjavascript
const {
Readable, // 可读流(数据源)
Writable, // 可写流(数据目标)
Transform, // 转换流(加工数据)
Duplex // 双工流(既可读又可写)
} = require('stream');
// 可读流示例:从数组创建
const readable = Readable.from(['Hello', ' ', 'World']);
// 可写流示例:写入文件
const writable = fs.createWriteStream('./output.txt');
// 转换流示例:大写转换
const upperCase = new Transform({
transform(chunk, encoding, callback) {
callback(null, chunk.toString().toUpperCase());
}
});
// 管道连接
readable.pipe(upperCase).pipe(writable);2.3 实际应用:构建一个文件处理流水线
javascript snippetjavascript
const { createReadStream, createWriteStream } = require('fs');
const { createGzip } = require('zlib');
const { pipeline } = require('stream/promises');
// 读取 -> 压缩 -> 写入
async function compressFile(inputPath, outputPath) {
const source = createReadStream(inputPath);
const gzip = createGzip();
const destination = createWriteStream(outputPath);
try {
await pipeline(source, gzip, destination);
console.log('压缩完成');
} catch (err) {
console.error('压缩失败:', err);
// 需要手动清理
fs.unlink(outputPath, () => {});
}
}2.4 Stream 的背压问题
背压(Backpressure)是指下游处理速度跟不上上游产生数据的速度:
javascript snippetjavascript
// ❌ 错误示例:没有处理背压
source.on('data', (chunk) => {
// 如果 process 慢于数据产生速度,会导致内存暴涨
destination.write(chunk);
});
// ✅ 正确做法:处理背压
source.on('data', (chunk) => {
if (!destination.write(chunk)) {
// 缓冲区满了,暂停读取
source.pause();
}
});
destination.on('drain', () => {
// 缓冲区清空,恢复读取
source.resume();
});
// 或者使用 pipeline 自动处理
const pipeline = require('stream/promises');
await pipeline(source, transform, destination);
// pipeline 自动处理背压2.5 实践:处理大文件上传
javascript snippetjavascript
const { createWriteStream } = require('fs');
const { pipeline } = require('stream/promises');
app.post('/upload', async (req, res) => {
const filename = `${Date.now()}-${req.headers['x-filename']}`;
const out = createWriteStream(`./uploads/${filename}`);
// 使用 pipeline 自动处理背压和错误
try {
await pipeline(req, out);
res.json({ success: true, filename });
} catch (err) {
res.status(500).json({ error: '上传失败' });
}
});04.三、Cluster:多核利用
3.1 为什么需要 Cluster
Node.js 是单线程的,无法直接利用多核 CPU。Cluster 模块让我们可以创建多个工作进程,共享同一个端口:
javascript snippetjavascript
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
// 主进程
const numCPUs = os.cpus().length;
console.log(`主进程 ${process.pid} 启动`);
console.log(`创建 ${numCPUs} 个工作进程`);
// 创建工作进程
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// 监听工作进程退出
cluster.on('exit', (worker, code, signal) => {
console.log(`工作进程 ${worker.process.pid} 退出`);
// 重启退出 的进程
cluster.fork();
});
} else {
// 工作进程
const app = express();
app.get('/', (req, res) => {
res.json({
pid: process.pid,
message: `处理请求的进程 ${process.pid}`
});
});
app.listen(3000, () => {
console.log(`工作进程 ${process.pid} 监听 3000 端口`);
});
}3.2 Cluster 的工作模式
Cluster 支持两种负载均衡策略:
javascript snippetjavascript
// 轮询模式(默认)
// 适用于无状态服务
cluster.schedulingPolicy = cluster.SCHED_RR;
// 各凭本事模式(不推荐)
// 依赖操作系统调度
cluster.schedulingPolicy = cluster.SCHED_NONE;3.3 进程间通信
javascript snippetjavascript
// 主进程
if (cluster.isMaster) {
const worker = cluster.fork();
// 发送消息给工作进程
worker.send({ type: 'broadcast', message: 'hello' });
// 接收工作进程消息
worker.on('message', (msg) => {
console.log('收到工作进程消息:', msg);
});
}
// 工作进程
if (cluster.isWorker) {
// 接收主进程消息
process.on('message', (msg) => {
console.log('收到主进程消息:', msg);
});
// 发送消息给主进程
process.send({ type: 'result', data: '处理完成' });
}3.4 零停机重启
javascript snippetjavascript
// 优雅重启:逐个重启工作进程,不中断服务
function gracefulRestart() {
const workerIds = Object.keys(cluster.workers);
let current = 0;
function restartNext() {
if (current >= workerIds.length) return;
const workerId = workerIds[current++];
const worker = cluster.workers[workerId];
worker.on('exit', () => {
// 旧进程退出后,创建新进程
const newWorker = cluster.fork();
newWorker.on('listening', () => {
// 新进程就绪后,继续重启下一个
restartNext();
});
});
// 发送优雅退出信号
worker.send('shutdown');
// 5秒后强制退出
setTimeout(() => {
worker.kill();
}, 5000);
}
restartNext();
}05.四、生产环境最佳实践
4.1 进程管理:PM2
javascript snippetjavascript
// ecosystem.config.js
module.exports = {
apps: [{
name: 'my-api',
script: './index.js',
instances: 'max', // 使用所有 CPU 核心
exec_mode: 'cluster',
env: {
NODE_ENV: 'development'
},
env_production: {
NODE_ENV: 'production'
},
// 自动重启配置
autorestart: true,
watch: false,
max_memory_restart: '1G',
// 日志配置
log_file: './logs/combined.log',
out_file: './logs/out.log',
error_file: './logs/error.log',
log_date_format: 'YYYY-MM-DD HH:mm:ss Z',
// 健康检查
wait_ready: true,
listen_timeout: 3000,
kill_timeout: 5000
}]
};4.2 内存监控
javascript snippetjavascript
// 定期检查内存使用
setInterval(() => {
const used = process.memoryUsage();
const heapUsed = Math.round(used.heapUsed / 1024 / 1024);
const heapTotal = Math.round(used.heapTotal / 1024 / 1024);
const rss = Math.round(used.rss / 1024 / 1024);
console.log(`内存使用: heap=${heapUsed}MB/${heapTotal}MB, rss=${rss}MB`);
// 如果内存使用超过阈值,记录警告
if (heapUsed > 500) {
console.warn('⚠️ 内存使用过高');
}
}, 10000);4.3 错误监控
javascript snippetjavascript
// 全局错误处理
process.on('uncaughtException', (err) => {
console.error('未捕获的异常:', err);
// 记录日志
logger.error(err);
// 优雅退出
process.exit(1);
});
process.on('unhandledRejection', (reason, promise) => {
console.error('未处理的 Promise 拒绝:', reason);
logger.error({ reason, promise }, 'Unhandled Rejection');
});06.总结
| 主题 | 核心概念 | 实际应用 |
|---|---|---|
| 事件循环 | 6阶段、微任务优先 | 避免阻塞、合理使用 async |
| Stream | 边读边处理、背压 | 大文件处理、管道流 |
| Cluster | 多进程、进程通信 | 多核利用、零停机重启 |
掌握这些内容,你已经具备了 Node.js 后端开发的核心能力。下一篇文章我们将对比 Express 与 Fastify,帮助你选择合适的 Web 框架。
---
*如果你想了解更多关于 Node.js 性能调优的内容,可以阅读后续的《Node.js 性能分析与排查》专题文章。*