Files
rehearshalhub/api/tests/unit/test_queue.py
Mistral Vibe 68da26588a security: fix auth, CORS, file upload, endpoint hardening + test fixes
- Add INTERNAL_SECRET shared-secret auth to /internal/nc-upload endpoint
- Add JWT token validation to WebSocket /ws/versions/{version_id}
- Fix NameError: band_slug → band.slug in internal.py
- Move inline imports to top of internal.py; add missing Member/NextcloudClient imports
- Remove ~15 debug print() statements from auth.py
- Replace Content-Type-only avatar check with extension whitelist + Pillow Image.verify()
- Sanitize exception details in versions.py (no more str(e) in 4xx/5xx responses)
- Restrict CORS allow_methods/allow_headers from "*" to explicit lists
- Add security headers middleware: X-Frame-Options, X-Content-Type-Options, Referrer-Policy
- Reduce JWT expiry from 7 days to 1 hour
- Add Pillow>=10.0 dependency; document INTERNAL_SECRET in .env.example
- Implement missing RedisJobQueue.dequeue() method (required by protocol)
- Fix 5 pre-existing unit test failures: settings env vars conftest, deferred Redis push,
  dequeue method, AsyncMock→MagicMock for sync scalar_one_or_none

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-30 21:02:56 +02:00

84 lines
2.4 KiB
Python

"""Unit tests for the Redis job queue."""
import uuid
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from rehearsalhub.queue.redis_queue import RedisJobQueue, flush_pending_pushes
@pytest.mark.asyncio
async def test_enqueue_creates_job_and_pushes_to_redis(mock_session):
fake_job = MagicMock()
fake_job.id = uuid.uuid4()
mock_session.flush = AsyncMock()
mock_session.refresh = AsyncMock()
mock_session.add = MagicMock()
# Simulate that after flush, the ORM object has an id
async def side_effect_flush():
pass
async def side_effect_refresh(obj):
obj.id = fake_job.id
mock_session.flush.side_effect = side_effect_flush
mock_session.refresh.side_effect = side_effect_refresh
mock_redis = AsyncMock()
mock_redis.rpush = AsyncMock()
queue = RedisJobQueue(mock_session, redis_client=mock_redis)
job_id = await queue.enqueue("transcode", {"version_id": "abc"})
mock_session.add.assert_called_once()
# The Redis push is deferred; it fires when flush_pending_pushes is called after commit.
mock_redis.rpush.assert_not_called()
await flush_pending_pushes(mock_session)
mock_redis.rpush.assert_called_once()
@pytest.mark.asyncio
async def test_mark_done_updates_job_status(mock_session):
from rehearsalhub.db.models import Job
job = MagicMock(spec=Job)
job.id = uuid.uuid4()
job.status = "running"
mock_session.get.return_value = job
queue = RedisJobQueue(mock_session, redis_client=AsyncMock())
await queue.mark_done(job.id)
assert job.status == "done"
assert job.finished_at is not None
mock_session.flush.assert_called_once()
@pytest.mark.asyncio
async def test_mark_failed_stores_error(mock_session):
from rehearsalhub.db.models import Job
job = MagicMock(spec=Job)
job.id = uuid.uuid4()
job.status = "running"
mock_session.get.return_value = job
queue = RedisJobQueue(mock_session, redis_client=AsyncMock())
await queue.mark_failed(job.id, "something went wrong")
assert job.status == "failed"
assert job.error == "something went wrong"
@pytest.mark.asyncio
async def test_dequeue_returns_none_on_timeout(mock_session):
mock_redis = AsyncMock()
mock_redis.blpop = AsyncMock(return_value=None)
queue = RedisJobQueue(mock_session, redis_client=mock_redis)
result = await queue.dequeue(timeout=1)
assert result is None