Progress on sync sockets (4)

This commit is contained in:
Riccardo Zaglia 2023-06-23 10:48:00 +08:00
parent f6cf93bc98
commit f60ab33e3d
2 changed files with 178 additions and 167 deletions

View File

@ -8,14 +8,14 @@ use crate::{
tracking::{self, TrackingManager},
FfiButtonValue, FfiFov, FfiViewsConfig, VideoPacket, BITRATE_MANAGER, CONTROL_CHANNEL_SENDER,
DECODER_CONFIG, DISCONNECT_CLIENT_NOTIFIER, HAPTICS_CHANNEL_SENDER, RESTART_NOTIFIER,
SERVER_DATA_MANAGER, SHUTDOWN_NOTIFIER, STATISTICS_MANAGER, VIDEO_CHANNEL_SENDER,
SERVER_DATA_MANAGER, SHUTDOWN_NOTIFIER, STATISTICS_MANAGER, VIDEO_MIRROR_SENDER,
VIDEO_RECORDING_FILE,
};
use alvr_audio::AudioDevice;
use alvr_common::{
glam::{UVec2, Vec2},
once_cell::sync::Lazy,
parking_lot::{self, RwLock},
parking_lot,
prelude::*,
settings_schema::Switch,
RelaxedAtomic, DEVICE_ID_TO_PATH, HEAD_ID, LEFT_HAND_ID, RIGHT_HAND_ID,
@ -23,7 +23,8 @@ use alvr_common::{
use alvr_events::{ButtonEvent, EventType, HapticsEvent, TrackingEvent};
use alvr_packets::{
ButtonValue, ClientConnectionResult, ClientControlPacket, ClientListAction, ClientStatistics,
ServerControlPacket, StreamConfigPacket, Tracking, AUDIO, HAPTICS, STATISTICS, TRACKING, VIDEO,
ServerControlPacket, StreamConfigPacket, Tracking, VideoPacketHeader, AUDIO, HAPTICS,
STATISTICS, TRACKING, VIDEO,
};
use alvr_session::{CodecType, ConnectionState, ControllersEmulationMode, FrameSize, OpenvrConfig};
use alvr_sockets::{
@ -33,17 +34,22 @@ use futures::future::BoxFuture;
use std::{
collections::{HashMap, HashSet},
future,
io::Write,
net::IpAddr,
process::Command,
ptr,
sync::{mpsc as smpsc, Arc},
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self as smpsc, RecvTimeoutError, TrySendError},
Arc,
},
thread,
time::Duration,
};
use tokio::{
runtime::Runtime,
sync::{mpsc as tmpsc, Mutex},
task, time,
time,
};
const RETRY_CONNECT_MIN_INTERVAL: Duration = Duration::from_secs(1);
@ -52,6 +58,11 @@ pub static SHOULD_CONNECT_TO_CLIENTS: Lazy<Arc<RelaxedAtomic>> =
Lazy::new(|| Arc::new(RelaxedAtomic::new(false)));
static CONNECTED_CLIENT_HOSTNAMES: Lazy<parking_lot::Mutex<HashSet<String>>> =
Lazy::new(|| parking_lot::Mutex::new(HashSet::new()));
static CONNECTION_RUNTIME: Lazy<parking_lot::RwLock<Option<Runtime>>> =
Lazy::new(|| parking_lot::RwLock::new(None));
static VIDEO_CHANNEL_SENDER: Lazy<
parking_lot::Mutex<Option<std::sync::mpsc::SyncSender<VideoPacket>>>,
> = Lazy::new(|| parking_lot::Mutex::new(None));
fn align32(value: f32) -> u32 {
((value / 32.).floor() * 32.) as u32
@ -606,17 +617,46 @@ fn try_connect(mut client_ips: HashMap<IpAddr, String>) -> IntResult {
Box::pin(future::pending())
};
let video_send_loop = async move {
let (data_sender, mut data_receiver) =
tmpsc::channel(settings.connection.max_queued_server_video_frames);
*VIDEO_CHANNEL_SENDER.lock() = Some(data_sender);
let (playspace_sync_sender, playspace_sync_receiver) = smpsc::channel::<Option<Vec2>>();
while let Some(VideoPacket { header, payload }) = data_receiver.recv().await {
video_sender.send(&header, payload).await.ok();
let is_tracking_ref_only = settings.headset.tracking_ref_only;
if !is_tracking_ref_only {
// use a separate thread because SetChaperone() is blocking
thread::spawn(move || {
while let Ok(packet) = playspace_sync_receiver.recv() {
if let Some(area) = packet {
unsafe { crate::SetChaperone(area.x, area.y) };
} else {
unsafe { crate::SetChaperone(2.0, 2.0) };
}
}
});
}
// Note: here we create CONNECTION_RUNTIME. The rest of the function MUST be infallible, as
// CONNECTION_RUNTIME must be destroyed in the thread defined at the end of the function.
// Failure to respect this might leave a lingering runtime.
*CONNECTION_RUNTIME.write() = Some(runtime);
let (video_channel_sender, video_channel_receiver) =
std::sync::mpsc::sync_channel(settings.connection.max_queued_server_video_frames);
*VIDEO_CHANNEL_SENDER.lock() = Some(video_channel_sender);
let video_send_thread = thread::spawn(move || loop {
let VideoPacket { header, payload } =
match video_channel_receiver.recv_timeout(Duration::from_millis(100)) {
Ok(packet) => packet,
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => return,
};
if let Some(runtime) = &*CONNECTION_RUNTIME.read() {
// IMPORTANT: The only error that can happen here is socket closed. For this reason it's
// acceptable to call .ok() and ignore the error. The connection would already be
// closing so no corruption handling is necessary
runtime.block_on(video_sender.send(&header, payload)).ok();
}
Ok(())
};
});
let haptics_send_loop = async move {
let (data_sender, mut data_receiver) = tmpsc::unbounded_channel();
@ -657,28 +697,9 @@ fn try_connect(mut client_ips: HashMap<IpAddr, String>) -> IntResult {
Ok(())
};
let (playspace_sync_sender, playspace_sync_receiver) = smpsc::channel::<Option<Vec2>>();
let is_tracking_ref_only = settings.headset.tracking_ref_only;
if !is_tracking_ref_only {
// use a separate thread because SetChaperone() is blocking
thread::spawn(move || {
while let Ok(packet) = playspace_sync_receiver.recv() {
if let Some(area) = packet {
unsafe { crate::SetChaperone(area.x, area.y) };
} else {
unsafe { crate::SetChaperone(2.0, 2.0) };
}
}
});
}
let runtime = Arc::new(RwLock::new(Some(runtime)));
let tracking_manager = Arc::new(parking_lot::Mutex::new(TrackingManager::new()));
let tracking_receive_thread = thread::spawn({
let runtime = Arc::clone(&runtime);
let tracking_manager = Arc::clone(&tracking_manager);
move || {
let face_tracking_sink =
@ -695,8 +716,8 @@ fn try_connect(mut client_ips: HashMap<IpAddr, String>) -> IntResult {
track_controllers = config.tracked.into();
}
task::block_in_place(|| loop {
let Some(tracking) = runtime
loop {
let Some(tracking) = CONNECTION_RUNTIME
.read()
.as_ref()
.and_then(|runtime| runtime.block_on(tracking_receiver.recv_header_only()).ok())
@ -801,35 +822,30 @@ fn try_connect(mut client_ips: HashMap<IpAddr, String>) -> IntResult {
)
};
}
})
}
}
});
let statistics_thread = thread::spawn({
let runtime = Arc::clone(&runtime);
move || {
task::block_in_place(|| loop {
let Some(client_stats) = runtime
.read()
.as_ref()
.and_then(|runtime| runtime.block_on(statics_receiver.recv_header_only()).ok())
else {
return;
};
let statistics_thread = thread::spawn(move || loop {
let Some(client_stats) = CONNECTION_RUNTIME
.read()
.as_ref()
.and_then(|runtime| runtime.block_on(statics_receiver.recv_header_only()).ok())
else {
return;
};
if let Some(stats) = &mut *STATISTICS_MANAGER.lock() {
let timestamp = client_stats.target_timestamp;
let decoder_latency = client_stats.video_decode;
let network_latency = stats.report_statistics(client_stats);
if let Some(stats) = &mut *STATISTICS_MANAGER.lock() {
let timestamp = client_stats.target_timestamp;
let decoder_latency = client_stats.video_decode;
let network_latency = stats.report_statistics(client_stats);
BITRATE_MANAGER.lock().report_frame_latencies(
&SERVER_DATA_MANAGER.read().settings().video.bitrate.mode,
timestamp,
network_latency,
decoder_latency,
);
}
})
BITRATE_MANAGER.lock().report_frame_latencies(
&SERVER_DATA_MANAGER.read().settings().video.bitrate.mode,
timestamp,
network_latency,
decoder_latency,
);
}
});
@ -1025,47 +1041,51 @@ fn try_connect(mut client_ips: HashMap<IpAddr, String>) -> IntResult {
};
thread::spawn(move || {
let res = runtime.read().as_ref().unwrap().block_on(async move {
tokio::select! {
// Spawn new tasks and let the runtime manage threading
res = spawn_cancelable(receive_loop) => {
if let Err(e) = res {
info!("Client disconnected. Cause: {e}" );
let res = CONNECTION_RUNTIME
.read()
.as_ref()
.unwrap()
.block_on(async move {
tokio::select! {
// Spawn new tasks and let the runtime manage threading
res = spawn_cancelable(receive_loop) => {
if let Err(e) = res {
info!("Client disconnected. Cause: {e}" );
}
Ok(())
},
res = spawn_cancelable(game_audio_loop) => res,
res = spawn_cancelable(microphone_loop) => res,
res = spawn_cancelable(haptics_send_loop) => res,
// Leave these loops on the current task
res = keepalive_loop => res,
res = control_loop => res,
res = control_send_loop => res,
_ = RESTART_NOTIFIER.notified() => {
control_sender
.lock()
.await
.send(&ServerControlPacket::Restarting)
.await
.ok();
Ok(())
}
Ok(())
},
res = spawn_cancelable(game_audio_loop) => res,
res = spawn_cancelable(microphone_loop) => res,
res = spawn_cancelable(video_send_loop) => res,
res = spawn_cancelable(haptics_send_loop) => res,
// Leave these loops on the current task
res = keepalive_loop => res,
res = control_loop => res,
res = control_send_loop => res,
_ = RESTART_NOTIFIER.notified() => {
control_sender
.lock()
.await
.send(&ServerControlPacket::Restarting)
.await
.ok();
Ok(())
_ = SHUTDOWN_NOTIFIER.notified() => Ok(()),
_ = DISCONNECT_CLIENT_NOTIFIER.notified() => Ok(()),
_ = shutdown_detector => Ok(()),
}
_ = SHUTDOWN_NOTIFIER.notified() => Ok(()),
_ = DISCONNECT_CLIENT_NOTIFIER.notified() => Ok(()),
_ = shutdown_detector => Ok(()),
}
});
});
if let Err(e) = res {
warn!("Connection interrupted: {e:?}");
}
// This requests shutdown from threads
*runtime.write() = None;
*VIDEO_CHANNEL_SENDER.lock() = None;
*CONNECTION_RUNTIME.write() = None;
SERVER_DATA_MANAGER.write().update_client_list(
client_hostname.clone(),
@ -1093,6 +1113,7 @@ fn try_connect(mut client_ips: HashMap<IpAddr, String>) -> IntResult {
}
// ensure shutdown of threads
video_send_thread.join().ok();
tracking_receive_thread.join().ok();
statistics_thread.join().ok();
@ -1101,3 +1122,64 @@ fn try_connect(mut client_ips: HashMap<IpAddr, String>) -> IntResult {
Ok(())
}
pub extern "C" fn send_video(timestamp_ns: u64, buffer_ptr: *mut u8, len: i32, is_idr: bool) {
// start in the corrupts state, the client didn't receive the initial IDR yet.
static STREAM_CORRUPTED: AtomicBool = AtomicBool::new(true);
if let Some(sender) = &*VIDEO_CHANNEL_SENDER.lock() {
let buffer_size = len as usize;
if is_idr {
STREAM_CORRUPTED.store(false, Ordering::SeqCst);
}
let timestamp = Duration::from_nanos(timestamp_ns);
let mut payload = vec![0; buffer_size];
// use copy_nonoverlapping (aka memcpy) to avoid freeing memory allocated by C++
unsafe {
ptr::copy_nonoverlapping(buffer_ptr, payload.as_mut_ptr(), buffer_size);
}
if !STREAM_CORRUPTED.load(Ordering::SeqCst)
|| !SERVER_DATA_MANAGER
.read()
.settings()
.connection
.avoid_video_glitching
{
if let Some(sender) = &*VIDEO_MIRROR_SENDER.lock() {
sender.send(payload.clone()).ok();
}
if let Some(file) = &mut *VIDEO_RECORDING_FILE.lock() {
file.write_all(&payload).ok();
}
if matches!(
sender.try_send(VideoPacket {
header: VideoPacketHeader { timestamp, is_idr },
payload,
}),
Err(TrySendError::Full(_))
) {
STREAM_CORRUPTED.store(true, Ordering::SeqCst);
unsafe { crate::RequestIDR() };
warn!("Dropping video packet. Reason: Can't push to network");
}
} else {
warn!("Dropping video packet. Reason: Waiting for IDR frame");
}
if let Some(stats) = &mut *STATISTICS_MANAGER.lock() {
let encoder_latency =
stats.report_frame_encoded(Duration::from_nanos(timestamp_ns), buffer_size);
BITRATE_MANAGER
.lock()
.report_frame_encoded(timestamp, encoder_latency, buffer_size);
}
}
}

View File

@ -45,21 +45,14 @@ use std::{
fs::File,
io::Write,
ptr,
sync::{
atomic::{AtomicBool, Ordering},
Once,
},
sync::Once,
thread,
time::{Duration, Instant},
};
use sysinfo::{ProcessRefreshKind, RefreshKind, SystemExt};
use tokio::{
runtime::Runtime,
sync::{
broadcast,
mpsc::{self, error::TrySendError},
Notify,
},
sync::{broadcast, mpsc, Notify},
};
static FILESYSTEM_LAYOUT: Lazy<Layout> = Lazy::new(|| {
@ -87,8 +80,6 @@ pub struct VideoPacket {
static CONTROL_CHANNEL_SENDER: Lazy<Mutex<Option<mpsc::UnboundedSender<ServerControlPacket>>>> =
Lazy::new(|| Mutex::new(None));
static VIDEO_CHANNEL_SENDER: Lazy<Mutex<Option<mpsc::Sender<VideoPacket>>>> =
Lazy::new(|| Mutex::new(None));
static HAPTICS_CHANNEL_SENDER: Lazy<Mutex<Option<mpsc::UnboundedSender<Haptics>>>> =
Lazy::new(|| Mutex::new(None));
static VIDEO_MIRROR_SENDER: Lazy<Mutex<Option<broadcast::Sender<Vec<u8>>>>> =
@ -341,68 +332,6 @@ pub unsafe extern "C" fn HmdDriverFactory(
});
}
extern "C" fn video_send(timestamp_ns: u64, buffer_ptr: *mut u8, len: i32, is_idr: bool) {
let buffer_size = len as usize;
// start in the corrupts state, the client didn't receive the initial IDR yet.
static STREAM_CORRUPTED: AtomicBool = AtomicBool::new(true);
if let Some(sender) = &*VIDEO_CHANNEL_SENDER.lock() {
if is_idr {
STREAM_CORRUPTED.store(false, Ordering::SeqCst);
}
let timestamp = Duration::from_nanos(timestamp_ns);
let mut payload = vec![0; buffer_size];
// use copy_nonoverlapping (aka memcpy) to avoid freeing memory allocated by C++
unsafe {
ptr::copy_nonoverlapping(buffer_ptr, payload.as_mut_ptr(), buffer_size);
}
if !STREAM_CORRUPTED.load(Ordering::SeqCst)
|| !SERVER_DATA_MANAGER
.read()
.settings()
.connection
.avoid_video_glitching
{
if let Some(sender) = &*VIDEO_MIRROR_SENDER.lock() {
sender.send(payload.clone()).ok();
}
if let Some(file) = &mut *VIDEO_RECORDING_FILE.lock() {
file.write_all(&payload).ok();
}
if matches!(
sender.try_send(VideoPacket {
header: VideoPacketHeader { timestamp, is_idr },
payload,
}),
Err(TrySendError::Full(_))
) {
STREAM_CORRUPTED.store(true, Ordering::SeqCst);
unsafe { crate::RequestIDR() };
warn!("Dropping video packet. Reason: Can't push to network");
}
} else {
warn!("Dropping video packet. Reason: Waiting for IDR frame");
}
if let Some(stats) = &mut *STATISTICS_MANAGER.lock() {
let encoder_latency =
stats.report_frame_encoded(Duration::from_nanos(timestamp_ns), buffer_size);
BITRATE_MANAGER.lock().report_frame_encoded(
timestamp,
encoder_latency,
buffer_size,
);
}
}
}
extern "C" fn haptics_send(device_id: u64, duration_s: f32, frequency: f32, amplitude: f32) {
if let Some(sender) = &*HAPTICS_CHANNEL_SENDER.lock() {
let haptics = Haptics {
@ -503,7 +432,7 @@ pub unsafe extern "C" fn HmdDriverFactory(
LogPeriodically = Some(log_periodically);
DriverReadyIdle = Some(driver_ready_idle);
InitializeDecoder = Some(initialize_decoder);
VideoSend = Some(video_send);
VideoSend = Some(connection::send_video);
HapticsSend = Some(haptics_send);
ShutdownRuntime = Some(shutdown_driver);
PathStringToHash = Some(path_string_to_hash);