refactor(archive): streamline repository archiving logic

- Simplify archive directory resolution using catalog state
- Replace redundant directory checks with repository model methods
- Remove unused imports and legacy functions for cleaner code
- Improve error handling for missing archive categories
This commit is contained in:
2025-05-11 14:38:06 +02:00
parent eb5bf52f2f
commit d3ee7ee63d

View File

@@ -1,29 +1,21 @@
import shutil import shutil
import tarfile import tarfile
from datetime import datetime, timedelta from datetime import datetime
from pathlib import Path from pathlib import Path
from typer import Typer, Option from typer import Typer, Option
from rich.console import Console from rich.console import Console
from rich.table import Table from rich.table import Table
import subprocess
import zstandard import zstandard
import uuid import uuid
from repocat.config import load_config from repocat.config import load_config
from repocat.cli.clean import is_git_dirty, is_git_unpushed, get_dir_size from repocat.models.catalog import RepoCatalogState
app = Typer() app = Typer()
console = Console() console = Console()
def get_archive_target_dir(): def archive_repo(source: Path, target_dir: Path, dry_run: bool) -> Path:
config = load_config()
for cat in config.categories:
if cat.is_archive:
return config.base_dir / cat.subdir
raise ValueError("Keine Archiv-Kategorie (is_archive: true) in der Konfiguration gefunden.")
def archive_repo(source: Path, target_dir: Path, dry_run: bool):
today = datetime.today().strftime("%Y.%m.%d") today = datetime.today().strftime("%Y.%m.%d")
unique = uuid.uuid4().hex[:8] unique = uuid.uuid4().hex[:8]
archive_name = f"{today} - {source.name} - {unique}.tar.zst" archive_name = f"{today} - {source.name} - {unique}.tar.zst"
@@ -33,7 +25,7 @@ def archive_repo(source: Path, target_dir: Path, dry_run: bool):
raise FileExistsError(f"Archiv existiert bereits: {archive_path}") raise FileExistsError(f"Archiv existiert bereits: {archive_path}")
if dry_run: if dry_run:
return archive_path # Nur anzeigen return archive_path
with open(archive_path, "wb") as f: with open(archive_path, "wb") as f:
cctx = zstandard.ZstdCompressor(level=20) cctx = zstandard.ZstdCompressor(level=20)
@@ -44,6 +36,7 @@ def archive_repo(source: Path, target_dir: Path, dry_run: bool):
shutil.rmtree(source) shutil.rmtree(source)
return archive_path return archive_path
@app.command("run") @app.command("run")
def run( def run(
older_than: int = Option(30, "--older-than", help="Nur Repositories älter als X Tage archivieren."), older_than: int = Option(30, "--older-than", help="Nur Repositories älter als X Tage archivieren."),
@@ -51,11 +44,16 @@ def run(
category: list[str] = Option(None, "--category", "-c", help="Nur bestimmte Kategorien prüfen."), category: list[str] = Option(None, "--category", "-c", help="Nur bestimmte Kategorien prüfen."),
): ):
config = load_config() config = load_config()
now = datetime.now() catalog = RepoCatalogState.from_config(config)
archive_path = get_archive_target_dir()
archive_cat = next((c for c in catalog.categories if c.config.is_archive), None)
if not archive_cat:
console.print("[red]Keine Archiv-Kategorie in der Konfiguration gefunden.[/]")
raise SystemExit(1)
archive_path = archive_cat.path
archive_path.mkdir(parents=True, exist_ok=True) archive_path.mkdir(parents=True, exist_ok=True)
base = config.base_dir
table = Table(title="Archivierung") table = Table(title="Archivierung")
table.add_column("Kategorie") table.add_column("Kategorie")
table.add_column("Repository") table.add_column("Repository")
@@ -66,44 +64,33 @@ def run(
archived = 0 archived = 0
skipped = 0 skipped = 0
for cat in config.categories: for cat in catalog.categories:
if cat.is_archive or cat.name == "review" or cat.volatile: if cat.config.is_archive or cat.config.name == "review" or cat.config.volatile:
continue continue
if category and cat.name not in category: if category and cat.config.name not in category:
continue continue
source_dir = base / cat.subdir limit_days = cat.config.days if cat.config.days is not None else older_than
if not source_dir.exists():
continue
for repo in sorted(source_dir.iterdir()): for repo in cat.repos:
if not repo.is_dir(): if not repo.is_expired(limit_days):
continue table.add_row(cat.config.name, repo.name, str(repo.age_days), f"{repo.size_mb:.1f}", "[blue]Zu jung[/]")
mtime = datetime.fromtimestamp(repo.stat().st_mtime)
age_days = (now - mtime).days
size_mb = get_dir_size(repo) / 1024 / 1024
limit_days = cat.days if cat.days is not None else older_than
if age_days < limit_days:
table.add_row(cat.name, repo.name, str(age_days), f"{size_mb:.1f}", "[blue]Zu jung[/]")
skipped += 1 skipped += 1
continue continue
if (repo / ".git").exists(): if repo.git and repo.git.is_protected:
if is_git_dirty(repo) or is_git_unpushed(repo): table.add_row(cat.config.name, repo.name, str(repo.age_days), f"{repo.size_mb:.1f}", "[blue]Geschützt (git)[/]")
table.add_row(cat.name, repo.name, str(age_days), f"{size_mb:.1f}", "[blue]Geschützt (git)[/]") skipped += 1
skipped += 1 continue
continue
try: try:
archive_file = archive_repo(repo, archive_path, dry_run) archive_file = archive_repo(repo.path, archive_path, dry_run)
action = "[yellow]Würde archivieren[/]" if dry_run else f"[green]Archiviert →[/] {archive_file.name}" action = "[yellow]Würde archivieren[/]" if dry_run else f"[green]Archiviert →[/] {archive_file.name}"
archived += 1 archived += 1
except Exception as e: except Exception as e:
action = f"[red]Fehler: {e}[/]" action = f"[red]Fehler: {e}[/]"
table.add_row(cat.name, repo.name, str(age_days), f"{size_mb:.1f}", action) table.add_row(cat.config.name, repo.name, str(repo.age_days), f"{repo.size_mb:.1f}", action)
console.print(table) console.print(table)
console.print(f"\n[green]✓ Archivierung abgeschlossen[/] – {archived} archiviert, {skipped} übersprungen") console.print(f"\n[green]✓ Archivierung abgeschlossen[/] – {archived} archiviert, {skipped} übersprungen")