Skip to content

Commit

Permalink
chore: use impl Future in PubSubConnect (alloy-rs#218)
Browse files Browse the repository at this point in the history
* chore: use `impl Future` in `PubSubConnect`

* fix: wasm

* fix: wasm 2
  • Loading branch information
DaniPopes authored Feb 15, 2024
1 parent 9006513 commit e127fed
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 79 deletions.
10 changes: 5 additions & 5 deletions crates/pubsub/src/connect.rs
Original file line number Diff line number Diff line change
@@ -1,5 1,5 @@
use crate::{handle::ConnectionHandle, service::PubSubService, PubSubFrontend};
use alloy_transport::{Pbf, TransportError};
use alloy_transport::{impl_future, TransportResult};

/// Configuration objects that contain connection details for a backend.
///
Expand All @@ -15,19 15,19 @@ pub trait PubSubConnect: Sized Send Sync 'static {
/// [`ConnectionInterface`], and return the corresponding handle.
///
/// [`ConnectionInterface`]: crate::ConnectionInterface
fn connect<'a: 'b, 'b>(&'a self) -> Pbf<'b, ConnectionHandle, TransportError>;
fn connect(&self) -> impl_future!(<Output = TransportResult<ConnectionHandle>>);

/// Attempt to reconnect the transport.
///
/// Override this to add custom reconnection logic to your connector. This
/// will be used by the internal pubsub connection managers in the event the
/// connection fails.
fn try_reconnect<'a: 'b, 'b>(&'a self) -> Pbf<'b, ConnectionHandle, TransportError> {
fn try_reconnect(&self) -> impl_future!(<Output = TransportResult<ConnectionHandle>>) {
self.connect()
}

/// Convert the configuration object into a service with a running backend.
fn into_service(self) -> Pbf<'static, PubSubFrontend, TransportError> {
Box::pin(PubSubService::connect(self))
fn into_service(self) -> impl_future!(<Output = TransportResult<PubSubFrontend>>) {
PubSubService::connect(self)
}
}
8 changes: 4 additions & 4 deletions crates/pubsub/src/frontend.rs
Original file line number Diff line number Diff line change
@@ -1,7 1,7 @@
use crate::{ix::PubSubInstruction, managers::InFlight, RawSubscription};
use alloy_json_rpc::{RequestPacket, Response, ResponsePacket, SerializedRequest};
use alloy_primitives::U256;
use alloy_transport::{TransportError, TransportErrorKind, TransportFut};
use alloy_transport::{TransportError, TransportErrorKind, TransportFut, TransportResult};
use futures::{future::try_join_all, FutureExt, TryFutureExt};
use std::{
future::Future,
Expand Down Expand Up @@ -31,7 31,7 @@ impl PubSubFrontend {
pub fn get_subscription(
&self,
id: U256,
) -> impl Future<Output = Result<RawSubscription, TransportError>> Send 'static {
) -> impl Future<Output = TransportResult<RawSubscription>> Send 'static {
let backend_tx = self.tx.clone();
async move {
let (tx, rx) = oneshot::channel();
Expand All @@ -43,7 43,7 @@ impl PubSubFrontend {
}

/// Unsubscribe from a subscription.
pub fn unsubscribe(&self, id: U256) -> Result<(), TransportError> {
pub fn unsubscribe(&self, id: U256) -> TransportResult<()> {
self.tx
.send(PubSubInstruction::Unsubscribe(id))
.map_err(|_| TransportErrorKind::backend_gone())
Expand All @@ -53,7 53,7 @@ impl PubSubFrontend {
pub fn send(
&self,
req: SerializedRequest,
) -> impl Future<Output = Result<Response, TransportError>> Send 'static {
) -> impl Future<Output = TransportResult<Response>> Send 'static {
let tx = self.tx.clone();
let channel_size = self.channel_size;

Expand Down
6 changes: 3 additions & 3 deletions crates/pubsub/src/managers/in_flight.rs
Original file line number Diff line number Diff line change
@@ -1,6 1,6 @@
use alloy_json_rpc::{Response, ResponsePayload, SerializedRequest};
use alloy_primitives::U256;
use alloy_transport::TransportError;
use alloy_transport::{TransportError, TransportResult};
use std::fmt;
use tokio::sync::oneshot;

Expand All @@ -16,7 16,7 @@ pub(crate) struct InFlight {
pub(crate) channel_size: usize,

/// The channel to send the response on.
pub(crate) tx: oneshot::Sender<Result<Response, TransportError>>,
pub(crate) tx: oneshot::Sender<TransportResult<Response>>,
}

impl fmt::Debug for InFlight {
Expand All @@ -34,7 34,7 @@ impl InFlight {
pub(crate) fn new(
request: SerializedRequest,
channel_size: usize,
) -> (Self, oneshot::Receiver<Result<Response, TransportError>>) {
) -> (Self, oneshot::Receiver<TransportResult<Response>>) {
let (tx, rx) = oneshot::channel();

(Self { request, channel_size, tx }, rx)
Expand Down
6 changes: 3 additions & 3 deletions crates/pubsub/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 8,7 @@ use alloy_json_rpc::{Id, PubSubItem, Request, Response, ResponsePayload};
use alloy_primitives::U256;
use alloy_transport::{
utils::{to_json_raw_value, Spawnable},
TransportError, TransportErrorKind, TransportResult,
TransportErrorKind, TransportResult,
};
use serde_json::value::RawValue;
use tokio::sync::{mpsc, oneshot};
Expand All @@ -35,7 35,7 @@ pub(crate) struct PubSubService<T> {

impl<T: PubSubConnect> PubSubService<T> {
/// Create a new service from a connector.
pub(crate) async fn connect(connector: T) -> Result<PubSubFrontend, TransportError> {
pub(crate) async fn connect(connector: T) -> TransportResult<PubSubFrontend> {
let handle = connector.connect().await?;

let (tx, reqs) = mpsc::unbounded_channel();
Expand All @@ -51,7 51,7 @@ impl<T: PubSubConnect> PubSubService<T> {
}

/// Reconnect by dropping the backend and creating a new one.
async fn get_new_backend(&mut self) -> Result<ConnectionHandle, TransportError> {
async fn get_new_backend(&mut self) -> TransportResult<ConnectionHandle> {
let mut handle = self.connector.try_reconnect().await?;
std::mem::swap(&mut self.handle, &mut handle);
Ok(handle)
Expand Down
6 changes: 3 additions & 3 deletions crates/rpc-client/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 107,7 @@ impl<'a, T> BatchRequest<'a, T> {
fn push<Params: RpcParam, Resp: RpcReturn>(
&mut self,
request: Request<Params>,
) -> Result<Waiter<Resp>, TransportError> {
) -> TransportResult<Waiter<Resp>> {
let ser = request.serialize().map_err(TransportError::ser_err)?;
Ok(self.push_raw(ser).into())
}
Expand All @@ -127,7 127,7 @@ where
&mut self,
method: &'static str,
params: &Params,
) -> Result<Waiter<Resp>, TransportError> {
) -> TransportResult<Waiter<Resp>> {
let request = self.transport.make_request(method, Cow::Borrowed(params));
self.push(request)
}
Expand Down Expand Up @@ -245,7 245,7 @@ impl<T> Future for BatchFuture<T>
where
T: Transport Clone,
{
type Output = Result<(), TransportError>;
type Output = TransportResult<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
if matches!(*self.as_mut(), BatchFuture::Prepared { .. }) {
Expand Down
14 changes: 7 additions & 7 deletions crates/rpc-client/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 1,6 @@
use crate::RpcClient;
use alloy_transport::{
BoxTransport, BoxTransportConnect, Transport, TransportConnect, TransportError,
BoxTransport, BoxTransportConnect, Transport, TransportConnect, TransportResult,
};
use tower::{
layer::util::{Identity, Stack},
Expand Down Expand Up @@ -74,10 74,10 @@ impl<L> ClientBuilder<L> {
self.transport(transport, is_local)
}

#[cfg(feature = "pubsub")]
/// Connect a pubsub transport, producing an [`RpcClient`] with the provided
/// connection.
pub async fn pubsub<C>(self, pubsub_connect: C) -> Result<RpcClient<L::Service>, TransportError>
#[cfg(feature = "pubsub")]
pub async fn pubsub<C>(self, pubsub_connect: C) -> TransportResult<RpcClient<L::Service>>
where
C: alloy_pubsub::PubSubConnect,
L: Layer<alloy_pubsub::PubSubFrontend>,
Expand All @@ -88,13 88,13 @@ impl<L> ClientBuilder<L> {
Ok(self.transport(transport, is_local))
}

#[cfg(feature = "ws")]
/// Connect a WS transport, producing an [`RpcClient`] with the provided
/// connection
#[cfg(feature = "ws")]
pub async fn ws(
self,
ws_connect: alloy_transport_ws::WsConnect,
) -> Result<RpcClient<L::Service>, TransportError>
) -> TransportResult<RpcClient<L::Service>>
where
L: Layer<alloy_pubsub::PubSubFrontend>,
L::Service: Transport,
Expand All @@ -104,7 104,7 @@ impl<L> ClientBuilder<L> {

/// Connect a transport, producing an [`RpcClient`] with the provided
/// connection.
pub async fn connect<C>(self, connect: C) -> Result<RpcClient<L::Service>, TransportError>
pub async fn connect<C>(self, connect: C) -> TransportResult<RpcClient<L::Service>>
where
C: TransportConnect,
L: Layer<C::Transport>,
Expand All @@ -116,7 116,7 @@ impl<L> ClientBuilder<L> {

/// Connect a transport, producing an [`RpcClient`] with a [`BoxTransport`]
/// connection.
pub async fn connect_boxed<C>(self, connect: C) -> Result<RpcClient<L::Service>, TransportError>
pub async fn connect_boxed<C>(self, connect: C) -> TransportResult<RpcClient<L::Service>>
where
C: BoxTransportConnect,
L: Layer<BoxTransport>,
Expand Down
8 changes: 3 additions & 5 deletions crates/rpc-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 1,6 @@
use crate::{BatchRequest, ClientBuilder, RpcCall};
use alloy_json_rpc::{Id, Request, RpcParam, RpcReturn};
use alloy_transport::{BoxTransport, Transport, TransportConnect, TransportError};
use alloy_transport::{BoxTransport, Transport, TransportConnect, TransportResult};
use alloy_transport_http::Http;
use std::sync::atomic::{AtomicU64, Ordering};
use tower::{layer::util::Identity, ServiceBuilder};
Expand Down Expand Up @@ -41,7 41,7 @@ impl<T> RpcClient<T> {
}

/// Connect to a transport via a [`TransportConnect`] implementor.
pub async fn connect<C>(connect: C) -> Result<Self, TransportError>
pub async fn connect<C>(connect: C) -> TransportResult<Self>
where
T: Transport,
C: TransportConnect<Transport = T>,
Expand Down Expand Up @@ -148,9 148,7 @@ mod pubsub_impl {
}

/// Connect to a transport via a [`PubSubConnect`] implementor.
pub async fn connect_pubsub<C>(
connect: C,
) -> Result<RpcClient<PubSubFrontend>, TransportError>
pub async fn connect_pubsub<C>(connect: C) -> TransportResult<RpcClient<PubSubFrontend>>
where
C: PubSubConnect,
{
Expand Down
20 changes: 7 additions & 13 deletions crates/transport-ipc/src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 3,8 @@ use std::{
path::PathBuf,
};

#[derive(Debug, Clone)]
/// An IPC Connection object.
#[derive(Debug, Clone)]
pub struct IpcConnect<T> {
inner: T,
}
Expand All @@ -28,18 28,12 @@ macro_rules! impl_connect {
true
}

fn connect<'a: 'b, 'b>(
&'a self,
) -> alloy_transport::Pbf<
'b,
alloy_pubsub::ConnectionHandle,
alloy_transport::TransportError,
> {
Box::pin(async move {
crate::IpcBackend::connect(&self.inner)
.await
.map_err(alloy_transport::TransportErrorKind::custom)
})
async fn connect(
&self,
) -> Result<alloy_pubsub::ConnectionHandle, alloy_transport::TransportError> {
crate::IpcBackend::connect(&self.inner)
.await
.map_err(alloy_transport::TransportErrorKind::custom)
}
}
};
Expand Down
21 changes: 9 additions & 12 deletions crates/transport-ws/src/native.rs
Original file line number Diff line number Diff line change
@@ -1,6 1,6 @@
use crate::WsBackend;
use alloy_pubsub::PubSubConnect;
use alloy_transport::{utils::Spawnable, Authorization, Pbf, TransportError, TransportErrorKind};
use alloy_transport::{utils::Spawnable, Authorization, TransportErrorKind, TransportResult};
use futures::{SinkExt, StreamExt};
use serde_json::value::RawValue;
use std::time::Duration;
Expand Down Expand Up @@ -42,21 42,18 @@ impl PubSubConnect for WsConnect {
alloy_transport::utils::guess_local_url(&self.url)
}

fn connect<'a: 'b, 'b>(&'a self) -> Pbf<'b, alloy_pubsub::ConnectionHandle, TransportError> {
async fn connect(&self) -> TransportResult<alloy_pubsub::ConnectionHandle> {
let request = self.clone().into_client_request();
let req = request.map_err(TransportErrorKind::custom)?;
let (socket, _) =
tokio_tungstenite::connect_async(req).await.map_err(TransportErrorKind::custom)?;

Box::pin(async move {
let req = request.map_err(TransportErrorKind::custom)?;
let (socket, _) =
tokio_tungstenite::connect_async(req).await.map_err(TransportErrorKind::custom)?;
let (handle, interface) = alloy_pubsub::ConnectionHandle::new();
let backend = WsBackend { socket, interface };

let (handle, interface) = alloy_pubsub::ConnectionHandle::new();
let backend = WsBackend { socket, interface };
backend.spawn();

backend.spawn();

Ok(handle)
})
Ok(handle)
}
}

Expand Down
21 changes: 8 additions & 13 deletions crates/transport-ws/src/wasm.rs
Original file line number Diff line number Diff line change
@@ -1,6 1,6 @@
use super::WsBackend;
use alloy_pubsub::PubSubConnect;
use alloy_transport::{utils::Spawnable, Pbf, TransportError, TransportErrorKind};
use alloy_transport::{utils::Spawnable, TransportErrorKind, TransportResult};
use futures::{
sink::SinkExt,
stream::{Fuse, StreamExt},
Expand All @@ -20,21 20,16 @@ impl PubSubConnect for WsConnect {
alloy_transport::utils::guess_local_url(&self.url)
}

fn connect<'a: 'b, 'b>(&'a self) -> Pbf<'b, alloy_pubsub::ConnectionHandle, TransportError> {
Box::pin(async move {
let socket = WsMeta::connect(&self.url, None)
.await
.map_err(TransportErrorKind::custom)?
.1
.fuse();
async fn connect(&self) -> TransportResult<alloy_pubsub::ConnectionHandle> {
let socket =
WsMeta::connect(&self.url, None).await.map_err(TransportErrorKind::custom)?.1.fuse();

let (handle, interface) = alloy_pubsub::ConnectionHandle::new();
let backend = WsBackend { socket, interface };
let (handle, interface) = alloy_pubsub::ConnectionHandle::new();
let backend = WsBackend { socket, interface };

backend.spawn();
backend.spawn();

Ok(handle)
})
Ok(handle)
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 15,7 @@ exclude.workspace = true
alloy-json-rpc.workspace = true

base64.workspace = true
futures-util.workspace = true
serde_json = { workspace = true, features = ["raw_value"] }
serde.workspace = true
thiserror.workspace = true
Expand Down
15 changes: 4 additions & 11 deletions crates/transport/src/connect.rs
Original file line number Diff line number Diff line change
@@ -1,4 1,5 @@
use crate::{BoxTransport, Pbf, Transport, TransportError};
use futures_util::TryFutureExt;

/// Connection details for a transport.
///
Expand Down Expand Up @@ -41,23 42,15 @@ pub trait BoxTransportConnect {
fn get_boxed_transport<'a: 'b, 'b>(&'a self) -> Pbf<'b, BoxTransport, TransportError>;
}

impl<T> BoxTransportConnect for T
where
T: TransportConnect,
{
impl<T: TransportConnect> BoxTransportConnect for T {
fn is_local(&self) -> bool {
TransportConnect::is_local(self)
}

fn get_boxed_transport<'a: 'b, 'b>(&'a self) -> Pbf<'b, BoxTransport, TransportError> {
Box::pin(async move { self.get_transport().await.map(Transport::boxed) })
Box::pin(self.get_transport().map_ok(Transport::boxed))
}
}

#[cfg(test)]
mod test {
use super::*;
fn __compile_check(_: Box<dyn BoxTransportConnect>) {
todo!()
}
}
fn _object_safe(_: Box<dyn BoxTransportConnect>) {}
Loading

0 comments on commit e127fed

Please sign in to comment.