diff --git a/src/utils/console_utils.py b/src/utils/console_utils.py index a4162fe..861902b 100644 --- a/src/utils/console_utils.py +++ b/src/utils/console_utils.py @@ -1,64 +1,70 @@ -import sys import threading import time import subprocess from typing import List, Optional +from rich.console import Console +from rich.live import Live +from rich.text import Text + class ConsoleTask: - def __init__(self, title: str, max_log_lines: int = 10): + def __init__(self, prefix:str, title: str, step_number: Optional[int] = None, total_steps: Optional[int] = None, max_log_lines: int = 10): + self.prefix = prefix self.title = title + self.step_number = step_number + self.total_steps = total_steps self.max_log_lines = max_log_lines self.spinner_cycle = ['|', '/', '-', '\\'] self.stop_event = threading.Event() self.spinner_thread: Optional[threading.Thread] = None self.output_lines: List[str] = [] self._stdout_lock = threading.Lock() - self._drawn_lines = 0 # Track only real drawn lines, no pre-reservation anymore + self.console = Console() + self.live: Optional[Live] = None + self.spinner_idx = 0 def start_spinner(self): + self.live = Live(console=self.console, refresh_per_second=30, transient=True) + self.live.start() self.spinner_thread = threading.Thread(target=self._spinner_task, daemon=True) self.spinner_thread.start() def _spinner_task(self): - idx = 0 while not self.stop_event.is_set(): with self._stdout_lock: - self._redraw_spinner(idx) - idx += 1 + self._redraw_spinner() + self.spinner_idx += 1 time.sleep(0.1) - def _redraw_spinner(self, idx: int): + def _render_content(self) -> Text: visible_lines = self.output_lines[-self.max_log_lines:] - # Clear only previously drawn lines - if self._drawn_lines > 0: - sys.stdout.write(f"\033[{self._drawn_lines}F") # Move cursor up + prefix_text = f"[grey50]\[{self.prefix}][/grey50]" if self.prefix else "" + step_text = f"[bold blue]Step {self.step_number}/{self.total_steps}[/bold blue]" if self.step_number and self.total_steps else "" + title_text = f"[bold]{self.title}[/bold]" if self.title else "" - for _ in range(self._drawn_lines): - sys.stdout.write("\r\033[K") # Clear line - sys.stdout.write("\033[1B") # Move cursor one line down + spinner_markup = f"{prefix_text} {step_text} {title_text} {self.spinner_cycle[self.spinner_idx % len(self.spinner_cycle)]}" - sys.stdout.write(f"\033[{self._drawn_lines}F") # Move cursor up to redraw start + spinner_text = Text.from_markup(spinner_markup) + log_text = Text("\n".join(visible_lines)) - # Draw fresh content - self._drawn_lines = 0 # Reset counter + full_text = spinner_text + Text("\n") + log_text - # Draw spinner line - sys.stdout.write(f"\r{self.title} {self.spinner_cycle[idx % len(self.spinner_cycle)]}\n") - self._drawn_lines += 1 + return full_text - # Draw log lines - for line in visible_lines: - sys.stdout.write(line + "\n") - self._drawn_lines += 1 - - sys.stdout.flush() + def _redraw_spinner(self): + if self.live: + self.live.update(self._render_content()) def log(self, message: str): with self._stdout_lock: self.output_lines.append(message) if len(self.output_lines) > self.max_log_lines: self.output_lines = self.output_lines[-self.max_log_lines:] + + if self.live: + self.live.update(self._render_content()) + def run_command(self, cmd: List[str], cwd: Optional[str] = None, silent: bool = False) -> int: success = False @@ -96,28 +102,38 @@ class ConsoleTask: duration = time.time() - start_time - # Final redraw after stop + # Finalize output with self._stdout_lock: self._finalize_output(success, duration) return 0 if success else 1 def _finalize_output(self, success: bool, duration: float): - visible_lines = self.output_lines[-self.max_log_lines:] + if self.live: + self.live.stop() - # Clear last drawn area - if self._drawn_lines > 0: - sys.stdout.write(f"\033[{self._drawn_lines}F") - for _ in range(self._drawn_lines): - sys.stdout.write("\r\033[K") - sys.stdout.write("\033[1B") - sys.stdout.write(f"\033[{self._drawn_lines}F") + prefix_text = f"[grey50]\[{self.prefix}][/grey50]" if self.prefix else "" + status_symbol = "[green]✅[/green]" if success else "[red]❌[/red]" + step_text = f"[bold blue]Step {self.step_number}/{self.total_steps}[/bold blue]" if self.step_number and self.total_steps else "" + status_title = f"[bold green]{self.title}[/bold green]" if success else f"[bold red]{self.title}[/bold red]" + final_line = f"{prefix_text} {step_text} {status_title} {status_symbol} [bold green]({duration:.1f}s[/bold green])" - self._drawn_lines = 0 # Reset + # Final full output + self.console.print(final_line) - # Write final status line - status = "\033[92m✅\033[0m" if success else "\033[91m❌\033[0m" - sys.stdout.write(f"\r{self.title} {status} ({duration:.1f}s)\n") - self._drawn_lines += 1 +class ConsoleUtils: + def __init__(self, + prefix: str = "", + step_number: Optional[int] = None, + total_steps: Optional[int] = None + ): + self.prefix = prefix + self.step_number = step_number + self.total_steps = total_steps + self.console = Console() - sys.stdout.flush() + def print(self, message: str): + prefix = f"[grey50]\[{self.prefix}][/grey50]" if self.prefix else "" + step_info = f"[bold blue]Step {self.step_number}/{self.total_steps}[/bold blue]" if self.step_number and self.total_steps else "" + message_text = f"{prefix} {step_info} {message}" + self.console.print(message_text) \ No newline at end of file