Files
Cagire/src/engine/audio.rs
2026-02-23 22:06:09 +01:00

413 lines
13 KiB
Rust

//! Audio output stream (cpal) and FFT spectrum analysis.
use ringbuf::{traits::*, HeapRb};
use rustfft::{num_complex::Complex, FftPlanner};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
#[cfg(feature = "cli")]
use std::sync::atomic::AtomicU64;
/// Lock-free oscilloscope/meter snapshot shared between the audio callback and readers.
///
/// Values are stored as `f32` bit patterns inside `AtomicU32`s so the writer never blocks.
/// Every slot is stored independently with `Relaxed` ordering, so a reader may observe a
/// mix of two consecutive writes.
pub struct ScopeBuffer {
    /// Downsampled left-channel samples for the scope display.
    pub samples: [AtomicU32; 256],
    /// Downsampled right-channel samples for the scope display.
    pub samples_right: [AtomicU32; 256],
    /// Absolute left-channel peak over the most recent `write`.
    peak_left: AtomicU32,
    /// Absolute right-channel peak over the most recent `write`.
    peak_right: AtomicU32,
}

impl Default for ScopeBuffer {
    fn default() -> Self {
        Self::new()
    }
}

impl ScopeBuffer {
    /// Creates a buffer with every sample and both peaks set to `0.0`.
    pub fn new() -> Self {
        let zero = || AtomicU32::new(0);
        Self {
            samples: std::array::from_fn(|_| zero()),
            samples_right: std::array::from_fn(|_| zero()),
            peak_left: zero(),
            peak_right: zero(),
        }
    }

    /// Publishes one block of interleaved stereo audio (`[L, R, L, R, ...]`).
    ///
    /// Peaks are taken over every complete frame; a trailing unpaired sample is ignored.
    /// The 256 display slots are filled by picking one frame per slot (no filtering), and
    /// any slot whose source sample is missing (e.g. empty input) is set to `0.0`.
    pub fn write(&self, data: &[f32]) {
        // Meter peaks scan every complete frame, not only the downsampled ones, so short
        // transients that fall between display slots still register.
        let (peak_l, peak_r) = data
            .chunks_exact(2)
            .fold((0.0f32, 0.0f32), |(acc_l, acc_r), frame| {
                (acc_l.max(frame[0].abs()), acc_r.max(frame[1].abs()))
            });

        // Downsample to the fixed display width.
        let frame_count = data.len() / 2;
        let slot_count = self.samples.len();
        let sample_or_zero = |idx: usize| data.get(idx).copied().unwrap_or(0.0);
        for (slot, (left_atom, right_atom)) in self
            .samples
            .iter()
            .zip(self.samples_right.iter())
            .enumerate()
        {
            let base = (slot * frame_count) / slot_count * 2;
            left_atom.store(sample_or_zero(base).to_bits(), Ordering::Relaxed);
            right_atom.store(sample_or_zero(base + 1).to_bits(), Ordering::Relaxed);
        }

        self.peak_left.store(peak_l.to_bits(), Ordering::Relaxed);
        self.peak_right.store(peak_r.to_bits(), Ordering::Relaxed);
    }

    /// Returns the 256 downsampled left-channel samples from the latest write.
    pub fn read(&self) -> [f32; 256] {
        Self::load_all(&self.samples)
    }

    /// Returns the 256 downsampled right-channel samples from the latest write.
    pub fn read_right(&self) -> [f32; 256] {
        Self::load_all(&self.samples_right)
    }

    /// Returns `(left_peak, right_peak)` from the latest write.
    pub fn peaks(&self) -> (f32, f32) {
        (Self::load_f32(&self.peak_left), Self::load_f32(&self.peak_right))
    }

    /// Reinterprets one atomic slot as an `f32`.
    fn load_f32(atom: &AtomicU32) -> f32 {
        f32::from_bits(atom.load(Ordering::Relaxed))
    }

    /// Copies a whole slot array out as `f32`s.
    fn load_all(atoms: &[AtomicU32; 256]) -> [f32; 256] {
        std::array::from_fn(|i| Self::load_f32(&atoms[i]))
    }
}
/// Lock-free snapshot of the latest 32 spectrum band levels.
///
/// Each band is an `f32` stored as bits in an `AtomicU32` with `Relaxed` ordering, so a
/// reader may see bands from two different writes mixed together.
pub struct SpectrumBuffer {
    /// Band levels as `f32` bit patterns, lowest band first.
    pub bands: [AtomicU32; 32],
}

impl Default for SpectrumBuffer {
    fn default() -> Self {
        Self::new()
    }
}

impl SpectrumBuffer {
    /// Creates a buffer with every band at `0.0`.
    pub fn new() -> Self {
        let bands = std::array::from_fn(|_| AtomicU32::new(0));
        Self { bands }
    }

    /// Overwrites all 32 bands with `data`.
    pub fn write(&self, data: &[f32; 32]) {
        data.iter()
            .zip(&self.bands)
            .for_each(|(value, slot)| slot.store(value.to_bits(), Ordering::Relaxed));
    }

    /// Returns a copy of all 32 bands.
    pub fn read(&self) -> [f32; 32] {
        let mut out = [0.0f32; 32];
        for (dst, slot) in out.iter_mut().zip(&self.bands) {
            *dst = f32::from_bits(slot.load(Ordering::Relaxed));
        }
        out
    }
}
/// Samples per FFT block; also the analysis window length.
const FFT_SIZE: usize = 512;
/// Number of log-spaced bands the analyzer publishes.
const NUM_BANDS: usize = 32;
/// Capacity, in samples, of the ring between the audio callback and the analysis thread.
const ANALYSIS_RING_SIZE: usize = 4096;
/// Block FFT analyzer.
///
/// Collects `FFT_SIZE` samples at a time, applies a Hann window, and reduces the magnitude
/// spectrum to `NUM_BANDS` log-spaced levels in `0.0..=1.0`.
struct SpectrumAnalyzer {
    /// Collects incoming samples until a full block of `FFT_SIZE` is available.
    ring: Vec<f32>,
    /// Next write index into `ring`; returns to 0 whenever a block completes.
    pos: usize,
    /// Forward FFT plan of length `FFT_SIZE`.
    fft: Arc<dyn rustfft::Fft<f32>>,
    /// Hann window coefficients, applied sample-by-sample before the transform.
    window: [f32; FFT_SIZE],
    /// Scratch space sized by `get_inplace_scratch_len` for `process_with_scratch`.
    scratch: Vec<Complex<f32>>,
    /// In-place FFT buffer: windowed input before the transform, spectrum after it.
    fft_buf: Vec<Complex<f32>>,
    /// Starting FFT bin of each band. Band `b` covers `band_edges[b]..band_edges[b + 1]`,
    /// widened to a single bin when the two edges coincide.
    band_edges: [usize; NUM_BANDS + 1],
}

impl SpectrumAnalyzer {
    /// Plans the FFT and precomputes the window and band edges for `sample_rate` (Hz).
    ///
    /// Band edges are spaced logarithmically from 20 Hz to 16 kHz, rounded to the nearest
    /// bin, and capped at the Nyquist bin (`FFT_SIZE / 2`).
    fn new(sample_rate: f32) -> Self {
        let fft = FftPlanner::<f32>::new().plan_fft_forward(FFT_SIZE);
        let scratch = vec![Complex::default(); fft.get_inplace_scratch_len()];

        let span = (FFT_SIZE - 1) as f32;
        let window: [f32; FFT_SIZE] = std::array::from_fn(|n| {
            let phase = 2.0 * std::f32::consts::PI * n as f32 / span;
            0.5 * (1.0 - phase.cos())
        });

        let log_min = 20.0f32.ln();
        let log_max = 16000.0f32.ln();
        let band_edges: [usize; NUM_BANDS + 1] = std::array::from_fn(|edge| {
            let freq = (log_min + (log_max - log_min) * edge as f32 / NUM_BANDS as f32).exp();
            let nearest_bin = (freq * FFT_SIZE as f32 / sample_rate).round() as usize;
            nearest_bin.min(FFT_SIZE / 2)
        });

        Self {
            ring: vec![0.0; FFT_SIZE],
            pos: 0,
            fft,
            window,
            scratch,
            fft_buf: vec![Complex::default(); FFT_SIZE],
            band_edges,
        }
    }

    /// Appends `samples` to the current block.
    ///
    /// Each time `FFT_SIZE` samples have accumulated, runs an FFT and publishes the band
    /// levels to `output`.
    fn feed(&mut self, samples: &[f32], output: &SpectrumBuffer) {
        for &sample in samples {
            self.ring[self.pos] = sample;
            self.pos = (self.pos + 1) % FFT_SIZE;
            if self.pos == 0 {
                self.run_fft(output);
            }
        }
    }

    /// Windows the buffered block, transforms it, and writes one level per band to `output`.
    ///
    /// Each level is the mean bin magnitude divided by `FFT_SIZE / 4`, converted to dB, and
    /// mapped linearly from -60..0 dB onto 0..1 (clamped). The `/4` presumably makes a
    /// bin-centred full-scale sine read about 0 dB under the Hann window's 0.5 gain.
    fn run_fft(&mut self, output: &SpectrumBuffer) {
        // Oldest sample first: the part of `ring` at/after `pos`, then the part before it.
        let (head, tail) = self.ring.split_at(self.pos);
        let chronological = tail.iter().chain(head);
        for ((slot, &sample), &w) in self
            .fft_buf
            .iter_mut()
            .zip(chronological)
            .zip(self.window.iter())
        {
            *slot = Complex::new(sample * w, 0.0);
        }

        self.fft
            .process_with_scratch(&mut self.fft_buf, &mut self.scratch);

        let mut bands = [0.0f32; NUM_BANDS];
        for (level, edge) in bands.iter_mut().zip(self.band_edges.windows(2)) {
            let lo = edge[0];
            let hi = edge[1].max(lo + 1);
            let bins = &self.fft_buf[lo..hi];
            let mean = bins.iter().map(|c| c.norm()).sum::<f32>() / bins.len() as f32;
            let amplitude = mean / (FFT_SIZE as f32 / 4.0);
            let db = 20.0 * amplitude.max(1e-10).log10();
            *level = ((db + 60.0) / 60.0).clamp(0.0, 1.0);
        }
        output.write(&bands);
    }
}
/// Owns the FFT analysis worker started by `spawn_analysis_thread`.
///
/// Dropping the handle first tells the worker to stop, then waits for it to exit. As a
/// result, the worker thread never outlives the handle.
pub struct AnalysisHandle {
    /// Shared stop flag: cleared on drop and polled by the worker loop.
    running: Arc<AtomicBool>,
    /// Join handle for the worker; taken and joined in `Drop`. (Merely dropping a
    /// `JoinHandle` would detach the thread rather than keep it alive or wait for it.)
    _thread: Option<JoinHandle<()>>,
}

impl Drop for AnalysisHandle {
    fn drop(&mut self) {
        self.running.store(false, Ordering::SeqCst);
        if let Some(worker) = self._thread.take() {
            // The worker re-checks `running` after every chunk or 500 µs idle sleep, so this
            // should return promptly. A worker panic has already been reported by the
            // runtime; there is nothing more useful to do with it during drop.
            let _ = worker.join();
        }
    }
}
/// Starts the `fft-analysis` worker thread and returns the producer end of its input ring.
///
/// Samples pushed into the returned producer are consumed by the worker. The producer has
/// capacity `ANALYSIS_RING_SIZE`. The worker turns the samples into band levels written to
/// `spectrum_buffer` at `sample_rate`. Dropping the returned [`AnalysisHandle`] signals the
/// worker to stop.
///
/// # Panics
/// Panics if the thread cannot be spawned.
pub fn spawn_analysis_thread(
    sample_rate: f32,
    spectrum_buffer: Arc<SpectrumBuffer>,
) -> (ringbuf::HeapProd<f32>, AnalysisHandle) {
    let (producer, consumer) = HeapRb::<f32>::new(ANALYSIS_RING_SIZE).split();
    let running = Arc::new(AtomicBool::new(true));
    let worker_flag = Arc::clone(&running);

    let worker = thread::Builder::new()
        .name("fft-analysis".into())
        .spawn(move || analysis_loop(consumer, spectrum_buffer, sample_rate, worker_flag))
        .expect("Failed to spawn FFT analysis thread");

    (
        producer,
        AnalysisHandle {
            running,
            _thread: Some(worker),
        },
    )
}
/// Body of the analysis worker.
///
/// Drains up to 256 samples at a time from `consumer` into a `SpectrumAnalyzer` that
/// publishes to `spectrum_buffer`. Sleeps 500 µs whenever the ring is empty. Returns once
/// `running` is observed as `false`.
fn analysis_loop(
    mut consumer: ringbuf::HeapCons<f32>,
    spectrum_buffer: Arc<SpectrumBuffer>,
    sample_rate: f32,
    running: Arc<AtomicBool>,
) {
    const IDLE_BACKOFF: std::time::Duration = std::time::Duration::from_micros(500);

    let mut analyzer = SpectrumAnalyzer::new(sample_rate);
    let mut chunk = [0.0f32; 256];
    while running.load(Ordering::Relaxed) {
        match consumer.pop_slice(&mut chunk) {
            0 => thread::sleep(IDLE_BACKOFF),
            popped => analyzer.feed(&chunk[..popped], &spectrum_buffer),
        }
    }
}
/// Decodes each entry with `doux::sampling::decode_sample_head` at `target_sr`.
///
/// Successful results are registered in `registry` as one batch, keyed by the entry's name.
/// Entries that fail to decode are reported on stderr and skipped. `insert_batch` is not
/// called at all when nothing decoded.
pub fn preload_sample_heads(
    entries: Vec<(String, std::path::PathBuf)>,
    target_sr: f32,
    registry: &doux::SampleRegistry,
) {
    let mut batch = Vec::with_capacity(entries.len());
    // `entries` is owned, so consume it and move each name into the batch instead of
    // cloning it.
    for (name, path) in entries {
        match doux::sampling::decode_sample_head(&path, target_sr) {
            Ok(data) => batch.push((name, Arc::new(data))),
            Err(e) => eprintln!("preload {name}: {e}"),
        }
    }
    if !batch.is_empty() {
        registry.insert_batch(batch);
    }
}
#[cfg(feature = "cli")]
use cpal::traits::{DeviceTrait, StreamTrait};
#[cfg(feature = "cli")]
use cpal::Stream;
#[cfg(feature = "cli")]
use crossbeam_channel::{Receiver, Sender};
#[cfg(feature = "cli")]
use doux::{Engine, EngineMetrics};
#[cfg(feature = "cli")]
use super::AudioCommand;
/// Requested output settings consumed by `build_stream`.
#[cfg(feature = "cli")]
pub struct AudioStreamConfig {
    /// Output device name to look up; `None` selects doux's default output device.
    pub output_device: Option<String>,
    /// Requested channel count; `build_stream` clamps it to the device's maximum.
    pub channels: u16,
    /// Requested cpal buffer size. `0`, or a JACK host, leaves the choice to the host.
    pub buffer_size: u32,
    /// Voice limit forwarded to `Engine::new_with_metrics`.
    pub max_voices: usize,
}
/// Properties of a stream opened by `build_stream`.
#[cfg(feature = "cli")]
pub struct AudioStreamInfo {
    /// Sample rate taken from the device's default output configuration.
    pub sample_rate: f32,
    /// Name of the host returned by `doux::audio::preferred_host()`.
    pub host_name: String,
    /// Channel count actually used: the requested count clamped to the device maximum.
    pub channels: u16,
}
/// Opens an output device, builds a cpal stream that renders a doux [`Engine`], and starts it.
///
/// Device, sample rate and buffer size:
/// - The device is `config.output_device` (looked up by name), or doux's default output
///   device.
/// - The sample rate comes from the device's default output config.
/// - The channel count is `config.channels` clamped to the device maximum.
/// - A non-zero `config.buffer_size` is requested as a fixed buffer size unless the host
///   is JACK. With JACK, or when the size is `0`, the host default is used.
///
/// Each audio callback:
/// - calls `set_realtime_priority` once, on the first invocation;
/// - advances `audio_sample_pos` by the block's frame count;
/// - applies every pending [`AudioCommand`] from `audio_rx` without blocking;
/// - renders the engine into the output buffer;
/// - publishes a stereo view of the block to `scope_buffer`, using channels 0/1, with mono
///   duplicated to both sides;
/// - pushes a mono mix to the FFT analysis thread. The result of `try_push` is ignored, so
///   samples are dropped when that ring is full.
///
/// Stream errors are forwarded to `error_tx` with `try_send`, so they are dropped if that
/// channel is full.
///
/// # Errors
/// Returns a message in any of these cases:
/// - the device cannot be found;
/// - its default config cannot be queried;
/// - the effective channel count is zero;
/// - the stream cannot be built or started.
#[cfg(feature = "cli")]
#[allow(clippy::too_many_arguments)]
pub fn build_stream(
    config: &AudioStreamConfig,
    audio_rx: Receiver<AudioCommand>,
    scope_buffer: Arc<ScopeBuffer>,
    spectrum_buffer: Arc<SpectrumBuffer>,
    metrics: Arc<EngineMetrics>,
    initial_samples: Vec<doux::sampling::SampleEntry>,
    audio_sample_pos: Arc<AtomicU64>,
    error_tx: Sender<String>,
) -> Result<
    (
        Stream,
        AudioStreamInfo,
        AnalysisHandle,
        Arc<doux::SampleRegistry>,
    ),
    String,
> {
    let device = match &config.output_device {
        Some(name) => doux::audio::find_output_device(name)
            .ok_or_else(|| format!("Device not found: {name}"))?,
        None => doux::audio::default_output_device().ok_or("No default output device")?,
    };
    let default_config = device.default_output_config().map_err(|e| e.to_string())?;
    let sample_rate = default_config.sample_rate() as f32;
    let max_channels = doux::audio::max_output_channels(&device);
    let channels = config.channels.min(max_channels);
    // Zero channels would make the callback divide by zero (`data.len() / channels`) and
    // panic in `chunks(0)`; reject it before any stream is built.
    if channels == 0 {
        return Err(format!(
            "Invalid channel count: requested {}, device supports {}",
            config.channels, max_channels
        ));
    }
    let host_name = doux::audio::preferred_host().id().name().to_string();
    let is_jack = host_name.to_lowercase().contains("jack");
    let buffer_size = if config.buffer_size > 0 && !is_jack {
        cpal::BufferSize::Fixed(config.buffer_size)
    } else {
        cpal::BufferSize::Default
    };
    let stream_config = cpal::StreamConfig {
        channels,
        sample_rate: default_config.sample_rate(),
        buffer_size,
    };
    let sr = sample_rate;
    let effective_channels = channels;
    let channels = channels as usize;
    let max_voices = config.max_voices;
    let mut engine =
        Engine::new_with_metrics(sample_rate, channels, max_voices, Arc::clone(&metrics));
    engine.sample_index = initial_samples;
    let registry = Arc::clone(&engine.sample_registry);
    let (mut fft_producer, analysis_handle) = spawn_analysis_thread(sample_rate, spectrum_buffer);
    let mut cmd_buffer = String::with_capacity(256);
    let mut rt_set = false;
    // `ScopeBuffer::write` expects interleaved stereo. For any other layout the callback packs
    // a stereo view into this scratch buffer. The buffer is preallocated here and only cleared
    // per block, so the callback reallocates only when a block outgrows its current capacity.
    let scope_scratch_len = if channels == 2 {
        0
    } else {
        let frames_hint = if config.buffer_size > 0 {
            config.buffer_size as usize
        } else {
            1024
        };
        frames_hint * 2
    };
    let mut scope_scratch: Vec<f32> = Vec::with_capacity(scope_scratch_len);
    let stream = device
        .build_output_stream(
            &stream_config,
            move |data: &mut [f32], _| {
                if !rt_set {
                    super::realtime::set_realtime_priority();
                    rt_set = true;
                }
                let buffer_samples = data.len() / channels;
                let buffer_time_ns = (buffer_samples as f64 / sr as f64 * 1e9) as u64;
                audio_sample_pos.fetch_add(buffer_samples as u64, Ordering::Release);
                while let Ok(cmd) = audio_rx.try_recv() {
                    match cmd {
                        AudioCommand::Evaluate { cmd, time } => {
                            let cmd_ref = match time {
                                Some(t) => {
                                    // Reuse one String so timed commands don't allocate per call.
                                    cmd_buffer.clear();
                                    use std::fmt::Write;
                                    let _ = write!(&mut cmd_buffer, "{cmd}/time/{t:.6}");
                                    cmd_buffer.as_str()
                                }
                                None => &cmd,
                            };
                            engine.evaluate(cmd_ref);
                        }
                        AudioCommand::Hush => {
                            engine.hush();
                        }
                        AudioCommand::Panic => {
                            engine.panic();
                        }
                        AudioCommand::LoadSamples(samples) => {
                            engine.sample_index.extend(samples);
                        }
                    }
                }
                engine.metrics.load.set_buffer_time(buffer_time_ns);
                engine.process_block(data, &[], &[]);
                if channels == 2 {
                    scope_buffer.write(data);
                } else {
                    // Channels 0 and 1 become L/R; a mono stream is duplicated to both sides.
                    scope_scratch.clear();
                    for frame in data.chunks(channels) {
                        let left = frame[0];
                        let right = frame.get(1).copied().unwrap_or(left);
                        scope_scratch.push(left);
                        scope_scratch.push(right);
                    }
                    scope_buffer.write(&scope_scratch);
                }
                // Feed mono mix to analysis thread via ring buffer (non-blocking)
                for chunk in data.chunks(channels) {
                    let mono = chunk.iter().sum::<f32>() / channels as f32;
                    let _ = fft_producer.try_push(mono);
                }
            },
            move |err| {
                let _ = error_tx.try_send(format!("stream error: {err}"));
            },
            None,
        )
        .map_err(|e| format!("Failed to build stream: {e}"))?;
    stream
        .play()
        .map_err(|e| format!("Failed to play stream: {e}"))?;
    let info = AudioStreamInfo {
        sample_rate,
        host_name,
        channels: effective_channels,
    };
    Ok((stream, info, analysis_handle, registry))
}