Enhances console task rendering with rich library

Refactors spinner and log rendering to use the Rich library for better formatting and improved user experience.
Adds support for task prefixes, step tracking, and transient live updates.
Replaces manual console manipulation with structured rendering for clarity and maintainability.
This commit is contained in:
2025-04-26 16:18:19 +00:00
parent 72699ed32f
commit cc82d883c0

View File

@@ -1,64 +1,70 @@
import sys
import threading import threading
import time import time
import subprocess import subprocess
from typing import List, Optional from typing import List, Optional
from rich.console import Console
from rich.live import Live
from rich.text import Text
class ConsoleTask: class ConsoleTask:
def __init__(self, title: str, max_log_lines: int = 10): def __init__(self, prefix:str, title: str, step_number: Optional[int] = None, total_steps: Optional[int] = None, max_log_lines: int = 10):
self.prefix = prefix
self.title = title self.title = title
self.step_number = step_number
self.total_steps = total_steps
self.max_log_lines = max_log_lines self.max_log_lines = max_log_lines
self.spinner_cycle = ['|', '/', '-', '\\'] self.spinner_cycle = ['|', '/', '-', '\\']
self.stop_event = threading.Event() self.stop_event = threading.Event()
self.spinner_thread: Optional[threading.Thread] = None self.spinner_thread: Optional[threading.Thread] = None
self.output_lines: List[str] = [] self.output_lines: List[str] = []
self._stdout_lock = threading.Lock() self._stdout_lock = threading.Lock()
self._drawn_lines = 0 # Track only real drawn lines, no pre-reservation anymore self.console = Console()
self.live: Optional[Live] = None
self.spinner_idx = 0
def start_spinner(self): def start_spinner(self):
self.live = Live(console=self.console, refresh_per_second=30, transient=True)
self.live.start()
self.spinner_thread = threading.Thread(target=self._spinner_task, daemon=True) self.spinner_thread = threading.Thread(target=self._spinner_task, daemon=True)
self.spinner_thread.start() self.spinner_thread.start()
def _spinner_task(self): def _spinner_task(self):
idx = 0
while not self.stop_event.is_set(): while not self.stop_event.is_set():
with self._stdout_lock: with self._stdout_lock:
self._redraw_spinner(idx) self._redraw_spinner()
idx += 1 self.spinner_idx += 1
time.sleep(0.1) time.sleep(0.1)
def _redraw_spinner(self, idx: int): def _render_content(self) -> Text:
visible_lines = self.output_lines[-self.max_log_lines:] visible_lines = self.output_lines[-self.max_log_lines:]
# Clear only previously drawn lines prefix_text = f"[grey50]\[{self.prefix}][/grey50]" if self.prefix else ""
if self._drawn_lines > 0: step_text = f"[bold blue]Step {self.step_number}/{self.total_steps}[/bold blue]" if self.step_number and self.total_steps else ""
sys.stdout.write(f"\033[{self._drawn_lines}F") # Move cursor up title_text = f"[bold]{self.title}[/bold]" if self.title else ""
for _ in range(self._drawn_lines): spinner_markup = f"{prefix_text} {step_text} {title_text} {self.spinner_cycle[self.spinner_idx % len(self.spinner_cycle)]}"
sys.stdout.write("\r\033[K") # Clear line
sys.stdout.write("\033[1B") # Move cursor one line down
sys.stdout.write(f"\033[{self._drawn_lines}F") # Move cursor up to redraw start spinner_text = Text.from_markup(spinner_markup)
log_text = Text("\n".join(visible_lines))
# Draw fresh content full_text = spinner_text + Text("\n") + log_text
self._drawn_lines = 0 # Reset counter
# Draw spinner line return full_text
sys.stdout.write(f"\r{self.title} {self.spinner_cycle[idx % len(self.spinner_cycle)]}\n")
self._drawn_lines += 1
# Draw log lines def _redraw_spinner(self):
for line in visible_lines: if self.live:
sys.stdout.write(line + "\n") self.live.update(self._render_content())
self._drawn_lines += 1
sys.stdout.flush()
def log(self, message: str): def log(self, message: str):
with self._stdout_lock: with self._stdout_lock:
self.output_lines.append(message) self.output_lines.append(message)
if len(self.output_lines) > self.max_log_lines: if len(self.output_lines) > self.max_log_lines:
self.output_lines = self.output_lines[-self.max_log_lines:] self.output_lines = self.output_lines[-self.max_log_lines:]
if self.live:
self.live.update(self._render_content())
def run_command(self, cmd: List[str], cwd: Optional[str] = None, silent: bool = False) -> int: def run_command(self, cmd: List[str], cwd: Optional[str] = None, silent: bool = False) -> int:
success = False success = False
@@ -96,28 +102,38 @@ class ConsoleTask:
duration = time.time() - start_time duration = time.time() - start_time
# Final redraw after stop # Finalize output
with self._stdout_lock: with self._stdout_lock:
self._finalize_output(success, duration) self._finalize_output(success, duration)
return 0 if success else 1 return 0 if success else 1
def _finalize_output(self, success: bool, duration: float): def _finalize_output(self, success: bool, duration: float):
visible_lines = self.output_lines[-self.max_log_lines:] if self.live:
self.live.stop()
# Clear last drawn area prefix_text = f"[grey50]\[{self.prefix}][/grey50]" if self.prefix else ""
if self._drawn_lines > 0: status_symbol = "[green]✅[/green]" if success else "[red]❌[/red]"
sys.stdout.write(f"\033[{self._drawn_lines}F") step_text = f"[bold blue]Step {self.step_number}/{self.total_steps}[/bold blue]" if self.step_number and self.total_steps else ""
for _ in range(self._drawn_lines): status_title = f"[bold green]{self.title}[/bold green]" if success else f"[bold red]{self.title}[/bold red]"
sys.stdout.write("\r\033[K") final_line = f"{prefix_text} {step_text} {status_title} {status_symbol} [bold green]({duration:.1f}s[/bold green])"
sys.stdout.write("\033[1B")
sys.stdout.write(f"\033[{self._drawn_lines}F")
self._drawn_lines = 0 # Reset # Final full output
self.console.print(final_line)
# Write final status line class ConsoleUtils:
status = "\033[92m✅\033[0m" if success else "\033[91m❌\033[0m" def __init__(self,
sys.stdout.write(f"\r{self.title} {status} ({duration:.1f}s)\n") prefix: str = "",
self._drawn_lines += 1 step_number: Optional[int] = None,
total_steps: Optional[int] = None
):
self.prefix = prefix
self.step_number = step_number
self.total_steps = total_steps
self.console = Console()
sys.stdout.flush() def print(self, message: str):
prefix = f"[grey50]\[{self.prefix}][/grey50]" if self.prefix else ""
step_info = f"[bold blue]Step {self.step_number}/{self.total_steps}[/bold blue]" if self.step_number and self.total_steps else ""
message_text = f"{prefix} {step_info} {message}"
self.console.print(message_text)