use arc_swap::ArcSwap;
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::time::Duration;

use super::link::LinkState;
use super::sequencer::{AudioCommand, MidiCommand};
use super::timing::{SyncTime, ACTIVE_WAIT_THRESHOLD_US};

/// A command scheduled for dispatch at a specific time.
///
/// Ordering and equality consider only `target_time_us`; the payload in
/// `command` is ignored, so two different commands scheduled for the same
/// instant compare as equal.
#[derive(Clone)]
pub struct TimedCommand {
    /// What to send once the target time is reached.
    pub command: DispatchCommand,
    /// Target dispatch time in microseconds, compared against the link clock.
    pub target_time_us: SyncTime,
}

/// Commands the dispatcher can send to audio/MIDI threads.
#[derive(Clone)]
pub enum DispatchCommand {
    /// Forwarded as `AudioCommand::Evaluate { cmd, time }`.
    // NOTE(review): the element type of `time` was lost from this file;
    // `f64` is assumed — confirm against `AudioCommand::Evaluate`.
    Audio { cmd: String, time: Option<f64> },
    /// Forwarded unchanged to the MIDI channel.
    Midi(MidiCommand),
    /// Expands to CC 123 (All Notes Off) messages on the MIDI channel.
    FlushMidi,
    /// Forwarded as `AudioCommand::Hush`.
    Hush,
    /// Forwarded as `AudioCommand::Panic`.
    Panic,
}

impl Ord for TimedCommand {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reverse ordering for min-heap (earliest time first): `BinaryHeap`
        // is a max-heap, so the smallest timestamp must compare as greatest.
        other.target_time_us.cmp(&self.target_time_us)
    }
}

impl PartialOrd for TimedCommand {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for TimedCommand {
    fn eq(&self, other: &Self) -> bool {
        // Kept consistent with `Ord`: only the timestamp participates.
        self.target_time_us == other.target_time_us
    }
}

impl Eq for TimedCommand {}

/// Main dispatcher loop - receives timed commands and dispatches them at the right moment.
pub fn dispatcher_loop( cmd_rx: Receiver, audio_tx: Arc>>, midi_tx: Arc>>, link: Arc, ) { let mut queue: BinaryHeap = BinaryHeap::with_capacity(256); loop { let current_us = link.clock_micros() as SyncTime; // Calculate timeout based on next queued event let timeout_us = queue .peek() .map(|cmd| cmd.target_time_us.saturating_sub(current_us)) .unwrap_or(100_000) // 100ms default when idle .max(100); // Minimum 100μs to prevent busy-looping // Receive new commands (with timeout) match cmd_rx.recv_timeout(Duration::from_micros(timeout_us)) { Ok(cmd) => queue.push(cmd), Err(RecvTimeoutError::Timeout) => {} Err(RecvTimeoutError::Disconnected) => break, } // Drain any additional pending commands while let Ok(cmd) = cmd_rx.try_recv() { queue.push(cmd); } // Dispatch ready commands let current_us = link.clock_micros() as SyncTime; while let Some(cmd) = queue.peek() { if cmd.target_time_us <= current_us + ACTIVE_WAIT_THRESHOLD_US { let cmd = queue.pop().unwrap(); wait_until_dispatch(cmd.target_time_us, &link); dispatch_command(cmd.command, &audio_tx, &midi_tx); } else { break; } } } } /// Active-wait until the target time for precise dispatch. fn wait_until_dispatch(target_us: SyncTime, link: &LinkState) { while (link.clock_micros() as SyncTime) < target_us { std::hint::spin_loop(); } } /// Route a command to the appropriate output channel. 
fn dispatch_command( cmd: DispatchCommand, audio_tx: &Arc>>, midi_tx: &Arc>>, ) { match cmd { DispatchCommand::Audio { cmd, time } => { let _ = audio_tx .load() .try_send(AudioCommand::Evaluate { cmd, time }); } DispatchCommand::Midi(midi_cmd) => { let _ = midi_tx.load().try_send(midi_cmd); } DispatchCommand::FlushMidi => { // Send All Notes Off (CC 123) on all 16 channels for all 4 devices for dev in 0..4u8 { for chan in 0..16u8 { let _ = midi_tx.load().try_send(MidiCommand::CC { device: dev, channel: chan, cc: 123, value: 0, }); } } } DispatchCommand::Hush => { let _ = audio_tx.load().try_send(AudioCommand::Hush); } DispatchCommand::Panic => { let _ = audio_tx.load().try_send(AudioCommand::Panic); } } } #[cfg(test)] mod tests { use super::*; #[test] fn test_timed_command_ordering() { let mut heap: BinaryHeap = BinaryHeap::new(); heap.push(TimedCommand { command: DispatchCommand::Hush, target_time_us: 300, }); heap.push(TimedCommand { command: DispatchCommand::Hush, target_time_us: 100, }); heap.push(TimedCommand { command: DispatchCommand::Hush, target_time_us: 200, }); // Min-heap: earliest time should come out first assert_eq!(heap.pop().unwrap().target_time_us, 100); assert_eq!(heap.pop().unwrap().target_time_us, 200); assert_eq!(heap.pop().unwrap().target_time_us, 300); } }