"""Nextcloud WebDAV + OCS storage client."""
from __future__ import annotations

import logging
import xml.etree.ElementTree as ET
from typing import Any
from urllib.parse import quote, unquote

import httpx

from rehearsalhub.storage.protocol import FileMetadata
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# ElementTree "{namespace-uri}" prefix for the WebDAV "DAV:" XML namespace;
# prepended to tag names in the find()/findtext() lookups below.
_DAV_NS = "{DAV:}"
class NextcloudClient:
    """Async client for a Nextcloud server's WebDAV file API and OCS activity API.

    All file paths accepted by the public methods are relative to the user's
    file root (``remote.php/dav/files/<username>``); a leading slash is
    ignored.  Every call opens a short-lived ``httpx.AsyncClient`` that uses
    HTTP basic auth and a 30 second timeout.
    """

    # PROPFIND request body asking only for the properties that the response
    # parser in this module reads (etag, content length, content type).
    _PROPFIND_BODY = (
        '<?xml version="1.0"?>'
        '<d:propfind xmlns:d="DAV:">'
        "<d:prop>"
        "<d:getetag/><d:getcontentlength/><d:getcontenttype/>"
        "</d:prop>"
        "</d:propfind>"
    )

    def __init__(
        self,
        base_url: str,
        username: str,
        password: str,
    ) -> None:
        """Store connection settings; performs no network I/O.

        Args:
            base_url: Root URL of the Nextcloud instance; trailing slashes
                are stripped.
            username: Account name, used for auth and for the DAV file root.
            password: Password (or app password) for basic auth.

        Raises:
            ValueError: If ``base_url`` or ``username`` is empty.
        """
        if not base_url or not username:
            raise ValueError("Nextcloud credentials must be provided explicitly")
        self._base = base_url.rstrip("/")
        self._username = username
        self._auth = (username, password)
        self._dav_root = f"{self._base}/remote.php/dav/files/{username}"
        # Prefix stripped from WebDAV hrefs to produce relative paths
        self._dav_prefix = f"/remote.php/dav/files/{username}/"

    def _client(self) -> httpx.AsyncClient:
        """Return a fresh authenticated HTTP client (caller must close it)."""
        return httpx.AsyncClient(auth=self._auth, timeout=30.0)

    def _dav_url(self, path: str) -> str:
        """Build the absolute WebDAV URL for a user-relative ``path``.

        The path is percent-encoded (keeping ``/`` as the separator) so that
        characters such as space, ``#``, ``?`` and ``%`` in file names cannot
        be mistaken for URL syntax.
        """
        return f"{self._dav_root}/{quote(path.lstrip('/'), safe='/')}"

    def _relative_path(self, href: str) -> str:
        """Convert a WebDAV href into a decoded, user-relative path.

        The href is URL-decoded and everything up to and including the DAV
        files prefix is dropped.  Searching for the prefix (instead of
        requiring the href to start with it) also handles servers installed
        under a sub-path such as ``/nextcloud``.  If the prefix is absent the
        decoded href is returned without leading slashes.
        """
        decoded = unquote(href)
        _, marker, relative = decoded.partition(self._dav_prefix)
        if marker:
            return relative
        return decoded.lstrip("/")

    async def _propfind(self, path: str, depth: str) -> str:
        """Send a PROPFIND for ``path`` and return the response body text.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        async with self._client() as c:
            resp = await c.request(
                "PROPFIND",
                self._dav_url(path),
                headers={"Depth": depth, "Content-Type": "application/xml"},
                content=self._PROPFIND_BODY,
            )
            resp.raise_for_status()
            return resp.text

    async def create_folder(self, path: str) -> None:
        """Create a folder via MKCOL; an already existing folder is not an error.

        Raises:
            httpx.HTTPStatusError: For any error status other than 405.
        """
        async with self._client() as c:
            resp = await c.request("MKCOL", self._dav_url(path))
            if resp.status_code not in (201, 405):  # 405 = already exists
                resp.raise_for_status()

    async def get_file_metadata(self, path: str) -> FileMetadata:
        """Fetch metadata for a single file or folder (PROPFIND, Depth 0).

        NOTE(review): the ``path`` of the returned metadata comes straight
        from the response parser and is not normalised here the way
        ``list_folder`` normalises its results -- confirm callers expect that.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        return _parse_propfind_single(await self._propfind(path, "0"), path)

    async def list_folder(self, path: str) -> list[FileMetadata]:
        """List the entries of a folder (PROPFIND, Depth 1).

        Returns:
            One metadata object per parsed entry, with ``path`` rewritten to a
            decoded, user-relative path.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        items = _parse_propfind_multi(await self._propfind(path, "1"))
        # Normalise WebDAV absolute hrefs to provider-relative paths so callers
        # never need to know about DAV internals.  URL-decode to handle
        # filenames that contain spaces or non-ASCII characters.
        for item in items:
            item.path = self._relative_path(item.path)
        return items

    async def download(self, path: str) -> bytes:
        """Download a file and return its raw content.

        Failures are logged and then re-raised unchanged.

        Raises:
            httpx.ConnectError: If the server cannot be reached.
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        logger.debug("Downloading file from Nextcloud: %s", path)
        try:
            async with self._client() as c:
                resp = await c.get(self._dav_url(path))
                resp.raise_for_status()
                logger.debug("Successfully downloaded file: %s", path)
                return resp.content
        except httpx.ConnectError as e:
            logger.error("Failed to connect to Nextcloud at %s: %s", self._base, str(e))
            raise
        except httpx.HTTPStatusError as e:
            logger.error(
                "Nextcloud request failed for %s: %s (status: %d)",
                path,
                str(e),
                e.response.status_code,
            )
            raise
        except Exception as e:
            logger.error("Unexpected error downloading from Nextcloud: %s", str(e))
            raise

    async def get_direct_url(self, path: str) -> str:
        """Return the absolute (percent-encoded) WebDAV URL of ``path``."""
        return self._dav_url(path)

    async def delete(self, path: str) -> None:
        """Delete a file or folder.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        async with self._client() as c:
            resp = await c.request("DELETE", self._dav_url(path))
            resp.raise_for_status()

    async def get_activities(self, since_id: int = 0, limit: int = 50) -> list[dict[str, Any]]:
        """Fetch recent file activity from Nextcloud OCS API.

        Args:
            since_id: Sent as the ``since`` query parameter.
            limit: Sent as the ``limit`` query parameter.

        Returns:
            The ``ocs.data`` list of the JSON response, or an empty list when
            it is missing or when the server reports that there is nothing to
            return (HTTP 204 / 304, which carry no JSON body).

        Raises:
            httpx.HTTPStatusError: For any other non-success status.
        """
        url = f"{self._base}/ocs/v2.php/apps/activity/api/v2/activity/files"
        params: dict[str, Any] = {"since": since_id, "limit": limit, "format": "json"}
        async with self._client() as c:
            resp = await c.get(url, params=params, headers={"OCS-APIRequest": "true"})
        # "No content" / "not modified" mean there are no (new) activities;
        # there is no body to decode, so do not treat them as failures.
        if resp.status_code in (204, 304):
            return []
        resp.raise_for_status()
        data = resp.json()
        return data.get("ocs", {}).get("data", []) or []
def _parse_propfind_single(xml_text: str, path: str) -> FileMetadata:
    """Parse a Depth-0 PROPFIND multistatus document into one metadata object.

    Args:
        xml_text: Body of the PROPFIND response.
        path: Requested path, used as the metadata path when the response
            element carries no href.

    Raises:
        xml.etree.ElementTree.ParseError: If ``xml_text`` is not well-formed.
        ValueError: If the document contains no ``<response>`` element.
    """
    root = ET.fromstring(xml_text)
    response = root.find(f"{_DAV_NS}response")
    if response is None:
        # Fail with a descriptive error instead of an AttributeError on None.
        raise ValueError(f"PROPFIND response for {path!r} contains no <response> element")
    return _response_to_metadata(response, path)
def _parse_propfind_multi(xml_text: str) -> list[FileMetadata]:
    """Parse a Depth-1 PROPFIND multistatus document into a metadata list.

    The first ``<response>`` element is dropped because it is taken to
    describe the listed folder itself; every remaining element becomes one
    entry, in document order.
    """
    document = ET.fromstring(xml_text)
    children = document.findall(f"{_DAV_NS}response")[1:]  # [0] is the folder itself
    return [
        _response_to_metadata(node, node.findtext(f"{_DAV_NS}href") or "")
        for node in children
    ]
def _response_to_metadata(response: ET.Element, fallback_path: str) -> FileMetadata:
    """Convert one WebDAV ``<response>`` element into a ``FileMetadata``.

    Args:
        response: A ``{DAV:}response`` element from a multistatus document.
        fallback_path: Path to use when the element has no (or an empty) href.

    Returns:
        Metadata whose ``path`` is the raw href (or ``fallback_path``), whose
        ``etag`` has surrounding double quotes removed, whose ``size`` is 0
        when no content length is reported, and whose ``content_type``
        defaults to ``application/octet-stream``.

    Raises:
        ValueError: If the reported content length is not an integer.
    """
    default_type = "application/octet-stream"

    # A response may carry several <propstat> groups (found properties with a
    # 200 status, missing ones with 404).  Read values from the 200 group and
    # only fall back to the first group when none is marked successful.
    propstats = response.findall(f"{_DAV_NS}propstat")
    chosen = next((ps for ps in propstats if _propstat_is_ok(ps)), None)
    if chosen is None and propstats:
        chosen = propstats[0]
    prop = chosen.find(f"{_DAV_NS}prop") if chosen is not None else None

    href = response.findtext(f"{_DAV_NS}href") or fallback_path
    if prop is None:
        etag, size_text, ctype = "", "0", default_type
    else:
        etag = (prop.findtext(f"{_DAV_NS}getetag") or "").strip('"')
        size_text = prop.findtext(f"{_DAV_NS}getcontentlength")
        ctype = prop.findtext(f"{_DAV_NS}getcontenttype") or default_type

    return FileMetadata(
        path=href,
        etag=etag,
        size=int(size_text or 0),
        content_type=ctype,
    )


def _propstat_is_ok(propstat: ET.Element) -> bool:
    """Return True if the propstat's status line (e.g. ``HTTP/1.1 200 OK``) has code 200."""
    parts = (propstat.findtext(f"{_DAV_NS}status") or "").split()
    return len(parts) >= 2 and parts[1] == "200"