//! Audio output stream (cpal) and FFT spectrum analysis. use ringbuf::{traits::*, HeapRb}; use rustfft::{num_complex::Complex, FftPlanner}; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; use std::thread::{self, JoinHandle}; #[cfg(feature = "cli")] use std::sync::atomic::AtomicU64; pub struct ScopeBuffer { pub samples: [AtomicU32; 256], pub samples_right: [AtomicU32; 256], peak_left: AtomicU32, peak_right: AtomicU32, } impl Default for ScopeBuffer { fn default() -> Self { Self::new() } } impl ScopeBuffer { pub fn new() -> Self { Self { samples: std::array::from_fn(|_| AtomicU32::new(0)), samples_right: std::array::from_fn(|_| AtomicU32::new(0)), peak_left: AtomicU32::new(0), peak_right: AtomicU32::new(0), } } pub fn write(&self, data: &[f32]) { let mut peak_l: f32 = 0.0; let mut peak_r: f32 = 0.0; // Calculate peaks from ALL input frames for accurate VU metering for chunk in data.chunks(2) { if let [left, right] = chunk { peak_l = peak_l.max(left.abs()); peak_r = peak_r.max(right.abs()); } } // Downsample for scope display let frames = data.len() / 2; for (i, (left_atom, right_atom)) in self.samples.iter().zip(self.samples_right.iter()).enumerate() { let frame_idx = (i * frames) / self.samples.len(); let left = data.get(frame_idx * 2).copied().unwrap_or(0.0); let right = data.get(frame_idx * 2 + 1).copied().unwrap_or(0.0); left_atom.store(left.to_bits(), Ordering::Relaxed); right_atom.store(right.to_bits(), Ordering::Relaxed); } self.peak_left.store(peak_l.to_bits(), Ordering::Relaxed); self.peak_right.store(peak_r.to_bits(), Ordering::Relaxed); } pub fn read(&self) -> [f32; 256] { std::array::from_fn(|i| f32::from_bits(self.samples[i].load(Ordering::Relaxed))) } pub fn read_right(&self) -> [f32; 256] { std::array::from_fn(|i| f32::from_bits(self.samples_right[i].load(Ordering::Relaxed))) } pub fn peaks(&self) -> (f32, f32) { let left = f32::from_bits(self.peak_left.load(Ordering::Relaxed)); let right = f32::from_bits(self.peak_right.load(Ordering::Relaxed)); (left, right) } } pub struct SpectrumBuffer { pub bands: [AtomicU32; 32], } impl Default for SpectrumBuffer { fn default() -> Self { Self::new() } } impl SpectrumBuffer { pub fn new() -> Self { Self { bands: std::array::from_fn(|_| AtomicU32::new(0)), } } pub fn write(&self, data: &[f32; 32]) { for (atom, &val) in self.bands.iter().zip(data.iter()) { atom.store(val.to_bits(), Ordering::Relaxed); } } pub fn read(&self) -> [f32; 32] { std::array::from_fn(|i| f32::from_bits(self.bands[i].load(Ordering::Relaxed))) } } const FFT_SIZE: usize = 512; const NUM_BANDS: usize = 32; const ANALYSIS_RING_SIZE: usize = 4096; struct SpectrumAnalyzer { ring: Vec, pos: usize, fft: Arc>, window: [f32; FFT_SIZE], scratch: Vec>, fft_buf: Vec>, band_edges: [usize; NUM_BANDS + 1], } impl SpectrumAnalyzer { fn new(sample_rate: f32) -> Self { let mut planner = FftPlanner::new(); let fft = planner.plan_fft_forward(FFT_SIZE); let scratch_len = fft.get_inplace_scratch_len(); let window: [f32; FFT_SIZE] = std::array::from_fn(|i| { 0.5 * (1.0 - (2.0 * std::f32::consts::PI * i as f32 / (FFT_SIZE - 1) as f32).cos()) }); let min_freq: f32 = 20.0; let max_freq: f32 = 16000.0; let log_min = min_freq.ln(); let log_max = max_freq.ln(); let band_edges: [usize; NUM_BANDS + 1] = std::array::from_fn(|i| { let freq = (log_min + (log_max - log_min) * i as f32 / NUM_BANDS as f32).exp(); let bin = (freq * FFT_SIZE as f32 / sample_rate).round() as usize; bin.min(FFT_SIZE / 2) }); Self { ring: vec![0.0; FFT_SIZE], pos: 0, fft, window, scratch: vec![Complex::default(); scratch_len], fft_buf: vec![Complex::default(); FFT_SIZE], band_edges, } } fn feed(&mut self, samples: &[f32], output: &SpectrumBuffer) { for &s in samples { self.ring[self.pos] = s; self.pos += 1; if self.pos >= FFT_SIZE { self.pos = 0; self.run_fft(output); } } } fn run_fft(&mut self, output: &SpectrumBuffer) { for i in 0..FFT_SIZE { let idx = (self.pos + i) % FFT_SIZE; self.fft_buf[i] = Complex::new(self.ring[idx] * self.window[i], 0.0); } self.fft .process_with_scratch(&mut self.fft_buf, &mut self.scratch); let mut bands = [0.0f32; NUM_BANDS]; for (band, mag) in bands.iter_mut().enumerate() { let lo = self.band_edges[band]; let hi = self.band_edges[band + 1].max(lo + 1); let sum: f32 = self.fft_buf[lo..hi].iter().map(|c| c.norm()).sum(); let avg = sum / (hi - lo) as f32; let amplitude = avg / (FFT_SIZE as f32 / 4.0); let db = 20.0 * amplitude.max(1e-10).log10(); *mag = ((db + 80.0) / 80.0).clamp(0.0, 1.0); } output.write(&bands); } } pub struct AnalysisHandle { running: Arc, // Held to keep the thread alive until this handle is dropped. _thread: Option>, } impl Drop for AnalysisHandle { fn drop(&mut self) { self.running.store(false, Ordering::SeqCst); } } pub fn spawn_analysis_thread( sample_rate: f32, spectrum_buffer: Arc, ) -> (ringbuf::HeapProd, AnalysisHandle) { let rb = HeapRb::::new(ANALYSIS_RING_SIZE); let (producer, consumer) = rb.split(); let running = Arc::new(AtomicBool::new(true)); let running_clone = Arc::clone(&running); let thread = thread::Builder::new() .name("fft-analysis".into()) .spawn(move || { analysis_loop(consumer, spectrum_buffer, sample_rate, running_clone); }) .expect("Failed to spawn FFT analysis thread"); let handle = AnalysisHandle { running, _thread: Some(thread), }; (producer, handle) } fn analysis_loop( mut consumer: ringbuf::HeapCons, spectrum_buffer: Arc, sample_rate: f32, running: Arc, ) { let mut analyzer = SpectrumAnalyzer::new(sample_rate); let mut local_buf = [0.0f32; 256]; while running.load(Ordering::Relaxed) { let count = consumer.pop_slice(&mut local_buf); if count > 0 { analyzer.feed(&local_buf[..count], &spectrum_buffer); } else { thread::sleep(std::time::Duration::from_micros(500)); } } } pub fn preload_sample_heads( entries: Vec<(String, std::path::PathBuf)>, target_sr: f32, registry: &doux::SampleRegistry, ) { let mut batch = Vec::with_capacity(entries.len()); for (name, path) in &entries { match doux::sampling::decode_sample_head(path, target_sr) { Ok(data) => batch.push((name.clone(), Arc::new(data))), Err(e) => eprintln!("preload {name}: {e}"), } } if !batch.is_empty() { registry.insert_batch(batch); } } #[cfg(feature = "cli")] use cpal::traits::{DeviceTrait, StreamTrait}; #[cfg(feature = "cli")] use cpal::Stream; #[cfg(feature = "cli")] use crossbeam_channel::{Receiver, Sender}; #[cfg(feature = "cli")] use doux::{Engine, EngineMetrics}; #[cfg(feature = "cli")] use super::AudioCommand; #[cfg(feature = "cli")] pub struct AudioStreamConfig { pub output_device: Option, pub input_device: Option, pub channels: u16, pub buffer_size: u32, pub max_voices: usize, } #[cfg(feature = "cli")] pub struct AudioStreamInfo { pub sample_rate: f32, pub host_name: String, pub channels: u16, } #[cfg(feature = "cli")] type BuildStreamResult = ( Stream, Option, AudioStreamInfo, AnalysisHandle, Arc, ); #[cfg(feature = "cli")] #[allow(clippy::too_many_arguments)] pub fn build_stream( config: &AudioStreamConfig, audio_rx: Receiver, scope_buffer: Arc, spectrum_buffer: Arc, metrics: Arc, initial_samples: Vec, audio_sample_pos: Arc, error_tx: Sender, sample_paths: &[std::path::PathBuf], device_lost: Arc, ) -> Result { let device = match &config.output_device { Some(name) => doux::audio::find_output_device(name) .ok_or_else(|| format!("Device not found: {name}"))?, None => doux::audio::default_output_device().ok_or("No default output device")?, }; let default_config = device.default_output_config().map_err(|e| e.to_string())?; let sample_rate = default_config.sample_rate() as f32; let max_channels = doux::audio::max_output_channels(&device); let channels = config.channels.min(max_channels).max(2); let host_name = doux::audio::preferred_host().id().name().to_string(); let is_jack = host_name.to_lowercase().contains("jack"); let buffer_size = if config.buffer_size > 0 && !is_jack { cpal::BufferSize::Fixed(config.buffer_size) } else { cpal::BufferSize::Default }; let stream_config = cpal::StreamConfig { channels, sample_rate: default_config.sample_rate(), buffer_size, }; let sr = sample_rate; let effective_channels = channels; let channels = channels as usize; let max_voices = config.max_voices; let block_size = if config.buffer_size > 0 { config.buffer_size as usize } else { 512 }; let mut engine = Engine::new_with_metrics(sample_rate, channels, max_voices, Arc::clone(&metrics), block_size); engine.sample_index = initial_samples; for path in sample_paths { if let Some(sf2_path) = doux::soundfont::find_sf2_file(path) { if let Err(e) = engine.load_soundfont(&sf2_path) { eprintln!("Failed to load soundfont: {e}"); } else { break; } } } let registry = Arc::clone(&engine.sample_registry); let input_device = config .input_device .as_ref() .and_then(|name| { let dev = doux::audio::find_input_device(name); if dev.is_none() { eprintln!("input device not found: {name}"); } dev }); let input_channels: usize = input_device .as_ref() .and_then(|dev| dev.default_input_config().ok()) .map_or(0, |cfg| cfg.channels() as usize); engine.input_channels = input_channels; const INPUT_BUFFER_BASE: usize = 8192; let input_buffer_size = INPUT_BUFFER_BASE * (input_channels.max(2) / 2); let (input_producer, input_consumer) = HeapRb::::new(input_buffer_size).split(); let input_stream = input_device.and_then(|dev| { let input_cfg = match dev.default_input_config() { Ok(cfg) => cfg, Err(e) => { eprintln!("input config error: {e}"); return None; } }; if input_cfg.sample_rate() != default_config.sample_rate() { eprintln!( "warning: input sample rate ({}Hz) differs from output ({}Hz)", input_cfg.sample_rate(), default_config.sample_rate() ); } eprintln!( "opening input: {}ch @ {}Hz", input_cfg.channels(), input_cfg.sample_rate() ); let mut input_producer = input_producer; let stream = dev .build_input_stream( &input_cfg.into(), move |data: &[f32], _| { input_producer.push_slice(data); }, { let device_lost = Arc::clone(&device_lost); move |err: cpal::StreamError| { eprintln!("input stream error: {err}"); match err { cpal::StreamError::DeviceNotAvailable | cpal::StreamError::StreamInvalidated => { device_lost.store(true, Ordering::Release); } _ => {} } } }, None, ) .ok()?; stream.play().ok()?; Some(stream) }); let (mut fft_producer, analysis_handle) = spawn_analysis_thread(sample_rate, spectrum_buffer); let mut cmd_buffer = String::with_capacity(256); let mut rt_set = false; let mut live_scratch = vec![0.0f32; 4096]; let mut input_consumer = input_consumer; let stream = device .build_output_stream( &stream_config, move |data: &mut [f32], _| { if !rt_set { let ok = super::realtime::set_realtime_priority(); rt_set = true; if !ok { super::realtime::warn_no_rt("audio"); } } let buffer_samples = data.len() / channels; let buffer_time_ns = (buffer_samples as f64 / sr as f64 * 1e9) as u64; audio_sample_pos.fetch_add(buffer_samples as u64, Ordering::Release); while let Ok(cmd) = audio_rx.try_recv() { match cmd { AudioCommand::Evaluate { cmd, tick } => { let cmd_ref = match tick { Some(t) => { cmd_buffer.clear(); use std::fmt::Write; let _ = write!(&mut cmd_buffer, "{cmd}/tick/{t}"); cmd_buffer.as_str() } None => &cmd, }; engine.evaluate(cmd_ref); } AudioCommand::Hush => { engine.hush(); } AudioCommand::Panic => { engine.panic(); } AudioCommand::LoadSamples(samples) => { engine.sample_index.extend(samples); } AudioCommand::LoadSoundfont(path) => { if let Err(e) = engine.load_soundfont(&path) { eprintln!("Failed to load soundfont: {e}"); } } } } let nch_in = input_channels.max(1); let raw_len = buffer_samples * nch_in; if live_scratch.len() < raw_len { live_scratch.resize(raw_len, 0.0); } live_scratch[..raw_len].fill(0.0); input_consumer.pop_slice(&mut live_scratch[..raw_len]); engine.metrics.load.set_buffer_time(buffer_time_ns); engine.process_block(data, &[], &live_scratch[..raw_len]); scope_buffer.write(data); // Feed mono mix to analysis thread via ring buffer (non-blocking) for chunk in data.chunks(channels) { let mono = chunk.iter().sum::() / channels as f32; let _ = fft_producer.try_push(mono); } }, move |err: cpal::StreamError| { let _ = error_tx.try_send(format!("stream error: {err}")); match err { cpal::StreamError::DeviceNotAvailable | cpal::StreamError::StreamInvalidated => { device_lost.store(true, Ordering::Release); } _ => {} } }, None, ) .map_err(|e| format!("Failed to build stream: {e}"))?; stream .play() .map_err(|e| format!("Failed to play stream: {e}"))?; let info = AudioStreamInfo { sample_rate, host_name, channels: effective_channels, }; Ok((stream, input_stream, info, analysis_handle, registry)) }