fix(worker): don't set cdn_hls_base until HLS is uploaded; add reindex

Two bugs fixed:

1. handle_transcode was writing cdn_hls_base = "hls/{version_id}" to the DB
   even though HLS files were only in a temp dir (never uploaded to Nextcloud).
   The stream endpoint then tried to serve this non-existent path, returning 404
   and breaking audio playback for every transcoded version. Removed the
   cdn_hls_base write — stream endpoint falls back to nc_file_path (raw file),
   which works correctly.

2. Added extract_peaks worker job type: lightweight job that downloads audio
   and computes waveform_peaks + waveform_peaks_mini only. No transcode, no HLS,
   no full analysis.

3. Added POST /internal/reindex-peaks endpoint (protected by internal secret):
   finds all audio_versions with null waveform_peaks and enqueues extract_peaks
   jobs. Safe to call multiple times. Use after a fresh DB scan or peak algorithm
   changes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mistral Vibe
2026-04-10 09:46:38 +02:00
parent efb16a096d
commit 820a28f31c
2 changed files with 72 additions and 6 deletions

View File

@@ -10,11 +10,12 @@ from sqlalchemy.ext.asyncio import AsyncSession
from rehearsalhub.config import get_settings from rehearsalhub.config import get_settings
from rehearsalhub.db.engine import get_session from rehearsalhub.db.engine import get_session
from rehearsalhub.db.models import BandMember, Member from rehearsalhub.db.models import AudioVersion, BandMember, Member
from rehearsalhub.repositories.audio_version import AudioVersionRepository from rehearsalhub.repositories.audio_version import AudioVersionRepository
from rehearsalhub.repositories.band import BandRepository from rehearsalhub.repositories.band import BandRepository
from rehearsalhub.repositories.rehearsal_session import RehearsalSessionRepository from rehearsalhub.repositories.rehearsal_session import RehearsalSessionRepository
from rehearsalhub.repositories.song import SongRepository from rehearsalhub.repositories.song import SongRepository
from rehearsalhub.queue.redis_queue import RedisJobQueue
from rehearsalhub.schemas.audio_version import AudioVersionCreate from rehearsalhub.schemas.audio_version import AudioVersionCreate
from rehearsalhub.services.session import extract_session_folder, parse_rehearsal_date from rehearsalhub.services.session import extract_session_folder, parse_rehearsal_date
from rehearsalhub.services.song import SongService from rehearsalhub.services.song import SongService
@@ -148,3 +149,37 @@ async def nc_upload(
) )
log.info("nc-upload: registered version %s for song '%s'", version.id, song.title) log.info("nc-upload: registered version %s for song '%s'", version.id, song.title)
return {"status": "ok", "version_id": str(version.id), "song_id": str(song.id)} return {"status": "ok", "version_id": str(version.id), "song_id": str(song.id)}
@router.post("/reindex-peaks", status_code=200)
async def reindex_peaks(
session: AsyncSession = Depends(get_session),
_: None = Depends(_verify_internal_secret),
):
"""Enqueue extract_peaks jobs for every audio_version that has no waveform_peaks yet.
Safe to call multiple times — only versions with null peaks are targeted.
Useful after:
- Fresh DB creation + directory scan (peaks not yet computed)
- Peak algorithm changes (clear waveform_peaks, then call this)
- Worker was down during initial transcode
"""
result = await session.execute(
select(AudioVersion).where(AudioVersion.waveform_peaks.is_(None)) # type: ignore[attr-defined]
)
versions = result.scalars().all()
if not versions:
return {"status": "ok", "queued": 0, "message": "All versions already have peaks"}
queue = RedisJobQueue(session)
queued = 0
for version in versions:
await queue.enqueue(
"extract_peaks",
{"version_id": str(version.id), "nc_file_path": version.nc_file_path},
)
queued += 1
log.info("reindex-peaks: queued %d extract_peaks jobs", queued)
return {"status": "ok", "queued": queued}

View File

@@ -66,15 +66,14 @@ async def handle_transcode(payload: dict, session: AsyncSession, settings) -> No
peaks_500 = await loop.run_in_executor(None, extract_peaks, audio, 500) peaks_500 = await loop.run_in_executor(None, extract_peaks, audio, 500)
peaks_100 = await loop.run_in_executor(None, extract_peaks, audio, 100) peaks_100 = await loop.run_in_executor(None, extract_peaks, audio, 100)
# TODO: Upload HLS segments back to Nextcloud / object storage # NOTE: HLS upload to Nextcloud is not yet implemented.
# For now, store the local tmp path in the DB (replace with real upload logic) # cdn_hls_base is intentionally left unchanged here — do NOT set it to a
hls_nc_path = f"hls/{version_id}" # local tmp path that will be deleted. The stream endpoint falls back to
# nc_file_path (raw file from Nextcloud) when cdn_hls_base is null.
stmt = ( stmt = (
update(AudioVersionModel) update(AudioVersionModel)
.where(AudioVersionModel.id == version_id) .where(AudioVersionModel.id == version_id)
.values( .values(
cdn_hls_base=hls_nc_path,
waveform_peaks=peaks_500, waveform_peaks=peaks_500,
waveform_peaks_mini=peaks_100, waveform_peaks_mini=peaks_100,
duration_ms=duration_ms, duration_ms=duration_ms,
@@ -106,9 +105,41 @@ async def handle_analyse_range(payload: dict, session: AsyncSession, settings) -
log.info("Range analysis complete for annotation %s", annotation_id) log.info("Range analysis complete for annotation %s", annotation_id)
async def handle_extract_peaks(payload: dict, session: AsyncSession, settings) -> None:
"""Lightweight job: download audio and (re-)compute waveform peaks only.
Used by the reindex endpoint to backfill peaks for versions that were
registered before peak computation was added, or after algorithm changes.
Does NOT transcode, generate HLS, or run full analysis.
"""
version_id = uuid.UUID(payload["version_id"])
nc_path = payload["nc_file_path"]
with tempfile.TemporaryDirectory(dir=settings.audio_tmp_dir) as tmp:
audio, _sr, _local_path = await load_audio(nc_path, tmp, settings)
loop = asyncio.get_event_loop()
peaks_500 = await loop.run_in_executor(None, extract_peaks, audio, 500)
peaks_100 = await loop.run_in_executor(None, extract_peaks, audio, 100)
stmt = (
update(AudioVersionModel)
.where(AudioVersionModel.id == version_id)
.values(
waveform_peaks=peaks_500,
waveform_peaks_mini=peaks_100,
)
)
await session.execute(stmt)
await session.commit()
log.info("extract_peaks complete for version %s", version_id)
HANDLERS = { HANDLERS = {
"transcode": handle_transcode, "transcode": handle_transcode,
"analyse_range": handle_analyse_range, "analyse_range": handle_analyse_range,
"extract_peaks": handle_extract_peaks,
} }