- Introduce models for managing repository catalog and categories - Add utilities for directory size and Git status checks - Enable repository metadata management, validation, and operations
119 lines
3.5 KiB
Python
119 lines
3.5 KiB
Python
from __future__ import annotations
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Optional, List
|
|
from pydantic import BaseModel, Field
|
|
import shutil
|
|
|
|
from repocat.models.config import RepoCategory, RepoCatConfig
|
|
from repocat.utils.fsutils import get_dir_size, is_git_dirty, is_git_unpushed
|
|
|
|
|
|
class GitStatus(BaseModel):
    """Git state flags recorded for a repository.

    ``dirty`` and ``unpushed`` are required booleans; ``branch`` and
    ``remote_url`` are optional and default to ``None``.
    """

    dirty: bool
    unpushed: bool
    branch: Optional[str] = None
    remote_url: Optional[str] = None

    @property
    def is_protected(self) -> bool:
        """Return a truthy value when either ``dirty`` or ``unpushed`` is set."""
        # Guard-clause spelling of ``dirty or unpushed``.
        if not self.dirty:
            return self.unpushed
        return self.dirty
|
|
|
|
|
|
class RepoModel(BaseModel):
    """A repository directory together with the metadata collected for it.

    ``git`` is ``None`` when no Git status has been attached to the entry.
    """

    name: str
    path: Path
    size_bytes: int
    last_modified: datetime
    git: Optional[GitStatus] = None

    @property
    def size_mb(self) -> float:
        """Return ``size_bytes`` converted to mebibytes (divided by 1024 twice)."""
        return self.size_bytes / 1024 / 1024

    @property
    def age_days(self) -> int:
        """Return the number of whole days elapsed since ``last_modified``.

        The current time is taken in the timezone of ``last_modified``: a
        naive timestamp is compared against naive local time, and an aware
        timestamp against an aware "now". Mixing the two would raise
        ``TypeError`` on subtraction.
        """
        now = datetime.now(tz=self.last_modified.tzinfo)
        return (now - self.last_modified).days

    @property
    def is_git_repo(self) -> bool:
        """Return ``True`` when a ``GitStatus`` is attached to this entry."""
        return self.git is not None

    def is_expired(self, days: int) -> bool:
        """Return ``True`` when ``age_days`` is at least *days*."""
        return self.age_days >= days

    def move_to(self, target_dir: Path) -> None:
        """Move the repository directory into *target_dir* and update ``path``.

        The destination is ``target_dir / name``.

        Raises:
            FileExistsError: If the destination already exists; nothing is
                moved in that case.
        """
        destination = target_dir / self.name
        if destination.exists():
            raise FileExistsError(f"Zielordner existiert bereits: {destination}")
        shutil.move(str(self.path), str(destination))
        # Only record the new location once the move has succeeded.
        self.path = destination
|
|
|
|
|
|
class RepoCategoryState(BaseModel):
    """Runtime state of one category: its config entry, path and repos."""

    config: RepoCategory
    path: Path
    repos: List[RepoModel] = Field(default_factory=list)

    def total_size_mb(self) -> float:
        """Return the sum of ``size_mb`` over all repositories in the category."""
        accumulated = 0
        for entry in self.repos:
            accumulated += entry.size_mb
        return accumulated

    def find_repo(self, name: str) -> Optional[RepoModel]:
        """Return the first repository whose ``name`` equals *name*, else ``None``."""
        for candidate in self.repos:
            if candidate.name == name:
                return candidate
        return None
|
|
|
|
|
|
def _scan_category_dir(cat_path: Path) -> List[RepoModel]:
    """Build a ``RepoModel`` for every sub-directory of *cat_path*, sorted by path.

    Returns an empty list when *cat_path* is not a directory.
    """
    # is_dir() covers both a missing path and a path occupied by a regular
    # file; exists() alone would let iterdir() raise NotADirectoryError.
    if not cat_path.is_dir():
        return []

    repos: List[RepoModel] = []
    for item in sorted(cat_path.iterdir()):
        if not item.is_dir():
            continue

        last_mod = datetime.fromtimestamp(item.stat().st_mtime)
        size = get_dir_size(item)

        # Git status is only collected for directories containing ".git".
        git = None
        if (item / ".git").exists():
            git = GitStatus(
                dirty=is_git_dirty(item),
                unpushed=is_git_unpushed(item),
            )

        repos.append(RepoModel(
            name=item.name,
            path=item,
            size_bytes=size,
            last_modified=last_mod,
            git=git,
        ))
    return repos


class RepoCatalogState(BaseModel):
    """State of the whole catalog: the config plus one entry per category."""

    config: RepoCatConfig
    categories: List[RepoCategoryState] = Field(default_factory=list)

    @classmethod
    def from_config(cls, config: RepoCatConfig) -> RepoCatalogState:
        """Build the catalog state by scanning each configured category.

        Each category path is ``config.base_dir / category.subdir``. A
        category whose path is not a directory is kept with an empty repo
        list.
        """
        categories: List[RepoCategoryState] = []
        for cat_cfg in config.categories:
            cat_path = config.base_dir / cat_cfg.subdir
            categories.append(RepoCategoryState(
                config=cat_cfg,
                path=cat_path,
                repos=_scan_category_dir(cat_path),
            ))
        return cls(config=config, categories=categories)

    def get_category(self, name: str) -> Optional[RepoCategoryState]:
        """Return the first category whose ``config.name`` equals *name*, else ``None``."""
        return next((c for c in self.categories if c.config.name == name), None)

    def find_repo(self, name: str) -> Optional[RepoModel]:
        """Return the first repository named *name* across all categories, else ``None``."""
        for cat in self.categories:
            repo = cat.find_repo(name)
            if repo is not None:
                return repo
        return None

    def missing_directories(self) -> List[Path]:
        """Return the paths of all categories whose path does not exist."""
        return [cat.path for cat in self.categories if not cat.path.exists()]
|