From 7d919f6fd980ed54785e86892a518f0bdf68f475 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 2 Jul 2024 23:37:54 +0100 Subject: [PATCH] refactor(jupyter): move ZeroMQ server to a separate thread (#24373) Moves the ZeroMQ messaging server to a separate thread. This will allow to run blocking JS code and maintain communication with the notebook frontend. Towards https://github.com/denoland/deno/pull/23592 Towards https://github.com/denoland/deno/pull/24250 Closes https://github.com/denoland/deno/issues/23617 --- cli/ops/jupyter.rs | 4 +- cli/tools/jupyter/mod.rs | 379 +++++++++++++++++++++++++++++++++++- cli/tools/jupyter/server.rs | 251 ++++++++++-------------- cli/tools/repl/mod.rs | 1 + 4 files changed, 484 insertions(+), 151 deletions(-) diff --git a/cli/ops/jupyter.rs b/cli/ops/jupyter.rs index edc5f64ec0a953..5a16caf5400173 100644 --- a/cli/ops/jupyter.rs +++ b/cli/ops/jupyter.rs @@ -45,11 +45,11 @@ pub async fn op_jupyter_broadcast( ( s.borrow::>>().clone(), - s.borrow::>>>().clone(), + s.borrow::>>>().clone(), ) }; - let maybe_last_request = last_execution_request.borrow().clone(); + let maybe_last_request = last_execution_request.lock().await.clone(); if let Some(last_request) = maybe_last_request { let content = JupyterMessageContent::from_type_and_content( &message_type, diff --git a/cli/tools/jupyter/mod.rs b/cli/tools/jupyter/mod.rs index 0dbcfe9ef070b3..a5139044faa616 100644 --- a/cli/tools/jupyter/mod.rs +++ b/cli/tools/jupyter/mod.rs @@ -2,18 +2,23 @@ use crate::args::Flags; use crate::args::JupyterFlags; +use crate::cdp; +use crate::lsp::ReplCompletionItem; use crate::ops; use crate::tools::repl; use crate::tools::test::create_single_test_event_channel; use crate::tools::test::reporters::PrettyTestReporter; use crate::tools::test::TestEventWorkerSender; use crate::CliFactory; +use deno_core::anyhow::bail; use deno_core::anyhow::Context; use deno_core::error::generic_error; use deno_core::error::AnyError; +use deno_core::futures::FutureExt; use deno_core::located_script_name; use deno_core::resolve_url_or_path; use deno_core::serde_json; +use deno_core::serde_json::json; use deno_core::url::Url; use deno_runtime::deno_io::Stdio; use deno_runtime::deno_io::StdioPipe; @@ -21,11 +26,11 @@ use deno_runtime::deno_permissions::Permissions; use deno_runtime::deno_permissions::PermissionsContainer; use deno_runtime::WorkerExecutionMode; use deno_terminal::colors; - use jupyter_runtime::jupyter::ConnectionInfo; use jupyter_runtime::messaging::StreamContent; use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::oneshot; mod install; pub mod server; @@ -142,7 +147,377 @@ pub async fn kernel( ) })); - server::JupyterServer::start(spec, stdio_rx, repl_session).await?; + let (tx1, rx1) = mpsc::unbounded_channel(); + let (tx2, rx2) = mpsc::unbounded_channel(); + let (startup_data_tx, startup_data_rx) = + oneshot::channel::(); + + let mut repl_session_proxy = JupyterReplSession { + repl_session, + rx: rx1, + tx: tx2, + }; + let repl_session_proxy_channels = JupyterReplProxy { tx: tx1, rx: rx2 }; + + let join_handle = std::thread::spawn(move || { + let fut = server::JupyterServer::start( + spec, + stdio_rx, + repl_session_proxy_channels, + startup_data_tx, + ) + .boxed_local(); + deno_runtime::tokio_util::create_and_run_current_thread(fut) + }); + + let Ok(startup_data) = startup_data_rx.await else { + bail!("Failed to acquire startup data"); + }; + { + let op_state_rc = + repl_session_proxy.repl_session.worker.js_runtime.op_state(); + let mut op_state = op_state_rc.borrow_mut(); + op_state.put(startup_data.iopub_connection.clone()); + op_state.put(startup_data.last_execution_request.clone()); + } + + repl_session_proxy.start().await; + let server_result = join_handle.join(); + match server_result { + Ok(result) => { + result?; + } + Err(e) => { + bail!("Jupyter kernel error: {:?}", e); + } + }; Ok(()) } + +pub enum JupyterReplRequest { + LspCompletions { + line_text: String, + position: usize, + }, + JsGetProperties { + object_id: String, + }, + JsEvaluate { + expr: String, + }, + JsGlobalLexicalScopeNames, + JsEvaluateLineWithObjectWrapping { + line: String, + }, + JsCallFunctionOnArgs { + function_declaration: String, + args: Vec, + }, + JsCallFunctionOn { + arg0: cdp::CallArgument, + arg1: cdp::CallArgument, + }, +} + +pub enum JupyterReplResponse { + LspCompletions(Vec), + JsGetProperties(Option), + JsEvaluate(Option), + JsGlobalLexicalScopeNames(cdp::GlobalLexicalScopeNamesResponse), + JsEvaluateLineWithObjectWrapping(Result), + JsCallFunctionOnArgs(Result), + JsCallFunctionOn(Option), +} + +pub struct JupyterReplProxy { + tx: mpsc::UnboundedSender, + rx: mpsc::UnboundedReceiver, +} + +impl JupyterReplProxy { + pub async fn lsp_completions( + &mut self, + line_text: String, + position: usize, + ) -> Vec { + let _ = self.tx.send(JupyterReplRequest::LspCompletions { + line_text, + position, + }); + let Some(JupyterReplResponse::LspCompletions(resp)) = self.rx.recv().await + else { + unreachable!() + }; + resp + } + + pub async fn get_properties( + &mut self, + object_id: String, + ) -> Option { + let _ = self + .tx + .send(JupyterReplRequest::JsGetProperties { object_id }); + let Some(JupyterReplResponse::JsGetProperties(resp)) = self.rx.recv().await + else { + unreachable!() + }; + resp + } + + pub async fn evaluate( + &mut self, + expr: String, + ) -> Option { + let _ = self.tx.send(JupyterReplRequest::JsEvaluate { expr }); + let Some(JupyterReplResponse::JsEvaluate(resp)) = self.rx.recv().await + else { + unreachable!() + }; + resp + } + + pub async fn global_lexical_scope_names( + &mut self, + ) -> cdp::GlobalLexicalScopeNamesResponse { + let _ = self.tx.send(JupyterReplRequest::JsGlobalLexicalScopeNames); + let Some(JupyterReplResponse::JsGlobalLexicalScopeNames(resp)) = + self.rx.recv().await + else { + unreachable!() + }; + resp + } + + pub async fn evaluate_line_with_object_wrapping( + &mut self, + line: String, + ) -> Result { + let _ = self + .tx + .send(JupyterReplRequest::JsEvaluateLineWithObjectWrapping { line }); + let Some(JupyterReplResponse::JsEvaluateLineWithObjectWrapping(resp)) = + self.rx.recv().await + else { + unreachable!() + }; + resp + } + + pub async fn call_function_on_args( + &mut self, + function_declaration: String, + args: Vec, + ) -> Result { + let _ = self.tx.send(JupyterReplRequest::JsCallFunctionOnArgs { + function_declaration, + args, + }); + let Some(JupyterReplResponse::JsCallFunctionOnArgs(resp)) = + self.rx.recv().await + else { + unreachable!() + }; + resp + } + + // TODO(bartlomieju): rename to "broadcast_result"? + pub async fn call_function_on( + &mut self, + arg0: cdp::CallArgument, + arg1: cdp::CallArgument, + ) -> Option { + let _ = self + .tx + .send(JupyterReplRequest::JsCallFunctionOn { arg0, arg1 }); + let Some(JupyterReplResponse::JsCallFunctionOn(resp)) = + self.rx.recv().await + else { + unreachable!() + }; + resp + } +} + +pub struct JupyterReplSession { + repl_session: repl::ReplSession, + rx: mpsc::UnboundedReceiver, + tx: mpsc::UnboundedSender, +} + +impl JupyterReplSession { + pub async fn start(&mut self) { + loop { + let Some(msg) = self.rx.recv().await else { + break; + }; + let resp = match msg { + JupyterReplRequest::LspCompletions { + line_text, + position, + } => JupyterReplResponse::LspCompletions( + self.lsp_completions(&line_text, position).await, + ), + JupyterReplRequest::JsGetProperties { object_id } => { + JupyterReplResponse::JsGetProperties( + self.get_properties(object_id).await, + ) + } + JupyterReplRequest::JsEvaluate { expr } => { + JupyterReplResponse::JsEvaluate(self.evaluate(expr).await) + } + JupyterReplRequest::JsGlobalLexicalScopeNames => { + JupyterReplResponse::JsGlobalLexicalScopeNames( + self.global_lexical_scope_names().await, + ) + } + JupyterReplRequest::JsEvaluateLineWithObjectWrapping { line } => { + JupyterReplResponse::JsEvaluateLineWithObjectWrapping( + self.evaluate_line_with_object_wrapping(&line).await, + ) + } + JupyterReplRequest::JsCallFunctionOnArgs { + function_declaration, + args, + } => JupyterReplResponse::JsCallFunctionOnArgs( + self + .call_function_on_args(function_declaration, &args) + .await, + ), + JupyterReplRequest::JsCallFunctionOn { arg0, arg1 } => { + JupyterReplResponse::JsCallFunctionOn( + self.call_function_on(arg0, arg1).await, + ) + } + }; + + let Ok(()) = self.tx.send(resp) else { + break; + }; + } + } + + pub async fn lsp_completions( + &mut self, + line_text: &str, + position: usize, + ) -> Vec { + self + .repl_session + .language_server + .completions(line_text, position) + .await + } + + pub async fn get_properties( + &mut self, + object_id: String, + ) -> Option { + let get_properties_response = self + .repl_session + .post_message_with_event_loop( + "Runtime.getProperties", + Some(cdp::GetPropertiesArgs { + object_id, + own_properties: None, + accessor_properties_only: None, + generate_preview: None, + non_indexed_properties_only: Some(true), + }), + ) + .await + .ok()?; + serde_json::from_value(get_properties_response).ok() + } + + pub async fn evaluate( + &mut self, + expr: String, + ) -> Option { + let evaluate_response: serde_json::Value = self + .repl_session + .post_message_with_event_loop( + "Runtime.evaluate", + Some(cdp::EvaluateArgs { + expression: expr, + object_group: None, + include_command_line_api: None, + silent: None, + context_id: Some(self.repl_session.context_id), + return_by_value: None, + generate_preview: None, + user_gesture: None, + await_promise: None, + throw_on_side_effect: Some(true), + timeout: Some(200), + disable_breaks: None, + repl_mode: None, + allow_unsafe_eval_blocked_by_csp: None, + unique_context_id: None, + }), + ) + .await + .ok()?; + serde_json::from_value(evaluate_response).ok() + } + + pub async fn global_lexical_scope_names( + &mut self, + ) -> cdp::GlobalLexicalScopeNamesResponse { + let evaluate_response = self + .repl_session + .post_message_with_event_loop( + "Runtime.globalLexicalScopeNames", + Some(cdp::GlobalLexicalScopeNamesArgs { + execution_context_id: Some(self.repl_session.context_id), + }), + ) + .await + .unwrap(); + serde_json::from_value(evaluate_response).unwrap() + } + + pub async fn evaluate_line_with_object_wrapping( + &mut self, + line: &str, + ) -> Result { + self + .repl_session + .evaluate_line_with_object_wrapping(line) + .await + } + + pub async fn call_function_on_args( + &mut self, + function_declaration: String, + args: &[cdp::RemoteObject], + ) -> Result { + self + .repl_session + .call_function_on_args(function_declaration, args) + .await + } + + // TODO(bartlomieju): rename to "broadcast_result"? + pub async fn call_function_on( + &mut self, + arg0: cdp::CallArgument, + arg1: cdp::CallArgument, + ) -> Option { + let response = self.repl_session + .post_message_with_event_loop( + "Runtime.callFunctionOn", + Some(json!({ + "functionDeclaration": r#"async function (execution_count, result) { + await Deno[Deno.internal].jupyter.broadcastResult(execution_count, result); + }"#, + "arguments": [arg0, arg1], + "executionContextId": self.repl_session.context_id, + "awaitPromise": true, + })), + ) + .await.ok()?; + serde_json::from_value(response).ok() + } +} diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs index 36f4d5c1814705..6a3831c49feefb 100644 --- a/cli/tools/jupyter/server.rs +++ b/cli/tools/jupyter/server.rs @@ -3,20 +3,20 @@ // This file is forked/ported from // Copyright 2020 The Evcxr Authors. MIT license. -use std::cell::RefCell; use std::collections::HashMap; use std::rc::Rc; use std::sync::Arc; use crate::cdp; use crate::tools::repl; +use deno_core::anyhow::bail; use deno_core::error::AnyError; use deno_core::futures; use deno_core::serde_json; -use deno_core::serde_json::json; use deno_core::CancelFuture; use deno_core::CancelHandle; use tokio::sync::mpsc; +use tokio::sync::oneshot; use tokio::sync::Mutex; use jupyter_runtime::messaging; @@ -25,27 +25,32 @@ use jupyter_runtime::ConnectionInfo; use jupyter_runtime::JupyterMessage; use jupyter_runtime::JupyterMessageContent; use jupyter_runtime::KernelControlConnection; -use jupyter_runtime::KernelHeartbeatConnection; use jupyter_runtime::KernelIoPubConnection; use jupyter_runtime::KernelShellConnection; use jupyter_runtime::ReplyError; use jupyter_runtime::ReplyStatus; use jupyter_runtime::StreamContent; +use super::JupyterReplProxy; + pub struct JupyterServer { execution_count: usize, - last_execution_request: Rc>>, - // This is Arc>, so we don't hold RefCell borrows across await - // points. + last_execution_request: Arc>>, iopub_connection: Arc>, - repl_session: repl::ReplSession, + repl_session_proxy: JupyterReplProxy, +} + +pub struct StartupData { + pub iopub_connection: Arc>, + pub last_execution_request: Arc>>, } impl JupyterServer { pub async fn start( connection_info: ConnectionInfo, mut stdio_rx: mpsc::UnboundedReceiver, - mut repl_session: repl::ReplSession, + repl_session_proxy: JupyterReplProxy, + setup_tx: oneshot::Sender, ) -> Result<(), AnyError> { let mut heartbeat = connection_info.create_kernel_heartbeat_connection().await?; @@ -59,15 +64,14 @@ impl JupyterServer { connection_info.create_kernel_iopub_connection().await?; let iopub_connection = Arc::new(Mutex::new(iopub_connection)); - let last_execution_request = Rc::new(RefCell::new(None)); - - // Store `iopub_connection` in the op state so it's accessible to the runtime API. - { - let op_state_rc = repl_session.worker.js_runtime.op_state(); - let mut op_state = op_state_rc.borrow_mut(); - op_state.put(iopub_connection.clone()); - op_state.put(last_execution_request.clone()); - } + let last_execution_request = Arc::new(Mutex::new(None)); + + let Ok(()) = setup_tx.send(StartupData { + iopub_connection: iopub_connection.clone(), + last_execution_request: last_execution_request.clone(), + }) else { + bail!("Failed to send startup data"); + }; let cancel_handle = CancelHandle::new_rc(); @@ -75,20 +79,22 @@ impl JupyterServer { execution_count: 0, iopub_connection: iopub_connection.clone(), last_execution_request: last_execution_request.clone(), - repl_session, + repl_session_proxy, }; - let handle1 = deno_core::unsync::spawn(async move { - if let Err(err) = Self::handle_heartbeat(&mut heartbeat).await { - log::error!( - "Heartbeat error: {}\nBacktrace:\n{}", - err, - err.backtrace() - ); + let hearbeat_fut = deno_core::unsync::spawn(async move { + loop { + if let Err(err) = heartbeat.single_heartbeat().await { + log::error!( + "Heartbeat error: {}\nBacktrace:\n{}", + err, + err.backtrace() + ); + } } }); - let handle2 = deno_core::unsync::spawn({ + let control_fut = deno_core::unsync::spawn({ let cancel_handle = cancel_handle.clone(); async move { if let Err(err) = @@ -103,13 +109,13 @@ impl JupyterServer { } }); - let handle3 = deno_core::unsync::spawn(async move { + let shell_fut = deno_core::unsync::spawn(async move { if let Err(err) = server.handle_shell(shell_connection).await { log::error!("Shell error: {}\nBacktrace:\n{}", err, err.backtrace()); } }); - let handle4 = deno_core::unsync::spawn(async move { + let stdio_fut = deno_core::unsync::spawn(async move { while let Some(stdio_msg) = stdio_rx.recv().await { Self::handle_stdio_msg( iopub_connection.clone(), @@ -120,8 +126,15 @@ impl JupyterServer { } }); - let join_fut = - futures::future::try_join_all(vec![handle1, handle2, handle3, handle4]); + let repl_session_fut = deno_core::unsync::spawn(async move {}); + + let join_fut = futures::future::try_join_all(vec![ + hearbeat_fut, + control_fut, + shell_fut, + stdio_fut, + repl_session_fut, + ]); if let Ok(result) = join_fut.or_cancel(cancel_handle).await { result?; @@ -132,26 +145,19 @@ impl JupyterServer { async fn handle_stdio_msg( iopub_connection: Arc>, - last_execution_request: Rc>>, + last_execution_request: Arc>>, stdio_msg: StreamContent, ) { - let maybe_exec_result = last_execution_request.borrow().clone(); - if let Some(exec_request) = maybe_exec_result { - let result = (iopub_connection.lock().await) - .send(stdio_msg.as_child_of(&exec_request)) - .await; + let maybe_exec_result = last_execution_request.lock().await.clone(); + let Some(exec_request) = maybe_exec_result else { + return; + }; - if let Err(err) = result { - log::error!("Output error: {}", err); - } - } - } + let mut iopub_conn = iopub_connection.lock().await; + let result = iopub_conn.send(stdio_msg.as_child_of(&exec_request)).await; - async fn handle_heartbeat( - connection: &mut KernelHeartbeatConnection, - ) -> Result<(), AnyError> { - loop { - connection.single_heartbeat().await?; + if let Err(err) = result { + log::error!("Output error: {}", err); } } @@ -222,9 +228,8 @@ impl JupyterServer { let cursor_pos = req.cursor_pos; let lsp_completions = self - .repl_session - .language_server - .completions(&user_code, cursor_pos) + .repl_session_proxy + .lsp_completions(user_code.clone(), cursor_pos) .await; if !lsp_completions.is_empty() { @@ -263,27 +268,32 @@ impl JupyterServer { { let sub_expr = &expr[..index]; let prop_name = &expr[index + 1..]; - let candidates = - get_expression_property_names(&mut self.repl_session, sub_expr) - .await - .into_iter() - .filter(|n| { - !n.starts_with("Symbol(") - && n.starts_with(prop_name) - && n != &*repl::REPL_INTERNALS_NAME - }) - .collect(); + let candidates = get_expression_property_names( + &mut self.repl_session_proxy, + sub_expr, + ) + .await + .into_iter() + .filter(|n| { + !n.starts_with("Symbol(") + && n.starts_with(prop_name) + && n != &*repl::REPL_INTERNALS_NAME + }) + .collect(); (candidates, cursor_pos - prop_name.len()) } else { // combine results of declarations and globalThis properties let mut candidates = get_expression_property_names( - &mut self.repl_session, + &mut self.repl_session_proxy, "globalThis", ) .await .into_iter() - .chain(get_global_lexical_scope_names(&mut self.repl_session).await) + .chain( + get_global_lexical_scope_names(&mut self.repl_session_proxy) + .await, + ) .filter(|n| n.starts_with(expr) && n != &*repl::REPL_INTERNALS_NAME) .collect::>(); @@ -419,7 +429,7 @@ impl JupyterServer { if !execute_request.silent && execute_request.store_history { self.execution_count += 1; } - *self.last_execution_request.borrow_mut() = Some(parent_message.clone()); + *self.last_execution_request.lock().await = Some(parent_message.clone()); self .send_iopub( @@ -432,8 +442,8 @@ impl JupyterServer { .await?; let result = self - .repl_session - .evaluate_line_with_object_wrapping(&execute_request.code) + .repl_session_proxy + .evaluate_line_with_object_wrapping(execute_request.code) .await; let evaluate_response = match result { @@ -471,8 +481,12 @@ impl JupyterServer { } = evaluate_response.value; if exception_details.is_none() { - publish_result(&mut self.repl_session, &result, self.execution_count) - .await?; + publish_result( + &mut self.repl_session_proxy, + &result, + self.execution_count, + ) + .await?; connection .send( @@ -497,7 +511,7 @@ impl JupyterServer { exception_details.exception { let result = self - .repl_session + .repl_session_proxy .call_function_on_args( r#" function(object) { @@ -513,7 +527,7 @@ impl JupyterServer { } "# .into(), - &[exception], + vec![exception], ) .await?; @@ -629,7 +643,7 @@ fn kernel_info() -> messaging::KernelInfoReply { } async fn publish_result( - session: &mut repl::ReplSession, + repl_session_proxy: &mut JupyterReplProxy, evaluate_result: &cdp::RemoteObject, execution_count: usize, ) -> Result>, AnyError> { @@ -641,21 +655,10 @@ async fn publish_result( let arg1 = cdp::CallArgument::from(evaluate_result); - let response = session - .post_message_with_event_loop( - "Runtime.callFunctionOn", - Some(json!({ - "functionDeclaration": r#"async function (execution_count, result) { - await Deno[Deno.internal].jupyter.broadcastResult(execution_count, result); - }"#, - "arguments": [arg0, arg1], - "executionContextId": session.context_id, - "awaitPromise": true, - })), - ) - .await?; - - let response: cdp::CallFunctionOnResponse = serde_json::from_value(response)?; + let Some(response) = repl_session_proxy.call_function_on(arg0, arg1).await + else { + return Ok(None); + }; if let Some(exception_details) = &response.exception_details { // If the object doesn't have a Jupyter.display method or it throws an @@ -693,34 +696,25 @@ fn is_word_boundary(c: char) -> bool { // TODO(bartlomieju): dedup with repl::editor async fn get_global_lexical_scope_names( - session: &mut repl::ReplSession, + repl_session_proxy: &mut JupyterReplProxy, ) -> Vec { - let evaluate_response = session - .post_message_with_event_loop( - "Runtime.globalLexicalScopeNames", - Some(cdp::GlobalLexicalScopeNamesArgs { - execution_context_id: Some(session.context_id), - }), - ) - .await - .unwrap(); - let evaluate_response: cdp::GlobalLexicalScopeNamesResponse = - serde_json::from_value(evaluate_response).unwrap(); - evaluate_response.names + repl_session_proxy.global_lexical_scope_names().await.names } // TODO(bartlomieju): dedup with repl::editor async fn get_expression_property_names( - session: &mut repl::ReplSession, + repl_session_proxy: &mut JupyterReplProxy, expr: &str, ) -> Vec { // try to get the properties from the expression - if let Some(properties) = get_object_expr_properties(session, expr).await { + if let Some(properties) = + get_object_expr_properties(repl_session_proxy, expr).await + { return properties; } // otherwise fall back to the prototype - let expr_type = get_expression_type(session, expr).await; + let expr_type = get_expression_type(repl_session_proxy, expr).await; let object_expr = match expr_type.as_deref() { // possibilities: https://chromedevtools.github.io/devtools-protocol/v8/Runtime/#type-RemoteObject Some("object") => "Object.prototype", @@ -732,44 +726,32 @@ async fn get_expression_property_names( _ => return Vec::new(), // undefined, symbol, and unhandled }; - get_object_expr_properties(session, object_expr) + get_object_expr_properties(repl_session_proxy, object_expr) .await .unwrap_or_default() } // TODO(bartlomieju): dedup with repl::editor async fn get_expression_type( - session: &mut repl::ReplSession, + repl_session_proxy: &mut JupyterReplProxy, expr: &str, ) -> Option { - evaluate_expression(session, expr) + evaluate_expression(repl_session_proxy, expr) .await .map(|res| res.result.kind) } // TODO(bartlomieju): dedup with repl::editor async fn get_object_expr_properties( - session: &mut repl::ReplSession, + repl_session_proxy: &mut JupyterReplProxy, object_expr: &str, ) -> Option> { - let evaluate_result = evaluate_expression(session, object_expr).await?; + let evaluate_result = + evaluate_expression(repl_session_proxy, object_expr).await?; let object_id = evaluate_result.result.object_id?; - let get_properties_response = session - .post_message_with_event_loop( - "Runtime.getProperties", - Some(cdp::GetPropertiesArgs { - object_id, - own_properties: None, - accessor_properties_only: None, - generate_preview: None, - non_indexed_properties_only: Some(true), - }), - ) - .await - .ok()?; - let get_properties_response: cdp::GetPropertiesResponse = - serde_json::from_value(get_properties_response).ok()?; + let get_properties_response = + repl_session_proxy.get_properties(object_id.clone()).await?; Some( get_properties_response .result @@ -781,35 +763,10 @@ async fn get_object_expr_properties( // TODO(bartlomieju): dedup with repl::editor async fn evaluate_expression( - session: &mut repl::ReplSession, + repl_session_proxy: &mut JupyterReplProxy, expr: &str, ) -> Option { - let evaluate_response = session - .post_message_with_event_loop( - "Runtime.evaluate", - Some(cdp::EvaluateArgs { - expression: expr.to_string(), - object_group: None, - include_command_line_api: None, - silent: None, - context_id: Some(session.context_id), - return_by_value: None, - generate_preview: None, - user_gesture: None, - await_promise: None, - throw_on_side_effect: Some(true), - timeout: Some(200), - disable_breaks: None, - repl_mode: None, - allow_unsafe_eval_blocked_by_csp: None, - unique_context_id: None, - }), - ) - .await - .ok()?; - let evaluate_response: cdp::EvaluateResponse = - serde_json::from_value(evaluate_response).ok()?; - + let evaluate_response = repl_session_proxy.evaluate(expr.to_string()).await?; if evaluate_response.exception_details.is_some() { None } else { diff --git a/cli/tools/repl/mod.rs b/cli/tools/repl/mod.rs index db1d75dadc3eb4..5bd59888d1d8cb 100644 --- a/cli/tools/repl/mod.rs +++ b/cli/tools/repl/mod.rs @@ -30,6 +30,7 @@ use editor::EditorHelper; use editor::ReplEditor; pub use session::EvaluationOutput; pub use session::ReplSession; +pub use session::TsEvaluateResponse; pub use session::REPL_INTERNALS_NAME; use super::test::create_single_test_event_channel;