-
Notifications
You must be signed in to change notification settings - Fork 2
/
mqtt_example.rs
138 lines (130 loc) · 5.17 KB
/
mqtt_example.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// OPC UA Pubsub implementation for Rust
// SPDX-License-Identifier: MPL-2.0
// Copyright (C) 2021 Alexander Schrode
use opcua_pubsub::prelude::*;
use rand::prelude::*;
use std::sync::{Arc, Mutex, RwLock};
use std::{thread, time};
/// Example uses Mqtt with Uadp encoding
/// The difference to uadp is, that you have to configure
/// datasets with broker settings
// Generates the Publisher
fn generate_pubsub(
ns: u16,
addr: &Arc<RwLock<SimpleAddressSpace>>,
cb: Arc<Mutex<dyn OnPubSubReceiveValues Send>>,
) -> Result<PubSubApp, StatusCode> {
let topic: UAString = "OPCUA_TEST/Data".into();
let topic_meta: UAString = "OPCUA_TEST/Meta".into();
const META_INTERVAL: f64 = 0.0;
const QOS: BrokerTransportQualityOfService = BrokerTransportQualityOfService::BestEffort;
let broker: UAString = "mqtt://localhost:1883".into();
// Create Application Object
let mut pubsub = PubSubApp::new();
// Create a pubsub connection
let mut connection = PubSubConnectionBuilder::new()
.mqtt(MqttConfig::new(broker))
.publisher_id(Variant::UInt16(2234))
.build(addr.clone())?;
// Create a Published Dataset with the fields to publish
let dataset_name = "Dataset 1".into();
let mut dataset = PublishedDataSet::new(dataset_name);
// add fields to the dataset
DataSetFieldBuilder::new()
.set_target_variable(NodeId::new(ns, 0))
.set_alias("ServerTime".into())
.insert(&mut dataset);
DataSetFieldBuilder::new()
.set_target_variable(NodeId::new(ns, 1))
.set_alias("Int32".into())
.insert(&mut dataset);
DataSetFieldBuilder::new()
.set_target_variable(NodeId::new(ns, 2))
.set_alias("Int64".into())
.insert(&mut dataset);
DataSetFieldBuilder::new()
.set_target_variable(NodeId::new(ns, 3))
.set_alias("BoolToggle".into())
.insert(&mut dataset);
// Configure a Writer Group which is responsible for sending the messages
let mut wg = WriterGroupBuilder::new_for_broker(&topic, &QOS)
.set_name("WriterGroup1".into())
.set_group_id(100)
.set_publish_interval(1000.0)
.build();
// Glue the writer group and published dataset together with a
// dataset writer
let dsw = DataSetWriterBuilder::new_for_broker(&dataset, META_INTERVAL, &topic_meta, &QOS)
.key_frame_count(1)
.dataset_writer_id(62541)
.name("DataSetWriter1".into())
.build();
wg.add_dataset_writer(dsw);
connection.add_writer_group(wg);
pubsub.add_dataset(dataset)?;
// create a reader group to handle incoming messages
let mut rg = ReaderGroup::new("Reader Group 1".into());
// build the dataset reader to receive values.
// Publisher id, writer group id and dataset writer id are to target publisher and dataset
let mut dsr = DataSetReaderBuilder::new_for_broker(&topic, &topic_meta, QOS)
.name("DataSet Reader 1".into())
.publisher_id(2234_u16.into())
.writer_group_id(100)
.dataset_writer_id(62541)
.build();
// Add the expected fields as MetaData
PubSubFieldMetaDataBuilder::new()
.data_type(&DataTypeId::DateTime)
.name("DateTime".into())
.insert(&mut dsr);
PubSubFieldMetaDataBuilder::new()
.data_type(&DataTypeId::Int32)
.name("Int32".into())
.insert(&mut dsr);
PubSubFieldMetaDataBuilder::new()
.data_type(&DataTypeId::Int64)
.name("Int64".into())
.insert(&mut dsr);
PubSubFieldMetaDataBuilder::new()
.data_type(&DataTypeId::Boolean)
.name("ToggleBool".into())
.insert(&mut dsr);
rg.add_dataset_reader(dsr);
connection.add_reader_group(rg);
connection.set_data_value_recv(Some(cb));
pubsub.add_connection(connection)?;
Ok(pubsub)
}
fn on_value(reader: &DataSetReader, dataset: &[UpdateTarget]) {
println!("#### Got Dataset from reader: {}", reader.name());
for UpdateTarget(_, dv, meta) in dataset {
println!("#### Variable: {} Value: {:?}", meta.name(), dv);
}
}
fn main() -> Result<(), StatusCode> {
opcua_console_logging::init();
let data_source = SimpleAddressSpace::new_arc_lock();
let nodes: Vec<NodeId> = (0..8).map(|i| NodeId::new(0, i as u32)).collect();
// Generating a PubSubApp
let cb = OnReceiveValueFn::new_boxed(on_value);
let pubsub = generate_pubsub(0, &data_source, cb)?;
// Spawn a pubsub connection
PubSubApp::run_thread(Arc::new(RwLock::new(pubsub)));
// Simulate a working loop where data is produced
let mut rng = rand::thread_rng();
let mut i = 0_usize;
loop {
// Update values Time every 100ms all others all 10 seconds
{
let mut ds = data_source.write().unwrap();
ds.set_value(&nodes[0], DataValue::new_now(DateTime::now()));
if i % 10 == 0 {
ds.set_value(&nodes[1], DataValue::new_now(rng.gen::<i32>()));
ds.set_value(&nodes[2], DataValue::new_now(rng.gen::<i64>()));
ds.set_value(&nodes[3], DataValue::new_now(rng.gen::<bool>()));
}
}
thread::sleep(time::Duration::from_millis(100));
i = i.wrapping_add(1);
}
}