use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::Stream; use crossbeam_channel::Receiver; use doux::{Engine, EngineMetrics}; use ringbuf::{traits::*, HeapRb}; use rustfft::{num_complex::Complex, FftPlanner}; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; use std::thread::{self, JoinHandle}; use super::AudioCommand; pub struct ScopeBuffer { pub samples: [AtomicU32; 64], peak_left: AtomicU32, peak_right: AtomicU32, } impl ScopeBuffer { pub fn new() -> Self { Self { samples: std::array::from_fn(|_| AtomicU32::new(0)), peak_left: AtomicU32::new(0), peak_right: AtomicU32::new(0), } } pub fn write(&self, data: &[f32]) { let mut peak_l: f32 = 0.0; let mut peak_r: f32 = 0.0; for (i, atom) in self.samples.iter().enumerate() { let idx = i * 2; let left = data.get(idx).copied().unwrap_or(0.0); let right = data.get(idx + 1).copied().unwrap_or(0.0); peak_l = peak_l.max(left.abs()); peak_r = peak_r.max(right.abs()); atom.store(left.to_bits(), Ordering::Relaxed); } self.peak_left.store(peak_l.to_bits(), Ordering::Relaxed); self.peak_right.store(peak_r.to_bits(), Ordering::Relaxed); } pub fn read(&self) -> [f32; 64] { std::array::from_fn(|i| f32::from_bits(self.samples[i].load(Ordering::Relaxed))) } pub fn peaks(&self) -> (f32, f32) { let left = f32::from_bits(self.peak_left.load(Ordering::Relaxed)); let right = f32::from_bits(self.peak_right.load(Ordering::Relaxed)); (left, right) } } pub struct SpectrumBuffer { pub bands: [AtomicU32; 32], } impl SpectrumBuffer { pub fn new() -> Self { Self { bands: std::array::from_fn(|_| AtomicU32::new(0)), } } pub fn write(&self, data: &[f32; 32]) { for (atom, &val) in self.bands.iter().zip(data.iter()) { atom.store(val.to_bits(), Ordering::Relaxed); } } pub fn read(&self) -> [f32; 32] { std::array::from_fn(|i| f32::from_bits(self.bands[i].load(Ordering::Relaxed))) } } const FFT_SIZE: usize = 512; const NUM_BANDS: usize = 32; const ANALYSIS_RING_SIZE: usize = 4096; struct SpectrumAnalyzer { ring: Vec, pos: usize, fft: Arc>, window: [f32; FFT_SIZE], scratch: Vec>, band_edges: [usize; NUM_BANDS + 1], } impl SpectrumAnalyzer { fn new(sample_rate: f32) -> Self { let mut planner = FftPlanner::new(); let fft = planner.plan_fft_forward(FFT_SIZE); let scratch_len = fft.get_inplace_scratch_len(); let window: [f32; FFT_SIZE] = std::array::from_fn(|i| { 0.5 * (1.0 - (2.0 * std::f32::consts::PI * i as f32 / (FFT_SIZE - 1) as f32).cos()) }); let nyquist = sample_rate / 2.0; let min_freq: f32 = 20.0; let log_min = min_freq.ln(); let log_max = nyquist.ln(); let band_edges: [usize; NUM_BANDS + 1] = std::array::from_fn(|i| { let freq = (log_min + (log_max - log_min) * i as f32 / NUM_BANDS as f32).exp(); let bin = (freq * FFT_SIZE as f32 / sample_rate).round() as usize; bin.min(FFT_SIZE / 2) }); Self { ring: vec![0.0; FFT_SIZE], pos: 0, fft, window, scratch: vec![Complex::default(); scratch_len], band_edges, } } fn feed(&mut self, samples: &[f32], output: &SpectrumBuffer) { for &s in samples { self.ring[self.pos] = s; self.pos += 1; if self.pos >= FFT_SIZE { self.pos = 0; self.run_fft(output); } } } fn run_fft(&mut self, output: &SpectrumBuffer) { let mut buf: Vec> = (0..FFT_SIZE) .map(|i| { let idx = (self.pos + i) % FFT_SIZE; Complex::new(self.ring[idx] * self.window[i], 0.0) }) .collect(); self.fft.process_with_scratch(&mut buf, &mut self.scratch); let mut bands = [0.0f32; NUM_BANDS]; for (band, mag) in bands.iter_mut().enumerate() { let lo = self.band_edges[band]; let hi = self.band_edges[band + 1].max(lo + 1); let sum: f32 = buf[lo..hi].iter().map(|c| c.norm()).sum(); let avg = sum / (hi - lo) as f32; let amplitude = avg / (FFT_SIZE as f32 / 2.0); let db = 20.0 * amplitude.max(1e-10).log10(); *mag = ((db + 60.0) / 60.0).clamp(0.0, 1.0); } output.write(&bands); } } pub struct AnalysisHandle { running: Arc, #[allow(dead_code)] thread: Option>, } impl AnalysisHandle { #[allow(dead_code)] pub fn shutdown(mut self) { self.running.store(false, Ordering::SeqCst); if let Some(t) = self.thread.take() { let _ = t.join(); } } } impl Drop for AnalysisHandle { fn drop(&mut self) { self.running.store(false, Ordering::SeqCst); } } pub fn spawn_analysis_thread( sample_rate: f32, spectrum_buffer: Arc, ) -> (ringbuf::HeapProd, AnalysisHandle) { let rb = HeapRb::::new(ANALYSIS_RING_SIZE); let (producer, consumer) = rb.split(); let running = Arc::new(AtomicBool::new(true)); let running_clone = Arc::clone(&running); let thread = thread::Builder::new() .name("fft-analysis".into()) .spawn(move || { analysis_loop(consumer, spectrum_buffer, sample_rate, running_clone); }) .expect("Failed to spawn FFT analysis thread"); let handle = AnalysisHandle { running, thread: Some(thread), }; (producer, handle) } fn analysis_loop( mut consumer: ringbuf::HeapCons, spectrum_buffer: Arc, sample_rate: f32, running: Arc, ) { let mut analyzer = SpectrumAnalyzer::new(sample_rate); let mut local_buf = [0.0f32; 256]; while running.load(Ordering::Relaxed) { let count = consumer.pop_slice(&mut local_buf); if count > 0 { analyzer.feed(&local_buf[..count], &spectrum_buffer); } else { thread::sleep(std::time::Duration::from_micros(500)); } } } pub struct AudioStreamConfig { pub output_device: Option, pub channels: u16, pub buffer_size: u32, pub max_voices: usize, } pub fn build_stream( config: &AudioStreamConfig, audio_rx: Receiver, scope_buffer: Arc, spectrum_buffer: Arc, metrics: Arc, initial_samples: Vec, ) -> Result<(Stream, f32, AnalysisHandle), String> { let host = cpal::default_host(); let device = match &config.output_device { Some(name) => doux::audio::find_output_device(name) .ok_or_else(|| format!("Device not found: {name}"))?, None => host .default_output_device() .ok_or("No default output device")?, }; let default_config = device.default_output_config().map_err(|e| e.to_string())?; let sample_rate = default_config.sample_rate().0 as f32; let buffer_size = if config.buffer_size > 0 { cpal::BufferSize::Fixed(config.buffer_size) } else { cpal::BufferSize::Default }; let stream_config = cpal::StreamConfig { channels: config.channels, sample_rate: default_config.sample_rate(), buffer_size, }; let sr = sample_rate; let channels = config.channels as usize; let max_voices = config.max_voices; let metrics_clone = Arc::clone(&metrics); let mut engine = Engine::new_with_metrics(sample_rate, channels, max_voices, Arc::clone(&metrics)); engine.sample_index = initial_samples; let (mut fft_producer, analysis_handle) = spawn_analysis_thread(sample_rate, spectrum_buffer); let stream = device .build_output_stream( &stream_config, move |data: &mut [f32], _| { let buffer_samples = data.len() / channels; let buffer_time_ns = (buffer_samples as f64 / sr as f64 * 1e9) as u64; while let Ok(cmd) = audio_rx.try_recv() { match cmd { AudioCommand::Evaluate(s) => { engine.evaluate(&s); } AudioCommand::Hush => { engine.hush(); } AudioCommand::Panic => { engine.panic(); } AudioCommand::LoadSamples(samples) => { engine.sample_index.extend(samples); } AudioCommand::ResetEngine => { let old_samples = std::mem::take(&mut engine.sample_index); engine = Engine::new_with_metrics(sr, channels, max_voices, Arc::clone(&metrics_clone)); engine.sample_index = old_samples; } } } engine.metrics.load.set_buffer_time(buffer_time_ns); engine.process_block(data, &[], &[]); scope_buffer.write(&engine.output); // Feed mono mix to analysis thread via ring buffer (non-blocking) for chunk in engine.output.chunks(channels) { let mono = chunk.iter().sum::() / channels as f32; let _ = fft_producer.try_push(mono); } }, |err| eprintln!("stream error: {err}"), None, ) .map_err(|e| format!("Failed to build stream: {e}"))?; stream .play() .map_err(|e| format!("Failed to play stream: {e}"))?; Ok((stream, sample_rate, analysis_handle)) }