Files
rehearshalhub/api/tests/unit/test_queue.py
Steffen Schuhmann f7be1b994d Initial commit: RehearsalHub POC
Full-stack self-hosted band rehearsal platform:

Backend (FastAPI + SQLAlchemy 2.0 async):
- Auth with JWT (register, login, /me, settings)
- Band management with Nextcloud folder integration
- Song management with audio version tracking
- Nextcloud scan to auto-import audio files
- Band membership with link-based invite system
- Song comments
- Audio analysis worker (BPM, key, loudness, waveform)
- Nextcloud activity watcher for auto-import
- WebSocket support for real-time annotation updates
- Alembic migrations (0001–0003)
- Repository pattern, Ruff + mypy configured

Frontend (React 18 + Vite + TypeScript strict):
- Login/register page with post-login redirect
- Home page with band list and creation form
- Band page with member panel, invite link, song list, NC scan
- Song page with waveform player, annotations, comment thread
- Settings page for per-user Nextcloud credentials
- Invite acceptance page (/invite/:token)
- ESLint v9 flat config + TypeScript strict mode

Infrastructure:
- Docker Compose: PostgreSQL, Redis, API, worker, watcher, nginx
- nginx reverse proxy for static files + /api/ proxy
- `make check` runs all linters before `docker compose build`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-28 21:53:03 +01:00

81 lines
2.2 KiB
Python

"""Unit tests for the Redis job queue."""
import uuid
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from rehearsalhub.queue.redis_queue import RedisJobQueue
@pytest.mark.asyncio
async def test_enqueue_creates_job_and_pushes_to_redis(mock_session):
    """Enqueueing adds exactly one object to the session and pushes once to Redis.

    ``mock_session`` is a fixture defined outside this file; its ``add``,
    ``flush`` and ``refresh`` attributes are replaced with fresh mocks here so
    the assertions below only see calls made by this test.
    """
    expected_id = uuid.uuid4()

    async def assign_id_on_refresh(obj):
        # Stand in for the database: after refresh the object carries an id.
        obj.id = expected_id

    mock_session.add = MagicMock()
    # flush needs no behaviour beyond being awaitable and returning None.
    mock_session.flush = AsyncMock(return_value=None)
    mock_session.refresh = AsyncMock(side_effect=assign_id_on_refresh)

    mock_redis = AsyncMock()
    mock_redis.rpush = AsyncMock()

    queue = RedisJobQueue(mock_session, redis_client=mock_redis)
    # NOTE(review): the return value of enqueue() is not checked. If it is
    # meant to return the job id, assert it equals ``expected_id`` here.
    await queue.enqueue("transcode", {"version_id": "abc"})

    mock_session.add.assert_called_once()
    # rpush is an AsyncMock: check it was awaited, not merely called, so a
    # forgotten ``await`` in the queue cannot slip through.
    mock_redis.rpush.assert_awaited_once()
@pytest.mark.asyncio
async def test_mark_done_updates_job_status(mock_session):
    """mark_done sets the job's status to "done", stamps finished_at, and flushes.

    ``mock_session`` is a fixture defined outside this file; its ``get`` is
    configured to return the mocked job.
    """
    from rehearsalhub.db.models import Job

    job = MagicMock(spec=Job)
    job.id = uuid.uuid4()
    job.status = "running"
    # A MagicMock auto-creates attributes as (non-None) child mocks, which
    # would make the ``is not None`` check below pass even if mark_done never
    # touched the field. Start from an explicit None so the check is real.
    job.finished_at = None
    mock_session.get.return_value = job

    queue = RedisJobQueue(mock_session, redis_client=AsyncMock())
    await queue.mark_done(job.id)

    assert job.status == "done"
    assert job.finished_at is not None
    mock_session.flush.assert_called_once()
@pytest.mark.asyncio
async def test_mark_failed_stores_error(mock_session):
    """mark_failed sets the job's status to "failed" and records the error text.

    ``mock_session`` is a fixture defined outside this file; its ``get`` is
    configured to return the mocked job.
    """
    from rehearsalhub.db.models import Job

    failure_reason = "something went wrong"
    running_job = MagicMock(spec=Job)
    running_job.id = uuid.uuid4()
    running_job.status = "running"
    mock_session.get.return_value = running_job

    redis_stub = AsyncMock()
    job_queue = RedisJobQueue(mock_session, redis_client=redis_stub)
    await job_queue.mark_failed(running_job.id, failure_reason)

    # Both fields must have been overwritten on the object the session returned.
    assert running_job.status == "failed"
    assert running_job.error == failure_reason
@pytest.mark.asyncio
async def test_dequeue_returns_none_on_timeout(mock_session):
    """dequeue yields None when the Redis blocking pop returns nothing."""
    # blpop resolving to None is how this test models a timed-out pop.
    redis_stub = AsyncMock()
    redis_stub.blpop = AsyncMock(return_value=None)

    job_queue = RedisJobQueue(mock_session, redis_client=redis_stub)
    dequeued = await job_queue.dequeue(timeout=1)

    assert dequeued is None