Compare commits
7 Commits
51e5c10a5d
...
719f257739
| Author | SHA1 | Date | |
|---|---|---|---|
|
719f257739
|
|||
|
55e79d224f
|
|||
|
68b3233fcb
|
|||
|
9bff626f30
|
|||
|
458d965062
|
|||
|
d3ee7ee63d
|
|||
|
eb5bf52f2f
|
@@ -1,29 +1,21 @@
|
||||
import shutil
|
||||
import tarfile
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typer import Typer, Option
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
import subprocess
|
||||
import zstandard
|
||||
import uuid
|
||||
|
||||
from repocat.config import load_config
|
||||
from repocat.cli.clean import is_git_dirty, is_git_unpushed, get_dir_size
|
||||
from repocat.models.catalog import RepoCatalogState
|
||||
|
||||
app = Typer()
|
||||
console = Console()
|
||||
|
||||
|
||||
def get_archive_target_dir():
|
||||
config = load_config()
|
||||
for cat in config.categories:
|
||||
if cat.is_archive:
|
||||
return config.base_dir / cat.subdir
|
||||
raise ValueError("Keine Archiv-Kategorie (is_archive: true) in der Konfiguration gefunden.")
|
||||
|
||||
def archive_repo(source: Path, target_dir: Path, dry_run: bool):
|
||||
def archive_repo(source: Path, target_dir: Path, dry_run: bool) -> Path:
|
||||
today = datetime.today().strftime("%Y.%m.%d")
|
||||
unique = uuid.uuid4().hex[:8]
|
||||
archive_name = f"{today} - {source.name} - {unique}.tar.zst"
|
||||
@@ -33,7 +25,7 @@ def archive_repo(source: Path, target_dir: Path, dry_run: bool):
|
||||
raise FileExistsError(f"Archiv existiert bereits: {archive_path}")
|
||||
|
||||
if dry_run:
|
||||
return archive_path # Nur anzeigen
|
||||
return archive_path
|
||||
|
||||
with open(archive_path, "wb") as f:
|
||||
cctx = zstandard.ZstdCompressor(level=20)
|
||||
@@ -44,6 +36,7 @@ def archive_repo(source: Path, target_dir: Path, dry_run: bool):
|
||||
shutil.rmtree(source)
|
||||
return archive_path
|
||||
|
||||
|
||||
@app.command("run")
|
||||
def run(
|
||||
older_than: int = Option(30, "--older-than", help="Nur Repositories älter als X Tage archivieren."),
|
||||
@@ -51,11 +44,16 @@ def run(
|
||||
category: list[str] = Option(None, "--category", "-c", help="Nur bestimmte Kategorien prüfen."),
|
||||
):
|
||||
config = load_config()
|
||||
now = datetime.now()
|
||||
archive_path = get_archive_target_dir()
|
||||
catalog = RepoCatalogState.from_config(config)
|
||||
|
||||
archive_cat = next((c for c in catalog.categories if c.config.is_archive), None)
|
||||
if not archive_cat:
|
||||
console.print("[red]Keine Archiv-Kategorie in der Konfiguration gefunden.[/]")
|
||||
raise SystemExit(1)
|
||||
|
||||
archive_path = archive_cat.path
|
||||
archive_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
base = config.base_dir
|
||||
table = Table(title="Archivierung")
|
||||
table.add_column("Kategorie")
|
||||
table.add_column("Repository")
|
||||
@@ -66,44 +64,33 @@ def run(
|
||||
archived = 0
|
||||
skipped = 0
|
||||
|
||||
for cat in config.categories:
|
||||
if cat.is_archive or cat.name == "review" or cat.volatile:
|
||||
for cat in catalog.categories:
|
||||
if cat.config.is_archive or cat.config.name == "review" or cat.config.volatile:
|
||||
continue
|
||||
if category and cat.name not in category:
|
||||
if category and cat.config.name not in category:
|
||||
continue
|
||||
|
||||
source_dir = base / cat.subdir
|
||||
if not source_dir.exists():
|
||||
continue
|
||||
limit_days = cat.config.days if cat.config.days is not None else older_than
|
||||
|
||||
for repo in sorted(source_dir.iterdir()):
|
||||
if not repo.is_dir():
|
||||
continue
|
||||
|
||||
mtime = datetime.fromtimestamp(repo.stat().st_mtime)
|
||||
age_days = (now - mtime).days
|
||||
size_mb = get_dir_size(repo) / 1024 / 1024
|
||||
|
||||
limit_days = cat.days if cat.days is not None else older_than
|
||||
if age_days < limit_days:
|
||||
table.add_row(cat.name, repo.name, str(age_days), f"{size_mb:.1f}", "[blue]Zu jung[/]")
|
||||
for repo in cat.repos:
|
||||
if not repo.is_expired(limit_days):
|
||||
table.add_row(cat.config.name, repo.name, str(repo.age_days), f"{repo.size_mb:.1f}", "[blue]Zu jung[/]")
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
if (repo / ".git").exists():
|
||||
if is_git_dirty(repo) or is_git_unpushed(repo):
|
||||
table.add_row(cat.name, repo.name, str(age_days), f"{size_mb:.1f}", "[blue]Geschützt (git)[/]")
|
||||
skipped += 1
|
||||
continue
|
||||
if repo.git and repo.git.is_protected:
|
||||
table.add_row(cat.config.name, repo.name, str(repo.age_days), f"{repo.size_mb:.1f}", "[blue]Geschützt (git)[/]")
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
archive_file = archive_repo(repo, archive_path, dry_run)
|
||||
archive_file = archive_repo(repo.path, archive_path, dry_run)
|
||||
action = "[yellow]Würde archivieren[/]" if dry_run else f"[green]Archiviert →[/] {archive_file.name}"
|
||||
archived += 1
|
||||
except Exception as e:
|
||||
action = f"[red]Fehler: {e}[/]"
|
||||
|
||||
table.add_row(cat.name, repo.name, str(age_days), f"{size_mb:.1f}", action)
|
||||
table.add_row(cat.config.name, repo.name, str(repo.age_days), f"{repo.size_mb:.1f}", action)
|
||||
|
||||
console.print(table)
|
||||
console.print(f"\n[green]✓ Archivierung abgeschlossen[/] – {archived} archiviert, {skipped} übersprungen")
|
||||
console.print(f"\n[green]✓ Archivierung abgeschlossen[/] – {archived} archiviert, {skipped} übersprungen")
|
||||
@@ -1,65 +1,29 @@
|
||||
import subprocess
|
||||
from typer import Typer, Option, Argument
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
from datetime import datetime, timedelta
|
||||
from typer import Typer, Option
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
import os
|
||||
import shutil
|
||||
|
||||
from repocat.config import load_config
|
||||
from repocat.models.catalog import RepoCatalogState
|
||||
|
||||
app = Typer()
|
||||
console = Console()
|
||||
|
||||
def is_git_dirty(path: Path) -> bool:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "status", "--porcelain"],
|
||||
cwd=path,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.DEVNULL,
|
||||
text=True,
|
||||
)
|
||||
return bool(result.stdout.strip())
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def is_git_unpushed(path: Path) -> bool:
|
||||
try:
|
||||
# Check if upstream exists
|
||||
subprocess.run(["git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"],
|
||||
cwd=path, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
|
||||
|
||||
result = subprocess.run(
|
||||
["git", "log", "@{u}..HEAD", "--oneline"],
|
||||
cwd=path,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.DEVNULL,
|
||||
text=True,
|
||||
)
|
||||
return bool(result.stdout.strip())
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def get_dir_size(path: Path) -> int:
|
||||
"""Returns size in bytes."""
|
||||
return sum(f.stat().st_size for f in path.rglob("*") if f.is_file())
|
||||
|
||||
def complete_categories(incomplete: str):
|
||||
config = load_config()
|
||||
return [c.name for c in config.categories if c.name.startswith(incomplete)]
|
||||
|
||||
|
||||
@app.command("volatile")
|
||||
def clean_volatile(
|
||||
dry_run: bool = Option(False, "--dry-run", help="Nur anzeigen, was gelöscht würde"),
|
||||
category: list[str] = Option(None, "--category", "-c", help="Nur bestimmte Kategorien reinigen", autocompletion=complete_categories),
|
||||
):
|
||||
"""
|
||||
Löscht **bedingungslos** alle Repositories aus `volatile`-Kategorien (z. B. `scratches`).
|
||||
Löscht **bedingungslos** alle Repositories aus `volatile`-Kategorien (z.\u200bB. `scratches`).
|
||||
"""
|
||||
config = load_config()
|
||||
base = config.base_dir
|
||||
catalog = RepoCatalogState.from_config(load_config())
|
||||
|
||||
table = Table(title="Clean – Volatile (Zerstörung)")
|
||||
table.add_column("Kategorie")
|
||||
@@ -70,37 +34,31 @@ def clean_volatile(
|
||||
deleted = 0
|
||||
skipped = 0
|
||||
|
||||
for cat in config.categories:
|
||||
if not cat.volatile:
|
||||
for cat in catalog.categories:
|
||||
if not cat.config.volatile:
|
||||
continue
|
||||
if category and cat.name not in category:
|
||||
if category and cat.config.name not in category:
|
||||
continue
|
||||
|
||||
path = base / cat.subdir
|
||||
if not path.exists():
|
||||
continue
|
||||
|
||||
for repo in sorted(path.iterdir()):
|
||||
if not repo.is_dir():
|
||||
continue
|
||||
|
||||
size_mb = get_dir_size(repo) / 1024 / 1024
|
||||
for repo in cat.repos:
|
||||
size_mb = repo.size_mb
|
||||
|
||||
if dry_run:
|
||||
action = "[yellow]Würde löschen[/]"
|
||||
else:
|
||||
try:
|
||||
shutil.rmtree(repo)
|
||||
shutil.rmtree(repo.path)
|
||||
action = "[red]Gelöscht[/]"
|
||||
deleted += 1
|
||||
except Exception as e:
|
||||
action = f"[red]Fehler: {e}[/]"
|
||||
|
||||
table.add_row(cat.name, repo.name, f"{size_mb:.1f}", action)
|
||||
table.add_row(cat.config.name, repo.name, f"{size_mb:.1f}", action)
|
||||
|
||||
console.print(table)
|
||||
console.print(f"\n[green]✓ Volatile-Clean abgeschlossen[/] – {deleted} gelöscht, {skipped} übersprungen")
|
||||
|
||||
|
||||
@app.command()
|
||||
def run(
|
||||
dry_run: bool = Option(False, "--dry-run", help="Nur anzeigen, was gelöscht würde"),
|
||||
@@ -109,11 +67,7 @@ def run(
|
||||
"""
|
||||
Löscht alte oder volatile Repositories aus den konfigurierten Kategorien.
|
||||
"""
|
||||
config = load_config()
|
||||
now = datetime.now()
|
||||
base = config.base_dir
|
||||
|
||||
console.print(f"[bold yellow]Base-Verzeichnis:[/] {base}")
|
||||
catalog = RepoCatalogState.from_config(load_config())
|
||||
|
||||
table = Table(title="Clean-Ergebnis")
|
||||
table.add_column("Kategorie")
|
||||
@@ -125,43 +79,27 @@ def run(
|
||||
deleted = 0
|
||||
skipped = 0
|
||||
|
||||
for cat in config.categories:
|
||||
if category and cat.name not in category:
|
||||
for cat in catalog.categories:
|
||||
if category and cat.config.name not in category:
|
||||
continue
|
||||
if cat.config.is_archive:
|
||||
continue
|
||||
|
||||
if cat.is_archive: continue
|
||||
|
||||
path = base / cat.subdir
|
||||
if not path.exists():
|
||||
continue
|
||||
|
||||
cutoff = now - timedelta(days=cat.days or 0)
|
||||
|
||||
for repo in sorted(path.iterdir()):
|
||||
if not repo.is_dir():
|
||||
continue
|
||||
|
||||
mtime = datetime.fromtimestamp(repo.stat().st_mtime)
|
||||
age_days = (now - mtime).days
|
||||
size_mb = get_dir_size(repo) / 1024 / 1024
|
||||
limit_days = cat.config.days or 0
|
||||
|
||||
for repo in cat.repos:
|
||||
should_delete = (
|
||||
(cat.volatile)
|
||||
or (cat.days is not None and mtime < cutoff)
|
||||
cat.config.volatile
|
||||
or (cat.config.days is not None and repo.is_expired(limit_days))
|
||||
)
|
||||
|
||||
# Git-Schutz prüfen
|
||||
is_git = (repo / ".git").exists()
|
||||
dirty = is_git and is_git_dirty(repo)
|
||||
unpushed = is_git and is_git_unpushed(repo)
|
||||
|
||||
if is_git and (dirty or unpushed):
|
||||
reason = []
|
||||
if dirty:
|
||||
reason.append("uncommitted")
|
||||
if unpushed:
|
||||
reason.append("unpushed")
|
||||
status = "/".join(reason)
|
||||
if repo.git and repo.git.is_protected:
|
||||
reasons = []
|
||||
if repo.git.dirty:
|
||||
reasons.append("uncommitted")
|
||||
if repo.git.unpushed:
|
||||
reasons.append("unpushed")
|
||||
status = "/".join(reasons)
|
||||
action = f"[blue]Geschützt ({status})[/]"
|
||||
skipped += 1
|
||||
else:
|
||||
@@ -170,7 +108,7 @@ def run(
|
||||
action = "[yellow]Würde löschen[/]"
|
||||
else:
|
||||
try:
|
||||
shutil.rmtree(repo)
|
||||
shutil.rmtree(repo.path)
|
||||
action = "[red]Gelöscht[/]"
|
||||
deleted += 1
|
||||
except Exception as e:
|
||||
@@ -179,7 +117,7 @@ def run(
|
||||
action = "[cyan]Keine Aktion[/]"
|
||||
skipped += 1
|
||||
|
||||
table.add_row(cat.name, repo.name, str(age_days), f"{size_mb:.1f}", action)
|
||||
table.add_row(cat.config.name, repo.name, str(repo.age_days), f"{repo.size_mb:.1f}", action)
|
||||
|
||||
console.print(table)
|
||||
console.print(f"\n[green]✓ Clean abgeschlossen[/] – {deleted} gelöscht, {skipped} übersprungen")
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
from typer import Typer, Argument
|
||||
from rich.console import Console
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
import typer
|
||||
|
||||
from repocat.config import load_config
|
||||
from repocat.models.catalog import RepoCatalogState
|
||||
|
||||
app = Typer()
|
||||
console = Console()
|
||||
@@ -12,20 +11,16 @@ console = Console()
|
||||
|
||||
def complete_source_repo(incomplete: str):
|
||||
config = load_config()
|
||||
base = config.base_dir
|
||||
catalog = RepoCatalogState.from_config(config)
|
||||
results = []
|
||||
|
||||
for cat in config.categories:
|
||||
if cat.is_archive:
|
||||
for cat in catalog.categories:
|
||||
if cat.config.is_archive:
|
||||
continue
|
||||
cat_path = base / cat.subdir
|
||||
if not cat_path.exists():
|
||||
continue
|
||||
for item in cat_path.iterdir():
|
||||
if item.is_dir():
|
||||
full_name = f"{cat.name}/{item.name}"
|
||||
if full_name.startswith(incomplete):
|
||||
results.append(full_name)
|
||||
for repo in cat.repos:
|
||||
full_name = f"{cat.config.name}/{repo.name}"
|
||||
if full_name.startswith(incomplete):
|
||||
results.append(full_name)
|
||||
return sorted(results)
|
||||
|
||||
|
||||
@@ -43,39 +38,35 @@ def move_command(
|
||||
Verschiebt ein Repository in eine andere Kategorie.
|
||||
"""
|
||||
config = load_config()
|
||||
base = config.base_dir
|
||||
catalog = RepoCatalogState.from_config(config)
|
||||
|
||||
# Quelle aufspalten in Kategorie und Repo
|
||||
try:
|
||||
source_cat_name, repo_name = source.split("/", 1)
|
||||
except ValueError:
|
||||
console.print("[red]Ungültiges Format für Quelle. Bitte <kategorie>/<reponame> angeben.[/]")
|
||||
raise typer.Exit(1)
|
||||
|
||||
source_cat = next((c for c in config.categories if c.name == source_cat_name), None)
|
||||
source_cat = catalog.get_category(source_cat_name)
|
||||
if not source_cat:
|
||||
console.print(f"[red]Unbekannte Quellkategorie:[/] {source_cat_name}")
|
||||
raise typer.Exit(1)
|
||||
|
||||
target_cat = next((c for c in config.categories if c.name == target), None)
|
||||
target_cat = catalog.get_category(target)
|
||||
if not target_cat:
|
||||
console.print(f"[red]Ungültige Zielkategorie:[/] {target}")
|
||||
raise typer.Exit(1)
|
||||
|
||||
source_path = base / source_cat.subdir / repo_name
|
||||
destination_path = base / target_cat.subdir / repo_name
|
||||
|
||||
if not source_path.exists():
|
||||
console.print(f"[red]Quell-Repository existiert nicht:[/] {source_path}")
|
||||
raise typer.Exit(1)
|
||||
|
||||
if destination_path.exists():
|
||||
console.print(f"[red]Zielordner existiert bereits:[/] {destination_path}")
|
||||
repo = source_cat.find_repo(repo_name)
|
||||
if not repo:
|
||||
console.print(f"[red]Repository '{repo_name}' nicht gefunden in Kategorie '{source_cat_name}'.[/]")
|
||||
raise typer.Exit(1)
|
||||
|
||||
try:
|
||||
shutil.move(str(source_path), str(destination_path))
|
||||
console.print(f"[green]✓ Erfolgreich verschoben:[/] {source_cat.name}/{repo_name} → {target}")
|
||||
repo.move_to(target_cat.path)
|
||||
console.print(f"[green]✓ Erfolgreich verschoben:[/] {source_cat_name}/{repo_name} → {target}")
|
||||
except FileExistsError as e:
|
||||
console.print(f"[red]{e}[/]")
|
||||
raise typer.Exit(1)
|
||||
except Exception as e:
|
||||
console.print(f"[red]Fehler beim Verschieben:[/] {e}")
|
||||
raise typer.Exit(1)
|
||||
raise typer.Exit(1)
|
||||
@@ -1,19 +1,15 @@
|
||||
from typer import Typer, confirm, prompt
|
||||
from typer import Typer
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
import typer
|
||||
from rich.prompt import Prompt, Confirm
|
||||
from datetime import datetime
|
||||
import shutil
|
||||
import typer
|
||||
|
||||
from repocat.cli.move import move_command
|
||||
from repocat.config import load_config
|
||||
from repocat.models.config import RepoCategory
|
||||
from repocat.models.catalog import RepoCatalogState
|
||||
from repocat.cli.move import move_command
|
||||
from repocat.cli.archive import archive_repo
|
||||
|
||||
from repocat.cli.clean import get_dir_size, is_git_dirty, is_git_unpushed
|
||||
|
||||
app = Typer()
|
||||
console = Console()
|
||||
|
||||
@@ -23,48 +19,39 @@ def review():
|
||||
"""
|
||||
Führt interaktiv durch alle Repositories im Review-Ordner.
|
||||
"""
|
||||
config = load_config()
|
||||
base = config.base_dir
|
||||
catalog = RepoCatalogState.from_config(load_config())
|
||||
|
||||
archive_cat = next((c for c in config.categories if c.is_archive), None)
|
||||
archive_cat = next((c for c in catalog.categories if c.config.is_archive), None)
|
||||
if not archive_cat:
|
||||
console.print("[red]Keine Archiv-Kategorie in der Konfiguration gefunden.[/]")
|
||||
raise typer.Exit(1)
|
||||
archive_path = base / archive_cat.subdir
|
||||
archive_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
review_cat = next((c for c in config.categories if c.name == "review"), None)
|
||||
review_cat = catalog.get_category("review")
|
||||
if not review_cat:
|
||||
console.print("[red]Kein 'review'-Eintrag in der Konfiguration gefunden.[/]")
|
||||
raise typer.Exit(1)
|
||||
|
||||
review_path = base / review_cat.subdir
|
||||
if not review_path.exists():
|
||||
console.print(f"[yellow]Hinweis:[/] Kein review-Ordner vorhanden unter {review_path}")
|
||||
return
|
||||
|
||||
repos = [p for p in review_path.iterdir() if p.is_dir()]
|
||||
if not repos:
|
||||
if not review_cat.repos:
|
||||
console.print("[green]✓ Keine Repositories im Review-Ordner.[/]")
|
||||
return
|
||||
|
||||
for repo in sorted(repos):
|
||||
for repo in sorted(review_cat.repos, key=lambda r: r.name):
|
||||
console.rule(f"[bold cyan]{repo.name}")
|
||||
|
||||
age_days = (datetime.now() - datetime.fromtimestamp(repo.stat().st_mtime)).days
|
||||
size_mb = get_dir_size(repo) / 1024 / 1024
|
||||
console.print(f"[bold]Alter:[/] {repo.age_days} Tage")
|
||||
console.print(f"[bold]Größe:[/] {repo.size_mb:.1f} MB")
|
||||
|
||||
status = []
|
||||
if is_git_dirty(repo):
|
||||
status.append("uncommitted")
|
||||
if is_git_unpushed(repo):
|
||||
status.append("unpushed")
|
||||
if not status:
|
||||
status.append("clean")
|
||||
if repo.git:
|
||||
status = []
|
||||
if repo.git.dirty:
|
||||
status.append("uncommitted")
|
||||
if repo.git.unpushed:
|
||||
status.append("unpushed")
|
||||
if not status:
|
||||
status.append("clean")
|
||||
else:
|
||||
status = ["kein Git"]
|
||||
|
||||
console.print(f"[bold]Alter:[/] {age_days} Tage")
|
||||
console.print(f"[bold]Größe:[/] {size_mb:.1f} MB")
|
||||
console.print(f"[bold]Git:[/] {', '.join(status)}")
|
||||
|
||||
while True:
|
||||
@@ -81,17 +68,17 @@ def review():
|
||||
raise typer.Exit()
|
||||
elif action in ("d", "delete"):
|
||||
if Confirm.ask(f"Bist du sicher, dass du [red]{repo.name}[/] löschen willst?"):
|
||||
shutil.rmtree(repo)
|
||||
shutil.rmtree(repo.path)
|
||||
console.print(f"[red]✓ Gelöscht:[/] {repo.name}")
|
||||
break
|
||||
elif action in ("m", "move"):
|
||||
target_categories = [c.name for c in config.categories if c.name != "review" and not c.is_archive]
|
||||
target_categories = [c.config.name for c in catalog.categories if c.config.name != "review" and not c.config.is_archive]
|
||||
target = Prompt.ask("Zielkategorie", choices=target_categories)
|
||||
move_command(source=f"review/{repo.name}", target=target)
|
||||
break
|
||||
elif action in ("a", "archive"):
|
||||
try:
|
||||
archive_file = archive_repo(repo, archive_path, dry_run=False)
|
||||
archive_file = archive_repo(repo.path, archive_cat.path, dry_run=False)
|
||||
console.print(f"[green]✓ Archiviert:[/] {archive_file.name}")
|
||||
break
|
||||
except Exception as e:
|
||||
@@ -99,4 +86,4 @@ def review():
|
||||
else:
|
||||
console.print("[yellow]Ungültige Eingabe. Gültig sind: m, d, s, q[/]")
|
||||
|
||||
console.print("[green]✓ Review abgeschlossen.[/]")
|
||||
console.print("[green]✓ Review abgeschlossen.[/]")
|
||||
@@ -1,71 +1,45 @@
|
||||
from typer import Typer
|
||||
from typer import Typer, Option
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from repocat.config import load_config
|
||||
from repocat.models.config import RepoCategory
|
||||
from repocat.models.catalog import RepoCatalogState
|
||||
|
||||
app = Typer()
|
||||
console = Console()
|
||||
|
||||
|
||||
def get_dir_size(path: Path) -> int:
|
||||
"""Returns size in bytes."""
|
||||
return sum(f.stat().st_size for f in path.rglob("*") if f.is_file())
|
||||
|
||||
|
||||
def summarize_category(category: RepoCategory, base_path: Path):
|
||||
cat_path = base_path / category.subdir
|
||||
if not cat_path.exists():
|
||||
return 0, 0.0, None, None
|
||||
|
||||
now = datetime.now()
|
||||
repo_infos = []
|
||||
|
||||
for repo in cat_path.iterdir():
|
||||
if not repo.is_dir():
|
||||
continue
|
||||
|
||||
size_bytes = get_dir_size(repo)
|
||||
mtime = datetime.fromtimestamp(repo.stat().st_mtime)
|
||||
age_days = (now - mtime).days
|
||||
|
||||
repo_infos.append((repo.name, size_bytes, age_days, mtime))
|
||||
|
||||
if not repo_infos:
|
||||
return 0, 0.0, None, None
|
||||
|
||||
total_size = sum(info[1] for info in repo_infos) / 1024 / 1024
|
||||
avg_age = sum(info[2] for info in repo_infos) // len(repo_infos)
|
||||
|
||||
oldest = max(repo_infos, key=lambda x: x[2]) # größtes Alter
|
||||
|
||||
return len(repo_infos), total_size, avg_age, (oldest[0], oldest[2])
|
||||
|
||||
|
||||
@app.command("status")
|
||||
def status():
|
||||
def status(verbose: bool = Option(False, "--verbose", "-v", help="Zeige Details zu einzelnen Repositories")):
|
||||
"""
|
||||
Zeigt eine Übersicht über alle Repository-Kategorien.
|
||||
"""
|
||||
config = load_config()
|
||||
base = config.base_dir
|
||||
catalog = RepoCatalogState.from_config(load_config())
|
||||
|
||||
table = Table(title="Repository-Status")
|
||||
table.add_column("Kategorie")
|
||||
table.add_column("Repos", justify="right")
|
||||
table.add_column("Gesamtgröße", justify="right")
|
||||
table.add_column("Ø Alter", justify="right")
|
||||
table.add_column("Ältestes Repo", justify="left")
|
||||
table.add_column("\u00d8 Alter", justify="right")
|
||||
table.add_column("\u00c4ltestes Repo", justify="left")
|
||||
|
||||
for cat in config.categories:
|
||||
if cat.is_archive:
|
||||
for category in catalog.categories:
|
||||
if category.config.is_archive:
|
||||
continue
|
||||
count, total_size, avg_age, oldest = summarize_category(cat, base)
|
||||
|
||||
repos = category.repos
|
||||
count = len(repos)
|
||||
total_size = category.total_size_mb()
|
||||
|
||||
if not repos:
|
||||
avg_age = None
|
||||
oldest = None
|
||||
else:
|
||||
ages = [repo.age_days for repo in repos]
|
||||
avg_age = sum(ages) // len(ages)
|
||||
oldest_repo = max(repos, key=lambda r: r.age_days)
|
||||
oldest = (oldest_repo.name, oldest_repo.age_days)
|
||||
|
||||
table.add_row(
|
||||
cat.name,
|
||||
category.config.name,
|
||||
str(count),
|
||||
f"{total_size:.1f} MB",
|
||||
f"{avg_age or '–'} d",
|
||||
@@ -73,3 +47,38 @@ def status():
|
||||
)
|
||||
|
||||
console.print(table)
|
||||
|
||||
if verbose:
|
||||
for category in catalog.categories:
|
||||
if category.config.is_archive:
|
||||
continue
|
||||
|
||||
if not category.repos:
|
||||
continue
|
||||
|
||||
repo_table = Table(title=f"Details: {category.config.name}")
|
||||
repo_table.add_column("Repository")
|
||||
repo_table.add_column("Alter (Tage)", justify="right")
|
||||
repo_table.add_column("Größe (MB)", justify="right")
|
||||
repo_table.add_column("Git", justify="left")
|
||||
|
||||
for repo in sorted(category.repos, key=lambda r: r.age_days, reverse=True):
|
||||
git_status = ""
|
||||
if repo.git:
|
||||
if repo.git.dirty:
|
||||
git_status += "uncommitted "
|
||||
if repo.git.unpushed:
|
||||
git_status += "unpushed"
|
||||
if not git_status:
|
||||
git_status = "clean"
|
||||
else:
|
||||
git_status = "–"
|
||||
|
||||
repo_table.add_row(
|
||||
repo.name,
|
||||
str(repo.age_days),
|
||||
f"{repo.size_mb:.1f}",
|
||||
git_status.strip()
|
||||
)
|
||||
|
||||
console.print(repo_table)
|
||||
|
||||
118
src/repocat/models/catalog.py
Normal file
118
src/repocat/models/catalog.py
Normal file
@@ -0,0 +1,118 @@
|
||||
from __future__ import annotations
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import Optional, List
|
||||
from pydantic import BaseModel, Field
|
||||
import shutil
|
||||
|
||||
from repocat.models.config import RepoCategory, RepoCatConfig
|
||||
from repocat.utils.fsutils import get_dir_size, is_git_dirty, is_git_unpushed
|
||||
|
||||
|
||||
class GitStatus(BaseModel):
|
||||
dirty: bool
|
||||
unpushed: bool
|
||||
branch: Optional[str] = None
|
||||
remote_url: Optional[str] = None
|
||||
|
||||
@property
|
||||
def is_protected(self) -> bool:
|
||||
return self.dirty or self.unpushed
|
||||
|
||||
|
||||
class RepoModel(BaseModel):
|
||||
name: str
|
||||
path: Path
|
||||
size_bytes: int
|
||||
last_modified: datetime
|
||||
git: Optional[GitStatus] = None
|
||||
|
||||
@property
|
||||
def size_mb(self) -> float:
|
||||
return self.size_bytes / 1024 / 1024
|
||||
|
||||
@property
|
||||
def age_days(self) -> int:
|
||||
return (datetime.now() - self.last_modified).days
|
||||
|
||||
@property
|
||||
def is_git_repo(self) -> bool:
|
||||
return self.git is not None
|
||||
|
||||
def is_expired(self, days: int) -> bool:
|
||||
return self.age_days >= days
|
||||
|
||||
def move_to(self, target_dir: Path) -> None:
|
||||
destination = target_dir / self.name
|
||||
if destination.exists():
|
||||
raise FileExistsError(f"Zielordner existiert bereits: {destination}")
|
||||
shutil.move(str(self.path), str(destination))
|
||||
self.path = destination
|
||||
|
||||
|
||||
class RepoCategoryState(BaseModel):
|
||||
config: RepoCategory
|
||||
path: Path
|
||||
repos: List[RepoModel] = Field(default_factory=list)
|
||||
|
||||
def total_size_mb(self) -> float:
|
||||
return sum(repo.size_mb for repo in self.repos)
|
||||
|
||||
def find_repo(self, name: str) -> Optional[RepoModel]:
|
||||
return next((r for r in self.repos if r.name == name), None)
|
||||
|
||||
|
||||
class RepoCatalogState(BaseModel):
|
||||
config: RepoCatConfig
|
||||
categories: List[RepoCategoryState] = Field(default_factory=list)
|
||||
|
||||
@classmethod
|
||||
def from_config(cls, config: RepoCatConfig) -> RepoCatalogState:
|
||||
categories: List[RepoCategoryState] = []
|
||||
|
||||
for cat_cfg in config.categories:
|
||||
cat_path = config.base_dir / cat_cfg.subdir
|
||||
repos: List[RepoModel] = []
|
||||
|
||||
if cat_path.exists():
|
||||
for item in sorted(cat_path.iterdir()):
|
||||
if not item.is_dir():
|
||||
continue
|
||||
|
||||
last_mod = datetime.fromtimestamp(item.stat().st_mtime)
|
||||
size = get_dir_size(item)
|
||||
|
||||
git = None
|
||||
if (item / ".git").exists():
|
||||
git = GitStatus(
|
||||
dirty=is_git_dirty(item),
|
||||
unpushed=is_git_unpushed(item),
|
||||
)
|
||||
|
||||
repos.append(RepoModel(
|
||||
name=item.name,
|
||||
path=item,
|
||||
size_bytes=size,
|
||||
last_modified=last_mod,
|
||||
git=git,
|
||||
))
|
||||
|
||||
categories.append(RepoCategoryState(
|
||||
config=cat_cfg,
|
||||
path=cat_path,
|
||||
repos=repos,
|
||||
))
|
||||
|
||||
return cls(config=config, categories=categories)
|
||||
|
||||
def get_category(self, name: str) -> Optional[RepoCategoryState]:
|
||||
return next((c for c in self.categories if c.config.name == name), None)
|
||||
|
||||
def find_repo(self, name: str) -> Optional[RepoModel]:
|
||||
for cat in self.categories:
|
||||
if (repo := cat.find_repo(name)):
|
||||
return repo
|
||||
return None
|
||||
|
||||
def missing_directories(self) -> List[Path]:
|
||||
return [cat.path for cat in self.categories if not cat.path.exists()]
|
||||
43
src/repocat/utils/fsutils.py
Normal file
43
src/repocat/utils/fsutils.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
def get_dir_size(path: Path) -> int:
|
||||
"""Returns directory size in bytes."""
|
||||
return sum(f.stat().st_size for f in path.rglob("*") if f.is_file())
|
||||
|
||||
def is_git_dirty(path: Path) -> bool:
|
||||
"""Returns True if there are uncommitted changes."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "status", "--porcelain"],
|
||||
cwd=path,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.DEVNULL,
|
||||
text=True,
|
||||
)
|
||||
return bool(result.stdout.strip())
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def is_git_unpushed(path: Path) -> bool:
|
||||
"""Returns True if there are commits that haven't been pushed to upstream."""
|
||||
try:
|
||||
# Check if upstream exists
|
||||
subprocess.run(
|
||||
["git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"],
|
||||
cwd=path,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
check=True,
|
||||
)
|
||||
|
||||
result = subprocess.run(
|
||||
["git", "log", "@{u}..HEAD", "--oneline"],
|
||||
cwd=path,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.DEVNULL,
|
||||
text=True,
|
||||
)
|
||||
return bool(result.stdout.strip())
|
||||
except Exception:
|
||||
return False
|
||||
Reference in New Issue
Block a user