Atomics.wait
と Atomics.notify
は、ミューテックスやその他の同期手段を実装するのに役立つ低レベルの同期プリミティブです。ただし、Atomics.wait
はブロッキングであるため、メインスレッドで呼び出すことはできません(そうしようとすると TypeError
がスローされます)。
バージョン 8.7 以降、V8 はブロッキングしないバージョンである Atomics.waitAsync
をサポートしており、メインスレッドでも使用できます。
この記事では、これらの低レベル API を使用して、同期(ワーカースレッド用)と非同期(ワーカースレッドまたはメインスレッド用)の両方で動作するミューテックスを実装する方法について説明します。
Atomics.wait
と Atomics.waitAsync
は、次のパラメータを取ります。
buffer
:SharedArrayBuffer
によってバッキングされたInt32Array
またはBigInt64Array
index
: 配列内の有効なインデックスexpectedValue
:(buffer, index)
で記述されるメモリ位置に存在すると予想される値timeout
: タイムアウト(ミリ秒単位)(オプション、デフォルトはInfinity
)
Atomics.wait
の戻り値は文字列です。メモリ位置に期待値が含まれていない場合、Atomics.wait
はすぐに 'not-equal'
を返します。それ以外の場合、スレッドは、別のスレッドが同じメモリ位置で Atomics.notify
を呼び出すか、タイムアウトに達するまでブロックされます。前者の場合、Atomics.wait
は 'ok'
を返し、後者の場合、Atomics.wait
は 'timed-out'
を返します。
Atomics.notify
は、次のパラメータを取ります。
SharedArrayBuffer
によってバッキングされたInt32Array
またはBigInt64Array
- インデックス(配列内で有効)
- 通知する待機中のスレッドの数(オプション、デフォルトは
Infinity
)
(buffer, index)
で記述されるメモリ位置で待機している、指定された数の待機中のスレッドに、FIFO 順に通知します。同じ場所に関連する複数の保留中の Atomics.wait
呼び出しまたは Atomics.waitAsync
呼び出しがある場合、それらはすべて同じ FIFO キューにあります。
Atomics.wait
とは対照的に、Atomics.waitAsync
は常にすぐに返ります。戻り値は次のいずれかです。
{ async: false, value: 'not-equal' }
(メモリ位置に期待値が含まれていない場合){ async: false, value: 'timed-out' }
(即時タイムアウト 0 の場合のみ){ async: true, value: promise }
promise は、後で文字列値 'ok'
(Atomics.notify
が同じメモリ位置で呼び出された場合)または 'timed-out'
(タイムアウトに達した場合)で解決される場合があります。promise が拒否されることはありません。
次の例は、Atomics.waitAsync
の基本的な使用方法を示しています。
const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ timeout (opt)
// | ^ expected value
// ^ index
if (result.value === 'not-equal') {
// The value in the SharedArrayBuffer was not the expected one.
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* notified */ }
else { /* value is 'timed-out' */ }
});
}
// In this thread, or in another thread:
Atomics.notify(i32a, 0);
次に、同期と非同期の両方で使用できるミューテックスを実装する方法を示します。ミューテックスの同期バージョンの実装については、以前このブログ記事などで説明されています。
この例では、Atomics.wait
と Atomics.waitAsync
でタイムアウトパラメータを使用していません。このパラメータは、タイムアウト付きの条件変数を実装するために使用できます。
ミューテックスクラス AsyncLock
は、SharedArrayBuffer
を操作し、次のメソッドを実装します。
lock
— ミューテックスをロックできるようになるまでスレッドをブロックします(ワーカースレッドでのみ使用可能)unlock
— ミューテックスのロックを解除します(lock
の対応部分)executeLocked(callback)
— ブロッキングしないロック。メインスレッドで使用できます。ロックを取得できたらcallback
を実行するようにスケジュールします。
それぞれがどのように実装できるかを見てみましょう。クラス定義には、定数と、SharedArrayBuffer
をパラメータとして取るコンストラクタが含まれています。
class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;
constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}
lock() {
/* … */
}
unlock() {
/* … */
}
executeLocked(f) {
/* … */
}
}
ここで、i32a[0]
には、LOCKED
または UNLOCKED
のいずれかの値が含まれています。また、Atomics.wait
と Atomics.waitAsync
の待機場所でもあります。AsyncLock
クラスは、次の不変条件を保証します。
i32a[0] == LOCKED
の場合、スレッドがi32a[0]
で待機を開始すると(Atomics.wait
またはAtomics.waitAsync
を介して)、最終的に通知されます。- 通知を受け取った後、スレッドはロックの取得を試みます。ロックを取得した場合、解放時に再び通知します。
同期ロックとロック解除 #
次に、ワーカースレッドからのみ呼び出すことができるブロッキング lock
メソッドを示します。
lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.UNLOCKED,
/* new value >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< expected value at start
}
}
スレッドが lock()
を呼び出すと、最初に Atomics.compareExchange
を使用してロック状態を UNLOCKED
から LOCKED
に変更することで、ロックの取得を試みます。Atomics.compareExchange
は状態変更をアトミックに実行しようとし、メモリ位置の元の値を返します。元の値が UNLOCKED
だった場合、状態変更が成功し、スレッドがロックを取得したことがわかります。それ以上の操作は必要ありません。
Atomics.compareExchange
がロック状態の変更に失敗した場合、別のスレッドがロックを保持している必要があります。したがって、このスレッドは、他のスレッドがロックを解放するのを待つために Atomics.wait
を試みます。メモリ位置にまだ期待値(この場合は AsyncLock.LOCKED
)が保持されている場合、Atomics.wait
を呼び出すとスレッドがブロックされ、別のスレッドが Atomics.notify
を呼び出すまで Atomics.wait
呼び出しは返されません。
unlock
メソッドは、ロックを UNLOCKED
状態に設定し、Atomics.notify
を呼び出して、ロックを待機していた待機中のスレッドの1つをウェイクアップします。このスレッドがロックを保持しており、その間に他の誰も unlock()
を呼び出すべきではないため、状態変更は常に成功すると予想されます。
unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.LOCKED,
/* new value >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('Tried to unlock while not holding the mutex');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}
単純なケースは次のとおりです。ロックは解放されており、スレッド T1 は Atomics.compareExchange
でロック状態を変更することでロックを取得します。スレッド T2 は Atomics.compareExchange
を呼び出してロックを取得しようとしますが、ロック状態の変更に失敗します。次に、T2 は Atomics.wait
を呼び出し、スレッドをブロックします。ある時点で、T1 はロックを解放し、Atomics.notify
を呼び出します。これにより、T2 の Atomics.wait
呼び出しが 'ok'
を返し、T2 がウェイクアップします。次に、T2 は再びロックの取得を試み、今回は成功します。
2つのコーナーケースも考えられます。これらは、Atomics.wait
と Atomics.waitAsync
がインデックスで特定の値をチェックする理由を示しています。
- T1 がロックを保持しており、T2 がロックを取得しようとします。最初に、T2 は
Atomics.compareExchange
でロック状態を変更しようとしますが、成功しません。しかし、その後、T2 がAtomics.wait
を呼び出す前に、T1 はロックを解放します。T2 がAtomics.wait
を呼び出すと、すぐに値'not-equal'
を返します。その場合、T2 は次のループ反復を続行し、再びロックの取得を試みます。 - T1 がロックを保持しており、T2 が
Atomics.wait
でロックを待機しています。T1 がロックを解放すると、T2 はウェイクアップし(Atomics.wait
呼び出しが返されます)、Atomics.compareExchange
を実行してロックを取得しようとしますが、別のスレッド T3 の方が高速で、すでにロックを取得しています。そのため、Atomics.compareExchange
の呼び出しはロックの取得に失敗し、T2 は再びAtomics.wait
を呼び出し、T3 がロックを解放するまでブロックされます。
後者のコーナーケースのため、ミューテックスは「公平」ではありません。T2 がロックの解放を待機していても、T3 が来てすぐにロックを取得する可能性があります。より現実的なロック実装では、複数の状態を使用して、「ロック済み」と「競合状態でロック済み」を区別する場合があります。
非同期ロック #
ブロッキングしない executeLocked
メソッドは、ブロッキング lock
メソッドとは異なり、メインスレッドから呼び出すことができます。コールバック関数を唯一のパラメータとして受け取り、ロックの取得に成功したらコールバックを実行するようにスケジュールします。
executeLocked(f) {
const self = this;
async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.UNLOCKED,
/* new value >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ expected value at start
await result.value;
}
}
tryGetLock();
}
内部関数 tryGetLock
は、以前と同様に、最初に Atomics.compareExchange
でロックの取得を試みます。それが正常にロック状態を変更した場合、コールバックを実行し、ロックを解除して、戻ることができます.
Atomics.compareExchange
がロックの取得に失敗した場合、ロックが解放されている可能性があるときに再試行する必要があります。ブロックしてロックが解放されるのを待つことはできません。代わりに、Atomics.waitAsync
とそれが返す Promise を使用して新しい試行をスケジュールします。
Atomics.waitAsync
を正常に開始できた場合、返された Promise は、ロックを保持しているスレッドが Atomics.notify
を実行すると解決されます。その後、ロックを待機していたスレッドは、以前のように再びロックの取得を試みます。
非同期バージョンでも同じコーナーケース(Atomics.compareExchange
呼び出しと Atomics.waitAsync
呼び出しの間でロックが解放される、および Promise の解決と Atomics.compareExchange
呼び出しの間でロックが再び取得される)が発生する可能性があるため、コードはそれらを堅牢な方法で処理する必要があります。
結論 #
この記事では、同期プリミティブ Atomics.wait
、Atomics.waitAsync
、および Atomics.notify
を使用して、メインスレッドとワーカースレッドの両方で使用できるミューテックスを実装する方法を示しました。
機能サポート #
Atomics.wait
と Atomics.notify
#
- Chrome: バージョン 68 以降サポートされています
- Firefox: バージョン 78 以降サポートされています
- Safari: サポートされていません
- Node.js: バージョン 8.10.0 以降サポートされています
- Babel: サポートされていません
Atomics.waitAsync
#
- Chrome: バージョン 87 以降サポートされています
- Firefox: サポートされていません
- Safari: サポートされていません
- Node.js: バージョン 16 以降サポートされています
- Babel: サポートされていません