前端工程Technical Deep Dive

Node.js 进阶:事件循环、Stream 与集群

发布时间2026/03/29
分类前端工程
预计阅读12 分钟
作者吴长龙
*

Node.js 进阶:事件循环、Stream 与集群

01.内容

# Node.js 进阶:事件循环、Stream 与集群

多数前端工程师会用 Node.js 写脚手架或工具,但真正理解其核心能力的并不多。本文深入解析 Node.js 的事件循环机制、Stream 流编程和集群模式,帮你建立起完整的后端能力体系。

02.一、事件循环:Node.js 的心脏

1.1 什么是事件循环

事件循环是 Node.js 能够实现非阻塞 I/O 的核心机制。它让单线程的 JavaScript 能够在等待 I/O 操作时继续执行其他任务,从而实现高并发。

javascript snippetjavascript
// 这段代码展示了事件循环的基本行为
console.log('1. 开始');

setTimeout(() => {
  console.log('2. setTimeout 回调');
}, 0);

Promise.resolve().then(() => {
  console.log('3. Promise.then 回调');
});

console.log('4. 结束');

// 输出顺序:1 -> 4 -> 3 -> 2

1.2 事件循环的六个阶段

Node.js 的事件循环分为六个阶段,每个阶段都有对应的任务队列:

code snippetcode
   ┌───────────────────────────┐
   │   ┌──────────────────┐    │
   │   │     timers       │    │ ← setTimeout, setInterval
   │   └────────┬─────────┘    │
   │   ┌────────┴─────────┐    │
   │   │  pending callbacks│   │ ← I/O 回调(内部使用)
   │   └────────┬─────────┘    │
   │   ┌────────┴─────────┐    │
   │   │    idle, prepare  │   │ ← 内部使用
   │   └────────┬─────────┘    │
   │   ┌────────┴─────────┐    │
   │   │      poll        │    │ ← I/O 回调、大部分代码
   │   └────────┬─────────┘    │
   │   ┌────────┴─────────┐    │
   │   │       check      │    │ ← setImmediate 回调
   │   └────────┬─────────┘    │
   │   ┌────────┴─────────┐    │
   │   │   close callbacks│    │ ← socket.on('close')
   │   └──────────────────┘    │
   └───────────────────────────┘

1.3 微任务与宏任务的优先级

在 Node.js 中,微任务的执行时机非常重要:

javascript snippetjavascript
setTimeout(() => {
  console.log('1. 宏任务 - setTimeout');
}, 0);

setImmediate(() => {
  console.log('2. 宏任务 - setImmediate');
});

new Promise((resolve) => {
  resolve('3. 同步Promise');
}).then((val) => {
  console.log(val); // 3. 微任务 - Promise.then
});

process.nextTick(() => {
  console.log('4. 微任务 - nextTick(最高优先级)');
});

// 在 Node.js 中输出顺序可能是:
// 3. 同步Promise -> 4. 微任务 - nextTick -> 1. 宏任务 - setTimeout -> 2. 宏任务 - setImmediate

关键点

  • process.nextTick() 的优先级高于所有微任务
  • 微任务在每个阶段之间执行
  • setImmediatesetTimeout(0) 的执行顺序不确定,取决于系统状态

1.4 常见的陷阱与解决方案

#### 陷阱一:CPU 密集型任务阻塞事件循环

javascript snippetjavascript
// ❌ 错误示例:CPU 密集型任务会阻塞事件循环
app.get('/heavy-computation', (req, res) => {
  let result = 0;
  for (let i = 0; i < 1000000000; i++) {
    result += i;  // 这会阻塞整个事件循环
  }
  res.json({ result });
});

// ✅ 正确做法:使用 Worker Threads
const { Worker } = require('worker_threads');

app.get('/heavy-computation', (req, res) => {
  const worker = new Worker('./heavy-worker.js', {
    workerData: { iterations: 1000000000 }
  });
  
  worker.on('message', (result) => {
    res.json({ result });
  });
  
  worker.on('error', (err) => {
    res.status(500).json({ error: err.message });
  });
});

#### 陷阱二:微任务堆积

javascript snippetjavascript
// ❌ 错误示例:递归 Promise 会导致微任务堆积
async function processAll(items) {
  for (const item of items) {
    await processItem(item); // 每次 await 都会创建微任务
  }
}

// ✅ 正确做法:分批处理
async function processAllBatched(items, batchSize = 100) {
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    await Promise.all(batch.map(processItem));
    // 让出事件循环
    await new Promise(resolve => setImmediate(resolve));
  }
}

03.二、Stream:Node.js 的灵魂

2.1 为什么要用 Stream

Stream 是 Node.js 处理大规模数据的核心方式。想象一下读取一个 5GB 的视频文件:

javascript snippetjavascript
const fs = require('fs');

// ❌ 错误做法:一次性加载到内存
const data = fs.readFileSync('./large-file.mp4'); // 内存爆炸

// ✅ 正确做法:使用 Stream 边读边处理
const readStream = fs.createReadStream('./large-file.mp4');
readStream.on('data', (chunk) => {
  // 每次只读取一小块(默认 64KB)
  processChunk(chunk);
});

2.2 四种基本 Stream 类型

javascript snippetjavascript
const { 
  Readable,   // 可读流(数据源)
  Writable,   // 可写流(数据目标)
  Transform,  // 转换流(加工数据)
  Duplex      // 双工流(既可读又可写)
} = require('stream');

// 可读流示例:从数组创建
const readable = Readable.from(['Hello', ' ', 'World']);

// 可写流示例:写入文件
const writable = fs.createWriteStream('./output.txt');

// 转换流示例:大写转换
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});

// 管道连接
readable.pipe(upperCase).pipe(writable);

2.3 实际应用:构建一个文件处理流水线

javascript snippetjavascript
const { createReadStream, createWriteStream } = require('fs');
const { createGzip } = require('zlib');
const { pipeline } = require('stream/promises');

// 读取 -> 压缩 -> 写入
async function compressFile(inputPath, outputPath) {
  const source = createReadStream(inputPath);
  const gzip = createGzip();
  const destination = createWriteStream(outputPath);
  
  try {
    await pipeline(source, gzip, destination);
    console.log('压缩完成');
  } catch (err) {
    console.error('压缩失败:', err);
    // 需要手动清理
    fs.unlink(outputPath, () => {});
  }
}

2.4 Stream 的背压问题

背压(Backpressure)是指下游处理速度跟不上上游产生数据的速度:

javascript snippetjavascript
// ❌ 错误示例:没有处理背压
source.on('data', (chunk) => {
  // 如果 process 慢于数据产生速度,会导致内存暴涨
  destination.write(chunk);
});

// ✅ 正确做法:处理背压
source.on('data', (chunk) => {
  if (!destination.write(chunk)) {
    // 缓冲区满了,暂停读取
    source.pause();
  }
});

destination.on('drain', () => {
  // 缓冲区清空,恢复读取
  source.resume();
});

// 或者使用 pipeline 自动处理
const pipeline = require('stream/promises');
await pipeline(source, transform, destination);
// pipeline 自动处理背压

2.5 实践:处理大文件上传

javascript snippetjavascript
const { createWriteStream } = require('fs');
const { pipeline } = require('stream/promises');

app.post('/upload', async (req, res) => {
  const filename = `${Date.now()}-${req.headers['x-filename']}`;
  const out = createWriteStream(`./uploads/${filename}`);
  
  // 使用 pipeline 自动处理背压和错误
  try {
    await pipeline(req, out);
    res.json({ success: true, filename });
  } catch (err) {
    res.status(500).json({ error: '上传失败' });
  }
});

04.三、Cluster:多核利用

3.1 为什么需要 Cluster

Node.js 是单线程的,无法直接利用多核 CPU。Cluster 模块让我们可以创建多个工作进程,共享同一个端口:

javascript snippetjavascript
const cluster = require('cluster');
const os = require('os');

if (cluster.isMaster) {
  // 主进程
  const numCPUs = os.cpus().length;
  
  console.log(`主进程 ${process.pid} 启动`);
  console.log(`创建 ${numCPUs} 个工作进程`);
  
  // 创建工作进程
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  
  // 监听工作进程退出
  cluster.on('exit', (worker, code, signal) => {
    console.log(`工作进程 ${worker.process.pid} 退出`);
    // 重启退出 的进程
    cluster.fork();
  });
  
} else {
  // 工作进程
  const app = express();
  app.get('/', (req, res) => {
    res.json({ 
      pid: process.pid, 
      message: `处理请求的进程 ${process.pid}` 
    });
  });
  
  app.listen(3000, () => {
    console.log(`工作进程 ${process.pid} 监听 3000 端口`);
  });
}

3.2 Cluster 的工作模式

Cluster 支持两种负载均衡策略:

javascript snippetjavascript
// 轮询模式(默认)
// 适用于无状态服务
cluster.schedulingPolicy = cluster.SCHED_RR;

// 各凭本事模式(不推荐)
// 依赖操作系统调度
cluster.schedulingPolicy = cluster.SCHED_NONE;

3.3 进程间通信

javascript snippetjavascript
// 主进程
if (cluster.isMaster) {
  const worker = cluster.fork();
  
  // 发送消息给工作进程
  worker.send({ type: 'broadcast', message: 'hello' });
  
  // 接收工作进程消息
  worker.on('message', (msg) => {
    console.log('收到工作进程消息:', msg);
  });
}

// 工作进程
if (cluster.isWorker) {
  // 接收主进程消息
  process.on('message', (msg) => {
    console.log('收到主进程消息:', msg);
  });
  
  // 发送消息给主进程
  process.send({ type: 'result', data: '处理完成' });
}

3.4 零停机重启

javascript snippetjavascript
// 优雅重启:逐个重启工作进程,不中断服务
function gracefulRestart() {
  const workerIds = Object.keys(cluster.workers);
  let current = 0;
  
  function restartNext() {
    if (current >= workerIds.length) return;
    
    const workerId = workerIds[current++];
    const worker = cluster.workers[workerId];
    
    worker.on('exit', () => {
      // 旧进程退出后,创建新进程
      const newWorker = cluster.fork();
      newWorker.on('listening', () => {
        // 新进程就绪后,继续重启下一个
        restartNext();
      });
    });
    
    // 发送优雅退出信号
    worker.send('shutdown');
    
    // 5秒后强制退出
    setTimeout(() => {
      worker.kill();
    }, 5000);
  }
  
  restartNext();
}

05.四、生产环境最佳实践

4.1 进程管理:PM2

javascript snippetjavascript
// ecosystem.config.js
module.exports = {
  apps: [{
    name: 'my-api',
    script: './index.js',
    instances: 'max', // 使用所有 CPU 核心
    exec_mode: 'cluster',
    env: {
      NODE_ENV: 'development'
    },
    env_production: {
      NODE_ENV: 'production'
    },
    // 自动重启配置
    autorestart: true,
    watch: false,
    max_memory_restart: '1G',
    // 日志配置
    log_file: './logs/combined.log',
    out_file: './logs/out.log',
    error_file: './logs/error.log',
    log_date_format: 'YYYY-MM-DD HH:mm:ss Z',
    // 健康检查
    wait_ready: true,
    listen_timeout: 3000,
    kill_timeout: 5000
  }]
};

4.2 内存监控

javascript snippetjavascript
// 定期检查内存使用
setInterval(() => {
  const used = process.memoryUsage();
  const heapUsed = Math.round(used.heapUsed / 1024 / 1024);
  const heapTotal = Math.round(used.heapTotal / 1024 / 1024);
  const rss = Math.round(used.rss / 1024 / 1024);
  
  console.log(`内存使用: heap=${heapUsed}MB/${heapTotal}MB, rss=${rss}MB`);
  
  // 如果内存使用超过阈值,记录警告
  if (heapUsed > 500) {
    console.warn('⚠️ 内存使用过高');
  }
}, 10000);

4.3 错误监控

javascript snippetjavascript
// 全局错误处理
process.on('uncaughtException', (err) => {
  console.error('未捕获的异常:', err);
  // 记录日志
  logger.error(err);
  // 优雅退出
  process.exit(1);
});

process.on('unhandledRejection', (reason, promise) => {
  console.error('未处理的 Promise 拒绝:', reason);
  logger.error({ reason, promise }, 'Unhandled Rejection');
});

06.总结

主题核心概念实际应用
事件循环6阶段、微任务优先避免阻塞、合理使用 async
Stream边读边处理、背压大文件处理、管道流
Cluster多进程、进程通信多核利用、零停机重启

掌握这些内容,你已经具备了 Node.js 后端开发的核心能力。下一篇文章我们将对比 Express 与 Fastify,帮助你选择合适的 Web 框架。

---

*如果你想了解更多关于 Node.js 性能调优的内容,可以阅读后续的《Node.js 性能分析与排查》专题文章。*