diff --git a/src/repocat/__init__.py b/src/repocat/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/repocat/__main__.py b/src/repocat/__main__.py new file mode 100644 index 0000000..619d68a --- /dev/null +++ b/src/repocat/__main__.py @@ -0,0 +1,7 @@ +from repocat.cli import app + +def main(): + app() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/repocat/cli/__init__.py b/src/repocat/cli/__init__.py new file mode 100644 index 0000000..5fa670c --- /dev/null +++ b/src/repocat/cli/__init__.py @@ -0,0 +1,12 @@ +from typer import Typer + +from repocat.cli import archive, clean, init, move, review, status + +app = Typer() + +app.add_typer(init.app, name="init") +app.add_typer(clean.app, name="clean") +app.add_typer(move.app, name=None) +app.add_typer(status.app, name=None) +app.add_typer(review.app, name="review") +app.add_typer(archive.app, name="archive") \ No newline at end of file diff --git a/src/repocat/cli/archive.py b/src/repocat/cli/archive.py new file mode 100644 index 0000000..10aff9d --- /dev/null +++ b/src/repocat/cli/archive.py @@ -0,0 +1,109 @@ +import shutil +import tarfile +from datetime import datetime, timedelta +from pathlib import Path +from typer import Typer, Option +from rich.console import Console +from rich.table import Table +import subprocess +import zstandard +import uuid + +from repocat.config import load_config +from repocat.cli.clean import is_git_dirty, is_git_unpushed, get_dir_size + +app = Typer() +console = Console() + + +def get_archive_target_dir(): + config = load_config() + for cat in config.categories: + if cat.is_archive: + return config.base_dir / cat.subdir + raise ValueError("Keine Archiv-Kategorie (is_archive: true) in der Konfiguration gefunden.") + +def archive_repo(source: Path, target_dir: Path, dry_run: bool): + today = datetime.today().strftime("%Y.%m.%d") + unique = uuid.uuid4().hex[:8] + archive_name = f"{today} - {source.name} - {unique}.tar.zst" + archive_path = target_dir / archive_name + + if archive_path.exists(): + raise FileExistsError(f"Archiv existiert bereits: {archive_path}") + + if dry_run: + return archive_path # Nur anzeigen + + with open(archive_path, "wb") as f: + cctx = zstandard.ZstdCompressor(level=20) + with cctx.stream_writer(f) as compressor: + with tarfile.open(fileobj=compressor, mode="w|") as tar: + tar.add(source, arcname=source.name) + + shutil.rmtree(source) + return archive_path + +@app.command("run") +def run( + older_than: int = Option(30, "--older-than", help="Nur Repositories älter als X Tage archivieren."), + dry_run: bool = Option(False, "--dry-run", help="Keine Änderungen vornehmen."), + category: list[str] = Option(None, "--category", "-c", help="Nur bestimmte Kategorien prüfen."), +): + config = load_config() + now = datetime.now() + archive_path = get_archive_target_dir() + archive_path.mkdir(parents=True, exist_ok=True) + + base = config.base_dir + table = Table(title="Archivierung") + table.add_column("Kategorie") + table.add_column("Repository") + table.add_column("Alter", justify="right") + table.add_column("Größe (MB)", justify="right") + table.add_column("Aktion") + + archived = 0 + skipped = 0 + + for cat in config.categories: + if cat.is_archive or cat.name == "review" or cat.volatile: + continue + if category and cat.name not in category: + continue + + source_dir = base / cat.subdir + if not source_dir.exists(): + continue + + for repo in sorted(source_dir.iterdir()): + if not repo.is_dir(): + continue + + mtime = datetime.fromtimestamp(repo.stat().st_mtime) + age_days = (now - mtime).days + size_mb = get_dir_size(repo) / 1024 / 1024 + + limit_days = cat.days if cat.days is not None else older_than + if age_days < limit_days: + table.add_row(cat.name, repo.name, str(age_days), f"{size_mb:.1f}", "[blue]Zu jung[/]") + skipped += 1 + continue + + if (repo / ".git").exists(): + if is_git_dirty(repo) or is_git_unpushed(repo): + table.add_row(cat.name, repo.name, str(age_days), f"{size_mb:.1f}", "[blue]Geschützt (git)[/]") + skipped += 1 + continue + + try: + archive_file = archive_repo(repo, archive_path, dry_run) + action = "[yellow]Würde archivieren[/]" if dry_run else f"[green]Archiviert →[/] {archive_file.name}" + archived += 1 + except Exception as e: + action = f"[red]Fehler: {e}[/]" + + table.add_row(cat.name, repo.name, str(age_days), f"{size_mb:.1f}", action) + + console.print(table) + console.print(f"\n[green]✓ Archivierung abgeschlossen[/] – {archived} archiviert, {skipped} übersprungen") diff --git a/src/repocat/cli/clean.py b/src/repocat/cli/clean.py new file mode 100644 index 0000000..084e97e --- /dev/null +++ b/src/repocat/cli/clean.py @@ -0,0 +1,185 @@ +import subprocess +from typer import Typer, Option, Argument +from pathlib import Path +import shutil +from datetime import datetime, timedelta +from rich.console import Console +from rich.table import Table +import os + +from repocat.config import load_config + +app = Typer() +console = Console() + +def is_git_dirty(path: Path) -> bool: + try: + result = subprocess.run( + ["git", "status", "--porcelain"], + cwd=path, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + text=True, + ) + return bool(result.stdout.strip()) + except Exception: + return False + +def is_git_unpushed(path: Path) -> bool: + try: + # Check if upstream exists + subprocess.run(["git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"], + cwd=path, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) + + result = subprocess.run( + ["git", "log", "@{u}..HEAD", "--oneline"], + cwd=path, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + text=True, + ) + return bool(result.stdout.strip()) + except Exception: + return False + +def get_dir_size(path: Path) -> int: + """Returns size in bytes.""" + return sum(f.stat().st_size for f in path.rglob("*") if f.is_file()) + +def complete_categories(incomplete: str): + config = load_config() + return [c.name for c in config.categories if c.name.startswith(incomplete)] + +@app.command("volatile") +def clean_volatile( + dry_run: bool = Option(False, "--dry-run", help="Nur anzeigen, was gelöscht würde"), + category: list[str] = Option(None, "--category", "-c", help="Nur bestimmte Kategorien reinigen", autocompletion=complete_categories), +): + """ + Löscht **bedingungslos** alle Repositories aus `volatile`-Kategorien (z. B. `scratches`). + """ + config = load_config() + base = config.base_dir + + table = Table(title="Clean – Volatile (Zerstörung)") + table.add_column("Kategorie") + table.add_column("Repository") + table.add_column("Größe (MB)", justify="right") + table.add_column("Aktion") + + deleted = 0 + skipped = 0 + + for cat in config.categories: + if not cat.volatile: + continue + if category and cat.name not in category: + continue + + path = base / cat.subdir + if not path.exists(): + continue + + for repo in sorted(path.iterdir()): + if not repo.is_dir(): + continue + + size_mb = get_dir_size(repo) / 1024 / 1024 + + if dry_run: + action = "[yellow]Würde löschen[/]" + else: + try: + shutil.rmtree(repo) + action = "[red]Gelöscht[/]" + deleted += 1 + except Exception as e: + action = f"[red]Fehler: {e}[/]" + + table.add_row(cat.name, repo.name, f"{size_mb:.1f}", action) + + console.print(table) + console.print(f"\n[green]✓ Volatile-Clean abgeschlossen[/] – {deleted} gelöscht, {skipped} übersprungen") + +@app.command() +def run( + dry_run: bool = Option(False, "--dry-run", help="Nur anzeigen, was gelöscht würde"), + category: list[str] = Option(None, "--category", "-c", help="Nur bestimmte Kategorien reinigen", autocompletion=complete_categories), +): + """ + Löscht alte oder volatile Repositories aus den konfigurierten Kategorien. + """ + config = load_config() + now = datetime.now() + base = config.base_dir + + console.print(f"[bold yellow]Base-Verzeichnis:[/] {base}") + + table = Table(title="Clean-Ergebnis") + table.add_column("Kategorie") + table.add_column("Repository") + table.add_column("Alter (Tage)", justify="right") + table.add_column("Größe (MB)", justify="right") + table.add_column("Aktion") + + deleted = 0 + skipped = 0 + + for cat in config.categories: + if category and cat.name not in category: + continue + + if cat.is_archive: continue + + path = base / cat.subdir + if not path.exists(): + continue + + cutoff = now - timedelta(days=cat.days or 0) + + for repo in sorted(path.iterdir()): + if not repo.is_dir(): + continue + + mtime = datetime.fromtimestamp(repo.stat().st_mtime) + age_days = (now - mtime).days + size_mb = get_dir_size(repo) / 1024 / 1024 + + should_delete = ( + (cat.volatile) + or (cat.days is not None and mtime < cutoff) + ) + + # Git-Schutz prüfen + is_git = (repo / ".git").exists() + dirty = is_git and is_git_dirty(repo) + unpushed = is_git and is_git_unpushed(repo) + + if is_git and (dirty or unpushed): + reason = [] + if dirty: + reason.append("uncommitted") + if unpushed: + reason.append("unpushed") + status = "/".join(reason) + action = f"[blue]Geschützt ({status})[/]" + skipped += 1 + else: + if should_delete: + if dry_run: + action = "[yellow]Würde löschen[/]" + else: + try: + shutil.rmtree(repo) + action = "[red]Gelöscht[/]" + deleted += 1 + except Exception as e: + action = f"[red]Fehler: {e}[/]" + else: + action = "[cyan]Keine Aktion[/]" + skipped += 1 + + table.add_row(cat.name, repo.name, str(age_days), f"{size_mb:.1f}", action) + + console.print(table) + console.print(f"\n[green]✓ Clean abgeschlossen[/] – {deleted} gelöscht, {skipped} übersprungen") diff --git a/src/repocat/cli/init.py b/src/repocat/cli/init.py new file mode 100644 index 0000000..54e6761 --- /dev/null +++ b/src/repocat/cli/init.py @@ -0,0 +1,80 @@ +from typer import Typer +from rich.console import Console +from rich.table import Table +from pathlib import Path +from rich.prompt import Confirm + +from repocat.models.config import RepoCatConfig, RepoCategory +from repocat.config import DEFAULT_CONFIG_PATH + +app = Typer() +console = Console() + +# Standard-Konfigurationswert +DEFAULT_CONFIG = RepoCatConfig( + base_dir=Path("~/Repositories").expanduser(), + categories=[ + RepoCategory(name="stable", subdir="stable", description="Langfristige Projekte"), + RepoCategory(name="experiments", subdir="experiments", days=14, description="Kurzzeitige Tests"), + RepoCategory(name="mirrors", subdir="mirrors", days=7), + RepoCategory(name="review", subdir="review", days=30), + RepoCategory(name="scratches", subdir="scratches", volatile=True), + RepoCategory(name="archive", subdir="archive", is_archive=True, description="Archivierte Repos"), + ], +) + +@app.command("sync") +def sync_directories(confirm_create: bool = True): + """ + Zeigt die geplante Ordnerstruktur an und legt sie optional an. + """ + config = DEFAULT_CONFIG # oder: load_config() bei existierender Datei + base = config.base_dir + + table = Table(title="Geplante Ordnerstruktur") + table.add_column("Kategorie") + table.add_column("Pfad") + table.add_column("Status") + + planned = [] + + for category in config.categories: + path = base / category.subdir + if path.exists(): + status = "[cyan]Vorhanden[/]" + else: + status = "[yellow]Fehlt[/]" + planned.append(path) + table.add_row(category.name, str(path), status) + + console.print(table) + + if not planned: + console.print("[green]Alle benötigten Ordner sind bereits vorhanden.[/]") + return + + if Confirm.ask(f"[bold yellow]Möchtest du die fehlenden Ordner ([cyan]{len(planned)}[/]) erstellen?[/]"): + for path in planned: + path.mkdir(parents=True, exist_ok=True) + console.print(f"[green]✓ Angelegt:[/] {path}") + else: + console.print("[yellow]Keine Änderungen vorgenommen.[/]") + + +@app.command("create") +def create_config(): + """ + Erstellt eine neue Konfiguration unter ~/.repocat.yml + """ + if DEFAULT_CONFIG_PATH.exists(): + console.print(f"[yellow]Konfiguration existiert bereits:[/] {DEFAULT_CONFIG_PATH}") + return + + with open(DEFAULT_CONFIG_PATH, "w") as f: + yaml_data = DEFAULT_CONFIG.model_dump(mode="json") + import yaml + yaml.dump(yaml_data, f) + + console.print(f"[green]Standardkonfiguration erstellt unter:[/] {DEFAULT_CONFIG_PATH}") + + sync_directories() diff --git a/src/repocat/cli/move.py b/src/repocat/cli/move.py new file mode 100644 index 0000000..652cdb9 --- /dev/null +++ b/src/repocat/cli/move.py @@ -0,0 +1,81 @@ +from typer import Typer, Argument +from rich.console import Console +from pathlib import Path +import shutil +import typer + +from repocat.config import load_config + +app = Typer() +console = Console() + + +def complete_source_repo(incomplete: str): + config = load_config() + base = config.base_dir + results = [] + + for cat in config.categories: + if cat.is_archive: + continue + cat_path = base / cat.subdir + if not cat_path.exists(): + continue + for item in cat_path.iterdir(): + if item.is_dir(): + full_name = f"{cat.name}/{item.name}" + if full_name.startswith(incomplete): + results.append(full_name) + return sorted(results) + + +def complete_category(incomplete: str): + config = load_config() + return [c.name for c in config.categories if c.name.startswith(incomplete) and c.name != "archive"] + + +@app.command("move") +def move_command( + source: str = Argument(..., help="Quell-Repo im Format /", autocompletion=complete_source_repo), + target: str = Argument(..., help="Zielkategorie", autocompletion=complete_category), +): + """ + Verschiebt ein Repository in eine andere Kategorie. + """ + config = load_config() + base = config.base_dir + + # Quelle aufspalten in Kategorie und Repo + try: + source_cat_name, repo_name = source.split("/", 1) + except ValueError: + console.print("[red]Ungültiges Format für Quelle. Bitte / angeben.[/]") + raise typer.Exit(1) + + source_cat = next((c for c in config.categories if c.name == source_cat_name), None) + if not source_cat: + console.print(f"[red]Unbekannte Quellkategorie:[/] {source_cat_name}") + raise typer.Exit(1) + + target_cat = next((c for c in config.categories if c.name == target), None) + if not target_cat: + console.print(f"[red]Ungültige Zielkategorie:[/] {target}") + raise typer.Exit(1) + + source_path = base / source_cat.subdir / repo_name + destination_path = base / target_cat.subdir / repo_name + + if not source_path.exists(): + console.print(f"[red]Quell-Repository existiert nicht:[/] {source_path}") + raise typer.Exit(1) + + if destination_path.exists(): + console.print(f"[red]Zielordner existiert bereits:[/] {destination_path}") + raise typer.Exit(1) + + try: + shutil.move(str(source_path), str(destination_path)) + console.print(f"[green]✓ Erfolgreich verschoben:[/] {source_cat.name}/{repo_name} → {target}") + except Exception as e: + console.print(f"[red]Fehler beim Verschieben:[/] {e}") + raise typer.Exit(1) diff --git a/src/repocat/cli/review.py b/src/repocat/cli/review.py new file mode 100644 index 0000000..20639c5 --- /dev/null +++ b/src/repocat/cli/review.py @@ -0,0 +1,102 @@ +from typer import Typer, confirm, prompt +from rich.console import Console +from rich.table import Table +from pathlib import Path +import shutil +from datetime import datetime +import typer +from rich.prompt import Prompt, Confirm + +from repocat.cli.move import move_command +from repocat.config import load_config +from repocat.models.config import RepoCategory +from repocat.cli.archive import archive_repo + +from repocat.cli.clean import get_dir_size, is_git_dirty, is_git_unpushed + +app = Typer() +console = Console() + + +@app.command("run") +def review(): + """ + Führt interaktiv durch alle Repositories im Review-Ordner. + """ + config = load_config() + base = config.base_dir + + archive_cat = next((c for c in config.categories if c.is_archive), None) + if not archive_cat: + console.print("[red]Keine Archiv-Kategorie in der Konfiguration gefunden.[/]") + raise typer.Exit(1) + archive_path = base / archive_cat.subdir + archive_path.mkdir(parents=True, exist_ok=True) + + + review_cat = next((c for c in config.categories if c.name == "review"), None) + if not review_cat: + console.print("[red]Kein 'review'-Eintrag in der Konfiguration gefunden.[/]") + raise typer.Exit(1) + + review_path = base / review_cat.subdir + if not review_path.exists(): + console.print(f"[yellow]Hinweis:[/] Kein review-Ordner vorhanden unter {review_path}") + return + + repos = [p for p in review_path.iterdir() if p.is_dir()] + if not repos: + console.print("[green]✓ Keine Repositories im Review-Ordner.[/]") + return + + for repo in sorted(repos): + console.rule(f"[bold cyan]{repo.name}") + + age_days = (datetime.now() - datetime.fromtimestamp(repo.stat().st_mtime)).days + size_mb = get_dir_size(repo) / 1024 / 1024 + + status = [] + if is_git_dirty(repo): + status.append("uncommitted") + if is_git_unpushed(repo): + status.append("unpushed") + if not status: + status.append("clean") + + console.print(f"[bold]Alter:[/] {age_days} Tage") + console.print(f"[bold]Größe:[/] {size_mb:.1f} MB") + console.print(f"[bold]Git:[/] {', '.join(status)}") + + while True: + action = Prompt.ask( + "Aktion \\[m]ove / \\[a]rchive / \\[d]elete / \\[s]kip / \\[q]uit", + choices=["m", "a", "d", "s", "q"], + show_choices=False + ) + + if action in ("s", "skip", ""): + break + elif action in ("q", "quit"): + console.print("[bold yellow]Abgebrochen.[/]") + raise typer.Exit() + elif action in ("d", "delete"): + if Confirm.ask(f"Bist du sicher, dass du [red]{repo.name}[/] löschen willst?"): + shutil.rmtree(repo) + console.print(f"[red]✓ Gelöscht:[/] {repo.name}") + break + elif action in ("m", "move"): + target_categories = [c.name for c in config.categories if c.name != "review" and not c.is_archive] + target = Prompt.ask("Zielkategorie", choices=target_categories) + move_command(source=f"review/{repo.name}", target=target) + break + elif action in ("a", "archive"): + try: + archive_file = archive_repo(repo, archive_path, dry_run=False) + console.print(f"[green]✓ Archiviert:[/] {archive_file.name}") + break + except Exception as e: + console.print(f"[red]Fehler beim Archivieren:[/] {e}") + else: + console.print("[yellow]Ungültige Eingabe. Gültig sind: m, d, s, q[/]") + + console.print("[green]✓ Review abgeschlossen.[/]") diff --git a/src/repocat/cli/status.py b/src/repocat/cli/status.py new file mode 100644 index 0000000..06e2a2d --- /dev/null +++ b/src/repocat/cli/status.py @@ -0,0 +1,75 @@ +from typer import Typer +from rich.console import Console +from rich.table import Table +from pathlib import Path +from datetime import datetime +from repocat.config import load_config +from repocat.models.config import RepoCategory + +app = Typer() +console = Console() + + +def get_dir_size(path: Path) -> int: + """Returns size in bytes.""" + return sum(f.stat().st_size for f in path.rglob("*") if f.is_file()) + + +def summarize_category(category: RepoCategory, base_path: Path): + cat_path = base_path / category.subdir + if not cat_path.exists(): + return 0, 0.0, None, None + + now = datetime.now() + repo_infos = [] + + for repo in cat_path.iterdir(): + if not repo.is_dir(): + continue + + size_bytes = get_dir_size(repo) + mtime = datetime.fromtimestamp(repo.stat().st_mtime) + age_days = (now - mtime).days + + repo_infos.append((repo.name, size_bytes, age_days, mtime)) + + if not repo_infos: + return 0, 0.0, None, None + + total_size = sum(info[1] for info in repo_infos) / 1024 / 1024 + avg_age = sum(info[2] for info in repo_infos) // len(repo_infos) + + oldest = max(repo_infos, key=lambda x: x[2]) # größtes Alter + + return len(repo_infos), total_size, avg_age, (oldest[0], oldest[2]) + + +@app.command("status") +def status(): + """ + Zeigt eine Übersicht über alle Repository-Kategorien. + """ + config = load_config() + base = config.base_dir + + table = Table(title="Repository-Status") + table.add_column("Kategorie") + table.add_column("Repos", justify="right") + table.add_column("Gesamtgröße", justify="right") + table.add_column("Ø Alter", justify="right") + table.add_column("Ältestes Repo", justify="left") + + for cat in config.categories: + if cat.is_archive: + continue + count, total_size, avg_age, oldest = summarize_category(cat, base) + + table.add_row( + cat.name, + str(count), + f"{total_size:.1f} MB", + f"{avg_age or '–'} d", + f"{oldest[0]} ({oldest[1]} d)" if oldest else "–" + ) + + console.print(table) diff --git a/src/repocat/config.py b/src/repocat/config.py new file mode 100644 index 0000000..43fb123 --- /dev/null +++ b/src/repocat/config.py @@ -0,0 +1,31 @@ +import yaml +from pathlib import Path +from repocat.models.config import RepoCatConfig +from rich.console import Console + +console = Console() + +DEFAULT_CONFIG_PATH = Path.home() / ".repocat.yml" +FALLBACK_PATH = Path("config.yml") # Optional, falls keine globale Config vorhanden + +def load_config(file_path: Path | None = None) -> RepoCatConfig: + config_path = file_path or DEFAULT_CONFIG_PATH + + if not config_path.exists(): + console.print(f"[yellow]Warnung:[/] Konfigurationsdatei nicht gefunden: {config_path}") + if FALLBACK_PATH.exists(): + console.print(f"[blue]Versuche stattdessen:[/] {FALLBACK_PATH}") + config_path = FALLBACK_PATH + else: + raise FileNotFoundError(f"Keine Konfigurationsdatei gefunden in {config_path} oder {FALLBACK_PATH}") + + try: + with open(config_path, "r") as f: + raw = yaml.safe_load(f) + + config = RepoCatConfig(**raw) + config.base_dir = Path(config.base_dir).expanduser() + return config + except Exception as e: + console.print(f"[red]Fehler beim Laden der Konfiguration:[/] {e}") + raise diff --git a/src/repocat/models/config.py b/src/repocat/models/config.py new file mode 100644 index 0000000..7b9bd19 --- /dev/null +++ b/src/repocat/models/config.py @@ -0,0 +1,20 @@ +from pydantic import BaseModel, Field +from pathlib import Path +from typing import List, Optional + +class RepoCategory(BaseModel): + name: str + subdir: str + days: Optional[int] = Field(default=None, ge=1) + volatile: bool = False + description: Optional[str] = None + is_archive: bool = False + +class RepoCatDefaults(BaseModel): + dry_run: bool = True + auto_create_directories: bool = True + +class RepoCatConfig(BaseModel): + base_dir: Path = Field(default=Path("~/Repositories").expanduser()) + categories: List[RepoCategory] + defaults: Optional[RepoCatDefaults] = RepoCatDefaults() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29