Refactor storage to provider-agnostic band-scoped model
Replaces per-member Nextcloud credentials with a BandStorage model that supports multiple providers. Credentials are Fernet-encrypted at rest; the worker receives audio via an internal streaming endpoint instead of direct storage access.

- Add BandStorage DB model with partial unique index (one active per band)
- Add migrations 0007 (create band_storage) and 0008 (drop old nc columns)
- Add StorageFactory that builds the correct StorageClient from BandStorage
- Add storage router: connect/nextcloud, OAuth2 authorize/callback, list, disconnect
- Add Fernet encryption helpers in security/encryption.py
- Rewrite watcher for per-band polling via internal API config endpoint
- Update worker to stream audio from API instead of accessing storage directly
- Update frontend: new storage API in bands.ts, rewritten StorageSection, simplified band creation modal (no storage step)
- Add STORAGE_ENCRYPTION_KEY to all docker-compose files

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -22,9 +22,29 @@ RUN --mount=type=bind,from=essentia-builder,source=/usr/local/lib,target=/essent
|
||||
|
||||
RUN pip install uv
|
||||
|
||||
FROM base AS development
|
||||
COPY pyproject.toml .
|
||||
RUN uv sync --all-extras --no-install-project --frozen || uv sync --all-extras --no-install-project
|
||||
ENV PYTHONPATH=/app/src
|
||||
ENV PYTHONUNBUFFERED=1
|
||||
ENV LOG_LEVEL=DEBUG
|
||||
CMD ["/bin/sh", "-c", "PYTHONPATH=/app/src exec /app/.venv/bin/watchfiles --ignore-permission-denied '/app/.venv/bin/python -m worker.main' src"]
|
||||
|
||||
FROM base AS production
|
||||
COPY pyproject.toml .
|
||||
RUN uv sync --no-dev --frozen || uv sync --no-dev
|
||||
COPY . .
|
||||
ENV PYTHONPATH=/app/src
|
||||
|
||||
# Pre-warm librosa/numba JIT cache and pooch downloads so they happen at build
|
||||
# time and are baked into the image rather than downloaded on every cold start.
|
||||
RUN uv run python -c "\
|
||||
import numpy as np; \
|
||||
import librosa; \
|
||||
_dummy = np.zeros(22050, dtype=np.float32); \
|
||||
librosa.beat.beat_track(y=_dummy, sr=22050); \
|
||||
librosa.feature.chroma_stft(y=_dummy, sr=22050); \
|
||||
print('librosa warmup done') \
|
||||
"
|
||||
|
||||
CMD ["uv", "run", "python", "-m", "worker.main"]
|
||||
|
||||
@@ -26,6 +26,7 @@ dev = [
|
||||
"pytest-asyncio>=0.23",
|
||||
"pytest-cov>=5",
|
||||
"ruff>=0.4",
|
||||
"watchfiles>=0.21",
|
||||
]
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
|
||||
@@ -10,9 +10,8 @@ class WorkerSettings(BaseSettings):
|
||||
redis_url: str = "redis://localhost:6379/0"
|
||||
job_queue_key: str = "rh:jobs"
|
||||
|
||||
nextcloud_url: str = "http://nextcloud"
|
||||
nextcloud_user: str = "ncadmin"
|
||||
nextcloud_pass: str = ""
|
||||
api_url: str = "http://api:8000"
|
||||
internal_secret: str = "dev-change-me-in-production"
|
||||
|
||||
audio_tmp_dir: str = "/tmp/audio"
|
||||
analysis_version: str = "1.0.0"
|
||||
|
||||
@@ -36,6 +36,14 @@ class AudioVersionModel(Base):
|
||||
uploaded_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
|
||||
|
||||
|
||||
class SongModel(Base):
|
||||
__tablename__ = "songs"
|
||||
|
||||
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True)
|
||||
global_bpm: Mapped[Optional[float]] = mapped_column(Numeric(6, 2))
|
||||
global_key: Mapped[Optional[str]] = mapped_column(String(30))
|
||||
|
||||
|
||||
class RangeAnalysisModel(Base):
|
||||
__tablename__ = "range_analyses"
|
||||
|
||||
|
||||
@@ -23,23 +23,30 @@ from worker.pipeline.analyse_range import run_range_analysis
|
||||
from worker.pipeline.transcode import get_duration_ms, transcode_to_hls
|
||||
from worker.pipeline.waveform import extract_peaks, generate_waveform_file
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
|
||||
logging.basicConfig(
|
||||
level=os.environ.get("LOG_LEVEL", "INFO").upper(),
|
||||
format="%(asctime)s %(levelname)s %(name)s %(message)s",
|
||||
)
|
||||
# Numba floods logs with JIT compilation details at DEBUG level — keep it quiet
|
||||
logging.getLogger("numba").setLevel(logging.WARNING)
|
||||
log = logging.getLogger("worker")
|
||||
|
||||
|
||||
async def load_audio(nc_path: str, tmp_dir: str, settings) -> tuple[np.ndarray, int, str]:
|
||||
"""Download from Nextcloud and load as numpy array. Returns (audio, sr, local_path)."""
|
||||
async def load_audio(version_id: str, filename: str, tmp_dir: str, settings) -> tuple[np.ndarray, int, str]:
|
||||
"""Download audio via the internal API and load as numpy array. Returns (audio, sr, local_path)."""
|
||||
import httpx
|
||||
|
||||
local_path = os.path.join(tmp_dir, Path(nc_path).name)
|
||||
dav_url = f"{settings.nextcloud_url}/remote.php/dav/files/{settings.nextcloud_user}/{nc_path.lstrip('/')}"
|
||||
local_path = os.path.join(tmp_dir, filename)
|
||||
url = f"{settings.api_url}/api/v1/internal/audio/{version_id}/stream"
|
||||
log.info("Fetching audio for version %s from %s", version_id, url)
|
||||
async with httpx.AsyncClient(
|
||||
auth=(settings.nextcloud_user, settings.nextcloud_pass), timeout=120.0
|
||||
headers={"X-Internal-Token": settings.internal_secret}, timeout=120.0
|
||||
) as client:
|
||||
resp = await client.get(dav_url)
|
||||
resp.raise_for_status()
|
||||
with open(local_path, "wb") as f:
|
||||
f.write(resp.content)
|
||||
async with client.stream("GET", url) as resp:
|
||||
resp.raise_for_status()
|
||||
with open(local_path, "wb") as f:
|
||||
async for chunk in resp.aiter_bytes(65536):
|
||||
f.write(chunk)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
audio, sr = await loop.run_in_executor(
|
||||
@@ -53,7 +60,7 @@ async def handle_transcode(payload: dict, session: AsyncSession, settings) -> No
|
||||
nc_path = payload["nc_file_path"]
|
||||
|
||||
with tempfile.TemporaryDirectory(dir=settings.audio_tmp_dir) as tmp:
|
||||
audio, sr, local_path = await load_audio(nc_path, tmp, settings)
|
||||
audio, sr, local_path = await load_audio(str(version_id), Path(nc_path).name, tmp, settings)
|
||||
duration_ms = await get_duration_ms(local_path)
|
||||
|
||||
hls_dir = os.path.join(tmp, "hls")
|
||||
@@ -99,7 +106,7 @@ async def handle_analyse_range(payload: dict, session: AsyncSession, settings) -
|
||||
raise ValueError(f"AudioVersion {version_id} not found")
|
||||
|
||||
with tempfile.TemporaryDirectory(dir=settings.audio_tmp_dir) as tmp:
|
||||
audio, sr, _ = await load_audio(version.nc_file_path, tmp, settings)
|
||||
audio, sr, _ = await load_audio(str(version_id), Path(version.nc_file_path).name, tmp, settings)
|
||||
await run_range_analysis(audio, sr, version_id, annotation_id, start_ms, end_ms, session)
|
||||
|
||||
log.info("Range analysis complete for annotation %s", annotation_id)
|
||||
@@ -116,7 +123,7 @@ async def handle_extract_peaks(payload: dict, session: AsyncSession, settings) -
|
||||
nc_path = payload["nc_file_path"]
|
||||
|
||||
with tempfile.TemporaryDirectory(dir=settings.audio_tmp_dir) as tmp:
|
||||
audio, _sr, _local_path = await load_audio(nc_path, tmp, settings)
|
||||
audio, _sr, _local_path = await load_audio(str(version_id), Path(nc_path).name, tmp, settings)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
peaks_500 = await loop.run_in_executor(None, extract_peaks, audio, 500)
|
||||
@@ -146,17 +153,27 @@ HANDLERS = {
|
||||
async def main() -> None:
|
||||
settings = get_settings()
|
||||
os.makedirs(settings.audio_tmp_dir, exist_ok=True)
|
||||
log.info(
|
||||
"Worker config — redis_url=%s api_url=%s queue=%s",
|
||||
settings.redis_url, settings.api_url, settings.job_queue_key,
|
||||
)
|
||||
|
||||
engine = create_async_engine(settings.database_url, pool_pre_ping=True)
|
||||
session_factory = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
|
||||
redis = aioredis.from_url(settings.redis_url, decode_responses=True)
|
||||
|
||||
# Drain stale job IDs left in Redis from previous runs whose API transactions
|
||||
# were never committed (e.g. crashed processes).
|
||||
stale = await redis.llen(settings.job_queue_key)
|
||||
if stale:
|
||||
log.warning("Draining %d stale job IDs from Redis queue before starting", stale)
|
||||
await redis.delete(settings.job_queue_key)
|
||||
# Wait for Redis to be reachable before proceeding (startup race condition guard).
|
||||
for attempt in range(1, 31):
|
||||
try:
|
||||
await redis.ping()
|
||||
log.info("Redis connection established (attempt %d)", attempt)
|
||||
break
|
||||
except Exception as exc:
|
||||
if attempt == 30:
|
||||
log.error("Redis unreachable after 30 attempts — giving up: %s", exc)
|
||||
raise
|
||||
log.warning("Redis not ready (attempt %d/30): %s — retrying in 2s", attempt, exc)
|
||||
await asyncio.sleep(2)
|
||||
|
||||
log.info("Worker started. Listening for jobs on %s", settings.job_queue_key)
|
||||
|
||||
|
||||
@@ -3,15 +3,28 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from worker.analyzers.base import AnalysisResult
|
||||
from worker.analyzers.bpm import BPMAnalyzer
|
||||
from worker.analyzers.key import KeyAnalyzer
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# Dedicated pool so heavy Essentia threads can't starve the default executor.
|
||||
# max_workers=2 covers BPM + Key running sequentially per job.
|
||||
_analysis_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="analysis")
|
||||
|
||||
# Per-analyzer timeout in seconds. Essentia multifeature BPM can be slow on
|
||||
# long recordings; 3 minutes is generous for a single-track analysis pass.
|
||||
_ANALYZER_TIMEOUT = 180.0
|
||||
|
||||
|
||||
async def run_full_analysis(
|
||||
audio: np.ndarray,
|
||||
@@ -19,10 +32,25 @@ async def run_full_analysis(
|
||||
version_id: uuid.UUID,
|
||||
session: AsyncSession,
|
||||
) -> dict[str, Any]:
|
||||
loop = asyncio.get_event_loop()
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
bpm_result = await loop.run_in_executor(None, BPMAnalyzer().analyze, audio, sample_rate)
|
||||
key_result = await loop.run_in_executor(None, KeyAnalyzer().analyze, audio, sample_rate)
|
||||
try:
|
||||
bpm_result = await asyncio.wait_for(
|
||||
loop.run_in_executor(_analysis_pool, BPMAnalyzer().analyze, audio, sample_rate),
|
||||
timeout=_ANALYZER_TIMEOUT,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
log.warning("BPM analysis timed out for version %s — storing null", version_id)
|
||||
bpm_result = AnalysisResult(analyzer_name="bpm", fields={"bpm": None, "bpm_confidence": None})
|
||||
|
||||
try:
|
||||
key_result = await asyncio.wait_for(
|
||||
loop.run_in_executor(_analysis_pool, KeyAnalyzer().analyze, audio, sample_rate),
|
||||
timeout=_ANALYZER_TIMEOUT,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
log.warning("Key analysis timed out for version %s — storing null", version_id)
|
||||
key_result = AnalysisResult(analyzer_name="key", fields={"key": None, "scale": None, "key_confidence": None})
|
||||
|
||||
fields: dict[str, Any] = {**bpm_result.fields, **key_result.fields}
|
||||
|
||||
@@ -33,15 +61,32 @@ async def run_full_analysis(
|
||||
global_bpm = fields.get("bpm")
|
||||
global_key = fields.get("key")
|
||||
|
||||
from worker.db import SongModel
|
||||
|
||||
# Mark version analysis done
|
||||
stmt = (
|
||||
update(AudioVersionModel)
|
||||
.where(AudioVersionModel.id == version_id)
|
||||
.values(
|
||||
analysis_status="done",
|
||||
**({} if global_bpm is None else {"global_bpm": global_bpm}),
|
||||
)
|
||||
.values(analysis_status="done")
|
||||
)
|
||||
await session.execute(stmt)
|
||||
|
||||
# Write BPM/key to the song (global_bpm/global_key live on songs, not audio_versions)
|
||||
version = await session.get(AudioVersionModel, version_id)
|
||||
if version is not None:
|
||||
song_extra: dict[str, Any] = {}
|
||||
if global_bpm is not None:
|
||||
song_extra["global_bpm"] = global_bpm
|
||||
if global_key is not None:
|
||||
song_extra["global_key"] = global_key
|
||||
if song_extra:
|
||||
song_stmt = (
|
||||
update(SongModel)
|
||||
.where(SongModel.id == version.song_id)
|
||||
.values(**song_extra)
|
||||
)
|
||||
await session.execute(song_stmt)
|
||||
|
||||
await session.commit()
|
||||
|
||||
return fields
|
||||
|
||||
@@ -41,16 +41,26 @@ async def get_duration_ms(input_path: str) -> int:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, _ = await proc.communicate()
|
||||
try:
|
||||
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=30.0)
|
||||
except asyncio.TimeoutError:
|
||||
proc.kill()
|
||||
await proc.wait()
|
||||
raise RuntimeError(f"ffprobe timed out for {input_path}")
|
||||
info = json.loads(stdout)
|
||||
duration_s = float(info.get("format", {}).get("duration", 0))
|
||||
return int(duration_s * 1000)
|
||||
|
||||
|
||||
async def _run_ffmpeg(cmd: list[str]) -> None:
|
||||
async def _run_ffmpeg(cmd: list[str], timeout: float = 600.0) -> None:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*cmd, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
_, stderr = await proc.communicate()
|
||||
try:
|
||||
_, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
|
||||
except asyncio.TimeoutError:
|
||||
proc.kill()
|
||||
await proc.wait()
|
||||
raise RuntimeError(f"FFmpeg timed out after {timeout}s")
|
||||
if proc.returncode != 0:
|
||||
raise RuntimeError(f"FFmpeg failed: {stderr.decode()[:500]}")
|
||||
|
||||
Reference in New Issue
Block a user