Skip to content

Commit

Permalink
feat: introduce runkv client (#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrCroxx authored May 14, 2022
1 parent 3b5c7fc commit 5e273fa
Show file tree
Hide file tree
Showing 24 changed files with 820 additions and 385 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ futures = "0.3"
itertools = "0.10.3"
lazy_static = "1.4.0"
rand = "0.8.5"
runkv-client = { path = "../client" }
runkv-common = { path = "../common" }
runkv-exhauster = { path = "../exhauster" }
runkv-proto = { path = "../proto" }
Expand Down
72 changes: 25 additions & 47 deletions bench/bench_kv/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use runkv_client::client::{RunkvClient, RunkvClientOptions};
use runkv_proto::rudder::control_service_client::ControlServiceClient;
use runkv_proto::rudder::{AddKeyRangesRequest, AddWheelsRequest};
#[cfg(not(target_env = "msvc"))]
Expand All @@ -22,8 +23,6 @@ use runkv_common::time::timestamp;
use runkv_exhauster::config::ExhausterConfig;
use runkv_exhauster::{bootstrap_exhauster, build_exhauster_with_object_store};
use runkv_proto::common::Endpoint;
use runkv_proto::kv::kv_service_client::KvServiceClient;
use runkv_proto::kv::{DeleteRequest, GetRequest, PutRequest};
use runkv_proto::meta::{KeyRange, KeyRangeInfo};
use runkv_rudder::config::RudderConfig;
use runkv_rudder::{bootstrap_rudder, build_rudder_with_object_store};
Expand Down Expand Up @@ -166,6 +165,7 @@ impl ClusterInitializer {
group,
key_range: Some(KeyRange { start_key, end_key }),
raft_nodes,
leader: 0,
});
}

Expand All @@ -184,37 +184,17 @@ impl ClusterInitializer {
}
}

async fn assert_put(client: &mut KvServiceClient<Channel>, key: Vec<u8>, value: Vec<u8>) {
client
.put(Request::new(PutRequest { key, value }))
.await
.unwrap();
async fn assert_put(client: &RunkvClient, key: Vec<u8>, value: Vec<u8>) {
client.put(key, value).await.unwrap();
}

async fn assert_get(
client: &mut KvServiceClient<Channel>,
key: Vec<u8>,
expected: Option<Vec<u8>>,
) {
let result = client
.get(Request::new(GetRequest { key, sequence: 0 }))
.await
.unwrap()
.into_inner()
.value;
let result = if result.is_empty() {
None
} else {
Some(result)
};
async fn assert_get(client: &RunkvClient, key: Vec<u8>, expected: Option<Vec<u8>>) {
let result = client.get(key).await.unwrap();
assert_eq!(result, expected);
}

async fn assert_delete(client: &mut KvServiceClient<Channel>, key: Vec<u8>) {
client
.delete(Request::new(DeleteRequest { key }))
.await
.unwrap();
async fn assert_delete(client: &RunkvClient, key: Vec<u8>) {
client.delete(key).await.unwrap();
}

async fn mkdir_if_not_exists(path: &str) {
Expand Down Expand Up @@ -363,43 +343,41 @@ async fn main() {
}
initializer.init().await;

let channel = tonic::transport::Endpoint::from_shared(format!(
"http://{}:{}",
// TODO: Support multi wheels.
LOCALHOST,
1 + WHEEL_PORT_BASE,
))
.unwrap()
.connect()
.await
.unwrap();
let runkv_client = RunkvClient::open(RunkvClientOptions {
rudder: rudder_config.id,
rudder_host: rudder_config.host.clone(),
rudder_port: rudder_config.port,
heartbeat_interval: Duration::from_nanos(0),
})
.await;
runkv_client.update_router().await.unwrap();

let futures = (1..=args.groups)
.flat_map(|group| {
let channel_clone = channel.clone();
let runkv_client_clone = runkv_client.clone();
(1..=args.concurrency).map(move |c| {
let channel_clone_clone = channel_clone.clone();
let client_clone = runkv_client_clone.clone();
async move {
let mut rng = thread_rng();

let mut client = KvServiceClient::new(channel_clone_clone);
let client = client_clone.clone();

let key = key(group, c, args.key_size);
let value = value(group, c, args.value_size);

for _ in 0..args.r#loop {
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..10))).await;
assert_put(&mut client, key.clone(), value.clone()).await;
assert_put(&client, key.clone(), value.clone()).await;
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..10))).await;
assert_get(&mut client, key.clone(), Some(value.clone())).await;
assert_get(&client, key.clone(), Some(value.clone())).await;
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..10))).await;
assert_delete(&mut client, key.clone()).await;
assert_delete(&client, key.clone()).await;
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..10))).await;
assert_get(&mut client, key.clone(), None).await;
assert_get(&client, key.clone(), None).await;
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..10))).await;
assert_put(&mut client, key.clone(), value.clone()).await;
assert_put(&client, key.clone(), value.clone()).await;
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..10))).await;
assert_get(&mut client, key.clone(), Some(value.clone())).await;
assert_get(&client, key.clone(), Some(value.clone())).await;
}
}
})
Expand Down
12 changes: 12 additions & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0"
async-trait = "0.1"
itertools = "0.10.3"
parking_lot = "0.12"
runkv-common = { path = "../common" }
runkv-proto = { path = "../proto" }
thiserror = "1.0"
tokio = { version = "1", features = [
"rt-multi-thread",
"sync",
"macros",
"time",
] }
tonic = "0.6.2"
tracing = "0.1"
Loading

0 comments on commit 5e273fa

Please sign in to comment.