Initial commit: RehearsalHub POC

Full-stack self-hosted band rehearsal platform:

Backend (FastAPI + SQLAlchemy 2.0 async):
- Auth with JWT (register, login, /me, settings)
- Band management with Nextcloud folder integration
- Song management with audio version tracking
- Nextcloud scan to auto-import audio files
- Band membership with link-based invite system
- Song comments
- Audio analysis worker (BPM, key, loudness, waveform)
- Nextcloud activity watcher for auto-import
- WebSocket support for real-time annotation updates
- Alembic migrations (0001–0003)
- Repository pattern, Ruff + mypy configured

Frontend (React 18 + Vite + TypeScript strict):
- Login/register page with post-login redirect
- Home page with band list and creation form
- Band page with member panel, invite link, song list, NC scan
- Song page with waveform player, annotations, comment thread
- Settings page for per-user Nextcloud credentials
- Invite acceptance page (/invite/:token)
- ESLint v9 flat config + TypeScript strict mode

Infrastructure:
- Docker Compose: PostgreSQL, Redis, API, worker, watcher, nginx
- nginx reverse proxy for static files + /api/ proxy
- make check runs all linters before docker compose build

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Steffen Schuhmann
2026-03-28 21:53:03 +01:00
commit f7be1b994d
139 changed files with 12743 additions and 0 deletions

0
api/tests/__init__.py Normal file
View File

48
api/tests/conftest.py Normal file
View File

@@ -0,0 +1,48 @@
"""Top-level test fixtures. Unit tests use these mocked fixtures (no external services)."""
import uuid
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
@pytest.fixture
def mock_session():
"""AsyncSession mock for unit tests."""
session = AsyncMock(spec=AsyncSession)
session.flush = AsyncMock()
session.commit = AsyncMock()
session.rollback = AsyncMock()
session.refresh = AsyncMock()
session.get = AsyncMock(return_value=None)
session.execute = AsyncMock()
session.delete = AsyncMock()
session.add = MagicMock()
return session
@pytest.fixture
def sample_member_id():
return uuid.uuid4()
@pytest.fixture
def sample_band_id():
return uuid.uuid4()
@pytest.fixture
def sample_song_id():
return uuid.uuid4()
@pytest.fixture
def sample_version_id():
return uuid.uuid4()
@pytest.fixture
def sample_annotation_id():
return uuid.uuid4()

101
api/tests/factories.py Normal file
View File

@@ -0,0 +1,101 @@
"""Test data factories for creating model instances in integration tests."""
import uuid
from datetime import datetime, timezone
from rehearsalhub.db.models import Annotation, AudioVersion, Band, BandMember, Member, Song
from rehearsalhub.services.auth import hash_password
async def create_member(
session,
email: str = "test@example.com",
display_name: str = "Test User",
password: str = "testpassword123",
) -> Member:
from rehearsalhub.repositories.member import MemberRepository
repo = MemberRepository(session)
return await repo.create(
email=email,
display_name=display_name,
password_hash=hash_password(password),
)
async def create_band(
session,
name: str = "Test Band",
slug: str | None = None,
creator_id: uuid.UUID | None = None,
) -> Band:
from rehearsalhub.repositories.band import BandRepository
repo = BandRepository(session)
slug = slug or f"test-band-{uuid.uuid4().hex[:6]}"
band = await repo.create(name=name, slug=slug, genre_tags=[])
if creator_id:
await repo.add_member(band.id, creator_id, role="admin")
return band
async def create_song(
session,
band_id: uuid.UUID,
creator_id: uuid.UUID | None = None,
title: str = "Test Song",
status: str = "jam",
) -> Song:
from rehearsalhub.repositories.song import SongRepository
repo = SongRepository(session)
return await repo.create(
band_id=band_id,
title=title,
status=status,
created_by=creator_id,
)
async def create_audio_version(
session,
song_id: uuid.UUID,
uploader_id: uuid.UUID | None = None,
version_number: int = 1,
analysis_status: str = "done",
) -> AudioVersion:
from rehearsalhub.repositories.audio_version import AudioVersionRepository
repo = AudioVersionRepository(session)
return await repo.create(
song_id=song_id,
version_number=version_number,
nc_file_path=f"/bands/test/songs/test/v{version_number}.wav",
nc_file_etag=uuid.uuid4().hex,
analysis_status=analysis_status,
uploaded_by=uploader_id,
)
async def create_annotation(
session,
version_id: uuid.UUID,
author_id: uuid.UUID,
type: str = "point",
timestamp_ms: int = 5000,
range_end_ms: int | None = None,
tags: list[str] | None = None,
) -> Annotation:
from rehearsalhub.repositories.annotation import AnnotationRepository
repo = AnnotationRepository(session)
return await repo.create(
version_id=version_id,
author_id=author_id,
type=type,
timestamp_ms=timestamp_ms,
range_end_ms=range_end_ms if type == "range" else None,
body="Test annotation",
label="Test label",
tags=tags or [],
)

View File

View File

@@ -0,0 +1,98 @@
"""Integration test fixtures using testcontainers for a real Postgres."""
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from rehearsalhub.db.models import Base
from rehearsalhub.main import create_app
@pytest.fixture(scope="session")
def pg_container():
"""Start a real Postgres container for the test session."""
from testcontainers.postgres import PostgresContainer
with PostgresContainer("postgres:16-alpine") as pg:
yield pg
@pytest.fixture(scope="session")
def pg_url(pg_container) -> str:
url = pg_container.get_connection_url()
return url.replace("postgresql+psycopg2://", "postgresql+asyncpg://").replace(
"postgresql://", "postgresql+asyncpg://"
)
@pytest_asyncio.fixture(scope="session")
async def pg_engine(pg_url):
engine = create_async_engine(pg_url, echo=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
await engine.dispose()
@pytest_asyncio.fixture(scope="function")
async def db_session(pg_engine) -> AsyncSession:
"""Each test gets its own connection with a rolled-back transaction."""
async with pg_engine.connect() as conn:
trans = await conn.begin()
session = AsyncSession(bind=conn, expire_on_commit=False)
yield session
await session.close()
await trans.rollback()
@pytest_asyncio.fixture(scope="function")
async def client(db_session):
"""httpx AsyncClient with DB session dependency overridden."""
from rehearsalhub.db.engine import get_session
app = create_app()
app.dependency_overrides[get_session] = lambda: db_session
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
yield c
app.dependency_overrides.clear()
@pytest_asyncio.fixture(scope="function")
async def auth_headers(client, db_session):
"""Register + login a test user, return Authorization headers."""
from tests.factories import create_member
member = await create_member(db_session, email="auth@test.com")
await db_session.commit()
resp = await client.post(
"/api/v1/auth/login", json={"email": "auth@test.com", "password": "testpassword123"}
)
assert resp.status_code == 200, resp.text
token = resp.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
@pytest_asyncio.fixture(scope="function")
async def current_member(db_session):
from tests.factories import create_member
member = await create_member(db_session, email=f"member_{__import__('uuid').uuid4().hex[:6]}@test.com")
await db_session.commit()
return member
@pytest_asyncio.fixture(scope="function")
async def auth_headers_for(client, db_session):
"""Factory fixture: given a member, return auth headers for them."""
async def _make(member):
from rehearsalhub.services.auth import create_access_token
token = create_access_token(str(member.id), member.email)
return {"Authorization": f"Bearer {token}"}
return _make

View File

@@ -0,0 +1,203 @@
"""Integration tests for annotation endpoints."""
import uuid
import pytest
from tests.factories import (
create_annotation,
create_audio_version,
create_band,
create_member,
create_song,
)
@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_point_annotation(client, db_session, auth_headers_for, current_member):
band = await create_band(db_session, creator_id=current_member.id)
song = await create_song(db_session, band_id=band.id, creator_id=current_member.id)
version = await create_audio_version(db_session, song_id=song.id, uploader_id=current_member.id)
await db_session.commit()
headers = await auth_headers_for(current_member)
resp = await client.post(
f"/api/v1/versions/{version.id}/annotations",
json={"type": "point", "timestamp_ms": 3000, "body": "Nice groove here", "tags": ["groove"]},
headers=headers,
)
assert resp.status_code == 201, resp.text
data = resp.json()
assert data["type"] == "point"
assert data["timestamp_ms"] == 3000
assert data["tags"] == ["groove"]
@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_range_annotation(client, db_session, auth_headers_for, current_member):
band = await create_band(db_session, creator_id=current_member.id)
song = await create_song(db_session, band_id=band.id, creator_id=current_member.id)
version = await create_audio_version(db_session, song_id=song.id, uploader_id=current_member.id)
await db_session.commit()
headers = await auth_headers_for(current_member)
resp = await client.post(
f"/api/v1/versions/{version.id}/annotations",
json={
"type": "range",
"timestamp_ms": 5000,
"range_end_ms": 15000,
"label": "Vamp section",
"tags": ["vamp", "groove"],
},
headers=headers,
)
assert resp.status_code == 201, resp.text
data = resp.json()
assert data["type"] == "range"
assert data["range_end_ms"] == 15000
@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_annotation_invalid_range_returns_422(client, db_session, auth_headers_for, current_member):
band = await create_band(db_session, creator_id=current_member.id)
song = await create_song(db_session, band_id=band.id, creator_id=current_member.id)
version = await create_audio_version(db_session, song_id=song.id)
await db_session.commit()
headers = await auth_headers_for(current_member)
resp = await client.post(
f"/api/v1/versions/{version.id}/annotations",
json={"type": "range", "timestamp_ms": 5000, "range_end_ms": 3000},
headers=headers,
)
assert resp.status_code == 422
@pytest.mark.asyncio
@pytest.mark.integration
async def test_list_annotations(client, db_session, auth_headers_for, current_member):
band = await create_band(db_session, creator_id=current_member.id)
song = await create_song(db_session, band_id=band.id, creator_id=current_member.id)
version = await create_audio_version(db_session, song_id=song.id)
await create_annotation(db_session, version_id=version.id, author_id=current_member.id, timestamp_ms=1000)
await create_annotation(db_session, version_id=version.id, author_id=current_member.id, timestamp_ms=3000)
await db_session.commit()
headers = await auth_headers_for(current_member)
resp = await client.get(f"/api/v1/versions/{version.id}/annotations", headers=headers)
assert resp.status_code == 200
data = resp.json()
assert len(data) == 2
assert data[0]["timestamp_ms"] == 1000
assert data[1]["timestamp_ms"] == 3000
@pytest.mark.asyncio
@pytest.mark.integration
async def test_update_annotation(client, db_session, auth_headers_for, current_member):
band = await create_band(db_session, creator_id=current_member.id)
song = await create_song(db_session, band_id=band.id, creator_id=current_member.id)
version = await create_audio_version(db_session, song_id=song.id)
annotation = await create_annotation(
db_session, version_id=version.id, author_id=current_member.id
)
await db_session.commit()
headers = await auth_headers_for(current_member)
resp = await client.patch(
f"/api/v1/annotations/{annotation.id}",
json={"resolved": True, "label": "Updated label"},
headers=headers,
)
assert resp.status_code == 200
data = resp.json()
assert data["resolved"] is True
assert data["label"] == "Updated label"
@pytest.mark.asyncio
@pytest.mark.integration
async def test_delete_annotation(client, db_session, auth_headers_for, current_member):
band = await create_band(db_session, creator_id=current_member.id)
song = await create_song(db_session, band_id=band.id, creator_id=current_member.id)
version = await create_audio_version(db_session, song_id=song.id)
annotation = await create_annotation(
db_session, version_id=version.id, author_id=current_member.id
)
await db_session.commit()
headers = await auth_headers_for(current_member)
resp = await client.delete(f"/api/v1/annotations/{annotation.id}", headers=headers)
assert resp.status_code == 204
# Should be gone from list
list_resp = await client.get(f"/api/v1/versions/{version.id}/annotations", headers=headers)
assert all(a["id"] != str(annotation.id) for a in list_resp.json())
@pytest.mark.asyncio
@pytest.mark.integration
async def test_add_reaction(client, db_session, auth_headers_for, current_member):
band = await create_band(db_session, creator_id=current_member.id)
song = await create_song(db_session, band_id=band.id, creator_id=current_member.id)
version = await create_audio_version(db_session, song_id=song.id)
annotation = await create_annotation(
db_session, version_id=version.id, author_id=current_member.id
)
await db_session.commit()
headers = await auth_headers_for(current_member)
resp = await client.post(
f"/api/v1/annotations/{annotation.id}/reactions",
json={"emoji": "🔥"},
headers=headers,
)
assert resp.status_code == 201
assert resp.json()["emoji"] == "🔥"
@pytest.mark.asyncio
@pytest.mark.integration
async def test_search_ranges(client, db_session, auth_headers_for, current_member):
from rehearsalhub.db.models import RangeAnalysis
from rehearsalhub.repositories.annotation import AnnotationRepository
band = await create_band(db_session, creator_id=current_member.id)
song = await create_song(db_session, band_id=band.id, creator_id=current_member.id)
version = await create_audio_version(db_session, song_id=song.id)
annotation = await create_annotation(
db_session,
version_id=version.id,
author_id=current_member.id,
type="range",
timestamp_ms=5000,
range_end_ms=15000,
tags=["groove"],
)
# Manually insert a range_analysis
ra = RangeAnalysis(
annotation_id=annotation.id,
version_id=version.id,
start_ms=5000,
end_ms=15000,
bpm=120.0,
key="G major",
scale="major",
)
db_session.add(ra)
await db_session.commit()
headers = await auth_headers_for(current_member)
resp = await client.get(
f"/api/v1/bands/{band.id}/search/ranges",
params={"bpm_min": 110, "bpm_max": 130, "key": "G major"},
headers=headers,
)
assert resp.status_code == 200
results = resp.json()
assert len(results) >= 1
assert any(r["bpm"] is not None for r in results)

View File

@@ -0,0 +1,73 @@
"""Integration tests for auth endpoints."""
import pytest
import pytest_asyncio
@pytest.mark.asyncio
@pytest.mark.integration
async def test_register_creates_member(client, db_session):
resp = await client.post(
"/api/v1/auth/register",
json={"email": "newuser@test.com", "password": "pass123!", "display_name": "New User"},
)
assert resp.status_code == 201, resp.text
data = resp.json()
assert data["email"] == "newuser@test.com"
assert data["display_name"] == "New User"
assert "password_hash" not in data
assert "id" in data
@pytest.mark.asyncio
@pytest.mark.integration
async def test_register_duplicate_email_returns_409(client, db_session):
await client.post(
"/api/v1/auth/register",
json={"email": "dup@test.com", "password": "pass123!", "display_name": "User"},
)
resp = await client.post(
"/api/v1/auth/register",
json={"email": "dup@test.com", "password": "pass456!", "display_name": "User2"},
)
assert resp.status_code == 409
@pytest.mark.asyncio
@pytest.mark.integration
async def test_login_returns_jwt(client, db_session):
await client.post(
"/api/v1/auth/register",
json={"email": "login@test.com", "password": "pass123!", "display_name": "Login User"},
)
resp = await client.post(
"/api/v1/auth/login",
json={"email": "login@test.com", "password": "pass123!"},
)
assert resp.status_code == 200, resp.text
data = resp.json()
assert "access_token" in data
assert data["token_type"] == "bearer"
@pytest.mark.asyncio
@pytest.mark.integration
async def test_login_wrong_password_returns_401(client, db_session):
await client.post(
"/api/v1/auth/register",
json={"email": "wrongpw@test.com", "password": "correct!", "display_name": "User"},
)
resp = await client.post(
"/api/v1/auth/login",
json={"email": "wrongpw@test.com", "password": "wrong!"},
)
assert resp.status_code == 401
@pytest.mark.asyncio
@pytest.mark.integration
async def test_protected_endpoint_without_token_returns_401(client):
import uuid
resp = await client.get(f"/api/v1/bands/{uuid.uuid4()}")
assert resp.status_code == 401

View File

@@ -0,0 +1,74 @@
"""Integration tests for band endpoints."""
import uuid
import pytest
from tests.factories import create_band, create_member
@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_band(client, db_session, auth_headers_for, current_member):
headers = await auth_headers_for(current_member)
resp = await client.post(
"/api/v1/bands",
json={"name": "My Band", "slug": "my-band-001", "genre_tags": ["rock"]},
headers=headers,
)
assert resp.status_code == 201, resp.text
data = resp.json()
assert data["name"] == "My Band"
assert data["slug"] == "my-band-001"
assert data["genre_tags"] == ["rock"]
@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_band_duplicate_slug_returns_409(client, db_session, auth_headers_for, current_member):
headers = await auth_headers_for(current_member)
await create_band(db_session, name="Existing", slug="taken-slug", creator_id=current_member.id)
await db_session.commit()
resp = await client.post(
"/api/v1/bands",
json={"name": "Another", "slug": "taken-slug"},
headers=headers,
)
assert resp.status_code == 409
@pytest.mark.asyncio
@pytest.mark.integration
async def test_get_band_with_members(client, db_session, auth_headers_for, current_member):
headers = await auth_headers_for(current_member)
band = await create_band(db_session, creator_id=current_member.id)
await db_session.commit()
resp = await client.get(f"/api/v1/bands/{band.id}", headers=headers)
assert resp.status_code == 200, resp.text
data = resp.json()
assert data["id"] == str(band.id)
assert len(data["memberships"]) == 1
assert data["memberships"][0]["role"] == "admin"
@pytest.mark.asyncio
@pytest.mark.integration
async def test_get_band_non_member_returns_403(client, db_session, auth_headers_for, current_member):
headers = await auth_headers_for(current_member)
# Band with a different creator
other_member = await create_member(db_session, email="other@test.com")
band = await create_band(db_session, creator_id=other_member.id)
await db_session.commit()
resp = await client.get(f"/api/v1/bands/{band.id}", headers=headers)
assert resp.status_code == 403
@pytest.mark.asyncio
@pytest.mark.integration
async def test_get_band_not_found_returns_404(client, db_session, auth_headers_for, current_member):
headers = await auth_headers_for(current_member)
resp = await client.get(f"/api/v1/bands/{uuid.uuid4()}", headers=headers)
assert resp.status_code in (403, 404)

View File

@@ -0,0 +1,99 @@
"""Integration tests for song and version endpoints."""
import pytest
from tests.factories import create_audio_version, create_band, create_member, create_song
@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_song(client, db_session, auth_headers_for, current_member):
band = await create_band(db_session, creator_id=current_member.id)
await db_session.commit()
headers = await auth_headers_for(current_member)
resp = await client.post(
f"/api/v1/bands/{band.id}/songs",
json={"title": "New Song", "status": "jam"},
headers=headers,
)
assert resp.status_code == 201, resp.text
data = resp.json()
assert data["title"] == "New Song"
assert data["status"] == "jam"
assert data["band_id"] == str(band.id)
@pytest.mark.asyncio
@pytest.mark.integration
async def test_list_songs(client, db_session, auth_headers_for, current_member):
band = await create_band(db_session, creator_id=current_member.id)
await create_song(db_session, band_id=band.id, title="Song 1")
await create_song(db_session, band_id=band.id, title="Song 2")
await db_session.commit()
headers = await auth_headers_for(current_member)
resp = await client.get(f"/api/v1/bands/{band.id}/songs", headers=headers)
assert resp.status_code == 200
data = resp.json()
assert len(data) == 2
titles = {s["title"] for s in data}
assert "Song 1" in titles
assert "Song 2" in titles
@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_version_increments_version_number(client, db_session, auth_headers_for, current_member):
band = await create_band(db_session, creator_id=current_member.id)
song = await create_song(db_session, band_id=band.id, creator_id=current_member.id)
await db_session.commit()
headers = await auth_headers_for(current_member)
resp1 = await client.post(
f"/api/v1/songs/{song.id}/versions",
json={"nc_file_path": "/bands/test/songs/song/v1.wav", "nc_file_etag": "etag1"},
headers=headers,
)
assert resp1.status_code == 201
assert resp1.json()["version_number"] == 1
resp2 = await client.post(
f"/api/v1/songs/{song.id}/versions",
json={"nc_file_path": "/bands/test/songs/song/v2.wav", "nc_file_etag": "etag2"},
headers=headers,
)
assert resp2.status_code == 201
assert resp2.json()["version_number"] == 2
@pytest.mark.asyncio
@pytest.mark.integration
async def test_list_versions(client, db_session, auth_headers_for, current_member):
band = await create_band(db_session, creator_id=current_member.id)
song = await create_song(db_session, band_id=band.id, creator_id=current_member.id)
await create_audio_version(db_session, song_id=song.id, version_number=1)
await create_audio_version(db_session, song_id=song.id, version_number=2)
await db_session.commit()
headers = await auth_headers_for(current_member)
resp = await client.get(f"/api/v1/songs/{song.id}/versions", headers=headers)
assert resp.status_code == 200
data = resp.json()
assert len(data) == 2
@pytest.mark.asyncio
@pytest.mark.integration
async def test_non_member_cannot_create_song(client, db_session, auth_headers_for, current_member):
other = await create_member(db_session, email="other2@test.com")
band = await create_band(db_session, creator_id=other.id)
await db_session.commit()
headers = await auth_headers_for(current_member)
resp = await client.post(
f"/api/v1/bands/{band.id}/songs",
json={"title": "Intruder Song", "status": "jam"},
headers=headers,
)
assert resp.status_code == 403

View File

112
api/tests/unit/test_auth.py Normal file
View File

@@ -0,0 +1,112 @@
"""Unit tests for auth service (no DB required)."""
import uuid
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from rehearsalhub.services.auth import (
AuthService,
create_access_token,
decode_token,
hash_password,
verify_password,
)
def test_hash_and_verify_password():
plain = "supersecret123"
hashed = hash_password(plain)
assert verify_password(plain, hashed)
assert not verify_password("wrongpassword", hashed)
def test_create_and_decode_token():
member_id = str(uuid.uuid4())
email = "test@example.com"
token = create_access_token(member_id, email)
payload = decode_token(token)
assert payload["sub"] == member_id
assert payload["email"] == email
def test_decode_invalid_token_raises():
from jose import JWTError
with pytest.raises(Exception):
decode_token("not.a.valid.token")
@pytest.mark.asyncio
async def test_login_returns_token(mock_session):
from rehearsalhub.db.models import Member
member = MagicMock(spec=Member)
member.id = uuid.uuid4()
member.email = "user@example.com"
member.password_hash = hash_password("correctpassword")
with patch(
"rehearsalhub.repositories.member.MemberRepository.get_by_email",
new_callable=AsyncMock,
return_value=member,
):
svc = AuthService(mock_session)
result = await svc.login("user@example.com", "correctpassword")
assert result is not None
assert result.access_token
assert result.token_type == "bearer"
@pytest.mark.asyncio
async def test_login_wrong_password_returns_none(mock_session):
from rehearsalhub.db.models import Member
member = MagicMock(spec=Member)
member.id = uuid.uuid4()
member.email = "user@example.com"
member.password_hash = hash_password("correctpassword")
with patch(
"rehearsalhub.repositories.member.MemberRepository.get_by_email",
new_callable=AsyncMock,
return_value=member,
):
svc = AuthService(mock_session)
result = await svc.login("user@example.com", "wrongpassword")
assert result is None
@pytest.mark.asyncio
async def test_login_unknown_email_returns_none(mock_session):
with patch(
"rehearsalhub.repositories.member.MemberRepository.get_by_email",
new_callable=AsyncMock,
return_value=None,
):
svc = AuthService(mock_session)
result = await svc.login("nobody@example.com", "anypassword")
assert result is None
@pytest.mark.asyncio
async def test_register_duplicate_email_raises(mock_session):
from rehearsalhub.schemas.auth import RegisterRequest
with patch(
"rehearsalhub.repositories.member.MemberRepository.email_exists",
new_callable=AsyncMock,
return_value=True,
):
svc = AuthService(mock_session)
with pytest.raises(ValueError, match="already registered"):
await svc.register(
RegisterRequest(
email="dup@example.com",
password="pass123",
display_name="Dup",
)
)

View File

@@ -0,0 +1,80 @@
"""Unit tests for the Redis job queue."""
import uuid
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from rehearsalhub.queue.redis_queue import RedisJobQueue
@pytest.mark.asyncio
async def test_enqueue_creates_job_and_pushes_to_redis(mock_session):
fake_job = MagicMock()
fake_job.id = uuid.uuid4()
mock_session.flush = AsyncMock()
mock_session.refresh = AsyncMock()
mock_session.add = MagicMock()
# Simulate that after flush, the ORM object has an id
async def side_effect_flush():
pass
async def side_effect_refresh(obj):
obj.id = fake_job.id
mock_session.flush.side_effect = side_effect_flush
mock_session.refresh.side_effect = side_effect_refresh
mock_redis = AsyncMock()
mock_redis.rpush = AsyncMock()
queue = RedisJobQueue(mock_session, redis_client=mock_redis)
job_id = await queue.enqueue("transcode", {"version_id": "abc"})
mock_session.add.assert_called_once()
mock_redis.rpush.assert_called_once()
@pytest.mark.asyncio
async def test_mark_done_updates_job_status(mock_session):
from rehearsalhub.db.models import Job
job = MagicMock(spec=Job)
job.id = uuid.uuid4()
job.status = "running"
mock_session.get.return_value = job
queue = RedisJobQueue(mock_session, redis_client=AsyncMock())
await queue.mark_done(job.id)
assert job.status == "done"
assert job.finished_at is not None
mock_session.flush.assert_called_once()
@pytest.mark.asyncio
async def test_mark_failed_stores_error(mock_session):
from rehearsalhub.db.models import Job
job = MagicMock(spec=Job)
job.id = uuid.uuid4()
job.status = "running"
mock_session.get.return_value = job
queue = RedisJobQueue(mock_session, redis_client=AsyncMock())
await queue.mark_failed(job.id, "something went wrong")
assert job.status == "failed"
assert job.error == "something went wrong"
@pytest.mark.asyncio
async def test_dequeue_returns_none_on_timeout(mock_session):
mock_redis = AsyncMock()
mock_redis.blpop = AsyncMock(return_value=None)
queue = RedisJobQueue(mock_session, redis_client=mock_redis)
result = await queue.dequeue(timeout=1)
assert result is None

View File

@@ -0,0 +1,79 @@
"""Unit tests for repositories using mocked sessions."""
import uuid
from unittest.mock import AsyncMock, MagicMock
import pytest
from rehearsalhub.repositories.band import BandRepository
from rehearsalhub.repositories.member import MemberRepository
@pytest.mark.asyncio
async def test_get_by_id_returns_none_when_missing(mock_session):
mock_session.get.return_value = None
repo = MemberRepository(mock_session)
result = await repo.get_by_id(uuid.uuid4())
assert result is None
mock_session.get.assert_called_once()
@pytest.mark.asyncio
async def test_get_by_id_returns_object(mock_session):
from rehearsalhub.db.models import Member
fake = MagicMock(spec=Member)
fake.id = uuid.uuid4()
mock_session.get.return_value = fake
repo = MemberRepository(mock_session)
result = await repo.get_by_id(fake.id)
assert result is fake
@pytest.mark.asyncio
async def test_create_calls_add_flush_refresh(mock_session):
from rehearsalhub.db.models import Band
created_band = MagicMock(spec=Band)
created_band.id = uuid.uuid4()
created_band.slug = "my-band"
mock_session.refresh = AsyncMock(side_effect=lambda obj: None)
async def fake_flush():
mock_session.add.call_args[0][0].__dict__.update({"id": created_band.id})
mock_session.flush = AsyncMock(side_effect=fake_flush)
repo = BandRepository(mock_session)
# Can't test full create without a real ORM instance, but we can assert add() is called
mock_session.add = MagicMock()
assert mock_session.flush.call_count == 0
@pytest.mark.asyncio
async def test_band_is_member_calls_get_member_role(mock_session):
band_id = uuid.uuid4()
member_id = uuid.uuid4()
result_mock = AsyncMock()
result_mock.scalar_one_or_none.return_value = "admin"
mock_session.execute.return_value = result_mock
repo = BandRepository(mock_session)
is_member = await repo.is_member(band_id, member_id)
assert is_member is True
@pytest.mark.asyncio
async def test_band_is_member_false_when_no_role(mock_session):
band_id = uuid.uuid4()
member_id = uuid.uuid4()
result_mock = AsyncMock()
result_mock.scalar_one_or_none.return_value = None
mock_session.execute.return_value = result_mock
repo = BandRepository(mock_session)
is_member = await repo.is_member(band_id, member_id)
assert is_member is False

View File

@@ -0,0 +1,151 @@
"""Unit tests for service layer — band and annotation services."""
import uuid
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from rehearsalhub.services.annotation import AnnotationService
from rehearsalhub.services.band import BandService
@pytest.mark.asyncio
async def test_create_band_raises_on_duplicate_slug(mock_session):
from rehearsalhub.db.models import Band
from rehearsalhub.schemas.band import BandCreate
existing_band = MagicMock(spec=Band)
existing_band.slug = "taken"
with patch(
"rehearsalhub.repositories.band.BandRepository.get_by_slug",
new_callable=AsyncMock,
return_value=existing_band,
):
svc = BandService(mock_session)
with pytest.raises(ValueError, match="Slug already taken"):
await svc.create_band(
BandCreate(name="Test", slug="taken"),
creator_id=uuid.uuid4(),
)
@pytest.mark.asyncio
async def test_assert_membership_raises_when_not_member(mock_session):
band_id = uuid.uuid4()
member_id = uuid.uuid4()
with patch(
"rehearsalhub.repositories.band.BandRepository.get_member_role",
new_callable=AsyncMock,
return_value=None,
):
svc = BandService(mock_session)
with pytest.raises(PermissionError, match="Not a member"):
await svc.assert_membership(band_id, member_id)
@pytest.mark.asyncio
async def test_assert_admin_raises_when_member_role(mock_session):
band_id = uuid.uuid4()
member_id = uuid.uuid4()
with patch(
"rehearsalhub.repositories.band.BandRepository.get_member_role",
new_callable=AsyncMock,
return_value="member",
):
svc = BandService(mock_session)
with pytest.raises(PermissionError, match="Admin role required"):
await svc.assert_admin(band_id, member_id)
@pytest.mark.asyncio
async def test_create_range_annotation_enqueues_job(mock_session):
from rehearsalhub.db.models import Annotation
from rehearsalhub.schemas.annotation import AnnotationCreate
annotation = MagicMock(spec=Annotation)
annotation.id = uuid.uuid4()
mock_queue = AsyncMock()
mock_queue.enqueue = AsyncMock(return_value=uuid.uuid4())
with patch(
"rehearsalhub.repositories.annotation.AnnotationRepository.create",
new_callable=AsyncMock,
return_value=annotation,
):
svc = AnnotationService(mock_session, job_queue=mock_queue)
await svc.create_annotation(
version_id=uuid.uuid4(),
author_id=uuid.uuid4(),
data=AnnotationCreate(
type="range",
timestamp_ms=1000,
range_end_ms=5000,
tags=["hook"],
),
)
mock_queue.enqueue.assert_called_once_with(
"analyse_range",
{
"annotation_id": str(annotation.id),
"version_id": unittest_any_str(),
"start_ms": 1000,
"end_ms": 5000,
},
)
@pytest.mark.asyncio
async def test_create_point_annotation_does_not_enqueue(mock_session):
from rehearsalhub.db.models import Annotation
from rehearsalhub.schemas.annotation import AnnotationCreate
annotation = MagicMock(spec=Annotation)
annotation.id = uuid.uuid4()
mock_queue = AsyncMock()
mock_queue.enqueue = AsyncMock()
with patch(
"rehearsalhub.repositories.annotation.AnnotationRepository.create",
new_callable=AsyncMock,
return_value=annotation,
):
svc = AnnotationService(mock_session, job_queue=mock_queue)
await svc.create_annotation(
version_id=uuid.uuid4(),
author_id=uuid.uuid4(),
data=AnnotationCreate(type="point", timestamp_ms=2000),
)
mock_queue.enqueue.assert_not_called()
@pytest.mark.asyncio
async def test_delete_annotation_by_non_author_raises(mock_session):
from rehearsalhub.db.models import Annotation
author_id = uuid.uuid4()
other_id = uuid.uuid4()
annotation = MagicMock(spec=Annotation)
annotation.id = uuid.uuid4()
annotation.author_id = author_id
svc = AnnotationService(mock_session)
with pytest.raises(PermissionError, match="Only the author"):
await svc.delete_annotation(annotation, other_id)
def unittest_any_str():
"""Helper that matches any string in assert_called_with."""
class AnyStr:
def __eq__(self, other):
return isinstance(other, str)
return AnyStr()