Atomics.waitAtomics.notifyAtomics.waitAsync

公開日 · タグ: ECMAScript ES2020

Atomics.waitAtomics.notify は、ミューテックスやその他の同期手段を実装するのに役立つ低レベルの同期プリミティブです。ただし、Atomics.wait はブロッキングであるため、メインスレッドで呼び出すことはできません(そうしようとすると TypeError がスローされます)。

バージョン 8.7 以降、V8 はブロッキングしないバージョンである Atomics.waitAsync をサポートしており、メインスレッドでも使用できます。

この記事では、これらの低レベル API を使用して、同期(ワーカースレッド用)と非同期(ワーカースレッドまたはメインスレッド用)の両方で動作するミューテックスを実装する方法について説明します。

Atomics.waitAtomics.waitAsync は、次のパラメータを取ります。

  • buffer: SharedArrayBuffer によってバッキングされた Int32Array または BigInt64Array
  • index: 配列内の有効なインデックス
  • expectedValue: (buffer, index) で記述されるメモリ位置に存在すると予想される値
  • timeout: タイムアウト(ミリ秒単位)(オプション、デフォルトは Infinity

Atomics.wait の戻り値は文字列です。メモリ位置に期待値が含まれていない場合、Atomics.wait はすぐに 'not-equal' を返します。それ以外の場合、スレッドは、別のスレッドが同じメモリ位置で Atomics.notify を呼び出すか、タイムアウトに達するまでブロックされます。前者の場合、Atomics.wait'ok' を返し、後者の場合、Atomics.wait'timed-out' を返します。

Atomics.notify は、次のパラメータを取ります。

  • SharedArrayBuffer によってバッキングされた Int32Array または BigInt64Array
  • インデックス(配列内で有効)
  • 通知する待機中のスレッドの数(オプション、デフォルトは Infinity

(buffer, index) で記述されるメモリ位置で待機している、指定された数の待機中のスレッドに、FIFO 順に通知します。同じ場所に関連する複数の保留中の Atomics.wait 呼び出しまたは Atomics.waitAsync 呼び出しがある場合、それらはすべて同じ FIFO キューにあります。

Atomics.wait とは対照的に、Atomics.waitAsync は常にすぐに返ります。戻り値は次のいずれかです。

  • { async: false, value: 'not-equal' } (メモリ位置に期待値が含まれていない場合)
  • { async: false, value: 'timed-out' } (即時タイムアウト 0 の場合のみ)
  • { async: true, value: promise }

promise は、後で文字列値 'ok'Atomics.notify が同じメモリ位置で呼び出された場合)または 'timed-out'(タイムアウトに達した場合)で解決される場合があります。promise が拒否されることはありません。

次の例は、Atomics.waitAsync の基本的な使用方法を示しています。

const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ timeout (opt)
// | ^ expected value
// ^ index

if (result.value === 'not-equal') {
// The value in the SharedArrayBuffer was not the expected one.
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* notified */ }
else { /* value is 'timed-out' */ }
});
}

// In this thread, or in another thread:
Atomics.notify(i32a, 0);

次に、同期と非同期の両方で使用できるミューテックスを実装する方法を示します。ミューテックスの同期バージョンの実装については、以前このブログ記事などで説明されています。

この例では、Atomics.waitAtomics.waitAsync でタイムアウトパラメータを使用していません。このパラメータは、タイムアウト付きの条件変数を実装するために使用できます。

ミューテックスクラス AsyncLock は、SharedArrayBuffer を操作し、次のメソッドを実装します。

  • lock — ミューテックスをロックできるようになるまでスレッドをブロックします(ワーカースレッドでのみ使用可能)
  • unlock — ミューテックスのロックを解除します(lock の対応部分)
  • executeLocked(callback) — ブロッキングしないロック。メインスレッドで使用できます。ロックを取得できたら callback を実行するようにスケジュールします。

それぞれがどのように実装できるかを見てみましょう。クラス定義には、定数と、SharedArrayBuffer をパラメータとして取るコンストラクタが含まれています。

class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;

constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}

lock() {
/* … */
}

unlock() {
/* … */
}

executeLocked(f) {
/* … */
}
}

ここで、i32a[0] には、LOCKED または UNLOCKED のいずれかの値が含まれています。また、Atomics.waitAtomics.waitAsync の待機場所でもあります。AsyncLock クラスは、次の不変条件を保証します。

  1. i32a[0] == LOCKED の場合、スレッドが i32a[0] で待機を開始すると(Atomics.wait または Atomics.waitAsync を介して)、最終的に通知されます。
  2. 通知を受け取った後、スレッドはロックの取得を試みます。ロックを取得した場合、解放時に再び通知します。

同期ロックとロック解除 #

次に、ワーカースレッドからのみ呼び出すことができるブロッキング lock メソッドを示します。

lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.UNLOCKED,
/* new value >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< expected value at start
}
}

スレッドが lock() を呼び出すと、最初に Atomics.compareExchange を使用してロック状態を UNLOCKED から LOCKED に変更することで、ロックの取得を試みます。Atomics.compareExchange は状態変更をアトミックに実行しようとし、メモリ位置の元の値を返します。元の値が UNLOCKED だった場合、状態変更が成功し、スレッドがロックを取得したことがわかります。それ以上の操作は必要ありません。

Atomics.compareExchange がロック状態の変更に失敗した場合、別のスレッドがロックを保持している必要があります。したがって、このスレッドは、他のスレッドがロックを解放するのを待つために Atomics.wait を試みます。メモリ位置にまだ期待値(この場合は AsyncLock.LOCKED)が保持されている場合、Atomics.wait を呼び出すとスレッドがブロックされ、別のスレッドが Atomics.notify を呼び出すまで Atomics.wait 呼び出しは返されません。

unlock メソッドは、ロックを UNLOCKED 状態に設定し、Atomics.notify を呼び出して、ロックを待機していた待機中のスレッドの1つをウェイクアップします。このスレッドがロックを保持しており、その間に他の誰も unlock() を呼び出すべきではないため、状態変更は常に成功すると予想されます。

unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.LOCKED,
/* new value >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('Tried to unlock while not holding the mutex');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}

単純なケースは次のとおりです。ロックは解放されており、スレッド T1 は Atomics.compareExchange でロック状態を変更することでロックを取得します。スレッド T2 は Atomics.compareExchange を呼び出してロックを取得しようとしますが、ロック状態の変更に失敗します。次に、T2 は Atomics.wait を呼び出し、スレッドをブロックします。ある時点で、T1 はロックを解放し、Atomics.notify を呼び出します。これにより、T2 の Atomics.wait 呼び出しが 'ok' を返し、T2 がウェイクアップします。次に、T2 は再びロックの取得を試み、今回は成功します。

2つのコーナーケースも考えられます。これらは、Atomics.waitAtomics.waitAsync がインデックスで特定の値をチェックする理由を示しています。

  • T1 がロックを保持しており、T2 がロックを取得しようとします。最初に、T2 は Atomics.compareExchange でロック状態を変更しようとしますが、成功しません。しかし、その後、T2 が Atomics.wait を呼び出す前に、T1 はロックを解放します。T2 が Atomics.wait を呼び出すと、すぐに値 'not-equal' を返します。その場合、T2 は次のループ反復を続行し、再びロックの取得を試みます。
  • T1 がロックを保持しており、T2 が Atomics.wait でロックを待機しています。T1 がロックを解放すると、T2 はウェイクアップし(Atomics.wait 呼び出しが返されます)、Atomics.compareExchange を実行してロックを取得しようとしますが、別のスレッド T3 の方が高速で、すでにロックを取得しています。そのため、Atomics.compareExchange の呼び出しはロックの取得に失敗し、T2 は再び Atomics.wait を呼び出し、T3 がロックを解放するまでブロックされます。

後者のコーナーケースのため、ミューテックスは「公平」ではありません。T2 がロックの解放を待機していても、T3 が来てすぐにロックを取得する可能性があります。より現実的なロック実装では、複数の状態を使用して、「ロック済み」と「競合状態でロック済み」を区別する場合があります。

非同期ロック #

ブロッキングしない executeLocked メソッドは、ブロッキング lock メソッドとは異なり、メインスレッドから呼び出すことができます。コールバック関数を唯一のパラメータとして受け取り、ロックの取得に成功したらコールバックを実行するようにスケジュールします。

executeLocked(f) {
const self = this;

async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.UNLOCKED,
/* new value >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ expected value at start
await result.value;
}
}

tryGetLock();
}

内部関数 tryGetLock は、以前と同様に、最初に Atomics.compareExchange でロックの取得を試みます。それが正常にロック状態を変更した場合、コールバックを実行し、ロックを解除して、戻ることができます.

Atomics.compareExchange がロックの取得に失敗した場合、ロックが解放されている可能性があるときに再試行する必要があります。ブロックしてロックが解放されるのを待つことはできません。代わりに、Atomics.waitAsync とそれが返す Promise を使用して新しい試行をスケジュールします。

Atomics.waitAsync を正常に開始できた場合、返された Promise は、ロックを保持しているスレッドが Atomics.notify を実行すると解決されます。その後、ロックを待機していたスレッドは、以前のように再びロックの取得を試みます。

非同期バージョンでも同じコーナーケース(Atomics.compareExchange 呼び出しと Atomics.waitAsync 呼び出しの間でロックが解放される、および Promise の解決と Atomics.compareExchange 呼び出しの間でロックが再び取得される)が発生する可能性があるため、コードはそれらを堅牢な方法で処理する必要があります。

結論 #

この記事では、同期プリミティブ Atomics.waitAtomics.waitAsync、および Atomics.notify を使用して、メインスレッドとワーカースレッドの両方で使用できるミューテックスを実装する方法を示しました。

機能サポート #

Atomics.waitAtomics.notify #

  • Chrome: バージョン 68 以降サポートされています
  • Firefox: バージョン 78 以降サポートされています
  • Safari: サポートされていません
  • Node.js: バージョン 8.10.0 以降サポートされています
  • Babel: サポートされていません

Atomics.waitAsync #

  • Chrome: バージョン 87 以降サポートされています
  • Firefox: サポートされていません
  • Safari: サポートされていません
  • Node.js: バージョン 16 以降サポートされています
  • Babel: サポートされていません