feat(cli): add initial repository management commands

- Introduce CLI using Typer for repository management
- Add commands for archiving, cleaning, moving, reviewing, and status
- Implement configuration loading and default structure setup
- Include utilities for Git status checks and directory size calculation
- Lay groundwork for extensible repository operations

Signed-off-by: Max P. <Mail@MPassarello.de>
This commit is contained in:
2025-04-30 12:01:34 +02:00
parent d1e246c868
commit ef2bac4e88
12 changed files with 702 additions and 0 deletions

0
src/repocat/__init__.py Normal file
View File

7
src/repocat/__main__.py Normal file
View File

@@ -0,0 +1,7 @@
from repocat.cli import app
def main():
app()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,12 @@
from typer import Typer
from repocat.cli import archive, clean, init, move, review, status
app = Typer()
app.add_typer(init.app, name="init")
app.add_typer(clean.app, name="clean")
app.add_typer(move.app, name=None)
app.add_typer(status.app, name=None)
app.add_typer(review.app, name="review")
app.add_typer(archive.app, name="archive")

109
src/repocat/cli/archive.py Normal file
View File

@@ -0,0 +1,109 @@
import shutil
import tarfile
from datetime import datetime, timedelta
from pathlib import Path
from typer import Typer, Option
from rich.console import Console
from rich.table import Table
import subprocess
import zstandard
import uuid
from repocat.config import load_config
from repocat.cli.clean import is_git_dirty, is_git_unpushed, get_dir_size
app = Typer()
console = Console()
def get_archive_target_dir():
config = load_config()
for cat in config.categories:
if cat.is_archive:
return config.base_dir / cat.subdir
raise ValueError("Keine Archiv-Kategorie (is_archive: true) in der Konfiguration gefunden.")
def archive_repo(source: Path, target_dir: Path, dry_run: bool):
today = datetime.today().strftime("%Y.%m.%d")
unique = uuid.uuid4().hex[:8]
archive_name = f"{today} - {source.name} - {unique}.tar.zst"
archive_path = target_dir / archive_name
if archive_path.exists():
raise FileExistsError(f"Archiv existiert bereits: {archive_path}")
if dry_run:
return archive_path # Nur anzeigen
with open(archive_path, "wb") as f:
cctx = zstandard.ZstdCompressor(level=20)
with cctx.stream_writer(f) as compressor:
with tarfile.open(fileobj=compressor, mode="w|") as tar:
tar.add(source, arcname=source.name)
shutil.rmtree(source)
return archive_path
@app.command("run")
def run(
older_than: int = Option(30, "--older-than", help="Nur Repositories älter als X Tage archivieren."),
dry_run: bool = Option(False, "--dry-run", help="Keine Änderungen vornehmen."),
category: list[str] = Option(None, "--category", "-c", help="Nur bestimmte Kategorien prüfen."),
):
config = load_config()
now = datetime.now()
archive_path = get_archive_target_dir()
archive_path.mkdir(parents=True, exist_ok=True)
base = config.base_dir
table = Table(title="Archivierung")
table.add_column("Kategorie")
table.add_column("Repository")
table.add_column("Alter", justify="right")
table.add_column("Größe (MB)", justify="right")
table.add_column("Aktion")
archived = 0
skipped = 0
for cat in config.categories:
if cat.is_archive or cat.name == "review" or cat.volatile:
continue
if category and cat.name not in category:
continue
source_dir = base / cat.subdir
if not source_dir.exists():
continue
for repo in sorted(source_dir.iterdir()):
if not repo.is_dir():
continue
mtime = datetime.fromtimestamp(repo.stat().st_mtime)
age_days = (now - mtime).days
size_mb = get_dir_size(repo) / 1024 / 1024
limit_days = cat.days if cat.days is not None else older_than
if age_days < limit_days:
table.add_row(cat.name, repo.name, str(age_days), f"{size_mb:.1f}", "[blue]Zu jung[/]")
skipped += 1
continue
if (repo / ".git").exists():
if is_git_dirty(repo) or is_git_unpushed(repo):
table.add_row(cat.name, repo.name, str(age_days), f"{size_mb:.1f}", "[blue]Geschützt (git)[/]")
skipped += 1
continue
try:
archive_file = archive_repo(repo, archive_path, dry_run)
action = "[yellow]Würde archivieren[/]" if dry_run else f"[green]Archiviert →[/] {archive_file.name}"
archived += 1
except Exception as e:
action = f"[red]Fehler: {e}[/]"
table.add_row(cat.name, repo.name, str(age_days), f"{size_mb:.1f}", action)
console.print(table)
console.print(f"\n[green]✓ Archivierung abgeschlossen[/] – {archived} archiviert, {skipped} übersprungen")

185
src/repocat/cli/clean.py Normal file
View File

@@ -0,0 +1,185 @@
import subprocess
from typer import Typer, Option, Argument
from pathlib import Path
import shutil
from datetime import datetime, timedelta
from rich.console import Console
from rich.table import Table
import os
from repocat.config import load_config
app = Typer()
console = Console()
def is_git_dirty(path: Path) -> bool:
try:
result = subprocess.run(
["git", "status", "--porcelain"],
cwd=path,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
)
return bool(result.stdout.strip())
except Exception:
return False
def is_git_unpushed(path: Path) -> bool:
try:
# Check if upstream exists
subprocess.run(["git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"],
cwd=path, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
result = subprocess.run(
["git", "log", "@{u}..HEAD", "--oneline"],
cwd=path,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
)
return bool(result.stdout.strip())
except Exception:
return False
def get_dir_size(path: Path) -> int:
"""Returns size in bytes."""
return sum(f.stat().st_size for f in path.rglob("*") if f.is_file())
def complete_categories(incomplete: str):
config = load_config()
return [c.name for c in config.categories if c.name.startswith(incomplete)]
@app.command("volatile")
def clean_volatile(
dry_run: bool = Option(False, "--dry-run", help="Nur anzeigen, was gelöscht würde"),
category: list[str] = Option(None, "--category", "-c", help="Nur bestimmte Kategorien reinigen", autocompletion=complete_categories),
):
"""
Löscht **bedingungslos** alle Repositories aus `volatile`-Kategorien (z. B. `scratches`).
"""
config = load_config()
base = config.base_dir
table = Table(title="Clean – Volatile (Zerstörung)")
table.add_column("Kategorie")
table.add_column("Repository")
table.add_column("Größe (MB)", justify="right")
table.add_column("Aktion")
deleted = 0
skipped = 0
for cat in config.categories:
if not cat.volatile:
continue
if category and cat.name not in category:
continue
path = base / cat.subdir
if not path.exists():
continue
for repo in sorted(path.iterdir()):
if not repo.is_dir():
continue
size_mb = get_dir_size(repo) / 1024 / 1024
if dry_run:
action = "[yellow]Würde löschen[/]"
else:
try:
shutil.rmtree(repo)
action = "[red]Gelöscht[/]"
deleted += 1
except Exception as e:
action = f"[red]Fehler: {e}[/]"
table.add_row(cat.name, repo.name, f"{size_mb:.1f}", action)
console.print(table)
console.print(f"\n[green]✓ Volatile-Clean abgeschlossen[/] – {deleted} gelöscht, {skipped} übersprungen")
@app.command()
def run(
dry_run: bool = Option(False, "--dry-run", help="Nur anzeigen, was gelöscht würde"),
category: list[str] = Option(None, "--category", "-c", help="Nur bestimmte Kategorien reinigen", autocompletion=complete_categories),
):
"""
Löscht alte oder volatile Repositories aus den konfigurierten Kategorien.
"""
config = load_config()
now = datetime.now()
base = config.base_dir
console.print(f"[bold yellow]Base-Verzeichnis:[/] {base}")
table = Table(title="Clean-Ergebnis")
table.add_column("Kategorie")
table.add_column("Repository")
table.add_column("Alter (Tage)", justify="right")
table.add_column("Größe (MB)", justify="right")
table.add_column("Aktion")
deleted = 0
skipped = 0
for cat in config.categories:
if category and cat.name not in category:
continue
if cat.is_archive: continue
path = base / cat.subdir
if not path.exists():
continue
cutoff = now - timedelta(days=cat.days or 0)
for repo in sorted(path.iterdir()):
if not repo.is_dir():
continue
mtime = datetime.fromtimestamp(repo.stat().st_mtime)
age_days = (now - mtime).days
size_mb = get_dir_size(repo) / 1024 / 1024
should_delete = (
(cat.volatile)
or (cat.days is not None and mtime < cutoff)
)
# Git-Schutz prüfen
is_git = (repo / ".git").exists()
dirty = is_git and is_git_dirty(repo)
unpushed = is_git and is_git_unpushed(repo)
if is_git and (dirty or unpushed):
reason = []
if dirty:
reason.append("uncommitted")
if unpushed:
reason.append("unpushed")
status = "/".join(reason)
action = f"[blue]Geschützt ({status})[/]"
skipped += 1
else:
if should_delete:
if dry_run:
action = "[yellow]Würde löschen[/]"
else:
try:
shutil.rmtree(repo)
action = "[red]Gelöscht[/]"
deleted += 1
except Exception as e:
action = f"[red]Fehler: {e}[/]"
else:
action = "[cyan]Keine Aktion[/]"
skipped += 1
table.add_row(cat.name, repo.name, str(age_days), f"{size_mb:.1f}", action)
console.print(table)
console.print(f"\n[green]✓ Clean abgeschlossen[/] – {deleted} gelöscht, {skipped} übersprungen")

80
src/repocat/cli/init.py Normal file
View File

@@ -0,0 +1,80 @@
from typer import Typer
from rich.console import Console
from rich.table import Table
from pathlib import Path
from rich.prompt import Confirm
from repocat.models.config import RepoCatConfig, RepoCategory
from repocat.config import DEFAULT_CONFIG_PATH
app = Typer()
console = Console()
# Standard-Konfigurationswert
DEFAULT_CONFIG = RepoCatConfig(
base_dir=Path("~/Repositories").expanduser(),
categories=[
RepoCategory(name="stable", subdir="stable", description="Langfristige Projekte"),
RepoCategory(name="experiments", subdir="experiments", days=14, description="Kurzzeitige Tests"),
RepoCategory(name="mirrors", subdir="mirrors", days=7),
RepoCategory(name="review", subdir="review", days=30),
RepoCategory(name="scratches", subdir="scratches", volatile=True),
RepoCategory(name="archive", subdir="archive", is_archive=True, description="Archivierte Repos"),
],
)
@app.command("sync")
def sync_directories(confirm_create: bool = True):
"""
Zeigt die geplante Ordnerstruktur an und legt sie optional an.
"""
config = DEFAULT_CONFIG # oder: load_config() bei existierender Datei
base = config.base_dir
table = Table(title="Geplante Ordnerstruktur")
table.add_column("Kategorie")
table.add_column("Pfad")
table.add_column("Status")
planned = []
for category in config.categories:
path = base / category.subdir
if path.exists():
status = "[cyan]Vorhanden[/]"
else:
status = "[yellow]Fehlt[/]"
planned.append(path)
table.add_row(category.name, str(path), status)
console.print(table)
if not planned:
console.print("[green]Alle benötigten Ordner sind bereits vorhanden.[/]")
return
if Confirm.ask(f"[bold yellow]Möchtest du die fehlenden Ordner ([cyan]{len(planned)}[/]) erstellen?[/]"):
for path in planned:
path.mkdir(parents=True, exist_ok=True)
console.print(f"[green]✓ Angelegt:[/] {path}")
else:
console.print("[yellow]Keine Änderungen vorgenommen.[/]")
@app.command("create")
def create_config():
"""
Erstellt eine neue Konfiguration unter ~/.repocat.yml
"""
if DEFAULT_CONFIG_PATH.exists():
console.print(f"[yellow]Konfiguration existiert bereits:[/] {DEFAULT_CONFIG_PATH}")
return
with open(DEFAULT_CONFIG_PATH, "w") as f:
yaml_data = DEFAULT_CONFIG.model_dump(mode="json")
import yaml
yaml.dump(yaml_data, f)
console.print(f"[green]Standardkonfiguration erstellt unter:[/] {DEFAULT_CONFIG_PATH}")
sync_directories()

81
src/repocat/cli/move.py Normal file
View File

@@ -0,0 +1,81 @@
from typer import Typer, Argument
from rich.console import Console
from pathlib import Path
import shutil
import typer
from repocat.config import load_config
app = Typer()
console = Console()
def complete_source_repo(incomplete: str):
config = load_config()
base = config.base_dir
results = []
for cat in config.categories:
if cat.is_archive:
continue
cat_path = base / cat.subdir
if not cat_path.exists():
continue
for item in cat_path.iterdir():
if item.is_dir():
full_name = f"{cat.name}/{item.name}"
if full_name.startswith(incomplete):
results.append(full_name)
return sorted(results)
def complete_category(incomplete: str):
config = load_config()
return [c.name for c in config.categories if c.name.startswith(incomplete) and c.name != "archive"]
@app.command("move")
def move_command(
source: str = Argument(..., help="Quell-Repo im Format <category>/<reponame>", autocompletion=complete_source_repo),
target: str = Argument(..., help="Zielkategorie", autocompletion=complete_category),
):
"""
Verschiebt ein Repository in eine andere Kategorie.
"""
config = load_config()
base = config.base_dir
# Quelle aufspalten in Kategorie und Repo
try:
source_cat_name, repo_name = source.split("/", 1)
except ValueError:
console.print("[red]Ungültiges Format für Quelle. Bitte <kategorie>/<reponame> angeben.[/]")
raise typer.Exit(1)
source_cat = next((c for c in config.categories if c.name == source_cat_name), None)
if not source_cat:
console.print(f"[red]Unbekannte Quellkategorie:[/] {source_cat_name}")
raise typer.Exit(1)
target_cat = next((c for c in config.categories if c.name == target), None)
if not target_cat:
console.print(f"[red]Ungültige Zielkategorie:[/] {target}")
raise typer.Exit(1)
source_path = base / source_cat.subdir / repo_name
destination_path = base / target_cat.subdir / repo_name
if not source_path.exists():
console.print(f"[red]Quell-Repository existiert nicht:[/] {source_path}")
raise typer.Exit(1)
if destination_path.exists():
console.print(f"[red]Zielordner existiert bereits:[/] {destination_path}")
raise typer.Exit(1)
try:
shutil.move(str(source_path), str(destination_path))
console.print(f"[green]✓ Erfolgreich verschoben:[/] {source_cat.name}/{repo_name}{target}")
except Exception as e:
console.print(f"[red]Fehler beim Verschieben:[/] {e}")
raise typer.Exit(1)

102
src/repocat/cli/review.py Normal file
View File

@@ -0,0 +1,102 @@
from typer import Typer, confirm, prompt
from rich.console import Console
from rich.table import Table
from pathlib import Path
import shutil
from datetime import datetime
import typer
from rich.prompt import Prompt, Confirm
from repocat.cli.move import move_command
from repocat.config import load_config
from repocat.models.config import RepoCategory
from repocat.cli.archive import archive_repo
from repocat.cli.clean import get_dir_size, is_git_dirty, is_git_unpushed
app = Typer()
console = Console()
@app.command("run")
def review():
"""
Führt interaktiv durch alle Repositories im Review-Ordner.
"""
config = load_config()
base = config.base_dir
archive_cat = next((c for c in config.categories if c.is_archive), None)
if not archive_cat:
console.print("[red]Keine Archiv-Kategorie in der Konfiguration gefunden.[/]")
raise typer.Exit(1)
archive_path = base / archive_cat.subdir
archive_path.mkdir(parents=True, exist_ok=True)
review_cat = next((c for c in config.categories if c.name == "review"), None)
if not review_cat:
console.print("[red]Kein 'review'-Eintrag in der Konfiguration gefunden.[/]")
raise typer.Exit(1)
review_path = base / review_cat.subdir
if not review_path.exists():
console.print(f"[yellow]Hinweis:[/] Kein review-Ordner vorhanden unter {review_path}")
return
repos = [p for p in review_path.iterdir() if p.is_dir()]
if not repos:
console.print("[green]✓ Keine Repositories im Review-Ordner.[/]")
return
for repo in sorted(repos):
console.rule(f"[bold cyan]{repo.name}")
age_days = (datetime.now() - datetime.fromtimestamp(repo.stat().st_mtime)).days
size_mb = get_dir_size(repo) / 1024 / 1024
status = []
if is_git_dirty(repo):
status.append("uncommitted")
if is_git_unpushed(repo):
status.append("unpushed")
if not status:
status.append("clean")
console.print(f"[bold]Alter:[/] {age_days} Tage")
console.print(f"[bold]Größe:[/] {size_mb:.1f} MB")
console.print(f"[bold]Git:[/] {', '.join(status)}")
while True:
action = Prompt.ask(
"Aktion \\[m]ove / \\[a]rchive / \\[d]elete / \\[s]kip / \\[q]uit",
choices=["m", "a", "d", "s", "q"],
show_choices=False
)
if action in ("s", "skip", ""):
break
elif action in ("q", "quit"):
console.print("[bold yellow]Abgebrochen.[/]")
raise typer.Exit()
elif action in ("d", "delete"):
if Confirm.ask(f"Bist du sicher, dass du [red]{repo.name}[/] löschen willst?"):
shutil.rmtree(repo)
console.print(f"[red]✓ Gelöscht:[/] {repo.name}")
break
elif action in ("m", "move"):
target_categories = [c.name for c in config.categories if c.name != "review" and not c.is_archive]
target = Prompt.ask("Zielkategorie", choices=target_categories)
move_command(source=f"review/{repo.name}", target=target)
break
elif action in ("a", "archive"):
try:
archive_file = archive_repo(repo, archive_path, dry_run=False)
console.print(f"[green]✓ Archiviert:[/] {archive_file.name}")
break
except Exception as e:
console.print(f"[red]Fehler beim Archivieren:[/] {e}")
else:
console.print("[yellow]Ungültige Eingabe. Gültig sind: m, d, s, q[/]")
console.print("[green]✓ Review abgeschlossen.[/]")

75
src/repocat/cli/status.py Normal file
View File

@@ -0,0 +1,75 @@
from typer import Typer
from rich.console import Console
from rich.table import Table
from pathlib import Path
from datetime import datetime
from repocat.config import load_config
from repocat.models.config import RepoCategory
app = Typer()
console = Console()
def get_dir_size(path: Path) -> int:
"""Returns size in bytes."""
return sum(f.stat().st_size for f in path.rglob("*") if f.is_file())
def summarize_category(category: RepoCategory, base_path: Path):
cat_path = base_path / category.subdir
if not cat_path.exists():
return 0, 0.0, None, None
now = datetime.now()
repo_infos = []
for repo in cat_path.iterdir():
if not repo.is_dir():
continue
size_bytes = get_dir_size(repo)
mtime = datetime.fromtimestamp(repo.stat().st_mtime)
age_days = (now - mtime).days
repo_infos.append((repo.name, size_bytes, age_days, mtime))
if not repo_infos:
return 0, 0.0, None, None
total_size = sum(info[1] for info in repo_infos) / 1024 / 1024
avg_age = sum(info[2] for info in repo_infos) // len(repo_infos)
oldest = max(repo_infos, key=lambda x: x[2]) # größtes Alter
return len(repo_infos), total_size, avg_age, (oldest[0], oldest[2])
@app.command("status")
def status():
"""
Zeigt eine Übersicht über alle Repository-Kategorien.
"""
config = load_config()
base = config.base_dir
table = Table(title="Repository-Status")
table.add_column("Kategorie")
table.add_column("Repos", justify="right")
table.add_column("Gesamtgröße", justify="right")
table.add_column("Ø Alter", justify="right")
table.add_column("Ältestes Repo", justify="left")
for cat in config.categories:
if cat.is_archive:
continue
count, total_size, avg_age, oldest = summarize_category(cat, base)
table.add_row(
cat.name,
str(count),
f"{total_size:.1f} MB",
f"{avg_age or ''} d",
f"{oldest[0]} ({oldest[1]} d)" if oldest else ""
)
console.print(table)

31
src/repocat/config.py Normal file
View File

@@ -0,0 +1,31 @@
import yaml
from pathlib import Path
from repocat.models.config import RepoCatConfig
from rich.console import Console
console = Console()
DEFAULT_CONFIG_PATH = Path.home() / ".repocat.yml"
FALLBACK_PATH = Path("config.yml") # Optional, falls keine globale Config vorhanden
def load_config(file_path: Path | None = None) -> RepoCatConfig:
config_path = file_path or DEFAULT_CONFIG_PATH
if not config_path.exists():
console.print(f"[yellow]Warnung:[/] Konfigurationsdatei nicht gefunden: {config_path}")
if FALLBACK_PATH.exists():
console.print(f"[blue]Versuche stattdessen:[/] {FALLBACK_PATH}")
config_path = FALLBACK_PATH
else:
raise FileNotFoundError(f"Keine Konfigurationsdatei gefunden in {config_path} oder {FALLBACK_PATH}")
try:
with open(config_path, "r") as f:
raw = yaml.safe_load(f)
config = RepoCatConfig(**raw)
config.base_dir = Path(config.base_dir).expanduser()
return config
except Exception as e:
console.print(f"[red]Fehler beim Laden der Konfiguration:[/] {e}")
raise

View File

@@ -0,0 +1,20 @@
from pydantic import BaseModel, Field
from pathlib import Path
from typing import List, Optional
class RepoCategory(BaseModel):
name: str
subdir: str
days: Optional[int] = Field(default=None, ge=1)
volatile: bool = False
description: Optional[str] = None
is_archive: bool = False
class RepoCatDefaults(BaseModel):
dry_run: bool = True
auto_create_directories: bool = True
class RepoCatConfig(BaseModel):
base_dir: Path = Field(default=Path("~/Repositories").expanduser())
categories: List[RepoCategory]
defaults: Optional[RepoCatDefaults] = RepoCatDefaults()

0
tests/__init__.py Normal file
View File