Skip to content

Commit

Permalink
Turn optionals into result
Browse files Browse the repository at this point in the history
  • Loading branch information
eatonphil committed Jan 19, 2024
1 parent 32e7bfd commit afe9f7e
Showing 1 changed file with 26 additions and 23 deletions.
49 changes: 26 additions & 23 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,21 756,21 @@ impl RequestVoteRequest {
metadata: [u8; 33],
request_id: u64,
term: u64,
) -> Option<RPCBody> {
) -> Result<RPCBody, String> {
let mut buffer: [u8; 69] = [0; 69];
buffer[0..metadata.len()].copy_from_slice(&metadata);
reader.read_exact(&mut buffer[metadata.len()..]).unwrap();

let checksum = u32::from_le_bytes(buffer[65..69].try_into().unwrap());
if checksum != crc32c(&buffer[0..65]) {
return None;
return Err("Bad checksum.".into());
}

let candidate_id = u128::from_le_bytes(buffer[33..49].try_into().unwrap());
let last_log_index = u64::from_le_bytes(buffer[49..57].try_into().unwrap());
let last_log_term = u64::from_le_bytes(buffer[57..65].try_into().unwrap());

Some(RPCBody::RequestVoteRequest(RequestVoteRequest {
Ok(RPCBody::RequestVoteRequest(RequestVoteRequest {
request_id,
term,
candidate_id,
Expand Down Expand Up @@ -801,17 801,17 @@ impl RequestVoteResponse {
metadata: [u8; 33],
request_id: u64,
term: u64,
) -> Option<RPCBody> {
) -> Result<RPCBody, String> {
let mut buffer: [u8; 38] = [0; 38];
buffer[0..metadata.len()].copy_from_slice(&metadata);
reader.read_exact(&mut buffer[metadata.len()..]).unwrap();

let checksum = u32::from_le_bytes(buffer[34..38].try_into().unwrap());
if checksum != crc32c(&buffer[0..34]) {
return None;
return Err("Bad checksum.".into());
}

Some(RPCBody::RequestVoteResponse(RequestVoteResponse {
Ok(RPCBody::RequestVoteResponse(RequestVoteResponse {
request_id,
term,
vote_granted: buffer[33] == 1,
Expand Down Expand Up @@ -842,14 842,14 @@ impl AppendEntriesRequest {
metadata: [u8; 33],
request_id: u64,
term: u64,
) -> Option<RPCBody> {
) -> Result<RPCBody, String> {
let mut buffer: [u8; 78] = [0; 78];
buffer[0..metadata.len()].copy_from_slice(&metadata);
reader.read_exact(&mut buffer[metadata.len()..]).unwrap();

let checksum = u32::from_le_bytes(buffer[74..78].try_into().unwrap());
if checksum != crc32c(&buffer[0..74]) {
return None;
return Err("Bad checksum.".into());
}

let leader_id = u128::from_le_bytes(buffer[33..49].try_into().unwrap());
Expand All @@ -864,7 864,7 @@ impl AppendEntriesRequest {
entries.push(e);
}

Some(RPCBody::AppendEntriesRequest(AppendEntriesRequest {
Ok(RPCBody::AppendEntriesRequest(AppendEntriesRequest {
request_id,
term,
leader_id,
Expand Down Expand Up @@ -917,7 917,7 @@ impl AppendEntriesResponse {
metadata: [u8; 33],
request_id: u64,
term: u64,
) -> Option<RPCBody> {
) -> Result<RPCBody, String> {
let mut buffer: [u8; 46] = [0; 46];
buffer[0..metadata.len()].copy_from_slice(&metadata);
reader.read_exact(&mut buffer[metadata.len()..]).unwrap();
Expand All @@ -926,10 926,10 @@ impl AppendEntriesResponse {

let checksum = u32::from_le_bytes(buffer[42..46].try_into().unwrap());
if checksum != crc32c(&buffer[0..42]) {
return None;
return Err("Bad checksum.".into());
}

Some(RPCBody::AppendEntriesResponse(AppendEntriesResponse {
Ok(RPCBody::AppendEntriesResponse(AppendEntriesResponse {
request_id,
term,
success: buffer[33] == 1,
Expand Down Expand Up @@ -995,11 995,10 @@ impl RPCMessage {
self.body.request_id()
}

fn decode<T: std::io::Read>(mut reader: BufReader<T>) -> Option<RPCMessage> {
fn decode<T: std::io::Read>(mut reader: BufReader<T>) -> Result<RPCMessage, String> {
let mut metadata: [u8; 33] = [0; 33];
if reader.read_exact(&mut metadata).is_err() {
// TODO: Should probably log the above ignored error?
return None;
return Err("Could not read metadata.".into());
}

let request_id = u64::from_le_bytes(metadata[0..8].try_into().unwrap());
Expand All @@ -1016,10 1015,10 @@ impl RPCMessage {
} else if message_type == RPCBodyKind::AppendEntriesResponse as u8 {
AppendEntriesResponse::decode(reader, metadata, request_id, term)
} else {
panic!("Unknown request type: {}.", message_type);
return Err(format!("Unknown request type: {}.", message_type));
};

Some(RPCMessage {
Ok(RPCMessage {
from: server_id,
body: body?,
})
Expand Down Expand Up @@ -1116,8 1115,9 @@ impl RPCManager {
}

let bufreader = BufReader::new(stream);
if let Some(msg) = RPCMessage::decode(bufreader) {
thread_stream_sender.send(msg).unwrap();
match RPCMessage::decode(bufreader) {
Ok(msg) => thread_stream_sender.send(msg).unwrap(),
Err(msg) => panic!("{}", msg),
}
}
});
Expand Down Expand Up @@ -1161,8 1161,9 @@ impl RPCManager {

std::thread::spawn(move || {
let bufreader = BufReader::new(stream);
if let Some(response) = RPCMessage::decode(bufreader) {
thread_stream_sender.send(response).unwrap();
match RPCMessage::decode(bufreader) {
Ok(response) => thread_stream_sender.send(response).unwrap(),
Err(msg) => panic!("{}", msg),
}
});
}
Expand Down Expand Up @@ -1824,8 1825,10 @@ impl<SM: StateMachine> Server<SM> {
return;
}
state.log(format!(
"Tick start. Timeout in: {}ms.",
(state.volatile.election_timeout - Instant::now()).as_millis()
"Tick start. Log length: {}, commit index: {}. Timeout in: {}ms.",
state.durable.next_log_index,
state.volatile.commit_index,
(state.volatile.election_timeout - Instant::now()).as_millis(),
));
drop(state);

Expand Down

0 comments on commit afe9f7e

Please sign in to comment.