feat: incremental SSE scan, recursive NC traversal, custom folder support
- nc_scan.py: recursive collect_audio_files (fixes depth-1 bug); scan_band_folder yields ndjson events (progress/song/session/skipped/done) for streaming
- songs.py: replace old flat scan with scan_band_folder; add GET nc-scan/stream endpoint using _member_from_request so ?token= auth works for fetch-based SSE
- BandPage.tsx: scan button now consumes ndjson stream via fetch+ReadableStream; sessions/unattributed invalidated as each song/session event arrives
- session.py: add extract_session_folder() for YYMMDD path extraction
- rehearsal_session.py: get_or_create uses begin_nested() savepoint to handle races
- band.py: add get_by_nc_folder_prefix() for custom nc_folder_path band lookup
- internal.py: nc-upload falls back to prefix match when slug lookup fails
- event_loop.py: remove hardcoded bands/ guard; let internal API handle filtering

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,23 +1,24 @@
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
||||
from fastapi.responses import StreamingResponse
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from rehearsalhub.db.engine import get_session
|
||||
from rehearsalhub.db.engine import get_session, get_session_factory
|
||||
from rehearsalhub.db.models import Member
|
||||
from rehearsalhub.dependencies import get_current_member
|
||||
from rehearsalhub.repositories.audio_version import AudioVersionRepository
|
||||
from rehearsalhub.routers.versions import _member_from_request
|
||||
from rehearsalhub.repositories.band import BandRepository
|
||||
from rehearsalhub.repositories.comment import CommentRepository
|
||||
from rehearsalhub.repositories.rehearsal_session import RehearsalSessionRepository
|
||||
from rehearsalhub.repositories.song import SongRepository
|
||||
from rehearsalhub.schemas.comment import SongCommentCreate, SongCommentRead
|
||||
from rehearsalhub.schemas.song import SongCreate, SongRead, SongUpdate
|
||||
from rehearsalhub.services.band import BandService
|
||||
from rehearsalhub.services.session import parse_rehearsal_date
|
||||
from rehearsalhub.services.nc_scan import scan_band_folder
|
||||
from rehearsalhub.services.song import SongService
|
||||
from rehearsalhub.storage.nextcloud import NextcloudClient
|
||||
|
||||
@@ -137,6 +138,58 @@ async def create_song(
|
||||
return read
|
||||
|
||||
|
||||
async def _get_band_and_assert_member(
    band_id: uuid.UUID,
    current_member: Member,
    session: AsyncSession,
):
    """Return the band with id *band_id* after checking the caller's membership.

    The membership check runs before the band lookup, so the 403 path is
    evaluated first.

    Args:
        band_id: Identifier of the band to load.
        current_member: The authenticated member making the request.
        session: Database session used for both the membership check and
            the band lookup.

    Returns:
        The band object returned by ``BandRepository.get_by_id``.

    Raises:
        HTTPException: 403 when ``BandService.assert_membership`` raises
            ``PermissionError``; 404 when no band with that id is found.
    """
    band_svc = BandService(session)
    try:
        await band_svc.assert_membership(band_id, current_member.id)
    except PermissionError as exc:
        # Chain explicitly so the original PermissionError stays attached as
        # the cause instead of showing up as an error "during handling".
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not a member",
        ) from exc

    band = await BandRepository(session).get_by_id(band_id)
    if band is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Band not found")
    return band
|
||||
|
||||
|
||||
@router.get("/bands/{band_id}/nc-scan/stream")
async def scan_nextcloud_stream(
    band_id: uuid.UUID,
    session: AsyncSession = Depends(get_session),
    current_member: Member = Depends(_member_from_request),
):
    """
    Streaming scan endpoint: emits scan progress as newline-delimited JSON.

    Each event is a JSON object on its own line and the response is served as
    ``application/x-ndjson`` (this is not the ``text/event-stream`` SSE wire
    format). Authentication is resolved by ``_member_from_request``, which
    accepts ?token= for clients that can't set headers.

    Events of type "song", "session" and "done" are only sent after the work
    they describe has been committed, so a client that refetches in response
    to an event sees the new rows. If the scan raises, uncommitted work is
    rolled back and a final ``{"type": "error", "message": ...}`` event is
    emitted.

    Raises:
        HTTPException: 403 / 404 from ``_get_band_and_assert_member``, raised
            before the stream starts.
    """
    band = await _get_band_and_assert_member(band_id, current_member, session)
    band_folder = band.nc_folder_path or f"bands/{band.slug}/"
    nc = NextcloudClient.for_member(current_member)
    # Read the id up front so the generator does not touch the request-bound
    # Member object later. NOTE(review): presumably to avoid using an ORM
    # instance after the request session is gone; confirm.
    member_id = current_member.id

    async def event_generator():
        # The stream uses its own session rather than the request-scoped one.
        # NOTE(review): presumably because the dependency session does not
        # outlive the handler return; confirm against get_session.
        async with get_session_factory()() as db:
            try:
                async for event in scan_band_folder(db, nc, band_id, band_folder, member_id):
                    # Persist before announcing: the client reacts to these
                    # events by refetching, so the data must already be visible.
                    if event.get("type") in ("song", "session", "done"):
                        await db.commit()
                    yield json.dumps(event) + "\n"
                # Flush anything left over if the scan ended without "done".
                await db.commit()
            except Exception as exc:
                log.exception("SSE scan error for band %s", band_id)
                # Drop the failed unit of work instead of committing it; work
                # committed for earlier events stays in place.
                await db.rollback()
                yield json.dumps({"type": "error", "message": str(exc)}) + "\n"

    return StreamingResponse(
        event_generator(),
        media_type="application/x-ndjson",
    )
|
||||
|
||||
|
||||
@router.post("/bands/{band_id}/nc-scan", response_model=NcScanResult)
|
||||
async def scan_nextcloud(
|
||||
band_id: uuid.UUID,
|
||||
@@ -144,170 +197,30 @@ async def scan_nextcloud(
|
||||
current_member: Member = Depends(get_current_member),
|
||||
):
|
||||
"""
|
||||
Scan the band's Nextcloud folder for audio files and import any not yet
|
||||
registered as songs/versions. Idempotent — safe to call multiple times.
|
||||
Blocking scan — collects all results then returns. Delegates to scan_band_folder.
|
||||
Prefer the SSE /nc-scan/stream endpoint for large folders.
|
||||
"""
|
||||
band_svc = BandService(session)
|
||||
try:
|
||||
await band_svc.assert_membership(band_id, current_member.id)
|
||||
except PermissionError:
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not a member")
|
||||
|
||||
band_repo = BandRepository(session)
|
||||
band = await band_repo.get_by_id(band_id)
|
||||
if band is None:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Band not found")
|
||||
|
||||
nc = NextcloudClient.for_member(current_member)
|
||||
version_repo = AudioVersionRepository(session)
|
||||
session_repo = RehearsalSessionRepository(session)
|
||||
song_svc = SongService(session)
|
||||
|
||||
# dav_prefix to strip full WebDAV hrefs → user-relative paths
|
||||
dav_prefix = f"/remote.php/dav/files/{nc._auth[0]}/"
|
||||
|
||||
def relative(href: str) -> str:
|
||||
if href.startswith(dav_prefix):
|
||||
return href[len(dav_prefix):]
|
||||
return href.lstrip("/")
|
||||
|
||||
imported_songs: list[SongRead] = []
|
||||
skipped_count = 0
|
||||
band = await _get_band_and_assert_member(band_id, current_member, session)
|
||||
band_folder = band.nc_folder_path or f"bands/{band.slug}/"
|
||||
nc = NextcloudClient.for_member(current_member)
|
||||
|
||||
log.info("NC scan START — band='%s' folder='%s' nc_user='%s'", band.slug, band_folder, nc._auth[0])
|
||||
songs: list[SongRead] = []
|
||||
stats = {"found": 0, "imported": 0, "skipped": 0}
|
||||
|
||||
try:
|
||||
items = await nc.list_folder(band_folder)
|
||||
except Exception as exc:
|
||||
log.error("NC scan FAILED — could not list '%s': %s", band_folder, exc)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_502_BAD_GATEWAY,
|
||||
detail=f"Cannot read Nextcloud folder '{band_folder}': {exc}",
|
||||
)
|
||||
async for event in scan_band_folder(session, nc, band_id, band_folder, current_member.id):
|
||||
if event["type"] == "song":
|
||||
songs.append(SongRead(**event["song"]))
|
||||
elif event["type"] == "done":
|
||||
stats = event["stats"]
|
||||
elif event["type"] == "error":
|
||||
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=event["message"])
|
||||
|
||||
log.info("NC scan — found %d top-level entries in '%s'", len(items), band_folder)
|
||||
for item in items:
|
||||
log.info(" entry href=%s → rel=%s", item.path, relative(item.path))
|
||||
|
||||
# Collect (nc_file_path, nc_folder, song_title, rehearsal_label) tuples.
|
||||
# nc_folder is the directory that groups versions of the same song.
|
||||
# For YYMMDD / dated rehearsal subfolders each file is its own song —
|
||||
# the song title comes from the filename stem, not the folder name.
|
||||
to_import: list[tuple[str, str, str, str | None]] = []
|
||||
|
||||
for item in items:
|
||||
rel = relative(item.path)
|
||||
if rel.endswith("/"):
|
||||
dir_name = Path(rel.rstrip("/")).name
|
||||
try:
|
||||
sub_items = await nc.list_folder(rel)
|
||||
except Exception as exc:
|
||||
log.warning("NC scan — could not list subfolder '%s': %s", rel, exc)
|
||||
continue
|
||||
|
||||
all_sub = [relative(s.path) for s in sub_items]
|
||||
audio_files = [s for s in sub_items if Path(relative(s.path)).suffix.lower() in AUDIO_EXTENSIONS]
|
||||
log.info(
|
||||
"NC scan — subfolder '%s': %d entries total, %d audio files",
|
||||
dir_name, len(all_sub), len(audio_files),
|
||||
)
|
||||
for s in sub_items:
|
||||
sr = relative(s.path)
|
||||
ext = Path(sr).suffix.lower()
|
||||
if ext and ext not in AUDIO_EXTENSIONS:
|
||||
log.info(" skip (not audio ext=%s): %s", ext, sr)
|
||||
|
||||
for sub in audio_files:
|
||||
sub_rel = relative(sub.path)
|
||||
song_title = Path(sub_rel).stem
|
||||
song_folder = str(Path(sub_rel).parent) + "/"
|
||||
rehearsal_label = dir_name
|
||||
log.info(" queue for import: %s → title='%s' folder='%s'", sub_rel, song_title, song_folder)
|
||||
to_import.append((sub_rel, song_folder, song_title, rehearsal_label))
|
||||
else:
|
||||
ext = Path(rel).suffix.lower()
|
||||
if ext in AUDIO_EXTENSIONS:
|
||||
folder = str(Path(rel).parent) + "/"
|
||||
title = Path(rel).stem
|
||||
log.info(" queue for import (root-level): %s → title='%s'", rel, title)
|
||||
to_import.append((rel, folder, title, None))
|
||||
elif ext:
|
||||
log.info(" skip root-level (not audio ext=%s): %s", ext, rel)
|
||||
|
||||
log.info("NC scan — %d audio files queued for import", len(to_import))
|
||||
|
||||
song_repo = SongRepository(session)
|
||||
from rehearsalhub.schemas.audio_version import AudioVersionCreate # noqa: PLC0415
|
||||
|
||||
for nc_file_path, nc_folder, song_title, rehearsal_label in to_import:
|
||||
# Skip if this exact file version is already registered
|
||||
try:
|
||||
meta = await nc.get_file_metadata(nc_file_path)
|
||||
etag = meta.etag
|
||||
except Exception as exc:
|
||||
log.warning("Could not fetch metadata for '%s': %s — skipping", nc_file_path, exc)
|
||||
continue
|
||||
|
||||
if etag and await version_repo.get_by_etag(etag):
|
||||
log.debug("Skipping '%s' — etag already registered", nc_file_path)
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
# Resolve rehearsal session from YYMMDD folder segment
|
||||
rehearsal_date = parse_rehearsal_date(nc_file_path)
|
||||
rehearsal_session_id = None
|
||||
if rehearsal_date:
|
||||
rs = await session_repo.get_or_create(band_id, rehearsal_date, nc_folder)
|
||||
rehearsal_session_id = rs.id
|
||||
|
||||
# Find or create song record
|
||||
song = await song_repo.get_by_nc_folder_path(nc_folder)
|
||||
if song is None:
|
||||
song = await song_repo.get_by_title_and_band(band_id, song_title)
|
||||
if song is None:
|
||||
log.info("Creating new song '%s' (folder: %s)", song_title, nc_folder)
|
||||
song = await song_repo.create(
|
||||
band_id=band_id,
|
||||
session_id=rehearsal_session_id,
|
||||
title=song_title,
|
||||
status="jam",
|
||||
notes=None,
|
||||
nc_folder_path=nc_folder,
|
||||
created_by=current_member.id,
|
||||
)
|
||||
else:
|
||||
log.info("Found existing song '%s' (id: %s)", song.title, song.id)
|
||||
if rehearsal_session_id and song.session_id is None:
|
||||
song = await song_repo.update(song, session_id=rehearsal_session_id)
|
||||
|
||||
await song_svc.register_version(
|
||||
song.id,
|
||||
AudioVersionCreate(
|
||||
nc_file_path=nc_file_path,
|
||||
nc_file_etag=etag,
|
||||
format=Path(nc_file_path).suffix.lstrip(".").lower(),
|
||||
file_size_bytes=meta.size if etag else None,
|
||||
),
|
||||
current_member.id,
|
||||
)
|
||||
|
||||
read = SongRead.model_validate(song)
|
||||
read.version_count = 1
|
||||
imported_songs.append(read)
|
||||
label_info = f" [rehearsal: {rehearsal_label}]" if rehearsal_label else ""
|
||||
log.info("Imported '%s' as song '%s'%s", nc_file_path, song_title, label_info)
|
||||
|
||||
log.info(
|
||||
"NC scan complete for '%s': %d imported, %d skipped (already registered)",
|
||||
band_folder, len(imported_songs), skipped_count,
|
||||
)
|
||||
return NcScanResult(
|
||||
folder=band_folder,
|
||||
files_found=len(to_import),
|
||||
imported=len(imported_songs),
|
||||
skipped=skipped_count,
|
||||
songs=imported_songs,
|
||||
files_found=stats["found"],
|
||||
imported=stats["imported"],
|
||||
skipped=stats["skipped"],
|
||||
songs=songs,
|
||||
)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user