5 Commits

Author SHA1 Message Date
Mistral Vibe
b2d6b4d113 Refactor storage to provider-agnostic band-scoped model
Replaces per-member Nextcloud credentials with a BandStorage model that
supports multiple providers. Credentials are Fernet-encrypted at rest;
worker receives audio via an internal streaming endpoint instead of
direct storage access.

- Add BandStorage DB model with partial unique index (one active per band)
- Add migrations 0007 (create band_storage) and 0008 (drop old nc columns)
- Add StorageFactory that builds the correct StorageClient from BandStorage
- Add storage router: connect/nextcloud, OAuth2 authorize/callback, list, disconnect
- Add Fernet encryption helpers in security/encryption.py
- Rewrite watcher for per-band polling via internal API config endpoint
- Update worker to stream audio from API instead of accessing storage directly
- Update frontend: new storage API in bands.ts, rewritten StorageSection,
  simplified band creation modal (no storage step)
- Add STORAGE_ENCRYPTION_KEY to all docker-compose files

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-10 23:22:36 +02:00
Mistral Vibe
ba22853bc7 Wokring on Nextcloud scan 2026-04-10 13:01:31 +02:00
Mistral Vibe
4bab0a76f7 Build update 2026-04-10 12:23:27 +02:00
Mistral Vibe
5bb3f9c1f7 up 2026-04-10 12:09:13 +02:00
Mistral Vibe
7e7fd8c8f0 adding prod compose 2026-04-10 11:40:55 +02:00
45 changed files with 1971 additions and 788 deletions

View File

@@ -0,0 +1,68 @@
"""Add band_storage table for provider-agnostic, encrypted storage configs.
Each band can have one active storage provider (Nextcloud, Google Drive, etc.).
Credentials are Fernet-encrypted at the application layer — never stored in plaintext.
A partial unique index enforces at most one active config per band at the DB level.
Revision ID: 0007_band_storage
Revises: 0006_waveform_peaks_in_db
Create Date: 2026-04-10
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
revision = "0007_band_storage"
down_revision = "0006_waveform_peaks_in_db"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
"band_storage",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"band_id",
UUID(as_uuid=True),
sa.ForeignKey("bands.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("provider", sa.String(20), nullable=False),
sa.Column("label", sa.String(255), nullable=True),
sa.Column("is_active", sa.Boolean, nullable=False, server_default="false"),
sa.Column("root_path", sa.Text, nullable=True),
# Fernet-encrypted JSON — never plaintext
sa.Column("credentials", sa.Text, nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
)
# Index for fast per-band lookups
op.create_index("ix_band_storage_band_id", "band_storage", ["band_id"])
# Partial unique index: at most one active storage per band
op.execute(
"""
CREATE UNIQUE INDEX uq_band_active_storage
ON band_storage (band_id)
WHERE is_active = true
"""
)
def downgrade() -> None:
op.execute("DROP INDEX IF EXISTS uq_band_active_storage")
op.drop_index("ix_band_storage_band_id", table_name="band_storage")
op.drop_table("band_storage")

View File

@@ -0,0 +1,42 @@
"""Remove Nextcloud-specific columns from members and bands.
Prior to this migration, storage credentials lived directly on the Member
and Band rows. They are now in the band_storage table (migration 0007),
encrypted at the application layer.
Run 0007 first; if you still need to migrate existing data, do it in a
separate script before applying this migration.
Revision ID: 0008_drop_nc_columns
Revises: 0007_band_storage
Create Date: 2026-04-10
"""
from alembic import op
import sqlalchemy as sa
revision = "0008_drop_nc_columns"
down_revision = "0007_band_storage"
branch_labels = None
depends_on = None
def upgrade() -> None:
# Drop Nextcloud credential columns from members
op.drop_column("members", "nc_url")
op.drop_column("members", "nc_username")
op.drop_column("members", "nc_password")
# Drop Nextcloud-specific columns from bands
op.drop_column("bands", "nc_folder_path")
op.drop_column("bands", "nc_user")
def downgrade() -> None:
# Restore columns (data is lost — this is intentional)
op.add_column("bands", sa.Column("nc_user", sa.String(255), nullable=True))
op.add_column("bands", sa.Column("nc_folder_path", sa.Text, nullable=True))
op.add_column("members", sa.Column("nc_password", sa.Text, nullable=True))
op.add_column("members", sa.Column("nc_username", sa.String(255), nullable=True))
op.add_column("members", sa.Column("nc_url", sa.Text, nullable=True))

View File

@@ -15,6 +15,7 @@ dependencies = [
"pydantic[email]>=2.7",
"pydantic-settings>=2.3",
"python-jose[cryptography]>=3.3",
"cryptography>=42.0",
"bcrypt>=4.1",
"httpx>=0.27",
"redis[hiredis]>=5.0",

View File

@@ -12,6 +12,10 @@ class Settings(BaseSettings):
jwt_algorithm: str = "HS256"
access_token_expire_minutes: int = 60 # 1 hour
# Storage credential encryption — generate once with: Fernet.generate_key().decode()
# NEVER commit this value; store in env / secrets manager only.
storage_encryption_key: str = ""
# Database
database_url: str # postgresql+asyncpg://...
@@ -28,6 +32,19 @@ class Settings(BaseSettings):
# Worker
analysis_version: str = "1.0.0"
# OAuth2 — Google Drive
google_client_id: str = ""
google_client_secret: str = ""
# OAuth2 — Dropbox
dropbox_app_key: str = ""
dropbox_app_secret: str = ""
# OAuth2 — OneDrive (Microsoft Graph)
onedrive_client_id: str = ""
onedrive_client_secret: str = ""
onedrive_tenant_id: str = "common" # 'common' for multi-tenant apps
@lru_cache
def get_settings() -> Settings:

View File

@@ -10,12 +10,14 @@ from sqlalchemy import (
Boolean,
DateTime,
ForeignKey,
Index,
Integer,
Numeric,
String,
Text,
UniqueConstraint,
func,
text,
)
from sqlalchemy.dialects.postgresql import ARRAY, JSONB, UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
@@ -35,9 +37,6 @@ class Member(Base):
email: Mapped[str] = mapped_column(String(320), unique=True, nullable=False, index=True)
display_name: Mapped[str] = mapped_column(String(255), nullable=False)
avatar_url: Mapped[str | None] = mapped_column(Text)
nc_username: Mapped[str | None] = mapped_column(String(255))
nc_url: Mapped[str | None] = mapped_column(Text)
nc_password: Mapped[str | None] = mapped_column(Text)
password_hash: Mapped[str] = mapped_column(Text, nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False
@@ -67,8 +66,6 @@ class Band(Base):
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name: Mapped[str] = mapped_column(String(255), nullable=False)
slug: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
nc_folder_path: Mapped[str | None] = mapped_column(Text)
nc_user: Mapped[str | None] = mapped_column(String(255))
genre_tags: Mapped[list[str]] = mapped_column(ARRAY(Text), default=list, nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False
@@ -86,6 +83,59 @@ class Band(Base):
sessions: Mapped[list[RehearsalSession]] = relationship(
"RehearsalSession", back_populates="band", cascade="all, delete-orphan"
)
storage_configs: Mapped[list[BandStorage]] = relationship(
"BandStorage", back_populates="band", cascade="all, delete-orphan"
)
class BandStorage(Base):
"""Storage provider configuration for a band.
Credentials are stored as a Fernet-encrypted JSON blob — never in plaintext.
Only one ``BandStorage`` row per band may be active at a time, enforced by
a partial unique index on ``(band_id) WHERE is_active``.
Supported providers and their credential shapes (all encrypted):
nextcloud: { "url": "...", "username": "...", "app_password": "..." }
googledrive: { "access_token": "...", "refresh_token": "...",
"token_expiry": "ISO-8601", "token_type": "Bearer" }
onedrive: { "access_token": "...", "refresh_token": "...",
"token_expiry": "ISO-8601", "token_type": "Bearer" }
dropbox: { "access_token": "...", "refresh_token": "...",
"token_expiry": "ISO-8601" }
"""
__tablename__ = "band_storage"
__table_args__ = (
# DB-enforced: at most one active storage config per band.
Index(
"uq_band_active_storage",
"band_id",
unique=True,
postgresql_where=text("is_active = true"),
),
)
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
band_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("bands.id", ondelete="CASCADE"), nullable=False, index=True
)
# 'nextcloud' | 'googledrive' | 'onedrive' | 'dropbox'
provider: Mapped[str] = mapped_column(String(20), nullable=False)
label: Mapped[str | None] = mapped_column(String(255))
is_active: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
# Root path within the provider's storage (e.g. "/bands/cool-band/"). Not sensitive.
root_path: Mapped[str | None] = mapped_column(Text)
# Fernet-encrypted JSON blob — shape depends on provider (see docstring above).
credentials: Mapped[str] = mapped_column(Text, nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False
)
band: Mapped[Band] = relationship("Band", back_populates="storage_configs")
class BandMember(Base):
@@ -228,23 +278,14 @@ class AudioVersion(Base):
version_number: Mapped[int] = mapped_column(Integer, nullable=False)
label: Mapped[str | None] = mapped_column(String(255))
nc_file_path: Mapped[str] = mapped_column(Text, nullable=False)
<<<<<<< HEAD
nc_file_etag: Mapped[Optional[str]] = mapped_column(String(255))
cdn_hls_base: Mapped[Optional[str]] = mapped_column(Text)
waveform_url: Mapped[Optional[str]] = mapped_column(Text)
waveform_peaks: Mapped[Optional[list]] = mapped_column(JSONB)
waveform_peaks_mini: Mapped[Optional[list]] = mapped_column(JSONB)
duration_ms: Mapped[Optional[int]] = mapped_column(Integer)
format: Mapped[Optional[str]] = mapped_column(String(10))
file_size_bytes: Mapped[Optional[int]] = mapped_column(BigInteger)
=======
nc_file_etag: Mapped[str | None] = mapped_column(String(255))
cdn_hls_base: Mapped[str | None] = mapped_column(Text)
waveform_url: Mapped[str | None] = mapped_column(Text)
waveform_peaks: Mapped[list | None] = mapped_column(JSONB)
waveform_peaks_mini: Mapped[list | None] = mapped_column(JSONB)
duration_ms: Mapped[int | None] = mapped_column(Integer)
format: Mapped[str | None] = mapped_column(String(10))
file_size_bytes: Mapped[int | None] = mapped_column(BigInteger)
>>>>>>> feature/pipeline-fix
analysis_status: Mapped[str] = mapped_column(String(20), nullable=False, default="pending")
uploaded_by: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True), ForeignKey("members.id", ondelete="SET NULL")

View File

@@ -20,6 +20,7 @@ from rehearsalhub.routers import (
members_router,
sessions_router,
songs_router,
storage_router,
versions_router,
ws_router,
)
@@ -94,6 +95,7 @@ def create_app() -> FastAPI:
app.include_router(annotations_router, prefix=prefix)
app.include_router(members_router, prefix=prefix)
app.include_router(internal_router, prefix=prefix)
app.include_router(storage_router, prefix=prefix)
app.include_router(ws_router) # WebSocket routes don't use /api/v1 prefix
@app.get("/api/health")

View File

@@ -17,6 +17,11 @@ class AudioVersionRepository(BaseRepository[AudioVersion]):
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def get_by_nc_file_path(self, nc_file_path: str) -> AudioVersion | None:
stmt = select(AudioVersion).where(AudioVersion.nc_file_path == nc_file_path)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def list_for_song(self, song_id: uuid.UUID) -> list[AudioVersion]:
stmt = (
select(AudioVersion)

View File

@@ -7,7 +7,7 @@ from datetime import UTC, datetime, timedelta
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from rehearsalhub.db.models import Band, BandInvite, BandMember
from rehearsalhub.db.models import Band, BandInvite, BandMember, BandStorage
from rehearsalhub.repositories.base import BaseRepository
@@ -92,16 +92,27 @@ class BandRepository(BaseRepository[Band]):
return list(result.scalars().all())
async def get_by_nc_folder_prefix(self, path: str) -> Band | None:
"""Return the band whose nc_folder_path is a prefix of path."""
stmt = select(Band).where(Band.nc_folder_path.is_not(None))
"""Return the band whose active storage root_path is a prefix of *path*.
Longest match wins (most-specific prefix) so nested paths resolve correctly.
"""
stmt = (
select(Band, BandStorage.root_path)
.join(
BandStorage,
(BandStorage.band_id == Band.id) & BandStorage.is_active.is_(True),
)
.where(BandStorage.root_path.is_not(None))
)
result = await self.session.execute(stmt)
bands = result.scalars().all()
# Longest match wins (most specific prefix)
rows = result.all()
best: Band | None = None
for band in bands:
folder = band.nc_folder_path # type: ignore[union-attr]
if path.startswith(folder) and (best is None or len(folder) > len(best.nc_folder_path)): # type: ignore[arg-type]
best_len = 0
for band, root_path in rows:
folder = root_path.rstrip("/") + "/"
if path.startswith(folder) and len(folder) > best_len:
best = band
best_len = len(folder)
return best
async def list_for_member(self, member_id: uuid.UUID) -> list[Band]:

View File

@@ -0,0 +1,66 @@
"""Repository for BandStorage — per-band storage provider configuration."""
from __future__ import annotations
import uuid
from sqlalchemy import select, update
from rehearsalhub.db.models import BandStorage
from rehearsalhub.repositories.base import BaseRepository
class BandStorageRepository(BaseRepository[BandStorage]):
model = BandStorage
async def get_active_for_band(self, band_id: uuid.UUID) -> BandStorage | None:
"""Return the single active storage config for *band_id*, or None."""
result = await self.session.execute(
select(BandStorage).where(
BandStorage.band_id == band_id,
BandStorage.is_active.is_(True),
)
)
return result.scalar_one_or_none()
async def list_for_band(self, band_id: uuid.UUID) -> list[BandStorage]:
result = await self.session.execute(
select(BandStorage)
.where(BandStorage.band_id == band_id)
.order_by(BandStorage.created_at)
)
return list(result.scalars().all())
async def list_active_by_provider(self, provider: str) -> list[BandStorage]:
"""Return all active configs for a given provider (used by the watcher)."""
result = await self.session.execute(
select(BandStorage).where(
BandStorage.provider == provider,
BandStorage.is_active.is_(True),
)
)
return list(result.scalars().all())
async def activate(self, storage_id: uuid.UUID, band_id: uuid.UUID) -> BandStorage:
"""Deactivate all configs for *band_id*, then activate *storage_id*."""
await self.session.execute(
update(BandStorage)
.where(BandStorage.band_id == band_id)
.values(is_active=False)
)
storage = await self.get_by_id(storage_id)
if storage is None:
raise LookupError(f"BandStorage {storage_id} not found")
storage.is_active = True
await self.session.flush()
await self.session.refresh(storage)
return storage
async def deactivate_all(self, band_id: uuid.UUID) -> None:
"""Deactivate every storage config for a band (disconnect)."""
await self.session.execute(
update(BandStorage)
.where(BandStorage.band_id == band_id)
.values(is_active=False)
)
await self.session.flush()

View File

@@ -6,6 +6,7 @@ from rehearsalhub.routers.invites import router as invites_router
from rehearsalhub.routers.members import router as members_router
from rehearsalhub.routers.sessions import router as sessions_router
from rehearsalhub.routers.songs import router as songs_router
from rehearsalhub.routers.storage import router as storage_router
from rehearsalhub.routers.versions import router as versions_router
from rehearsalhub.routers.ws import router as ws_router
@@ -17,6 +18,7 @@ __all__ = [
"members_router",
"sessions_router",
"songs_router",
"storage_router",
"versions_router",
"annotations_router",
"ws_router",

View File

@@ -34,7 +34,7 @@ async def register(request: Request, req: RegisterRequest, session: AsyncSession
member = await svc.register(req)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
return MemberRead.from_model(member)
return MemberRead.model_validate(member)
@router.post("/login", response_model=TokenResponse)
@@ -87,7 +87,7 @@ async def logout(response: Response):
@router.get("/me", response_model=MemberRead)
async def get_me(current_member: Member = Depends(get_current_member)):
return MemberRead.from_model(current_member)
return MemberRead.model_validate(current_member)
@router.patch("/me/settings", response_model=MemberRead)
@@ -100,12 +100,6 @@ async def update_settings(
updates: dict = {}
if data.display_name is not None:
updates["display_name"] = data.display_name
if data.nc_url is not None:
updates["nc_url"] = data.nc_url.rstrip("/") if data.nc_url else None
if data.nc_username is not None:
updates["nc_username"] = data.nc_username or None
if data.nc_password is not None:
updates["nc_password"] = data.nc_password or None
if data.avatar_url is not None:
updates["avatar_url"] = data.avatar_url or None
@@ -113,7 +107,7 @@ async def update_settings(
member = await repo.update(current_member, **updates)
else:
member = current_member
return MemberRead.from_model(member)
return MemberRead.model_validate(member)
@router.post("/me/avatar", response_model=MemberRead)
@@ -187,4 +181,4 @@ async def upload_avatar(
repo = MemberRepository(session)
avatar_url = f"/api/static/avatars/{filename}"
member = await repo.update(current_member, avatar_url=avatar_url)
return MemberRead.from_model(member)
return MemberRead.model_validate(member)

View File

@@ -11,7 +11,6 @@ from rehearsalhub.repositories.band import BandRepository
from rehearsalhub.schemas.band import BandCreate, BandRead, BandReadWithMembers, BandUpdate
from rehearsalhub.schemas.invite import BandInviteList, BandInviteListItem
from rehearsalhub.services.band import BandService
from rehearsalhub.storage.nextcloud import NextcloudClient
router = APIRouter(prefix="/bands", tags=["bands"])
@@ -126,10 +125,9 @@ async def create_band(
session: AsyncSession = Depends(get_session),
current_member: Member = Depends(get_current_member),
):
storage = NextcloudClient.for_member(current_member)
svc = BandService(session, storage)
svc = BandService(session)
try:
band = await svc.create_band(data, current_member.id, creator=current_member)
band = await svc.create_band(data, current_member.id)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
except LookupError as e:
@@ -143,8 +141,7 @@ async def get_band(
session: AsyncSession = Depends(get_session),
current_member: Member = Depends(get_current_member),
):
storage = NextcloudClient.for_member(current_member)
svc = BandService(session, storage)
svc = BandService(session)
try:
await svc.assert_membership(band_id, current_member.id)
except PermissionError:
@@ -173,9 +170,10 @@ async def update_band(
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Band not found")
updates: dict = {}
if data.nc_folder_path is not None:
path = data.nc_folder_path.strip()
updates["nc_folder_path"] = (path.rstrip("/") + "/") if path else None
if data.name is not None:
updates["name"] = data.name.strip()
if data.genre_tags is not None:
updates["genre_tags"] = data.genre_tags
if updates:
band = await repo.update(band, **updates)

View File

@@ -1,25 +1,28 @@
"""Internal endpoints — called by trusted services (watcher) on the Docker network."""
"""Internal endpoints — called by trusted services (watcher, worker) on the Docker network."""
import logging
import uuid
from pathlib import Path
from fastapi import APIRouter, Depends, Header, HTTPException, status
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from rehearsalhub.config import get_settings
from rehearsalhub.db.engine import get_session
from rehearsalhub.db.models import AudioVersion, BandMember, Member
from rehearsalhub.repositories.audio_version import AudioVersionRepository
from rehearsalhub.db.models import AudioVersion, BandMember
from rehearsalhub.queue.redis_queue import RedisJobQueue
from rehearsalhub.repositories.band import BandRepository
from rehearsalhub.repositories.band_storage import BandStorageRepository
from rehearsalhub.repositories.rehearsal_session import RehearsalSessionRepository
from rehearsalhub.repositories.song import SongRepository
from rehearsalhub.queue.redis_queue import RedisJobQueue
from rehearsalhub.schemas.audio_version import AudioVersionCreate
from rehearsalhub.security.encryption import decrypt_credentials
from rehearsalhub.services.session import extract_session_folder, parse_rehearsal_date
from rehearsalhub.services.song import SongService
from rehearsalhub.storage.nextcloud import NextcloudClient
from rehearsalhub.storage.factory import StorageFactory
log = logging.getLogger(__name__)
@@ -35,6 +38,9 @@ async def _verify_internal_secret(x_internal_token: str | None = Header(None)) -
AUDIO_EXTENSIONS = {".mp3", ".wav", ".flac", ".ogg", ".m4a", ".aac", ".opus"}
# ── Watcher: detect new audio file ────────────────────────────────────────────
class NcUploadEvent(BaseModel):
nc_file_path: str
nc_file_etag: str | None = None
@@ -46,10 +52,9 @@ async def nc_upload(
session: AsyncSession = Depends(get_session),
_: None = Depends(_verify_internal_secret),
):
"""
Called by nc-watcher when a new audio file is detected in Nextcloud.
Parses the path to find/create the band+song and registers a version.
"""Called by nc-watcher when a new audio file is detected in storage.
Parses the path to find/create the band + song and registers a version.
Expected path format: bands/{slug}/[songs/]{folder}/filename.ext
"""
path = event.nc_file_path.lstrip("/")
@@ -59,13 +64,11 @@ async def nc_upload(
band_repo = BandRepository(session)
# Try slug-based lookup first (standard bands/{slug}/ layout)
parts = path.split("/")
band = None
if len(parts) >= 3 and parts[0] == "bands":
band = await band_repo.get_by_slug(parts[1])
# Fall back to prefix match for bands with custom nc_folder_path
if band is None:
band = await band_repo.get_by_nc_folder_prefix(path)
@@ -73,84 +76,166 @@ async def nc_upload(
log.info("nc-upload: no band found for path '%s' — skipping", path)
return {"status": "skipped", "reason": "band not found"}
# Determine song title and folder from path.
# The title is always the filename stem (e.g. "take1" from "take1.wav").
# The nc_folder groups all versions of the same recording (the parent directory).
#
# Examples:
# bands/my-band/take1.wav → folder=bands/my-band/, title=take1
# bands/my-band/231015/take1.wav → folder=bands/my-band/231015/, title=take1
# bands/my-band/songs/groove/take1.wav → folder=bands/my-band/songs/groove/, title=take1
parent = str(Path(path).parent)
nc_folder = parent.rstrip("/") + "/"
title = Path(path).stem
# If the file sits directly inside a dated session folder, give it a unique
# virtual folder so it becomes its own song (not merged with other takes).
session_folder_path = extract_session_folder(path)
if session_folder_path and session_folder_path.rstrip("/") == nc_folder.rstrip("/"):
nc_folder = nc_folder + title + "/"
version_repo = AudioVersionRepository(session)
if event.nc_file_etag and await version_repo.get_by_etag(event.nc_file_etag):
return {"status": "skipped", "reason": "version already registered"}
# Resolve or create rehearsal session from YYMMDD folder segment
session_repo = RehearsalSessionRepository(session)
rehearsal_date = parse_rehearsal_date(path)
rehearsal_session_id = None
if rehearsal_date:
rehearsal_session = await session_repo.get_or_create(band.id, rehearsal_date, nc_folder)
rehearsal_session_id = rehearsal_session.id
log.debug("nc-upload: linked to session %s (%s)", rehearsal_session_id, rehearsal_date)
try:
rehearsal_session = await session_repo.get_or_create(band.id, rehearsal_date, nc_folder)
rehearsal_session_id = rehearsal_session.id
log.debug("nc-upload: linked to session %s (%s)", rehearsal_session_id, rehearsal_date)
except Exception as exc:
log.error("nc-upload: failed to resolve session for '%s': %s", path, exc, exc_info=True)
raise HTTPException(status_code=500, detail="Failed to resolve rehearsal session") from exc
song_repo = SongRepository(session)
song = await song_repo.get_by_nc_folder_path(nc_folder)
if song is None:
song = await song_repo.get_by_title_and_band(band.id, title)
if song is None:
song = await song_repo.create(
band_id=band.id,
session_id=rehearsal_session_id,
title=title,
status="jam",
notes=None,
nc_folder_path=nc_folder,
created_by=None,
)
log.info("nc-upload: created song '%s' for band '%s'", title, band.slug)
elif rehearsal_session_id and song.session_id is None:
song = await song_repo.update(song, session_id=rehearsal_session_id)
try:
song = await song_repo.get_by_nc_folder_path(nc_folder)
if song is None:
song = await song_repo.get_by_title_and_band(band.id, title)
if song is None:
song = await song_repo.create(
band_id=band.id,
session_id=rehearsal_session_id,
title=title,
status="jam",
notes=None,
nc_folder_path=nc_folder,
created_by=None,
)
log.info("nc-upload: created song '%s' for band '%s'", title, band.slug)
elif rehearsal_session_id and song.session_id is None:
song = await song_repo.update(song, session_id=rehearsal_session_id)
except Exception as exc:
log.error("nc-upload: failed to find/create song for '%s': %s", path, exc, exc_info=True)
raise HTTPException(status_code=500, detail="Failed to resolve song") from exc
# Use first member of the band as uploader (best-effort for watcher uploads)
result = await session.execute(
select(BandMember.member_id).where(BandMember.band_id == band.id).limit(1)
)
uploader_id = result.scalar_one_or_none()
# Get the uploader's storage credentials
storage = None
if uploader_id:
uploader_result = await session.execute(
select(Member).where(Member.id == uploader_id).limit(1) # type: ignore[arg-type]
try:
song_svc = SongService(session)
version = await song_svc.register_version(
song.id,
AudioVersionCreate(
nc_file_path=path,
nc_file_etag=event.nc_file_etag,
format=Path(path).suffix.lstrip(".").lower(),
),
uploader_id,
)
uploader = uploader_result.scalar_one_or_none()
storage = NextcloudClient.for_member(uploader) if uploader else None
except Exception as exc:
log.error(
"nc-upload: failed to register version for '%s' (song '%s'): %s",
path, song.title, exc, exc_info=True,
)
raise HTTPException(status_code=500, detail="Failed to register version") from exc
song_svc = SongService(session, storage=storage)
version = await song_svc.register_version(
song.id,
AudioVersionCreate(
nc_file_path=path,
nc_file_etag=event.nc_file_etag,
format=Path(path).suffix.lstrip(".").lower(),
),
uploader_id,
)
log.info("nc-upload: registered version %s for song '%s'", version.id, song.title)
return {"status": "ok", "version_id": str(version.id), "song_id": str(song.id)}
# ── Worker: stream audio ───────────────────────────────────────────────────────
@router.get("/audio/{version_id}/stream")
async def stream_audio(
version_id: uuid.UUID,
session: AsyncSession = Depends(get_session),
_: None = Depends(_verify_internal_secret),
):
"""Proxy an audio file from the band's storage to the caller (audio-worker).
The worker never handles storage credentials. This endpoint resolves the
band's active storage config and streams the file transparently.
"""
result = await session.execute(
select(AudioVersion).where(AudioVersion.id == version_id)
)
version = result.scalar_one_or_none()
if version is None:
raise HTTPException(status_code=404, detail="Version not found")
# Resolve the band from the song
from sqlalchemy.orm import selectinload
from rehearsalhub.db.models import Song
song_result = await session.execute(
select(Song).where(Song.id == version.song_id)
)
song = song_result.scalar_one_or_none()
if song is None:
raise HTTPException(status_code=404, detail="Song not found")
try:
storage = await StorageFactory.create(session, song.band_id, get_settings())
except LookupError:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Band has no active storage configured",
)
log.info("stream_audio: streaming version %s from storage", version_id)
async def _stream():
data = await storage.download(version.nc_file_path)
yield data
return StreamingResponse(_stream(), media_type="application/octet-stream")
# ── Watcher: list active Nextcloud configs ─────────────────────────────────────
@router.get("/storage/nextcloud-watch-configs")
async def get_nextcloud_watch_configs(
session: AsyncSession = Depends(get_session),
_: None = Depends(_verify_internal_secret),
):
"""Return decrypted Nextcloud configs for all active NC bands.
Used exclusively by the nc-watcher service to know which Nextcloud
instances to poll and with what credentials. Traffic stays on the
internal Docker network and is never exposed externally.
"""
settings = get_settings()
if not settings.storage_encryption_key:
raise HTTPException(status_code=500, detail="Storage encryption key not configured")
repo = BandStorageRepository(session)
configs = await repo.list_active_by_provider("nextcloud")
result = []
for config in configs:
try:
creds = decrypt_credentials(settings.storage_encryption_key, config.credentials)
result.append({
"band_id": str(config.band_id),
"nc_url": creds["url"],
"nc_username": creds["username"],
"nc_app_password": creds["app_password"],
"root_path": config.root_path,
})
except Exception as exc:
log.error("Failed to decrypt credentials for band_storage %s: %s", config.id, exc)
# Skip this band rather than failing the whole response
return result
# ── Maintenance: reindex waveform peaks ───────────────────────────────────────
@router.post("/reindex-peaks", status_code=200)
async def reindex_peaks(
session: AsyncSession = Depends(get_session),
@@ -159,10 +244,6 @@ async def reindex_peaks(
"""Enqueue extract_peaks jobs for every audio_version that has no waveform_peaks yet.
Safe to call multiple times — only versions with null peaks are targeted.
Useful after:
- Fresh DB creation + directory scan (peaks not yet computed)
- Peak algorithm changes (clear waveform_peaks, then call this)
- Worker was down during initial transcode
"""
result = await session.execute(
select(AudioVersion).where(AudioVersion.waveform_peaks.is_(None)) # type: ignore[attr-defined]

View File

@@ -7,10 +7,13 @@ from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from rehearsalhub.config import get_settings
from rehearsalhub.db.engine import get_session, get_session_factory
from rehearsalhub.queue.redis_queue import flush_pending_pushes
from rehearsalhub.db.models import Member
from rehearsalhub.dependencies import get_current_member
from rehearsalhub.repositories.band import BandRepository
from rehearsalhub.repositories.band_storage import BandStorageRepository
from rehearsalhub.repositories.comment import CommentRepository
from rehearsalhub.repositories.song import SongRepository
from rehearsalhub.routers.versions import _member_from_request
@@ -19,7 +22,7 @@ from rehearsalhub.schemas.song import SongCreate, SongRead, SongUpdate
from rehearsalhub.services.band import BandService
from rehearsalhub.services.nc_scan import scan_band_folder
from rehearsalhub.services.song import SongService
from rehearsalhub.storage.nextcloud import NextcloudClient
from rehearsalhub.storage.factory import StorageFactory
log = logging.getLogger(__name__)
@@ -47,8 +50,7 @@ async def list_songs(
await band_svc.assert_membership(band_id, current_member.id)
except PermissionError:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not a member")
storage = NextcloudClient.for_member(current_member)
song_svc = SongService(session, storage=storage)
song_svc = SongService(session)
return await song_svc.list_songs(band_id)
@@ -149,9 +151,8 @@ async def create_song(
if band is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Band not found")
storage = NextcloudClient.for_member(current_member)
song_svc = SongService(session, storage=storage)
song = await song_svc.create_song(band_id, data, current_member.id, band.slug, creator=current_member)
song_svc = SongService(session)
song = await song_svc.create_song(band_id, data, current_member.id, band.slug)
read = SongRead.model_validate(song)
read.version_count = 0
return read
@@ -186,22 +187,28 @@ async def scan_nextcloud_stream(
Accepts ?token= for EventSource clients that can't set headers.
"""
band = await _get_band_and_assert_member(band_id, current_member, session)
band_folder = band.nc_folder_path or f"bands/{band.slug}/"
nc = NextcloudClient.for_member(current_member)
bs = await BandStorageRepository(session).get_active_for_band(band_id)
band_folder = (bs.root_path if bs and bs.root_path else None) or f"bands/{band.slug}/"
member_id = current_member.id
settings = get_settings()
async def event_generator():
async with get_session_factory()() as db:
try:
async for event in scan_band_folder(db, nc, band_id, band_folder, member_id):
storage = await StorageFactory.create(db, band_id, settings)
async for event in scan_band_folder(db, storage, band_id, band_folder, member_id):
yield json.dumps(event) + "\n"
if event.get("type") in ("song", "session"):
await db.commit()
await flush_pending_pushes(db)
except LookupError as exc:
yield json.dumps({"type": "error", "message": str(exc)}) + "\n"
except Exception:
log.exception("SSE scan error for band %s", band_id)
yield json.dumps({"type": "error", "message": "Scan failed due to an internal error."}) + "\n"
finally:
await db.commit()
await flush_pending_pushes(db)
return StreamingResponse(
event_generator(),
@@ -220,13 +227,18 @@ async def scan_nextcloud(
Prefer the SSE /nc-scan/stream endpoint for large folders.
"""
band = await _get_band_and_assert_member(band_id, current_member, session)
band_folder = band.nc_folder_path or f"bands/{band.slug}/"
nc = NextcloudClient.for_member(current_member)
bs = await BandStorageRepository(session).get_active_for_band(band_id)
band_folder = (bs.root_path if bs and bs.root_path else None) or f"bands/{band.slug}/"
try:
storage = await StorageFactory.create(session, band_id, get_settings())
except LookupError as exc:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc))
songs: list[SongRead] = []
stats = {"found": 0, "imported": 0, "skipped": 0}
async for event in scan_band_folder(session, nc, band_id, band_folder, current_member.id):
async for event in scan_band_folder(session, storage, band_id, band_folder, current_member.id):
if event["type"] == "song":
songs.append(SongRead(**event["song"]))
elif event["type"] == "done":

View File

@@ -0,0 +1,336 @@
"""Storage provider management endpoints.
Bands connect to a storage provider (Nextcloud, Google Drive, OneDrive, Dropbox)
through this router. Credentials are encrypted before being written to the DB.
OAuth2 flow:
1. Admin calls GET /bands/{id}/storage/connect/{provider}/authorize
→ receives a redirect URL to the provider's consent page
2. After consent, provider redirects to GET /oauth/callback/{provider}?code=...&state=...
→ tokens are exchanged, encrypted, stored, and the admin is redirected to the frontend
Nextcloud (app-password) flow:
POST /bands/{id}/storage/connect/nextcloud
→ credentials validated and stored immediately (no OAuth redirect needed)
"""
from __future__ import annotations
import logging
import secrets
import uuid
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode
import httpx
from fastapi import APIRouter, Depends, HTTPException, Query, status
from fastapi.responses import RedirectResponse
from jose import JWTError, jwt
from sqlalchemy.ext.asyncio import AsyncSession
from rehearsalhub.config import Settings, get_settings
from rehearsalhub.db.engine import get_session
from rehearsalhub.db.models import Member
from rehearsalhub.dependencies import get_current_member
from rehearsalhub.repositories.band_storage import BandStorageRepository
from rehearsalhub.schemas.storage import BandStorageRead, NextcloudConnect, OAuthAuthorizeResponse
from rehearsalhub.security.encryption import encrypt_credentials
from rehearsalhub.services.band import BandService
log = logging.getLogger(__name__)
router = APIRouter(tags=["storage"])
# OAuth2 state JWT expires after 15 minutes (consent must happen in this window)
_STATE_TTL_MINUTES = 15
# ── OAuth2 provider definitions ────────────────────────────────────────────────
_OAUTH_CONFIGS: dict[str, dict] = {
"googledrive": {
"auth_url": "https://accounts.google.com/o/oauth2/v2/auth",
"token_url": "https://oauth2.googleapis.com/token",
"scopes": "https://www.googleapis.com/auth/drive openid",
"extra_auth_params": {"access_type": "offline", "prompt": "consent"},
},
"onedrive": {
# tenant_id is injected at runtime from settings
"auth_url": "https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/authorize",
"token_url": "https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
"scopes": "https://graph.microsoft.com/Files.ReadWrite offline_access",
"extra_auth_params": {},
},
"dropbox": {
"auth_url": "https://www.dropbox.com/oauth2/authorize",
"token_url": "https://api.dropboxapi.com/oauth2/token",
"scopes": "", # Dropbox uses app-level scopes set in the developer console
"extra_auth_params": {"token_access_type": "offline"},
},
}
def _get_client_id_and_secret(provider: str, settings: Settings) -> tuple[str, str]:
match provider:
case "googledrive":
return settings.google_client_id, settings.google_client_secret
case "onedrive":
return settings.onedrive_client_id, settings.onedrive_client_secret
case "dropbox":
return settings.dropbox_app_key, settings.dropbox_app_secret
case _:
raise ValueError(f"Unknown OAuth provider: {provider!r}")
def _redirect_uri(provider: str, settings: Settings) -> str:
scheme = "http" if settings.debug else "https"
return f"{scheme}://{settings.domain}/api/v1/oauth/callback/{provider}"
# ── State JWT helpers ──────────────────────────────────────────────────────────
def _encode_state(band_id: uuid.UUID, provider: str, settings: Settings) -> str:
payload = {
"band_id": str(band_id),
"provider": provider,
"nonce": secrets.token_hex(16),
"exp": datetime.now(timezone.utc) + timedelta(minutes=_STATE_TTL_MINUTES),
}
return jwt.encode(payload, settings.secret_key, algorithm=settings.jwt_algorithm)
def _decode_state(state: str, settings: Settings) -> dict:
try:
return jwt.decode(state, settings.secret_key, algorithms=[settings.jwt_algorithm])
except JWTError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid OAuth state: {exc}")
# ── Nextcloud (app-password) ───────────────────────────────────────────────────
@router.post(
"/bands/{band_id}/storage/connect/nextcloud",
response_model=BandStorageRead,
status_code=status.HTTP_201_CREATED,
)
async def connect_nextcloud(
band_id: uuid.UUID,
body: NextcloudConnect,
session: AsyncSession = Depends(get_session),
current_member: Member = Depends(get_current_member),
settings: Settings = Depends(get_settings),
):
"""Connect a band to a Nextcloud instance using an app password."""
band_svc = BandService(session)
try:
await band_svc.assert_admin(band_id, current_member.id)
except PermissionError:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required")
# Smoke-test the credentials before storing them
from rehearsalhub.storage.nextcloud import NextcloudClient
nc = NextcloudClient(base_url=body.url, username=body.username, password=body.app_password)
try:
await nc.list_folder(body.root_path or "/")
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Could not connect to Nextcloud: {exc}",
)
creds = {
"url": body.url,
"username": body.username,
"app_password": body.app_password,
}
encrypted = encrypt_credentials(settings.storage_encryption_key, creds)
repo = BandStorageRepository(session)
# Deactivate any previous storage before creating the new one
await repo.deactivate_all(band_id)
band_storage = await repo.create(
band_id=band_id,
provider="nextcloud",
label=body.label,
is_active=True,
root_path=body.root_path,
credentials=encrypted,
)
await session.commit()
log.info("Band %s connected to Nextcloud (%s)", band_id, body.url)
return BandStorageRead.model_validate(band_storage)
# ── OAuth2 — authorize ─────────────────────────────────────────────────────────
@router.get(
"/bands/{band_id}/storage/connect/{provider}/authorize",
response_model=OAuthAuthorizeResponse,
)
async def oauth_authorize(
band_id: uuid.UUID,
provider: str,
session: AsyncSession = Depends(get_session),
current_member: Member = Depends(get_current_member),
settings: Settings = Depends(get_settings),
):
"""Return the provider's OAuth2 authorization URL.
The frontend should redirect the user to ``redirect_url``.
After the user consents, the provider redirects to our callback endpoint.
"""
if provider not in _OAUTH_CONFIGS:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Unknown provider {provider!r}. Supported: {list(_OAUTH_CONFIGS)}",
)
band_svc = BandService(session)
try:
await band_svc.assert_admin(band_id, current_member.id)
except PermissionError:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required")
client_id, _ = _get_client_id_and_secret(provider, settings)
if not client_id:
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED,
detail=f"OAuth2 for {provider!r} is not configured on this server",
)
cfg = _OAUTH_CONFIGS[provider]
auth_url = cfg["auth_url"].format(tenant_id=settings.onedrive_tenant_id)
state = _encode_state(band_id, provider, settings)
redirect_uri = _redirect_uri(provider, settings)
params: dict = {
"client_id": client_id,
"redirect_uri": redirect_uri,
"response_type": "code",
"state": state,
**cfg["extra_auth_params"],
}
if cfg["scopes"]:
params["scope"] = cfg["scopes"]
return OAuthAuthorizeResponse(
redirect_url=f"{auth_url}?{urlencode(params)}",
provider=provider,
)
# ── OAuth2 — callback ──────────────────────────────────────────────────────────
@router.get("/oauth/callback/{provider}")
async def oauth_callback(
provider: str,
code: str = Query(...),
state: str = Query(...),
session: AsyncSession = Depends(get_session),
settings: Settings = Depends(get_settings),
):
"""Exchange authorization code for tokens, encrypt, and store."""
if provider not in _OAUTH_CONFIGS:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unknown provider")
state_data = _decode_state(state, settings)
band_id = uuid.UUID(state_data["band_id"])
client_id, client_secret = _get_client_id_and_secret(provider, settings)
cfg = _OAUTH_CONFIGS[provider]
token_url = cfg["token_url"].format(tenant_id=settings.onedrive_tenant_id)
redirect_uri = _redirect_uri(provider, settings)
payload = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"client_id": client_id,
"client_secret": client_secret,
}
try:
async with httpx.AsyncClient(timeout=15.0) as http:
resp = await http.post(token_url, data=payload)
resp.raise_for_status()
token_data = resp.json()
except Exception as exc:
log.error("OAuth token exchange failed for %s: %s", provider, exc)
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail="Token exchange failed")
from datetime import timedelta
expires_in = int(token_data.get("expires_in", 3600))
expiry = datetime.now(timezone.utc) + timedelta(seconds=expires_in - 60)
creds = {
"access_token": token_data["access_token"],
"refresh_token": token_data.get("refresh_token", ""),
"token_expiry": expiry.isoformat(),
"token_type": token_data.get("token_type", "Bearer"),
}
encrypted = encrypt_credentials(settings.storage_encryption_key, creds)
repo = BandStorageRepository(session)
await repo.deactivate_all(band_id)
await repo.create(
band_id=band_id,
provider=provider,
label=None,
is_active=True,
root_path=None,
credentials=encrypted,
)
await session.commit()
log.info("Band %s connected to %s via OAuth2", band_id, provider)
# Redirect back to the frontend settings page
scheme = "http" if settings.debug else "https"
return RedirectResponse(
url=f"{scheme}://{settings.domain}/bands/{band_id}/settings?storage=connected",
status_code=status.HTTP_302_FOUND,
)
# ── Read / disconnect ──────────────────────────────────────────────────────────
@router.get("/bands/{band_id}/storage", response_model=list[BandStorageRead])
async def list_storage(
band_id: uuid.UUID,
session: AsyncSession = Depends(get_session),
current_member: Member = Depends(get_current_member),
):
"""List all storage configs for a band (credentials never returned)."""
band_svc = BandService(session)
try:
await band_svc.assert_membership(band_id, current_member.id)
except PermissionError:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not a member")
repo = BandStorageRepository(session)
configs = await repo.list_for_band(band_id)
return [BandStorageRead.model_validate(c) for c in configs]
@router.delete("/bands/{band_id}/storage", status_code=status.HTTP_204_NO_CONTENT)
async def disconnect_storage(
band_id: uuid.UUID,
session: AsyncSession = Depends(get_session),
current_member: Member = Depends(get_current_member),
):
"""Deactivate the band's active storage (does not delete historical records)."""
band_svc = BandService(session)
try:
await band_svc.assert_admin(band_id, current_member.id)
except PermissionError:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required")
repo = BandStorageRepository(session)
await repo.deactivate_all(band_id)
await session.commit()
log.info("Band %s storage disconnected by member %s", band_id, current_member.id)

View File

@@ -17,9 +17,11 @@ from rehearsalhub.repositories.member import MemberRepository
from rehearsalhub.repositories.song import SongRepository
from rehearsalhub.schemas.audio_version import AudioVersionCreate, AudioVersionRead
from rehearsalhub.services.auth import decode_token
from rehearsalhub.config import get_settings
from rehearsalhub.services.band import BandService
from rehearsalhub.services.song import SongService
from rehearsalhub.storage.nextcloud import NextcloudClient
from rehearsalhub.storage.factory import StorageFactory
from rehearsalhub.storage.protocol import StorageClient
router = APIRouter(tags=["versions"])
@@ -35,7 +37,7 @@ _AUDIO_CONTENT_TYPES: dict[str, str] = {
}
async def _download_with_retry(storage: NextcloudClient, file_path: str, max_retries: int = 3) -> bytes:
async def _download_with_retry(storage: StorageClient, file_path: str, max_retries: int = 3) -> bytes:
"""Download file from Nextcloud with retry logic for transient errors."""
last_error = None
@@ -171,8 +173,7 @@ async def create_version(
except PermissionError:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not a member")
storage = NextcloudClient.for_member(current_member)
song_svc = SongService(session, storage=storage)
song_svc = SongService(session)
version = await song_svc.register_version(song_id, data, current_member.id)
return AudioVersionRead.model_validate(version)
@@ -219,15 +220,12 @@ async def stream_version(
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No audio file")
# Use the uploader's NC credentials — invited members may not have NC configured
uploader: Member | None = None
if version.uploaded_by:
uploader = await MemberRepository(session).get_by_id(version.uploaded_by)
storage = NextcloudClient.for_member(uploader) if uploader else NextcloudClient.for_member(current_member)
if storage is None:
try:
storage = await StorageFactory.create(session, song.band_id, get_settings())
except LookupError:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="No storage provider configured for this account"
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Band has no active storage configured",
)
try:
data = await _download_with_retry(storage, file_path)

View File

@@ -18,11 +18,11 @@ class BandCreate(BaseModel):
name: str
slug: str
genre_tags: list[str] = []
nc_base_path: str | None = None # e.g. "Bands/MyBand/" — defaults to "bands/{slug}/"
class BandUpdate(BaseModel):
nc_folder_path: str | None = None # update the Nextcloud base folder for scans
name: str | None = None
genre_tags: list[str] | None = None
class BandRead(BaseModel):
@@ -31,7 +31,6 @@ class BandRead(BaseModel):
name: str
slug: str
genre_tags: list[str]
nc_folder_path: str | None = None
created_at: datetime
updated_at: datetime

View File

@@ -13,23 +13,9 @@ class MemberRead(MemberBase):
model_config = ConfigDict(from_attributes=True)
id: uuid.UUID
avatar_url: str | None = None
nc_username: str | None = None
nc_url: str | None = None
nc_configured: bool = False # True if nc_url + nc_username + nc_password are all set
created_at: datetime
@classmethod
def from_model(cls, m: object) -> "MemberRead":
obj = cls.model_validate(m)
obj.nc_configured = bool(
m.nc_url and m.nc_username and m.nc_password
)
return obj
class MemberSettingsUpdate(BaseModel):
display_name: str | None = None
nc_url: str | None = None
nc_username: str | None = None
nc_password: str | None = None # send null to clear, omit to leave unchanged
avatar_url: str | None = None # URL to user's avatar image
avatar_url: str | None = None

View File

@@ -0,0 +1,56 @@
"""Pydantic schemas for storage provider configuration endpoints."""
from __future__ import annotations
import uuid
from datetime import datetime
from pydantic import BaseModel, field_validator
# ── Request bodies ─────────────────────────────────────────────────────────────
class NextcloudConnect(BaseModel):
"""Connect a band to a Nextcloud instance via an app password.
Use an *app password* (generated in Nextcloud → Settings → Security),
not the account password. App passwords can be revoked without changing
the main account credentials.
"""
url: str
username: str
app_password: str
label: str | None = None
root_path: str | None = None
@field_validator("url")
@classmethod
def strip_trailing_slash(cls, v: str) -> str:
return v.rstrip("/")
# ── Response bodies ────────────────────────────────────────────────────────────
class BandStorageRead(BaseModel):
"""Public representation of a storage config — credentials are never exposed."""
id: uuid.UUID
band_id: uuid.UUID
provider: str
label: str | None
is_active: bool
root_path: str | None
created_at: datetime
updated_at: datetime
model_config = {"from_attributes": True}
class OAuthAuthorizeResponse(BaseModel):
"""Returned by the authorize endpoint — frontend should redirect the user here."""
redirect_url: str
provider: str

View File

@@ -0,0 +1,38 @@
"""Fernet-based symmetric encryption for storage credentials.
The encryption key must be a 32-byte URL-safe base64-encoded string,
generated once via: Fernet.generate_key().decode()
and stored in the STORAGE_ENCRYPTION_KEY environment variable.
No credentials are ever stored in plaintext — only the encrypted blob
is written to the database.
"""
from __future__ import annotations
import json
from cryptography.fernet import Fernet, InvalidToken
def encrypt_credentials(key: str, data: dict) -> str:
"""Serialize *data* to JSON and encrypt it with Fernet.
Returns a URL-safe base64-encoded ciphertext string safe to store in TEXT columns.
"""
f = Fernet(key.encode())
plaintext = json.dumps(data, separators=(",", ":")).encode()
return f.encrypt(plaintext).decode()
def decrypt_credentials(key: str, blob: str) -> dict:
"""Decrypt and deserialize a blob previously created by :func:`encrypt_credentials`.
Raises ``cryptography.fernet.InvalidToken`` if the key is wrong or the blob is tampered.
"""
f = Fernet(key.encode())
try:
plaintext = f.decrypt(blob.encode())
except InvalidToken:
raise InvalidToken("Failed to decrypt storage credentials — wrong key or corrupted blob")
return json.loads(plaintext)

View File

@@ -8,53 +8,45 @@ from sqlalchemy.ext.asyncio import AsyncSession
from rehearsalhub.db.models import Band
from rehearsalhub.repositories.band import BandRepository
from rehearsalhub.schemas.band import BandCreate
from rehearsalhub.storage.nextcloud import NextcloudClient
log = logging.getLogger(__name__)
class BandService:
def __init__(self, session: AsyncSession, storage: NextcloudClient | None = None) -> None:
def __init__(self, session: AsyncSession) -> None:
self._repo = BandRepository(session)
self._storage = storage
self._session = session
async def create_band(
self,
data: BandCreate,
creator_id: uuid.UUID,
creator: object | None = None,
) -> Band:
if await self._repo.get_by_slug(data.slug):
raise ValueError(f"Slug already taken: {data.slug}")
nc_folder = (data.nc_base_path or f"bands/{data.slug}/").strip("/") + "/"
storage = NextcloudClient.for_member(creator) if creator else self._storage
if data.nc_base_path:
# User explicitly specified a folder — verify it actually exists in NC.
log.info("Checking NC folder existence: %s", nc_folder)
try:
await storage.get_file_metadata(nc_folder.rstrip("/"))
except Exception as exc:
log.warning("NC folder '%s' not accessible: %s", nc_folder, exc)
raise LookupError(f"Nextcloud folder '{nc_folder}' not found or not accessible")
else:
# Auto-generated path — create it (idempotent MKCOL).
log.info("Creating NC folder: %s", nc_folder)
try:
await storage.create_folder(nc_folder)
except Exception as exc:
# Not fatal — NC may be temporarily unreachable during dev/test.
log.warning("Could not create NC folder '%s': %s", nc_folder, exc)
band = await self._repo.create(
name=data.name,
slug=data.slug,
genre_tags=data.genre_tags,
nc_folder_path=nc_folder,
)
await self._repo.add_member(band.id, creator_id, role="admin")
log.info("Created band '%s' (slug=%s, nc_folder=%s)", data.name, data.slug, nc_folder)
log.info("Created band '%s' (slug=%s)", data.name, data.slug)
# Storage is configured separately via POST /bands/{id}/storage/connect/*.
# If the band already has active storage, create the root folder now.
try:
from rehearsalhub.storage.factory import StorageFactory
from rehearsalhub.config import get_settings
storage = await StorageFactory.create(self._session, band.id, get_settings())
root = f"bands/{data.slug}/"
await storage.create_folder(root.strip("/") + "/")
log.info("Created storage folder '%s' for band '%s'", root, data.slug)
except LookupError:
log.info("Band '%s' has no active storage yet — skipping folder creation", data.slug)
except Exception as exc:
log.warning("Could not create storage folder for band '%s': %s", data.slug, exc)
return band
async def get_band_with_members(self, band_id: uuid.UUID) -> Band | None:

View File

@@ -1,11 +1,16 @@
"""Core nc-scan logic shared by the blocking and streaming endpoints."""
"""Storage scan logic: walk a band's storage folder and import audio files.
Works against any ``StorageClient`` implementation — Nextcloud, Google Drive, etc.
``StorageClient.list_folder`` must return ``FileMetadata`` objects whose ``path``
field is a *provider-relative* path (i.e. the DAV prefix has already been stripped
by the client implementation).
"""
from __future__ import annotations
import logging
from collections.abc import AsyncGenerator
from pathlib import Path
from urllib.parse import unquote
from sqlalchemy.ext.asyncio import AsyncSession
@@ -16,7 +21,7 @@ from rehearsalhub.schemas.audio_version import AudioVersionCreate
from rehearsalhub.schemas.song import SongRead
from rehearsalhub.services.session import extract_session_folder, parse_rehearsal_date
from rehearsalhub.services.song import SongService
from rehearsalhub.storage.nextcloud import NextcloudClient
from rehearsalhub.storage.protocol import StorageClient
log = logging.getLogger(__name__)
@@ -27,72 +32,53 @@ AUDIO_EXTENSIONS = {".mp3", ".wav", ".flac", ".ogg", ".m4a", ".aac", ".opus"}
MAX_SCAN_DEPTH = 3
def _make_relative(dav_prefix: str):
"""Return a function that strips the WebDAV prefix and URL-decodes a href."""
def relative(href: str) -> str:
decoded = unquote(href)
if decoded.startswith(dav_prefix):
return decoded[len(dav_prefix):]
# Strip any leading slash for robustness
return decoded.lstrip("/")
return relative
async def collect_audio_files(
nc: NextcloudClient,
relative: object, # Callable[[str], str]
storage: StorageClient,
folder_path: str,
max_depth: int = MAX_SCAN_DEPTH,
_depth: int = 0,
) -> AsyncGenerator[str, None]:
"""
Recursively yield user-relative audio file paths under folder_path.
"""Recursively yield provider-relative audio file paths under *folder_path*.
Handles any depth:
bands/slug/take.wav depth 0
bands/slug/231015/take.wav depth 1
bands/slug/231015/groove/take.wav depth 2 ← was broken before
``storage.list_folder`` is expected to return ``FileMetadata`` with paths
already normalised to provider-relative form (no host, no DAV prefix).
"""
if _depth > max_depth:
log.debug("Max depth %d exceeded at '%s', stopping recursion", max_depth, folder_path)
return
try:
items = await nc.list_folder(folder_path)
items = await storage.list_folder(folder_path)
except Exception as exc:
log.warning("Could not list folder '%s': %s", folder_path, exc)
return
log.info(
"scan depth=%d folder='%s' entries=%d",
_depth, folder_path, len(items),
)
log.info("scan depth=%d folder='%s' entries=%d", _depth, folder_path, len(items))
for item in items:
rel = relative(item.path) # type: ignore[operator]
if rel.endswith("/"):
# It's a subdirectory — recurse
log.info(" → subdir: %s", rel)
async for subpath in collect_audio_files(nc, relative, rel, max_depth, _depth + 1):
path = item.path.lstrip("/")
if path.endswith("/"):
log.info(" → subdir: %s", path)
async for subpath in collect_audio_files(storage, path, max_depth, _depth + 1):
yield subpath
else:
ext = Path(rel).suffix.lower()
ext = Path(path).suffix.lower()
if ext in AUDIO_EXTENSIONS:
log.info(" → audio file: %s", rel)
yield rel
log.info(" → audio file: %s", path)
yield path
elif ext:
log.debug(" → skip (ext=%s): %s", ext, rel)
log.debug(" → skip (ext=%s): %s", ext, path)
async def scan_band_folder(
db_session: AsyncSession,
nc: NextcloudClient,
storage: StorageClient,
band_id,
band_folder: str,
member_id,
) -> AsyncGenerator[dict, None]:
"""
Async generator that scans band_folder and yields event dicts:
"""Async generator that scans *band_folder* and yields event dicts:
{"type": "progress", "message": str}
{"type": "song", "song": SongRead-dict, "is_new": bool}
{"type": "session", "session": {id, date, label}}
@@ -100,12 +86,9 @@ async def scan_band_folder(
{"type": "done", "stats": {found, imported, skipped}}
{"type": "error", "message": str}
"""
dav_prefix = f"/remote.php/dav/files/{nc._auth[0]}/"
relative = _make_relative(dav_prefix)
version_repo = AudioVersionRepository(db_session)
session_repo = RehearsalSessionRepository(db_session)
song_repo = SongRepository(db_session)
version_repo = AudioVersionRepository(db_session)
song_svc = SongService(db_session)
found = 0
@@ -114,87 +97,92 @@ async def scan_band_folder(
yield {"type": "progress", "message": f"Scanning {band_folder}"}
async for nc_file_path in collect_audio_files(nc, relative, band_folder):
async for nc_file_path in collect_audio_files(storage, band_folder):
found += 1
song_folder = str(Path(nc_file_path).parent).rstrip("/") + "/"
song_title = Path(nc_file_path).stem
# If the file sits directly inside a dated session folder (YYMMDD/file.wav),
# give it a unique virtual folder so each file becomes its own song rather
# than being merged as a new version of the first file in that folder.
# give it a unique virtual folder so each file becomes its own song.
session_folder_path = extract_session_folder(nc_file_path)
if session_folder_path and session_folder_path.rstrip("/") == song_folder.rstrip("/"):
song_folder = song_folder + song_title + "/"
yield {"type": "progress", "message": f"Checking {Path(nc_file_path).name}"}
# Fetch file metadata (etag + size) — one PROPFIND per file
existing = await version_repo.get_by_nc_file_path(nc_file_path)
if existing is not None:
log.debug("scan: skipping already-registered '%s' (version %s)", nc_file_path, existing.id)
skipped += 1
yield {"type": "skipped", "path": nc_file_path, "reason": "already imported"}
continue
try:
meta = await nc.get_file_metadata(nc_file_path)
meta = await storage.get_file_metadata(nc_file_path)
etag = meta.etag
except Exception as exc:
log.warning("Metadata error for '%s': %s", nc_file_path, exc)
log.error("Metadata fetch failed for '%s': %s", nc_file_path, exc, exc_info=True)
skipped += 1
yield {"type": "skipped", "path": nc_file_path, "reason": f"metadata error: {exc}"}
continue
# Skip if this exact version is already indexed
if etag and await version_repo.get_by_etag(etag):
log.info("Already registered (etag match): %s", nc_file_path)
skipped += 1
yield {"type": "skipped", "path": nc_file_path, "reason": "already registered"}
continue
try:
rehearsal_date = parse_rehearsal_date(nc_file_path)
rehearsal_session_id = None
if rehearsal_date:
session_folder = extract_session_folder(nc_file_path) or song_folder
rs = await session_repo.get_or_create(band_id, rehearsal_date, session_folder)
rehearsal_session_id = rs.id
yield {
"type": "session",
"session": {
"id": str(rs.id),
"date": rs.date.isoformat(),
"label": rs.label,
"nc_folder_path": rs.nc_folder_path,
},
}
# Resolve or create a RehearsalSession from a YYMMDD folder segment
rehearsal_date = parse_rehearsal_date(nc_file_path)
rehearsal_session_id = None
if rehearsal_date:
session_folder = extract_session_folder(nc_file_path) or song_folder
rs = await session_repo.get_or_create(band_id, rehearsal_date, session_folder)
rehearsal_session_id = rs.id
yield {
"type": "session",
"session": {
"id": str(rs.id),
"date": rs.date.isoformat(),
"label": rs.label,
"nc_folder_path": rs.nc_folder_path,
},
}
song = await song_repo.get_by_nc_folder_path(song_folder)
if song is None:
song = await song_repo.get_by_title_and_band(band_id, song_title)
is_new = song is None
if is_new:
log.info("Creating song '%s' folder='%s'", song_title, song_folder)
song = await song_repo.create(
band_id=band_id,
session_id=rehearsal_session_id,
title=song_title,
status="jam",
notes=None,
nc_folder_path=song_folder,
created_by=member_id,
)
elif rehearsal_session_id and song.session_id is None:
song = await song_repo.update(song, session_id=rehearsal_session_id)
# Find or create the Song record
song = await song_repo.get_by_nc_folder_path(song_folder)
if song is None:
song = await song_repo.get_by_title_and_band(band_id, song_title)
is_new = song is None
if is_new:
log.info("Creating song '%s' folder='%s'", song_title, song_folder)
song = await song_repo.create(
band_id=band_id,
session_id=rehearsal_session_id,
title=song_title,
status="jam",
notes=None,
nc_folder_path=song_folder,
created_by=member_id,
version = await song_svc.register_version(
song.id,
AudioVersionCreate(
nc_file_path=nc_file_path,
nc_file_etag=etag,
format=Path(nc_file_path).suffix.lstrip(".").lower(),
file_size_bytes=meta.size,
),
member_id,
)
elif rehearsal_session_id and song.session_id is None:
song = await song_repo.update(song, session_id=rehearsal_session_id)
log.info("Imported '%s' as version %s for song '%s'", nc_file_path, version.id, song.title)
# Register the audio version
await song_svc.register_version(
song.id,
AudioVersionCreate(
nc_file_path=nc_file_path,
nc_file_etag=etag,
format=Path(nc_file_path).suffix.lstrip(".").lower(),
file_size_bytes=meta.size,
),
member_id,
)
imported += 1
read = SongRead.model_validate(song).model_copy(
update={"version_count": 1, "session_id": rehearsal_session_id}
)
yield {"type": "song", "song": read.model_dump(mode="json"), "is_new": is_new}
imported += 1
read = SongRead.model_validate(song).model_copy(update={"version_count": 1, "session_id": rehearsal_session_id})
yield {"type": "song", "song": read.model_dump(mode="json"), "is_new": is_new}
except Exception as exc:
log.error("Failed to import '%s': %s", nc_file_path, exc, exc_info=True)
skipped += 1
yield {"type": "skipped", "path": nc_file_path, "reason": f"import error: {exc}"}
yield {
"type": "done",

View File

@@ -1,16 +1,18 @@
from __future__ import annotations
import logging
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
log = logging.getLogger(__name__)
from rehearsalhub.db.models import AudioVersion, Song
from rehearsalhub.queue.redis_queue import RedisJobQueue
from rehearsalhub.repositories.audio_version import AudioVersionRepository
from rehearsalhub.repositories.song import SongRepository
from rehearsalhub.schemas.audio_version import AudioVersionCreate
from rehearsalhub.schemas.song import SongCreate, SongRead
from rehearsalhub.storage.nextcloud import NextcloudClient
class SongService:
@@ -18,25 +20,31 @@ class SongService:
self,
session: AsyncSession,
job_queue: RedisJobQueue | None = None,
storage: NextcloudClient | None = None,
) -> None:
self._repo = SongRepository(session)
self._version_repo = AudioVersionRepository(session)
self._session = session
self._queue = job_queue or RedisJobQueue(session)
self._storage = storage
async def create_song(
self, band_id: uuid.UUID, data: SongCreate, creator_id: uuid.UUID, band_slug: str,
creator: object | None = None,
self,
band_id: uuid.UUID,
data: SongCreate,
creator_id: uuid.UUID,
band_slug: str,
) -> Song:
from rehearsalhub.storage.nextcloud import NextcloudClient
nc_folder = f"bands/{band_slug}/songs/{data.title.lower().replace(' ', '-')}/"
storage = NextcloudClient.for_member(creator) if creator else self._storage
try:
from rehearsalhub.config import get_settings
from rehearsalhub.storage.factory import StorageFactory
storage = await StorageFactory.create(self._session, band_id, get_settings())
await storage.create_folder(nc_folder)
except LookupError:
log.info("Band %s has no active storage — skipping folder creation for '%s'", band_id, nc_folder)
nc_folder = None # type: ignore[assignment]
except Exception:
nc_folder = None # best-effort
nc_folder = None # best-effort; storage may be temporarily unreachable
song = await self._repo.create(
band_id=band_id,
@@ -67,11 +75,6 @@ class SongService:
data: AudioVersionCreate,
uploader_id: uuid.UUID,
) -> AudioVersion:
if data.nc_file_etag:
existing = await self._version_repo.get_by_etag(data.nc_file_etag)
if existing:
return existing
version_number = await self._repo.next_version_number(song_id)
version = await self._version_repo.create(
song_id=song_id,
@@ -85,8 +88,15 @@ class SongService:
uploaded_by=uploader_id,
)
await self._queue.enqueue(
"transcode",
{"version_id": str(version.id), "nc_file_path": data.nc_file_path},
)
try:
await self._queue.enqueue(
"transcode",
{"version_id": str(version.id), "nc_file_path": data.nc_file_path},
)
except Exception as exc:
log.error(
"Failed to enqueue transcode job for version %s ('%s'): %s",
version.id, data.nc_file_path, exc, exc_info=True,
)
return version

View File

@@ -0,0 +1,175 @@
"""StorageFactory — creates the correct StorageClient from a BandStorage record.
Usage:
storage = await StorageFactory.create(session, band_id, settings)
await storage.list_folder("bands/my-band/")
Token refresh for OAuth2 providers is handled transparently: if the stored
access token is expired the factory refreshes it and persists the new tokens
before returning the client.
"""
from __future__ import annotations
import logging
import uuid
from datetime import datetime, timezone
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from rehearsalhub.config import Settings, get_settings
from rehearsalhub.db.models import BandStorage
from rehearsalhub.repositories.band_storage import BandStorageRepository
from rehearsalhub.security.encryption import decrypt_credentials, encrypt_credentials
from rehearsalhub.storage.nextcloud import NextcloudClient
from rehearsalhub.storage.protocol import StorageClient
log = logging.getLogger(__name__)
class StorageFactory:
@staticmethod
async def create(
session: AsyncSession,
band_id: uuid.UUID,
settings: Settings | None = None,
) -> StorageClient:
"""Return a ready-to-use ``StorageClient`` for *band_id*.
Raises ``LookupError`` if the band has no active storage configured.
"""
if settings is None:
settings = get_settings()
repo = BandStorageRepository(session)
band_storage = await repo.get_active_for_band(band_id)
if band_storage is None:
raise LookupError(f"Band {band_id} has no active storage configured")
return await StorageFactory._build(session, band_storage, settings)
@staticmethod
async def _build(
session: AsyncSession,
band_storage: BandStorage,
settings: Settings,
) -> StorageClient:
creds = decrypt_credentials(settings.storage_encryption_key, band_storage.credentials)
creds = await _maybe_refresh_token(session, band_storage, creds, settings)
match band_storage.provider:
case "nextcloud":
return NextcloudClient(
base_url=creds["url"],
username=creds["username"],
password=creds["app_password"],
)
case "googledrive":
raise NotImplementedError("Google Drive storage client not yet implemented")
case "onedrive":
raise NotImplementedError("OneDrive storage client not yet implemented")
case "dropbox":
raise NotImplementedError("Dropbox storage client not yet implemented")
case _:
raise ValueError(f"Unknown storage provider: {band_storage.provider!r}")
# ── OAuth2 token refresh ───────────────────────────────────────────────────────
_TOKEN_ENDPOINTS: dict[str, str] = {
"googledrive": "https://oauth2.googleapis.com/token",
"dropbox": "https://api.dropbox.com/oauth2/token",
# OneDrive token endpoint is tenant-specific; handled separately.
}
async def _maybe_refresh_token(
session: AsyncSession,
band_storage: BandStorage,
creds: dict,
settings: Settings,
) -> dict:
"""If the OAuth2 access token is expired, refresh it and persist the update."""
if band_storage.provider == "nextcloud":
return creds # Nextcloud uses app passwords — no expiry
expiry_str = creds.get("token_expiry")
if not expiry_str:
return creds # No expiry recorded — assume still valid
expiry = datetime.fromisoformat(expiry_str)
if expiry.tzinfo is None:
expiry = expiry.replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) < expiry:
return creds # Still valid
log.info(
"Access token for band_storage %s (%s) expired — refreshing",
band_storage.id,
band_storage.provider,
)
try:
creds = await _do_refresh(band_storage, creds, settings)
# Persist refreshed tokens
from rehearsalhub.config import get_settings as _gs
_settings = settings or _gs()
band_storage.credentials = encrypt_credentials(_settings.storage_encryption_key, creds)
await session.flush()
except Exception:
log.exception("Token refresh failed for band_storage %s", band_storage.id)
raise
return creds
async def _do_refresh(band_storage: BandStorage, creds: dict, settings: Settings) -> dict:
"""Call the provider's token endpoint and return updated credentials."""
from datetime import timedelta
provider = band_storage.provider
if provider == "onedrive":
tenant = settings.onedrive_tenant_id
token_url = f"https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token"
client_id = settings.onedrive_client_id
client_secret = settings.onedrive_client_secret
extra: dict = {"scope": "https://graph.microsoft.com/Files.ReadWrite offline_access"}
elif provider == "googledrive":
token_url = _TOKEN_ENDPOINTS["googledrive"]
client_id = settings.google_client_id
client_secret = settings.google_client_secret
extra = {}
elif provider == "dropbox":
token_url = _TOKEN_ENDPOINTS["dropbox"]
client_id = settings.dropbox_app_key
client_secret = settings.dropbox_app_secret
extra = {}
else:
raise ValueError(f"Token refresh not supported for provider: {provider!r}")
payload = {
"grant_type": "refresh_token",
"refresh_token": creds["refresh_token"],
"client_id": client_id,
"client_secret": client_secret,
**extra,
}
async with httpx.AsyncClient(timeout=15.0) as http:
resp = await http.post(token_url, data=payload)
resp.raise_for_status()
data = resp.json()
expires_in = int(data.get("expires_in", 3600))
expiry = datetime.now(timezone.utc) + timedelta(seconds=expires_in - 60) # 60s buffer
return {
**creds,
"access_token": data["access_token"],
"refresh_token": data.get("refresh_token", creds["refresh_token"]),
"token_expiry": expiry.isoformat(),
"token_type": data.get("token_type", "Bearer"),
}

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
import logging
import xml.etree.ElementTree as ET
from typing import Any
from urllib.parse import unquote
import httpx
@@ -25,19 +26,11 @@ class NextcloudClient:
if not base_url or not username:
raise ValueError("Nextcloud credentials must be provided explicitly")
self._base = base_url.rstrip("/")
self._username = username
self._auth = (username, password)
self._dav_root = f"{self._base}/remote.php/dav/files/{self._auth[0]}"
@classmethod
def for_member(cls, member: object) -> NextcloudClient | None:
"""Return a client using member's personal NC credentials if configured.
Returns None if member has no Nextcloud configuration."""
nc_url = getattr(member, "nc_url", None)
nc_username = getattr(member, "nc_username", None)
nc_password = getattr(member, "nc_password", None)
if nc_url and nc_username and nc_password:
return cls(base_url=nc_url, username=nc_username, password=nc_password)
return None
self._dav_root = f"{self._base}/remote.php/dav/files/{username}"
# Prefix stripped from WebDAV hrefs to produce relative paths
self._dav_prefix = f"/remote.php/dav/files/{username}/"
def _client(self) -> httpx.AsyncClient:
return httpx.AsyncClient(auth=self._auth, timeout=30.0)
@@ -83,7 +76,17 @@ class NextcloudClient:
content=body,
)
resp.raise_for_status()
return _parse_propfind_multi(resp.text)
items = _parse_propfind_multi(resp.text)
# Normalise WebDAV absolute hrefs to provider-relative paths so callers
# never need to know about DAV internals. URL-decode to handle
# filenames that contain spaces or non-ASCII characters.
for item in items:
decoded = unquote(item.path)
if decoded.startswith(self._dav_prefix):
item.path = decoded[len(self._dav_prefix):]
else:
item.path = decoded.lstrip("/")
return items
async def download(self, path: str) -> bytes:
logger.debug("Downloading file from Nextcloud: %s", path)

View File

@@ -92,10 +92,9 @@ async def test_waveform_404_when_no_peaks_in_db(mock_session):
with (
patch("rehearsalhub.routers.versions._get_version_and_assert_band_membership",
return_value=(version, song)),
return_value=(version, song)),pytest.raises(HTTPException) as exc_info
):
with pytest.raises(HTTPException) as exc_info:
await get_waveform(version_id=version.id, session=mock_session, current_member=member)
await get_waveform(version_id=version.id, session=mock_session, current_member=member)
assert exc_info.value.status_code == 404
@@ -113,8 +112,8 @@ async def test_waveform_mini_404_when_no_mini_peaks(mock_session):
with (
patch("rehearsalhub.routers.versions._get_version_and_assert_band_membership",
return_value=(version, song)),
pytest.raises(HTTPException) as exc_info,
):
with pytest.raises(HTTPException) as exc_info:
await get_waveform(version_id=version.id, session=mock_session, current_member=member, resolution="mini")
await get_waveform(version_id=version.id, session=mock_session, current_member=member, resolution="mini")
assert exc_info.value.status_code == 404

View File

@@ -7,6 +7,8 @@ services:
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-default_secure_password}
volumes:
- pg_data_dev:/var/lib/postgresql/data
ports:
- "5432:5432"
networks:
- rh_net
healthcheck:
@@ -20,6 +22,11 @@ services:
image: redis:7-alpine
networks:
- rh_net
healthcheck:
test: ["CMD-SHELL", "redis-cli ping || exit 1"]
interval: 5s
timeout: 3s
retries: 10
api:
build:
@@ -34,6 +41,7 @@ services:
REDIS_URL: redis://redis:6379/0
SECRET_KEY: ${SECRET_KEY:-replace_me_with_32_byte_hex_default}
INTERNAL_SECRET: ${INTERNAL_SECRET:-replace_me_with_32_byte_hex_default}
STORAGE_ENCRYPTION_KEY: ${STORAGE_ENCRYPTION_KEY:-5vaaZQs4J7CFYZ7fqee37HgIt4xNxKHHX6OWd29Yh5E=}
DOMAIN: localhost
ports:
- "8000:8000"
@@ -43,6 +51,29 @@ services:
db:
condition: service_healthy
audio-worker:
build:
context: ./worker
target: development
environment:
DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-rh_user}:${POSTGRES_PASSWORD:-default_secure_password}@db:5432/${POSTGRES_DB:-rehearsalhub}
REDIS_URL: redis://redis:6379/0
API_URL: http://api:8000
INTERNAL_SECRET: ${INTERNAL_SECRET:-replace_me_with_32_byte_hex_default}
ANALYSIS_VERSION: "1.0.0"
LOG_LEVEL: DEBUG
PYTHONUNBUFFERED: "1"
volumes:
- ./worker/src:/app/src:z
- audio_tmp:/tmp/audio
networks:
- rh_net
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
web:
build:
context: ./web
@@ -62,3 +93,4 @@ networks:
volumes:
pg_data_dev:
audio_tmp:

134
docker-compose.prod.yml Normal file
View File

@@ -0,0 +1,134 @@
services:
db:
image: postgres:16-alpine
environment:
POSTGRES_DB: ${POSTGRES_DB:-rehearsalhub}
POSTGRES_USER: ${POSTGRES_USER:-rh_user}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-default_secure_password}
volumes:
- pg_data:/var/lib/postgresql/data
networks:
- rh_net
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-rh_user} -d ${POSTGRES_DB:-rehearsalhub} || exit 1"]
interval: 15s
timeout: 10s
retries: 30
start_period: 45s
restart: unless-stopped
command: ["postgres", "-c", "max_connections=200", "-c", "shared_buffers=256MB"]
redis:
image: redis:7-alpine
command: redis-server --save 60 1 --loglevel warning
volumes:
- redis_data:/data
networks:
- rh_net
healthcheck:
test: ["CMD-SHELL", "redis-cli ping || exit 1"]
interval: 10s
timeout: 5s
retries: 15
start_period: 25s
restart: unless-stopped
deploy:
resources:
limits:
memory: 256M
api:
image: git.sschuhmann.de/sschuhmann/rehearsalhub/api:0.1.0
environment:
DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-rh_user}:${POSTGRES_PASSWORD:-default_secure_password}@db:5432/${POSTGRES_DB:-rehearsalhub}
NEXTCLOUD_URL: ${NEXTCLOUD_URL:-https://cloud.example.com}
NEXTCLOUD_USER: ${NEXTCLOUD_USER:-rh_service}
NEXTCLOUD_PASS: ${NEXTCLOUD_PASS:-default_password}
REDIS_URL: redis://redis:6379/0
SECRET_KEY: ${SECRET_KEY:-replace_me_with_32_byte_hex_default}
INTERNAL_SECRET: ${INTERNAL_SECRET:-replace_me_with_32_byte_hex_default}
STORAGE_ENCRYPTION_KEY: ${STORAGE_ENCRYPTION_KEY}
DOMAIN: ${DOMAIN:-localhost}
networks:
- rh_net
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:8000/api/health || exit 1"]
interval: 20s
timeout: 10s
retries: 5
start_period: 60s
restart: unless-stopped
deploy:
resources:
limits:
memory: 512M
audio-worker:
image: git.sschuhmann.de/sschuhmann/rehearsalhub/worker:0.1.0
environment:
DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-rh_user}:${POSTGRES_PASSWORD:-default_secure_password}@db:5432/${POSTGRES_DB:-rehearsalhub}
REDIS_URL: redis://redis:6379/0
API_URL: http://api:8000
INTERNAL_SECRET: ${INTERNAL_SECRET:-replace_me_with_32_byte_hex_default}
ANALYSIS_VERSION: "1.0.0"
volumes:
- audio_tmp:/tmp/audio
networks:
- rh_net
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
api:
condition: service_started
restart: unless-stopped
deploy:
replicas: ${WORKER_REPLICAS:-2}
nc-watcher:
image: git.sschuhmann.de/sschuhmann/rehearsalhub/watcher:0.1.0
environment:
NEXTCLOUD_URL: ${NEXTCLOUD_URL:-https://cloud.example.com}
NEXTCLOUD_USER: ${NEXTCLOUD_USER:-rh_service}
NEXTCLOUD_PASS: ${NEXTCLOUD_PASS:-default_password}
API_URL: http://api:8000
REDIS_URL: redis://redis:6379/0
POLL_INTERVAL: "30"
networks:
- rh_net
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
api:
condition: service_started
restart: unless-stopped
web:
image: git.sschuhmann.de/sschuhmann/rehearsalhub/web:0.1.0
ports:
- "8080:80"
networks:
- frontend
- rh_net
depends_on:
- api
restart: unless-stopped
networks:
frontend:
name: proxy
external: true
rh_net:
volumes:
pg_data:
redis_data:
audio_tmp:

View File

@@ -41,7 +41,7 @@ services:
build:
context: ./api
target: production
image: rehearsalhub/api:latest
image: rehearshalhub/api:latest
environment:
DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-rh_user}:${POSTGRES_PASSWORD:-default_secure_password}@db:5432/${POSTGRES_DB:-rehearsalhub}
NEXTCLOUD_URL: ${NEXTCLOUD_URL:-https://cloud.example.com}
@@ -50,6 +50,7 @@ services:
REDIS_URL: redis://redis:6379/0
SECRET_KEY: ${SECRET_KEY:-replace_me_with_32_byte_hex_default}
INTERNAL_SECRET: ${INTERNAL_SECRET:-replace_me_with_32_byte_hex_default}
STORAGE_ENCRYPTION_KEY: ${STORAGE_ENCRYPTION_KEY:-5vaaZQs4J7CFYZ7fqee37HgIt4xNxKHHX6OWd29Yh5E=}
DOMAIN: ${DOMAIN:-localhost}
networks:
- rh_net
@@ -74,13 +75,12 @@ services:
build:
context: ./worker
target: production
image: rehearsalhub/audio-worker:latest
image: rehearshalhub/audio-worker:latest
environment:
DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-rh_user}:${POSTGRES_PASSWORD:-default_secure_password}@db:5432/${POSTGRES_DB:-rehearsalhub}
REDIS_URL: redis://redis:6379/0
NEXTCLOUD_URL: ${NEXTCLOUD_URL:-https://cloud.example.com}
NEXTCLOUD_USER: ${NEXTCLOUD_USER:-rh_service}
NEXTCLOUD_PASS: ${NEXTCLOUD_PASS:-default_password}
API_URL: http://api:8000
INTERNAL_SECRET: ${INTERNAL_SECRET:-replace_me_with_32_byte_hex_default}
ANALYSIS_VERSION: "1.0.0"
volumes:
- audio_tmp:/tmp/audio
@@ -94,12 +94,14 @@ services:
api:
condition: service_started
restart: unless-stopped
deploy:
replicas: ${WORKER_REPLICAS:-2}
nc-watcher:
build:
context: ./watcher
target: production
image: rehearsalhub/nc-watcher:latest
image: rehearshalhub/nc-watcher:latest
environment:
NEXTCLOUD_URL: ${NEXTCLOUD_URL:-https://cloud.example.com}
NEXTCLOUD_USER: ${NEXTCLOUD_USER:-rh_service}
@@ -122,7 +124,7 @@ services:
build:
context: ./web
target: production
image: rehearsalhub/web:latest
image: rehearshalhub/web:latest
ports:
- "8080:80"
networks:

View File

@@ -5,11 +5,10 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
class WatcherSettings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", extra="ignore")
nextcloud_url: str = "http://nextcloud"
nextcloud_user: str = "ncadmin"
nextcloud_pass: str = ""
api_url: str = "http://api:8000"
# Shared secret for calling internal API endpoints
internal_secret: str = "dev-change-me-in-production"
redis_url: str = "redis://localhost:6379/0"
job_queue_key: str = "rh:jobs"
@@ -18,6 +17,10 @@ class WatcherSettings(BaseSettings):
# File extensions to watch
audio_extensions: list[str] = [".wav", ".mp3", ".flac", ".aac", ".ogg", ".m4a", ".opus"]
# How often (in poll cycles) to refresh the list of bands from the API.
# 0 = only on startup, N = every N poll cycles.
config_refresh_interval: int = 10
@lru_cache
def get_settings() -> WatcherSettings:

View File

@@ -1,149 +1,93 @@
"""Event loop: poll Nextcloud activity, detect audio uploads, push to API."""
"""Event loop: fetch per-band storage configs from the API, detect audio uploads."""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any
import httpx
from watcher.config import WatcherSettings
from watcher.nc_client import NextcloudWatcherClient
from watcher.nc_watcher import NextcloudWatcher
from watcher.protocol import FileEvent, WatcherClient
log = logging.getLogger("watcher.event_loop")
# Persist last seen activity ID in-process (good enough for a POC)
_last_activity_id: int = 0
# Nextcloud Activity API v2 filter sets.
#
# NC 22+ returns: type="file_created"|"file_changed" (subject is human-readable)
# NC <22 returns: type="files" (subject is a machine key like "created_self")
#
# We accept either style so the watcher works across NC versions.
_UPLOAD_TYPES = {"file_created", "file_changed"}
_UPLOAD_SUBJECTS = {
"created_by",
"changed_by",
"created_public",
"created_self",
"changed_self",
}
def is_audio_file(path: str, extensions: list[str]) -> bool:
return Path(path).suffix.lower() in extensions
def normalize_nc_path(raw_path: str, username: str) -> str:
"""
Strip the Nextcloud WebDAV/activity path prefix so we get a plain
user-relative path.
Activity objects can look like:
/username/files/bands/slug/...
/remote.php/dav/files/username/bands/slug/...
bands/slug/... (already relative)
"""
path = raw_path.strip("/")
# /remote.php/dav/files/<user>/...
dav_prefix = f"remote.php/dav/files/{username}/"
if path.startswith(dav_prefix):
return path[len(dav_prefix):]
# /<username>/files/... (activity app format)
user_files_prefix = f"{username}/files/"
if path.startswith(user_files_prefix):
return path[len(user_files_prefix):]
# files/...
if path.startswith("files/"):
return path[len("files/"):]
return path
def extract_nc_file_path(activity: dict[str, Any]) -> str | None:
"""Extract the server-relative file path from an activity event."""
objects = activity.get("objects", {})
if isinstance(objects, dict):
for _file_id, file_path in objects.items():
if isinstance(file_path, str):
return file_path
# Fallback: older NC versions put it in object_name
return activity.get("object_name") or None
async def register_version_with_api(nc_file_path: str, nc_file_etag: str | None, api_url: str) -> bool:
async def fetch_nextcloud_configs(settings: WatcherSettings) -> list[dict]:
"""Fetch active Nextcloud configs for all bands from the internal API."""
url = f"{settings.api_url}/api/v1/internal/storage/nextcloud-watch-configs"
headers = {"X-Internal-Token": settings.internal_secret}
try:
payload = {"nc_file_path": nc_file_path, "nc_file_etag": nc_file_etag}
async with httpx.AsyncClient(timeout=15.0) as c:
resp = await c.post(f"{api_url}/api/v1/internal/nc-upload", json=payload)
resp = await c.get(url, headers=headers)
resp.raise_for_status()
return resp.json()
except Exception as exc:
log.warning("Failed to fetch NC configs from API: %s", exc)
return []
def build_nc_watchers(
configs: list[dict],
settings: WatcherSettings,
) -> dict[str, NextcloudWatcher]:
"""Build one NextcloudWatcher per band from the API config payload."""
watchers: dict[str, NextcloudWatcher] = {}
for cfg in configs:
band_id = cfg["band_id"]
try:
watchers[band_id] = NextcloudWatcher(
band_id=band_id,
nc_url=cfg["nc_url"],
nc_username=cfg["nc_username"],
nc_app_password=cfg["nc_app_password"],
audio_extensions=settings.audio_extensions,
)
except Exception as exc:
log.error("Failed to create watcher for band %s: %s", band_id, exc)
return watchers
async def register_event_with_api(event: FileEvent, settings: WatcherSettings) -> bool:
"""Forward a FileEvent to the API's internal nc-upload endpoint."""
payload = {"nc_file_path": event.file_path, "nc_file_etag": event.etag}
headers = {"X-Internal-Token": settings.internal_secret}
try:
async with httpx.AsyncClient(timeout=15.0) as c:
resp = await c.post(
f"{settings.api_url}/api/v1/internal/nc-upload",
json=payload,
headers=headers,
)
if resp.status_code in (200, 201):
log.info("Registered version via internal API: %s", nc_file_path)
log.info("Registered event via internal API: %s", event.file_path)
return True
log.warning(
"Internal API returned %d for %s: %s",
resp.status_code, nc_file_path, resp.text[:200],
resp.status_code, event.file_path, resp.text[:200],
)
return False
except Exception as exc:
log.warning("Failed to register version with API for %s: %s", nc_file_path, exc)
log.warning("Failed to register event with API for %s: %s", event.file_path, exc)
return False
async def poll_once(nc_client: NextcloudWatcherClient, settings: WatcherSettings) -> None:
global _last_activity_id
activities = await nc_client.get_activities(since_id=_last_activity_id)
if not activities:
log.info("No new activities since id=%d", _last_activity_id)
return
log.info("Received %d activities (since id=%d)", len(activities), _last_activity_id)
for activity in activities:
activity_id = int(activity.get("activity_id", 0))
activity_type = activity.get("type", "")
subject = activity.get("subject", "")
raw_path = extract_nc_file_path(activity)
# Advance the cursor regardless of whether we act on this event
_last_activity_id = max(_last_activity_id, activity_id)
log.info(
"Activity id=%d type=%r subject=%r raw_path=%r",
activity_id, activity_type, subject, raw_path,
)
if raw_path is None:
log.info(" → skip: no file path in activity payload")
continue
nc_path = normalize_nc_path(raw_path, nc_client.username)
log.info(" → normalized path: %r", nc_path)
# Only care about audio files — skip everything else immediately
if not is_audio_file(nc_path, settings.audio_extensions):
log.info(
" → skip: not an audio file (ext=%s)",
Path(nc_path).suffix.lower() or "<none>",
)
continue
if activity_type not in _UPLOAD_TYPES and subject not in _UPLOAD_SUBJECTS:
log.info(
" → skip: type=%r subject=%r is not a file upload event",
activity_type, subject,
)
continue
log.info(" → MATCH — registering audio upload: %s", nc_path)
etag = await nc_client.get_file_etag(nc_path)
success = await register_version_with_api(nc_path, etag, settings.api_url)
if not success:
log.warning(" → FAILED to register upload for activity %d (%s)", activity_id, nc_path)
async def poll_all_once(
watchers: dict[str, WatcherClient],
cursors: dict[str, str | None],
settings: WatcherSettings,
) -> None:
"""Poll every watcher once and forward new events to the API."""
for band_id, watcher in watchers.items():
cursor = cursors.get(band_id)
try:
events, new_cursor = await watcher.poll_changes(cursor)
cursors[band_id] = new_cursor
if not events:
log.debug("Band %s: no new events (cursor=%s)", band_id, new_cursor)
continue
log.info("Band %s: %d new event(s)", band_id, len(events))
for event in events:
await register_event_with_api(event, settings)
except Exception as exc:
log.exception("Poll error for band %s: %s", band_id, exc)

View File

@@ -6,11 +6,13 @@ import asyncio
import logging
from watcher.config import get_settings
from watcher.event_loop import poll_once
from watcher.nc_client import NextcloudWatcherClient
from watcher.event_loop import (
build_nc_watchers,
fetch_nextcloud_configs,
poll_all_once,
)
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(name)s %(message)s")
# Quiet httpx's per-request noise at DEBUG; keep our own loggers verbose
logging.getLogger("httpx").setLevel(logging.INFO)
logging.getLogger("httpcore").setLevel(logging.WARNING)
log = logging.getLogger("watcher")
@@ -18,22 +20,39 @@ log = logging.getLogger("watcher")
async def main() -> None:
settings = get_settings()
nc = NextcloudWatcherClient(
base_url=settings.nextcloud_url,
username=settings.nextcloud_user,
password=settings.nextcloud_pass,
)
log.info("Starting watcher (poll_interval=%ds)", settings.poll_interval)
log.info("Waiting for Nextcloud to become available...")
while not await nc.is_healthy():
await asyncio.sleep(10)
log.info("Nextcloud is ready. Starting poll loop (interval=%ds)", settings.poll_interval)
# Per-band WatcherClient instances; keyed by band_id string
watchers: dict = {}
# Per-band opaque cursors (last seen activity ID, page token, etc.)
cursors: dict[str, str | None] = {}
poll_cycle = 0
while True:
try:
await poll_once(nc, settings)
except Exception as exc:
log.exception("Poll error: %s", exc)
# Refresh the list of bands (and their storage configs) periodically.
refresh = (
poll_cycle == 0
or (settings.config_refresh_interval > 0 and poll_cycle % settings.config_refresh_interval == 0)
)
if refresh:
log.info("Refreshing storage configs from API…")
configs = await fetch_nextcloud_configs(settings)
if configs:
watchers = build_nc_watchers(configs, settings)
# Preserve cursors for bands that were already being watched
for band_id in watchers:
cursors.setdefault(band_id, None)
log.info("Watching %d Nextcloud band(s): %s", len(watchers), list(watchers))
else:
log.warning("No Nextcloud storage configs received — no bands to watch")
if watchers:
try:
await poll_all_once(watchers, cursors, settings)
except Exception as exc:
log.exception("Unexpected error in poll loop: %s", exc)
poll_cycle += 1
await asyncio.sleep(settings.poll_interval)

View File

@@ -0,0 +1,116 @@
"""Nextcloud WatcherClient implementation.
Polls the Nextcloud Activity API to detect new / modified audio files.
The cursor is the last seen ``activity_id`` (stored as a string for
protocol compatibility).
"""
from __future__ import annotations
import logging
from pathlib import Path
from watcher.nc_client import NextcloudWatcherClient
from watcher.protocol import FileEvent
log = logging.getLogger("watcher.nc_watcher")
_UPLOAD_TYPES = {"file_created", "file_changed"}
_UPLOAD_SUBJECTS = {
"created_by",
"changed_by",
"created_public",
"created_self",
"changed_self",
}
class NextcloudWatcher:
"""WatcherClient implementation backed by the Nextcloud Activity API."""
def __init__(
self,
band_id: str,
nc_url: str,
nc_username: str,
nc_app_password: str,
audio_extensions: list[str],
) -> None:
self.band_id = band_id
self._audio_extensions = audio_extensions
self._nc = NextcloudWatcherClient(
base_url=nc_url,
username=nc_username,
password=nc_app_password,
)
async def poll_changes(self, cursor: str | None) -> tuple[list[FileEvent], str]:
since_id = int(cursor) if cursor else 0
activities = await self._nc.get_activities(since_id=since_id)
events: list[FileEvent] = []
new_cursor = cursor or "0"
for activity in activities:
activity_id = int(activity.get("activity_id", 0))
new_cursor = str(max(int(new_cursor), activity_id))
activity_type = activity.get("type", "")
subject = activity.get("subject", "")
raw_path = _extract_file_path(activity)
if raw_path is None:
continue
nc_path = _normalize_path(raw_path, self._nc.username)
log.debug("Activity %d type=%r path=%r", activity_id, activity_type, nc_path)
if not _is_audio(nc_path, self._audio_extensions):
continue
if activity_type not in _UPLOAD_TYPES and subject not in _UPLOAD_SUBJECTS:
continue
etag = await self._nc.get_file_etag(nc_path)
events.append(
FileEvent(
band_id=self.band_id,
file_path=nc_path,
event_type="created" if "created" in activity_type else "modified",
etag=etag,
)
)
return events, new_cursor
async def is_healthy(self) -> bool:
return await self._nc.is_healthy()
# ── Helpers ────────────────────────────────────────────────────────────────────
def _extract_file_path(activity: dict) -> str | None:
objects = activity.get("objects", {})
if isinstance(objects, dict):
for _, file_path in objects.items():
if isinstance(file_path, str):
return file_path
return activity.get("object_name") or None
def _normalize_path(raw_path: str, username: str) -> str:
path = raw_path.strip("/")
dav_prefix = f"remote.php/dav/files/{username}/"
if path.startswith(dav_prefix):
return path[len(dav_prefix):]
user_files_prefix = f"{username}/files/"
if path.startswith(user_files_prefix):
return path[len(user_files_prefix):]
if path.startswith("files/"):
return path[len("files/"):]
return path
def _is_audio(path: str, extensions: list[str]) -> bool:
return Path(path).suffix.lower() in extensions

View File

@@ -0,0 +1,42 @@
"""WatcherClient protocol — abstracts provider-specific change-detection APIs.
Each storage provider implements its own change detection:
Nextcloud → Activity API (polling)
Google Drive → Changes API or webhook push
OneDrive → Microsoft Graph subscriptions
Dropbox → Long-poll or webhooks
All implementations must satisfy this protocol so the event loop can treat
them uniformly.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol
@dataclass
class FileEvent:
"""A file-change event emitted by a WatcherClient."""
band_id: str
file_path: str # Provider-relative path (no host, no DAV prefix)
event_type: str # 'created' | 'modified' | 'deleted'
etag: str | None = None
class WatcherClient(Protocol):
band_id: str
async def poll_changes(self, cursor: str | None) -> tuple[list[FileEvent], str]:
"""Return (events, new_cursor) since the given cursor.
``cursor`` is an opaque string whose meaning is implementation-defined
(e.g., an activity ID for Nextcloud, a page token for Google Drive).
Pass ``None`` to start from the current position (i.e. only new events).
"""
...
async def is_healthy(self) -> bool:
"""Return True if the storage backend is reachable."""
...

View File

@@ -5,7 +5,6 @@ export interface Band {
name: string;
slug: string;
genre_tags: string[];
nc_folder_path: string | null;
created_at: string;
updated_at: string;
memberships?: BandMembership[];
@@ -18,6 +17,25 @@ export interface BandMembership {
joined_at: string;
}
export interface BandStorage {
id: string;
band_id: string;
provider: string;
label: string | null;
is_active: boolean;
root_path: string | null;
created_at: string;
updated_at: string;
}
export interface NextcloudConnectData {
url: string;
username: string;
app_password: string;
label?: string;
root_path?: string;
}
export const listBands = () => api.get<Band[]>("/bands");
export const getBand = (bandId: string) => api.get<Band>(`/bands/${bandId}`);
@@ -25,5 +43,13 @@ export const createBand = (data: {
name: string;
slug: string;
genre_tags?: string[];
nc_base_path?: string;
}) => api.post<Band>("/bands", data);
export const listStorage = (bandId: string) =>
api.get<BandStorage[]>(`/bands/${bandId}/storage`);
export const connectNextcloud = (bandId: string, data: NextcloudConnectData) =>
api.post<BandStorage>(`/bands/${bandId}/storage/connect/nextcloud`, data);
export const disconnectStorage = (bandId: string) =>
api.delete(`/bands/${bandId}/storage`);

View File

@@ -4,7 +4,6 @@ import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query";
import { listBands, createBand } from "../api/bands";
import { getInitials } from "../utils";
import { useBandStore } from "../stores/bandStore";
import { api } from "../api/client";
// ── Shared primitives ──────────────────────────────────────────────────────────
@@ -30,27 +29,6 @@ const labelStyle: React.CSSProperties = {
marginBottom: 5,
};
// ── Step indicator ─────────────────────────────────────────────────────────────
function StepDots({ current, total }: { current: number; total: number }) {
return (
<div style={{ display: "flex", gap: 5, alignItems: "center" }}>
{Array.from({ length: total }, (_, i) => (
<div
key={i}
style={{
width: i === current ? 16 : 6,
height: 6,
borderRadius: 3,
background: i === current ? "#14b8a6" : i < current ? "rgba(20,184,166,0.4)" : "rgba(255,255,255,0.12)",
transition: "all 0.2s",
}}
/>
))}
</div>
);
}
// ── Error banner ───────────────────────────────────────────────────────────────
function ErrorBanner({ msg }: { msg: string }) {
@@ -61,117 +39,20 @@ function ErrorBanner({ msg }: { msg: string }) {
);
}
// ── Step 1: Storage setup ──────────────────────────────────────────────────────
// ── Band creation form ─────────────────────────────────────────────────────────
interface Me { nc_configured: boolean; nc_url: string | null; nc_username: string | null; }
function StorageStep({ me, onNext }: { me: Me; onNext: () => void }) {
const qc = useQueryClient();
const [ncUrl, setNcUrl] = useState(me.nc_url ?? "");
const [ncUsername, setNcUsername] = useState(me.nc_username ?? "");
const [ncPassword, setNcPassword] = useState("");
const [error, setError] = useState<string | null>(null);
const urlRef = useRef<HTMLInputElement>(null);
useEffect(() => { urlRef.current?.focus(); }, []);
const saveMutation = useMutation({
mutationFn: () =>
api.patch("/auth/me/settings", {
nc_url: ncUrl.trim() || null,
nc_username: ncUsername.trim() || null,
nc_password: ncPassword || null,
}),
onSuccess: () => {
qc.invalidateQueries({ queryKey: ["me"] });
onNext();
},
onError: (err) => setError(err instanceof Error ? err.message : "Failed to save"),
});
const canSave = ncUrl.trim() && ncUsername.trim() && ncPassword;
return (
<>
<div style={{ marginBottom: 14 }}>
<label style={labelStyle}>NEXTCLOUD URL</label>
<input
ref={urlRef}
value={ncUrl}
onChange={(e) => setNcUrl(e.target.value)}
style={inputStyle}
placeholder="https://cloud.example.com"
type="url"
/>
</div>
<div style={{ marginBottom: 14 }}>
<label style={labelStyle}>USERNAME</label>
<input
value={ncUsername}
onChange={(e) => setNcUsername(e.target.value)}
style={inputStyle}
placeholder="your-nc-username"
autoComplete="username"
/>
</div>
<div style={{ marginBottom: 4 }}>
<label style={labelStyle}>APP PASSWORD</label>
<input
value={ncPassword}
onChange={(e) => setNcPassword(e.target.value)}
style={inputStyle}
type="password"
placeholder="Generate one in Nextcloud → Settings → Security"
autoComplete="current-password"
/>
</div>
<p style={{ margin: "0 0 20px", fontSize: 11, color: "rgba(232,233,240,0.3)", lineHeight: 1.5 }}>
Use an app password, not your account password.
</p>
{error && <ErrorBanner msg={error} />}
<div style={{ display: "flex", gap: 8, justifyContent: "flex-end" }}>
<button
onClick={onNext}
style={{ padding: "8px 16px", background: "transparent", border: "1px solid rgba(255,255,255,0.1)", borderRadius: 7, color: "rgba(232,233,240,0.5)", cursor: "pointer", fontSize: 13, fontFamily: "inherit" }}
>
Skip for now
</button>
<button
onClick={() => saveMutation.mutate()}
disabled={!canSave || saveMutation.isPending}
style={{ padding: "8px 18px", background: canSave ? "#14b8a6" : "rgba(20,184,166,0.3)", border: "none", borderRadius: 7, color: canSave ? "#fff" : "rgba(255,255,255,0.4)", cursor: canSave ? "pointer" : "default", fontSize: 13, fontWeight: 600, fontFamily: "inherit" }}
>
{saveMutation.isPending ? "Saving…" : "Save & Continue"}
</button>
</div>
</>
);
}
// ── Step 2: Band details ───────────────────────────────────────────────────────
function BandStep({ ncConfigured, onClose }: { ncConfigured: boolean; onClose: () => void }) {
function BandStep({ onClose }: { onClose: () => void }) {
const navigate = useNavigate();
const qc = useQueryClient();
const [name, setName] = useState("");
const [slug, setSlug] = useState("");
const [ncFolder, setNcFolder] = useState("");
const [error, setError] = useState<string | null>(null);
const nameRef = useRef<HTMLInputElement>(null);
useEffect(() => { nameRef.current?.focus(); }, []);
const mutation = useMutation({
mutationFn: () =>
createBand({
name,
slug,
...(ncFolder.trim() ? { nc_base_path: ncFolder.trim() } : {}),
}),
mutationFn: () => createBand({ name, slug }),
onSuccess: (band) => {
qc.invalidateQueries({ queryKey: ["bands"] });
onClose();
@@ -187,12 +68,6 @@ function BandStep({ ncConfigured, onClose }: { ncConfigured: boolean; onClose: (
return (
<>
{!ncConfigured && (
<div style={{ marginBottom: 18, padding: "9px 12px", background: "rgba(251,191,36,0.07)", border: "1px solid rgba(251,191,36,0.2)", borderRadius: 7, fontSize: 12, color: "rgba(251,191,36,0.8)", lineHeight: 1.5 }}>
Storage not configured recordings won't be scanned. You can set it up later in Settings Storage.
</div>
)}
{error && <ErrorBanner msg={error} />}
<div style={{ marginBottom: 14 }}>
@@ -207,7 +82,7 @@ function BandStep({ ncConfigured, onClose }: { ncConfigured: boolean; onClose: (
/>
</div>
<div style={{ marginBottom: 20 }}>
<div style={{ marginBottom: 24 }}>
<label style={labelStyle}>SLUG</label>
<input
value={slug}
@@ -217,24 +92,9 @@ function BandStep({ ncConfigured, onClose }: { ncConfigured: boolean; onClose: (
/>
</div>
<div style={{ borderTop: "1px solid rgba(255,255,255,0.06)", paddingTop: 18, marginBottom: 22 }}>
<label style={labelStyle}>
NEXTCLOUD FOLDER{" "}
<span style={{ color: "rgba(232,233,240,0.25)", fontWeight: 400, letterSpacing: 0 }}>(optional)</span>
</label>
<input
value={ncFolder}
onChange={(e) => setNcFolder(e.target.value)}
style={{ ...inputStyle, fontFamily: "monospace" }}
placeholder={slug ? `bands/${slug}/` : "bands/my-band/"}
disabled={!ncConfigured}
/>
<p style={{ margin: "7px 0 0", fontSize: 11, color: "rgba(232,233,240,0.3)", lineHeight: 1.5 }}>
{ncConfigured
? <>Leave blank to auto-create <code style={{ color: "rgba(232,233,240,0.45)", fontFamily: "monospace" }}>bands/{slug || "slug"}/</code>.</>
: "Connect storage first to set a folder."}
</p>
</div>
<p style={{ margin: "0 0 20px", fontSize: 11, color: "rgba(232,233,240,0.3)", lineHeight: 1.5 }}>
Connect storage after creating the band via Settings Storage.
</p>
<div style={{ display: "flex", gap: 8, justifyContent: "flex-end" }}>
<button
@@ -255,21 +115,9 @@ function BandStep({ ncConfigured, onClose }: { ncConfigured: boolean; onClose: (
);
}
// ── Create Band Modal (orchestrates steps) ─────────────────────────────────────
// ── Create Band Modal ──────────────────────────────────────────────────────────
function CreateBandModal({ onClose }: { onClose: () => void }) {
const { data: me, isLoading } = useQuery<Me>({
queryKey: ["me"],
queryFn: () => api.get("/auth/me"),
});
// Start on step 0 (storage) if NC not configured, otherwise jump straight to step 1 (band)
const [step, setStep] = useState<0 | 1 | null>(null);
useEffect(() => {
if (me && step === null) setStep(me.nc_configured ? 1 : 0);
}, [me, step]);
// Close on Escape
useEffect(() => {
const handler = (e: KeyboardEvent) => { if (e.key === "Escape") onClose(); };
@@ -277,9 +125,6 @@ function CreateBandModal({ onClose }: { onClose: () => void }) {
return () => document.removeEventListener("keydown", handler);
}, [onClose]);
const totalSteps = me?.nc_configured === false ? 2 : 1;
const currentDot = step === 0 ? 0 : totalSteps - 1;
return (
<div
onClick={onClose}
@@ -289,28 +134,13 @@ function CreateBandModal({ onClose }: { onClose: () => void }) {
onClick={(e) => e.stopPropagation()}
style={{ background: "#112018", border: "1px solid rgba(255,255,255,0.1)", borderRadius: 14, padding: 28, width: 420, boxShadow: "0 24px 64px rgba(0,0,0,0.6)" }}
>
{/* Header */}
<div style={{ display: "flex", alignItems: "flex-start", justifyContent: "space-between", marginBottom: 18 }}>
<div>
<h3 style={{ margin: "0 0 3px", fontSize: 15, fontWeight: 600, color: "#e8e9f0" }}>
{step === 0 ? "Connect storage" : "New band"}
</h3>
<p style={{ margin: 0, fontSize: 12, color: "rgba(232,233,240,0.4)" }}>
{step === 0 ? "Needed to scan and index your recordings." : "Create a workspace for your recordings."}
</p>
</div>
{totalSteps > 1 && step !== null && (
<StepDots current={currentDot} total={totalSteps} />
)}
<div style={{ marginBottom: 18 }}>
<h3 style={{ margin: "0 0 3px", fontSize: 15, fontWeight: 600, color: "#e8e9f0" }}>New band</h3>
<p style={{ margin: 0, fontSize: 12, color: "rgba(232,233,240,0.4)" }}>
Create a workspace for your recordings.
</p>
</div>
{isLoading || step === null ? (
<p style={{ color: "rgba(232,233,240,0.3)", fontSize: 13 }}>Loading</p>
) : step === 0 ? (
<StorageStep me={me!} onNext={() => setStep(1)} />
) : (
<BandStep ncConfigured={me?.nc_configured ?? false} onClose={onClose} />
)}
<BandStep onClose={onClose} />
</div>
</div>
);

View File

@@ -2,7 +2,7 @@ import { useState, useEffect } from "react";
import { useSearchParams } from "react-router-dom";
import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query";
import { api } from "../api/client";
import { listBands } from "../api/bands";
import { listBands, listStorage, connectNextcloud, disconnectStorage } from "../api/bands";
import { listInvites, revokeInvite } from "../api/invites";
import { useBandStore } from "../stores/bandStore";
import { getInitials } from "../utils";
@@ -14,9 +14,6 @@ interface MemberRead {
display_name: string;
email: string;
avatar_url: string | null;
nc_username: string | null;
nc_url: string | null;
nc_configured: boolean;
}
interface BandMember {
@@ -40,7 +37,6 @@ interface Band {
name: string;
slug: string;
genre_tags: string[];
nc_folder_path: string | null;
}
type Section = "profile" | "members" | "storage" | "band";
@@ -267,42 +263,48 @@ function ProfileSection({ me }: { me: MemberRead }) {
);
}
// ── Storage section (NC credentials + scan folder) ────────────────────────────
// ── Storage section ────────────────────────────────────────────────────────────
function StorageSection({ bandId, band, amAdmin, me }: { bandId: string; band: Band; amAdmin: boolean; me: MemberRead }) {
function StorageSection({ bandId, band, amAdmin }: { bandId: string; band: Band; amAdmin: boolean }) {
const qc = useQueryClient();
// NC credentials state
const [ncUrl, setNcUrl] = useState(me.nc_url ?? "");
const [ncUsername, setNcUsername] = useState(me.nc_username ?? "");
const [showConnect, setShowConnect] = useState(false);
const [ncUrl, setNcUrl] = useState("");
const [ncUsername, setNcUsername] = useState("");
const [ncPassword, setNcPassword] = useState("");
const [ncSaved, setNcSaved] = useState(false);
const [ncError, setNcError] = useState<string | null>(null);
const [ncRootPath, setNcRootPath] = useState("");
const [connectError, setConnectError] = useState<string | null>(null);
// Scan folder state
const [editingPath, setEditingPath] = useState(false);
const [folderInput, setFolderInput] = useState("");
const [scanning, setScanning] = useState(false);
const [scanProgress, setScanProgress] = useState<string | null>(null);
const [scanMsg, setScanMsg] = useState<string | null>(null);
const ncMutation = useMutation({
mutationFn: () => api.patch<MemberRead>("/auth/me/settings", {
nc_url: ncUrl || undefined,
nc_username: ncUsername || undefined,
nc_password: ncPassword || undefined,
}),
onSuccess: () => {
qc.invalidateQueries({ queryKey: ["me"] });
setNcSaved(true); setNcError(null); setNcPassword("");
setTimeout(() => setNcSaved(false), 2500);
},
onError: (err) => setNcError(err instanceof Error ? err.message : "Save failed"),
const { data: storageConfigs, isLoading: storageLoading } = useQuery({
queryKey: ["storage", bandId],
queryFn: () => listStorage(bandId),
});
const pathMutation = useMutation({
mutationFn: (nc_folder_path: string) => api.patch(`/bands/${bandId}`, { nc_folder_path }),
onSuccess: () => { qc.invalidateQueries({ queryKey: ["band", bandId] }); setEditingPath(false); },
const activeStorage = storageConfigs?.find((s) => s.is_active) ?? null;
const connectMutation = useMutation({
mutationFn: () => connectNextcloud(bandId, {
url: ncUrl.trim(),
username: ncUsername.trim(),
app_password: ncPassword,
root_path: ncRootPath.trim() || undefined,
}),
onSuccess: () => {
qc.invalidateQueries({ queryKey: ["storage", bandId] });
setShowConnect(false);
setNcUrl(""); setNcUsername(""); setNcPassword(""); setNcRootPath("");
setConnectError(null);
},
onError: (err) => setConnectError(err instanceof Error ? err.message : "Connection failed"),
});
const disconnectMutation = useMutation({
mutationFn: () => disconnectStorage(bandId),
onSuccess: () => qc.invalidateQueries({ queryKey: ["storage", bandId] }),
});
async function startScan() {
@@ -340,33 +342,57 @@ function StorageSection({ bandId, band, amAdmin, me }: { bandId: string; band: B
finally { setScanning(false); setScanProgress(null); }
}
const defaultPath = `bands/${band.slug}/`;
const currentPath = band.nc_folder_path ?? defaultPath;
const canConnect = ncUrl.trim() && ncUsername.trim() && ncPassword;
return (
<div>
<SectionHeading title="Storage" subtitle="Configure Nextcloud credentials and your band's recording folder." />
<SectionHeading title="Storage" subtitle="Connect a storage provider to scan and index your band's recordings." />
{/* NC Connection */}
<div style={{ marginBottom: 8 }}>
<div style={{ display: "flex", alignItems: "center", gap: 8, marginBottom: 16 }}>
<div style={{ width: 8, height: 8, borderRadius: "50%", background: me.nc_configured ? "#34d399" : "rgba(232,233,240,0.25)", flexShrink: 0, boxShadow: me.nc_configured ? "0 0 6px rgba(52,211,153,0.5)" : "none" }} />
<span style={{ fontSize: 12, color: me.nc_configured ? "#34d399" : "rgba(232,233,240,0.4)" }}>
{me.nc_configured ? "Nextcloud connected" : "Nextcloud not configured"}
</span>
</div>
<div style={{ padding: "14px 16px", background: "rgba(255,255,255,0.02)", border: `1px solid ${border}`, borderRadius: 10, marginBottom: 16 }}>
<div style={{ display: "flex", alignItems: "flex-start", justifyContent: "space-between", gap: 8, marginBottom: 12 }}>
<div>
<div style={{ fontSize: 13, fontWeight: 600, color: "#e8e9f0", marginBottom: 2 }}>Nextcloud Connection</div>
<div style={{ fontSize: 11, color: "rgba(232,233,240,0.35)" }}>
Your personal credentials will move to per-band config in a future update.
</div>
{/* Status card */}
<div style={{ padding: "14px 16px", background: "rgba(255,255,255,0.02)", border: `1px solid ${border}`, borderRadius: 10, marginBottom: 16 }}>
{storageLoading ? (
<p style={{ margin: 0, fontSize: 12, color: "rgba(232,233,240,0.3)" }}>Loading</p>
) : activeStorage ? (
<>
<div style={{ display: "flex", alignItems: "center", gap: 8, marginBottom: 10 }}>
<div style={{ width: 8, height: 8, borderRadius: "50%", background: "#34d399", flexShrink: 0, boxShadow: "0 0 6px rgba(52,211,153,0.5)" }} />
<span style={{ fontSize: 13, fontWeight: 600, color: "#34d399" }}>
{activeStorage.label ?? activeStorage.provider}
</span>
<span style={{ fontSize: 11, color: "rgba(232,233,240,0.3)", textTransform: "capitalize" }}>({activeStorage.provider})</span>
</div>
{activeStorage.root_path && (
<div style={{ marginBottom: 10 }}>
<Label>Scan path</Label>
<code style={{ fontSize: 12, color: "#34d399", fontFamily: "monospace" }}>{activeStorage.root_path}</code>
</div>
)}
{amAdmin && (
<button
onClick={() => disconnectMutation.mutate()}
disabled={disconnectMutation.isPending}
style={{ padding: "5px 12px", background: "rgba(244,63,94,0.08)", border: "1px solid rgba(244,63,94,0.2)", borderRadius: 6, color: "#f87171", cursor: "pointer", fontSize: 11, fontFamily: "inherit" }}
>
{disconnectMutation.isPending ? "Disconnecting…" : "Disconnect"}
</button>
)}
</>
) : (
<div style={{ display: "flex", alignItems: "center", gap: 8 }}>
<div style={{ width: 8, height: 8, borderRadius: "50%", background: "rgba(232,233,240,0.2)", flexShrink: 0 }} />
<span style={{ fontSize: 12, color: "rgba(232,233,240,0.35)" }}>No storage connected</span>
</div>
)}
</div>
<div style={{ display: "grid", gap: 12 }}>
{/* Connect form — admin only, shown when no active storage or toggled */}
{amAdmin && (!activeStorage || showConnect) && (
<>
{activeStorage && <Divider />}
<div style={{ fontSize: 13, fontWeight: 600, color: "#e8e9f0", marginBottom: 12 }}>
{activeStorage ? "Replace connection" : "Connect Nextcloud"}
</div>
<div style={{ display: "grid", gap: 12, marginBottom: 14 }}>
<div>
<Label>Nextcloud URL</Label>
<Input value={ncUrl} onChange={(e) => setNcUrl(e.target.value)} placeholder="https://cloud.example.com" />
@@ -376,69 +402,58 @@ function StorageSection({ bandId, band, amAdmin, me }: { bandId: string; band: B
<Input value={ncUsername} onChange={(e) => setNcUsername(e.target.value)} />
</div>
<div>
<Label>Password / App Password</Label>
<Input type="password" value={ncPassword} onChange={(e) => setNcPassword(e.target.value)} placeholder={me.nc_configured ? "•••••••• (leave blank to keep)" : ""} />
<Label>App Password</Label>
<Input type="password" value={ncPassword} onChange={(e) => setNcPassword(e.target.value)} placeholder="Generate in Nextcloud → Settings → Security" />
</div>
<div>
<Label>Root path <span style={{ color: "rgba(232,233,240,0.25)", fontWeight: 400 }}>(optional)</span></Label>
<Input value={ncRootPath} onChange={(e) => setNcRootPath(e.target.value)} placeholder={`bands/${band.slug}/`} style={{ fontFamily: "monospace" }} />
<div style={{ fontSize: 11, color: "rgba(232,233,240,0.28)", marginTop: 4 }}>
Use an app password from Nextcloud Settings Security.
Leave blank to auto-create <code style={{ fontFamily: "monospace" }}>bands/{band.slug}/</code>
</div>
</div>
</div>
{ncError && <p style={{ color: "#f87171", fontSize: 12, margin: "12px 0 0" }}>{ncError}</p>}
<div style={{ marginTop: 14 }}>
<SaveBtn pending={ncMutation.isPending} saved={ncSaved} onClick={() => ncMutation.mutate()} />
</div>
</div>
</div>
{/* Scan folder — admin only */}
{amAdmin && (
<>
<Divider />
<div style={{ fontSize: 13, fontWeight: 600, color: "#e8e9f0", marginBottom: 4 }}>Scan Folder</div>
<div style={{ fontSize: 12, color: "rgba(232,233,240,0.35)", marginBottom: 16, lineHeight: 1.55 }}>
RehearsalHub reads recordings from your Nextcloud files are never copied to our servers.
</div>
<div style={{ background: "rgba(255,255,255,0.02)", border: `1px solid ${border}`, borderRadius: 10, padding: "12px 16px", marginBottom: 14 }}>
<div style={{ display: "flex", alignItems: "center", justifyContent: "space-between", gap: 8 }}>
<div>
<Label>Scan path</Label>
<code style={{ fontSize: 13, color: "#34d399", fontFamily: "monospace" }}>{currentPath}</code>
</div>
{!editingPath && (
<button
onClick={() => { setFolderInput(band.nc_folder_path ?? ""); setEditingPath(true); }}
style={{ padding: "4px 10px", background: "transparent", border: `1px solid ${border}`, borderRadius: 6, color: "rgba(232,233,240,0.42)", cursor: "pointer", fontSize: 11, fontFamily: "inherit" }}
>
Edit
</button>
)}
</div>
{editingPath && (
<div style={{ marginTop: 12 }}>
<Input value={folderInput} onChange={(e) => setFolderInput(e.target.value)} placeholder={defaultPath} style={{ fontFamily: "monospace" }} />
<div style={{ display: "flex", gap: 8, marginTop: 8 }}>
<button onClick={() => pathMutation.mutate(folderInput)} disabled={pathMutation.isPending}
style={{ padding: "6px 14px", background: "rgba(20,184,166,0.12)", border: "1px solid rgba(20,184,166,0.3)", borderRadius: 6, color: "#2dd4bf", cursor: "pointer", fontSize: 12, fontWeight: 600, fontFamily: "inherit" }}>
{pathMutation.isPending ? "Saving…" : "Save"}
</button>
<button onClick={() => setEditingPath(false)}
style={{ padding: "6px 14px", background: "transparent", border: `1px solid ${border}`, borderRadius: 6, color: "rgba(232,233,240,0.42)", cursor: "pointer", fontSize: 12, fontFamily: "inherit" }}>
Cancel
</button>
</div>
</div>
{connectError && <p style={{ color: "#f87171", fontSize: 12, marginBottom: 12 }}>{connectError}</p>}
<div style={{ display: "flex", gap: 8 }}>
<button
onClick={() => connectMutation.mutate()}
disabled={!canConnect || connectMutation.isPending}
style={{ padding: "8px 18px", background: canConnect ? "linear-gradient(135deg, #0d9488, #06b6d4)" : "rgba(20,184,166,0.2)", border: "none", borderRadius: 8, color: canConnect ? "white" : "rgba(255,255,255,0.35)", cursor: canConnect ? "pointer" : "default", fontSize: 13, fontWeight: 600, fontFamily: "inherit" }}
>
{connectMutation.isPending ? "Connecting…" : "Connect"}
</button>
{activeStorage && (
<button onClick={() => setShowConnect(false)}
style={{ padding: "8px 14px", background: "transparent", border: `1px solid ${border}`, borderRadius: 8, color: "rgba(232,233,240,0.42)", cursor: "pointer", fontSize: 13, fontFamily: "inherit" }}>
Cancel
</button>
)}
</div>
</>
)}
{amAdmin && activeStorage && !showConnect && (
<button
onClick={() => setShowConnect(true)}
style={{ padding: "7px 14px", background: "transparent", border: `1px solid ${border}`, borderRadius: 8, color: "rgba(232,233,240,0.42)", cursor: "pointer", fontSize: 12, fontFamily: "inherit", marginBottom: 16 }}
>
Replace connection
</button>
)}
{/* Scan — admin only, only if active storage */}
{amAdmin && activeStorage && (
<>
<Divider />
<div style={{ fontSize: 13, fontWeight: 600, color: "#e8e9f0", marginBottom: 4 }}>Scan Recordings</div>
<div style={{ fontSize: 12, color: "rgba(232,233,240,0.35)", marginBottom: 12, lineHeight: 1.55 }}>
RehearsalHub reads recordings from storage files are never copied to our servers.
</div>
<button
onClick={startScan} disabled={scanning}
style={{ padding: "7px 16px", background: scanning ? "transparent" : "rgba(52,211,153,0.08)", border: `1px solid ${scanning ? border : "rgba(52,211,153,0.25)"}`, borderRadius: 8, color: scanning ? "rgba(232,233,240,0.28)" : "#34d399", cursor: scanning ? "default" : "pointer", fontSize: 12, fontFamily: "inherit", transition: "all 0.12s" }}>
{scanning ? "Scanning…" : "⟳ Scan Nextcloud"}
{scanning ? "Scanning…" : "⟳ Scan Storage"}
</button>
{scanning && scanProgress && (
<div style={{ marginTop: 10, background: "rgba(255,255,255,0.03)", border: `1px solid ${border}`, borderRadius: 8, color: "rgba(232,233,240,0.42)", fontSize: 12, padding: "8px 14px", fontFamily: "monospace" }}>
{scanProgress}
@@ -750,7 +765,7 @@ export function SettingsPage() {
<div style={{ padding: "0 16px 24px" }}>
{section === "profile" && <ProfileSection me={me} />}
{section === "members" && activeBandId && band && <MembersSection bandId={activeBandId} band={band} amAdmin={amAdmin} members={members} membersLoading={membersLoading} />}
{section === "storage" && activeBandId && band && <StorageSection bandId={activeBandId} band={band} amAdmin={amAdmin} me={me} />}
{section === "storage" && activeBandId && band && <StorageSection bandId={activeBandId} band={band} amAdmin={amAdmin} />}
{section === "band" && activeBandId && band && amAdmin && <BandSection bandId={activeBandId} band={band} />}
</div>
</div>
@@ -832,7 +847,7 @@ export function SettingsPage() {
<MembersSection bandId={activeBandId} band={band} amAdmin={amAdmin} members={members} membersLoading={membersLoading} />
)}
{section === "storage" && activeBandId && band && (
<StorageSection bandId={activeBandId} band={band} amAdmin={amAdmin} me={me} />
<StorageSection bandId={activeBandId} band={band} amAdmin={amAdmin} />
)}
{section === "band" && activeBandId && band && amAdmin && (
<BandSection bandId={activeBandId} band={band} />

View File

@@ -22,9 +22,29 @@ RUN --mount=type=bind,from=essentia-builder,source=/usr/local/lib,target=/essent
RUN pip install uv
FROM base AS development
COPY pyproject.toml .
RUN uv sync --all-extras --no-install-project --frozen || uv sync --all-extras --no-install-project
ENV PYTHONPATH=/app/src
ENV PYTHONUNBUFFERED=1
ENV LOG_LEVEL=DEBUG
CMD ["/bin/sh", "-c", "PYTHONPATH=/app/src exec /app/.venv/bin/watchfiles --ignore-permission-denied '/app/.venv/bin/python -m worker.main' src"]
FROM base AS production
COPY pyproject.toml .
RUN uv sync --no-dev --frozen || uv sync --no-dev
COPY . .
ENV PYTHONPATH=/app/src
# Pre-warm librosa/numba JIT cache and pooch downloads so they happen at build
# time and are baked into the image rather than downloaded on every cold start.
RUN uv run python -c "\
import numpy as np; \
import librosa; \
_dummy = np.zeros(22050, dtype=np.float32); \
librosa.beat.beat_track(y=_dummy, sr=22050); \
librosa.feature.chroma_stft(y=_dummy, sr=22050); \
print('librosa warmup done') \
"
CMD ["uv", "run", "python", "-m", "worker.main"]

View File

@@ -26,6 +26,7 @@ dev = [
"pytest-asyncio>=0.23",
"pytest-cov>=5",
"ruff>=0.4",
"watchfiles>=0.21",
]
[tool.hatch.build.targets.wheel]

View File

@@ -10,9 +10,8 @@ class WorkerSettings(BaseSettings):
redis_url: str = "redis://localhost:6379/0"
job_queue_key: str = "rh:jobs"
nextcloud_url: str = "http://nextcloud"
nextcloud_user: str = "ncadmin"
nextcloud_pass: str = ""
api_url: str = "http://api:8000"
internal_secret: str = "dev-change-me-in-production"
audio_tmp_dir: str = "/tmp/audio"
analysis_version: str = "1.0.0"

View File

@@ -36,6 +36,14 @@ class AudioVersionModel(Base):
uploaded_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
class SongModel(Base):
__tablename__ = "songs"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True)
global_bpm: Mapped[Optional[float]] = mapped_column(Numeric(6, 2))
global_key: Mapped[Optional[str]] = mapped_column(String(30))
class RangeAnalysisModel(Base):
__tablename__ = "range_analyses"

View File

@@ -23,23 +23,30 @@ from worker.pipeline.analyse_range import run_range_analysis
from worker.pipeline.transcode import get_duration_ms, transcode_to_hls
from worker.pipeline.waveform import extract_peaks, generate_waveform_file
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", "INFO").upper(),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
# Numba floods logs with JIT compilation details at DEBUG level — keep it quiet
logging.getLogger("numba").setLevel(logging.WARNING)
log = logging.getLogger("worker")
async def load_audio(nc_path: str, tmp_dir: str, settings) -> tuple[np.ndarray, int, str]:
"""Download from Nextcloud and load as numpy array. Returns (audio, sr, local_path)."""
async def load_audio(version_id: str, filename: str, tmp_dir: str, settings) -> tuple[np.ndarray, int, str]:
"""Download audio via the internal API and load as numpy array. Returns (audio, sr, local_path)."""
import httpx
local_path = os.path.join(tmp_dir, Path(nc_path).name)
dav_url = f"{settings.nextcloud_url}/remote.php/dav/files/{settings.nextcloud_user}/{nc_path.lstrip('/')}"
local_path = os.path.join(tmp_dir, filename)
url = f"{settings.api_url}/api/v1/internal/audio/{version_id}/stream"
log.info("Fetching audio for version %s from %s", version_id, url)
async with httpx.AsyncClient(
auth=(settings.nextcloud_user, settings.nextcloud_pass), timeout=120.0
headers={"X-Internal-Token": settings.internal_secret}, timeout=120.0
) as client:
resp = await client.get(dav_url)
resp.raise_for_status()
with open(local_path, "wb") as f:
f.write(resp.content)
async with client.stream("GET", url) as resp:
resp.raise_for_status()
with open(local_path, "wb") as f:
async for chunk in resp.aiter_bytes(65536):
f.write(chunk)
loop = asyncio.get_event_loop()
audio, sr = await loop.run_in_executor(
@@ -53,7 +60,7 @@ async def handle_transcode(payload: dict, session: AsyncSession, settings) -> No
nc_path = payload["nc_file_path"]
with tempfile.TemporaryDirectory(dir=settings.audio_tmp_dir) as tmp:
audio, sr, local_path = await load_audio(nc_path, tmp, settings)
audio, sr, local_path = await load_audio(str(version_id), Path(nc_path).name, tmp, settings)
duration_ms = await get_duration_ms(local_path)
hls_dir = os.path.join(tmp, "hls")
@@ -99,7 +106,7 @@ async def handle_analyse_range(payload: dict, session: AsyncSession, settings) -
raise ValueError(f"AudioVersion {version_id} not found")
with tempfile.TemporaryDirectory(dir=settings.audio_tmp_dir) as tmp:
audio, sr, _ = await load_audio(version.nc_file_path, tmp, settings)
audio, sr, _ = await load_audio(str(version_id), Path(version.nc_file_path).name, tmp, settings)
await run_range_analysis(audio, sr, version_id, annotation_id, start_ms, end_ms, session)
log.info("Range analysis complete for annotation %s", annotation_id)
@@ -116,7 +123,7 @@ async def handle_extract_peaks(payload: dict, session: AsyncSession, settings) -
nc_path = payload["nc_file_path"]
with tempfile.TemporaryDirectory(dir=settings.audio_tmp_dir) as tmp:
audio, _sr, _local_path = await load_audio(nc_path, tmp, settings)
audio, _sr, _local_path = await load_audio(str(version_id), Path(nc_path).name, tmp, settings)
loop = asyncio.get_event_loop()
peaks_500 = await loop.run_in_executor(None, extract_peaks, audio, 500)
@@ -146,17 +153,27 @@ HANDLERS = {
async def main() -> None:
settings = get_settings()
os.makedirs(settings.audio_tmp_dir, exist_ok=True)
log.info(
"Worker config — redis_url=%s api_url=%s queue=%s",
settings.redis_url, settings.api_url, settings.job_queue_key,
)
engine = create_async_engine(settings.database_url, pool_pre_ping=True)
session_factory = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
redis = aioredis.from_url(settings.redis_url, decode_responses=True)
# Drain stale job IDs left in Redis from previous runs whose API transactions
# were never committed (e.g. crashed processes).
stale = await redis.llen(settings.job_queue_key)
if stale:
log.warning("Draining %d stale job IDs from Redis queue before starting", stale)
await redis.delete(settings.job_queue_key)
# Wait for Redis to be reachable before proceeding (startup race condition guard).
for attempt in range(1, 31):
try:
await redis.ping()
log.info("Redis connection established (attempt %d)", attempt)
break
except Exception as exc:
if attempt == 30:
log.error("Redis unreachable after 30 attempts — giving up: %s", exc)
raise
log.warning("Redis not ready (attempt %d/30): %s — retrying in 2s", attempt, exc)
await asyncio.sleep(2)
log.info("Worker started. Listening for jobs on %s", settings.job_queue_key)

View File

@@ -3,15 +3,28 @@
from __future__ import annotations
import asyncio
import logging
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any
import numpy as np
from sqlalchemy.ext.asyncio import AsyncSession
from worker.analyzers.base import AnalysisResult
from worker.analyzers.bpm import BPMAnalyzer
from worker.analyzers.key import KeyAnalyzer
log = logging.getLogger(__name__)
# Dedicated pool so heavy Essentia threads can't starve the default executor.
# max_workers=2 covers BPM + Key running sequentially per job.
_analysis_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="analysis")
# Per-analyzer timeout in seconds. Essentia multifeature BPM can be slow on
# long recordings; 3 minutes is generous for a single-track analysis pass.
_ANALYZER_TIMEOUT = 180.0
async def run_full_analysis(
audio: np.ndarray,
@@ -19,10 +32,25 @@ async def run_full_analysis(
version_id: uuid.UUID,
session: AsyncSession,
) -> dict[str, Any]:
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
bpm_result = await loop.run_in_executor(None, BPMAnalyzer().analyze, audio, sample_rate)
key_result = await loop.run_in_executor(None, KeyAnalyzer().analyze, audio, sample_rate)
try:
bpm_result = await asyncio.wait_for(
loop.run_in_executor(_analysis_pool, BPMAnalyzer().analyze, audio, sample_rate),
timeout=_ANALYZER_TIMEOUT,
)
except asyncio.TimeoutError:
log.warning("BPM analysis timed out for version %s — storing null", version_id)
bpm_result = AnalysisResult(analyzer_name="bpm", fields={"bpm": None, "bpm_confidence": None})
try:
key_result = await asyncio.wait_for(
loop.run_in_executor(_analysis_pool, KeyAnalyzer().analyze, audio, sample_rate),
timeout=_ANALYZER_TIMEOUT,
)
except asyncio.TimeoutError:
log.warning("Key analysis timed out for version %s — storing null", version_id)
key_result = AnalysisResult(analyzer_name="key", fields={"key": None, "scale": None, "key_confidence": None})
fields: dict[str, Any] = {**bpm_result.fields, **key_result.fields}
@@ -33,15 +61,32 @@ async def run_full_analysis(
global_bpm = fields.get("bpm")
global_key = fields.get("key")
from worker.db import SongModel
# Mark version analysis done
stmt = (
update(AudioVersionModel)
.where(AudioVersionModel.id == version_id)
.values(
analysis_status="done",
**({} if global_bpm is None else {"global_bpm": global_bpm}),
)
.values(analysis_status="done")
)
await session.execute(stmt)
# Write BPM/key to the song (global_bpm/global_key live on songs, not audio_versions)
version = await session.get(AudioVersionModel, version_id)
if version is not None:
song_extra: dict[str, Any] = {}
if global_bpm is not None:
song_extra["global_bpm"] = global_bpm
if global_key is not None:
song_extra["global_key"] = global_key
if song_extra:
song_stmt = (
update(SongModel)
.where(SongModel.id == version.song_id)
.values(**song_extra)
)
await session.execute(song_stmt)
await session.commit()
return fields

View File

@@ -41,16 +41,26 @@ async def get_duration_ms(input_path: str) -> int:
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, _ = await proc.communicate()
try:
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=30.0)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
raise RuntimeError(f"ffprobe timed out for {input_path}")
info = json.loads(stdout)
duration_s = float(info.get("format", {}).get("duration", 0))
return int(duration_s * 1000)
async def _run_ffmpeg(cmd: list[str]) -> None:
async def _run_ffmpeg(cmd: list[str], timeout: float = 600.0) -> None:
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE
)
_, stderr = await proc.communicate()
try:
_, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
raise RuntimeError(f"FFmpeg timed out after {timeout}s")
if proc.returncode != 0:
raise RuntimeError(f"FFmpeg failed: {stderr.decode()[:500]}")