nodejs で worker_threads を使用して新しいスレッドを作成する方法

nodejs で worker_threads を使用して新しいスレッドを作成する方法

導入

前の記事で述べたように、NodeJS には 2 種類のスレッドがあります。1 つは、ユーザー リクエストに応答し、さまざまなコールバックを処理するために使用されるイベント ループです。もう 1 つは、さまざまな時間のかかる操作を処理するために使用されるワーカー プールです。

nodejs の公式 Web サイトには、nodejs ローカル ワーカー プールを使用できる webworker-threads というライブラリが記載されています。

残念ながら、webworker-threads の最終更新は 2 年前であり、最新の nodejs 12 ではまったく使用できません。

webworker-threads の作成者は、web-worker と呼ばれる新しいライブラリを推奨しました。

Web-worker は、Node.js の worker_threads 上に構築されています。この記事では、worker_threads と Web-worker の使用について詳しく説明します。

ワーカースレッド

worker_threads モジュールのソース コードは lib/worker_threads.js から取得されます。これはワーカー スレッドを参照し、新しいスレッドを開始して JavaScript プログラムを並列に実行できます。

nodejs 自体の非同期 IO は非常に強力であるため、worker_threads は主に IO 操作ではなく CPU を集中的に使用する操作を処理するために使用されます。

worker_threads には 5 つのメイン属性、3 つのクラス、3 つのメインメソッドがあります。次に一つずつ説明していきます。

メインスレッド

isMainThread は、コードがメイン スレッドで実行されているかどうかを判断するために使用されます。使用例を見てみましょう。

const { Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
 console.log('メインスレッド内');
 新しいワーカー(__filename);
} それ以外 {
 console.log('ワーカースレッド内');
 console.log(isMainThread); // 'false' を出力します。
}

上記の例では、worker_threads モジュールから Worker と isMainThread を導入しました。Worker はワーカー スレッドのメイン クラスであり、後で詳しく説明します。ここでは、Worker を使用してワーカー スレッドを作成します。

メッセージチャネル

MessageChannel は非同期の双方向通信チャネルを表します。 MessageChannel にはメソッドはありません。MessageChannel は主に両端の MessagePorts を接続するために使用されます。

クラスMessageChannel {
  読み取り専用ポート1: MessagePort;
  読み取り専用ポート2: MessagePort;
 }

new MessageChannel() を使用すると、2 つの MessagePort が自動的に作成されます。

const { MessageChannel } = require('worker_threads');

const { port1, port2 } = 新しい MessageChannel();
port1.on('メッセージ', (メッセージ) => console.log('受信したメッセージ', メッセージ));
port2.postMessage({ foo: 'bar' });
// 出力: `port1.on('message')` リスナーから { foo: 'bar' } を受信しました

MessageChannel を介して、MessagePort 間で通信できます。

親ポートとメッセージポート

parentPort は MessagePort 型であり、主にワーカー スレッドとメイン スレッド間のメッセージのやり取りに使用されます。

parentPort.postMessage() 経由で送信されたメッセージは、worker.on('message') 経由でメイン スレッドで利用できるようになります。

メイン スレッドで worker.postMessage() 経由で送信されたメッセージは、parentPort.on('message') 経由でワーカー スレッドで受信されます。

MessagePort の定義を見てみましょう。

クラスMessagePortはEventEmitterを拡張します{
  閉じる(): void;
  postMessage(値: 任意、転送リスト?: Array<ArrayBuffer | MessagePort>): void;
  ref(): void;
  参照を解除します。
  開始(): void;

  addListener(イベント: "close", リスナー: () => void): this;
  addListener(イベント: "message", リスナー: (値: any) => void): this;
  addListener(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;

  発行(イベント: "close"): ブール値;
  出力(イベント: "メッセージ", 値: 任意): boolean;
  出力(イベント: 文字列 | シンボル、...引数: any[]): boolean;

  on(イベント: "close", リスナー: () => void): this;
  on(イベント: "message", リスナー: (値: any) => void): this;
  on(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;

  一度(イベント: "close", リスナー: () => void): this;
  once(イベント: "message", リスナー: (値: any) => void): this;
  once(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;

  prependListener(イベント: "close", リスナー: () => void): this;
  prependListener(イベント: "message", リスナー: (値: any) => void): this;
  prependListener(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;

  prependOnceListener(イベント: "close", リスナー: () => void): this;
  prependOnceListener(イベント: "message", リスナー: (値: any) => void): this;
  prependOnceListener(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;

  リスナーを削除します(イベント: "close", リスナー: () => void): this;
  リスナーを削除します(イベント: "message", リスナー: (値: any) => void): this;
  リスナーを削除します(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;

  off(イベント: "close", リスナー: () => void): this;
  off(イベント: "message", リスナー: (値: any) => void): this;
  off(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;
 }

MessagePort は、非同期双方向通信チャネルの一方の端を表す EventEmitter から継承されます。このチャネルは MessageChannel と呼ばれ、MessagePort は MessageChannel を介して通信します。

MessagePort を使用して、構造データ、メモリ領域、または他の MessagePort を転送できます。

ソース コードから、MessagePort に close と message の 2 つのイベントがあることがわかります。

チャネルのいずれかの端が切断されると、close イベントがトリガーされ、port.postMessage が呼び出されると、message イベントがトリガーされます。例を見てみましょう。

const { MessageChannel } = require('worker_threads');
const { port1, port2 } = 新しい MessageChannel();

// 印刷:
// フーバー
// 閉店しました!
port2.on('message', (message) => console.log(message));
port2.on('close', () => console.log('closed!'));

port1.postMessage('foobar');
ポート1を閉じます。

port.on('message') は、実際にはメッセージ イベントのリスナーを追加します。Port は、リスナーを手動で追加するための addListener メソッドも提供します。

port.on('message') は自動的に port.start() メソッドをトリガーし、ポートの開始を示します。

ポートにリスナーがある場合、ポートには参照があることを意味します。参照が存在する場合、プログラムは終了しません。 port.unref メソッドを呼び出すことで、この参照をキャンセルできます。

次に、ポートを介してメッセージを送信する方法を見てみましょう。

port.postMessage(値[, 転送リスト])

postMessage は 2 つのパラメータを受け入れることができ、最初のパラメータは JavaScript オブジェクトである value です。 2 番目のパラメータは transferList です。

まず、1 つのパラメータが渡される場合を見てみましょう。

const { MessageChannel } = require('worker_threads');
const { port1, port2 } = 新しい MessageChannel();

port1.on('message', (message) => console.log(message));

const 円形データ = {};
円形データ.foo = 円形データ;
// 出力: { foo: [Circular] }
port2.postMessage(円形データ);

通常、postMessage によって送信されるオブジェクトは値のコピーですが、transferList を指定すると、transferList 内のオブジェクトはチャネルの受信側に転送され、オブジェクトを送信する場合と同様に、送信側には存在しなくなります。

transferList はリストであり、リスト内のオブジェクトは ArrayBuffer、MessagePort、FileHandle になります。

値に SharedArrayBuffer オブジェクトが含まれている場合、そのオブジェクトを transferList に含めることはできません。

2 つのパラメータを持つ例を見てみましょう。

const { MessageChannel } = require('worker_threads');
const { port1, port2 } = 新しい MessageChannel();

port1.on('message', (message) => console.log(message));

const uint8Array = 新しい Uint8Array([ 1, 2, 3, 4 ]);
// uint8Array のコピーを投稿します:
port2.postMessage(uint8Array);

port2.postMessage(uint8Array, [ uint8Array.buffer ]);

//port2.postMessage(uint8Array);

上記の例では次のように出力されます:

Uint8Array(4) [ 1, 2, 3, 4 ]
Uint8Array(4) [ 1, 2, 3, 4 ]

最初の postMessage はコピーであり、2 番目の postMessage は Uint8Array の基になるバッファの転送です。

port2.postMessage(uint8Array) を再度呼び出すと、次のエラーが発生します。

DOMException [DataCloneError]: ArrayBuffer が分離されており、複製できませんでした。

バッファは TypedArray の基礎となるストレージ構造です。バッファが転送されると、以前の TypedArray は使用できなくなります。

譲渡不可としてマーク

この問題を回避するには、バッファを転送不可としてマークするために markAsUntransferable を呼び出します。markAsUntransferable の例を見てみましょう。

const { MessageChannel、markAsUntransferable } = require('worker_threads');

const pooledBuffer = 新しいArrayBuffer(8);
const typedArray1 = 新しい Uint8Array(pooledBuffer);
const typedArray2 = 新しい Float64Array(pooledBuffer);

転送不可としてマークします(pooledBuffer);

const { port1 } = 新しい MessageChannel();
port1.postMessage(typedArray1、[typedArray1.buffer]);

コンソールにログ出力します。
コンソールにログ出力します。

シェア_ENV

SHARE_ENV はワーカーコンストラクタに渡される環境変数です。この変数を設定することで、メインスレッドとワーカースレッド間で共有環境変数を読み書きできるようになります。

const { ワーカー、SHARE_ENV } = require('worker_threads');
新しいワーカー('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
 .on('終了', () => {
 console.log(process.env.SET_IN_WORKER); // 'foo' を出力します。
 });

ワーカーデータ

postMessage() に加えて、メイン スレッドのワーカー コンストラクターに workerData を渡すことによって、メイン スレッドからワーカーにデータを渡すこともできます。

const { Worker、isMainThread、workerData } = require('worker_threads');

if (isMainThread) {
 const worker = new Worker(__filename, { workerData: 'Hello, world!' });
} それ以外 {
 console.log(workerData); // 'Hello, world!' を出力します。
}

労働者階級

まず労働者の定義を見てみましょう。

 クラスWorkerはEventEmitterを拡張します{
  読み取り専用 stdin: 書き込み可能 | null;
  読み取り専用 stdout: 読み取り可能;
  読み取り専用 stderr: 読み取り可能;
  読み取り専用スレッドID: 数値;
  読み取り専用リソース制限?: ResourceLimits;

  コンストラクター(ファイル名: 文字列 | URL、オプション: WorkerOptions);

  postMessage(値: 任意、転送リスト?: Array<ArrayBuffer | MessagePort>): void;
  ref(): void;
  参照を解除します。

  終了(): Promise<数値>;

  getHeapSnapshot(): Promise<Readable>;

  addListener(イベント: "error", リスナー: (err: Error) => void): this;
  addListener(イベント: "exit", リスナー: (exitCode: number) => void): this;
  addListener(イベント: "message", リスナー: (値: any) => void): this;
  リスナーを追加します(イベント: "online", リスナー: () => void): this;
  addListener(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;

  ... 
 }

Worker は EventEmitter を継承し、エラー、終了、メッセージ、オンラインの 4 つの重要なイベントが含まれます。

ワーカーは独立した JavaScript 実行スレッドを表します。ファイル名または URL を渡すことでワーカーを構築できます。

各ワーカーには、ワーカーの作成時に互いに関連付けられた組み込み MessagePort のペアがあります。ワーカーは、この組み込み MessagePort のペアを使用して親スレッドと通信します。

parentPort.postMessage() 経由で送信されたメッセージは、worker.on('message') 経由でメイン スレッドで利用できるようになります。

メイン スレッドで worker.postMessage() 経由で送信されたメッセージは、parentPort.on('message') 経由でワーカー スレッドで受信されます。

もちろん、MessageChannel オブジェクトを明示的に作成し、MessagePort をメッセージとして他のスレッドに渡すこともできます。例を見てみましょう。

const assert = require('assert');
定数{
 ワーカー、メッセージチャネル、メッセージポート、isMainThread、親ポート
} = 'worker_threads' が必要です。
if (isMainThread) {
 定数ワーカー = 新しいワーカー (__filename);
 定数サブチャネル = 新しいメッセージチャネル();
 ワーカー.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
 subChannel.port2.on('メッセージ', (値) => {
 console.log('受信:', 値);
 });
} それ以外 {
 parentPort.once('メッセージ', (値) => {
 assert(value.hereIsYourPort は MessagePort のインスタンスです);
 value.hereIsYourPort.postMessage('ワーカースレッドがこのメッセージを送信しています');
 value.hereIsYourPort.close();
 });
}

上記の例では、worker と parentPort のメッセージング機能を利用して、明示的な MessageChannel で MessagePort を渡しました。

その後、メッセージは MessagePort を通じて配信されます。

受信メッセージオンポート

ポートの on('message') メソッドに加えて、receiveMessageOnPort を使用してメッセージを手動で受信することもできます。

const { MessageChannel、receiveMessageOnPort } = require('worker_threads');
const { port1, port2 } = 新しい MessageChannel();
port1.postMessage({ hello: 'world' });

console.log(ポート2でメッセージを受信);
// 出力: { message: { hello: 'world' } }
console.log(ポート2でメッセージを受信);
// 出力: undefined

メッセージポートをコンテキストへ移動

まず、Node.js のコンテキストの概念を理解しましょう。VM からコンテキストを作成できます。これは分離されたコンテキスト環境であり、さまざまなオペレーティング環境のセキュリティを確保します。コンテキストの例を見てみましょう。

定数 vm = require('vm');

定数x = 1;

定数コンテキスト = { x: 2 };
vm.createContext(context); // コンテキスト分離オブジェクト。

定数コード = 'x += 40; var y = 17;';
// `x` と `y` はコンテキスト内のグローバル変数です。
// 最初は、context.x の値が 2 であるため、x の値は 2 になります。
vm.runInContext(コード、コンテキスト);

コンソールログ(コンテキストx); // 42
コンソールログ(context.y); // 17

console.log(x); // 1; yは定義されていません。

ワーカーでは、MessagePort を別のコンテキストに移動できます。

ワーカー.メッセージポートをコンテキストに移動する(ポート、コンテキスト化されたサンドボックス)

このメソッドは 2 つのパラメータを受け取ります。最初のパラメータは移動する MessagePort であり、2 番目のパラメータは vm.createContext() によって作成されたコンテキスト オブジェクトです。

worker_threads スレッドプール

上記では単一のワーカー スレッドの使用について説明しましたが、プログラムでは 1 つのスレッドでは不十分な場合が多く、ワーカー スレッド オブジェクトを維持するためにスレッド プールを作成する必要があります。

Nodejs は、非同期リソースの拡張として AsyncResource クラスを提供します。

AsyncResource クラスは async_hooks モジュールにあります。

次に、AsyncResource クラスを使用してワーカー スレッド プールを作成する方法を見てみましょう。

2 つの数値を加算するタスクがあり、スクリプト名が task_processor.js であるとします。

const { 親ポート } = require('worker_threads');
parentPort.on('メッセージ', (タスク) => {
 親ポートにメッセージを投稿します。
});

ワーカー プールの実装は次のとおりです。

const { AsyncResource } = require('async_hooks');
const { EventEmitter } = require('events');
定数パス = require('path');
const { ワーカー } = require('worker_threads');

const kTaskInfo = シンボル('kTaskInfo');
const kWorkerFreedEvent = シンボル('kWorkerFreedEvent');

WorkerPoolTask​​InfoクラスはAsyncResourceを拡張します{
 コンストラクタ(コールバック) {
 スーパー('ワーカープールタスク情報');
 this.callback = コールバック;
 }

 完了(エラー、結果) {
 this.runInAsyncScope(this.callback、null、err、結果);
 this.emitDestroy(); // `TaskInfo` は 1 回だけ使用されます。
 }
}

クラス WorkerPool は EventEmitter を拡張します {
 コンストラクター(スレッド数) {
 素晴らしい();
 this.numThreads = numThreads;
 this.workers = [];
 this.freeWorkers = [];

 (i = 0 とします; i < numThreads; i++)
  新しいワーカーを追加します。
 }

 新しいワーカーを追加します(){
 定数ワーカー = 新しいワーカー (path.resolve(__dirname, 'task_processor.js'));
 ワーカー.on('メッセージ', (結果) => {
  // 成功した場合: `runTask` に渡されたコールバックを呼び出します。
  // Worker に関連付けられた `TaskInfo` を削除し、空きとしてマークします
  // また。
  ワーカー[kTaskInfo].done(null、結果);
  ワーカー[kTaskInfo] = null;
  this.freeWorkers.push(ワーカー);
  this.emit(kWorkerFreedEvent);
 });
 ワーカー.on('エラー', (err) => {
  // キャッチされない例外の場合: 渡されたコールバックを呼び出します
  // エラーのある `runTask`。
  if (ワーカー[kTaskInfo])
  ワーカー[kTaskInfo].done(err, null);
  それ以外
  this.emit('エラー', err);
  // リストからワーカーを削除し、新しいワーカーを開始して置き換えます。
  // 現在のもの。
  this.workers.splice(this.workers.indexOf(ワーカー), 1);
  新しいワーカーを追加します。
 });
 this.workers.push(ワーカー);
 this.freeWorkers.push(ワーカー);
 this.emit(kWorkerFreedEvent);
 }

 runTask(タスク、コールバック) {
 if (this.freeWorkers.length === 0) {
  // 空きスレッドがないので、ワーカー スレッドが空くまで待機します。
  this.once(kWorkerFreedEvent, () => this.runTask(task, callback));
  戻る;
 }

 ワーカーを解放します。
 ワーカー[kTaskInfo] = 新しい WorkerPoolTask​​Info(コールバック);
 ワーカー.postMessage(タスク);
 }

 近い() {
 (this.workers の const ワーカー) に対して、worker.terminate();
 }
}

モジュールをエクスポートします。

ワーカーの新しい kTaskInfo プロパティを作成し、非同期コールバックを WorkerPoolTask​​Info にカプセル化して、worker.kTaskInfo に割り当てます。

次にworkerPoolを使用します。

ワーカープールを require('./worker_pool.js');
os が必要です。

定数プール = 新しい WorkerPool(os.cpus().length);

終了 = 0 にします。
(i = 0; i < 10; i++ とします) {
 pool.runTask({ a: 42, b: 100 }, (err, 結果) => {
 console.log(i, err, 結果);
 (++終了 === 10)の場合
  プールを閉じます。
 });
}

これで、nodejs で worker_threads を使用して新しいスレッドを作成する方法についての記事は終了です。nodejs で worker_threads を使用してスレッドを作成する方法の詳細については、123WORDPRESS.COM の以前の記事を検索するか、次の関連記事を引き続き参照してください。今後とも 123WORDPRESS.COM をよろしくお願いいたします。

以下もご興味があるかもしれません:
  • プロセス解析を使用する Javascript Web Worker
  • Yii2 と Workerman の Websocket の例を組み合わせた詳細な説明
  • JavaScript での Web ワーカー マルチスレッド API の研究
  • Node.js のワーカー スレッドの詳細な理解
  • 複数ページ通信を実現する JavaScript の sharedWorker の詳細な例
  • Javascript ワーカー サブスレッド コード例
  • JavaScript でのワーカー イベント API の理解
  • JS で webWorker を使用する方法

<<:  MySQL countの詳細な説明と関数のサンプルコード

>>:  Win7 の VMware 仮想マシンに Linux7.2 をインストールするインターネット アクセス構成チュートリアル

推薦する

NginxとLuaによるグレースケールリリースの実装

memcachedをインストールする yum インストール -y memcached #memcac...

Linux で PHP curl 拡張機能をインストールする方法の詳細な説明

この記事では、Linux で PHP curl 拡張機能をインストールする方法について説明します。ご...

JavaScript でグレイウルフのポットビーティングゲームを実装

1. プロジェクト文書 2. ページレイアウトにHTMLとCSSを使用するHTML部分 <di...

Linux システムに docker をインストールし、ssh 経由で docker コンテナにログインする方法

注: 私はCentosを使ってdockerをインストールしていますステップ1: Dockerをインス...

Nginx ログ出力のリクエスト後パラメータを設定する方法

【序文】当プロジェクトの SMS 機能は、第三者に接続することです。第三者からの元の受信確認要求は ...

Windows 10 での MySQL 8.0.19 のインストールと設定のチュートリアル

来学期にMySQLを勉強します。事前に自宅で練習していませんでした。インストールに時間がかかるとは思...

無効と読み取り専用の機能と違い

1: readonly は、このコントロールをロックして、インターフェイス上で変更できないようにしま...

MySQL ロック(テーブルロック、行ロック、共有ロック、排他ロック、ギャップロック)の詳細な説明

現実世界では、鍵は外の世界から身を隠したいときに使用するツールです。コンピュータでは、複数のプロセス...

CentOS システムのディスク パーティションを拡張する方法

問題/障害/シナリオ/要件Eve-ng の仮想マシン OVA のハードディスクは 38G しかないた...

docker run後にコンテナがExited (0)と表示される問題を解決する

Centos7 上で openresty 用の Dockerfile を作成し、ビルドしました。 d...

MySQL ストアドプロシージャとストアドファンクションの詳細な説明

1 ストアドプロシージャ1.1 ストアドプロシージャとは何かストアド プロシージャは、特定の機能を実...

js ドラッグ アンド ドロップ テーブルでコンテンツ計算を実現する

この記事の例では、コンテンツの計算を実現するためのjsドラッグアンドドロップテーブルの具体的なコード...

レスポンシブフレームワークのテーブルヘッダーの自動改行問題に対する簡単な解決策

最近、Bootstrap を使って Web サイトを開発しています。表を処理していたところ、PC で...

JSはプログレスバーのスムーズバージョンの詳細な計画を実装します

進捗バーがスムーズではないフロントエンドを学ぶ学生のほとんどは、オーディオプレーヤーやビデオプレーヤ...

MySQL の自動増分 ID (主キー) が不足した場合の解決策

MySQL で使用される自動インクリメント ID には多くの種類があり、各自動インクリメント ID ...