diff --git a/src/repocat/models/catalog.py b/src/repocat/models/catalog.py new file mode 100644 index 0000000..e5958d8 --- /dev/null +++ b/src/repocat/models/catalog.py @@ -0,0 +1,118 @@ +from __future__ import annotations +from pathlib import Path +from datetime import datetime +from typing import Optional, List +from pydantic import BaseModel, Field +import shutil + +from repocat.models.config import RepoCategory, RepoCatConfig +from repocat.utils.fsutils import get_dir_size, is_git_dirty, is_git_unpushed + + +class GitStatus(BaseModel): + dirty: bool + unpushed: bool + branch: Optional[str] = None + remote_url: Optional[str] = None + + @property + def is_protected(self) -> bool: + return self.dirty or self.unpushed + + +class RepoModel(BaseModel): + name: str + path: Path + size_bytes: int + last_modified: datetime + git: Optional[GitStatus] = None + + @property + def size_mb(self) -> float: + return self.size_bytes / 1024 / 1024 + + @property + def age_days(self) -> int: + return (datetime.now() - self.last_modified).days + + @property + def is_git_repo(self) -> bool: + return self.git is not None + + def is_expired(self, days: int) -> bool: + return self.age_days >= days + + def move_to(self, target_dir: Path) -> None: + destination = target_dir / self.name + if destination.exists(): + raise FileExistsError(f"Zielordner existiert bereits: {destination}") + shutil.move(str(self.path), str(destination)) + self.path = destination + + +class RepoCategoryState(BaseModel): + config: RepoCategory + path: Path + repos: List[RepoModel] = Field(default_factory=list) + + def total_size_mb(self) -> float: + return sum(repo.size_mb for repo in self.repos) + + def find_repo(self, name: str) -> Optional[RepoModel]: + return next((r for r in self.repos if r.name == name), None) + + +class RepoCatalogState(BaseModel): + config: RepoCatConfig + categories: List[RepoCategoryState] = Field(default_factory=list) + + @classmethod + def from_config(cls, config: RepoCatConfig) -> RepoCatalogState: + categories: List[RepoCategoryState] = [] + + for cat_cfg in config.categories: + cat_path = config.base_dir / cat_cfg.subdir + repos: List[RepoModel] = [] + + if cat_path.exists(): + for item in sorted(cat_path.iterdir()): + if not item.is_dir(): + continue + + last_mod = datetime.fromtimestamp(item.stat().st_mtime) + size = get_dir_size(item) + + git = None + if (item / ".git").exists(): + git = GitStatus( + dirty=is_git_dirty(item), + unpushed=is_git_unpushed(item), + ) + + repos.append(RepoModel( + name=item.name, + path=item, + size_bytes=size, + last_modified=last_mod, + git=git, + )) + + categories.append(RepoCategoryState( + config=cat_cfg, + path=cat_path, + repos=repos, + )) + + return cls(config=config, categories=categories) + + def get_category(self, name: str) -> Optional[RepoCategoryState]: + return next((c for c in self.categories if c.config.name == name), None) + + def find_repo(self, name: str) -> Optional[RepoModel]: + for cat in self.categories: + if (repo := cat.find_repo(name)): + return repo + return None + + def missing_directories(self) -> List[Path]: + return [cat.path for cat in self.categories if not cat.path.exists()] diff --git a/src/repocat/utils/fsutils.py b/src/repocat/utils/fsutils.py new file mode 100644 index 0000000..df07b79 --- /dev/null +++ b/src/repocat/utils/fsutils.py @@ -0,0 +1,43 @@ +import subprocess +from pathlib import Path + +def get_dir_size(path: Path) -> int: + """Returns directory size in bytes.""" + return sum(f.stat().st_size for f in path.rglob("*") if f.is_file()) + +def is_git_dirty(path: Path) -> bool: + """Returns True if there are uncommitted changes.""" + try: + result = subprocess.run( + ["git", "status", "--porcelain"], + cwd=path, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + text=True, + ) + return bool(result.stdout.strip()) + except Exception: + return False + +def is_git_unpushed(path: Path) -> bool: + """Returns True if there are commits that haven't been pushed to upstream.""" + try: + # Check if upstream exists + subprocess.run( + ["git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"], + cwd=path, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=True, + ) + + result = subprocess.run( + ["git", "log", "@{u}..HEAD", "--oneline"], + cwd=path, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + text=True, + ) + return bool(result.stdout.strip()) + except Exception: + return False \ No newline at end of file