diff --git a/api/alembic/versions/0007_band_storage.py b/api/alembic/versions/0007_band_storage.py new file mode 100644 index 0000000..acd363f --- /dev/null +++ b/api/alembic/versions/0007_band_storage.py @@ -0,0 +1,68 @@ +"""Add band_storage table for provider-agnostic, encrypted storage configs. + +Each band can have one active storage provider (Nextcloud, Google Drive, etc.). +Credentials are Fernet-encrypted at the application layer — never stored in plaintext. +A partial unique index enforces at most one active config per band at the DB level. + +Revision ID: 0007_band_storage +Revises: 0006_waveform_peaks_in_db +Create Date: 2026-04-10 +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID + +revision = "0007_band_storage" +down_revision = "0006_waveform_peaks_in_db" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "band_storage", + sa.Column("id", UUID(as_uuid=True), primary_key=True), + sa.Column( + "band_id", + UUID(as_uuid=True), + sa.ForeignKey("bands.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("provider", sa.String(20), nullable=False), + sa.Column("label", sa.String(255), nullable=True), + sa.Column("is_active", sa.Boolean, nullable=False, server_default="false"), + sa.Column("root_path", sa.Text, nullable=True), + # Fernet-encrypted JSON — never plaintext + sa.Column("credentials", sa.Text, nullable=False), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + server_default=sa.func.now(), + nullable=False, + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + server_default=sa.func.now(), + nullable=False, + ), + ) + + # Index for fast per-band lookups + op.create_index("ix_band_storage_band_id", "band_storage", ["band_id"]) + + # Partial unique index: at most one active storage per band + op.execute( + """ + CREATE UNIQUE INDEX uq_band_active_storage + ON band_storage (band_id) + WHERE is_active = true + """ + ) + + +def 
downgrade() -> None: + op.execute("DROP INDEX IF EXISTS uq_band_active_storage") + op.drop_index("ix_band_storage_band_id", table_name="band_storage") + op.drop_table("band_storage") diff --git a/api/alembic/versions/0008_drop_nc_columns.py b/api/alembic/versions/0008_drop_nc_columns.py new file mode 100644 index 0000000..f0103a4 --- /dev/null +++ b/api/alembic/versions/0008_drop_nc_columns.py @@ -0,0 +1,42 @@ +"""Remove Nextcloud-specific columns from members and bands. + +Prior to this migration, storage credentials lived directly on the Member +and Band rows. They are now in the band_storage table (migration 0007), +encrypted at the application layer. + +Run 0007 first; if you still need to migrate existing data, do it in a +separate script before applying this migration. + +Revision ID: 0008_drop_nc_columns +Revises: 0007_band_storage +Create Date: 2026-04-10 +""" + +from alembic import op +import sqlalchemy as sa + +revision = "0008_drop_nc_columns" +down_revision = "0007_band_storage" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Drop Nextcloud credential columns from members + op.drop_column("members", "nc_url") + op.drop_column("members", "nc_username") + op.drop_column("members", "nc_password") + + # Drop Nextcloud-specific columns from bands + op.drop_column("bands", "nc_folder_path") + op.drop_column("bands", "nc_user") + + +def downgrade() -> None: + # Restore columns (data is lost — this is intentional) + op.add_column("bands", sa.Column("nc_user", sa.String(255), nullable=True)) + op.add_column("bands", sa.Column("nc_folder_path", sa.Text, nullable=True)) + + op.add_column("members", sa.Column("nc_password", sa.Text, nullable=True)) + op.add_column("members", sa.Column("nc_username", sa.String(255), nullable=True)) + op.add_column("members", sa.Column("nc_url", sa.Text, nullable=True)) diff --git a/api/pyproject.toml b/api/pyproject.toml index 26e8442..99fc25e 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -15,6 
+15,7 @@ dependencies = [ "pydantic[email]>=2.7", "pydantic-settings>=2.3", "python-jose[cryptography]>=3.3", + "cryptography>=42.0", "bcrypt>=4.1", "httpx>=0.27", "redis[hiredis]>=5.0", diff --git a/api/src/rehearsalhub/config.py b/api/src/rehearsalhub/config.py index c8dbf5a..65d130d 100755 --- a/api/src/rehearsalhub/config.py +++ b/api/src/rehearsalhub/config.py @@ -12,6 +12,10 @@ class Settings(BaseSettings): jwt_algorithm: str = "HS256" access_token_expire_minutes: int = 60 # 1 hour + # Storage credential encryption — generate once with: Fernet.generate_key().decode() + # NEVER commit this value; store in env / secrets manager only. + storage_encryption_key: str = "" + # Database database_url: str # postgresql+asyncpg://... @@ -28,6 +32,19 @@ class Settings(BaseSettings): # Worker analysis_version: str = "1.0.0" + # OAuth2 — Google Drive + google_client_id: str = "" + google_client_secret: str = "" + + # OAuth2 — Dropbox + dropbox_app_key: str = "" + dropbox_app_secret: str = "" + + # OAuth2 — OneDrive (Microsoft Graph) + onedrive_client_id: str = "" + onedrive_client_secret: str = "" + onedrive_tenant_id: str = "common" # 'common' for multi-tenant apps + @lru_cache def get_settings() -> Settings: diff --git a/api/src/rehearsalhub/db/models.py b/api/src/rehearsalhub/db/models.py index 3837a38..14b69ff 100755 --- a/api/src/rehearsalhub/db/models.py +++ b/api/src/rehearsalhub/db/models.py @@ -10,12 +10,14 @@ from sqlalchemy import ( Boolean, DateTime, ForeignKey, + Index, Integer, Numeric, String, Text, UniqueConstraint, func, + text, ) from sqlalchemy.dialects.postgresql import ARRAY, JSONB, UUID from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship @@ -35,9 +37,6 @@ class Member(Base): email: Mapped[str] = mapped_column(String(320), unique=True, nullable=False, index=True) display_name: Mapped[str] = mapped_column(String(255), nullable=False) avatar_url: Mapped[str | None] = mapped_column(Text) - nc_username: Mapped[str | None] = 
mapped_column(String(255)) - nc_url: Mapped[str | None] = mapped_column(Text) - nc_password: Mapped[str | None] = mapped_column(Text) password_hash: Mapped[str] = mapped_column(Text, nullable=False) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), nullable=False @@ -67,8 +66,6 @@ class Band(Base): id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) name: Mapped[str] = mapped_column(String(255), nullable=False) slug: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True) - nc_folder_path: Mapped[str | None] = mapped_column(Text) - nc_user: Mapped[str | None] = mapped_column(String(255)) genre_tags: Mapped[list[str]] = mapped_column(ARRAY(Text), default=list, nullable=False) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), nullable=False @@ -86,6 +83,59 @@ class Band(Base): sessions: Mapped[list[RehearsalSession]] = relationship( "RehearsalSession", back_populates="band", cascade="all, delete-orphan" ) + storage_configs: Mapped[list[BandStorage]] = relationship( + "BandStorage", back_populates="band", cascade="all, delete-orphan" + ) + + +class BandStorage(Base): + """Storage provider configuration for a band. + + Credentials are stored as a Fernet-encrypted JSON blob — never in plaintext. + Only one ``BandStorage`` row per band may be active at a time, enforced by + a partial unique index on ``(band_id) WHERE is_active``. + + Supported providers and their credential shapes (all encrypted): + nextcloud: { "url": "...", "username": "...", "app_password": "..." 
} + googledrive: { "access_token": "...", "refresh_token": "...", + "token_expiry": "ISO-8601", "token_type": "Bearer" } + onedrive: { "access_token": "...", "refresh_token": "...", + "token_expiry": "ISO-8601", "token_type": "Bearer" } + dropbox: { "access_token": "...", "refresh_token": "...", + "token_expiry": "ISO-8601" } + """ + + __tablename__ = "band_storage" + __table_args__ = ( + # DB-enforced: at most one active storage config per band. + Index( + "uq_band_active_storage", + "band_id", + unique=True, + postgresql_where=text("is_active = true"), + ), + ) + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + band_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), ForeignKey("bands.id", ondelete="CASCADE"), nullable=False, index=True + ) + # 'nextcloud' | 'googledrive' | 'onedrive' | 'dropbox' + provider: Mapped[str] = mapped_column(String(20), nullable=False) + label: Mapped[str | None] = mapped_column(String(255)) + is_active: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) + # Root path within the provider's storage (e.g. "/bands/cool-band/"). Not sensitive. + root_path: Mapped[str | None] = mapped_column(Text) + # Fernet-encrypted JSON blob — shape depends on provider (see docstring above). 
+ credentials: Mapped[str] = mapped_column(Text, nullable=False) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), nullable=False + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False + ) + + band: Mapped[Band] = relationship("Band", back_populates="storage_configs") class BandMember(Base): diff --git a/api/src/rehearsalhub/main.py b/api/src/rehearsalhub/main.py index 944740a..a90d6f8 100755 --- a/api/src/rehearsalhub/main.py +++ b/api/src/rehearsalhub/main.py @@ -20,6 +20,7 @@ from rehearsalhub.routers import ( members_router, sessions_router, songs_router, + storage_router, versions_router, ws_router, ) @@ -94,6 +95,7 @@ def create_app() -> FastAPI: app.include_router(annotations_router, prefix=prefix) app.include_router(members_router, prefix=prefix) app.include_router(internal_router, prefix=prefix) + app.include_router(storage_router, prefix=prefix) app.include_router(ws_router) # WebSocket routes don't use /api/v1 prefix @app.get("/api/health") diff --git a/api/src/rehearsalhub/repositories/audio_version.py b/api/src/rehearsalhub/repositories/audio_version.py index 3b0a9cd..1603683 100755 --- a/api/src/rehearsalhub/repositories/audio_version.py +++ b/api/src/rehearsalhub/repositories/audio_version.py @@ -17,6 +17,11 @@ class AudioVersionRepository(BaseRepository[AudioVersion]): result = await self.session.execute(stmt) return result.scalar_one_or_none() + async def get_by_nc_file_path(self, nc_file_path: str) -> AudioVersion | None: + stmt = select(AudioVersion).where(AudioVersion.nc_file_path == nc_file_path) + result = await self.session.execute(stmt) + return result.scalar_one_or_none() + async def list_for_song(self, song_id: uuid.UUID) -> list[AudioVersion]: stmt = ( select(AudioVersion) diff --git a/api/src/rehearsalhub/repositories/band.py b/api/src/rehearsalhub/repositories/band.py index a30577a..29a5b2e 100755 
def _longest_prefix_match(path, candidates):
    """Pick the candidate whose storage root is the longest prefix of *path*.

    Leading slashes are ignored on both sides, so a root stored as
    ``"/bands/cool-band/"`` matches a path reported as
    ``"bands/cool-band/take1.wav"`` and vice versa. A trailing slash is
    always enforced on the root, so ``"bands/x"`` never matches
    ``"bands/xy/..."``.

    Args:
        path: File path to resolve, with or without a leading ``/``.
        candidates: Iterable of ``(band, root_path)`` pairs. ``root_path``
            may be ``None``; such pairs are ignored.

    Returns:
        The ``band`` of the most specific (longest) matching root, or
        ``None`` when nothing matches. On equal length the first candidate
        wins.
    """
    relative = path.lstrip("/")
    best = None
    best_len = -1
    for band, root_path in candidates:
        if root_path is None:
            continue
        root = root_path.strip("/")
        if root:
            folder = root + "/"
            matched = relative.startswith(folder)
            weight = len(folder)
        else:
            # A root of "" or "/" normalises to "/", which only ever matched
            # paths that still carry their leading slash; keep that rule so
            # an empty root does not become a catch-all for relative paths.
            matched = path.startswith("/")
            weight = 0
        if matched and weight > best_len:
            best = band
            best_len = weight
    return best


async def get_by_nc_folder_prefix(self, path: str) -> Band | None:
    """Return the band whose active storage ``root_path`` is a prefix of *path*.

    Only active ``BandStorage`` rows with a non-NULL ``root_path`` are
    considered. The longest matching root wins so nested roots resolve to
    the most specific band. Leading slashes on *path* and on the stored
    roots are not significant.

    Args:
        path: Storage path of the detected file.

    Returns:
        The matching ``Band``, or ``None`` if no active root is a prefix.
    """
    stmt = (
        select(Band, BandStorage.root_path)
        .join(
            BandStorage,
            (BandStorage.band_id == Band.id) & BandStorage.is_active.is_(True),
        )
        .where(BandStorage.root_path.is_not(None))
    )
    result = await self.session.execute(stmt)
    # Each row unpacks as (Band, root_path).
    return _longest_prefix_match(path, result.all())
class BandStorageRepository(BaseRepository[BandStorage]):
    """Queries and state changes for ``BandStorage`` rows.

    A band may own several storage configs, but at most one of them is
    active at a time; the methods below read and maintain that flag.
    """

    model = BandStorage

    async def get_active_for_band(self, band_id: uuid.UUID) -> BandStorage | None:
        """Return the single active storage config for *band_id*, or None."""
        result = await self.session.execute(
            select(BandStorage).where(
                BandStorage.band_id == band_id,
                BandStorage.is_active.is_(True),
            )
        )
        return result.scalar_one_or_none()

    async def list_for_band(self, band_id: uuid.UUID) -> list[BandStorage]:
        """Return every storage config of *band_id*, oldest first."""
        result = await self.session.execute(
            select(BandStorage)
            .where(BandStorage.band_id == band_id)
            .order_by(BandStorage.created_at)
        )
        return list(result.scalars().all())

    async def list_active_by_provider(self, provider: str) -> list[BandStorage]:
        """Return all active configs for a given provider (used by the watcher)."""
        result = await self.session.execute(
            select(BandStorage).where(
                BandStorage.provider == provider,
                BandStorage.is_active.is_(True),
            )
        )
        return list(result.scalars().all())

    async def activate(self, storage_id: uuid.UUID, band_id: uuid.UUID) -> BandStorage:
        """Make *storage_id* the only active config of *band_id*.

        The target row is validated before anything is modified, so an
        unknown id, or an id that belongs to a different band, leaves the
        band's current active config untouched.

        Args:
            storage_id: Primary key of the config to activate.
            band_id: Band that must own the config.

        Returns:
            The refreshed, now-active ``BandStorage`` row.

        Raises:
            LookupError: If the config does not exist or is not owned by
                *band_id*.
        """
        storage = await self.get_by_id(storage_id)
        if storage is None or storage.band_id != band_id:
            raise LookupError(f"BandStorage {storage_id} not found")

        # Deactivate the siblings first so that, by the time the target is
        # flushed as active, no other row of this band is still active.
        await self.session.execute(
            update(BandStorage)
            .where(
                BandStorage.band_id == band_id,
                BandStorage.id != storage_id,
            )
            .values(is_active=False)
        )
        storage.is_active = True
        await self.session.flush()
        await self.session.refresh(storage)
        return storage

    async def deactivate_all(self, band_id: uuid.UUID) -> None:
        """Deactivate every storage config for a band (disconnect)."""
        await self.session.execute(
            update(BandStorage)
            .where(BandStorage.band_id == band_id)
            .values(is_active=False)
        )
        await self.session.flush()
updates["nc_url"] = data.nc_url.rstrip("/") if data.nc_url else None - if data.nc_username is not None: - updates["nc_username"] = data.nc_username or None - if data.nc_password is not None: - updates["nc_password"] = data.nc_password or None if data.avatar_url is not None: updates["avatar_url"] = data.avatar_url or None @@ -113,7 +107,7 @@ async def update_settings( member = await repo.update(current_member, **updates) else: member = current_member - return MemberRead.from_model(member) + return MemberRead.model_validate(member) @router.post("/me/avatar", response_model=MemberRead) @@ -187,4 +181,4 @@ async def upload_avatar( repo = MemberRepository(session) avatar_url = f"/api/static/avatars/{filename}" member = await repo.update(current_member, avatar_url=avatar_url) - return MemberRead.from_model(member) + return MemberRead.model_validate(member) diff --git a/api/src/rehearsalhub/routers/bands.py b/api/src/rehearsalhub/routers/bands.py index daeb843..f995227 100755 --- a/api/src/rehearsalhub/routers/bands.py +++ b/api/src/rehearsalhub/routers/bands.py @@ -11,7 +11,6 @@ from rehearsalhub.repositories.band import BandRepository from rehearsalhub.schemas.band import BandCreate, BandRead, BandReadWithMembers, BandUpdate from rehearsalhub.schemas.invite import BandInviteList, BandInviteListItem from rehearsalhub.services.band import BandService -from rehearsalhub.storage.nextcloud import NextcloudClient router = APIRouter(prefix="/bands", tags=["bands"]) @@ -126,10 +125,9 @@ async def create_band( session: AsyncSession = Depends(get_session), current_member: Member = Depends(get_current_member), ): - storage = NextcloudClient.for_member(current_member) - svc = BandService(session, storage) + svc = BandService(session) try: - band = await svc.create_band(data, current_member.id, creator=current_member) + band = await svc.create_band(data, current_member.id) except ValueError as e: raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e)) except 
LookupError as e: @@ -143,8 +141,7 @@ async def get_band( session: AsyncSession = Depends(get_session), current_member: Member = Depends(get_current_member), ): - storage = NextcloudClient.for_member(current_member) - svc = BandService(session, storage) + svc = BandService(session) try: await svc.assert_membership(band_id, current_member.id) except PermissionError: @@ -173,9 +170,10 @@ async def update_band( raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Band not found") updates: dict = {} - if data.nc_folder_path is not None: - path = data.nc_folder_path.strip() - updates["nc_folder_path"] = (path.rstrip("/") + "/") if path else None + if data.name is not None: + updates["name"] = data.name.strip() + if data.genre_tags is not None: + updates["genre_tags"] = data.genre_tags if updates: band = await repo.update(band, **updates) diff --git a/api/src/rehearsalhub/routers/internal.py b/api/src/rehearsalhub/routers/internal.py index dcdf624..a2db88e 100755 --- a/api/src/rehearsalhub/routers/internal.py +++ b/api/src/rehearsalhub/routers/internal.py @@ -1,24 +1,28 @@ -"""Internal endpoints — called by trusted services (watcher) on the Docker network.""" +"""Internal endpoints — called by trusted services (watcher, worker) on the Docker network.""" import logging +import uuid from pathlib import Path from fastapi import APIRouter, Depends, Header, HTTPException, status +from fastapi.responses import StreamingResponse from pydantic import BaseModel from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from rehearsalhub.config import get_settings from rehearsalhub.db.engine import get_session -from rehearsalhub.db.models import AudioVersion, BandMember, Member +from rehearsalhub.db.models import AudioVersion, BandMember from rehearsalhub.queue.redis_queue import RedisJobQueue from rehearsalhub.repositories.band import BandRepository +from rehearsalhub.repositories.band_storage import BandStorageRepository from 
rehearsalhub.repositories.rehearsal_session import RehearsalSessionRepository from rehearsalhub.repositories.song import SongRepository from rehearsalhub.schemas.audio_version import AudioVersionCreate +from rehearsalhub.security.encryption import decrypt_credentials from rehearsalhub.services.session import extract_session_folder, parse_rehearsal_date from rehearsalhub.services.song import SongService -from rehearsalhub.storage.nextcloud import NextcloudClient +from rehearsalhub.storage.factory import StorageFactory log = logging.getLogger(__name__) @@ -34,6 +38,9 @@ async def _verify_internal_secret(x_internal_token: str | None = Header(None)) - AUDIO_EXTENSIONS = {".mp3", ".wav", ".flac", ".ogg", ".m4a", ".aac", ".opus"} +# ── Watcher: detect new audio file ──────────────────────────────────────────── + + class NcUploadEvent(BaseModel): nc_file_path: str nc_file_etag: str | None = None @@ -45,10 +52,9 @@ async def nc_upload( session: AsyncSession = Depends(get_session), _: None = Depends(_verify_internal_secret), ): - """ - Called by nc-watcher when a new audio file is detected in Nextcloud. - Parses the path to find/create the band+song and registers a version. + """Called by nc-watcher when a new audio file is detected in storage. + Parses the path to find/create the band + song and registers a version. 
Expected path format: bands/{slug}/[songs/]{folder}/filename.ext """ path = event.nc_file_path.lstrip("/") @@ -58,13 +64,11 @@ async def nc_upload( band_repo = BandRepository(session) - # Try slug-based lookup first (standard bands/{slug}/ layout) parts = path.split("/") band = None if len(parts) >= 3 and parts[0] == "bands": band = await band_repo.get_by_slug(parts[1]) - # Fall back to prefix match for bands with custom nc_folder_path if band is None: band = await band_repo.get_by_nc_folder_prefix(path) @@ -72,25 +76,14 @@ async def nc_upload( log.info("nc-upload: no band found for path '%s' — skipping", path) return {"status": "skipped", "reason": "band not found"} - # Determine song title and folder from path. - # The title is always the filename stem (e.g. "take1" from "take1.wav"). - # The nc_folder groups all versions of the same recording (the parent directory). - # - # Examples: - # bands/my-band/take1.wav → folder=bands/my-band/, title=take1 - # bands/my-band/231015/take1.wav → folder=bands/my-band/231015/, title=take1 - # bands/my-band/songs/groove/take1.wav → folder=bands/my-band/songs/groove/, title=take1 parent = str(Path(path).parent) nc_folder = parent.rstrip("/") + "/" title = Path(path).stem - # If the file sits directly inside a dated session folder, give it a unique - # virtual folder so it becomes its own song (not merged with other takes). 
# ── Worker: stream audio ───────────────────────────────────────────────────────


@router.get("/audio/{version_id}/stream")
async def stream_audio(
    version_id: uuid.UUID,
    session: AsyncSession = Depends(get_session),
    _: None = Depends(_verify_internal_secret),
):
    """Proxy an audio file from the band's storage to the caller (audio-worker).

    The worker never handles storage credentials. This endpoint looks up the
    version, resolves its song's band, builds that band's storage client via
    ``StorageFactory`` and returns the file as ``application/octet-stream``.

    Raises:
        HTTPException: 404 if the version or its song does not exist;
            502 if the band has no active storage configured.
    """
    # Local import, as in the original endpoint: Song is only needed here.
    from rehearsalhub.db.models import Song

    result = await session.execute(
        select(AudioVersion).where(AudioVersion.id == version_id)
    )
    version = result.scalar_one_or_none()
    if version is None:
        raise HTTPException(status_code=404, detail="Version not found")

    song_result = await session.execute(
        select(Song).where(Song.id == version.song_id)
    )
    song = song_result.scalar_one_or_none()
    if song is None:
        raise HTTPException(status_code=404, detail="Song not found")

    try:
        storage = await StorageFactory.create(session, song.band_id, get_settings())
    except LookupError as exc:
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Band has no active storage configured",
        ) from exc

    # Read the path now: the generator below runs while the response is being
    # sent, when the request-scoped session may no longer be usable for
    # loading ORM attributes.
    file_path = version.nc_file_path

    log.info("stream_audio: streaming version %s from storage", version_id)

    async def _stream():
        # download() is awaited once and its result is yielded as a single
        # chunk, so the payload is fully buffered before it is sent.
        data = await storage.download(file_path)
        yield data

    return StreamingResponse(_stream(), media_type="application/octet-stream")


# ── Watcher: list active Nextcloud configs ─────────────────────────────────────


@router.get("/storage/nextcloud-watch-configs")
async def get_nextcloud_watch_configs(
    session: AsyncSession = Depends(get_session),
    _: None = Depends(_verify_internal_secret),
):
    """Return decrypted Nextcloud configs for all active NC bands.

    Used by the nc-watcher service to know which Nextcloud instances to poll
    and with which credentials. The response contains plaintext credentials,
    so the route is guarded by the internal-secret dependency.

    Returns:
        A list of dicts with ``band_id``, ``nc_url``, ``nc_username``,
        ``nc_app_password`` and ``root_path``. Configs that cannot be
        decrypted or lack an expected key are logged and omitted.

    Raises:
        HTTPException: 500 if no storage encryption key is configured.
    """
    settings = get_settings()
    if not settings.storage_encryption_key:
        raise HTTPException(status_code=500, detail="Storage encryption key not configured")

    repo = BandStorageRepository(session)
    configs = await repo.list_active_by_provider("nextcloud")

    result = []
    for config in configs:
        try:
            creds = decrypt_credentials(settings.storage_encryption_key, config.credentials)
            result.append({
                "band_id": str(config.band_id),
                "nc_url": creds["url"],
                "nc_username": creds["username"],
                "nc_app_password": creds["app_password"],
                "root_path": config.root_path,
            })
        except Exception as exc:
            # Deliberate best-effort: skip this band rather than failing the
            # whole response for every other band.
            log.error("Failed to decrypt credentials for band_storage %s: %s", config.id, exc)

    return result
- Useful after: - - Fresh DB creation + directory scan (peaks not yet computed) - - Peak algorithm changes (clear waveform_peaks, then call this) - - Worker was down during initial transcode """ result = await session.execute( select(AudioVersion).where(AudioVersion.waveform_peaks.is_(None)) # type: ignore[attr-defined] diff --git a/api/src/rehearsalhub/routers/songs.py b/api/src/rehearsalhub/routers/songs.py index ceb6b90..47b4446 100755 --- a/api/src/rehearsalhub/routers/songs.py +++ b/api/src/rehearsalhub/routers/songs.py @@ -7,10 +7,13 @@ from fastapi.responses import StreamingResponse from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession +from rehearsalhub.config import get_settings from rehearsalhub.db.engine import get_session, get_session_factory +from rehearsalhub.queue.redis_queue import flush_pending_pushes from rehearsalhub.db.models import Member from rehearsalhub.dependencies import get_current_member from rehearsalhub.repositories.band import BandRepository +from rehearsalhub.repositories.band_storage import BandStorageRepository from rehearsalhub.repositories.comment import CommentRepository from rehearsalhub.repositories.song import SongRepository from rehearsalhub.routers.versions import _member_from_request @@ -19,7 +22,7 @@ from rehearsalhub.schemas.song import SongCreate, SongRead, SongUpdate from rehearsalhub.services.band import BandService from rehearsalhub.services.nc_scan import scan_band_folder from rehearsalhub.services.song import SongService -from rehearsalhub.storage.nextcloud import NextcloudClient +from rehearsalhub.storage.factory import StorageFactory log = logging.getLogger(__name__) @@ -47,8 +50,7 @@ async def list_songs( await band_svc.assert_membership(band_id, current_member.id) except PermissionError: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not a member") - storage = NextcloudClient.for_member(current_member) - song_svc = SongService(session, storage=storage) + song_svc = 
SongService(session) return await song_svc.list_songs(band_id) @@ -149,9 +151,8 @@ async def create_song( if band is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Band not found") - storage = NextcloudClient.for_member(current_member) - song_svc = SongService(session, storage=storage) - song = await song_svc.create_song(band_id, data, current_member.id, band.slug, creator=current_member) + song_svc = SongService(session) + song = await song_svc.create_song(band_id, data, current_member.id, band.slug) read = SongRead.model_validate(song) read.version_count = 0 return read @@ -186,22 +187,28 @@ async def scan_nextcloud_stream( Accepts ?token= for EventSource clients that can't set headers. """ band = await _get_band_and_assert_member(band_id, current_member, session) - band_folder = band.nc_folder_path or f"bands/{band.slug}/" - nc = NextcloudClient.for_member(current_member) + bs = await BandStorageRepository(session).get_active_for_band(band_id) + band_folder = (bs.root_path if bs and bs.root_path else None) or f"bands/{band.slug}/" member_id = current_member.id + settings = get_settings() async def event_generator(): async with get_session_factory()() as db: try: - async for event in scan_band_folder(db, nc, band_id, band_folder, member_id): + storage = await StorageFactory.create(db, band_id, settings) + async for event in scan_band_folder(db, storage, band_id, band_folder, member_id): yield json.dumps(event) + "\n" if event.get("type") in ("song", "session"): await db.commit() + await flush_pending_pushes(db) + except LookupError as exc: + yield json.dumps({"type": "error", "message": str(exc)}) + "\n" except Exception: log.exception("SSE scan error for band %s", band_id) yield json.dumps({"type": "error", "message": "Scan failed due to an internal error."}) + "\n" finally: await db.commit() + await flush_pending_pushes(db) return StreamingResponse( event_generator(), @@ -220,13 +227,18 @@ async def scan_nextcloud( Prefer the SSE 
/nc-scan/stream endpoint for large folders. """ band = await _get_band_and_assert_member(band_id, current_member, session) - band_folder = band.nc_folder_path or f"bands/{band.slug}/" - nc = NextcloudClient.for_member(current_member) + bs = await BandStorageRepository(session).get_active_for_band(band_id) + band_folder = (bs.root_path if bs and bs.root_path else None) or f"bands/{band.slug}/" + + try: + storage = await StorageFactory.create(session, band_id, get_settings()) + except LookupError as exc: + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc)) songs: list[SongRead] = [] stats = {"found": 0, "imported": 0, "skipped": 0} - async for event in scan_band_folder(session, nc, band_id, band_folder, current_member.id): + async for event in scan_band_folder(session, storage, band_id, band_folder, current_member.id): if event["type"] == "song": songs.append(SongRead(**event["song"])) elif event["type"] == "done": diff --git a/api/src/rehearsalhub/routers/storage.py b/api/src/rehearsalhub/routers/storage.py new file mode 100644 index 0000000..63be1df --- /dev/null +++ b/api/src/rehearsalhub/routers/storage.py @@ -0,0 +1,336 @@ +"""Storage provider management endpoints. + +Bands connect to a storage provider (Nextcloud, Google Drive, OneDrive, Dropbox) +through this router. Credentials are encrypted before being written to the DB. + +OAuth2 flow: + 1. Admin calls GET /bands/{id}/storage/connect/{provider}/authorize + → receives a redirect URL to the provider's consent page + 2. After consent, provider redirects to GET /oauth/callback/{provider}?code=...&state=... 
+ → tokens are exchanged, encrypted, stored, and the admin is redirected to the frontend + +Nextcloud (app-password) flow: + POST /bands/{id}/storage/connect/nextcloud + → credentials validated and stored immediately (no OAuth redirect needed) +""" + +from __future__ import annotations + +import logging +import secrets +import uuid +from datetime import datetime, timedelta, timezone +from urllib.parse import urlencode + +import httpx +from fastapi import APIRouter, Depends, HTTPException, Query, status +from fastapi.responses import RedirectResponse +from jose import JWTError, jwt +from sqlalchemy.ext.asyncio import AsyncSession + +from rehearsalhub.config import Settings, get_settings +from rehearsalhub.db.engine import get_session +from rehearsalhub.db.models import Member +from rehearsalhub.dependencies import get_current_member +from rehearsalhub.repositories.band_storage import BandStorageRepository +from rehearsalhub.schemas.storage import BandStorageRead, NextcloudConnect, OAuthAuthorizeResponse +from rehearsalhub.security.encryption import encrypt_credentials +from rehearsalhub.services.band import BandService + +log = logging.getLogger(__name__) + +router = APIRouter(tags=["storage"]) + +# OAuth2 state JWT expires after 15 minutes (consent must happen in this window) +_STATE_TTL_MINUTES = 15 + +# ── OAuth2 provider definitions ──────────────────────────────────────────────── + +_OAUTH_CONFIGS: dict[str, dict] = { + "googledrive": { + "auth_url": "https://accounts.google.com/o/oauth2/v2/auth", + "token_url": "https://oauth2.googleapis.com/token", + "scopes": "https://www.googleapis.com/auth/drive openid", + "extra_auth_params": {"access_type": "offline", "prompt": "consent"}, + }, + "onedrive": { + # tenant_id is injected at runtime from settings + "auth_url": "https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/authorize", + "token_url": "https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token", + "scopes": 
"https://graph.microsoft.com/Files.ReadWrite offline_access", + "extra_auth_params": {}, + }, + "dropbox": { + "auth_url": "https://www.dropbox.com/oauth2/authorize", + "token_url": "https://api.dropboxapi.com/oauth2/token", + "scopes": "", # Dropbox uses app-level scopes set in the developer console + "extra_auth_params": {"token_access_type": "offline"}, + }, +} + + +def _get_client_id_and_secret(provider: str, settings: Settings) -> tuple[str, str]: + match provider: + case "googledrive": + return settings.google_client_id, settings.google_client_secret + case "onedrive": + return settings.onedrive_client_id, settings.onedrive_client_secret + case "dropbox": + return settings.dropbox_app_key, settings.dropbox_app_secret + case _: + raise ValueError(f"Unknown OAuth provider: {provider!r}") + + +def _redirect_uri(provider: str, settings: Settings) -> str: + scheme = "http" if settings.debug else "https" + return f"{scheme}://{settings.domain}/api/v1/oauth/callback/{provider}" + + +# ── State JWT helpers ────────────────────────────────────────────────────────── + + +def _encode_state(band_id: uuid.UUID, provider: str, settings: Settings) -> str: + payload = { + "band_id": str(band_id), + "provider": provider, + "nonce": secrets.token_hex(16), + "exp": datetime.now(timezone.utc) + timedelta(minutes=_STATE_TTL_MINUTES), + } + return jwt.encode(payload, settings.secret_key, algorithm=settings.jwt_algorithm) + + +def _decode_state(state: str, settings: Settings) -> dict: + try: + return jwt.decode(state, settings.secret_key, algorithms=[settings.jwt_algorithm]) + except JWTError as exc: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid OAuth state: {exc}") + + +# ── Nextcloud (app-password) ─────────────────────────────────────────────────── + + +@router.post( + "/bands/{band_id}/storage/connect/nextcloud", + response_model=BandStorageRead, + status_code=status.HTTP_201_CREATED, +) +async def connect_nextcloud( + band_id: uuid.UUID, + 
body: NextcloudConnect, + session: AsyncSession = Depends(get_session), + current_member: Member = Depends(get_current_member), + settings: Settings = Depends(get_settings), +): + """Connect a band to a Nextcloud instance using an app password.""" + band_svc = BandService(session) + try: + await band_svc.assert_admin(band_id, current_member.id) + except PermissionError: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") + + # Smoke-test the credentials before storing them + from rehearsalhub.storage.nextcloud import NextcloudClient + + nc = NextcloudClient(base_url=body.url, username=body.username, password=body.app_password) + try: + await nc.list_folder(body.root_path or "/") + except Exception as exc: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Could not connect to Nextcloud: {exc}", + ) + + creds = { + "url": body.url, + "username": body.username, + "app_password": body.app_password, + } + encrypted = encrypt_credentials(settings.storage_encryption_key, creds) + + repo = BandStorageRepository(session) + # Deactivate any previous storage before creating the new one + await repo.deactivate_all(band_id) + band_storage = await repo.create( + band_id=band_id, + provider="nextcloud", + label=body.label, + is_active=True, + root_path=body.root_path, + credentials=encrypted, + ) + await session.commit() + log.info("Band %s connected to Nextcloud (%s)", band_id, body.url) + return BandStorageRead.model_validate(band_storage) + + +# ── OAuth2 — authorize ───────────────────────────────────────────────────────── + + +@router.get( + "/bands/{band_id}/storage/connect/{provider}/authorize", + response_model=OAuthAuthorizeResponse, +) +async def oauth_authorize( + band_id: uuid.UUID, + provider: str, + session: AsyncSession = Depends(get_session), + current_member: Member = Depends(get_current_member), + settings: Settings = Depends(get_settings), +): + """Return the provider's OAuth2 
authorization URL. + + The frontend should redirect the user to ``redirect_url``. + After the user consents, the provider redirects to our callback endpoint. + """ + if provider not in _OAUTH_CONFIGS: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Unknown provider {provider!r}. Supported: {list(_OAUTH_CONFIGS)}", + ) + + band_svc = BandService(session) + try: + await band_svc.assert_admin(band_id, current_member.id) + except PermissionError: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") + + client_id, _ = _get_client_id_and_secret(provider, settings) + if not client_id: + raise HTTPException( + status_code=status.HTTP_501_NOT_IMPLEMENTED, + detail=f"OAuth2 for {provider!r} is not configured on this server", + ) + + cfg = _OAUTH_CONFIGS[provider] + auth_url = cfg["auth_url"].format(tenant_id=settings.onedrive_tenant_id) + state = _encode_state(band_id, provider, settings) + redirect_uri = _redirect_uri(provider, settings) + + params: dict = { + "client_id": client_id, + "redirect_uri": redirect_uri, + "response_type": "code", + "state": state, + **cfg["extra_auth_params"], + } + if cfg["scopes"]: + params["scope"] = cfg["scopes"] + + return OAuthAuthorizeResponse( + redirect_url=f"{auth_url}?{urlencode(params)}", + provider=provider, + ) + + +# ── OAuth2 — callback ────────────────────────────────────────────────────────── + + +@router.get("/oauth/callback/{provider}") +async def oauth_callback( + provider: str, + code: str = Query(...), + state: str = Query(...), + session: AsyncSession = Depends(get_session), + settings: Settings = Depends(get_settings), +): + """Exchange authorization code for tokens, encrypt, and store.""" + if provider not in _OAUTH_CONFIGS: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unknown provider") + + state_data = _decode_state(state, settings) + band_id = uuid.UUID(state_data["band_id"]) + + client_id, client_secret = 
_get_client_id_and_secret(provider, settings) + cfg = _OAUTH_CONFIGS[provider] + token_url = cfg["token_url"].format(tenant_id=settings.onedrive_tenant_id) + redirect_uri = _redirect_uri(provider, settings) + + payload = { + "grant_type": "authorization_code", + "code": code, + "redirect_uri": redirect_uri, + "client_id": client_id, + "client_secret": client_secret, + } + + try: + async with httpx.AsyncClient(timeout=15.0) as http: + resp = await http.post(token_url, data=payload) + resp.raise_for_status() + token_data = resp.json() + except Exception as exc: + log.error("OAuth token exchange failed for %s: %s", provider, exc) + raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail="Token exchange failed") + + from datetime import timedelta + + expires_in = int(token_data.get("expires_in", 3600)) + expiry = datetime.now(timezone.utc) + timedelta(seconds=expires_in - 60) + + creds = { + "access_token": token_data["access_token"], + "refresh_token": token_data.get("refresh_token", ""), + "token_expiry": expiry.isoformat(), + "token_type": token_data.get("token_type", "Bearer"), + } + encrypted = encrypt_credentials(settings.storage_encryption_key, creds) + + repo = BandStorageRepository(session) + await repo.deactivate_all(band_id) + await repo.create( + band_id=band_id, + provider=provider, + label=None, + is_active=True, + root_path=None, + credentials=encrypted, + ) + await session.commit() + log.info("Band %s connected to %s via OAuth2", band_id, provider) + + # Redirect back to the frontend settings page + scheme = "http" if settings.debug else "https" + return RedirectResponse( + url=f"{scheme}://{settings.domain}/bands/{band_id}/settings?storage=connected", + status_code=status.HTTP_302_FOUND, + ) + + +# ── Read / disconnect ────────────────────────────────────────────────────────── + + +@router.get("/bands/{band_id}/storage", response_model=list[BandStorageRead]) +async def list_storage( + band_id: uuid.UUID, + session: AsyncSession = 
Depends(get_session), + current_member: Member = Depends(get_current_member), +): + """List all storage configs for a band (credentials never returned).""" + band_svc = BandService(session) + try: + await band_svc.assert_membership(band_id, current_member.id) + except PermissionError: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not a member") + + repo = BandStorageRepository(session) + configs = await repo.list_for_band(band_id) + return [BandStorageRead.model_validate(c) for c in configs] + + +@router.delete("/bands/{band_id}/storage", status_code=status.HTTP_204_NO_CONTENT) +async def disconnect_storage( + band_id: uuid.UUID, + session: AsyncSession = Depends(get_session), + current_member: Member = Depends(get_current_member), +): + """Deactivate the band's active storage (does not delete historical records).""" + band_svc = BandService(session) + try: + await band_svc.assert_admin(band_id, current_member.id) + except PermissionError: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") + + repo = BandStorageRepository(session) + await repo.deactivate_all(band_id) + await session.commit() + log.info("Band %s storage disconnected by member %s", band_id, current_member.id) diff --git a/api/src/rehearsalhub/routers/versions.py b/api/src/rehearsalhub/routers/versions.py index 2754a18..d49423b 100755 --- a/api/src/rehearsalhub/routers/versions.py +++ b/api/src/rehearsalhub/routers/versions.py @@ -17,9 +17,11 @@ from rehearsalhub.repositories.member import MemberRepository from rehearsalhub.repositories.song import SongRepository from rehearsalhub.schemas.audio_version import AudioVersionCreate, AudioVersionRead from rehearsalhub.services.auth import decode_token +from rehearsalhub.config import get_settings from rehearsalhub.services.band import BandService from rehearsalhub.services.song import SongService -from rehearsalhub.storage.nextcloud import NextcloudClient +from rehearsalhub.storage.factory 
import StorageFactory +from rehearsalhub.storage.protocol import StorageClient router = APIRouter(tags=["versions"]) @@ -35,7 +37,7 @@ _AUDIO_CONTENT_TYPES: dict[str, str] = { } -async def _download_with_retry(storage: NextcloudClient, file_path: str, max_retries: int = 3) -> bytes: +async def _download_with_retry(storage: StorageClient, file_path: str, max_retries: int = 3) -> bytes: """Download file from Nextcloud with retry logic for transient errors.""" last_error = None @@ -171,8 +173,7 @@ async def create_version( except PermissionError: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not a member") - storage = NextcloudClient.for_member(current_member) - song_svc = SongService(session, storage=storage) + song_svc = SongService(session) version = await song_svc.register_version(song_id, data, current_member.id) return AudioVersionRead.model_validate(version) @@ -219,15 +220,12 @@ async def stream_version( else: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No audio file") - # Use the uploader's NC credentials — invited members may not have NC configured - uploader: Member | None = None - if version.uploaded_by: - uploader = await MemberRepository(session).get_by_id(version.uploaded_by) - storage = NextcloudClient.for_member(uploader) if uploader else NextcloudClient.for_member(current_member) - if storage is None: + try: + storage = await StorageFactory.create(session, song.band_id, get_settings()) + except LookupError: raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="No storage provider configured for this account" + status_code=status.HTTP_502_BAD_GATEWAY, + detail="Band has no active storage configured", ) try: data = await _download_with_retry(storage, file_path) diff --git a/api/src/rehearsalhub/schemas/band.py b/api/src/rehearsalhub/schemas/band.py index 238db6f..9d0a83f 100755 --- a/api/src/rehearsalhub/schemas/band.py +++ b/api/src/rehearsalhub/schemas/band.py @@ -18,11 +18,11 @@ class 
BandCreate(BaseModel): name: str slug: str genre_tags: list[str] = [] - nc_base_path: str | None = None # e.g. "Bands/MyBand/" — defaults to "bands/{slug}/" class BandUpdate(BaseModel): - nc_folder_path: str | None = None # update the Nextcloud base folder for scans + name: str | None = None + genre_tags: list[str] | None = None class BandRead(BaseModel): @@ -31,7 +31,6 @@ class BandRead(BaseModel): name: str slug: str genre_tags: list[str] - nc_folder_path: str | None = None created_at: datetime updated_at: datetime diff --git a/api/src/rehearsalhub/schemas/member.py b/api/src/rehearsalhub/schemas/member.py index 9d89dd4..b2011f0 100755 --- a/api/src/rehearsalhub/schemas/member.py +++ b/api/src/rehearsalhub/schemas/member.py @@ -13,23 +13,9 @@ class MemberRead(MemberBase): model_config = ConfigDict(from_attributes=True) id: uuid.UUID avatar_url: str | None = None - nc_username: str | None = None - nc_url: str | None = None - nc_configured: bool = False # True if nc_url + nc_username + nc_password are all set created_at: datetime - @classmethod - def from_model(cls, m: object) -> "MemberRead": - obj = cls.model_validate(m) - obj.nc_configured = bool( - m.nc_url and m.nc_username and m.nc_password - ) - return obj - class MemberSettingsUpdate(BaseModel): display_name: str | None = None - nc_url: str | None = None - nc_username: str | None = None - nc_password: str | None = None # send null to clear, omit to leave unchanged - avatar_url: str | None = None # URL to user's avatar image + avatar_url: str | None = None diff --git a/api/src/rehearsalhub/schemas/storage.py b/api/src/rehearsalhub/schemas/storage.py new file mode 100644 index 0000000..130afc3 --- /dev/null +++ b/api/src/rehearsalhub/schemas/storage.py @@ -0,0 +1,56 @@ +"""Pydantic schemas for storage provider configuration endpoints.""" + +from __future__ import annotations + +import uuid +from datetime import datetime + +from pydantic import BaseModel, field_validator + + +# ── Request bodies 
───────────────────────────────────────────────────────────── + + +class NextcloudConnect(BaseModel): + """Connect a band to a Nextcloud instance via an app password. + + Use an *app password* (generated in Nextcloud → Settings → Security), + not the account password. App passwords can be revoked without changing + the main account credentials. + """ + + url: str + username: str + app_password: str + label: str | None = None + root_path: str | None = None + + @field_validator("url") + @classmethod + def strip_trailing_slash(cls, v: str) -> str: + return v.rstrip("/") + + +# ── Response bodies ──────────────────────────────────────────────────────────── + + +class BandStorageRead(BaseModel): + """Public representation of a storage config — credentials are never exposed.""" + + id: uuid.UUID + band_id: uuid.UUID + provider: str + label: str | None + is_active: bool + root_path: str | None + created_at: datetime + updated_at: datetime + + model_config = {"from_attributes": True} + + +class OAuthAuthorizeResponse(BaseModel): + """Returned by the authorize endpoint — frontend should redirect the user here.""" + + redirect_url: str + provider: str diff --git a/api/src/rehearsalhub/security/__init__.py b/api/src/rehearsalhub/security/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/src/rehearsalhub/security/encryption.py b/api/src/rehearsalhub/security/encryption.py new file mode 100644 index 0000000..94f8a77 --- /dev/null +++ b/api/src/rehearsalhub/security/encryption.py @@ -0,0 +1,38 @@ +"""Fernet-based symmetric encryption for storage credentials. + +The encryption key must be a 32-byte URL-safe base64-encoded string, +generated once via: Fernet.generate_key().decode() +and stored in the STORAGE_ENCRYPTION_KEY environment variable. + +No credentials are ever stored in plaintext — only the encrypted blob +is written to the database. 
+""" + +from __future__ import annotations + +import json + +from cryptography.fernet import Fernet, InvalidToken + + +def encrypt_credentials(key: str, data: dict) -> str: + """Serialize *data* to JSON and encrypt it with Fernet. + + Returns a URL-safe base64-encoded ciphertext string safe to store in TEXT columns. + """ + f = Fernet(key.encode()) + plaintext = json.dumps(data, separators=(",", ":")).encode() + return f.encrypt(plaintext).decode() + + +def decrypt_credentials(key: str, blob: str) -> dict: + """Decrypt and deserialize a blob previously created by :func:`encrypt_credentials`. + + Raises ``cryptography.fernet.InvalidToken`` if the key is wrong or the blob is tampered. + """ + f = Fernet(key.encode()) + try: + plaintext = f.decrypt(blob.encode()) + except InvalidToken: + raise InvalidToken("Failed to decrypt storage credentials — wrong key or corrupted blob") + return json.loads(plaintext) diff --git a/api/src/rehearsalhub/services/band.py b/api/src/rehearsalhub/services/band.py index 77dae5d..a81bc5c 100755 --- a/api/src/rehearsalhub/services/band.py +++ b/api/src/rehearsalhub/services/band.py @@ -8,53 +8,45 @@ from sqlalchemy.ext.asyncio import AsyncSession from rehearsalhub.db.models import Band from rehearsalhub.repositories.band import BandRepository from rehearsalhub.schemas.band import BandCreate -from rehearsalhub.storage.nextcloud import NextcloudClient log = logging.getLogger(__name__) class BandService: - def __init__(self, session: AsyncSession, storage: NextcloudClient | None = None) -> None: + def __init__(self, session: AsyncSession) -> None: self._repo = BandRepository(session) - self._storage = storage + self._session = session async def create_band( self, data: BandCreate, creator_id: uuid.UUID, - creator: object | None = None, ) -> Band: if await self._repo.get_by_slug(data.slug): raise ValueError(f"Slug already taken: {data.slug}") - nc_folder = (data.nc_base_path or f"bands/{data.slug}/").strip("/") + "/" - storage = 
NextcloudClient.for_member(creator) if creator else self._storage - - if data.nc_base_path: - # User explicitly specified a folder — verify it actually exists in NC. - log.info("Checking NC folder existence: %s", nc_folder) - try: - await storage.get_file_metadata(nc_folder.rstrip("/")) - except Exception as exc: - log.warning("NC folder '%s' not accessible: %s", nc_folder, exc) - raise LookupError(f"Nextcloud folder '{nc_folder}' not found or not accessible") - else: - # Auto-generated path — create it (idempotent MKCOL). - log.info("Creating NC folder: %s", nc_folder) - try: - await storage.create_folder(nc_folder) - except Exception as exc: - # Not fatal — NC may be temporarily unreachable during dev/test. - log.warning("Could not create NC folder '%s': %s", nc_folder, exc) - band = await self._repo.create( name=data.name, slug=data.slug, genre_tags=data.genre_tags, - nc_folder_path=nc_folder, ) await self._repo.add_member(band.id, creator_id, role="admin") - log.info("Created band '%s' (slug=%s, nc_folder=%s)", data.name, data.slug, nc_folder) + log.info("Created band '%s' (slug=%s)", data.name, data.slug) + + # Storage is configured separately via POST /bands/{id}/storage/connect/*. + # If the band already has active storage, create the root folder now. 
+ try: + from rehearsalhub.storage.factory import StorageFactory + from rehearsalhub.config import get_settings + storage = await StorageFactory.create(self._session, band.id, get_settings()) + root = f"bands/{data.slug}/" + await storage.create_folder(root.strip("/") + "/") + log.info("Created storage folder '%s' for band '%s'", root, data.slug) + except LookupError: + log.info("Band '%s' has no active storage yet — skipping folder creation", data.slug) + except Exception as exc: + log.warning("Could not create storage folder for band '%s': %s", data.slug, exc) + return band async def get_band_with_members(self, band_id: uuid.UUID) -> Band | None: diff --git a/api/src/rehearsalhub/services/nc_scan.py b/api/src/rehearsalhub/services/nc_scan.py index bbbc98a..961d189 100755 --- a/api/src/rehearsalhub/services/nc_scan.py +++ b/api/src/rehearsalhub/services/nc_scan.py @@ -1,21 +1,27 @@ -"""Core nc-scan logic shared by the blocking and streaming endpoints.""" +"""Storage scan logic: walk a band's storage folder and import audio files. + +Works against any ``StorageClient`` implementation — Nextcloud, Google Drive, etc. +``StorageClient.list_folder`` must return ``FileMetadata`` objects whose ``path`` +field is a *provider-relative* path (i.e. the DAV prefix has already been stripped +by the client implementation). 
+""" from __future__ import annotations import logging from collections.abc import AsyncGenerator from pathlib import Path -from urllib.parse import unquote from sqlalchemy.ext.asyncio import AsyncSession +from rehearsalhub.repositories.audio_version import AudioVersionRepository from rehearsalhub.repositories.rehearsal_session import RehearsalSessionRepository from rehearsalhub.repositories.song import SongRepository from rehearsalhub.schemas.audio_version import AudioVersionCreate from rehearsalhub.schemas.song import SongRead from rehearsalhub.services.session import extract_session_folder, parse_rehearsal_date from rehearsalhub.services.song import SongService -from rehearsalhub.storage.nextcloud import NextcloudClient +from rehearsalhub.storage.protocol import StorageClient log = logging.getLogger(__name__) @@ -26,72 +32,53 @@ AUDIO_EXTENSIONS = {".mp3", ".wav", ".flac", ".ogg", ".m4a", ".aac", ".opus"} MAX_SCAN_DEPTH = 3 -def _make_relative(dav_prefix: str): - """Return a function that strips the WebDAV prefix and URL-decodes a href.""" - def relative(href: str) -> str: - decoded = unquote(href) - if decoded.startswith(dav_prefix): - return decoded[len(dav_prefix):] - # Strip any leading slash for robustness - return decoded.lstrip("/") - return relative - - async def collect_audio_files( - nc: NextcloudClient, - relative: object, # Callable[[str], str] + storage: StorageClient, folder_path: str, max_depth: int = MAX_SCAN_DEPTH, _depth: int = 0, ) -> AsyncGenerator[str, None]: - """ - Recursively yield user-relative audio file paths under folder_path. + """Recursively yield provider-relative audio file paths under *folder_path*. - Handles any depth: - bands/slug/take.wav depth 0 - bands/slug/231015/take.wav depth 1 - bands/slug/231015/groove/take.wav depth 2 ← was broken before + ``storage.list_folder`` is expected to return ``FileMetadata`` with paths + already normalised to provider-relative form (no host, no DAV prefix). 
""" if _depth > max_depth: log.debug("Max depth %d exceeded at '%s', stopping recursion", max_depth, folder_path) return try: - items = await nc.list_folder(folder_path) + items = await storage.list_folder(folder_path) except Exception as exc: log.warning("Could not list folder '%s': %s", folder_path, exc) return - log.info( - "scan depth=%d folder='%s' entries=%d", - _depth, folder_path, len(items), - ) + log.info("scan depth=%d folder='%s' entries=%d", _depth, folder_path, len(items)) for item in items: - rel = relative(item.path) # type: ignore[operator] - if rel.endswith("/"): - # It's a subdirectory — recurse - log.info(" → subdir: %s", rel) - async for subpath in collect_audio_files(nc, relative, rel, max_depth, _depth + 1): + path = item.path.lstrip("/") + if path.endswith("/"): + log.info(" → subdir: %s", path) + async for subpath in collect_audio_files(storage, path, max_depth, _depth + 1): yield subpath else: - ext = Path(rel).suffix.lower() + ext = Path(path).suffix.lower() if ext in AUDIO_EXTENSIONS: - log.info(" → audio file: %s", rel) - yield rel + log.info(" → audio file: %s", path) + yield path elif ext: - log.debug(" → skip (ext=%s): %s", ext, rel) + log.debug(" → skip (ext=%s): %s", ext, path) async def scan_band_folder( db_session: AsyncSession, - nc: NextcloudClient, + storage: StorageClient, band_id, band_folder: str, member_id, ) -> AsyncGenerator[dict, None]: - """ - Async generator that scans band_folder and yields event dicts: + """Async generator that scans *band_folder* and yields event dicts: + {"type": "progress", "message": str} {"type": "song", "song": SongRead-dict, "is_new": bool} {"type": "session", "session": {id, date, label}} @@ -99,11 +86,9 @@ async def scan_band_folder( {"type": "done", "stats": {found, imported, skipped}} {"type": "error", "message": str} """ - dav_prefix = f"/remote.php/dav/files/{nc._auth[0]}/" - relative = _make_relative(dav_prefix) - session_repo = RehearsalSessionRepository(db_session) song_repo = 
SongRepository(db_session) + version_repo = AudioVersionRepository(db_session) song_svc = SongService(db_session) found = 0 @@ -112,23 +97,28 @@ async def scan_band_folder( yield {"type": "progress", "message": f"Scanning {band_folder}…"} - async for nc_file_path in collect_audio_files(nc, relative, band_folder): + async for nc_file_path in collect_audio_files(storage, band_folder): found += 1 song_folder = str(Path(nc_file_path).parent).rstrip("/") + "/" song_title = Path(nc_file_path).stem # If the file sits directly inside a dated session folder (YYMMDD/file.wav), - # give it a unique virtual folder so each file becomes its own song rather - # than being merged as a new version of the first file in that folder. + # give it a unique virtual folder so each file becomes its own song. session_folder_path = extract_session_folder(nc_file_path) if session_folder_path and session_folder_path.rstrip("/") == song_folder.rstrip("/"): song_folder = song_folder + song_title + "/" yield {"type": "progress", "message": f"Checking {Path(nc_file_path).name}…"} - # Fetch file metadata (etag + size) — one PROPFIND per file + existing = await version_repo.get_by_nc_file_path(nc_file_path) + if existing is not None: + log.debug("scan: skipping already-registered '%s' (version %s)", nc_file_path, existing.id) + skipped += 1 + yield {"type": "skipped", "path": nc_file_path, "reason": "already imported"} + continue + try: - meta = await nc.get_file_metadata(nc_file_path) + meta = await storage.get_file_metadata(nc_file_path) etag = meta.etag except Exception as exc: log.error("Metadata fetch failed for '%s': %s", nc_file_path, exc, exc_info=True) @@ -137,7 +127,6 @@ async def scan_band_folder( continue try: - # Resolve or create a RehearsalSession from a YYMMDD folder segment rehearsal_date = parse_rehearsal_date(nc_file_path) rehearsal_session_id = None if rehearsal_date: @@ -154,7 +143,6 @@ async def scan_band_folder( }, } - # Find or create the Song record song = await 
song_repo.get_by_nc_folder_path(song_folder) if song is None: song = await song_repo.get_by_title_and_band(band_id, song_title) @@ -173,7 +161,6 @@ async def scan_band_folder( elif rehearsal_session_id and song.session_id is None: song = await song_repo.update(song, session_id=rehearsal_session_id) - # Register the audio version version = await song_svc.register_version( song.id, AudioVersionCreate( @@ -187,7 +174,9 @@ async def scan_band_folder( log.info("Imported '%s' as version %s for song '%s'", nc_file_path, version.id, song.title) imported += 1 - read = SongRead.model_validate(song).model_copy(update={"version_count": 1, "session_id": rehearsal_session_id}) + read = SongRead.model_validate(song).model_copy( + update={"version_count": 1, "session_id": rehearsal_session_id} + ) yield {"type": "song", "song": read.model_dump(mode="json"), "is_new": is_new} except Exception as exc: diff --git a/api/src/rehearsalhub/services/song.py b/api/src/rehearsalhub/services/song.py index c3c9bfc..9441c9f 100755 --- a/api/src/rehearsalhub/services/song.py +++ b/api/src/rehearsalhub/services/song.py @@ -13,7 +13,6 @@ from rehearsalhub.repositories.audio_version import AudioVersionRepository from rehearsalhub.repositories.song import SongRepository from rehearsalhub.schemas.audio_version import AudioVersionCreate from rehearsalhub.schemas.song import SongCreate, SongRead -from rehearsalhub.storage.nextcloud import NextcloudClient class SongService: @@ -21,25 +20,31 @@ class SongService: self, session: AsyncSession, job_queue: RedisJobQueue | None = None, - storage: NextcloudClient | None = None, ) -> None: self._repo = SongRepository(session) self._version_repo = AudioVersionRepository(session) self._session = session self._queue = job_queue or RedisJobQueue(session) - self._storage = storage async def create_song( - self, band_id: uuid.UUID, data: SongCreate, creator_id: uuid.UUID, band_slug: str, - creator: object | None = None, + self, + band_id: uuid.UUID, + data: 
SongCreate, + creator_id: uuid.UUID, + band_slug: str, ) -> Song: - from rehearsalhub.storage.nextcloud import NextcloudClient nc_folder = f"bands/{band_slug}/songs/{data.title.lower().replace(' ', '-')}/" - storage = NextcloudClient.for_member(creator) if creator else self._storage + try: + from rehearsalhub.config import get_settings + from rehearsalhub.storage.factory import StorageFactory + storage = await StorageFactory.create(self._session, band_id, get_settings()) await storage.create_folder(nc_folder) + except LookupError: + log.info("Band %s has no active storage — skipping folder creation for '%s'", band_id, nc_folder) + nc_folder = None # type: ignore[assignment] except Exception: - nc_folder = None # best-effort + nc_folder = None # best-effort; storage may be temporarily unreachable song = await self._repo.create( band_id=band_id, diff --git a/api/src/rehearsalhub/storage/factory.py b/api/src/rehearsalhub/storage/factory.py new file mode 100644 index 0000000..a3ddcc5 --- /dev/null +++ b/api/src/rehearsalhub/storage/factory.py @@ -0,0 +1,175 @@ +"""StorageFactory — creates the correct StorageClient from a BandStorage record. + +Usage: + storage = await StorageFactory.create(session, band_id, settings) + await storage.list_folder("bands/my-band/") + +Token refresh for OAuth2 providers is handled transparently: if the stored +access token is expired the factory refreshes it and persists the new tokens +before returning the client. 
+""" + +from __future__ import annotations + +import logging +import uuid +from datetime import datetime, timezone + +import httpx +from sqlalchemy.ext.asyncio import AsyncSession + +from rehearsalhub.config import Settings, get_settings +from rehearsalhub.db.models import BandStorage +from rehearsalhub.repositories.band_storage import BandStorageRepository +from rehearsalhub.security.encryption import decrypt_credentials, encrypt_credentials +from rehearsalhub.storage.nextcloud import NextcloudClient +from rehearsalhub.storage.protocol import StorageClient + +log = logging.getLogger(__name__) + + +class StorageFactory: + @staticmethod + async def create( + session: AsyncSession, + band_id: uuid.UUID, + settings: Settings | None = None, + ) -> StorageClient: + """Return a ready-to-use ``StorageClient`` for *band_id*. + + Raises ``LookupError`` if the band has no active storage configured. + """ + if settings is None: + settings = get_settings() + + repo = BandStorageRepository(session) + band_storage = await repo.get_active_for_band(band_id) + if band_storage is None: + raise LookupError(f"Band {band_id} has no active storage configured") + + return await StorageFactory._build(session, band_storage, settings) + + @staticmethod + async def _build( + session: AsyncSession, + band_storage: BandStorage, + settings: Settings, + ) -> StorageClient: + creds = decrypt_credentials(settings.storage_encryption_key, band_storage.credentials) + creds = await _maybe_refresh_token(session, band_storage, creds, settings) + + match band_storage.provider: + case "nextcloud": + return NextcloudClient( + base_url=creds["url"], + username=creds["username"], + password=creds["app_password"], + ) + case "googledrive": + raise NotImplementedError("Google Drive storage client not yet implemented") + case "onedrive": + raise NotImplementedError("OneDrive storage client not yet implemented") + case "dropbox": + raise NotImplementedError("Dropbox storage client not yet implemented") + case _: 
+ raise ValueError(f"Unknown storage provider: {band_storage.provider!r}") + + +# ── OAuth2 token refresh ─────────────────────────────────────────────────────── + +_TOKEN_ENDPOINTS: dict[str, str] = { + "googledrive": "https://oauth2.googleapis.com/token", + "dropbox": "https://api.dropbox.com/oauth2/token", + # OneDrive token endpoint is tenant-specific; handled separately. +} + + +async def _maybe_refresh_token( + session: AsyncSession, + band_storage: BandStorage, + creds: dict, + settings: Settings, +) -> dict: + """If the OAuth2 access token is expired, refresh it and persist the update.""" + if band_storage.provider == "nextcloud": + return creds # Nextcloud uses app passwords — no expiry + + expiry_str = creds.get("token_expiry") + if not expiry_str: + return creds # No expiry recorded — assume still valid + + expiry = datetime.fromisoformat(expiry_str) + if expiry.tzinfo is None: + expiry = expiry.replace(tzinfo=timezone.utc) + + if datetime.now(timezone.utc) < expiry: + return creds # Still valid + + log.info( + "Access token for band_storage %s (%s) expired — refreshing", + band_storage.id, + band_storage.provider, + ) + + try: + creds = await _do_refresh(band_storage, creds, settings) + # Persist refreshed tokens + from rehearsalhub.config import get_settings as _gs + _settings = settings or _gs() + band_storage.credentials = encrypt_credentials(_settings.storage_encryption_key, creds) + await session.flush() + except Exception: + log.exception("Token refresh failed for band_storage %s", band_storage.id) + raise + + return creds + + +async def _do_refresh(band_storage: BandStorage, creds: dict, settings: Settings) -> dict: + """Call the provider's token endpoint and return updated credentials.""" + from datetime import timedelta + + provider = band_storage.provider + + if provider == "onedrive": + tenant = settings.onedrive_tenant_id + token_url = f"https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token" + client_id = settings.onedrive_client_id + 
client_secret = settings.onedrive_client_secret + extra: dict = {"scope": "https://graph.microsoft.com/Files.ReadWrite offline_access"} + elif provider == "googledrive": + token_url = _TOKEN_ENDPOINTS["googledrive"] + client_id = settings.google_client_id + client_secret = settings.google_client_secret + extra = {} + elif provider == "dropbox": + token_url = _TOKEN_ENDPOINTS["dropbox"] + client_id = settings.dropbox_app_key + client_secret = settings.dropbox_app_secret + extra = {} + else: + raise ValueError(f"Token refresh not supported for provider: {provider!r}") + + payload = { + "grant_type": "refresh_token", + "refresh_token": creds["refresh_token"], + "client_id": client_id, + "client_secret": client_secret, + **extra, + } + + async with httpx.AsyncClient(timeout=15.0) as http: + resp = await http.post(token_url, data=payload) + resp.raise_for_status() + data = resp.json() + + expires_in = int(data.get("expires_in", 3600)) + expiry = datetime.now(timezone.utc) + timedelta(seconds=expires_in - 60) # 60s buffer + + return { + **creds, + "access_token": data["access_token"], + "refresh_token": data.get("refresh_token", creds["refresh_token"]), + "token_expiry": expiry.isoformat(), + "token_type": data.get("token_type", "Bearer"), + } diff --git a/api/src/rehearsalhub/storage/nextcloud.py b/api/src/rehearsalhub/storage/nextcloud.py index 3e8b2fc..654a393 100755 --- a/api/src/rehearsalhub/storage/nextcloud.py +++ b/api/src/rehearsalhub/storage/nextcloud.py @@ -5,6 +5,7 @@ from __future__ import annotations import logging import xml.etree.ElementTree as ET from typing import Any +from urllib.parse import unquote import httpx @@ -25,19 +26,11 @@ class NextcloudClient: if not base_url or not username: raise ValueError("Nextcloud credentials must be provided explicitly") self._base = base_url.rstrip("/") + self._username = username self._auth = (username, password) - self._dav_root = f"{self._base}/remote.php/dav/files/{self._auth[0]}" - - @classmethod - def 
for_member(cls, member: object) -> NextcloudClient | None: - """Return a client using member's personal NC credentials if configured. - Returns None if member has no Nextcloud configuration.""" - nc_url = getattr(member, "nc_url", None) - nc_username = getattr(member, "nc_username", None) - nc_password = getattr(member, "nc_password", None) - if nc_url and nc_username and nc_password: - return cls(base_url=nc_url, username=nc_username, password=nc_password) - return None + self._dav_root = f"{self._base}/remote.php/dav/files/{username}" + # Prefix stripped from WebDAV hrefs to produce relative paths + self._dav_prefix = f"/remote.php/dav/files/{username}/" def _client(self) -> httpx.AsyncClient: return httpx.AsyncClient(auth=self._auth, timeout=30.0) @@ -83,7 +76,17 @@ class NextcloudClient: content=body, ) resp.raise_for_status() - return _parse_propfind_multi(resp.text) + items = _parse_propfind_multi(resp.text) + # Normalise WebDAV absolute hrefs to provider-relative paths so callers + # never need to know about DAV internals. URL-decode to handle + # filenames that contain spaces or non-ASCII characters. 
+ for item in items: + decoded = unquote(item.path) + if decoded.startswith(self._dav_prefix): + item.path = decoded[len(self._dav_prefix):] + else: + item.path = decoded.lstrip("/") + return items async def download(self, path: str) -> bytes: logger.debug("Downloading file from Nextcloud: %s", path) diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 1d4c78e..6721e5a 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -7,6 +7,8 @@ services: POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-default_secure_password} volumes: - pg_data_dev:/var/lib/postgresql/data + ports: + - "5432:5432" networks: - rh_net healthcheck: @@ -20,6 +22,11 @@ services: image: redis:7-alpine networks: - rh_net + healthcheck: + test: ["CMD-SHELL", "redis-cli ping || exit 1"] + interval: 5s + timeout: 3s + retries: 10 api: build: @@ -34,6 +41,7 @@ services: REDIS_URL: redis://redis:6379/0 SECRET_KEY: ${SECRET_KEY:-replace_me_with_32_byte_hex_default} INTERNAL_SECRET: ${INTERNAL_SECRET:-replace_me_with_32_byte_hex_default} + STORAGE_ENCRYPTION_KEY: ${STORAGE_ENCRYPTION_KEY:-5vaaZQs4J7CFYZ7fqee37HgIt4xNxKHHX6OWd29Yh5E=} DOMAIN: localhost ports: - "8000:8000" @@ -43,6 +51,29 @@ services: db: condition: service_healthy + audio-worker: + build: + context: ./worker + target: development + environment: + DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-rh_user}:${POSTGRES_PASSWORD:-default_secure_password}@db:5432/${POSTGRES_DB:-rehearsalhub} + REDIS_URL: redis://redis:6379/0 + API_URL: http://api:8000 + INTERNAL_SECRET: ${INTERNAL_SECRET:-replace_me_with_32_byte_hex_default} + ANALYSIS_VERSION: "1.0.0" + LOG_LEVEL: DEBUG + PYTHONUNBUFFERED: "1" + volumes: + - ./worker/src:/app/src:z + - audio_tmp:/tmp/audio + networks: + - rh_net + depends_on: + db: + condition: service_healthy + redis: + condition: service_healthy + web: build: context: ./web @@ -62,3 +93,4 @@ networks: volumes: pg_data_dev: + audio_tmp: diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index 
2bc3fb7..a12e95e 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -47,6 +47,7 @@ services: REDIS_URL: redis://redis:6379/0 SECRET_KEY: ${SECRET_KEY:-replace_me_with_32_byte_hex_default} INTERNAL_SECRET: ${INTERNAL_SECRET:-replace_me_with_32_byte_hex_default} + STORAGE_ENCRYPTION_KEY: ${STORAGE_ENCRYPTION_KEY} DOMAIN: ${DOMAIN:-localhost} networks: - rh_net @@ -72,9 +73,8 @@ services: environment: DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-rh_user}:${POSTGRES_PASSWORD:-default_secure_password}@db:5432/${POSTGRES_DB:-rehearsalhub} REDIS_URL: redis://redis:6379/0 - NEXTCLOUD_URL: ${NEXTCLOUD_URL:-https://cloud.example.com} - NEXTCLOUD_USER: ${NEXTCLOUD_USER:-rh_service} - NEXTCLOUD_PASS: ${NEXTCLOUD_PASS:-default_password} + API_URL: http://api:8000 + INTERNAL_SECRET: ${INTERNAL_SECRET:-replace_me_with_32_byte_hex_default} ANALYSIS_VERSION: "1.0.0" volumes: - audio_tmp:/tmp/audio @@ -88,6 +88,8 @@ services: api: condition: service_started restart: unless-stopped + deploy: + replicas: ${WORKER_REPLICAS:-2} nc-watcher: image: git.sschuhmann.de/sschuhmann/rehearsalhub/watcher:0.1.0 diff --git a/docker-compose.yml b/docker-compose.yml index d95d5b4..5b3ada6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -50,6 +50,7 @@ services: REDIS_URL: redis://redis:6379/0 SECRET_KEY: ${SECRET_KEY:-replace_me_with_32_byte_hex_default} INTERNAL_SECRET: ${INTERNAL_SECRET:-replace_me_with_32_byte_hex_default} + STORAGE_ENCRYPTION_KEY: ${STORAGE_ENCRYPTION_KEY:-5vaaZQs4J7CFYZ7fqee37HgIt4xNxKHHX6OWd29Yh5E=} DOMAIN: ${DOMAIN:-localhost} networks: - rh_net @@ -78,9 +79,8 @@ services: environment: DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-rh_user}:${POSTGRES_PASSWORD:-default_secure_password}@db:5432/${POSTGRES_DB:-rehearsalhub} REDIS_URL: redis://redis:6379/0 - NEXTCLOUD_URL: ${NEXTCLOUD_URL:-https://cloud.example.com} - NEXTCLOUD_USER: ${NEXTCLOUD_USER:-rh_service} - NEXTCLOUD_PASS: ${NEXTCLOUD_PASS:-default_password} + API_URL: 
http://api:8000 + INTERNAL_SECRET: ${INTERNAL_SECRET:-replace_me_with_32_byte_hex_default} ANALYSIS_VERSION: "1.0.0" volumes: - audio_tmp:/tmp/audio @@ -94,6 +94,8 @@ services: api: condition: service_started restart: unless-stopped + deploy: + replicas: ${WORKER_REPLICAS:-2} nc-watcher: build: diff --git a/watcher/src/watcher/config.py b/watcher/src/watcher/config.py index 20b004b..d9bc7bb 100644 --- a/watcher/src/watcher/config.py +++ b/watcher/src/watcher/config.py @@ -5,11 +5,10 @@ from pydantic_settings import BaseSettings, SettingsConfigDict class WatcherSettings(BaseSettings): model_config = SettingsConfigDict(env_file=".env", extra="ignore") - nextcloud_url: str = "http://nextcloud" - nextcloud_user: str = "ncadmin" - nextcloud_pass: str = "" - api_url: str = "http://api:8000" + # Shared secret for calling internal API endpoints + internal_secret: str = "dev-change-me-in-production" + redis_url: str = "redis://localhost:6379/0" job_queue_key: str = "rh:jobs" @@ -18,6 +17,10 @@ class WatcherSettings(BaseSettings): # File extensions to watch audio_extensions: list[str] = [".wav", ".mp3", ".flac", ".aac", ".ogg", ".m4a", ".opus"] + # How often (in poll cycles) to refresh the list of bands from the API. + # 0 = only on startup, N = every N poll cycles. 
+ config_refresh_interval: int = 10 + @lru_cache def get_settings() -> WatcherSettings: diff --git a/watcher/src/watcher/event_loop.py b/watcher/src/watcher/event_loop.py index 8415a08..dffc502 100644 --- a/watcher/src/watcher/event_loop.py +++ b/watcher/src/watcher/event_loop.py @@ -1,149 +1,93 @@ -"""Event loop: poll Nextcloud activity, detect audio uploads, push to API.""" +"""Event loop: fetch per-band storage configs from the API, detect audio uploads.""" from __future__ import annotations import logging -from pathlib import Path -from typing import Any import httpx from watcher.config import WatcherSettings -from watcher.nc_client import NextcloudWatcherClient +from watcher.nc_watcher import NextcloudWatcher +from watcher.protocol import FileEvent, WatcherClient log = logging.getLogger("watcher.event_loop") -# Persist last seen activity ID in-process (good enough for a POC) -_last_activity_id: int = 0 -# Nextcloud Activity API v2 filter sets. -# -# NC 22+ returns: type="file_created"|"file_changed" (subject is human-readable) -# NC <22 returns: type="files" (subject is a machine key like "created_self") -# -# We accept either style so the watcher works across NC versions. -_UPLOAD_TYPES = {"file_created", "file_changed"} - -_UPLOAD_SUBJECTS = { - "created_by", - "changed_by", - "created_public", - "created_self", - "changed_self", -} - - -def is_audio_file(path: str, extensions: list[str]) -> bool: - return Path(path).suffix.lower() in extensions - - -def normalize_nc_path(raw_path: str, username: str) -> str: - """ - Strip the Nextcloud WebDAV/activity path prefix so we get a plain - user-relative path. - - Activity objects can look like: - /username/files/bands/slug/... - /remote.php/dav/files/username/bands/slug/... - bands/slug/... (already relative) - """ - path = raw_path.strip("/") - - # /remote.php/dav/files//... - dav_prefix = f"remote.php/dav/files/{username}/" - if path.startswith(dav_prefix): - return path[len(dav_prefix):] - - # //files/... 
(activity app format) - user_files_prefix = f"{username}/files/" - if path.startswith(user_files_prefix): - return path[len(user_files_prefix):] - - # files/... - if path.startswith("files/"): - return path[len("files/"):] - - return path - - - -def extract_nc_file_path(activity: dict[str, Any]) -> str | None: - """Extract the server-relative file path from an activity event.""" - objects = activity.get("objects", {}) - if isinstance(objects, dict): - for _file_id, file_path in objects.items(): - if isinstance(file_path, str): - return file_path - # Fallback: older NC versions put it in object_name - return activity.get("object_name") or None - - -async def register_version_with_api(nc_file_path: str, nc_file_etag: str | None, api_url: str) -> bool: +async def fetch_nextcloud_configs(settings: WatcherSettings) -> list[dict]: + """Fetch active Nextcloud configs for all bands from the internal API.""" + url = f"{settings.api_url}/api/v1/internal/storage/nextcloud-watch-configs" + headers = {"X-Internal-Token": settings.internal_secret} try: - payload = {"nc_file_path": nc_file_path, "nc_file_etag": nc_file_etag} async with httpx.AsyncClient(timeout=15.0) as c: - resp = await c.post(f"{api_url}/api/v1/internal/nc-upload", json=payload) + resp = await c.get(url, headers=headers) + resp.raise_for_status() + return resp.json() + except Exception as exc: + log.warning("Failed to fetch NC configs from API: %s", exc) + return [] + + +def build_nc_watchers( + configs: list[dict], + settings: WatcherSettings, +) -> dict[str, NextcloudWatcher]: + """Build one NextcloudWatcher per band from the API config payload.""" + watchers: dict[str, NextcloudWatcher] = {} + for cfg in configs: + band_id = cfg["band_id"] + try: + watchers[band_id] = NextcloudWatcher( + band_id=band_id, + nc_url=cfg["nc_url"], + nc_username=cfg["nc_username"], + nc_app_password=cfg["nc_app_password"], + audio_extensions=settings.audio_extensions, + ) + except Exception as exc: + log.error("Failed to create 
watcher for band %s: %s", band_id, exc) + return watchers + + +async def register_event_with_api(event: FileEvent, settings: WatcherSettings) -> bool: + """Forward a FileEvent to the API's internal nc-upload endpoint.""" + payload = {"nc_file_path": event.file_path, "nc_file_etag": event.etag} + headers = {"X-Internal-Token": settings.internal_secret} + try: + async with httpx.AsyncClient(timeout=15.0) as c: + resp = await c.post( + f"{settings.api_url}/api/v1/internal/nc-upload", + json=payload, + headers=headers, + ) if resp.status_code in (200, 201): - log.info("Registered version via internal API: %s", nc_file_path) + log.info("Registered event via internal API: %s", event.file_path) return True log.warning( "Internal API returned %d for %s: %s", - resp.status_code, nc_file_path, resp.text[:200], + resp.status_code, event.file_path, resp.text[:200], ) return False except Exception as exc: - log.warning("Failed to register version with API for %s: %s", nc_file_path, exc) + log.warning("Failed to register event with API for %s: %s", event.file_path, exc) return False -async def poll_once(nc_client: NextcloudWatcherClient, settings: WatcherSettings) -> None: - global _last_activity_id - - activities = await nc_client.get_activities(since_id=_last_activity_id) - if not activities: - log.info("No new activities since id=%d", _last_activity_id) - return - - log.info("Received %d activities (since id=%d)", len(activities), _last_activity_id) - - for activity in activities: - activity_id = int(activity.get("activity_id", 0)) - activity_type = activity.get("type", "") - subject = activity.get("subject", "") - raw_path = extract_nc_file_path(activity) - - # Advance the cursor regardless of whether we act on this event - _last_activity_id = max(_last_activity_id, activity_id) - - log.info( - "Activity id=%d type=%r subject=%r raw_path=%r", - activity_id, activity_type, subject, raw_path, - ) - - if raw_path is None: - log.info(" → skip: no file path in activity payload") 
- continue - - nc_path = normalize_nc_path(raw_path, nc_client.username) - log.info(" → normalized path: %r", nc_path) - - # Only care about audio files — skip everything else immediately - if not is_audio_file(nc_path, settings.audio_extensions): - log.info( - " → skip: not an audio file (ext=%s)", - Path(nc_path).suffix.lower() or "", - ) - continue - - if activity_type not in _UPLOAD_TYPES and subject not in _UPLOAD_SUBJECTS: - log.info( - " → skip: type=%r subject=%r is not a file upload event", - activity_type, subject, - ) - continue - - log.info(" → MATCH — registering audio upload: %s", nc_path) - etag = await nc_client.get_file_etag(nc_path) - success = await register_version_with_api(nc_path, etag, settings.api_url) - if not success: - log.warning(" → FAILED to register upload for activity %d (%s)", activity_id, nc_path) +async def poll_all_once( + watchers: dict[str, WatcherClient], + cursors: dict[str, str | None], + settings: WatcherSettings, +) -> None: + """Poll every watcher once and forward new events to the API.""" + for band_id, watcher in watchers.items(): + cursor = cursors.get(band_id) + try: + events, new_cursor = await watcher.poll_changes(cursor) + cursors[band_id] = new_cursor + if not events: + log.debug("Band %s: no new events (cursor=%s)", band_id, new_cursor) + continue + log.info("Band %s: %d new event(s)", band_id, len(events)) + for event in events: + await register_event_with_api(event, settings) + except Exception as exc: + log.exception("Poll error for band %s: %s", band_id, exc) diff --git a/watcher/src/watcher/main.py b/watcher/src/watcher/main.py index 3c28207..085eb33 100644 --- a/watcher/src/watcher/main.py +++ b/watcher/src/watcher/main.py @@ -6,11 +6,13 @@ import asyncio import logging from watcher.config import get_settings -from watcher.event_loop import poll_once -from watcher.nc_client import NextcloudWatcherClient +from watcher.event_loop import ( + build_nc_watchers, + fetch_nextcloud_configs, + poll_all_once, +) 
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(name)s %(message)s") -# Quiet httpx's per-request noise at DEBUG; keep our own loggers verbose logging.getLogger("httpx").setLevel(logging.INFO) logging.getLogger("httpcore").setLevel(logging.WARNING) log = logging.getLogger("watcher") @@ -18,22 +20,39 @@ log = logging.getLogger("watcher") async def main() -> None: settings = get_settings() - nc = NextcloudWatcherClient( - base_url=settings.nextcloud_url, - username=settings.nextcloud_user, - password=settings.nextcloud_pass, - ) + log.info("Starting watcher (poll_interval=%ds)", settings.poll_interval) - log.info("Waiting for Nextcloud to become available...") - while not await nc.is_healthy(): - await asyncio.sleep(10) - log.info("Nextcloud is ready. Starting poll loop (interval=%ds)", settings.poll_interval) + # Per-band WatcherClient instances; keyed by band_id string + watchers: dict = {} + # Per-band opaque cursors (last seen activity ID, page token, etc.) + cursors: dict[str, str | None] = {} + poll_cycle = 0 while True: - try: - await poll_once(nc, settings) - except Exception as exc: - log.exception("Poll error: %s", exc) + # Refresh the list of bands (and their storage configs) periodically. 
+ refresh = ( + poll_cycle == 0 + or (settings.config_refresh_interval > 0 and poll_cycle % settings.config_refresh_interval == 0) + ) + if refresh: + log.info("Refreshing storage configs from API…") + configs = await fetch_nextcloud_configs(settings) + if configs: + watchers = build_nc_watchers(configs, settings) + # Preserve cursors for bands that were already being watched + for band_id in watchers: + cursors.setdefault(band_id, None) + log.info("Watching %d Nextcloud band(s): %s", len(watchers), list(watchers)) + else: + log.warning("No Nextcloud storage configs received — no bands to watch") + + if watchers: + try: + await poll_all_once(watchers, cursors, settings) + except Exception as exc: + log.exception("Unexpected error in poll loop: %s", exc) + + poll_cycle += 1 await asyncio.sleep(settings.poll_interval) diff --git a/watcher/src/watcher/nc_watcher.py b/watcher/src/watcher/nc_watcher.py new file mode 100644 index 0000000..a4ea1e0 --- /dev/null +++ b/watcher/src/watcher/nc_watcher.py @@ -0,0 +1,116 @@ +"""Nextcloud WatcherClient implementation. + +Polls the Nextcloud Activity API to detect new / modified audio files. +The cursor is the last seen ``activity_id`` (stored as a string for +protocol compatibility). 
+""" + +from __future__ import annotations + +import logging +from pathlib import Path + +from watcher.nc_client import NextcloudWatcherClient +from watcher.protocol import FileEvent + +log = logging.getLogger("watcher.nc_watcher") + +_UPLOAD_TYPES = {"file_created", "file_changed"} +_UPLOAD_SUBJECTS = { + "created_by", + "changed_by", + "created_public", + "created_self", + "changed_self", +} + + +class NextcloudWatcher: + """WatcherClient implementation backed by the Nextcloud Activity API.""" + + def __init__( + self, + band_id: str, + nc_url: str, + nc_username: str, + nc_app_password: str, + audio_extensions: list[str], + ) -> None: + self.band_id = band_id + self._audio_extensions = audio_extensions + self._nc = NextcloudWatcherClient( + base_url=nc_url, + username=nc_username, + password=nc_app_password, + ) + + async def poll_changes(self, cursor: str | None) -> tuple[list[FileEvent], str]: + since_id = int(cursor) if cursor else 0 + activities = await self._nc.get_activities(since_id=since_id) + + events: list[FileEvent] = [] + new_cursor = cursor or "0" + + for activity in activities: + activity_id = int(activity.get("activity_id", 0)) + new_cursor = str(max(int(new_cursor), activity_id)) + + activity_type = activity.get("type", "") + subject = activity.get("subject", "") + raw_path = _extract_file_path(activity) + + if raw_path is None: + continue + + nc_path = _normalize_path(raw_path, self._nc.username) + log.debug("Activity %d type=%r path=%r", activity_id, activity_type, nc_path) + + if not _is_audio(nc_path, self._audio_extensions): + continue + + if activity_type not in _UPLOAD_TYPES and subject not in _UPLOAD_SUBJECTS: + continue + + etag = await self._nc.get_file_etag(nc_path) + events.append( + FileEvent( + band_id=self.band_id, + file_path=nc_path, + event_type="created" if "created" in activity_type else "modified", + etag=etag, + ) + ) + + return events, new_cursor + + async def is_healthy(self) -> bool: + return await self._nc.is_healthy() + 
+ +# ── Helpers ──────────────────────────────────────────────────────────────────── + + +def _extract_file_path(activity: dict) -> str | None: + objects = activity.get("objects", {}) + if isinstance(objects, dict): + for _, file_path in objects.items(): + if isinstance(file_path, str): + return file_path + return activity.get("object_name") or None + + +def _normalize_path(raw_path: str, username: str) -> str: + path = raw_path.strip("/") + dav_prefix = f"remote.php/dav/files/{username}/" + if path.startswith(dav_prefix): + return path[len(dav_prefix):] + user_files_prefix = f"{username}/files/" + if path.startswith(user_files_prefix): + return path[len(user_files_prefix):] + if path.startswith("files/"): + return path[len("files/"):] + return path + + +def _is_audio(path: str, extensions: list[str]) -> bool: + return Path(path).suffix.lower() in extensions diff --git a/watcher/src/watcher/protocol.py b/watcher/src/watcher/protocol.py new file mode 100644 index 0000000..93c8f51 --- /dev/null +++ b/watcher/src/watcher/protocol.py @@ -0,0 +1,42 @@ +"""WatcherClient protocol — abstracts provider-specific change-detection APIs. + +Each storage provider implements its own change detection: + Nextcloud → Activity API (polling) + Google Drive → Changes API or webhook push + OneDrive → Microsoft Graph subscriptions + Dropbox → Long-poll or webhooks + +All implementations must satisfy this protocol so the event loop can treat +them uniformly. 
+""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Protocol + + +@dataclass +class FileEvent: + """A file-change event emitted by a WatcherClient.""" + band_id: str + file_path: str # Provider-relative path (no host, no DAV prefix) + event_type: str # 'created' | 'modified' | 'deleted' + etag: str | None = None + + +class WatcherClient(Protocol): + band_id: str + + async def poll_changes(self, cursor: str | None) -> tuple[list[FileEvent], str]: + """Return (events, new_cursor) since the given cursor. + + ``cursor`` is an opaque string whose meaning is implementation-defined + (e.g., an activity ID for Nextcloud, a page token for Google Drive). + Pass ``None`` to start from the current position (i.e. only new events). + """ + ... + + async def is_healthy(self) -> bool: + """Return True if the storage backend is reachable.""" + ... diff --git a/web/src/api/bands.ts b/web/src/api/bands.ts index e298172..9091bef 100755 --- a/web/src/api/bands.ts +++ b/web/src/api/bands.ts @@ -5,7 +5,6 @@ export interface Band { name: string; slug: string; genre_tags: string[]; - nc_folder_path: string | null; created_at: string; updated_at: string; memberships?: BandMembership[]; @@ -18,6 +17,25 @@ export interface BandMembership { joined_at: string; } +export interface BandStorage { + id: string; + band_id: string; + provider: string; + label: string | null; + is_active: boolean; + root_path: string | null; + created_at: string; + updated_at: string; +} + +export interface NextcloudConnectData { + url: string; + username: string; + app_password: string; + label?: string; + root_path?: string; +} + export const listBands = () => api.get("/bands"); export const getBand = (bandId: string) => api.get(`/bands/${bandId}`); @@ -25,5 +43,13 @@ export const createBand = (data: { name: string; slug: string; genre_tags?: string[]; - nc_base_path?: string; }) => api.post("/bands", data); + +export const listStorage = (bandId: string) => + 
api.get(`/bands/${bandId}/storage`); + +export const connectNextcloud = (bandId: string, data: NextcloudConnectData) => + api.post(`/bands/${bandId}/storage/connect/nextcloud`, data); + +export const disconnectStorage = (bandId: string) => + api.delete(`/bands/${bandId}/storage`); diff --git a/web/src/components/TopBandBar.tsx b/web/src/components/TopBandBar.tsx index b64edd3..a51353e 100644 --- a/web/src/components/TopBandBar.tsx +++ b/web/src/components/TopBandBar.tsx @@ -4,7 +4,6 @@ import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; import { listBands, createBand } from "../api/bands"; import { getInitials } from "../utils"; import { useBandStore } from "../stores/bandStore"; -import { api } from "../api/client"; // ── Shared primitives ────────────────────────────────────────────────────────── @@ -30,27 +29,6 @@ const labelStyle: React.CSSProperties = { marginBottom: 5, }; -// ── Step indicator ───────────────────────────────────────────────────────────── - -function StepDots({ current, total }: { current: number; total: number }) { - return ( -
- {Array.from({ length: total }, (_, i) => ( -
- ))} -
- ); -} - // ── Error banner ─────────────────────────────────────────────────────────────── function ErrorBanner({ msg }: { msg: string }) { @@ -61,117 +39,20 @@ function ErrorBanner({ msg }: { msg: string }) { ); } -// ── Step 1: Storage setup ────────────────────────────────────────────────────── +// ── Band creation form ───────────────────────────────────────────────────────── -interface Me { nc_configured: boolean; nc_url: string | null; nc_username: string | null; } - -function StorageStep({ me, onNext }: { me: Me; onNext: () => void }) { - const qc = useQueryClient(); - const [ncUrl, setNcUrl] = useState(me.nc_url ?? ""); - const [ncUsername, setNcUsername] = useState(me.nc_username ?? ""); - const [ncPassword, setNcPassword] = useState(""); - const [error, setError] = useState(null); - const urlRef = useRef(null); - - useEffect(() => { urlRef.current?.focus(); }, []); - - const saveMutation = useMutation({ - mutationFn: () => - api.patch("/auth/me/settings", { - nc_url: ncUrl.trim() || null, - nc_username: ncUsername.trim() || null, - nc_password: ncPassword || null, - }), - onSuccess: () => { - qc.invalidateQueries({ queryKey: ["me"] }); - onNext(); - }, - onError: (err) => setError(err instanceof Error ? err.message : "Failed to save"), - }); - - const canSave = ncUrl.trim() && ncUsername.trim() && ncPassword; - - return ( - <> -
- - setNcUrl(e.target.value)} - style={inputStyle} - placeholder="https://cloud.example.com" - type="url" - /> -
- -
- - setNcUsername(e.target.value)} - style={inputStyle} - placeholder="your-nc-username" - autoComplete="username" - /> -
- -
- - setNcPassword(e.target.value)} - style={inputStyle} - type="password" - placeholder="Generate one in Nextcloud → Settings → Security" - autoComplete="current-password" - /> -
-

- Use an app password, not your account password. -

- - {error && } - -
- - -
- - ); -} - -// ── Step 2: Band details ─────────────────────────────────────────────────────── - -function BandStep({ ncConfigured, onClose }: { ncConfigured: boolean; onClose: () => void }) { +function BandStep({ onClose }: { onClose: () => void }) { const navigate = useNavigate(); const qc = useQueryClient(); const [name, setName] = useState(""); const [slug, setSlug] = useState(""); - const [ncFolder, setNcFolder] = useState(""); const [error, setError] = useState(null); const nameRef = useRef(null); useEffect(() => { nameRef.current?.focus(); }, []); const mutation = useMutation({ - mutationFn: () => - createBand({ - name, - slug, - ...(ncFolder.trim() ? { nc_base_path: ncFolder.trim() } : {}), - }), + mutationFn: () => createBand({ name, slug }), onSuccess: (band) => { qc.invalidateQueries({ queryKey: ["bands"] }); onClose(); @@ -187,12 +68,6 @@ function BandStep({ ncConfigured, onClose }: { ncConfigured: boolean; onClose: ( return ( <> - {!ncConfigured && ( -
- Storage not configured — recordings won't be scanned. You can set it up later in Settings → Storage. -
- )} - {error && }
@@ -207,7 +82,7 @@ function BandStep({ ncConfigured, onClose }: { ncConfigured: boolean; onClose: ( />
-
+
-
- - setNcFolder(e.target.value)} - style={{ ...inputStyle, fontFamily: "monospace" }} - placeholder={slug ? `bands/${slug}/` : "bands/my-band/"} - disabled={!ncConfigured} - /> -

- {ncConfigured - ? <>Leave blank to auto-create bands/{slug || "slug"}/. - : "Connect storage first to set a folder."} -

-
+

+ Connect storage after creating the band via Settings → Storage. +

+ )} + + ) : ( +
+
+ No storage connected
+ )} +
-
+ {/* Connect form — admin only, shown when no active storage or toggled */} + {amAdmin && (!activeStorage || showConnect) && ( + <> + {activeStorage && } +
+ {activeStorage ? "Replace connection" : "Connect Nextcloud"} +
+
setNcUrl(e.target.value)} placeholder="https://cloud.example.com" /> @@ -376,69 +402,58 @@ function StorageSection({ bandId, band, amAdmin, me }: { bandId: string; band: B setNcUsername(e.target.value)} />
- - setNcPassword(e.target.value)} placeholder={me.nc_configured ? "•••••••• (leave blank to keep)" : ""} /> + + setNcPassword(e.target.value)} placeholder="Generate in Nextcloud → Settings → Security" /> +
+
+ + setNcRootPath(e.target.value)} placeholder={`bands/${band.slug}/`} style={{ fontFamily: "monospace" }} />
- Use an app password from Nextcloud → Settings → Security. + Leave blank to auto-create bands/{band.slug}/
- - {ncError &&

{ncError}

} -
- ncMutation.mutate()} /> -
-
-
- - {/* Scan folder — admin only */} - {amAdmin && ( - <> - -
Scan Folder
-
- RehearsalHub reads recordings from your Nextcloud — files are never copied to our servers. -
- -
-
-
- - {currentPath} -
- {!editingPath && ( - - )} -
- - {editingPath && ( -
- setFolderInput(e.target.value)} placeholder={defaultPath} style={{ fontFamily: "monospace" }} /> -
- - -
-
+ {connectError &&

{connectError}

} +
+ + {activeStorage && ( + )}
+ + )} + {amAdmin && activeStorage && !showConnect && ( + + )} + + {/* Scan — admin only, only if active storage */} + {amAdmin && activeStorage && ( + <> + +
Scan Recordings
+
+ RehearsalHub reads recordings from storage — files are never copied to our servers. +
- {scanning && scanProgress && (
{scanProgress} @@ -750,7 +765,7 @@ export function SettingsPage() {
{section === "profile" && } {section === "members" && activeBandId && band && } - {section === "storage" && activeBandId && band && } + {section === "storage" && activeBandId && band && } {section === "band" && activeBandId && band && amAdmin && }
@@ -832,7 +847,7 @@ export function SettingsPage() { )} {section === "storage" && activeBandId && band && ( - + )} {section === "band" && activeBandId && band && amAdmin && ( diff --git a/worker/Dockerfile b/worker/Dockerfile index b8c3ef5..7d1331a 100644 --- a/worker/Dockerfile +++ b/worker/Dockerfile @@ -22,9 +22,29 @@ RUN --mount=type=bind,from=essentia-builder,source=/usr/local/lib,target=/essent RUN pip install uv +FROM base AS development +COPY pyproject.toml . +RUN uv sync --all-extras --no-install-project --frozen || uv sync --all-extras --no-install-project +ENV PYTHONPATH=/app/src +ENV PYTHONUNBUFFERED=1 +ENV LOG_LEVEL=DEBUG +CMD ["/bin/sh", "-c", "PYTHONPATH=/app/src exec /app/.venv/bin/watchfiles --ignore-permission-denied '/app/.venv/bin/python -m worker.main' src"] + FROM base AS production COPY pyproject.toml . RUN uv sync --no-dev --frozen || uv sync --no-dev COPY . . ENV PYTHONPATH=/app/src + +# Pre-warm librosa/numba JIT cache and pooch downloads so they happen at build +# time and are baked into the image rather than downloaded on every cold start. 
+RUN uv run python -c "\ +import numpy as np; \ +import librosa; \ +_dummy = np.zeros(22050, dtype=np.float32); \ +librosa.beat.beat_track(y=_dummy, sr=22050); \ +librosa.feature.chroma_stft(y=_dummy, sr=22050); \ +print('librosa warmup done') \ +" + CMD ["uv", "run", "python", "-m", "worker.main"] diff --git a/worker/pyproject.toml b/worker/pyproject.toml index 7ca6c01..8d71c72 100644 --- a/worker/pyproject.toml +++ b/worker/pyproject.toml @@ -26,6 +26,7 @@ dev = [ "pytest-asyncio>=0.23", "pytest-cov>=5", "ruff>=0.4", + "watchfiles>=0.21", ] [tool.hatch.build.targets.wheel] diff --git a/worker/src/worker/config.py b/worker/src/worker/config.py index 8efeafe..e9ccb05 100644 --- a/worker/src/worker/config.py +++ b/worker/src/worker/config.py @@ -10,9 +10,8 @@ class WorkerSettings(BaseSettings): redis_url: str = "redis://localhost:6379/0" job_queue_key: str = "rh:jobs" - nextcloud_url: str = "http://nextcloud" - nextcloud_user: str = "ncadmin" - nextcloud_pass: str = "" + api_url: str = "http://api:8000" + internal_secret: str = "dev-change-me-in-production" audio_tmp_dir: str = "/tmp/audio" analysis_version: str = "1.0.0" diff --git a/worker/src/worker/db.py b/worker/src/worker/db.py index 8ef504c..829f48b 100644 --- a/worker/src/worker/db.py +++ b/worker/src/worker/db.py @@ -36,6 +36,14 @@ class AudioVersionModel(Base): uploaded_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) +class SongModel(Base): + __tablename__ = "songs" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True) + global_bpm: Mapped[Optional[float]] = mapped_column(Numeric(6, 2)) + global_key: Mapped[Optional[str]] = mapped_column(String(30)) + + class RangeAnalysisModel(Base): __tablename__ = "range_analyses" diff --git a/worker/src/worker/main.py b/worker/src/worker/main.py index d4b9f39..ef2f484 100644 --- a/worker/src/worker/main.py +++ b/worker/src/worker/main.py @@ -23,23 +23,30 @@ from worker.pipeline.analyse_range import 
run_range_analysis from worker.pipeline.transcode import get_duration_ms, transcode_to_hls from worker.pipeline.waveform import extract_peaks, generate_waveform_file -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s") +logging.basicConfig( + level=os.environ.get("LOG_LEVEL", "INFO").upper(), + format="%(asctime)s %(levelname)s %(name)s %(message)s", +) +# Numba floods logs with JIT compilation details at DEBUG level — keep it quiet +logging.getLogger("numba").setLevel(logging.WARNING) log = logging.getLogger("worker") -async def load_audio(nc_path: str, tmp_dir: str, settings) -> tuple[np.ndarray, int, str]: - """Download from Nextcloud and load as numpy array. Returns (audio, sr, local_path).""" +async def load_audio(version_id: str, filename: str, tmp_dir: str, settings) -> tuple[np.ndarray, int, str]: + """Download audio via the internal API and load as numpy array. Returns (audio, sr, local_path).""" import httpx - local_path = os.path.join(tmp_dir, Path(nc_path).name) - dav_url = f"{settings.nextcloud_url}/remote.php/dav/files/{settings.nextcloud_user}/{nc_path.lstrip('/')}" + local_path = os.path.join(tmp_dir, filename) + url = f"{settings.api_url}/api/v1/internal/audio/{version_id}/stream" + log.info("Fetching audio for version %s from %s", version_id, url) async with httpx.AsyncClient( - auth=(settings.nextcloud_user, settings.nextcloud_pass), timeout=120.0 + headers={"X-Internal-Token": settings.internal_secret}, timeout=120.0 ) as client: - resp = await client.get(dav_url) - resp.raise_for_status() - with open(local_path, "wb") as f: - f.write(resp.content) + async with client.stream("GET", url) as resp: + resp.raise_for_status() + with open(local_path, "wb") as f: + async for chunk in resp.aiter_bytes(65536): + f.write(chunk) loop = asyncio.get_event_loop() audio, sr = await loop.run_in_executor( @@ -53,7 +60,7 @@ async def handle_transcode(payload: dict, session: AsyncSession, settings) -> No nc_path = 
payload["nc_file_path"] with tempfile.TemporaryDirectory(dir=settings.audio_tmp_dir) as tmp: - audio, sr, local_path = await load_audio(nc_path, tmp, settings) + audio, sr, local_path = await load_audio(str(version_id), Path(nc_path).name, tmp, settings) duration_ms = await get_duration_ms(local_path) hls_dir = os.path.join(tmp, "hls") @@ -99,7 +106,7 @@ async def handle_analyse_range(payload: dict, session: AsyncSession, settings) - raise ValueError(f"AudioVersion {version_id} not found") with tempfile.TemporaryDirectory(dir=settings.audio_tmp_dir) as tmp: - audio, sr, _ = await load_audio(version.nc_file_path, tmp, settings) + audio, sr, _ = await load_audio(str(version_id), Path(version.nc_file_path).name, tmp, settings) await run_range_analysis(audio, sr, version_id, annotation_id, start_ms, end_ms, session) log.info("Range analysis complete for annotation %s", annotation_id) @@ -116,7 +123,7 @@ async def handle_extract_peaks(payload: dict, session: AsyncSession, settings) - nc_path = payload["nc_file_path"] with tempfile.TemporaryDirectory(dir=settings.audio_tmp_dir) as tmp: - audio, _sr, _local_path = await load_audio(nc_path, tmp, settings) + audio, _sr, _local_path = await load_audio(str(version_id), Path(nc_path).name, tmp, settings) loop = asyncio.get_event_loop() peaks_500 = await loop.run_in_executor(None, extract_peaks, audio, 500) @@ -146,17 +153,27 @@ HANDLERS = { async def main() -> None: settings = get_settings() os.makedirs(settings.audio_tmp_dir, exist_ok=True) + log.info( + "Worker config — redis_url=%s api_url=%s queue=%s", + settings.redis_url, settings.api_url, settings.job_queue_key, + ) engine = create_async_engine(settings.database_url, pool_pre_ping=True) session_factory = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession) redis = aioredis.from_url(settings.redis_url, decode_responses=True) - # Drain stale job IDs left in Redis from previous runs whose API transactions - # were never committed (e.g. 
crashed processes). - stale = await redis.llen(settings.job_queue_key) - if stale: - log.warning("Draining %d stale job IDs from Redis queue before starting", stale) - await redis.delete(settings.job_queue_key) + # Wait for Redis to be reachable before proceeding (startup race condition guard). + for attempt in range(1, 31): + try: + await redis.ping() + log.info("Redis connection established (attempt %d)", attempt) + break + except Exception as exc: + if attempt == 30: + log.error("Redis unreachable after 30 attempts — giving up: %s", exc) + raise + log.warning("Redis not ready (attempt %d/30): %s — retrying in 2s", attempt, exc) + await asyncio.sleep(2) log.info("Worker started. Listening for jobs on %s", settings.job_queue_key) diff --git a/worker/src/worker/pipeline/analyse_full.py b/worker/src/worker/pipeline/analyse_full.py index 5dc6592..7f629e7 100644 --- a/worker/src/worker/pipeline/analyse_full.py +++ b/worker/src/worker/pipeline/analyse_full.py @@ -3,15 +3,28 @@ from __future__ import annotations import asyncio +import logging import uuid +from concurrent.futures import ThreadPoolExecutor from typing import Any import numpy as np from sqlalchemy.ext.asyncio import AsyncSession +from worker.analyzers.base import AnalysisResult from worker.analyzers.bpm import BPMAnalyzer from worker.analyzers.key import KeyAnalyzer +log = logging.getLogger(__name__) + +# Dedicated pool so heavy Essentia threads can't starve the default executor. +# max_workers=2 covers BPM + Key running sequentially per job. +_analysis_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="analysis") + +# Per-analyzer timeout in seconds. Essentia multifeature BPM can be slow on +# long recordings; 3 minutes is generous for a single-track analysis pass. 
+_ANALYZER_TIMEOUT = 180.0 + async def run_full_analysis( audio: np.ndarray, @@ -19,10 +32,25 @@ async def run_full_analysis( version_id: uuid.UUID, session: AsyncSession, ) -> dict[str, Any]: - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() - bpm_result = await loop.run_in_executor(None, BPMAnalyzer().analyze, audio, sample_rate) - key_result = await loop.run_in_executor(None, KeyAnalyzer().analyze, audio, sample_rate) + try: + bpm_result = await asyncio.wait_for( + loop.run_in_executor(_analysis_pool, BPMAnalyzer().analyze, audio, sample_rate), + timeout=_ANALYZER_TIMEOUT, + ) + except asyncio.TimeoutError: + log.warning("BPM analysis timed out for version %s — storing null", version_id) + bpm_result = AnalysisResult(analyzer_name="bpm", fields={"bpm": None, "bpm_confidence": None}) + + try: + key_result = await asyncio.wait_for( + loop.run_in_executor(_analysis_pool, KeyAnalyzer().analyze, audio, sample_rate), + timeout=_ANALYZER_TIMEOUT, + ) + except asyncio.TimeoutError: + log.warning("Key analysis timed out for version %s — storing null", version_id) + key_result = AnalysisResult(analyzer_name="key", fields={"key": None, "scale": None, "key_confidence": None}) fields: dict[str, Any] = {**bpm_result.fields, **key_result.fields} @@ -33,15 +61,32 @@ async def run_full_analysis( global_bpm = fields.get("bpm") global_key = fields.get("key") + from worker.db import SongModel + + # Mark version analysis done stmt = ( update(AudioVersionModel) .where(AudioVersionModel.id == version_id) - .values( - analysis_status="done", - **({} if global_bpm is None else {"global_bpm": global_bpm}), - ) + .values(analysis_status="done") ) await session.execute(stmt) + + # Write BPM/key to the song (global_bpm/global_key live on songs, not audio_versions) + version = await session.get(AudioVersionModel, version_id) + if version is not None: + song_extra: dict[str, Any] = {} + if global_bpm is not None: + song_extra["global_bpm"] = global_bpm + if global_key is 
not None: + song_extra["global_key"] = global_key + if song_extra: + song_stmt = ( + update(SongModel) + .where(SongModel.id == version.song_id) + .values(**song_extra) + ) + await session.execute(song_stmt) + await session.commit() return fields diff --git a/worker/src/worker/pipeline/transcode.py b/worker/src/worker/pipeline/transcode.py index a6226b9..48324e6 100644 --- a/worker/src/worker/pipeline/transcode.py +++ b/worker/src/worker/pipeline/transcode.py @@ -41,16 +41,26 @@ async def get_duration_ms(input_path: str) -> int: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) - stdout, _ = await proc.communicate() + try: + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=30.0) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + raise RuntimeError(f"ffprobe timed out for {input_path}") info = json.loads(stdout) duration_s = float(info.get("format", {}).get("duration", 0)) return int(duration_s * 1000) -async def _run_ffmpeg(cmd: list[str]) -> None: +async def _run_ffmpeg(cmd: list[str], timeout: float = 600.0) -> None: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE ) - _, stderr = await proc.communicate() + try: + _, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + raise RuntimeError(f"FFmpeg timed out after {timeout}s") if proc.returncode != 0: raise RuntimeError(f"FFmpeg failed: {stderr.decode()[:500]}")