Files
speech-to-text/lib/audio.py

184 lines
5.9 KiB
Python

import json
import os
import re
import subprocess
import tempfile
import sys
from .config import CHUNK_SECONDS, SILENCE_DB, SILENCE_DUR
def get_duration(path):
    """Return the duration of the media file at *path* in seconds.

    ffprobe is queried for ``format=duration`` up to two times: first with
    its default probing limits, then with ``-analyzeduration`` and
    ``-probesize`` both raised to ``100M``. The first strictly positive
    value is returned.

    Any failure of an attempt (ffprobe missing, 30 s timeout, empty or
    unparsable output) is swallowed on purpose so that the next attempt can
    run; when both attempts fail, ``0.0`` is returned.
    """
    deep_probe_flags = ['-analyzeduration', '100M', '-probesize', '100M']
    for extra_flags in ([], deep_probe_flags):
        command = [
            'ffprobe', '-v', 'error', *extra_flags,
            '-show_entries', 'format=duration',
            '-of', 'default=noprint_wrappers=1:nokey=1', path,
        ]
        try:
            probe = subprocess.run(
                command, capture_output=True, text=True, timeout=30
            )
            seconds = float(probe.stdout.strip())
        except Exception:
            # Best effort: move on to the next probing strategy.
            continue
        if seconds > 0:
            return seconds
    return 0.0
def detect_silence(path):
    """Return the silent intervals of *path* as a list of ``(start, end)``.

    Runs ffmpeg's ``silencedetect`` filter (threshold ``SILENCE_DB`` dB,
    minimum length ``SILENCE_DUR`` seconds) and parses the
    ``silence_start`` / ``silence_end`` markers it writes to stderr.

    Start and end markers are paired in the order they appear, so one
    unparsable marker cannot shift every later interval. Start values below
    zero are clamped to ``0.0``. A silence that is still open when the log
    ends is closed at the file duration (never before its own start).

    Best effort: on any error a message is printed to stderr and ``[]`` is
    returned.
    """
    # Accept an optional sign and exponent: ffmpeg can report a
    # silence_start slightly below zero when the file opens with silence.
    number = r'(-?[0-9.]+(?:[eE][-+]?[0-9]+)?)'
    start_re = re.compile(r'silence_start:\s*' + number)
    end_re = re.compile(r'silence_end:\s*' + number)
    try:
        result = subprocess.run(
            ['ffmpeg', '-hide_banner', '-loglevel', 'info', '-i', path,
             '-af', f'silencedetect=noise={SILENCE_DB}dB:d={SILENCE_DUR}',
             '-f', 'null', '-'],
            capture_output=True, text=True, timeout=300
        )
        silences = []
        open_start = None
        for line in result.stderr.splitlines():
            start_match = start_re.search(line)
            if start_match:
                open_start = max(float(start_match.group(1)), 0.0)
            end_match = end_re.search(line)
            if end_match and open_start is not None:
                silences.append((open_start, float(end_match.group(1))))
                open_start = None
        if open_start is not None:
            # The log ended inside a silence: close it at end of file.
            # get_duration() yields 0.0 on failure, hence the max().
            silences.append((open_start, max(get_duration(path), open_start)))
        return silences
    except Exception as e:
        print(f"[silence_detect] failed: {e}", file=sys.stderr)
        return []
def find_split_points(silences, total_duration, target=None, min_chunk=5.0, tolerance=None):
    """Choose chunk boundaries, preferring the starts of silent intervals.

    Args:
        silences: iterable of ``(start, end)`` pairs in seconds; only the
            start of each pair is used as a candidate cut.
        total_duration: length of the audio in seconds.
        target: desired chunk length; defaults to ``CHUNK_SECONDS``.
        min_chunk: minimum length of every chunk, including the last one.
        tolerance: how far a cut may move away from the ideal point
            ``current + target``; defaults to ``max(target / 2, 8.0)``.

    Returns:
        An increasing list of boundaries that starts with ``0.0`` and ends
        with ``total_duration``. When ``total_duration <= target`` this is
        just ``[0.0, total_duration]``.

    For each chunk the silence start closest to the ideal point is picked
    among those inside the tolerance window that also leave at least
    ``min_chunk`` seconds since the previous cut. If there is none, the cut
    falls on the ideal point itself. A remainder shorter than ``min_chunk``
    is merged into the final chunk.
    """
    if target is None:
        target = CHUNK_SECONDS
    if tolerance is None:
        tolerance = max(target / 2, 8.0)
    if total_duration <= target:
        return [0.0, total_duration]

    splits = [0.0]
    current = 0.0
    while current + target < total_duration:
        target_point = current + target
        # Enforce min_chunk while searching, not afterwards: otherwise a
        # too-early silence wins and gets replaced by an arbitrary cut even
        # though a usable silence exists further into the window.
        earliest = max(target_point - tolerance, current + min_chunk)
        latest = target_point + tolerance
        candidates = [
            start for start, _end in silences
            if earliest <= start <= latest and start > current
        ]
        if candidates:
            # min() keeps the first of equally distant candidates.
            best = min(candidates, key=lambda start: abs(start - target_point))
        else:
            best = target_point
        if best - current < min_chunk:
            best = current + min_chunk
        if best <= current:
            # Degenerate parameters (non-positive target and min_chunk):
            # stop instead of looping forever without progress.
            break
        if total_duration - best < min_chunk:
            # The remainder would be too short: absorb it into this chunk.
            splits.append(total_duration)
            break
        splits.append(best)
        current = best
    if splits[-1] < total_duration:
        splits.append(total_duration)
    return splits
def convert_to_wav(input_path, output_path):
    """Transcode *input_path* into a WAV file at *output_path* using ffmpeg.

    The output is requested as 16000 Hz (``-ar``), one channel (``-ac``),
    ``pcm_s16le`` audio, and an existing output file is overwritten
    (``-y``).

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        subprocess.TimeoutExpired: ffmpeg ran longer than 300 seconds.
    """
    ffmpeg_cmd = ['ffmpeg', '-y', '-hide_banner', '-loglevel', 'error']
    ffmpeg_cmd += ['-i', input_path]
    ffmpeg_cmd += ['-ar', '16000', '-ac', '1', '-c:a', 'pcm_s16le', output_path]
    subprocess.run(ffmpeg_cmd, check=True, capture_output=True, timeout=300)
def extract_chunk(input_path, output_path, start, end):
    """Write the ``[start, end]`` span of *input_path* to *output_path*.

    *start* and *end* are passed to ffmpeg through ``str()`` as the ``-ss``
    and ``-to`` values, placed after ``-i``. The chunk is written as
    16000 Hz, one channel, ``pcm_s16le`` audio, overwriting any existing
    output file (``-y``).

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        subprocess.TimeoutExpired: ffmpeg ran longer than 120 seconds.
    """
    span_args = ['-ss', str(start), '-to', str(end)]
    encode_args = ['-ar', '16000', '-ac', '1', '-c:a', 'pcm_s16le']
    ffmpeg_cmd = [
        'ffmpeg', '-y', '-hide_banner', '-loglevel', 'error',
        '-i', input_path, *span_args, *encode_args, output_path,
    ]
    subprocess.run(ffmpeg_cmd, check=True, capture_output=True, timeout=120)
def prepare_audio(input_path):
    """Create a scratch directory and make sure a WAV file is available.

    A path ending in ``.wav`` (case-insensitive) is used as is; anything
    else is converted with ``convert_to_wav`` into ``full.wav`` inside the
    scratch directory. The duration is probed on the input first and, if
    that yields nothing, on the converted file.

    Returns:
        ``(tmpdir, wav_path, duration)``. ``tmpdir`` is a fresh
        ``stt_chunks_*`` directory that is returned to the caller and is
        not removed here on success. ``duration`` is ``0.0`` when it could
        not be determined.

    Raises:
        Whatever ``convert_to_wav`` raises. The scratch directory is removed
        before the exception propagates, since the caller never learns its
        path in that case.
    """
    import shutil  # local import: only needed for cleanup after a failure

    duration = get_duration(input_path)
    tmpdir = tempfile.mkdtemp(prefix='stt_chunks_')
    if input_path.lower().endswith('.wav'):
        wav_path = input_path
    else:
        print("[audio] converting to WAV for accurate seeking...", file=sys.stderr)
        wav_path = os.path.join(tmpdir, 'full.wav')
        try:
            convert_to_wav(input_path, wav_path)
        except BaseException:
            # Do not leak the scratch directory when conversion fails.
            shutil.rmtree(tmpdir, ignore_errors=True)
            raise
    if duration <= 0.0 and wav_path != input_path:
        # The original container gave no duration; try the fresh WAV.
        duration = get_duration(wav_path)
    if duration <= 0.0:
        print("[audio] could not determine duration, processing as single segment", file=sys.stderr)
        duration = 0.0
    return tmpdir, wav_path, duration
def fmt_srt(seconds):
    """Format *seconds* as an SRT timestamp, ``HH:MM:SS,mmm``.

    The value is rounded to the nearest millisecond once, then split into
    fields. Truncating the fractional part instead loses a millisecond to
    float error (1.001 would render as ``,000``). Negative input is clamped
    to zero.
    """
    total_ms = max(int(round(seconds * 1000)), 0)
    total_s, ms = divmod(total_ms, 1000)
    total_m, s = divmod(total_s, 60)
    h, m = divmod(total_m, 60)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
def fmt_vtt(seconds):
    """Format *seconds* as a WebVTT timestamp, ``HH:MM:SS.mmm``.

    The value is rounded to the nearest millisecond once, then split into
    fields. Truncating the fractional part instead loses a millisecond to
    float error (1.001 would render as ``.000``). Negative input is clamped
    to zero.
    """
    total_ms = max(int(round(seconds * 1000)), 0)
    total_s, ms = divmod(total_ms, 1000)
    total_m, s = divmod(total_s, 60)
    h, m = divmod(total_m, 60)
    return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}"
def build_response(segments, fmt):
    """Render transcription *segments* in the requested output format.

    Args:
        segments: list of dicts with ``'text'``, ``'start'`` and ``'end'``
            keys; the first one may also carry ``'language'``.
        fmt: ``'text'``, ``'srt'``, ``'vtt'`` or ``'verbose_json'``. Any
            other value produces the plain JSON form.

    Returns:
        A string: the joined text, an SRT document, a WebVTT document, a
        verbose JSON document (task, language, duration, text and the
        segments with a 0-based ``id``), or ``{"text": ...}`` as JSON.

    The caller's segment dicts are never modified; ids are written onto
    shallow copies.
    """
    if fmt == 'srt':
        lines = []
        for index, seg in enumerate(segments, 1):
            lines.append(f"{index}")
            lines.append(f"{fmt_srt(seg['start'])} --> {fmt_srt(seg['end'])}")
            lines.append(seg['text'])
            lines.append("")
        return "\n".join(lines)

    if fmt == 'vtt':
        lines = ["WEBVTT", ""]
        for seg in segments:
            lines.append(f"{fmt_vtt(seg['start'])} --> {fmt_vtt(seg['end'])}")
            lines.append(seg['text'])
            lines.append("")
        return "\n".join(lines)

    full_text = ' '.join(seg['text'] for seg in segments).strip()
    if fmt == 'text':
        return full_text

    if fmt == 'verbose_json':
        # Number copies so the input list is left exactly as it was passed.
        numbered = [{**seg, 'id': index} for index, seg in enumerate(segments)]
        return json.dumps({
            "task": "transcribe",
            "language": segments[0].get('language', 'fr') if segments else 'fr',
            "duration": segments[-1]['end'] if segments else 0.0,
            "text": full_text,
            "segments": numbered,
        }, ensure_ascii=False)

    return json.dumps({"text": full_text}, ensure_ascii=False)