Files
Cagire/src/engine/dispatcher.rs

157 lines
4.4 KiB
Rust

use arc_swap::ArcSwap;
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::time::Duration;
use super::link::LinkState;
use super::realtime::{precise_sleep_us, set_realtime_priority};
use super::sequencer::MidiCommand;
use super::timing::SyncTime;
/// A MIDI command scheduled for dispatch at a specific time.
///
/// Ordering and equality for this type look only at `target_time_us`; the
/// `command` payload never takes part in comparisons.
#[derive(Clone)]
pub struct TimedMidiCommand {
/// What the dispatcher should emit once the target time is reached.
pub command: MidiDispatch,
/// Target dispatch time in microseconds, compared against the value the
/// dispatcher reads from `LinkState::clock_micros`.
pub target_time_us: SyncTime,
}
/// MIDI commands the dispatcher can send.
#[derive(Clone)]
pub enum MidiDispatch {
/// Forward the wrapped command to the MIDI output channel as-is.
Send(MidiCommand),
/// Emit CC 123 with value 0 (the MIDI "All Notes Off" channel mode message)
/// on channels 0-15 of devices 0-3.
FlushAll,
}
/// Reverse chronological ordering: the command with the *earliest* target time
/// is the greatest element, so a max-oriented `BinaryHeap` pops it first.
///
/// Commands with the same target time compare as equal, so this ordering alone
/// does not define which of them a heap yields first.
impl Ord for TimedMidiCommand {
    fn cmp(&self, other: &Self) -> Ordering {
        self.target_time_us.cmp(&other.target_time_us).reverse()
    }
}
/// Total order delegated to [`Ord`]; the result is therefore always `Some`.
impl PartialOrd for TimedMidiCommand {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
impl PartialEq for TimedMidiCommand {
fn eq(&self, other: &Self) -> bool {
self.target_time_us == other.target_time_us
}
}
// Marker impl: equality on `target_time_us` is a full equivalence relation,
// which `Ord` (and therefore `BinaryHeap`) requires.
impl Eq for TimedMidiCommand {}
/// Look-ahead window in microseconds: the dispatcher pops a queued command once
/// its target time is no more than this far past the current clock reading, and
/// hands it to `wait_until_dispatch` for the final wait.
const SPIN_THRESHOLD_US: SyncTime = 100;
/// Dispatcher loop — handles MIDI timing only.
/// Audio commands bypass the dispatcher entirely and go straight to doux's
/// sample-accurate scheduler via the audio thread channel.
pub fn dispatcher_loop(
cmd_rx: Receiver<TimedMidiCommand>,
midi_tx: Arc<ArcSwap<Sender<MidiCommand>>>,
link: Arc<LinkState>,
) {
let has_rt = set_realtime_priority();
#[cfg(target_os = "linux")]
if !has_rt {
eprintln!("[cagire] Warning: Could not set realtime priority for dispatcher thread.");
}
let mut queue: BinaryHeap<TimedMidiCommand> = BinaryHeap::with_capacity(256);
loop {
let current_us = link.clock_micros() as SyncTime;
let timeout_us = queue
.peek()
.map(|cmd| cmd.target_time_us.saturating_sub(current_us))
.unwrap_or(100_000)
.max(100);
match cmd_rx.recv_timeout(Duration::from_micros(timeout_us)) {
Ok(cmd) => queue.push(cmd),
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => break,
}
while let Ok(cmd) = cmd_rx.try_recv() {
queue.push(cmd);
}
let current_us = link.clock_micros() as SyncTime;
while let Some(cmd) = queue.peek() {
if cmd.target_time_us <= current_us + SPIN_THRESHOLD_US {
let cmd = queue.pop().unwrap();
wait_until_dispatch(cmd.target_time_us, &link, has_rt);
dispatch_midi(cmd.command, &midi_tx);
} else {
break;
}
}
}
}
/// Blocks until the clock read from `link` reaches `target_us`.
///
/// * With realtime priority (`has_rt == true`) the thread busy-spins, polling
///   the clock until it is no longer below the target.
/// * Otherwise the time remaining at entry is handed to `precise_sleep_us` in a
///   single call; nothing happens if the target has already passed.
fn wait_until_dispatch(target_us: SyncTime, link: &LinkState, has_rt: bool) {
    let now_us = || link.clock_micros() as SyncTime;
    let remaining_us = target_us.saturating_sub(now_us());

    if !has_rt {
        if remaining_us > 0 {
            precise_sleep_us(remaining_us);
        }
        return;
    }

    while now_us() < target_us {
        std::hint::spin_loop();
    }
}
/// Pushes one dispatcher command onto the current MIDI output channel.
///
/// * `Send` forwards the wrapped command unchanged.
/// * `FlushAll` emits CC 123 with value 0 (MIDI "All Notes Off") for every
///   channel 0-15 on every device 0-3, device-major.
///
/// Sending is best-effort: the sender is re-loaded from `midi_tx` for each
/// message and `try_send` failures are ignored, so this never blocks.
fn dispatch_midi(cmd: MidiDispatch, midi_tx: &Arc<ArcSwap<Sender<MidiCommand>>>) {
    const DEVICE_COUNT: u8 = 4;
    const CHANNEL_COUNT: u8 = 16;

    match cmd {
        MidiDispatch::Send(midi_cmd) => {
            let _ = midi_tx.load().try_send(midi_cmd);
        }
        MidiDispatch::FlushAll => {
            let targets = (0..DEVICE_COUNT)
                .flat_map(|device| (0..CHANNEL_COUNT).map(move |channel| (device, channel)));
            for (device, channel) in targets {
                let all_notes_off = MidiCommand::CC {
                    device,
                    channel,
                    cc: 123,
                    value: 0,
                };
                let _ = midi_tx.load().try_send(all_notes_off);
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Commands pushed out of order must pop earliest-target-first.
    #[test]
    fn test_timed_command_ordering() {
        let mut heap: BinaryHeap<TimedMidiCommand> = BinaryHeap::new();
        for target_time_us in [300, 100, 200] {
            heap.push(TimedMidiCommand {
                command: MidiDispatch::FlushAll,
                target_time_us,
            });
        }

        for expected_us in [100, 200, 300] {
            assert_eq!(heap.pop().unwrap().target_time_us, expected_us);
        }
    }
}