From ecd7322637e54b5f34dfa310249d819e944c9171 Mon Sep 17 00:00:00 2001 From: Chris Sauve Date: Thu, 15 Aug 2024 23:15:50 -0400 Subject: [PATCH] Update `ThreadAbortSignal` API to use classes. (#816) --- .changeset/orange-rockets-listen.md | 49 +++++ packages/quilt/source/threads.ts | 6 +- packages/threads/README.md | 66 +++++-- packages/threads/source/ThreadAbortSignal.ts | 173 ++++++++++++++++++ packages/threads/source/abort-signal.ts | 3 - .../threads/source/abort-signal/accept.ts | 48 ----- .../threads/source/abort-signal/create.ts | 45 ----- packages/threads/source/abort-signal/types.ts | 19 -- packages/threads/source/index.ts | 8 +- packages/threads/source/signals/accept.ts | 5 +- packages/threads/source/signals/create.ts | 4 +- packages/threads/source/signals/types.ts | 4 +- 12 files changed, 291 insertions(+), 139 deletions(-) create mode 100644 .changeset/orange-rockets-listen.md create mode 100644 packages/threads/source/ThreadAbortSignal.ts delete mode 100644 packages/threads/source/abort-signal.ts delete mode 100644 packages/threads/source/abort-signal/accept.ts delete mode 100644 packages/threads/source/abort-signal/create.ts delete mode 100644 packages/threads/source/abort-signal/types.ts diff --git a/.changeset/orange-rockets-listen.md b/.changeset/orange-rockets-listen.md new file mode 100644 index 000000000..61c5634d5 --- /dev/null +++ b/.changeset/orange-rockets-listen.md @@ -0,0 +1,49 @@ +--- +'@quilted/threads': major +'@quilted/quilt': patch +--- + +Changed `ThreadAbortSignal` utilities to be class-based instead of being a collection of utility functions. This change aligns the API more closely with `AbortController` in the browser, which is created with `new AbortController()`. + +Previously, you used `createThreadAbortSignal()` to serialize an `AbortSignal` to pass over a thread, and `acceptThreadAbortSignal()` to turn it into a “live” `AbortSignal`. With the new API, you will do the same steps, but with `ThreadAbortSignal.serialize()` and `new ThreadAbortSignal`: + +```ts +import { + createThreadAbortSignal, + acceptThreadAbortSignal, +} from '@quilted/threads'; + +const abortController = new AbortController(); +const serializedAbortSignal = createThreadAbortSignal(abortController.signal); +const liveAbortSignal = acceptThreadAbortSignal(serializedAbortSignal); + +await fetch('/', {signal: liveAbortSignal}); + +// Becomes: + +import { ThreadAbortSignal } from '@quilted/threads';\ + +const abortController = new AbortController(); +const serializedAbortSignal = ThreadAbortSignal.serialize(abortController.signal); +const liveAbortSignal = new ThreadAbortSignal(serializedAbortSignal); + +await fetch('/', {signal: liveAbortSignal}); +``` + +Additionally, the new `ThreadAbortSignal` class assumes you are not doing manual memory management by default. If your target environment does not support automatic memory management of transferred functions, you will need to manually pass the `retain` and `release` functions to the new APIs: + +```ts +import {retain, release, ThreadAbortSignal} from '@quilted/threads'; + +const abortController = new AbortController(); +const serializedAbortSignal = ThreadAbortSignal.serialize( + abortController.signal, + {retain, release}, +); +const liveAbortSignal = new ThreadAbortSignal(serializedAbortSignal, { + retain, + release, +}); + +await fetch('/', {signal: liveAbortSignal}); +``` diff --git a/packages/quilt/source/threads.ts b/packages/quilt/source/threads.ts index 00383e5a5..cfbf7b87c 100644 --- a/packages/quilt/source/threads.ts +++ b/packages/quilt/source/threads.ts @@ -4,8 +4,7 @@ export { markAsTransferable, isMemoryManageable, createThread, - createThreadAbortSignal, - acceptThreadAbortSignal, + ThreadAbortSignal, createBasicEncoder, createThreadFromIframe, createThreadFromInsideIframe, @@ -20,7 +19,8 @@ export type { Thread, ThreadOptions, ThreadTarget, - ThreadAbortSignal, + ThreadAbortSignalOptions, + ThreadAbortSignalSerialization, ThreadEncoder, ThreadEncoderApi, ThreadEncodable, diff --git a/packages/threads/README.md b/packages/threads/README.md index 40955b3d0..50ec753b7 100644 --- a/packages/threads/README.md +++ b/packages/threads/README.md @@ -237,37 +237,81 @@ Once an object is fully released, any attempt to call its proxied functions will #### [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) -[`AbortSignal`s](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) allow you to communicate that an asynchronous operation should stop. Because all methods exposed through `@quilted/threads` are asynchronous, you may find many uses -for `AbortSignal`s. However, it can be a bit tricky to communicate an abort signal across threads yourself. To make this easier, this library provides a pair of utilities to create a "thread-safe" `AbortSignal` on one thread, and to "accept" that signal on another thread. In the thread sending a signal, use the `createThreadAbortSignal()` function from this library, passing it an `AbortSignal`: +[`AbortSignal`s](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) allow you to communicate that an asynchronous operation should stop. Because all methods exposed through `@quilted/threads` are asynchronous, you may find many uses for `AbortSignal`s. However, it can be a bit tricky to communicate an abort signal across threads yourself. To make this easier, this library provides utilities to create a serialized `AbortSignal` on one thread, and to convert that serialized version into a “live” `AbortSignal` on another thread. In the thread sending a signal, use the `ThreadAbortSignal.serialize()` method to serialize your `AbortSignal`: + +```ts +import {createThreadFromWebWorker, ThreadAbortSignal} from '@quilted/threads'; + +const worker = new Worker('worker.js'); +const thread = createThreadFromWebWorker(worker); + +const abort = new AbortController(); +await thread.calculateResult({ + signal: ThreadAbortSignal.serialize(abort.signal), +}); +``` + +On the receiving thread, use `new ThreadAbortSignal()` to turn it back into a live `AbortSignal`, in the current thread’s JavaScript environment: ```ts import { createThreadFromWebWorker, - createThreadAbortSignal, + ThreadAbortSignal, + type ThreadAbortSignalSerialization, +} from '@quilted/threads'; + +const thread = createThreadFromWebWorker(self, { + expose: {calculateResult}, +}); + +function calculateResult({ + signal: threadSignal, +}: { + signal: ThreadAbortSignalSerialization; +}) { + const signal = new ThreadAbortSignal(threadSignal); + return await figureOutResult({signal}); +} +``` + +If you are using `@quilted/threads`’ manual memory management approach, you must explicitly pass `retain` and `release` functions to `ThreadAbortSignal.serialize()` and `new ThreadAbortSignal()` methods: + +```ts +import { + retain, + release, + createThreadFromWebWorker, + ThreadAbortSignal, } from '@quilted/threads'; const worker = new Worker('worker.js'); const thread = createThreadFromWebWorker(worker); const abort = new AbortController(); -await thread.calculateResult({signal: createThreadSignal(abort.signal)}); -``` +await thread.calculateResult({ + signal: ThreadAbortSignal.serialize(abort.signal, {retain, release}), +}); -On the receiving thread, use the `acceptThreadAbortSignal()` to turn it back into a "live" `AbortSignal`, in the current thread’s JavaScript environment: +// In the worker: -```ts import { + retain, + release, createThreadFromWebWorker, - acceptThreadAbortSignal, - type ThreadAbortSignal, + ThreadAbortSignal, + type ThreadAbortSignalSerialization, } from '@quilted/threads'; const thread = createThreadFromWebWorker(self, { expose: {calculateResult}, }); -function calculateResult({signal: threadSignal}: {signal: ThreadAbortSignal}) { - const signal = acceptThreadAbortSignal(threadSignal); +function calculateResult({ + signal: threadSignal, +}: { + signal: ThreadAbortSignalSerialization; +}) { + const signal = new ThreadAbortSignal(threadSignal, {retain, release}); return await figureOutResult({signal}); } ``` diff --git a/packages/threads/source/ThreadAbortSignal.ts b/packages/threads/source/ThreadAbortSignal.ts new file mode 100644 index 000000000..a96f4977f --- /dev/null +++ b/packages/threads/source/ThreadAbortSignal.ts @@ -0,0 +1,173 @@ +/** + * A representation of an `AbortSignal` that can be serialized between + * two threads. + */ +export interface ThreadAbortSignalSerialization { + /** + * Whether the signal was already aborted at the time it was + * sent to the sibling thread. + */ + readonly aborted: boolean; + + /** + * A function to connect the signal between the two threads. This + * function should be called by the sibling thread when the abort + * state changes (including changes since the thread-safe abort signal + * was created). + */ + start?(listener: (aborted: boolean) => void): void; +} + +export interface ThreadAbortSignalOptions { + /** + * An optional function to call in order to manually retain the memory + * associated with the `start` function of the serialized signal. + * You only need to use this when using a strategy for serializing the + * abort signal that requires manual memory management. + */ + retain?(value: unknown): void; + + /** + * An optional function to call in order to manually release the memory + * associated with the `start` function of the serialized signal. + * You only need to use this when using a strategy for serializing the + * abort signal that requires manual memory management. + */ + release?(value: unknown): void; +} + +/** + * Converts a serialized `AbortSignal` into a “live” one, which you can + * use to cancel operations in the current environment. When the signal aborts, + * all memory associated with the signal will be released automatically. + */ +export class ThreadAbortSignal implements AbortSignal { + #abortController: AbortController | undefined; + #abortSignal: AbortSignal; + #onabort: AbortSignal['onabort'] | null = null; + + // Proxy properties + get aborted(): boolean { + return this.#abortSignal.aborted; + } + + get reason(): any { + return this.#abortSignal.reason; + } + + get onabort() { + return this.#onabort; + } + + set onabort(value) { + if (this.#onabort) { + this.#abortSignal.removeEventListener('abort', this.#onabort); + } + + this.#onabort = value; + + if (value) { + this.#abortSignal.addEventListener('abort', value); + } + } + + constructor( + signal: AbortSignal | ThreadAbortSignalSerialization | undefined, + {retain, release}: ThreadAbortSignalOptions = {}, + ) { + if (isAbortSignal(signal)) { + this.#abortSignal = signal; + } else { + this.#abortController = new AbortController(); + this.#abortSignal = this.#abortController.signal; + + const {aborted, start} = signal ?? {}; + + if (aborted) { + this.#abortController.abort(); + } else if (start) { + retain?.(start); + + start((aborted) => { + if (aborted) this.#abortController!.abort(); + }); + + if (release) { + this.#abortSignal.addEventListener('abort', () => release(start), { + once: true, + }); + } + } + } + } + + // Proxy methods + addEventListener(...args: Parameters) { + return this.#abortSignal.addEventListener(...args); + } + + removeEventListener(...args: Parameters) { + return this.#abortSignal.removeEventListener(...args); + } + + dispatchEvent(...args: Parameters): boolean { + return this.#abortSignal.dispatchEvent(...args); + } + + throwIfAborted() { + return this.#abortSignal.throwIfAborted(); + } + + /** + * Converts an `AbortSignal` into a version of that signal that can + * be transferred to a target `Thread`. The resulting object can be + * serialized using the RPC utilities provided in this library, and + * passed to `new ThreadAbortSignal()` to be converted into a “live” + * `AbortSignal`. + */ + static serialize( + signal: AbortSignal, + {retain, release}: ThreadAbortSignalOptions = {}, + ): ThreadAbortSignalSerialization { + if (signal.aborted) { + return { + aborted: true, + }; + } + + const listeners = new Set<(aborted: boolean) => void>(); + + signal.addEventListener( + 'abort', + () => { + for (const listener of listeners) { + listener(signal.aborted); + release?.(listener); + } + + listeners.clear(); + }, + {once: true}, + ); + + return { + aborted: false, + start(listener) { + if (signal.aborted) { + listener(true); + } else { + retain?.(listener); + listeners.add(listener); + } + }, + }; + } +} + +function isAbortSignal(value: unknown): value is AbortSignal { + return ( + value != null && + typeof (value as any).aborted === 'boolean' && + typeof (value as any).addEventListener === 'function' + ); +} diff --git a/packages/threads/source/abort-signal.ts b/packages/threads/source/abort-signal.ts deleted file mode 100644 index 81709152d..000000000 --- a/packages/threads/source/abort-signal.ts +++ /dev/null @@ -1,3 +0,0 @@ -export {acceptThreadAbortSignal} from './abort-signal/accept.ts'; -export {createThreadAbortSignal} from './abort-signal/create.ts'; -export type {ThreadAbortSignal} from './abort-signal/types.ts'; diff --git a/packages/threads/source/abort-signal/accept.ts b/packages/threads/source/abort-signal/accept.ts deleted file mode 100644 index ae86239b7..000000000 --- a/packages/threads/source/abort-signal/accept.ts +++ /dev/null @@ -1,48 +0,0 @@ -import {retain, release} from '../memory.ts'; -import type {ThreadAbortSignal} from './types.ts'; - -/** - * Call this function in a thread receiving a `ThreadAbortSignal` to - * turn it into a "live" `AbortSignal`. The resulting signal will - * connect the thread to its sending pair, and will abort it when the - * original `AbortSignal` is aborted. - */ -export function acceptThreadAbortSignal( - signal: AbortSignal | ThreadAbortSignal, -): AbortSignal { - if (isAbortSignal(signal)) return signal; - - const abort = new AbortController(); - - const {aborted, start} = signal; - - if (aborted) { - abort.abort(); - return abort.signal; - } - - if (start) { - retain(start); - start((aborted) => { - if (aborted) abort.abort(); - }); - } - - abort.signal.addEventListener( - 'abort', - async () => { - release(start); - }, - {once: true}, - ); - - return abort.signal; -} - -function isAbortSignal(value?: unknown): value is AbortSignal { - return ( - value != null && - (value as any).aborted != null && - typeof (value as any).addEventListener === 'function' - ); -} diff --git a/packages/threads/source/abort-signal/create.ts b/packages/threads/source/abort-signal/create.ts deleted file mode 100644 index a94eb4303..000000000 --- a/packages/threads/source/abort-signal/create.ts +++ /dev/null @@ -1,45 +0,0 @@ -import {retain, release} from '../memory.ts'; -import type {ThreadAbortSignal} from './types.ts'; - -/** - * Converts an `AbortSignal` into a version of that signal that can - * be transferred to a target `Thread`. The resulting object can be - * transferred to the paired thread, and turned into an actual `AbortSignal` - * object using `acceptThreadAbortSignal()`. - */ -export function createThreadAbortSignal( - signal: AbortSignal, -): ThreadAbortSignal { - const listeners = new Set<(aborted: boolean) => void>(); - - if (signal.aborted) { - return { - aborted: true, - }; - } - - signal.addEventListener( - 'abort', - () => { - for (const listener of listeners) { - listener(signal.aborted); - release(listener); - } - - listeners.clear(); - }, - {once: true}, - ); - - return { - aborted: false, - start(listener) { - if (signal.aborted) { - listener(true); - } else { - retain(listener); - listeners.add(listener); - } - }, - }; -} diff --git a/packages/threads/source/abort-signal/types.ts b/packages/threads/source/abort-signal/types.ts deleted file mode 100644 index 7be479215..000000000 --- a/packages/threads/source/abort-signal/types.ts +++ /dev/null @@ -1,19 +0,0 @@ -/** - * A representation of an `AbortSignal` that can be serialized between - * two threads. - */ -export interface ThreadAbortSignal { - /** - * Whether the signal was already aborted at the time it was - * sent to the sibling thread. - */ - aborted: boolean; - - /** - * A function to connect the signal between the two threads. This - * function should be called by the sibling thread when the abort - * state changes (including changes since the thread-safe abort signal - * was created). - */ - start?(listener: (aborted: boolean) => void): void; -} diff --git a/packages/threads/source/index.ts b/packages/threads/source/index.ts index 883888f8d..e3b119d4f 100644 --- a/packages/threads/source/index.ts +++ b/packages/threads/source/index.ts @@ -28,10 +28,10 @@ export { } from './targets.ts'; export {createBasicEncoder} from './encoding.ts'; export { - createThreadAbortSignal, - acceptThreadAbortSignal, - type ThreadAbortSignal, -} from './abort-signal.ts'; + ThreadAbortSignal, + type ThreadAbortSignalOptions, + type ThreadAbortSignalSerialization, +} from './ThreadAbortSignal.ts'; export type { Thread, ThreadTarget, diff --git a/packages/threads/source/signals/accept.ts b/packages/threads/source/signals/accept.ts index b35956931..441b75c3d 100644 --- a/packages/threads/source/signals/accept.ts +++ b/packages/threads/source/signals/accept.ts @@ -1,6 +1,6 @@ import {signal as createSignal, type Signal} from '@preact/signals-core'; -import {createThreadAbortSignal} from '../abort-signal.ts'; +import {ThreadAbortSignal} from '../ThreadAbortSignal.ts'; import {type ThreadSignal} from './types.ts'; @@ -25,7 +25,8 @@ export function acceptThreadSignal( } = {}, ): Signal { const signal = createSignal(threadSignal.initial); - const threadAbortSignal = abortSignal && createThreadAbortSignal(abortSignal); + const threadAbortSignal = + abortSignal && ThreadAbortSignal.serialize(abortSignal); const valueDescriptor = Object.getOwnPropertyDescriptor( Object.getPrototypeOf(signal), diff --git a/packages/threads/source/signals/create.ts b/packages/threads/source/signals/create.ts index 0c0815306..2abc77de9 100644 --- a/packages/threads/source/signals/create.ts +++ b/packages/threads/source/signals/create.ts @@ -2,7 +2,7 @@ import {type Signal} from '@preact/signals-core'; import {NestedAbortController} from '@quilted/events'; import {retain, release} from '../memory.ts'; -import {acceptThreadAbortSignal} from '../abort-signal.ts'; +import {ThreadAbortSignal} from '../ThreadAbortSignal.ts'; import {type ThreadSignal} from './types.ts'; @@ -50,7 +50,7 @@ export function createThreadSignal( retain(subscriber); const abortSignal = - threadAbortSignal && acceptThreadAbortSignal(threadAbortSignal); + threadAbortSignal && new ThreadAbortSignal(threadAbortSignal); const finalAbortSignal = abortSignal && teardownAbortSignal diff --git a/packages/threads/source/signals/types.ts b/packages/threads/source/signals/types.ts index 1ec843164..e3f29d146 100644 --- a/packages/threads/source/signals/types.ts +++ b/packages/threads/source/signals/types.ts @@ -1,4 +1,4 @@ -import type {ThreadAbortSignal} from '../abort-signal.ts'; +import type {ThreadAbortSignalSerialization} from '../ThreadAbortSignal.ts'; /** * A representation of a Preact signal that can be serialized between @@ -30,7 +30,7 @@ export interface ThreadSignal { * An `AbortSignal` that can be used to stop synchronizing the signal * between the two threads. */ - signal?: AbortSignal | ThreadAbortSignal; + signal?: AbortSignal | ThreadAbortSignalSerialization; }, ): void; }