Fix: simpler scheduling

This commit is contained in:
2026-02-03 15:55:43 +01:00
parent 266a625cf3
commit 3284354f40
4 changed files with 125 additions and 103 deletions

View File

@@ -7,52 +7,50 @@ use std::time::Duration;
use super::link::LinkState;
use super::realtime::{precise_sleep_us, set_realtime_priority};
use super::sequencer::{AudioCommand, MidiCommand};
use super::sequencer::MidiCommand;
use super::timing::SyncTime;
/// A command scheduled for dispatch at a specific time.
/// A MIDI command scheduled for dispatch at a specific time.
#[derive(Clone)]
pub struct TimedCommand {
pub command: DispatchCommand,
pub struct TimedMidiCommand {
pub command: MidiDispatch,
pub target_time_us: SyncTime,
}
/// Commands the dispatcher can send to audio/MIDI threads.
/// MIDI commands the dispatcher can send.
#[derive(Clone)]
pub enum DispatchCommand {
Audio { cmd: String, time: Option<f64> },
Midi(MidiCommand),
FlushMidi,
pub enum MidiDispatch {
Send(MidiCommand),
FlushAll,
}
impl Ord for TimedCommand {
impl Ord for TimedMidiCommand {
fn cmp(&self, other: &Self) -> Ordering {
// Reverse ordering for min-heap (earliest time first)
other.target_time_us.cmp(&self.target_time_us)
}
}
impl PartialOrd for TimedCommand {
impl PartialOrd for TimedMidiCommand {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for TimedCommand {
impl PartialEq for TimedMidiCommand {
fn eq(&self, other: &Self) -> bool {
self.target_time_us == other.target_time_us
}
}
impl Eq for TimedCommand {}
impl Eq for TimedMidiCommand {}
/// Spin-wait threshold in microseconds for dispatcher.
const SPIN_THRESHOLD_US: SyncTime = 100;
/// Main dispatcher loop - receives timed commands and dispatches them at the right moment.
/// Dispatcher loop — handles MIDI timing only.
/// Audio commands bypass the dispatcher entirely and go straight to doux's
/// sample-accurate scheduler via the audio thread channel.
pub fn dispatcher_loop(
cmd_rx: Receiver<TimedCommand>,
audio_tx: Arc<ArcSwap<Sender<AudioCommand>>>,
cmd_rx: Receiver<TimedMidiCommand>,
midi_tx: Arc<ArcSwap<Sender<MidiCommand>>>,
link: Arc<LinkState>,
) {
@@ -63,37 +61,33 @@ pub fn dispatcher_loop(
eprintln!("[cagire] Warning: Could not set realtime priority for dispatcher thread.");
}
let mut queue: BinaryHeap<TimedCommand> = BinaryHeap::with_capacity(256);
let mut queue: BinaryHeap<TimedMidiCommand> = BinaryHeap::with_capacity(256);
loop {
let current_us = link.clock_micros() as SyncTime;
// Calculate timeout based on next queued event
let timeout_us = queue
.peek()
.map(|cmd| cmd.target_time_us.saturating_sub(current_us))
.unwrap_or(100_000) // 100ms default when idle
.max(100); // Minimum 100μs to prevent busy-looping
.unwrap_or(100_000)
.max(100);
// Receive new commands (with timeout)
match cmd_rx.recv_timeout(Duration::from_micros(timeout_us)) {
Ok(cmd) => queue.push(cmd),
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => break,
}
// Drain any additional pending commands
while let Ok(cmd) = cmd_rx.try_recv() {
queue.push(cmd);
}
// Dispatch ready commands
let current_us = link.clock_micros() as SyncTime;
while let Some(cmd) = queue.peek() {
if cmd.target_time_us <= current_us + SPIN_THRESHOLD_US {
let cmd = queue.pop().unwrap();
wait_until_dispatch(cmd.target_time_us, &link, has_rt);
dispatch_command(cmd.command, &audio_tx, &midi_tx);
dispatch_midi(cmd.command, &midi_tx);
} else {
break;
}
@@ -101,44 +95,25 @@ pub fn dispatcher_loop(
}
}
/// Wait until the target time for dispatch.
/// With RT priority: spin-wait for precision
/// Without RT priority: sleep (spinning wastes CPU without benefit)
fn wait_until_dispatch(target_us: SyncTime, link: &LinkState, has_rt: bool) {
let current = link.clock_micros() as SyncTime;
let remaining = target_us.saturating_sub(current);
if has_rt {
// With RT priority: spin-wait for precise timing
while (link.clock_micros() as SyncTime) < target_us {
std::hint::spin_loop();
}
} else {
// Without RT priority: sleep (spin-waiting is counterproductive)
if remaining > 0 {
precise_sleep_us(remaining);
}
} else if remaining > 0 {
precise_sleep_us(remaining);
}
}
/// Route a command to the appropriate output channel.
fn dispatch_command(
cmd: DispatchCommand,
audio_tx: &Arc<ArcSwap<Sender<AudioCommand>>>,
midi_tx: &Arc<ArcSwap<Sender<MidiCommand>>>,
) {
fn dispatch_midi(cmd: MidiDispatch, midi_tx: &Arc<ArcSwap<Sender<MidiCommand>>>) {
match cmd {
DispatchCommand::Audio { cmd, time } => {
let _ = audio_tx
.load()
.try_send(AudioCommand::Evaluate { cmd, time });
}
DispatchCommand::Midi(midi_cmd) => {
MidiDispatch::Send(midi_cmd) => {
let _ = midi_tx.load().try_send(midi_cmd);
}
DispatchCommand::FlushMidi => {
// Send All Notes Off (CC 123) on all 16 channels for all 4 devices
MidiDispatch::FlushAll => {
for dev in 0..4u8 {
for chan in 0..16u8 {
let _ = midi_tx.load().try_send(MidiCommand::CC {
@@ -159,22 +134,21 @@ mod tests {
#[test]
fn test_timed_command_ordering() {
let mut heap: BinaryHeap<TimedCommand> = BinaryHeap::new();
let mut heap: BinaryHeap<TimedMidiCommand> = BinaryHeap::new();
heap.push(TimedCommand {
command: DispatchCommand::FlushMidi,
heap.push(TimedMidiCommand {
command: MidiDispatch::FlushAll,
target_time_us: 300,
});
heap.push(TimedCommand {
command: DispatchCommand::FlushMidi,
heap.push(TimedMidiCommand {
command: MidiDispatch::FlushAll,
target_time_us: 100,
});
heap.push(TimedCommand {
command: DispatchCommand::FlushMidi,
heap.push(TimedMidiCommand {
command: MidiDispatch::FlushAll,
target_time_us: 200,
});
// Min-heap: earliest time should come out first
assert_eq!(heap.pop().unwrap().target_time_us, 100);
assert_eq!(heap.pop().unwrap().target_time_us, 200);
assert_eq!(heap.pop().unwrap().target_time_us, 300);

View File

@@ -42,6 +42,54 @@ pub fn is_memory_locked() -> bool {
MLOCKALL_SUCCESS.load(Ordering::Relaxed)
}
/// Attempts to set realtime scheduling priority for the current thread.
/// Returns true if RT priority was successfully set, false otherwise.
#[cfg(target_os = "macos")]
pub fn set_realtime_priority() -> bool {
// macOS: use THREAD_TIME_CONSTRAINT_POLICY for true RT scheduling.
// This is the same mechanism CoreAudio uses for its audio threads.
// SCHED_FIFO/RR require root on macOS, but time constraint policy does not.
unsafe {
let thread = libc::pthread_self();
#[repr(C)]
struct ThreadTimeConstraintPolicy {
period: u32,
computation: u32,
constraint: u32,
preemptible: i32,
}
const THREAD_TIME_CONSTRAINT_POLICY_ID: u32 = 2;
const THREAD_TIME_CONSTRAINT_POLICY_COUNT: u32 = 4;
// ~1ms period at ~1GHz mach_absolute_time ticks (typical for audio)
let policy = ThreadTimeConstraintPolicy {
period: 1_000_000,
computation: 500_000,
constraint: 1_000_000,
preemptible: 1,
};
extern "C" {
fn thread_policy_set(
thread: libc::pthread_t,
flavor: u32,
policy_info: *const ThreadTimeConstraintPolicy,
count: u32,
) -> i32;
}
let result = thread_policy_set(
thread,
THREAD_TIME_CONSTRAINT_POLICY_ID,
&policy,
THREAD_TIME_CONSTRAINT_POLICY_COUNT,
);
result == 0
}
}
/// Attempts to set realtime scheduling priority for the current thread.
/// Returns true if RT priority was successfully set, false otherwise.
///
@@ -50,7 +98,7 @@ pub fn is_memory_locked() -> bool {
/// - Configured rtprio limits in /etc/security/limits.conf:
/// @audio - rtprio 95
/// @audio - memlock unlimited
#[cfg(unix)]
#[cfg(target_os = "linux")]
pub fn set_realtime_priority() -> bool {
use thread_priority::unix::{
set_thread_priority_and_policy, thread_native_id, NormalThreadSchedulePolicy,
@@ -60,27 +108,22 @@ pub fn set_realtime_priority() -> bool {
let tid = thread_native_id();
// Try SCHED_FIFO first (requires CAP_SYS_NICE on Linux)
let fifo = ThreadSchedulePolicy::Realtime(RealtimeThreadSchedulePolicy::Fifo);
if set_thread_priority_and_policy(tid, ThreadPriority::Max, fifo).is_ok() {
return true;
}
// Try SCHED_RR (round-robin realtime, sometimes works without caps)
let rr = ThreadSchedulePolicy::Realtime(RealtimeThreadSchedulePolicy::RoundRobin);
if set_thread_priority_and_policy(tid, ThreadPriority::Max, rr).is_ok() {
return true;
}
// Fall back to highest normal priority (SCHED_OTHER)
let _ = set_thread_priority_and_policy(
tid,
ThreadPriority::Max,
ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Other),
);
// Also try nice -20 on Linux
#[cfg(target_os = "linux")]
unsafe {
libc::setpriority(libc::PRIO_PROCESS, 0, -20);
}
@@ -88,7 +131,12 @@ pub fn set_realtime_priority() -> bool {
false
}
#[cfg(not(unix))]
#[cfg(not(any(unix, target_os = "windows")))]
pub fn set_realtime_priority() -> bool {
false
}
#[cfg(target_os = "windows")]
pub fn set_realtime_priority() -> bool {
use thread_priority::{set_current_thread_priority, ThreadPriority};
set_current_thread_priority(ThreadPriority::Max).is_ok()

View File

@@ -7,7 +7,7 @@ use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use super::dispatcher::{dispatcher_loop, DispatchCommand, TimedCommand};
use super::dispatcher::{dispatcher_loop, MidiDispatch, TimedMidiCommand};
use super::realtime::{precise_sleep_us, set_realtime_priority};
use super::{micros_until_next_substep, substeps_crossed, LinkState, StepTiming, SyncTime};
use crate::model::{
@@ -299,8 +299,8 @@ pub fn spawn_sequencer(
let audio_tx = Arc::new(ArcSwap::from_pointee(audio_tx));
let midi_tx = Arc::new(ArcSwap::from_pointee(midi_tx));
// Dispatcher channel (unbounded to avoid blocking the scheduler)
let (dispatch_tx, dispatch_rx) = unbounded::<TimedCommand>();
// Dispatcher channel — MIDI only (unbounded to avoid blocking the scheduler)
let (dispatch_tx, dispatch_rx) = unbounded::<TimedMidiCommand>();
let shared_state = Arc::new(ArcSwap::from_pointee(SharedSequencerState::default()));
let shared_state_clone = Arc::clone(&shared_state);
@@ -312,28 +312,24 @@ pub fn spawn_sequencer(
#[cfg(feature = "desktop")]
let mouse_down = config.mouse_down;
// Spawn dispatcher thread
// Spawn dispatcher thread (MIDI only — audio goes direct to doux)
let dispatcher_link = Arc::clone(&link);
let dispatcher_audio_tx = Arc::clone(&audio_tx);
let dispatcher_midi_tx = Arc::clone(&midi_tx);
thread::Builder::new()
.name("cagire-dispatcher".into())
.spawn(move || {
dispatcher_loop(
dispatch_rx,
dispatcher_audio_tx,
dispatcher_midi_tx,
dispatcher_link,
);
dispatcher_loop(dispatch_rx, dispatcher_midi_tx, dispatcher_link);
})
.expect("Failed to spawn dispatcher thread");
let sequencer_audio_tx = Arc::clone(&audio_tx);
let thread = thread::Builder::new()
.name("sequencer".into())
.spawn(move || {
sequencer_loop(
cmd_rx,
dispatch_tx,
sequencer_audio_tx,
link,
playing,
variables,
@@ -830,11 +826,24 @@ impl SequencerState {
.copied()
.unwrap_or_else(|| pattern.speed.multiplier());
let steps_to_fire = substeps_crossed(prev_beat, beat, speed_mult);
let substeps_per_beat = 4.0 * speed_mult;
let base_substep = (prev_beat * substeps_per_beat).floor() as i64;
for _ in 0..steps_to_fire {
for step_offset in 0..steps_to_fire {
result.any_step_fired = true;
let step_idx = active.step_index % pattern.length;
// Per-step timing: each step gets its exact beat position
let step_substep = base_substep + step_offset as i64 + 1;
let step_beat = step_substep as f64 / substeps_per_beat;
let beat_delta = step_beat - beat;
let time_delta = if tempo > 0.0 {
(beat_delta / tempo) * 60.0
} else {
0.0
};
let event_time = Some(engine_time + lookahead_secs + time_delta);
if let Some(step) = pattern.steps.get(step_idx) {
let resolved_script = pattern.resolve_script(step_idx);
let has_script = resolved_script
@@ -887,12 +896,6 @@ impl SequencerState {
std::mem::take(&mut trace),
);
let event_time = if lookahead_secs > 0.0 {
Some(engine_time + lookahead_secs)
} else {
None
};
for cmd in cmds {
self.event_count += 1;
self.buf_audio_commands.push(TimestampedCommand {
@@ -932,7 +935,9 @@ impl SequencerState {
}
let vars = self.variables.load();
let new_tempo = vars.get("__tempo__").and_then(|v: &Value| v.as_float().ok());
let new_tempo = vars
.get("__tempo__")
.and_then(|v: &Value| v.as_float().ok());
let mut chain_transitions = Vec::new();
for id in completed {
@@ -1030,7 +1035,8 @@ impl SequencerState {
#[allow(clippy::too_many_arguments)]
fn sequencer_loop(
cmd_rx: Receiver<SeqCommand>,
dispatch_tx: Sender<TimedCommand>,
dispatch_tx: Sender<TimedMidiCommand>,
audio_tx: Arc<ArcSwap<Sender<AudioCommand>>>,
link: Arc<LinkState>,
playing: Arc<std::sync::atomic::AtomicBool>,
variables: Variables,
@@ -1109,16 +1115,14 @@ fn sequencer_loop(
let output = seq_state.tick(input);
// Dispatch commands via the dispatcher thread
// Route commands: audio direct to doux, MIDI through dispatcher
for tsc in output.audio_commands {
if let Some((midi_cmd, dur)) = parse_midi_command(&tsc.cmd) {
// Queue MIDI command for immediate dispatch
let _ = dispatch_tx.send(TimedCommand {
command: DispatchCommand::Midi(midi_cmd.clone()),
let _ = dispatch_tx.send(TimedMidiCommand {
command: MidiDispatch::Send(midi_cmd.clone()),
target_time_us: current_time_us,
});
// Schedule note-off if duration specified
if let (
MidiCommand::NoteOn {
device,
@@ -1130,8 +1134,8 @@ fn sequencer_loop(
) = (&midi_cmd, dur)
{
let off_time_us = current_time_us + (dur_secs * 1_000_000.0) as SyncTime;
let _ = dispatch_tx.send(TimedCommand {
command: DispatchCommand::Midi(MidiCommand::NoteOff {
let _ = dispatch_tx.send(TimedMidiCommand {
command: MidiDispatch::Send(MidiCommand::NoteOff {
device: *device,
channel: *channel,
note: *note,
@@ -1140,21 +1144,17 @@ fn sequencer_loop(
});
}
} else {
// Queue audio command
let _ = dispatch_tx.send(TimedCommand {
command: DispatchCommand::Audio {
cmd: tsc.cmd,
time: tsc.time,
},
target_time_us: current_time_us,
// Audio direct to doux — sample-accurate scheduling via /time/ parameter
let _ = audio_tx.load().try_send(AudioCommand::Evaluate {
cmd: tsc.cmd,
time: tsc.time,
});
}
}
// Handle MIDI flush request
if output.flush_midi_notes {
let _ = dispatch_tx.send(TimedCommand {
command: DispatchCommand::FlushMidi,
let _ = dispatch_tx.send(TimedMidiCommand {
command: MidiDispatch::FlushAll,
target_time_us: current_time_us,
});
}
@@ -1201,7 +1201,6 @@ fn sequencer_loop(
/// spinning is counterproductive and we sleep the entire duration.
const SPIN_THRESHOLD_US: SyncTime = 100;
/// Two-phase wait: sleep most of the time, optionally spin-wait for final precision.
/// With RT priority: sleep + spin for precision
/// Without RT priority: sleep only (spinning wastes CPU without benefit)