Skip to content

Commit

Permalink
feat: impl ControlService for rudder (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrCroxx authored May 13, 2022
1 parent 0d300a6 commit 3b5c7fc
Show file tree
Hide file tree
Showing 23 changed files with 663 additions and 239 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

266 changes: 178 additions & 88 deletions bench/bench_kv/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 1,5 @@
use runkv_proto::rudder::control_service_client::ControlServiceClient;
use runkv_proto::rudder::{AddKeyRangesRequest, AddWheelsRequest};
#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;

Expand All @@ -22,9 24,7 @@ use runkv_exhauster::{bootstrap_exhauster, build_exhauster_with_object_store};
use runkv_proto::common::Endpoint;
use runkv_proto::kv::kv_service_client::KvServiceClient;
use runkv_proto::kv::{DeleteRequest, GetRequest, PutRequest};
use runkv_proto::meta::KeyRange;
use runkv_proto::wheel::wheel_service_client::WheelServiceClient;
use runkv_proto::wheel::{AddEndpointsRequest, AddKeyRangeRequest};
use runkv_proto::meta::{KeyRange, KeyRangeInfo};
use runkv_rudder::config::RudderConfig;
use runkv_rudder::{bootstrap_rudder, build_rudder_with_object_store};
use runkv_storage::MemObjectStore;
Expand All @@ -38,8 38,27 @@ const WHEEL_CONFIG_PATH: &str = "bench/etc/wheel.toml";
const EXHAUSTER_CONFIG_PATH: &str = "bench/etc/exhauster.toml";
const LSM_TREE_CONFIG_PATH: &str = "bench/etc/lsm_tree.toml";

const RUDDER_NODE_ID: u64 = 10000;
const WHEEL_NODE_ID_BASE: u64 = 0;
const EXHAUSTER_NODE_ID_BASE: u64 = 100;

const RUDDER_PORT: u16 = 12300;
const WHEEL_PORT_BASE: u16 = 12300;
const EXHAUSTER_PORT_BASE: u16 = 12400;

const LOCALHOST: &str = "127.0.0.1";

#[derive(Parser, Debug, Clone)]
struct Args {
// TODO: Increase default value to 3.
/// Count of wheel nodes, [1, 10].
#[clap(long, default_value = "1")]
wheels: u64,

/// Count of exhauster nodes, [1, 10].
#[clap(long, default_value = "1")]
exhausters: u64,

/// Count of raft groups, [1, 100].
#[clap(long, default_value = "10")]
groups: u8,
Expand Down Expand Up @@ -111,26 130,58 @@ fn end_key(group: u8) -> Vec<u8> {
buf
}

async fn add_key_range(
wheel_client: &mut WheelServiceClient<Channel>,
start: &[u8],
end: &[u8],
group: u64,
raft_nodes: &[u64],
node: u64,
) {
wheel_client
.add_key_range(Request::new(AddKeyRangeRequest {
key_range: Some(KeyRange {
start_key: start.to_vec(),
end_key: end.to_vec(),
}),
struct ClusterInitializer {
client: ControlServiceClient<Channel>,
wheels: HashMap<u64, Endpoint>,
key_ranges: Vec<KeyRangeInfo>,
}

impl ClusterInitializer {
fn new(client: ControlServiceClient<Channel>) -> Self {
Self {
client,
wheels: HashMap::default(),
key_ranges: vec![],
}
}

fn add_wheel(&mut self, node: u64, host: String, port: u16) {
self.wheels.insert(
node,
Endpoint {
host,
port: port as u32,
},
);
}

fn add_key_range(
&mut self,
group: u64,
start_key: Vec<u8>,
end_key: Vec<u8>,
raft_nodes: HashMap<u64, u64>,
) {
self.key_ranges.push(KeyRangeInfo {
group,
raft_nodes: raft_nodes.to_vec(),
nodes: HashMap::from_iter(raft_nodes.iter().map(|&raft_node| (raft_node, node))),
}))
.await
.unwrap();
key_range: Some(KeyRange { start_key, end_key }),
raft_nodes,
});
}

async fn init(mut self) {
let req = AddWheelsRequest {
wheels: self.wheels,
};
self.client.add_wheels(Request::new(req)).await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;

let req = AddKeyRangesRequest {
key_ranges: self.key_ranges,
};
self.client.add_key_ranges(Request::new(req)).await.unwrap();
tokio::time::sleep(Duration::from_secs(10)).await;
}
}

async fn assert_put(client: &mut KvServiceClient<Channel>, key: Vec<u8>, value: Vec<u8>) {
Expand Down Expand Up @@ -175,109 226,148 @@ async fn mkdir_if_not_exists(path: &str) {

#[tokio::main]
async fn main() {
// Parse arguments.
let args = Args::parse();

println!("{:#?}", args);

// Prepare directories.
println!("Prepare directories...");
let ts = timestamp();
let log_dir = format!("{}-{}", args.log_dir, ts);
let raft_log_store_data_dir = format!("{}-{}", args.raft_log_store_data_dir, ts);

mkdir_if_not_exists(&log_dir).await;
mkdir_if_not_exists(&raft_log_store_data_dir).await;

// Init log.
println!("Init log...");
let _log = init_runkv_logger("tests", 0, &log_dir);

let rudder_config: RudderConfig =
toml::from_str(&concat_toml(RUDDER_CONFIG_PATH, LSM_TREE_CONFIG_PATH)).unwrap();

let wheel_config: WheelConfig = {
// Read config templates.
println!("Read config templates...");
let rudder_config: RudderConfig = {
let mut config: RudderConfig =
toml::from_str(&concat_toml(RUDDER_CONFIG_PATH, LSM_TREE_CONFIG_PATH)).unwrap();
config.id = RUDDER_NODE_ID;
config.host = LOCALHOST.to_string();
config.port = RUDDER_PORT;
config
};
let wheel_config_template: WheelConfig = {
let mut config: WheelConfig =
toml::from_str(&concat_toml(WHEEL_CONFIG_PATH, LSM_TREE_CONFIG_PATH)).unwrap();
config.raft_log_store.log_dir_path = raft_log_store_data_dir;
config.rudder.id = RUDDER_NODE_ID;
config.rudder.host = rudder_config.host.clone();
config.rudder.port = rudder_config.port;
config.raft_log_store.persist = args.persist;
config.host = LOCALHOST.to_string();
config
};
let exhauster_config: ExhausterConfig = {
let exhauster_config_template: ExhausterConfig = {
let mut config: ExhausterConfig =
toml::from_str(&read_to_string(EXHAUSTER_CONFIG_PATH).unwrap()).unwrap();
config.rudder.id = RUDDER_NODE_ID;
config.rudder.host = rudder_config.host.clone();
config.rudder.port = rudder_config.port;
config.host = LOCALHOST.to_string();
config
};

// Connect object store.
// TODO: Support S3.
println!("Connect object store...");
let object_store = Arc::new(MemObjectStore::default());

// Build and bootstrap rudder.
println!("Bootstrap rudder...");
let (rudder, rudder_workers) =
build_rudder_with_object_store(&rudder_config, object_store.clone())
.await
.unwrap();

let (wheel, wheel_workers) = build_wheel_with_object_store(&wheel_config, object_store.clone())
.await
.unwrap();

let (exhuaster, exhauster_workers) =
build_exhauster_with_object_store(&exhauster_config, object_store)
.await
.unwrap();

tokio::spawn(async move { bootstrap_rudder(&rudder_config, rudder, rudder_workers).await });
let rudder_config_clone = rudder_config.clone();
tokio::spawn(
async move { bootstrap_rudder(&rudder_config_clone, rudder, rudder_workers).await },
);
tokio::time::sleep(Duration::from_secs(1)).await;

tokio::spawn(async move {
bootstrap_exhauster(&exhauster_config, exhuaster, exhauster_workers).await
});
tokio::time::sleep(Duration::from_secs(1)).await;

let wheel_config_clone = wheel_config.clone();
tokio::spawn(async move { bootstrap_wheel(&wheel_config_clone, wheel, wheel_workers).await });
tokio::time::sleep(Duration::from_secs(1)).await;

// TODO: Refine me.
let mut wheel_client = WheelServiceClient::connect(format!(
"http://{}:{}",
wheel_config.host, wheel_config.port
))
.await
.unwrap();
wheel_client
.add_endpoints(AddEndpointsRequest {
endpoints: HashMap::from_iter([(
wheel_config.id,
Endpoint {
host: wheel_config.host.to_owned(),
port: wheel_config.port as u32,
},
)]),
})
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
// Build and bootstrap wheels.
for i in 1..=args.wheels {
println!("Bootstrap wheel {}...", i);
let wheel_config = {
let mut config = wheel_config_template.clone();
config.raft_log_store.log_dir_path = format!("{}/{}", raft_log_store_data_dir, i);
config.id = i WHEEL_NODE_ID_BASE;
config.port = i as u16 WHEEL_PORT_BASE;
config
};
let (wheel, wheel_workers) =
build_wheel_with_object_store(&wheel_config, object_store.clone())
.await
.unwrap();
tokio::spawn(async move { bootstrap_wheel(&wheel_config, wheel, wheel_workers).await });
}

// Add key ranges.
println!("Add key ranges...");
for group in 1..=args.groups {
add_key_range(
&mut wheel_client,
&start_key(group),
&end_key(group),
group as u64,
&[
(group as u64 - 1) * 3 1,
(group as u64 - 1) * 3 2,
(group as u64 - 1) * 3 3,
],
wheel_config.id,
)
.await;
// Build and bootstrap exhausters.
for i in 1..=args.exhausters {
println!("Bootstrap exhauster {}...", i);
let exhauster_config = {
let mut config = exhauster_config_template.clone();
config.id = i EXHAUSTER_NODE_ID_BASE;
config.port = i as u16 EXHAUSTER_PORT_BASE;
config
};
let (exhuaster, exhauster_workers) =
build_exhauster_with_object_store(&exhauster_config, object_store.clone())
.await
.unwrap();
tokio::spawn(async move {
bootstrap_exhauster(&exhauster_config, exhuaster, exhauster_workers).await
});
}

tokio::time::sleep(Duration::from_secs(10)).await;
// Initialize cluster.
println!("Init cluster...");
let client =
ControlServiceClient::connect(format!("http://{}:{}", LOCALHOST, rudder_config.port))
.await
.unwrap();
let mut initializer = ClusterInitializer::new(client);
for i in 1..=args.wheels {
initializer.add_wheel(
i WHEEL_NODE_ID_BASE,
LOCALHOST.to_string(),
i as u16 WHEEL_PORT_BASE,
);
}
for (i, group) in (1..=args.groups).enumerate() {
let raft_nodes = HashMap::from_iter([
(
(i as u64) * 3 1,
(((i as u64) * 3) % args.wheels as u64) 1 WHEEL_NODE_ID_BASE,
),
(
(i as u64) * 3 2,
(((i as u64) * 3 1) % args.wheels as u64) 1 WHEEL_NODE_ID_BASE,
),
(
(i as u64) * 3 3,
(((i as u64) * 3 2) % args.wheels as u64) 1 WHEEL_NODE_ID_BASE,
),
]);
let start_key = start_key(group);
let end_key = end_key(group);
println!(
"Add key range: [group: {}] [start key: {:?}] [end key: {:?}] [raft nodes: {:?}]",
group, start_key, end_key, raft_nodes,
);
initializer.add_key_range(group as u64, start_key, end_key, raft_nodes);
}
initializer.init().await;

let channel = tonic::transport::Endpoint::from_shared(format!(
"http://{}:{}",
wheel_config.host, wheel_config.port
// TODO: Support multi wheels.
LOCALHOST,
1 WHEEL_PORT_BASE,
))
.unwrap()
.connect()
Expand Down
4 changes: 2 additions & 2 deletions bench/etc/exhauster.toml
Original file line number Diff line number Diff line change
@@ -1,6 1,6 @@
id = 201
id = 0
host = "127.0.0.1"
port = 12302
port = 0
data_path = "data"
meta_path = "meta"
heartbeat_interval = "1 s"
Expand Down
4 changes: 2 additions & 2 deletions bench/etc/rudder.toml
Original file line number Diff line number Diff line change
@@ -1,6 1,6 @@
id = 1
id = 0
host = "127.0.0.1"
port = 12300
port = 0
data_path = "data"
meta_path = "meta"
health_timeout = "10 s"
Expand Down
4 changes: 2 additions & 2 deletions bench/etc/wheel.toml
Original file line number Diff line number Diff line change
@@ -1,6 1,6 @@
id = 101
id = 0
host = "127.0.0.1"
port = 12301
port = 0
log = ".run/log/"
data_path = "data"
meta_path = "meta"
Expand Down
9 changes: 9 additions & 0 deletions proto/src/proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 10,15 @@ message KeyRange {
bytes end_key = 2;
}

message KeyRangeInfo {
// raft group id
uint64 group = 1;
// key range
meta.KeyRange key_range = 2;
// { raft node id -> node id }
map<uint64, uint64> raft_nodes = 3;
}

message WheelMeta {
uint64 id = 1;
KeyRange key_range = 2;
Expand Down
Loading

0 comments on commit 3b5c7fc

Please sign in to comment.