Universal (client expandable) message bus library with redis pub-sub implementation in rust.
Dev version - only for tests
Add below line to your Cargo.toml dependencies config:
[dependencies]
bus-rs = { git = "https://github.com/sebgrz/bus-rs.git", tag = "v0.3.0" }
At first is require to create a message struct:
Take a look at message
attribute - it's a require macro to help registering and use the message inside whole message bus machinery.
#[message]
#[derive(Deserialize, Serialize)]
struct TestMessage {
data: String,
}
Second step is to create message handler. This one could be in two versions both sync and async (tokio runtime)
struct TestMessageHandler {}
impl MessageHandler<TestMessage> for TestMessageHandler {
fn handle(&mut self, msg: TestMessage, headers: Option<HashMap<String, String>>) {
println!("test {} {:?}", msg.data, headers);
}
}
struct TestMessageHandlerAsync {}
#[async_trait]
impl MessageHandlerAsync<TestMessage> for TestMessageHandlerAsync {
async fn handle(&mut self, msg: TestMessage, headers: Option<HashMap<String, String>>) {
println!("test {} {:?}", msg.data, headers);
}
}
To create a Listener instance, first a pubsub client is required. For redis implemention from bus_rs_redis
crate take a look for:
let redis_client = RedisClient::new("redis://127.0.0.1:6379", "test_channel");
let client = Box::new(redis_client);
let mut listener: Listener = builder::pubsub(client).build();
Next thing, which is important, is to register message handlers to the listener:
listener.register_handler(TestMessageHandler {});
thanks this a coming message could be recognize and redirect to the properly message handler.
The last step is to make listener to listening:
listener.listen().unwrap_or_else(|e| {
if let ClientError::General(err) = e {
panic!("client_error: {}", err);
}
});
let redis_client = RedisClientAsync::new_receiver("redis://127.0.0.1:6379", "test_channel").await;
let client = Box::new(redis_client);
let mut listener: ListenerAsync = builder::pubsub_async(client).build();
listener.register_handler(TestMessageHandlerAsync {}).await;
// when
listener.listen().await.unwrap_or_else(|e| {
if let ClientError::General(err) = e {
panic!("client_error: {}", err);
}
});
Publisher is bind to the specific pubsub channel and gives possibility to send messages.
At the beginning, a publisher instance should be created. It's a similar approach to the Listener - create Client and pass to the Publisher constructor:
let redis_client = RedisClient::new("redis://127.0.0.1:6379", "test_channel");
let client = Box::new(redis_client));
let publisher: Publisher = builder::pubsub(client).build();
The last step is to send message. It's worth to mention that when we wrapped a message struct before by #[message]
attribute that give us
mapper implementation of message type to the RawMessage
.
let test_msg = TestMessage {
data: "test_data".to_string(),
};
publisher.publish(&test_msg, None);
Publish message with additional headers:
let headers = HashMap::from([("trace-id".to_owned(), "trace123".to_owned())]);
publisher.publish(&test_msg, Some(headers));
async version:
let redis_client = RedisClientAsync::new_sender("redis://127.0.0.1:6379", "test_channel").await;
let client = Box::new(redis_client));
let publisher: PublisherAsync = builder::pubsub_async(client).build();
let test_msg = TestMessage {
data: "test_data".to_string(),
};
// when
publisher.publish(&test_msg, None).await;
Publish message with additional headers:
let headers = HashMap::from([("trace-id".to_owned(), "trace123".to_owned())]);
publisher.publish(&test_msg, Some(headers)).await;
In a builder stage you can attach interceptors (layers). The PubSubLayer is a trait with two function to implement: before and after.
As the name suggest before
function is calling before send/receive a message, and after
is calling when the message sent/received.
struct TestLayer;
impl PubSubLayer for TestLayer {
fn before(&self, raw_msg: &mut bus_rs::RawMessage) {
println!("Test layer before");
}
fn after(&self, raw_msg: &bus_rs::RawMessage) {
println!("Test layer after");
}
}
...
let test_layer = TestLayer {};
let mut listener: Listener = builder::pubsub(client)
.add_layer(Box::new(test_layer))
.build();