Skip to content

Commit

Permalink
fix(ext/node): make worker ids sequential (#22884)
Browse files Browse the repository at this point in the history
  • Loading branch information
satyarohith committed Mar 13, 2024
1 parent 0fd8f54 commit bbc2119
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 21 deletions.
7 changes: 6 additions & 1 deletion ext/node/polyfills/02_init.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 14,7 @@ function initialize(
usesLocalNodeModulesDir,
argv0,
runningOnMainThread,
workerId,
maybeWorkerMetadata,
) {
if (initialized) {
Expand All @@ -39,7 40,11 @@ function initialize(
// FIXME(bartlomieju): not nice to depend on `Deno` namespace here
// but it's the only way to get `args` and `version` and this point.
internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version);
internals.__initWorkerThreads(runningOnMainThread, maybeWorkerMetadata);
internals.__initWorkerThreads(
runningOnMainThread,
workerId,
maybeWorkerMetadata,
);
internals.__setupChildProcessIpcChannel();
// `Deno[Deno.internal].requireImpl` will be unreachable after this line.
delete internals.requireImpl;
Expand Down
7 changes: 3 additions & 4 deletions ext/node/polyfills/worker_threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 134,6 @@ function toFileUrl(path: string): URL {
: toFileUrlPosix(path);
}

let threads = 0;
const privateWorkerRef = Symbol("privateWorkerRef");
class NodeWorker extends EventEmitter {
#id = 0;
Expand Down Expand Up @@ -195,12 194,10 @@ class NodeWorker extends EventEmitter {
name = "[worker eval]";
}
this.#name = name;
this.threadId = threads;

const serializedWorkerMetadata = serializeJsMessageData({
workerData: options?.workerData,
environmentData: environmentData,
threadId: this.threadId,
}, options?.transferList ?? []);
const id = op_create_worker(
{
Expand All @@ -216,6 213,7 @@ class NodeWorker extends EventEmitter {
serializedWorkerMetadata,
);
this.#id = id;
this.threadId = id;
this.#pollControl();
this.#pollMessages();
// https://nodejs.org/api/worker_threads.html#event-online
Expand Down Expand Up @@ -391,6 389,7 @@ let parentPort: ParentPort = null as any;

internals.__initWorkerThreads = (
runningOnMainThread: boolean,
workerId,
maybeWorkerMetadata,
) => {
isMainThread = runningOnMainThread;
Expand All @@ -414,11 413,11 @@ internals.__initWorkerThreads = (
>();

parentPort = self as ParentPort;
threadId = workerId;
if (maybeWorkerMetadata) {
const { 0: metadata, 1: _ } = maybeWorkerMetadata;
workerData = metadata.workerData;
environmentData = metadata.environmentData;
threadId = metadata.threadId;
}
defaultExport.workerData = workerData;
defaultExport.parentPort = parentPort;
Expand Down
2 changes: 2 additions & 0 deletions runtime/js/99_main.js
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 794,7 @@ function bootstrapWorkerRuntime(
runtimeOptions,
name,
internalName,
workerId,
maybeWorkerMetadata,
) {
if (hasBootstrapped) {
Expand Down Expand Up @@ -929,6 930,7 @@ function bootstrapWorkerRuntime(
hasNodeModulesDir,
argv0,
/* runningOnMainThread */ false,
workerId,
workerMetadata,
);
}
Expand Down
4 changes: 1 addition & 3 deletions runtime/ops/worker_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 95,6 @@ deno_core::extension!(
},
state = |state, options| {
state.put::<WorkersTable>(WorkersTable::default());
state.put::<WorkerId>(WorkerId::default());

let create_web_worker_cb_holder =
CreateWebWorkerCbHolder(options.create_web_worker_cb);
Expand Down Expand Up @@ -163,10 162,9 @@ fn op_create_worker(
parent_permissions.clone()
};
let parent_permissions = parent_permissions.clone();
let worker_id = state.take::<WorkerId>();
let create_web_worker_cb = state.borrow::<CreateWebWorkerCbHolder>().clone();
let format_js_error_fn = state.borrow::<FormatJsErrorFnHolder>().clone();
state.put::<WorkerId>(worker_id.next().unwrap());
let worker_id = WorkerId::new();

let module_specifier = deno_core::resolve_url(&specifier)?;
let worker_name = args_name.unwrap_or_default();
Expand Down
35 changes: 22 additions & 13 deletions runtime/web_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,33 55,40 @@ use std::cell::RefCell;
use std::fmt;
use std::rc::Rc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WebWorkerType {
Classic,
Module,
}
static WORKER_ID_COUNTER: AtomicU32 = AtomicU32::new(1);

#[derive(
Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize,
)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct WorkerId(u32);
impl WorkerId {
pub fn new() -> WorkerId {
let id = WORKER_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
WorkerId(id)
}
}
impl fmt::Display for WorkerId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "worker-{}", self.0)
}
}
impl WorkerId {
pub fn next(&self) -> Option<WorkerId> {
self.0.checked_add(1).map(WorkerId)
impl Default for WorkerId {
fn default() -> Self {
Self::new()
}
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WebWorkerType {
Classic,
Module,
}

/// Events that are sent to host from child
/// worker.
pub enum WorkerControlEvent {
Expand Down Expand Up @@ -630,11 637,13 @@ impl WebWorker {
v8::String::new(scope, &format!("{}", self.id))
.unwrap()
.into();
let id: v8::Local<v8::Value> =
v8::Integer::new(scope, self.id.0 as i32).into();
bootstrap_fn
.call(
scope,
undefined.into(),
&[args, name_str, id_str, worker_data],
&[args, name_str, id_str, id, worker_data],
)
.unwrap();
}
Expand Down
6 changes: 6 additions & 0 deletions tests/integration/worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 112,12 @@ itest!(worker_doest_stall_event_loop {
exit_code: 0,
});

itest!(worker_ids_are_sequential {
args: "run --quiet -A workers/worker_ids_are_sequential.ts",
output: "workers/worker_ids_are_sequential.ts.out",
exit_code: 0,
});

// Test for https://github.com/denoland/deno/issues/22629
itest!(node_worker_auto_exits {
args: "run --quiet --allow-read workers/node_worker_auto_exits.mjs",
Expand Down
34 changes: 34 additions & 0 deletions tests/testdata/workers/worker_ids_are_sequential.ts
Original file line number Diff line number Diff line change
@@ -0,0 1,34 @@
import {
isMainThread,
parentPort,
threadId,
Worker,
} from "node:worker_threads";

console.log("threadId", threadId);

if (isMainThread) {
const worker = new Worker(new URL(import.meta.url));
worker.on("message", (msg) => console.log("from worker:", msg));
worker.on("error", () => {
throw new Error("error");
});
worker.on("exit", (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
} else if (threadId == 1) {
const worker = new Worker(new URL(import.meta.url));
worker.on("message", (msg) => console.log("from worker:", msg));
worker.on("error", () => {
throw new Error("error");
});
worker.on("exit", (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
} else {
parentPort.postMessage("hello!");
}
4 changes: 4 additions & 0 deletions tests/testdata/workers/worker_ids_are_sequential.ts.out
Original file line number Diff line number Diff line change
@@ -0,0 1,4 @@
threadId 0
threadId 1
threadId 2
from worker: hello!

0 comments on commit bbc2119

Please sign in to comment.