326 lines
14 KiB
Python
326 lines
14 KiB
Python
'''
|
||
Function:
|
||
Implementation of Logging Related Utils
|
||
Author:
|
||
Zhenchao Jin
|
||
WeChat Official Account (微信公众号):
|
||
Charles的皮卡丘
|
||
'''
|
||
from __future__ import annotations
|
||
import re
|
||
import os
|
||
import shutil
|
||
import logging
|
||
import collections.abc
|
||
import tabulate as tabmod
|
||
from wcwidth import wcswidth
|
||
from tabulate import tabulate
|
||
from prettytable import PrettyTable
|
||
from platformdirs import user_log_dir
|
||
from prompt_toolkit.layout import Layout
|
||
from prompt_toolkit.application import Application
|
||
from prompt_toolkit.key_binding import KeyBindings
|
||
from prompt_toolkit.layout.containers import HSplit, Window
|
||
from prompt_toolkit.application.current import get_app_or_none
|
||
from prompt_toolkit.layout.controls import FormattedTextControl
|
||
from prompt_toolkit.formatted_text import ANSI, to_formatted_text
|
||
from typing import Any, List, Optional, Sequence, Set, Tuple, Union, Dict
|
||
from prompt_toolkit.formatted_text.utils import fragment_list_width, split_lines, get_cwidth
|
||
|
||
|
||
'''settings'''
|
||
tabmod.WIDE_CHARS_MODE = True
|
||
NoTruncSpec = Optional[Sequence[Union[int, str]]]
|
||
ANSI_CSI_RE = re.compile(r"\x1b\[[0-9;?]*[ -/]*[@-~]")
|
||
AMBIGUOUS_MAP: Dict[str, str] = {
|
||
"·": ".", "•": "*", "…": "...", "“": '"', "”": '"', "„": '"', "‟": '"', "‘": "'", "’": "'", "‚": "'", "‛": "'", "—": "-", "–": "-", "−": "-", " ": " ",
|
||
}
|
||
COLORS = {
|
||
'red': '\033[31m', 'green': '\033[32m', 'yellow': '\033[33m', 'blue': '\033[34m', 'pink': '\033[35m', 'cyan': '\033[36m', 'highlight': '\033[93m',
|
||
'number': '\033[96m', 'singer': '\033[93m', 'flac': '\033[95m', 'songname': '\033[91m'
|
||
}
|
||
|
||
|
||
'''LoggerHandle'''
|
||
class LoggerHandle():
|
||
appname, appauthor = 'musicdl', 'zcjin'
|
||
def __init__(self):
|
||
# set up log dir
|
||
log_dir = user_log_dir(appname=self.appname, appauthor=self.appauthor)
|
||
os.makedirs(log_dir, exist_ok=True)
|
||
log_file_path = os.path.join(log_dir, "musicdl.log")
|
||
self.log_file_path = log_file_path
|
||
# config logging
|
||
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.FileHandler(log_file_path, encoding="utf-8"), logging.StreamHandler()])
|
||
'''log'''
|
||
@staticmethod
|
||
def log(level, message):
|
||
message = str(message)
|
||
logger = logging.getLogger(LoggerHandle.appname)
|
||
logger.log(level, message)
|
||
'''debug'''
|
||
def debug(self, message, disable_print=False):
|
||
message = str(message)
|
||
if disable_print:
|
||
fp = open(self.log_file_path, 'a', encoding='utf-8')
|
||
fp.write(message + '\n')
|
||
else:
|
||
LoggerHandle.log(logging.DEBUG, message)
|
||
'''info'''
|
||
def info(self, message, disable_print=False):
|
||
message = str(message)
|
||
if disable_print:
|
||
fp = open(self.log_file_path, 'a', encoding='utf-8')
|
||
fp.write(message + '\n')
|
||
else:
|
||
LoggerHandle.log(logging.INFO, message)
|
||
'''warning'''
|
||
def warning(self, message, disable_print=False):
|
||
message = str(message)
|
||
if disable_print:
|
||
fp = open(self.log_file_path, 'a', encoding='utf-8')
|
||
fp.write(message + '\n')
|
||
else:
|
||
if '\033[31m' not in message: message = colorize(message, 'red')
|
||
LoggerHandle.log(logging.WARNING, message)
|
||
'''error'''
|
||
def error(self, message, disable_print=False):
|
||
message = str(message)
|
||
if disable_print:
|
||
fp = open(self.log_file_path, 'a', encoding='utf-8')
|
||
fp.write(message + '\n')
|
||
else:
|
||
if '\033[31m' not in message: message = colorize(message, 'red')
|
||
LoggerHandle.log(logging.ERROR, message)
|
||
|
||
|
||
'''colorize'''
|
||
def colorize(string, color):
|
||
string = str(string)
|
||
if color not in COLORS: return string
|
||
return COLORS[color] + string + '\033[0m'
|
||
|
||
|
||
'''printfullline'''
|
||
def printfullline(ch: str = "*", end: str = '\n', terminal_right_space_len: int = 1):
|
||
cols = shutil.get_terminal_size().columns - terminal_right_space_len
|
||
assert cols > 0, f'"terminal_right_space_len" should smaller than {shutil.get_terminal_size()}'
|
||
print(ch * cols, end=end)
|
||
|
||
|
||
'''printtable'''
|
||
def printtable(titles, items, terminal_right_space_len=4):
|
||
assert isinstance(titles, collections.abc.Sequence) and isinstance(items, collections.abc.Sequence), 'title and items should be iterable'
|
||
table = PrettyTable(titles)
|
||
for item in items: table.add_row(item)
|
||
max_width = shutil.get_terminal_size().columns - terminal_right_space_len
|
||
assert max_width > 0, f'"terminal_right_space_len" should smaller than {shutil.get_terminal_size()}'
|
||
table.max_table_width = max_width
|
||
print(table)
|
||
return table
|
||
|
||
|
||
'''ptsizefallback'''
|
||
def ptsizefallback() -> Tuple[int, int]:
|
||
app = get_app_or_none()
|
||
if app is not None and getattr(app, "output", None) is not None:
|
||
try:
|
||
sz = app.output.get_size()
|
||
cols, rows = int(sz.columns), int(sz.rows)
|
||
if cols > 0 and rows > 0: return cols, rows
|
||
except Exception:
|
||
pass
|
||
s = shutil.get_terminal_size(fallback=(80, 24))
|
||
return int(s.columns), int(s.lines)
|
||
|
||
|
||
'''stripansi'''
|
||
def stripansi(s: str) -> str:
|
||
return ANSI_CSI_RE.sub("", s)
|
||
|
||
|
||
'''dispwidth'''
|
||
def dispwidth(s: Any) -> int:
|
||
if s is None: return 0
|
||
w = wcswidth(stripansi(str(s)))
|
||
return max(0, w)
|
||
|
||
|
||
'''normalizeforconsole'''
|
||
def normalizeforconsole(text: Any, *, enable: bool) -> str:
|
||
s = "" if text is None else str(text)
|
||
if not s: return s
|
||
s = s.replace("\r", "")
|
||
s = s.replace("\n", " ").replace("\t", " ")
|
||
if enable: s = "".join(AMBIGUOUS_MAP.get(ch, ch) for ch in s)
|
||
return s
|
||
|
||
|
||
'''truncatebydispwidth'''
|
||
def truncatebydispwidth(text: Any, max_width: int, ellipsis: str = "...") -> str:
|
||
s = "" if text is None else str(text)
|
||
if max_width <= 0: return ""
|
||
if dispwidth(s) <= max_width: return s
|
||
ell_w = dispwidth(ellipsis)
|
||
target = max_width if max_width <= ell_w else (max_width - ell_w)
|
||
out, used, i, emitted_ansi = [], 0, 0, False
|
||
while i < len(s) and used < target:
|
||
if s[i] == "\x1b":
|
||
m = ANSI_CSI_RE.match(s, i)
|
||
if m: out.append(m.group(0)); emitted_ansi = True; i = m.end(); continue
|
||
i += 1; continue
|
||
ch = s[i]; ch_w = max(wcswidth(ch), 0)
|
||
if used + ch_w > target: break
|
||
out.append(ch); used += ch_w; i += 1
|
||
if emitted_ansi and (not out or not str(out[-1]).endswith("\x1b[0m")): out.append("\x1b[0m")
|
||
core = "".join(out)
|
||
return core if max_width <= ell_w else (core + ellipsis)
|
||
|
||
|
||
'''truncatefragmentstocols'''
|
||
def truncatefragmentstocols(fragments: Sequence[Tuple], cols: int) -> List[Tuple]:
|
||
if cols <= 0: return []
|
||
out, used = [], 0
|
||
for style, text, *rest in fragments:
|
||
if not text: continue
|
||
buf: List[str] = []
|
||
for ch in text:
|
||
cw = get_cwidth(ch)
|
||
if used + cw > cols: break
|
||
buf.append(ch); used += cw
|
||
if buf: out.append((style, "".join(buf), *rest))
|
||
if used >= cols: break
|
||
return out
|
||
|
||
|
||
'''truncateandpadline'''
|
||
def truncateandpadline(fragments: Sequence[Tuple], cols: int) -> List[Tuple]:
|
||
line = truncatefragmentstocols(fragments, cols)
|
||
pad = cols - fragment_list_width(line)
|
||
if pad > 0: return list(line) + [("", " " * pad)]
|
||
return truncatefragmentstocols(line, cols)
|
||
|
||
|
||
'''smarttrunctable'''
|
||
def smarttrunctable(headers: Sequence[Any], rows: Sequence[Sequence[Any]], *, max_col_width: int = 40, min_col_width: int = 4, terminal_right_space_len: int = 2, no_trunc_cols: NoTruncSpec = None, term_width: Optional[int] = None, tablefmt: str = "grid", max_iterations: int = 2000) -> str:
|
||
headers_s = ["" if h is None else str(h) for h in headers]
|
||
rows_s, ncols = [[("" if c is None else str(c)) for c in r] for r in rows], len(headers_s)
|
||
if any(len(r) != ncols for r in rows_s): raise ValueError("All rows must have the same number of columns as headers")
|
||
if term_width is None: term_width = ptsizefallback()[0]
|
||
target_width = max(1, term_width - max(0, terminal_right_space_len))
|
||
protected: Set[int] = set()
|
||
if no_trunc_cols:
|
||
header_to_idx = {h: i for i, h in enumerate(headers_s)}
|
||
for spec in no_trunc_cols:
|
||
if isinstance(spec, int) and 0 <= spec < ncols: protected.add(spec)
|
||
elif not isinstance(spec, int):
|
||
idx = header_to_idx.get(str(spec))
|
||
if idx is not None: protected.add(idx)
|
||
col_natural = [dispwidth(h) for h in headers_s]
|
||
col_natural = [max(col_natural[j], *(dispwidth(r[j]) for r in rows_s)) for j in range(len(col_natural))]
|
||
col_limit: List[Optional[int]] = []
|
||
for j in range(ncols):
|
||
if j in protected: col_limit.append(None)
|
||
else: cap = col_natural[j]; cap = min(cap, max_col_width) if max_col_width else cap; col_limit.append(max(min_col_width, cap))
|
||
def rendercurrent() -> str:
|
||
th = [h if col_limit[j] is None else truncatebydispwidth(h, col_limit[j]) for j, h in enumerate(headers_s)]
|
||
tr = [[cell if col_limit[j] is None else truncatebydispwidth(cell, col_limit[j]) for j, cell in enumerate(r)] for r in rows_s]
|
||
return tabulate(tr, headers=th, tablefmt=tablefmt)
|
||
def tablewidth(table_str: str) -> int:
|
||
return max((dispwidth(line) for line in table_str.splitlines()), default=0)
|
||
last = ""
|
||
for _ in range(max_iterations):
|
||
table_str = rendercurrent()
|
||
last = table_str
|
||
if tablewidth(table_str) <= target_width: return table_str
|
||
cur_w = [dispwidth(h if col_limit[j] is None else truncatebydispwidth(h, col_limit[j])) for j, h in enumerate(headers_s)]
|
||
any(cur_w.__setitem__(j, max(cur_w[j], dispwidth(cell if col_limit[j] is None else truncatebydispwidth(cell, col_limit[j])))) or False for r in rows_s for j, cell in enumerate(r))
|
||
shrinkable = [j for j in range(ncols) if col_limit[j] is not None and col_limit[j] > min_col_width]
|
||
if not shrinkable: return last
|
||
j_widest = max(shrinkable, key=lambda j: cur_w[j])
|
||
col_limit[j_widest] = max(min_col_width, int(col_limit[j_widest]) - 1)
|
||
return last
|
||
|
||
|
||
'''cursorpickintable'''
|
||
def cursorpickintable(headers: Sequence[Any], rows: Sequence[Sequence[Any]], row_ids: Sequence[Any], *, no_trunc_cols: NoTruncSpec = None, terminal_right_space_len: int = 2, normalize_ambiguous: Optional[bool] = None, tablefmt: Optional[str] = None) -> List[Any]:
|
||
if len(rows) != len(row_ids): raise ValueError("rows and row_ids length mismatch")
|
||
ncols = len(headers)
|
||
if any(len(r) != ncols for r in rows): raise ValueError("All rows must have same number of columns as headers")
|
||
if normalize_ambiguous is None: normalize_ambiguous = (os.name == "nt")
|
||
if tablefmt is None: tablefmt = "grid" if os.name == "nt" else "fancy_grid"
|
||
headers_s = [normalizeforconsole(h, enable=normalize_ambiguous) for h in headers]
|
||
rows_s = [[normalizeforconsole(c, enable=normalize_ambiguous) for c in r] for r in rows]
|
||
kb, current, picked, view_start = KeyBindings(), 0, set(), 0
|
||
FIRST_DATA_LINE, LINES_PER_ROW = 3, 2
|
||
def termsize() -> Tuple[int, int]: return ptsizefallback()
|
||
def maxvisiblerows(term_lines: int) -> int:
|
||
overhead = 10; usable = max(2, term_lines - overhead)
|
||
return max(1, usable // LINES_PER_ROW)
|
||
def computeview() -> Tuple[int, int]:
|
||
nonlocal view_start; _, term_lines = termsize()
|
||
page = maxvisiblerows(term_lines)
|
||
start = max(0, min(current - page // 2, len(rows_s) - page))
|
||
end, view_start = min(len(rows_s), start + page), start
|
||
return start, end
|
||
def buildtable() -> str:
|
||
cols, _ = termsize()
|
||
start, end = computeview()
|
||
def marker(i: int) -> str:
|
||
at, sel = (i == current), (row_ids[i] in picked)
|
||
if at and sel: return ">*"
|
||
if at: return "> "
|
||
if sel: return "* "
|
||
return " "
|
||
view_rows: List[List[str]] = []
|
||
for i in range(start, end): row = list(rows_s[i]); row[0] = marker(i) + row[0]; view_rows.append(row)
|
||
view_headers = list(headers_s)
|
||
view_headers[0] = f"{view_headers[0]} ({start+1}-{end}/{len(rows_s)})"
|
||
return smarttrunctable(headers=view_headers, rows=view_rows, no_trunc_cols=no_trunc_cols, terminal_right_space_len=terminal_right_space_len, term_width=cols, tablefmt=tablefmt)
|
||
def render() -> List[Tuple]:
|
||
cols, term_lines = termsize()
|
||
frags = to_formatted_text(ANSI(buildtable()))
|
||
highlight_line = FIRST_DATA_LINE + (current - view_start) * LINES_PER_ROW
|
||
out, line_count = [], 0
|
||
for li, line_frags in enumerate(split_lines(frags)):
|
||
if li == highlight_line: line_frags = [(((style + " reverse").strip() if style else "reverse"), text, *rest) for style, text, *rest in line_frags]
|
||
out.extend(truncateandpadline(line_frags, cols)); out.append(("", "\n")); line_count += 1
|
||
help_text = ("\nUse ↑/↓ to move, PgUp/PgDn to jump, <space> toggle, a: all, i: invert, <enter> confirm, q/Esc cancel.\n")
|
||
help_frags = to_formatted_text(ANSI(help_text))
|
||
for line_frags in split_lines(help_frags): out.extend(truncateandpadline(line_frags, cols)); out.append(("", "\n")); line_count += 1
|
||
while line_count < term_lines: out.append(("", " " * cols)); out.append(("", "\n")); line_count += 1
|
||
return out
|
||
def invalidate(event) -> None: event.app.invalidate()
|
||
@kb.add("up")
|
||
def _(event):
|
||
nonlocal current; current = max(0, current - 1)
|
||
invalidate(event)
|
||
@kb.add("down")
|
||
def _(event):
|
||
nonlocal current; current = min(len(rows_s) - 1, current + 1)
|
||
invalidate(event)
|
||
@kb.add("pageup")
|
||
def _(event):
|
||
nonlocal current; _, term_lines = termsize()
|
||
current = max(0, current - maxvisiblerows(term_lines))
|
||
invalidate(event)
|
||
@kb.add("pagedown")
|
||
def _(event):
|
||
nonlocal current; _, term_lines = termsize()
|
||
current = min(len(rows_s) - 1, current + maxvisiblerows(term_lines))
|
||
invalidate(event)
|
||
@kb.add(" ")
|
||
def _(event): rid = row_ids[current]; (picked.remove(rid) if rid in picked else picked.add(rid)); invalidate(event)
|
||
@kb.add("a")
|
||
@kb.add("A")
|
||
def _(event): picked.clear(); picked.update(row_ids); invalidate(event)
|
||
@kb.add("i")
|
||
@kb.add("I")
|
||
def _(event): picked.symmetric_difference_update(row_ids); invalidate(event)
|
||
@kb.add("enter")
|
||
def _(event): event.app.exit(result=[rid for rid in row_ids if rid in picked])
|
||
@kb.add("escape")
|
||
@kb.add("q")
|
||
def _(event): event.app.exit(result=[])
|
||
app = Application(layout=Layout(HSplit([Window(FormattedTextControl(render), wrap_lines=False)])), key_bindings=kb, full_screen=True)
|
||
return app.run() |