Skip to content

Commit

Permalink
fix(ext/node): support MessagePort in WorkerOptions.workerData (#22950
Browse files Browse the repository at this point in the history
)

This commit fixes passing `MessagePort` instances to
`WorkerOptions.workerData`.

Before they were not serialized and deserialized properly when spawning
a worker thread.

Closes #22935
  • Loading branch information
bartlomieju authored Mar 16, 2024
1 parent ebbc897 commit 92576fd
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 12 deletions.
3 changes: 3 additions & 0 deletions ext/web/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 43,15 @@ pub use crate::blob::BlobStore;
pub use crate::blob::InMemoryBlobPart;

pub use crate::message_port::create_entangled_message_port;
pub use crate::message_port::deserialize_js_transferables;
use crate::message_port::op_message_port_create_entangled;
use crate::message_port::op_message_port_post_message;
use crate::message_port::op_message_port_recv_message;
use crate::message_port::op_message_port_recv_message_sync;
pub use crate::message_port::serialize_transferables;
pub use crate::message_port::JsMessageData;
pub use crate::message_port::MessagePort;
pub use crate::message_port::Transferable;

use crate::timers::op_defer;
use crate::timers::op_now;
Expand Down
11 changes: 5 additions & 6 deletions ext/web/message_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 22,7 @@ use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;

enum Transferable {
pub enum Transferable {
MessagePort(MessagePort),
ArrayBuffer(u32),
}
Expand Down Expand Up @@ -140,7 140,7 @@ pub enum JsTransferable {
ArrayBuffer(u32),
}

fn deserialize_js_transferables(
pub fn deserialize_js_transferables(
state: &mut OpState,
js_transferables: Vec<JsTransferable>,
) -> Result<Vec<Transferable>, AnyError> {
Expand All @@ -165,7 165,7 @@ fn deserialize_js_transferables(
Ok(transferables)
}

fn serialize_transferables(
pub fn serialize_transferables(
state: &mut OpState,
transferables: Vec<Transferable>,
) -> Vec<JsTransferable> {
Expand All @@ -189,8 189,8 @@ fn serialize_transferables(

#[derive(Deserialize, Serialize)]
pub struct JsMessageData {
data: DetachedBuffer,
transferables: Vec<JsTransferable>,
pub data: DetachedBuffer,
pub transferables: Vec<JsTransferable>,
}

#[op2]
Expand All @@ -208,7 208,6 @@ pub fn op_message_port_post_message(
}

let resource = state.resource_table.get::<MessagePortResource>(rid)?;

resource.port.send(state, data)
}

Expand Down
15 changes: 13 additions & 2 deletions runtime/ops/worker_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 11,7 @@ use crate::web_worker::WebWorkerHandle;
use crate::web_worker::WebWorkerType;
use crate::web_worker::WorkerControlEvent;
use crate::web_worker::WorkerId;
use crate::web_worker::WorkerMetadata;
use crate::worker::FormatJsErrorFn;
use deno_core::error::AnyError;
use deno_core::op2;
Expand All @@ -19,6 20,7 @@ use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
use deno_web::deserialize_js_transferables;
use deno_web::JsMessageData;
use log::debug;
use std::cell::RefCell;
Expand All @@ -36,7 38,7 @@ pub struct CreateWebWorkerArgs {
pub main_module: ModuleSpecifier,
pub worker_type: WebWorkerType,
pub close_on_idle: bool,
pub maybe_worker_metadata: Option<JsMessageData>,
pub maybe_worker_metadata: Option<WorkerMetadata>,
}

pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle)
Expand Down Expand Up @@ -175,7 177,16 @@ fn op_create_worker(

// Setup new thread
let thread_builder = std::thread::Builder::new().name(format!("{worker_id}"));

let maybe_worker_metadata = if let Some(data) = maybe_worker_metadata {
let transferables =
deserialize_js_transferables(state, data.transferables)?;
Some(WorkerMetadata {
buffer: data.data,
transferables,
})
} else {
None
};
// Spawn it
thread_builder.spawn(move || {
// Any error inside this block is terminal:
Expand Down
26 changes: 22 additions & 4 deletions runtime/web_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 27,7 @@ use deno_core::serde_json::json;
use deno_core::v8;
use deno_core::CancelHandle;
use deno_core::CompiledWasmModuleStore;
use deno_core::DetachedBuffer;
use deno_core::Extension;
use deno_core::FeatureChecker;
use deno_core::GetErrorClassFn;
Expand All @@ -47,9 48,11 @@ use deno_kv::dynamic::MultiBackendDbHandler;
use deno_terminal::colors;
use deno_tls::RootCertStoreProvider;
use deno_web::create_entangled_message_port;
use deno_web::serialize_transferables;
use deno_web::BlobStore;
use deno_web::JsMessageData;
use deno_web::MessagePort;
use deno_web::Transferable;
use log::debug;
use std::cell::RefCell;
use std::fmt;
Expand All @@ -61,6 64,11 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

pub struct WorkerMetadata {
pub buffer: DetachedBuffer,
pub transferables: Vec<Transferable>,
}

static WORKER_ID_COUNTER: AtomicU32 = AtomicU32::new(1);

#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
Expand Down Expand Up @@ -343,7 351,7 @@ pub struct WebWorker {
has_message_event_listener_fn: Option<v8::Global<v8::Value>>,
bootstrap_fn_global: Option<v8::Global<v8::Function>>,
// Consumed when `bootstrap_fn` is called
maybe_worker_metadata: Option<JsMessageData>,
maybe_worker_metadata: Option<WorkerMetadata>,
}

pub struct WebWorkerOptions {
Expand Down Expand Up @@ -371,7 379,7 @@ pub struct WebWorkerOptions {
pub feature_checker: Arc<FeatureChecker>,
pub strace_ops: Option<Vec<String>>,
pub close_on_idle: bool,
pub maybe_worker_metadata: Option<JsMessageData>,
pub maybe_worker_metadata: Option<WorkerMetadata>,
}

impl WebWorker {
Expand Down Expand Up @@ -622,7 630,8 @@ impl WebWorker {
}

pub fn bootstrap(&mut self, options: &BootstrapOptions) {
self.js_runtime.op_state().borrow_mut().put(options.clone());
let op_state = self.js_runtime.op_state();
op_state.borrow_mut().put(options.clone());
// Instead of using name for log we use `worker-${id}` because
// WebWorkers can have empty string as name.
{
Expand All @@ -633,7 642,16 @@ impl WebWorker {
let undefined = v8::undefined(scope);
let mut worker_data: v8::Local<v8::Value> = v8::undefined(scope).into();
if let Some(data) = self.maybe_worker_metadata.take() {
worker_data = deno_core::serde_v8::to_v8(scope, data).unwrap();
let js_transferables = serialize_transferables(
&mut op_state.borrow_mut(),
data.transferables,
);
let js_message_data = JsMessageData {
data: data.buffer,
transferables: js_transferables,
};
worker_data =
deno_core::serde_v8::to_v8(scope, js_message_data).unwrap();
}
let name_str: v8::Local<v8::Value> =
v8::String::new(scope, &self.name).unwrap().into();
Expand Down
49 changes: 49 additions & 0 deletions tests/unit_node/worker_threads_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 238,52 @@ Deno.test({
},
sanitizeResources: false,
});

Deno.test({
name: "[worker_threads] Worker workerData with MessagePort",
async fn() {
const { port1: mainPort, port2: workerPort } = new workerThreads
.MessageChannel();
const deferred = Promise.withResolvers<void>();
const worker = new workerThreads.Worker(
`
import {
isMainThread,
MessageChannel,
parentPort,
receiveMessageOnPort,
Worker,
workerData,
} from "node:worker_threads";
parentPort.on("message", (msg) => {
console.log("message from main", msg);
parentPort.postMessage("Hello from worker on parentPort!");
workerData.workerPort.postMessage("Hello from worker on workerPort!");
});
`,
{
eval: true,
workerData: { workerPort },
transferList: [workerPort],
},
);

worker.on("message", (data) => {
assertEquals(data, "Hello from worker on parentPort!");
// TODO(bartlomieju): it would be better to use `mainPort.on("message")`,
// but we currently don't support it.
// https://github.com/denoland/deno/issues/22951
// Wait a bit so the message can arrive.
setTimeout(() => {
const msg = workerThreads.receiveMessageOnPort(mainPort)!.message;
assertEquals(msg, "Hello from worker on workerPort!");
deferred.resolve();
}, 500);
});

worker.postMessage("Hello from parent");
await deferred.promise;
await worker.terminate();
mainPort.close();
},
});

0 comments on commit 92576fd

Please sign in to comment.