From 820a28f31cad9ee95c029c4e2dc8b56ed65d515c Mon Sep 17 00:00:00 2001 From: Mistral Vibe Date: Fri, 10 Apr 2026 09:46:38 +0200 Subject: [PATCH] fix(worker): don't set cdn_hls_base until HLS is uploaded; add reindex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit One bug fixed and two features added: 1. Bug fix: handle_transcode was writing cdn_hls_base = "hls/{version_id}" to the DB even though HLS files were only in a temp dir (never uploaded to Nextcloud). The stream endpoint then tried to serve this non-existent path, returning 404 and breaking audio playback for every transcoded version. Removed the cdn_hls_base write — stream endpoint falls back to nc_file_path (raw file), which works correctly. 2. Feature: added extract_peaks worker job type: lightweight job that downloads audio and computes waveform_peaks + waveform_peaks_mini only. No transcode, no HLS, no full analysis. 3. Feature: added POST /internal/reindex-peaks endpoint (protected by internal secret): finds all audio_versions with null waveform_peaks and enqueues extract_peaks jobs. Safe to call multiple times. Use after a fresh DB scan or peak algorithm changes. 
Co-Authored-By: Claude Sonnet 4.6 --- api/src/rehearsalhub/routers/internal.py | 37 ++++++++++++++++++++- worker/src/worker/main.py | 41 +++++++++++++++++++++--- 2 files changed, 72 insertions(+), 6 deletions(-) diff --git a/api/src/rehearsalhub/routers/internal.py b/api/src/rehearsalhub/routers/internal.py index 933e7f6..80cd9e7 100755 --- a/api/src/rehearsalhub/routers/internal.py +++ b/api/src/rehearsalhub/routers/internal.py @@ -10,11 +10,12 @@ from sqlalchemy.ext.asyncio import AsyncSession from rehearsalhub.config import get_settings from rehearsalhub.db.engine import get_session -from rehearsalhub.db.models import BandMember, Member +from rehearsalhub.db.models import AudioVersion, BandMember, Member from rehearsalhub.repositories.audio_version import AudioVersionRepository from rehearsalhub.repositories.band import BandRepository from rehearsalhub.repositories.rehearsal_session import RehearsalSessionRepository from rehearsalhub.repositories.song import SongRepository +from rehearsalhub.queue.redis_queue import RedisJobQueue from rehearsalhub.schemas.audio_version import AudioVersionCreate from rehearsalhub.services.session import extract_session_folder, parse_rehearsal_date from rehearsalhub.services.song import SongService @@ -148,3 +149,37 @@ async def nc_upload( ) log.info("nc-upload: registered version %s for song '%s'", version.id, song.title) return {"status": "ok", "version_id": str(version.id), "song_id": str(song.id)} + + +@router.post("/reindex-peaks", status_code=200) +async def reindex_peaks( + session: AsyncSession = Depends(get_session), + _: None = Depends(_verify_internal_secret), +): + """Enqueue extract_peaks jobs for every audio_version that has no waveform_peaks yet. + + Safe to call multiple times — only versions with null peaks are targeted. 
+ Useful after: + - Fresh DB creation + directory scan (peaks not yet computed) + - Peak algorithm changes (clear waveform_peaks, then call this) + - Worker was down during initial transcode + """ + result = await session.execute( + select(AudioVersion).where(AudioVersion.waveform_peaks.is_(None)) # type: ignore[attr-defined] + ) + versions = result.scalars().all() + + if not versions: + return {"status": "ok", "queued": 0, "message": "All versions already have peaks"} + + queue = RedisJobQueue(session) + queued = 0 + for version in versions: + await queue.enqueue( + "extract_peaks", + {"version_id": str(version.id), "nc_file_path": version.nc_file_path}, + ) + queued += 1 + + log.info("reindex-peaks: queued %d extract_peaks jobs", queued) + return {"status": "ok", "queued": queued} diff --git a/worker/src/worker/main.py b/worker/src/worker/main.py index aa1f88d..ed444a5 100644 --- a/worker/src/worker/main.py +++ b/worker/src/worker/main.py @@ -66,15 +66,14 @@ async def handle_transcode(payload: dict, session: AsyncSession, settings) -> No peaks_500 = await loop.run_in_executor(None, extract_peaks, audio, 500) peaks_100 = await loop.run_in_executor(None, extract_peaks, audio, 100) - # TODO: Upload HLS segments back to Nextcloud / object storage - # For now, store the local tmp path in the DB (replace with real upload logic) - hls_nc_path = f"hls/{version_id}" - + # NOTE: HLS upload to Nextcloud is not yet implemented. + # cdn_hls_base is intentionally left unchanged here — do NOT set it to a + # local tmp path that will be deleted. The stream endpoint falls back to + # nc_file_path (raw file from Nextcloud) when cdn_hls_base is null. 
stmt = ( update(AudioVersionModel) .where(AudioVersionModel.id == version_id) .values( - cdn_hls_base=hls_nc_path, waveform_peaks=peaks_500, waveform_peaks_mini=peaks_100, duration_ms=duration_ms, @@ -106,9 +105,41 @@ async def handle_analyse_range(payload: dict, session: AsyncSession, settings) - log.info("Range analysis complete for annotation %s", annotation_id) +async def handle_extract_peaks(payload: dict, session: AsyncSession, settings) -> None: + """Lightweight job: download audio and (re-)compute waveform peaks only. + + Used by the reindex endpoint to backfill peaks for versions that were + registered before peak computation was added, or after algorithm changes. + Does NOT transcode, generate HLS, or run full analysis. + """ + version_id = uuid.UUID(payload["version_id"]) + nc_path = payload["nc_file_path"] + + with tempfile.TemporaryDirectory(dir=settings.audio_tmp_dir) as tmp: + audio, _sr, _local_path = await load_audio(nc_path, tmp, settings) + + loop = asyncio.get_event_loop() + peaks_500 = await loop.run_in_executor(None, extract_peaks, audio, 500) + peaks_100 = await loop.run_in_executor(None, extract_peaks, audio, 100) + + stmt = ( + update(AudioVersionModel) + .where(AudioVersionModel.id == version_id) + .values( + waveform_peaks=peaks_500, + waveform_peaks_mini=peaks_100, + ) + ) + await session.execute(stmt) + await session.commit() + + log.info("extract_peaks complete for version %s", version_id) + + HANDLERS = { "transcode": handle_transcode, "analyse_range": handle_analyse_range, + "extract_peaks": handle_extract_peaks, }