node express 多线程

我们知道标准的Javascript是单线程的, 无法真正的使用多核cpu, 过去的做法是在不同的端口上启动多个相同或者不同的服务

例如如果一个服务要运行在多个核心上, 就必需使用脚本启动在多个端口上, 然后用负载均衡服务器对请求进行分发, 实现起来较为复杂

使用cluster集群api可以让单个程序充分利用多核cpu, 下面来看看步骤

首先需要安装下面两个模块

npm install –save cluster
npm install –save os

然后是实现代码

var cluster = require("cluster");

var http = require("http");

var numCPUs = require("os").cpus().length;

if (cluster.isMaster) {

  // Fork workers.

  for (var i = 0; i < numCPUs; i++) {

    cluster.fork();

  }

 

  cluster.on("exit", function(worker, code, signal) {

    console.log("worker " + worker.process.pid + " died");

  });

} else {

  // Workers can share aclearny TCP connection

  // In this case its a HTTP server

  http

    .createServer(function(req, res) {

      res.writeHead(200);

      res.end("hello world\n");

    })

    .listen(3000);

}

cluster会在线程之间共享一个端口, 当有外部请求到达时, cluster会将这个请求转发到随机的子线程中.

这里的运行机制有点类似Nginx, 是由一个线程池 ( 通常大小为cpu物理核心数 ) 来处理所有请求, 而不是像Tomcat那样为每个请求创建一个线程.

当程序被运行后, isMaster会被设置为true, 然后就进入了第一个if块中, 在其中调用了 cluster.fork() 之后, 程序会创建一个子线程, 并重新运行, 这时cluster.isMaster会被设置成false, 需要注意的是这里有一个循环, 用于一次性创建多个子线程. 

下面这个例子里将演示如何在express环境中使用cluster多线程

express使用bin文件夹下的www脚本启动http server, 所以直接修改这个脚本就可以使用cluster

app/bin/www 将此脚本的内容修改为

#!/usr/bin/env node

 

var app = require("../app");

var debug = require("debug")("app:server");

var http = require("http");

 

var cluster = require("cluster");

var numCPUs = require("os").cpus().length;

 

if (cluster.isMaster) {

  console.log("[master] " + "start master...");

 

  for (var i = 0; i < numCPUs; i++) {

    cluster.fork();

  }

 

  cluster.on("listening", function(worker, address) {

    console.log(

      "[master] " +

        "listening: worker" +

        worker.id +

        ",pid:" +

        worker.process.pid +

        ", Address:" +

        address.address +

        ":" +

        address.port

    );

  });

} else if (cluster.isWorker) {

  console.log("[worker] " + "start worker ..." + cluster.worker.id);

  var server = http.createServer(app);

  server.listen(3000);

  server.on("error", onError);

  server.on("listening", onListening);

}

 

var port = normalizePort(process.env.PORT || "3000");

app.set("port", port);

 

function normalizePort(val) {

  var port = parseInt(val, 10);

 

  if (isNaN(port)) {

    // named pipe

    return val;

  }

 

  if (port >= 0) {

    // port number

    return port;

  }

 

  return false;

}

 

function onError(error) {

  if (error.syscall !== "listen") {

    throw error;

  }

 

  var bind = typeof port === "string" ? "Pipe " + port : "Port " + port;

 

  // handle specific listen errors with friendly messages

  switch (error.code) {

    case "EACCES":

      console.error(bind + " requires elevated privileges");

      process.exit(1);

      break;

    case "EADDRINUSE":

      console.error(bind + " is already in use");

      process.exit(1);

      break;

    default:

      throw error;

  }

}

 

function onListening() {

  var addr = server.address();

  var bind = typeof addr === "string" ? "pipe " + addr : "port " + addr.port;

  debug("Listening on " + bind);

}