简介

之前的文章中提到了,nodejs中有两种线程,一种是event loop用来相应用户的请求和处理各种callback。另一种就是worker pool用来处理各种耗时操作。

nodejs的官网提到了一个能够使用nodejs本地woker pool的lib叫做webworker-threads。

可惜的是webworker-threads的最后一次更新还是在2年前,而在最新的nodejs 12中,根本无法使用。

而webworker-threads的作者则推荐了一个新的lib叫做web-worker。

web-worker是构建于nodejs的worker_threads之上的,本文将会详细讲解worker_threads和web-worker的使用。

worker_threads

worker_threads模块的源代码源自lib/worker_threads.js,它指的是工作线程,可以开启一个新的线程来并行执行javascript程序。

worker_threads主要用来处理cpu密集型操作,而不是io操作,因为nodejs本身的异步io已经非常强大了。

worker_threads中主要有5个属性,3个class和3个主要的方法。接下来我们将会一一讲解。

ismainthread

ismainthread用来判断代码是否在主线程中运行,我们看一个使用的例子:

上面的例子中,我们从worker_threads模块中引入了worker和ismainthread,worker就是工作线程的主类,我们将会在后面详细讲解,这里我们使用worker创建了一个工作线程。

messagechannel

messagechannel代表的是一个异步双向通信channel。messagechannel中没有方法,主要通过messagechannel来连接两端的messageport。

当我们使用new messagechannel()的时候,会自动创建两个messageport。

通过messagechannel,我们可以进行messageport间的通信。

parentport和messageport

parentport是一个messageport类型,parentport主要用于worker线程和主线程进行消息交互。

通过parentport.postmessage()发送的消息在主线程中将可以通过worker.on(‘message’)接收。

主线程中通过worker.postmessage()发送的消息将可以在工作线程中通过parentport.on(‘message’)接收。

我们看一下messageport的定义:

messageport继承自eventemitter,它表示的是异步双向通信channel的一端。这个channel就叫做messagechannel,messageport通过messagechannel来进行通信。

我们可以通过messageport来传输结构体数据,内存区域或者其他的messageports。

从源代码中,我们可以看到messageport中有两个事件,close和message。

close事件将会在channel的中任何一端断开连接的时候触发,而message事件将会在port.postmessage时候触发,下面我们看一个例子:

port.on(‘message’)实际上为message事件添加了一个listener,port还提供了addlistener方法来手动添加listener。

port.on(‘message’)会自动触发port.start()方法,表示启动一个port。

当port有listener存在的时候,这表示port存在一个ref,当存在ref的时候,程序是不会结束的。我们可以通过调用port.unref方法来取消这个ref。

接下来我们看一下怎么通过port来传输消息:

postmessage可以接受两个参数,第一个参数是value,这是一个javascript对象。第二个参数是transferlist。

先看一个传递一个参数的情况:

通常来说postmessage发送的对象都是value的拷贝,但是如果你指定了transferlist,那么在transferlist中的对象将会被transfer到channel的接受端,并且不再存在于发送端,就好像把对象传送出去一样。

transferlist是一个list,list中的对象可以是arraybuffer, messageport 和 filehandle。

如果value中包含sharedarraybuffer对象,那么该对象不能被包含在transferlist中。

看一个包含两个参数的例子:

上面的例子将输出:

uint8array(4) [ 1, 2, 3, 4 ]
uint8array(4) [ 1, 2, 3, 4 ]

第一个postmessage是拷贝,第二个postmessage是transfer uint8array底层的buffer。

如果我们再次调用port2.postmessage(uint8array),我们会得到下面的错误:

buffer是typedarray的底层存储结构,如果buffer被transfer,那么之前的typedarray将会变得不可用。

markasuntransferable

要想避免这个问题,我们可以调用markasuntransferable将buffer标记为不可transferable. 我们看一个markasuntransferable的例子:

share_env

share_env是传递给worker构造函数的一个env变量,通过设置这个变量,我们可以在主线程与工作线程进行共享环境变量的读写。

workerdata

除了postmessage(),还可以通过在主线程中传递workerdata给worker的构造函数,从而将主线程中的数据传递给worker:

worker类

先看一下worker的定义:

worker继承自eventemitter,并且包含了4个重要的事件:error,exit,message和online。

worker表示的是一个独立的 javascript 执行线程,我们可以通过传递filename或者url来构造worker。

每一个worker都有一对内置的messageport,在worker创建的时候就会相互关联。worker使用这对内置的messageport来和父线程进行通信。

通过parentport.postmessage()发送的消息在主线程中将可以通过worker.on(‘message’)接收。

主线程中通过worker.postmessage()发送的消息将可以在工作线程中通过parentport.on(‘message’)接收。

当然,你也可以显式的创建messagechannel 对象,然后将messageport作为消息传递给其他线程,我们看一个例子:

上面的例子中,我们借助了worker和parentport本身的消息传递功能,传递了一个显式的messagechannel中的messageport。

然后又通过该messageport来进行消息的分发。

receivemessageonport

除了port的on(‘message’)方法之外,我们还可以使用receivemessageonport来手动接收消息:

movemessageporttocontext

先了解一下nodejs中的context的概念,我们可以从vm中创建context,它是一个隔离的上下文环境,从而保证不同运行环境的安全性,我们看一个context的例子:

在worker中,我们可以将一个messageport move到其他的context中。

这个方法接收两个参数,第一个参数就是要move的messageport,第二个参数就是vm.createcontext()创建的context对象。

worker_threads的线程池

上面我们提到了使用单个的worker thread,但是现在程序中一个线程往往是不够的,我们需要创建一个线程池来维护worker thread对象。

nodejs提供了asyncresource类,来作为对异步资源的扩展。

asyncresource类是async_hooks模块中的。

下面我们看下怎么使用asyncresource类来创建worker的线程池。

假设我们有一个task,使用来执行两个数相加,脚本名字叫做task_processor.js:

下面是worker pool的实现:

我们给worker创建了一个新的ktaskinfo属性,并且将异步的callback封装到workerpooltaskinfo中,赋值给worker.ktaskinfo.

接下来我们就可以使用workerpool了:

到此这篇关于nodejs中使用worker_threads来创建新的线程的方法的文章就介绍到这了,更多相关nodejs使用worker_threads创建线程内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!