Skip to content

Commit

Permalink
Websocket, RTC InitializeTransports command
Browse files Browse the repository at this point in the history
  • Loading branch information
fatalerrorcoded committed Aug 15, 2021
1 parent 6d986f6 commit f701edf
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 34 deletions.
64 changes: 52 additions & 12 deletions src/rtc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 1,20 @@
use std::num::{NonZeroU32, NonZeroU8};
use std::collections::HashMap;
use serde::{Serialize, Deserialize};
use std::num::{NonZeroU32, NonZeroU8};

use crate::util::variables::{DISABLE_RTP, RTC_IPS};
use futures::join;
use mediasoup::prelude::*;
use crate::util::variables::{RTC_IPS, DISABLE_RTP};

pub mod types;
pub mod worker;

pub use worker::get_worker_pool;

use types::{InitializationInput, InitializationInputMode};
pub const SRTP_CRYPTO_SUITE: SrtpCryptoSuite = SrtpCryptoSuite::AesCm128HmacSha180;

use types::{
InitializationInput, InitializationInputMode, TransportInitData, WebRtcTransportInitData,
};

pub fn create_opus_codec(channels: u8) -> RtpCodecCapability {
RtpCodecCapability::Audio {
Expand All @@ -27,11 30,11 @@ pub fn create_opus_codec(channels: u8) -> RtpCodecCapability {
pub struct RtcState {
rtp_capabilities: RtpCapabilities,
transport_mode: TransportMode,
consumers: HashMap<String, Consumer>
consumers: HashMap<String, Consumer>,
}

impl RtcState {
pub async fn initialize(router: Router, init_data: InitializationInput) -> Result<Self, ()> {
pub async fn initialize(router: &Router, init_data: InitializationInput) -> Result<Self, ()> {
let mut webrtc_options = WebRtcTransportOptions::new(RTC_IPS.clone());
webrtc_options.enable_udp = true;
webrtc_options.enable_tcp = true;
Expand All @@ -41,24 44,25 @@ impl RtcState {
InitializationInputMode::SplitWebRtc => {
let (send, recv) = join!(
router.create_webrtc_transport(webrtc_options.clone()),
router.create_webrtc_transport(webrtc_options));
router.create_webrtc_transport(webrtc_options)
);
TransportMode::SplitWebRtc(send.map_err(|_| ())?, recv.map_err(|_| ())?)
},
}
InitializationInputMode::CombinedWebRtc => {
let transport = router.create_webrtc_transport(webrtc_options).await;
TransportMode::CombinedWebRtc(transport.map_err(|_| ())?)
},
}
InitializationInputMode::CombinedRtp => {
// TODO: make it return an error struct instead of ()
if *DISABLE_RTP {
return Err(())
return Err(());
}

let mut options = PlainTransportOptions::new(RTC_IPS[0]);
options.rtcp_mux = true;
options.comedia = true;
options.enable_srtp = true;
options.srtp_crypto_suite = SrtpCryptoSuite::AesCm128HmacSha180;
options.srtp_crypto_suite = SRTP_CRYPTO_SUITE;
let transport = router.create_plain_transport(options).await;
TransportMode::CombinedRtp(transport.map_err(|_| ())?)
}
Expand All @@ -71,7 75,43 @@ impl RtcState {
})
}

pub fn combined(&self) -> bool { self.transport_mode.combined() }
pub fn get_init_data(&self) -> TransportInitData {
match &self.transport_mode {
TransportMode::SplitWebRtc(send, recv) => TransportInitData::SplitWebRtc {
send_transport: RtcState::get_webrtc_init_data(&send),
recv_transport: RtcState::get_webrtc_init_data(&recv),
},
TransportMode::CombinedWebRtc(transport) => TransportInitData::CombinedWebRtc {
transport: RtcState::get_webrtc_init_data(&transport),
},
TransportMode::CombinedRtp(transport) => {
let tuple = transport.tuple();
let ip = RTC_IPS[0];
let ip = ip.announced_ip.unwrap_or(ip.ip);
TransportInitData::CombinedRtp {
ip,
port: tuple.local_port(),
protocol: tuple.protocol(),
id: transport.id(),
srtp_crypto_suite: SRTP_CRYPTO_SUITE,
}
}
}
}

fn get_webrtc_init_data(transport: &WebRtcTransport) -> WebRtcTransportInitData {
WebRtcTransportInitData {
id: transport.id(),
ice_parameters: transport.ice_parameters().clone(),
ice_candidates: transport.ice_candidates().clone(),
dtls_parameters: transport.dtls_parameters(),
sctp_parameters: transport.sctp_parameters(),
}
}

pub fn combined(&self) -> bool {
self.transport_mode.combined()
}
}

enum TransportMode {
Expand Down
32 changes: 16 additions & 16 deletions src/rtc/types.rs
Original file line number Diff line number Diff line change
@@ -1,10 1,10 @@
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use serde::{Serialize, Deserialize};

use mediasoup::data_structures::TransportProtocol;
use mediasoup::prelude::*;
use mediasoup::sctp_parameters::SctpParameters;
use mediasoup::srtp_parameters::SrtpParameters;
use mediasoup::data_structures::TransportProtocol;

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand All @@ -15,7 15,7 @@ pub struct InitializationInput {
}

#[derive(Serialize, Deserialize)]
#[serde(tag="mode")]
#[serde(tag = "mode")]
pub enum InitializationInputMode {
SplitWebRtc,
CombinedWebRtc,
Expand All @@ -27,31 27,31 @@ pub enum InitializationInputMode {
#[serde(rename_all = "camelCase")]
pub enum TransportInitData {
#[serde(rename_all = "camelCase")]
SplitWebRTC {
send_transport: WebRTCTransportInitData,
recv_transport: WebRTCTransportInitData,
SplitWebRtc {
send_transport: WebRtcTransportInitData,
recv_transport: WebRtcTransportInitData,
},
CombinedWebRTC {
transport: WebRTCTransportInitData,
CombinedWebRtc {
transport: WebRtcTransportInitData,
},
#[serde(rename_all = "camelCase")]
CombinedRTP {
CombinedRtp {
ip: IpAddr,
port: u16,
protocol: TransportProtocol,
id: String,
id: TransportId,
srtp_crypto_suite: SrtpCryptoSuite,
},
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct WebRTCTransportInitData {
id: String,
ice_parameters: IceParameters,
ice_candidates: Vec<IceCandidate>,
dtls_arameters: DtlsParameters,
sctp_parameters: SctpParameters,
pub struct WebRtcTransportInitData {
pub id: TransportId,
pub ice_parameters: IceParameters,
pub ice_candidates: Vec<IceCandidate>,
pub dtls_parameters: DtlsParameters,
pub sctp_parameters: Option<SctpParameters>,
}

#[derive(Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion src/util/variables.rs
Original file line number Diff line number Diff line change
@@ -1,7 1,7 @@
use std::convert::TryFrom;
use std::env;
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
use std::convert::TryFrom;

use mediasoup::data_structures::TransportListenIp;
use mediasoup::prelude::TransportListenIps;
Expand Down
45 changes: 43 additions & 2 deletions src/ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 8,10 @@ use futures::{
use warp::ws::{Message, WebSocket, Ws};
use warp::{Filter, Rejection, Reply};

use crate::state::room::{Room, RoomEvent};
use crate::{
rtc::RtcState,
state::room::{Room, RoomEvent},
};

mod error;
mod types;
Expand Down Expand Up @@ -81,17 84,55 @@ async fn handle(
}
};

// Transport initialization
let rtc_state = loop {
match ws_stream.next().await {
Some(message) => {
let message = message.map_err(|_| WSCloseType::ServerError)?;
// Try to get the text message, ignore otherwise (might be ping, binary)
if let Ok(text) = message.to_str() {
let out: WSCommand = serde_json::from_str(text)?;
if let WSCommandType::InitializeTransports { init_data } = out.command_type {
let router = room.router().ok_or(WSCloseType::RoomClosed)?;
let rtc_state = RtcState::initialize(router, init_data)
.await
.map_err(|_| WSCloseType::ServerError)?;
let reply_data = rtc_state.get_init_data();

let reply = WSReply {
id: out.id,
reply_type: WSReplyType::InitializeTransports { reply_data },
};

ws_sink
.send(Message::text(serde_json::to_string(&reply)?))
.await?;
break rtc_state;
} else {
return Err(WSCloseType::InvalidState);
}
}
}
// Client disconnected before they authenticated, clean up
None => {
room.users().remove(&user_id).await.ok();
return Ok(());
}
}
};

// TODO: implement some sort of way to automatically remove a user from a room if the thread panics
// the Room user remove function is async but the Drop trait is not

let result = event_loop(&room, &user_id, ws_sink, ws_stream).await;
let result = event_loop(&room, &user_id, rtc_state, ws_sink, ws_stream).await;
room.users().remove(&user_id).await.ok();
result
}

async fn event_loop(
room: &Arc<Room>,
user_id: &str,
rtc_state: RtcState,
ws_sink: &mut SplitSink<WebSocket, Message>,
ws_stream: &mut SplitStream<WebSocket>,
) -> Result<(), WSCloseType> {
Expand Down
6 changes: 3 additions & 3 deletions src/ws/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 3,8 @@ use strum::IntoStaticStr;

use mediasoup::rtp_parameters::{MediaKind, RtpCapabilitiesFinalized, RtpParameters};

use crate::state::user::{ProduceType, UserInfo};
use crate::rtc::types::{ConnectTransportData, InitializationInput, TransportInitData};
use crate::state::user::{ProduceType, UserInfo};

#[derive(Deserialize, IntoStaticStr)]
#[serde(tag = "type", content = "data")]
Expand All @@ -17,11 17,11 @@ pub enum WSCommandType {

InitializeTransports {
#[serde(flatten)]
variant: InitializationInput,
init_data: InitializationInput,
},
ConnectTransport {
#[serde(flatten)]
variant: ConnectTransportData,
connect_data: ConnectTransportData,
},

RoomInfo,
Expand Down

0 comments on commit f701edf

Please sign in to comment.