Source code for django_snapshots.storage.django_storage
"""DjangoStorageBackend — wraps Django's Storage API for django-snapshots.
Satisfies the basic ``SnapshotStorage`` protocol only. If you need streaming
or atomic operations, use ``LocalFileSystemBackend`` instead.
"""
from __future__ import annotations
import builtins
import os
from pathlib import Path
from typing import IO
from django.core.files.base import ContentFile
from django.core.files.storage import Storage
[docs]
class DjangoStorageBackend:
"""Wrap any Django ``Storage`` backend as a ``SnapshotStorage``.
Satisfies ``SnapshotStorage`` via structural subtyping. Does *not* satisfy
``AdvancedSnapshotStorage`` — use ``LocalFileSystemBackend`` when streaming
or atomic operations are required.
Args:
storage: Any ``django.core.files.storage.Storage`` instance.
"""
[docs]
def __init__(self, storage: Storage) -> None:
self._storage = storage
[docs]
def read(self, path: str) -> IO[bytes]:
return self._storage.open(path, "rb")
[docs]
def write(self, path: str, content: IO[bytes]) -> None:
data = content.read()
if self._storage.exists(path):
self._storage.delete(path)
self._storage.save(path, ContentFile(data))
[docs]
def list(self, prefix: str) -> builtins.list[str]:
results: builtins.list[str] = []
if hasattr(self._storage, "location"):
# FileSystemStorage: walk the filesystem directly for reliability
root = Path(self._storage.location) # type: ignore[attr-defined]
for dirpath, _, filenames in os.walk(root):
for filename in filenames:
full = Path(dirpath) / filename
rel = full.relative_to(root).as_posix()
if rel.startswith(prefix):
results.append(rel)
else:
self._collect("", prefix, results)
return results
def _collect(
self, current_dir: str, prefix: str, results: builtins.list[str]
) -> None:
"""Recursively collect paths matching prefix using Storage.listdir()."""
try:
dirs, files = self._storage.listdir(current_dir or ".")
except Exception:
return
for filename in files:
rel = f"{current_dir}/{filename}".lstrip("/") if current_dir else filename
if rel.startswith(prefix):
results.append(rel)
for dirname in dirs:
sub = f"{current_dir}/{dirname}".lstrip("/") if current_dir else dirname
self._collect(sub, prefix, results)
[docs]
def delete(self, path: str) -> None:
if self._storage.exists(path):
self._storage.delete(path)
[docs]
def exists(self, path: str) -> bool:
return self._storage.exists(path)