Skip to content

Commit

Permalink
Use BufReader and BufWriter for TCP in SDK (iggy-rs#641)
Browse files Browse the repository at this point in the history
  • Loading branch information
hubcio authored Feb 3, 2024
1 parent 354dea7 commit f3e6d67
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions sdk/src/tcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 8,8 @@ use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio::time::sleep;
Expand Down Expand Up @@ -38,11 39,23 @@ unsafe impl Sync for TcpClient {}
pub(crate) trait ConnectionStream: Debug Sync Send {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, IggyError>;
async fn write(&mut self, buf: &[u8]) -> Result<(), IggyError>;
async fn flush(&mut self) -> Result<(), IggyError>;
}

#[derive(Debug)]
struct TcpConnectionStream {
stream: TcpStream,
reader: BufReader<OwnedReadHalf>,
writer: BufWriter<OwnedWriteHalf>,
}

impl TcpConnectionStream {
pub fn new(stream: TcpStream) -> Self {
let (reader, writer) = stream.into_split();
Self {
reader: BufReader::new(reader),
writer: BufWriter::new(writer),
}
}
}

#[derive(Debug)]
Expand All @@ -59,7 72,7 @@ unsafe impl Sync for TcpTlsConnectionStream {}
#[async_trait]
impl ConnectionStream for TcpConnectionStream {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, IggyError> {
let result = self.stream.read_exact(buf).await;
let result = self.reader.read_exact(buf).await;
if let Err(error) = result {
return Err(IggyError::from(error));
}
Expand All @@ -68,7 81,11 @@ impl ConnectionStream for TcpConnectionStream {
}

async fn write(&mut self, buf: &[u8]) -> Result<(), IggyError> {
Ok(self.stream.write_all(buf).await?)
Ok(self.writer.write_all(buf).await?)
}

async fn flush(&mut self) -> Result<(), IggyError> {
Ok(self.writer.flush().await?)
}
}

Expand All @@ -91,6 108,10 @@ impl ConnectionStream for TcpTlsConnectionStream {

Ok(())
}

async fn flush(&mut self) -> Result<(), IggyError> {
Ok(())
}
}

impl Default for TcpClient {
Expand Down Expand Up @@ -142,7 163,7 @@ impl Client for TcpClient {
remote_address = stream.peer_addr()?;

if !tls_enabled {
connection_stream = Box::new(TcpConnectionStream { stream });
connection_stream = Box::new(TcpConnectionStream::new(stream));
break;
}

Expand Down Expand Up @@ -205,6 226,7 @@ impl BinaryClient for TcpClient {
stream.write(&(payload_length as u32).to_le_bytes()).await?;
stream.write(&command.to_le_bytes()).await?;
stream.write(&payload).await?;
stream.flush().await?;
trace!("Sent a TCP request, waiting for a response...");

let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH];
Expand Down

0 comments on commit f3e6d67

Please sign in to comment.