In this article, I’m going to talk about two functions that I’ve been working on recently: jsext.parallel and jsext.run. They allow us to run functions in parallel worker threads or child processes, whether in Node.js, Bun, Deno, or browsers.
Run functions in parallel threads
Unlike other libraries, jsext.parallel doesn’t require us to register a function before it can be invoked. It is designed to be straightforward: it wraps the given module as a parallel module and returns its exported functions as threaded functions. When such a function is called in the main thread, the call is redirected to a worker thread instead.
// main.ts
import { readFile } from "node:fs/promises";
import { parallel } from "@ayonli/jsext";
const { default: hash, sequence } = parallel(() => import("./worker"));
// Don't be fooled by the import syntax: worker.ts will not be imported into the current thread,
// this syntax only lets TypeScript/the IDE pick up the types of the target module.
const content = await readFile("./package.json", "utf8");
const res = await hash(content);
console.log(res);
for await (const num of sequence(10)) {
    console.log(num);
}
// worker.ts
import SparkMd5 from "spark-md5";
export default function hash(str: string) {
    return new SparkMd5().append(str).end();
}
export function* sequence(end: number) {
    let i = 0;
    while (i++ < end) {
        yield i;
    }
}
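To give a feel for how a call in the main thread could be redirected somewhere else without registering anything up front, here is a minimal sketch built on a Proxy. It is not how jsext.parallel is actually implemented: wrapAsRemote, Dispatch, Remote and fakeWorker are hypothetical names, and the "worker" is simulated in the same thread instead of living in a real one.

```typescript
// Hypothetical sketch, NOT the actual jsext implementation.
// A dispatcher delivers a function name and its arguments to "somewhere else"
// (a real implementation would post a message to a worker thread).
type Dispatch = (fn: string, args: unknown[]) => Promise<unknown>;

// Maps every exported function to a version that returns a promise.
type Remote<T> = {
    [K in keyof T]: T[K] extends (...args: infer A) => infer R
        ? (...args: A) => Promise<Awaited<R>>
        : never;
};

function wrapAsRemote<T extends object>(dispatch: Dispatch): Remote<T> {
    // Nothing is registered up front: any property access yields a stub
    // that forwards the call through the dispatcher.
    return new Proxy({}, {
        get: (_target, prop) => (...args: unknown[]) => dispatch(String(prop), args),
    }) as Remote<T>;
}

// Stand-in for the module that would be loaded inside the worker.
const fakeWorker = {
    add: (a: number, b: number) => a + b,
    upper: (text: string) => text.toUpperCase(),
};

// Simulated transport: look the function up by name and call it asynchronously.
const dispatch: Dispatch = async (fn, args) => {
    const target = (fakeWorker as Record<string, unknown>)[fn];
    if (typeof target !== "function") {
        throw new TypeError(`${fn} is not a function`);
    }
    return target(...args);
};

const { add, upper } = wrapAsRemote<typeof fakeWorker>(dispatch);
const sumPromise = add(1, 2);        // typed as Promise<number>
const upperPromise = upper("hello"); // typed as Promise<string>
```

The point of the sketch is the shape of the API: the caller destructures functions as if they were imported normally, but every call becomes asynchronous because it has to travel through a transport.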
We can wrap any module and call any function we want, even Node.js built-in modules, 3rd-party NPM modules, or URL modules in Deno and the browser.
// main.ts
import { parallel } from "@ayonli/jsext";
const { gzipSync: gzip } = parallel(() => import("node:zlib"));
const { sync: glob } = parallel(() => import("glob"));
console.log(await gzip(Buffer.from("Hello, World!")));
console.log(await glob("*.ts", {}));
// main.ts
import { parallel } from "https://deno.land/x/ayonli_jsext/index.ts";
const { parse } = parallel(() => import("https://deno.land/std@0.204.0/jsonc/mod.ts"));
console.log(await parse(`
{
    "foo": "Hello",
    "bar": "World", // trailing comma (,) is allowed in jsonc
}
`));
// Copy and paste this code into the browser's DevTools console and run it line by line,
// supported in Chrome, Firefox and Safari
var jsext = await import("https://ayonli.github.io/jsext/esm/index.js");
var nanoid = jsext.parallel(() => import("https://unpkg.com/nanoid@5.0.2/nanoid.js"));
await nanoid.nanoid();
jsext.parallel is a dependency level function
What this means is that we can not only use parallel() to wrap a module and run it in a worker thread, but also use it inside a dependency module. Consider the following files.
// sum.ts
export default function sum(...values: number[]): number {
    return values.reduce((sum, value) => sum + value, 0);
}
// avg.ts
import { parallel } from "@ayonli/jsext";
const { default: sum } = parallel(() => import("./sum.ts"));
export default async function avg(...values: number[]): Promise<number> {
    return (await sum(...values)) / values.length;
}
// main.ts
import avg from "./avg.ts";
console.log(await avg(1, 2, 3, 4, 5, 6, 7, 8, 9)); // 5
Moreover, we don’t have to consider whether our code will run in the main thread or in a worker thread. If the dependency is imported into the main thread via jsext.parallel, a worker thread is started for the module. If the code is already running in a worker thread, parallel() skips that step.
// main.ts
import { parallel } from "@ayonli/jsext";
const { default: avg } = parallel(() => import("./avg.ts"));
// This code will work just fine, even though `parallel()` is also called in the avg.ts.
console.log(await avg(1, 2, 3, 4, 5, 6, 7, 8, 9)); // 5
Run tasks via jsext.run
jsext.run can also be used to run functions in worker threads, although it is primarily designed for running tasks that can be aborted.
// main.ts
import { readFile } from "node:fs/promises";
import { run } from "@ayonli/jsext";
// Read the file as text, since the default function of worker.ts expects a string.
const content = await readFile("./package.json", "utf8");
// this invokes the default function of worker.ts
// NOTE: jsext.run only supports relative or absolute module paths,
// the suffix can be omitted for convenience.
const task = await run<string, [string]>("./worker", [content]);
const res = await task.result(); // result() is used to retrieve the return value
console.log(res);
// this invokes the `sequence` function in worker.ts, which yields numbers
const task2 = await run<number, [number]>("./worker", [10], { fn: "sequence" });
for await (const num of task2.iterate()) { // iterate() is used to retrieve the yielded values
    console.log(num);
}
Worker threads may be suitable for CPU-intensive tasks, as the official Node.js documentation demonstrates, but if we want to run a task that is IO-intensive, we’d better use a child process instead. jsext.run makes this very easy for us.
// main.ts
import { run } from "@ayonli/jsext";
const task = await run<string, [string]>("./worker", ["https://ayon.li"], {
    adapter: "child_process",
    // By default, jsext.run drops the worker once the task settles. Set this
    // option so that the worker can be reused in the next call.
    keepAlive: true,
});
const res = await task.result();
console.log(res);
// worker.ts
import SparkMD5 from "spark-md5";
export default async function fetchAndHash(url: string): Promise<string> {
    const res = await fetch(url);
    if (res.status === 200) {
        return new SparkMD5().append(await res.text()).end();
    } else {
        throw new Error(`unable to fetch: ${url}`);
    }
}
Abort during the middle of the task
Although JavaScript has the AbortSignal API, it only helps when the task is dealing with streaming operations, for example, uploading and downloading files. It doesn’t guarantee that the task can be forcibly interrupted. Not to mention that it must be wrapped in a Promise executor, which makes it harder to use in practice.
Moreover, once a function starts, it cannot be stopped until it returns or throws, so there isn’t much AbortSignal can do to stop the program. However, although we cannot interrupt the function, we can terminate the thread or process instead.
This is where jsext.run shines. It wraps the termination of the thread in a very handy way: the object it returns contains an abort method, which we can use to terminate the thread, along with any execution in it.
// main.ts
import { run } from "@ayonli/jsext";
const task = await run<number, [number]>("./worker", [1_000_000_000]);
(async () => {
    try {
        const res = await task.result();
        console.log(res);
    } catch (err) {
        console.error(err); // In this case, Error: operation aborted
    }
})();
(async () => {
    await task.abort(new Error("operation aborted"));
})();
// worker.ts
export default function slowFn(limit: number): number {
    let last = 0;
    for (let i = 0; i < limit; i++) {
        last = i + 1;
    }
    return last;
}
The example code may not seem useful, but there are scenarios where this function is. Consider a long-running task, such as a scheduled job that syncs data from a third-party resource: it typically takes a long time, and if something unexpected happens during the process (such as a connection hang-up), we need to interrupt it (this was actually my case).
jsext.run guarantees that a single thread only handles one function call at a time, because we don’t want to interrupt other function calls when abort() is called.
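The idea of "terminate the worker and reject the pending result" can be sketched without any threads at all. The following is only an illustration of the pattern, not the code behind jsext.run: makeAbortable and AbortableTask are hypothetical names, and the terminate callback stands in for whatever would actually kill the thread or process.

```typescript
// Hypothetical sketch of the pattern, NOT the actual jsext.run implementation.
interface AbortableTask<R> {
    result(): Promise<R>;
    abort(reason?: Error): Promise<void>;
}

function makeAbortable<R>(
    work: Promise<R>,
    terminate: () => Promise<void> | void // stand-in for killing the worker
): AbortableTask<R> {
    let rejectResult!: (err: Error) => void;
    const aborted = new Promise<never>((_, reject) => {
        rejectResult = reject;
    });
    // Whichever settles first wins: the real work or the abort signal.
    const settled = Promise.race([work, aborted]);
    // Avoid unhandled rejections when nobody is awaiting result().
    settled.catch(() => { });

    return {
        result: () => settled,
        async abort(reason: Error = new Error("operation aborted")) {
            await terminate();    // stop the execution itself
            rejectResult(reason); // then let the caller know
        },
    };
}

// Usage: the work below never finishes on its own.
let terminated = false;
const task = makeAbortable(new Promise<number>(() => { }), () => {
    terminated = true;
});
const outcome = task.abort(new Error("operation aborted")).then(() =>
    task.result().then(
        () => "resolved",
        (err: Error) => err.message
    )
);
```

Note that the function itself is never interrupted here either. What makes the approach work is that the execution context is thrown away as a whole, which is only safe when that context is not shared with other calls.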
Apart from aborting the task manually, we can also set a timeout option to cancel the task automatically, like this:
// main.ts
import { run } from "@ayonli/jsext";
const task = await run<number, [number]>("./worker", [1_000_000_000], {
    timeout: 2_000,
});
// ...
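Conceptually, a timeout is just an abort that is triggered by a timer. A minimal sketch of that idea follows, under the assumption that the timer both stops the execution and rejects the pending result. withTimeout and its onTimeout callback are hypothetical helpers for illustration, not part of jsext.

```typescript
// Hypothetical helper, NOT part of jsext: rejects if `work` takes longer than `ms`.
function withTimeout<R>(
    work: Promise<R>,
    ms: number,
    onTimeout: () => void // stand-in for terminating the worker
): Promise<R> {
    return new Promise<R>((resolve, reject) => {
        const timer = setTimeout(() => {
            onTimeout();
            reject(new Error(`operation timeout after ${ms}ms`));
        }, ms);

        // Clear the timer as soon as the work settles, so that it neither
        // fires late nor keeps the program alive.
        work.then(
            (value) => { clearTimeout(timer); resolve(value); },
            (err) => { clearTimeout(timer); reject(err); }
        );
    });
}

// Usage: one task that finishes in time and one that never finishes.
let workerKilled = false;
const fast = withTimeout(Promise.resolve(42), 50, () => { });
const slow = withTimeout(new Promise<number>(() => { }), 20, () => {
    workerKilled = true;
}).then(
    () => "resolved",
    (err: Error) => err.message
);
```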
Spawning more workers
Cluster mode suggests that we should not configure more workers than there are CPU cores, but this limitation does not apply to worker threads. Worker threads tend to run CPU-intensive tasks that may block the thread, and a function like jsext.run queues the jobs and runs them one by one on each thread. Spawning more threads allows more tasks to run concurrently and leaves it to the OS to switch contexts for us.
By default, a maximum of 16 workers is configured for Node.js (and navigator.hardwareConcurrency is used if available). We can change this limit according to our machine’s specs.
import { parallel } from "@ayonli/jsext";
parallel.maxWorkers = 256; // this setting affects both parallel() and run()
However, we should be very careful when setting this threshold and design our worker module wisely. At the end of the day, a worker thread isn’t like a traditional shared-memory thread: a worker thread in JavaScript takes much more memory because it acts like an individual process, and the virtual machine and our dependencies are all loaded again for each thread.
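To illustrate what a worker limit means in practice, here is a small sketch of a limiter that runs at most a fixed number of jobs at once and queues the rest. It only models the scheduling idea with plain promises. createLimiter is a hypothetical name, and the real pool in jsext may be organized differently.

```typescript
// Hypothetical sketch of a concurrency limit, NOT the actual jsext worker pool.
function createLimiter(maxWorkers: number) {
    let active = 0;
    const waiting: Array<() => void> = [];

    async function acquire(): Promise<void> {
        if (active < maxWorkers) {
            active++;
            return;
        }
        // All slots are taken: wait until a finished job hands one over.
        await new Promise<void>((resolve) => waiting.push(resolve));
    }

    function release(): void {
        const next = waiting.shift();
        if (next) {
            next(); // pass the slot directly to the next queued job
        } else {
            active--;
        }
    }

    return async function runJob<R>(job: () => Promise<R>): Promise<R> {
        await acquire();
        try {
            return await job();
        } finally {
            release();
        }
    };
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Usage: five jobs, but never more than two of them running at once.
const limited = createLimiter(2);
let running = 0;
let peak = 0;
const allDone = Promise.all([1, 2, 3, 4, 5].map((n) => limited(async () => {
    running++;
    peak = Math.max(peak, running);
    await sleep(5);
    running--;
    return n * 2;
})));
```

Raising the limit lets more jobs leave the queue at once, at the memory cost per worker described above.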
Use channel to communicate between threads
For regular functions and async functions, sending and receiving data is very straightforward: data travels as function arguments and return values. However, this kind of function doesn’t suit the need for processing streaming data.
Although generator functions provide a way to continuously send data into the function and receive responses accordingly, they have a problem: the next(value) call is coupled with a yield value, meaning that each time we send one piece of data, we get exactly one response back, tied to the last piece of data. Not only is the order incorrect, but it also suffers from the head-of-line blocking problem. Obviously, this isn’t what we want.
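The coupling between next(value) and yield is easy to see in a small experiment. The generator below is a made-up example (echoUpper is not part of any library). How exactly inputs and outputs pair up depends on how a generator is written, but in every case one call to next() produces exactly one yielded value, and the argument of the very first next() is discarded.

```typescript
// A made-up generator that replies to each input with its upper-case form.
function* echoUpper(): Generator<string, void, string> {
    // The first next() call only runs the generator up to this yield,
    // so whatever argument it carries never reaches `input`.
    let input: string = yield "ready";
    while (true) {
        // Each later next(value) resumes here and must wait for the next
        // yield before the caller can send anything else.
        input = yield input.toUpperCase();
    }
}

const gen = echoUpper();
const first = gen.next("a");  // "a" is discarded, value is "ready"
const second = gen.next("b"); // value is "B"
const third = gen.next("c");  // value is "C"
console.log(first.value, second.value, third.value);
```

Because sending and receiving happen in the same call, there is no way to send several chunks ahead while an earlier one is still being processed.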
For example, unlike Node.js, Deno doesn’t have a cluster model. In order to utilize multi-core performance for our applications, we need to use worker threads to handle client requests. But how do we do this? Data transferred between worker threads is cloned via the structured clone algorithm, which only supports a limited set of data types, so we cannot pass the Request object, whose body is a readable stream, to the threaded function. It will simply not work. So what should we do?
Well, there is a solution. The @ayonli/jsext package provides a channel implementation similar to Golang’s, and like in Golang, it can be used to communicate (send and receive streaming data) between threads. Although we cannot pass a stream object to the threaded function, we can continuously read chunks from the stream and send them to the threaded function via a channel (and we will do the same for the response).
// main.ts
import { parallel, chan } from "https://ayonli.github.io/jsext/index.ts";
import { readChannel, wireChannel } from "./util.ts";
const { handleRequest } = parallel(() => import("./worker.ts"));
Deno.serve(async req => {
    const channel = chan<{ value: Uint8Array | undefined; done: boolean; }>();
    // Pass the request information and the channel to the threaded function
    // so it can rebuild the request object in the worker thread for use.
    const getResMsg = handleRequest({
        url: req.url,
        method: req.method,
        headers: Object.fromEntries(req.headers.entries()),
        hasBody: !!req.body,
        cache: req.cache,
        credentials: req.credentials,
        integrity: req.integrity,
        keepalive: req.keepalive,
        mode: req.mode,
        redirect: req.redirect,
        referrer: req.referrer,
        referrerPolicy: req.referrerPolicy,
    }, channel); // pass channel as argument to the threaded function
    req.body && wireChannel(req.body, channel);
    const { hasBody, ...init } = await getResMsg;
    return new Response(hasBody ? readChannel(channel, true) : null, init);
});
// worker.ts
import type { Channel } from "https://ayonli.github.io/jsext/index.ts";
import { readChannel, wireChannel } from "./util.ts";
import handle from "./handler.ts";
export interface RequestMessage extends Omit<RequestInit, "body" | "signal"> {
    url: string;
    headers: Record<string, string>;
    hasBody: boolean;
}
export interface ResponseMessage extends ResponseInit {
    url: string;
    headers: Record<string, string>;
    hasBody: boolean;
}
export async function handleRequest(
    reqMsg: RequestMessage,
    channel: Channel<{ value: Uint8Array | undefined; done: boolean; }>
): Promise<ResponseMessage> {
    const { url, hasBody, ...init } = reqMsg;
    // Rebuild the request object in the worker.
    const req = new Request(url, {
        ...init,
        body: hasBody ? readChannel(channel) : null,
    });
    const res = await handle(req);
    res.body && wireChannel(res.body, channel);
    return {
        url: res.url,
        status: res.status,
        statusText: res.statusText,
        headers: Object.fromEntries(res.headers.entries()),
        hasBody: !!res.body,
    };
}
// handler.ts
export default async function handle(req: Request): Promise<Response> {
    // Simple example here, it can be very complicated in a real application.
    // This function can serve as the entry point of all routes.
    const text = await req.text();
    return new Response("The client sent: " + text);
}
// util.ts
import type { Channel } from "https://ayonli.github.io/jsext/index.ts";
// Reads the stream chunk by chunk and pushes every chunk into the channel.
export async function wireChannel<T = Uint8Array>(
    stream: ReadableStream<T>,
    channel: Channel<{ value: T | undefined; done: boolean; }>
) {
    const reader = stream.getReader();
    while (true) {
        const { value, done } = await reader.read();
        await channel.push({ value, done });
        if (done) {
            break;
        }
    }
}
// Rebuilds a readable stream from the chunks received via the channel.
export function readChannel<T = Uint8Array>(
    channel: Channel<{ value: T | undefined; done: boolean; }>,
    closeAfterRead = false
) {
    return new ReadableStream({
        async start(controller) {
            for await (const { value, done } of channel) {
                if (done) {
                    controller.close();
                    break;
                } else {
                    controller.enqueue(value as Uint8Array);
                }
            }
            if (closeAfterRead) {
                // This will release the channel for GC, only close
                // in the response stream
                channel.close();
            }
        }
    });
}
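For readers who have not used Go-style channels, the semantics that the code above relies on (push on one side, for await on the other, close to end the iteration) can be modeled in a few lines. MiniChannel below is a hypothetical, single-thread toy with an unbounded buffer. It is not the Channel implementation from @ayonli/jsext, which also works across threads.

```typescript
// Hypothetical toy model, NOT the Channel implementation from @ayonli/jsext.
class MiniChannel<T> implements AsyncIterable<T> {
    private buffer: T[] = [];
    private receivers: Array<(result: IteratorResult<T>) => void> = [];
    private closed = false;

    push(value: T): void {
        if (this.closed) {
            throw new Error("the channel is closed");
        }
        const receiver = this.receivers.shift();
        if (receiver) {
            receiver({ value, done: false }); // someone is already waiting
        } else {
            this.buffer.push(value);          // keep it until someone asks
        }
    }

    async pop(): Promise<IteratorResult<T>> {
        if (this.buffer.length > 0) {
            return { value: this.buffer.shift() as T, done: false };
        }
        if (this.closed) {
            return { value: undefined, done: true };
        }
        // Nothing buffered yet: wait for the next push() or close().
        return new Promise((resolve) => this.receivers.push(resolve));
    }

    close(): void {
        this.closed = true;
        for (const receiver of this.receivers.splice(0)) {
            receiver({ value: undefined, done: true });
        }
    }

    [Symbol.asyncIterator](): AsyncIterator<T> {
        return { next: () => this.pop() };
    }
}

// Usage: the consumer starts first, the producer pushes later.
async function collect(): Promise<number[]> {
    const channel = new MiniChannel<number>();
    const received: number[] = [];
    const consumer = (async () => {
        for await (const num of channel) {
            received.push(num);
        }
    })();
    channel.push(1);
    channel.push(2);
    channel.push(3);
    channel.close(); // values that are already buffered are still delivered
    await consumer;
    return received;
}
const collected = collect();
```

Unlike the generator approach, sending and receiving are decoupled here: the producer can push several chunks without waiting for a reply to each of them.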
TL;DR
The @ayonli/jsext package provides many useful functions for developing robust JavaScript/TypeScript applications. It’s the fruit of my years of JS/TS programming experience, so check it out if you’re interested.