WIP: refactor

This commit is contained in:
2026-01-25 22:17:08 +01:00
parent 2d609f6b7a
commit 016d050678
11 changed files with 289 additions and 82 deletions

View File

@@ -2,9 +2,11 @@ use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::Stream;
use crossbeam_channel::Receiver;
use doux::{Engine, EngineMetrics};
use ringbuf::{traits::*, HeapRb};
use rustfft::{num_complex::Complex, FftPlanner};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use super::AudioCommand;
@@ -75,6 +77,7 @@ impl SpectrumBuffer {
// FFT window length in samples.
const FFT_SIZE: usize = 512;
// Number of spectrum bands exposed to consumers of SpectrumBuffer.
const NUM_BANDS: usize = 32;
// Capacity of the audio-callback -> analysis-thread sample ring buffer
// (see spawn_analysis_thread). Sized so the worker can fall behind briefly
// without the callback blocking — presumably several callback blocks' worth;
// TODO(review): confirm against the configured output buffer size.
const ANALYSIS_RING_SIZE: usize = 4096;
struct SpectrumAnalyzer {
ring: Vec<f32>,
@@ -151,6 +154,72 @@ impl SpectrumAnalyzer {
}
}
/// Owner of the background FFT analysis thread.
pub struct AnalysisHandle {
    // Shared shutdown flag; the worker loop polls this and exits when false.
    running: Arc<AtomicBool>,
    // Join handle for the worker; `Option` so it can be taken exactly once
    // (by `shutdown`) while `Drop` still compiles with `&mut self`.
    #[allow(dead_code)]
    thread: Option<JoinHandle<()>>,
}
impl AnalysisHandle {
    /// Signal the worker loop to stop, then block until it has exited.
    #[allow(dead_code)]
    pub fn shutdown(mut self) {
        // Flip the shared flag first so the loop sees it on its next poll.
        self.running.store(false, Ordering::SeqCst);
        // Take the handle (leaving None behind for Drop) and wait for the
        // worker; a worker panic surfaces as Err, which teardown ignores.
        let worker = self.thread.take();
        if let Some(worker) = worker {
            let _ = worker.join();
        }
    }
}
impl Drop for AnalysisHandle {
    /// Stop the worker when the handle is dropped.
    ///
    /// The original only cleared the flag, leaving the thread detached: it
    /// could keep running (and writing into the shared spectrum buffer) for
    /// a short window after the handle was gone. Joining here makes drop a
    /// deterministic shutdown, mirroring `shutdown()`.
    fn drop(&mut self) {
        self.running.store(false, Ordering::SeqCst);
        // `take` returns None if `shutdown` already joined, so this is safe
        // to run after an explicit shutdown. The loop polls every 500µs, so
        // the join is quick; ignore a worker panic during teardown.
        if let Some(t) = self.thread.take() {
            let _ = t.join();
        }
    }
}
/// Spawn the background FFT worker.
///
/// Returns the producer half of the sample ring buffer (pushed from the
/// audio callback) and a handle that owns the worker thread and its
/// shutdown flag.
pub fn spawn_analysis_thread(
    sample_rate: f32,
    spectrum_buffer: Arc<SpectrumBuffer>,
) -> (ringbuf::HeapProd<f32>, AnalysisHandle) {
    // Lock-free SPSC ring: the real-time callback pushes, the worker pops.
    let (producer, consumer) = HeapRb::<f32>::new(ANALYSIS_RING_SIZE).split();

    let running = Arc::new(AtomicBool::new(true));
    let flag_for_worker = Arc::clone(&running);

    let worker = thread::Builder::new()
        .name("fft-analysis".into())
        .spawn(move || analysis_loop(consumer, spectrum_buffer, sample_rate, flag_for_worker))
        .expect("Failed to spawn FFT analysis thread");

    let handle = AnalysisHandle {
        running,
        thread: Some(worker),
    };
    (producer, handle)
}
/// Worker body: drain mono samples from the ring buffer into the spectrum
/// analyzer until `running` is cleared.
fn analysis_loop(
    mut consumer: ringbuf::HeapCons<f32>,
    spectrum_buffer: Arc<SpectrumBuffer>,
    sample_rate: f32,
    running: Arc<AtomicBool>,
) {
    let mut analyzer = SpectrumAnalyzer::new(sample_rate);
    // Fixed scratch buffer; pop_slice fills at most its length per call.
    let mut scratch = [0.0f32; 256];

    loop {
        if !running.load(Ordering::Relaxed) {
            break;
        }
        let popped = consumer.pop_slice(&mut scratch);
        if popped == 0 {
            // Ring is empty: back off briefly instead of spinning.
            thread::sleep(std::time::Duration::from_micros(500));
            continue;
        }
        analyzer.feed(&scratch[..popped], &spectrum_buffer);
    }
}
pub struct AudioStreamConfig {
pub output_device: Option<String>,
pub channels: u16,
@@ -165,7 +234,7 @@ pub fn build_stream(
spectrum_buffer: Arc<SpectrumBuffer>,
metrics: Arc<EngineMetrics>,
initial_samples: Vec<doux::sample::SampleEntry>,
) -> Result<(Stream, f32), String> {
) -> Result<(Stream, f32, AnalysisHandle), String> {
let host = cpal::default_host();
let device = match &config.output_device {
@@ -193,12 +262,13 @@ pub fn build_stream(
let sr = sample_rate;
let channels = config.channels as usize;
let max_voices = config.max_voices;
let metrics_clone = Arc::clone(&metrics);
let mut engine = Engine::new_with_metrics(sample_rate, channels, Arc::clone(&metrics));
let mut engine = Engine::new_with_metrics(sample_rate, channels, max_voices, Arc::clone(&metrics));
engine.sample_index = initial_samples;
let mut analyzer = SpectrumAnalyzer::new(sample_rate);
let (mut fft_producer, analysis_handle) = spawn_analysis_thread(sample_rate, spectrum_buffer);
let stream = device
.build_output_stream(
@@ -224,7 +294,7 @@ pub fn build_stream(
AudioCommand::ResetEngine => {
let old_samples = std::mem::take(&mut engine.sample_index);
engine =
Engine::new_with_metrics(sr, channels, Arc::clone(&metrics_clone));
Engine::new_with_metrics(sr, channels, max_voices, Arc::clone(&metrics_clone));
engine.sample_index = old_samples;
}
}
@@ -234,11 +304,11 @@ pub fn build_stream(
engine.process_block(data, &[], &[]);
scope_buffer.write(&engine.output);
// Feed mono mix to spectrum analyzer
let mono: Vec<f32> = engine.output.chunks(channels)
.map(|ch| ch.iter().sum::<f32>() / channels as f32)
.collect();
analyzer.feed(&mono, &spectrum_buffer);
// Feed mono mix to analysis thread via ring buffer (non-blocking)
for chunk in engine.output.chunks(channels) {
let mono = chunk.iter().sum::<f32>() / channels as f32;
let _ = fft_producer.try_push(mono);
}
},
|err| eprintln!("stream error: {err}"),
None,
@@ -248,5 +318,5 @@ pub fn build_stream(
stream
.play()
.map_err(|e| format!("Failed to play stream: {e}"))?;
Ok((stream, sample_rate))
Ok((stream, sample_rate, analysis_handle))
}

View File

@@ -1,8 +1,10 @@
use arc_swap::ArcSwap;
use crossbeam_channel::{bounded, Receiver, Sender, TrySendError};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use thread_priority::{set_current_thread_priority, ThreadPriority};
use super::LinkState;
use crate::model::{MAX_BANKS, MAX_PATTERNS};
@@ -89,12 +91,14 @@ pub struct SharedSequencerState {
pub active_patterns: Vec<ActivePatternState>,
pub step_traces: HashMap<(usize, usize, usize), ExecutionTrace>,
pub event_count: usize,
pub dropped_events: usize,
}
/// Point-in-time copy of the sequencer's shared state, handed out by
/// `SequencerHandle::snapshot` for reader threads (e.g. the UI).
pub struct SequencerSnapshot {
    // Patterns currently active, with their per-pattern playback progress.
    pub active_patterns: Vec<ActivePatternState>,
    // Latest execution trace keyed by (bank, pattern, step) — see the
    // retain-by-bank/pattern in the sequencer loop.
    pub step_traces: HashMap<(usize, usize, usize), ExecutionTrace>,
    // Count of events successfully pushed to the audio channel.
    pub event_count: usize,
    // Count of events discarded because the audio channel was full.
    pub dropped_events: usize,
}
impl SequencerSnapshot {
@@ -127,17 +131,18 @@ pub struct SequencerHandle {
pub cmd_tx: Sender<SeqCommand>,
pub audio_tx: Sender<AudioCommand>,
pub audio_rx: Receiver<AudioCommand>,
shared_state: Arc<Mutex<SharedSequencerState>>,
shared_state: Arc<ArcSwap<SharedSequencerState>>,
thread: JoinHandle<()>,
}
impl SequencerHandle {
pub fn snapshot(&self) -> SequencerSnapshot {
let state = self.shared_state.lock().unwrap();
let state = self.shared_state.load();
SequencerSnapshot {
active_patterns: state.active_patterns.clone(),
step_traces: state.step_traces.clone(),
event_count: state.event_count,
dropped_events: state.dropped_events,
}
}
@@ -185,7 +190,7 @@ pub fn spawn_sequencer(
let (cmd_tx, cmd_rx) = bounded::<SeqCommand>(64);
let (audio_tx, audio_rx) = bounded::<AudioCommand>(256);
let shared_state = Arc::new(Mutex::new(SharedSequencerState::default()));
let shared_state = Arc::new(ArcSwap::from_pointee(SharedSequencerState::default()));
let shared_state_clone = Arc::clone(&shared_state);
let audio_tx_clone = audio_tx.clone();
@@ -296,17 +301,20 @@ fn sequencer_loop(
dict: Dictionary,
rng: Rng,
quantum: f64,
shared_state: Arc<Mutex<SharedSequencerState>>,
shared_state: Arc<ArcSwap<SharedSequencerState>>,
live_keys: Arc<LiveKeyState>,
) {
use std::sync::atomic::Ordering;
let _ = set_current_thread_priority(ThreadPriority::Max);
let script_engine = ScriptEngine::new(Arc::clone(&variables), dict, rng);
let mut audio_state = AudioState::new();
let mut pattern_cache = PatternCache::new();
let mut runs_counter = RunsCounter::new();
let mut step_traces: HashMap<(usize, usize, usize), ExecutionTrace> = HashMap::new();
let mut event_count: usize = 0;
let mut dropped_events: usize = 0;
loop {
while let Ok(cmd) = cmd_rx.try_recv() {
@@ -339,7 +347,7 @@ fn sequencer_loop(
}
if !playing.load(Ordering::Relaxed) {
thread::sleep(Duration::from_micros(500));
thread::sleep(Duration::from_micros(200));
continue;
}
@@ -350,6 +358,7 @@ fn sequencer_loop(
let bar = (beat / quantum).floor() as i64;
let prev_bar = (audio_state.prev_beat / quantum).floor() as i64;
let mut stopped_chain_keys: Vec<String> = Vec::new();
if bar != prev_bar && audio_state.prev_beat >= 0.0 {
for id in audio_state.pending_starts.drain(..) {
audio_state.active_patterns.insert(
@@ -367,13 +376,14 @@ fn sequencer_loop(
step_traces.retain(|&(bank, pattern, _), _| {
bank != id.bank || pattern != id.pattern
});
let chain_key = format!("__chain_{}_{}__", id.bank, id.pattern);
variables.lock().unwrap().remove(&chain_key);
stopped_chain_keys.push(format!("__chain_{}_{}__", id.bank, id.pattern));
}
}
let prev_beat = audio_state.prev_beat;
let mut chain_transitions: Vec<(PatternId, PatternId)> = Vec::new();
let mut chain_keys_to_remove: Vec<String> = Vec::new();
let mut new_tempo: Option<f64> = None;
for (_id, active) in audio_state.active_patterns.iter_mut() {
let Some(pattern) = pattern_cache.get(active.bank, active.pattern) else {
@@ -424,18 +434,16 @@ fn sequencer_loop(
Ok(()) => {
event_count += 1;
}
Err(TrySendError::Full(_)) => {}
Err(TrySendError::Full(_)) => {
dropped_events += 1;
}
Err(TrySendError::Disconnected(_)) => {
return;
}
}
}
if let Some(new_tempo) = {
let mut vars = variables.lock().unwrap();
vars.remove("__tempo__").and_then(|v| v.as_float().ok())
} {
link.set_tempo(new_tempo);
}
// Defer tempo check to batched variable read
new_tempo = None; // Will be read in batch below
}
}
}
@@ -445,32 +453,61 @@ fn sequencer_loop(
if next_step >= pattern.length {
active.iter += 1;
let chain_key = format!("__chain_{}_{}__", active.bank, active.pattern);
let chain_target = {
let vars = variables.lock().unwrap();
vars.get(&chain_key).and_then(|v| {
if let Value::Str(s, _) = v {
let parts: Vec<&str> = s.split(':').collect();
if parts.len() == 2 {
let b = parts[0].parse::<usize>().ok()?;
let p = parts[1].parse::<usize>().ok()?;
Some(PatternId { bank: b, pattern: p })
} else {
None
}
} else {
None
}
})
};
if let Some(target) = chain_target {
let source = PatternId { bank: active.bank, pattern: active.pattern };
chain_transitions.push((source, target));
}
chain_keys_to_remove.push(chain_key);
}
active.step_index = next_step % pattern.length;
}
}
// Batched variable operations: read chain targets, check tempo, remove keys
let needs_var_access = !chain_keys_to_remove.is_empty() || !stopped_chain_keys.is_empty();
if needs_var_access {
let mut vars = variables.lock().unwrap();
// Check for tempo change
if let Some(t) = vars.remove("__tempo__").and_then(|v| v.as_float().ok()) {
new_tempo = Some(t);
}
// Read chain targets and queue transitions
for key in &chain_keys_to_remove {
if let Some(Value::Str(s, _)) = vars.get(key) {
let parts: Vec<&str> = s.split(':').collect();
if parts.len() == 2 {
if let (Ok(b), Ok(p)) = (parts[0].parse::<usize>(), parts[1].parse::<usize>()) {
let target = PatternId { bank: b, pattern: p };
// Extract bank/pattern from key: "__chain_{bank}_{pattern}__"
if let Some(rest) = key.strip_prefix("__chain_") {
if let Some(rest) = rest.strip_suffix("__") {
let kparts: Vec<&str> = rest.split('_').collect();
if kparts.len() == 2 {
if let (Ok(sb), Ok(sp)) = (kparts[0].parse::<usize>(), kparts[1].parse::<usize>()) {
let source = PatternId { bank: sb, pattern: sp };
chain_transitions.push((source, target));
}
}
}
}
}
}
}
}
// Remove all chain keys (both from stopped patterns and completed iterations)
for key in chain_keys_to_remove {
vars.remove(&key);
}
for key in stopped_chain_keys {
vars.remove(&key);
}
}
// Apply tempo change
if let Some(t) = new_tempo {
link.set_tempo(t);
}
// Apply chain transitions
for (source, target) in chain_transitions {
if !audio_state.pending_stops.contains(&source) {
audio_state.pending_stops.push(source);
@@ -478,13 +515,10 @@ fn sequencer_loop(
if !audio_state.pending_starts.contains(&target) {
audio_state.pending_starts.push(target);
}
let chain_key = format!("__chain_{}_{}__", source.bank, source.pattern);
variables.lock().unwrap().remove(&chain_key);
}
{
let mut state = shared_state.lock().unwrap();
state.active_patterns = audio_state
let new_state = SharedSequencerState {
active_patterns: audio_state
.active_patterns
.values()
.map(|a| ActivePatternState {
@@ -493,13 +527,15 @@ fn sequencer_loop(
step_index: a.step_index,
iter: a.iter,
})
.collect();
state.step_traces = step_traces.clone();
state.event_count = event_count;
}
.collect(),
step_traces: step_traces.clone(),
event_count,
dropped_events,
};
shared_state.store(Arc::new(new_state));
audio_state.prev_beat = beat;
thread::sleep(Duration::from_micros(500));
thread::sleep(Duration::from_micros(200));
}
}