fix: watcher path normalization, subject filter, and debug logging
Two filter bugs (items 1 and 2) caused every activity to be silently dropped; items 3 and 4 are supporting changes:
1. Path format: the Nextcloud activity API returns paths as
/username/files/bands/..., but is_band_audio_path() was checking
parts[0] == "bands" after strip("/"), so parts[0] was the username and the check never matched.
Fix: normalize_nc_path() strips the /username/files/ and DAV prefixes
before any path checks are applied.
2. Subject names: the filter was checking for "file_created"/"file_changed"
but Nextcloud uses "created_by"/"changed_by"/"created_self"/etc.
Fix: expanded _UPLOAD_SUBJECTS to cover all known NC subject names for file create/modify events.
3. Expose NextcloudWatcherClient.username so normalize_nc_path() can
construct the correct prefix for the configured user.
4. Set the root log level to DEBUG (via logging.basicConfig) and log a per-activity skip reason,
so the next filter edge case is immediately diagnosable. httpx is kept at INFO and
httpcore at WARNING to avoid flooding the output.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -13,86 +13,139 @@ from watcher.nc_client import NextcloudWatcherClient
|
|||||||
|
|
||||||
log = logging.getLogger("watcher.event_loop")
|
log = logging.getLogger("watcher.event_loop")
|
||||||
|
|
||||||
# Persist last seen activity ID across polls (in-process state; persistent across restarts
|
# Persist last seen activity ID in-process (good enough for a POC)
|
||||||
# would require a small DB or file, but good enough for a POC)
|
|
||||||
_last_activity_id: int = 0
|
_last_activity_id: int = 0
|
||||||
|
|
||||||
|
# Nextcloud activity app subject names for file create/modify events.
|
||||||
|
# These differ across NC versions; we accept all known variants.
|
||||||
|
_UPLOAD_SUBJECTS = {
|
||||||
|
"file_created",
|
||||||
|
"file_changed",
|
||||||
|
"created_by",
|
||||||
|
"changed_by",
|
||||||
|
"created_public",
|
||||||
|
"created_self",
|
||||||
|
"changed_self",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def is_audio_file(path: str, extensions: list[str]) -> bool:
|
def is_audio_file(path: str, extensions: list[str]) -> bool:
|
||||||
return Path(path).suffix.lower() in extensions
|
return Path(path).suffix.lower() in extensions
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_nc_path(raw_path: str, username: str) -> str:
|
||||||
|
"""
|
||||||
|
Strip the Nextcloud WebDAV/activity path prefix so we get a plain
|
||||||
|
user-relative path.
|
||||||
|
|
||||||
|
Activity objects can look like:
|
||||||
|
/username/files/bands/slug/...
|
||||||
|
/remote.php/dav/files/username/bands/slug/...
|
||||||
|
bands/slug/... (already relative)
|
||||||
|
"""
|
||||||
|
path = raw_path.strip("/")
|
||||||
|
|
||||||
|
# /remote.php/dav/files/<user>/...
|
||||||
|
dav_prefix = f"remote.php/dav/files/{username}/"
|
||||||
|
if path.startswith(dav_prefix):
|
||||||
|
return path[len(dav_prefix):]
|
||||||
|
|
||||||
|
# /<username>/files/... (activity app format)
|
||||||
|
user_files_prefix = f"{username}/files/"
|
||||||
|
if path.startswith(user_files_prefix):
|
||||||
|
return path[len(user_files_prefix):]
|
||||||
|
|
||||||
|
# files/...
|
||||||
|
if path.startswith("files/"):
|
||||||
|
return path[len("files/"):]
|
||||||
|
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
def is_band_audio_path(path: str) -> bool:
|
def is_band_audio_path(path: str) -> bool:
|
||||||
"""Check if the path looks like /bands/<slug>/songs/**"""
|
"""True if the user-relative path is inside a band folder."""
|
||||||
parts = path.strip("/").split("/")
|
parts = path.strip("/").split("/")
|
||||||
return len(parts) >= 3 and parts[0] == "bands"
|
return len(parts) >= 2 and parts[0] == "bands"
|
||||||
|
|
||||||
|
|
||||||
def extract_nc_file_path(activity: dict[str, Any]) -> str | None:
|
def extract_nc_file_path(activity: dict[str, Any]) -> str | None:
|
||||||
"""Extract the server-relative file path from an activity event."""
|
"""Extract the server-relative file path from an activity event."""
|
||||||
objects = activity.get("objects", {})
|
objects = activity.get("objects", {})
|
||||||
for file_id, file_path in objects.items():
|
if isinstance(objects, dict):
|
||||||
|
for _file_id, file_path in objects.items():
|
||||||
if isinstance(file_path, str):
|
if isinstance(file_path, str):
|
||||||
return file_path
|
return file_path
|
||||||
return activity.get("object_name")
|
# Fallback: older NC versions put it in object_name
|
||||||
|
return activity.get("object_name") or None
|
||||||
|
|
||||||
|
|
||||||
async def register_version_with_api(
|
async def register_version_with_api(nc_file_path: str, nc_file_etag: str | None, api_url: str) -> bool:
|
||||||
nc_file_path: str,
|
|
||||||
nc_file_etag: str | None,
|
|
||||||
api_url: str,
|
|
||||||
) -> bool:
|
|
||||||
"""
|
|
||||||
Call POST /api/v1/songs/{song_id}/versions to register the new file.
|
|
||||||
We infer song context from the path: /bands/{slug}/songs/{song_folder}/file.ext
|
|
||||||
In a full implementation this would look up the song_id from the API.
|
|
||||||
Here we emit a best-effort registration event.
|
|
||||||
"""
|
|
||||||
try:
|
try:
|
||||||
payload = {
|
payload = {"nc_file_path": nc_file_path, "nc_file_etag": nc_file_etag}
|
||||||
"nc_file_path": nc_file_path,
|
async with httpx.AsyncClient(timeout=15.0) as c:
|
||||||
"nc_file_etag": nc_file_etag,
|
|
||||||
}
|
|
||||||
async with httpx.AsyncClient(timeout=10.0) as c:
|
|
||||||
resp = await c.post(f"{api_url}/api/v1/internal/nc-upload", json=payload)
|
resp = await c.post(f"{api_url}/api/v1/internal/nc-upload", json=payload)
|
||||||
return resp.status_code in (200, 201)
|
if resp.status_code in (200, 201):
|
||||||
|
log.info("Registered version via internal API: %s", nc_file_path)
|
||||||
|
return True
|
||||||
|
log.warning(
|
||||||
|
"Internal API returned %d for %s: %s",
|
||||||
|
resp.status_code, nc_file_path, resp.text[:200],
|
||||||
|
)
|
||||||
|
return False
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
log.warning("Failed to register version with API: %s", exc)
|
log.warning("Failed to register version with API for %s: %s", nc_file_path, exc)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
async def poll_once(
|
async def poll_once(nc_client: NextcloudWatcherClient, settings: WatcherSettings) -> None:
|
||||||
nc_client: NextcloudWatcherClient,
|
|
||||||
settings: WatcherSettings,
|
|
||||||
) -> None:
|
|
||||||
global _last_activity_id
|
global _last_activity_id
|
||||||
|
|
||||||
activities = await nc_client.get_activities(since_id=_last_activity_id)
|
activities = await nc_client.get_activities(since_id=_last_activity_id)
|
||||||
if not activities:
|
if not activities:
|
||||||
|
log.debug("No new activities since id=%d", _last_activity_id)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
log.info("Received %d activities (since id=%d)", len(activities), _last_activity_id)
|
||||||
|
|
||||||
for activity in activities:
|
for activity in activities:
|
||||||
activity_id = activity.get("activity_id", 0)
|
activity_id = int(activity.get("activity_id", 0))
|
||||||
subject = activity.get("subject", "")
|
subject = activity.get("subject", "")
|
||||||
|
raw_path = extract_nc_file_path(activity)
|
||||||
|
|
||||||
if subject not in ("file_created", "file_changed"):
|
log.debug(
|
||||||
|
"Activity id=%d subject=%r path=%r",
|
||||||
|
activity_id, subject, raw_path,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Advance the cursor regardless of whether we act on this event
|
||||||
_last_activity_id = max(_last_activity_id, activity_id)
|
_last_activity_id = max(_last_activity_id, activity_id)
|
||||||
|
|
||||||
|
if subject not in _UPLOAD_SUBJECTS:
|
||||||
|
log.debug("Skipping activity %d: subject %r not a file upload event", activity_id, subject)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
nc_path = extract_nc_file_path(activity)
|
if raw_path is None:
|
||||||
if nc_path is None:
|
log.debug("Skipping activity %d: no file path in payload", activity_id)
|
||||||
_last_activity_id = max(_last_activity_id, activity_id)
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
nc_path = normalize_nc_path(raw_path, nc_client.username)
|
||||||
|
|
||||||
if not is_audio_file(nc_path, settings.audio_extensions):
|
if not is_audio_file(nc_path, settings.audio_extensions):
|
||||||
_last_activity_id = max(_last_activity_id, activity_id)
|
log.debug(
|
||||||
|
"Skipping activity %d: '%s' is not an audio file (ext: %s)",
|
||||||
|
activity_id, nc_path, Path(nc_path).suffix.lower(),
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if not is_band_audio_path(nc_path):
|
if not is_band_audio_path(nc_path):
|
||||||
_last_activity_id = max(_last_activity_id, activity_id)
|
log.debug(
|
||||||
|
"Skipping activity %d: '%s' is not inside a band folder",
|
||||||
|
activity_id, nc_path,
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
log.info("Detected audio upload: %s", nc_path)
|
log.info("Detected audio upload: %s (activity %d)", nc_path, activity_id)
|
||||||
etag = await nc_client.get_file_etag(nc_path)
|
etag = await nc_client.get_file_etag(nc_path)
|
||||||
await register_version_with_api(nc_path, etag, settings.api_url)
|
success = await register_version_with_api(nc_path, etag, settings.api_url)
|
||||||
_last_activity_id = max(_last_activity_id, activity_id)
|
if not success:
|
||||||
|
log.warning("Failed to register upload for activity %d (%s)", activity_id, nc_path)
|
||||||
|
|||||||
@@ -9,7 +9,10 @@ from watcher.config import get_settings
|
|||||||
from watcher.event_loop import poll_once
|
from watcher.event_loop import poll_once
|
||||||
from watcher.nc_client import NextcloudWatcherClient
|
from watcher.nc_client import NextcloudWatcherClient
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
|
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(name)s %(message)s")
|
||||||
|
# Quiet httpx's per-request noise at DEBUG; keep our own loggers verbose
|
||||||
|
logging.getLogger("httpx").setLevel(logging.INFO)
|
||||||
|
logging.getLogger("httpcore").setLevel(logging.WARNING)
|
||||||
log = logging.getLogger("watcher")
|
log = logging.getLogger("watcher")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ class NextcloudWatcherClient:
|
|||||||
def __init__(self, base_url: str, username: str, password: str) -> None:
|
def __init__(self, base_url: str, username: str, password: str) -> None:
|
||||||
self._base = base_url.rstrip("/")
|
self._base = base_url.rstrip("/")
|
||||||
self._auth = (username, password)
|
self._auth = (username, password)
|
||||||
|
self.username = username
|
||||||
|
|
||||||
def _client(self) -> httpx.AsyncClient:
|
def _client(self) -> httpx.AsyncClient:
|
||||||
return httpx.AsyncClient(
|
return httpx.AsyncClient(
|
||||||
|
|||||||
Reference in New Issue
Block a user