diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8ed608a..dcd0128 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -8,6 +8,7 @@ * Split up multiple unrelated changes in multiple pull requests. +* If it's a fix for a unreported bug, make a bug report and link the pull request. * Purely cosmetic changes won't be accepted without a very good explanation of its value. * (Some design choices are for better configurability of syntax highlighting.) diff --git a/bpytop.py b/bpytop.py index 58fb595..085fea6 100755 --- a/bpytop.py +++ b/bpytop.py @@ -73,7 +73,7 @@ DEFAULT_CONF: Template = Template(f'#? Config file for bpytop v. {VERSION}' + '' #* Corresponding folder with a trailing / needs to be appended, example: color_theme="user_themes/monokai" color_theme="$color_theme" -#* Update time in milliseconds, increases automatically if set below internal loops processing time, recommended 2000 ms or above for better sample times for graphs. +#* Update time in milliseconds, increases automatically if set below internal loops processing time, recommended 1000 ms or above for better sample times for graphs. update_ms=$update_ms #* Processes sorting, "pid" "program" "arguments" "threads" "user" "memory" "cpu lazy" "cpu responsive", @@ -120,12 +120,15 @@ swap_disk=$swap_disk #* If mem box should be split to also show disks info. show_disks=$show_disks +#* Show init screen at startup, the init screen is purely cosmetical +show_init=$show_init + #* Enable check for new version from github.com/aristocratos/bpytop at start. update_check=$update_check -#* Set loglevel for "~/.config/bpytop/error.log" levels are: "CRITICAL" "ERROR" "WARNING" "INFO" "DEBUG". +#* Set loglevel for "~/.config/bpytop/error.log" levels are: "ERROR" "WARNING" "INFO" "DEBUG". #* The level set includes all lower levels, i.e. "DEBUG" will show all logging info. -log_level="$log_level" +log_level=$log_level ''') CONFIG_DIR: str = f'{os.path.expanduser("~")}/.config/bpytop' @@ -146,6 +149,11 @@ THREADS: int = psutil.cpu_count(logical=True) or 1 THREAD_ERROR: int = 0 +if "--debug" in sys.argv: + DEBUG = True +else: + DEBUG = False + DEFAULT_THEME: Dict[str, str] = { "main_bg" : "", "main_fg" : "#cc", @@ -205,9 +213,9 @@ except PermissionError: print(f'ERROR!\nNo permission to write to "{CONFIG_DIR}" directory!') quit(1) -#! Timers, remove -----------------------------------------------------------------------> +#? Timers for testing and debugging --------------------------------------------------------------> -class Timer: +class TimeIt: timers: Dict[str, float] = {} paused: Dict[str, float] = {} @@ -231,25 +239,22 @@ class Timer: del cls.paused[name] errlog.debug(f'{name} completed in {total:.6f} seconds') -def timerd(func): +def timeit_decorator(func): def timed(*args, **kw): ts = time() out = func(*args, **kw) - te = time() - errlog.debug(f'{func.__name__} completed in {te - ts:.6f} seconds') + errlog.debug(f'{func.__name__} completed in {time() - ts:.6f} seconds') return out return timed -#! Timers, remove -----------------------------------------------------------------------< - -#? Set up config class and load config ------------------------------------------------> +#? Set up config class and load config -----------------------------------------------------------> class Config: '''Holds all config variables and functions for loading from and saving to disk''' - keys: List[str] = ["color_theme", "update_ms", "proc_sorting", "proc_reversed", "proc_tree", "check_temp", "draw_clock", "background_update", "custom_cpu_name", "proc_gradient", "proc_per_core", "disks_filter", "update_check", "log_level", "mem_graphs", "show_swap", "swap_disk", "show_disks"] + keys: List[str] = ["color_theme", "update_ms", "proc_sorting", "proc_reversed", "proc_tree", "check_temp", "draw_clock", "background_update", "custom_cpu_name", "proc_gradient", "proc_per_core", "disks_filter", "update_check", "log_level", "mem_graphs", "show_swap", "swap_disk", "show_disks", "show_init"] conf_dict: Dict[str, Union[str, int, bool]] = {} color_theme: str = "Default" - update_ms: int = 2500 + update_ms: int = 1000 proc_sorting: str = "cpu lazy" proc_reversed: bool = False proc_tree: bool = False @@ -262,15 +267,17 @@ class Config: disks_filter: str = "" update_check: bool = True mem_graphs: bool = True - show_swap: bool = False + show_swap: bool = True swap_disk: bool = True show_disks: bool = True + show_init: bool = True log_level: str = "WARNING" warnings: List[str] = [] + info: List[str] = [] sorting_options: List[str] = ["pid", "program", "arguments", "threads", "user", "memory", "cpu lazy", "cpu responsive"] - log_levels: List[str] = ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"] + log_levels: List[str] = ["ERROR", "WARNING", "INFO", "DEBUG"] changed: bool = False recreate: bool = False @@ -283,10 +290,10 @@ class Config: conf: Dict[str, Union[str, int, bool]] = self.load_config() if not "version" in conf.keys(): self.recreate = True - self.warnings.append(f'Config file malformatted or missing, will be recreated on exit!') + self.info.append(f'Config file malformatted or missing, will be recreated on exit!') elif conf["version"] != VERSION: self.recreate = True - self.warnings.append(f'Config file version and bpytop version missmatch, will be recreated on exit!') + self.info.append(f'Config file version and bpytop version missmatch, will be recreated on exit!') for key in self.keys: if key in conf.keys() and conf[key] != "_error_": setattr(self, key, conf[key]) @@ -337,6 +344,9 @@ class Config: if "log_level" in new_config and not new_config["log_level"] in self.log_levels: new_config["log_level"] = "_error_" self.warnings.append(f'Config key "log_level" didn\'t get an acceptable value!') + if isinstance(new_config["update_ms"], int) and new_config["update_ms"] < 100: + new_config["update_ms"] = 100 + self.warnings.append(f'Config key "update_ms" can\'t be lower than 100!') return new_config def save_config(self): @@ -350,9 +360,17 @@ class Config: try: CONFIG: Config = Config(CONFIG_FILE) - errlog.setLevel(getattr(logging, CONFIG.log_level)) + if DEBUG: + errlog.setLevel(DEBUG) + else: + errlog.setLevel(getattr(logging, CONFIG.log_level)) + if CONFIG.log_level == "DEBUG": DEBUG = True errlog.info(f'New instance of bpytop version {VERSION} started with pid {os.getpid()}') errlog.debug(f'Loglevel set to {CONFIG.log_level}') + if CONFIG.info: + for info in CONFIG.info: + errlog.info(info) + CONFIG.info = [] if CONFIG.warnings: for warning in CONFIG.warnings: errlog.warning(warning) @@ -378,30 +396,41 @@ class Term: alt_screen = "\033[?1049h" #* Switch to alternate screen normal_screen = "\033[?1049l" #* Switch to normal screen clear = "\033[2J\033[0;0f" #* Clear screen and set cursor to position 0,0 + mouse_on = "\033[?1001h\033[?1015h\033[?1006h" #* Enable reporting of mouse position on click and release + mouse_off = "\033[?1001l\033[?1015l\033[?1006l" #* Disable mouse reporting + winch = threading.Event() @classmethod def refresh(cls, *args): """Update width, height and set resized flag if terminal has been resized""" - if cls.resized == True: return + if cls.resized: cls.winch.set(); return cls._w, cls._h = os.get_terminal_size() - while (cls._w, cls._h) != (cls.width, cls.height): + while (cls._w, cls._h) != (cls.width, cls.height) or (cls._w < 80 or cls._h < 24): + if Init.running: Init.resized = True cls.resized = True - Collector.collect_interrupt = True + Collector.resize_interrupt = True cls.width, cls.height = cls._w, cls._h Draw.now(Term.clear) - Draw.now(f'{create_box(cls._w // 2 - 25, cls._h // 2 - 2, 50, 3, "resizing")}{THEME.main_fg}{Fx.b}{Mv.r(12)}Width : {cls._w} Height: {cls._h}') + Draw.now(f'{create_box(cls._w // 2 - 25, cls._h // 2 - 2, 50, 3, "resizing", line_color=Colors.red, title_color=Colors.white)}', + f'{Mv.r(12)}{Colors.default}{Colors.black_bg}{Fx.b}Width : {cls._w} Height: {cls._h}{Fx.ub}{Term.bg}{Term.fg}') while cls._w < 80 or cls._h < 24: Draw.now(Term.clear) - Draw.now(f'{create_box(cls._w // 2 - 25, cls._h // 2 - 2, 50, 4, "warning")}{THEME.main_fg}{Fx.b}{Mv.r(12)}Width: {cls._w} Height: {cls._h}\ - {Mv.to(cls._h // 2, cls._w // 2 - 23)}Width and Height needs to be at least 80x24!') - sleep(0.3) + Draw.now(f'{create_box(cls._w // 2 - 25, cls._h // 2 - 2, 50, 4, "warning", line_color=Colors.red, title_color=Colors.white)}', + f'{Mv.r(12)}{Colors.default}{Colors.black_bg}{Fx.b}Width: {Colors.red if cls._w < 80 else Colors.green}{cls._w} ', + f'{Colors.default}Height: {Colors.red if cls._h < 24 else Colors.green}{cls._h}{Term.bg}{Term.fg}', + f'{Mv.to(cls._h // 2, cls._w // 2 - 23)}{Colors.default}{Colors.black_bg}Width and Height needs to be at least 80 x 24 !{Fx.ub}{Term.bg}{Term.fg}') + cls.winch.wait(0.3) + cls.winch.clear() cls._w, cls._h = os.get_terminal_size() - sleep(0.3) + cls.winch.wait(0.3) + cls.winch.clear() cls._w, cls._h = os.get_terminal_size() - cls.resized = False - Box.calc_sizes() - Box.draw_bg(now=True if not Init.running else False) + Box.calc_sizes() + if Init.running: cls.resized = False; return + Box.draw_bg() + cls.resized = False + Timer.finish() @staticmethod def echo(on: bool): @@ -496,6 +525,34 @@ class Mv: class Key: """Handles the threaded input reader""" list: List[str] = [] + mouse: Dict[str, List[Tuple[int, int]]] = {} + escape: Dict[Union[str, Tuple[str, str]], str] = { + "\n" : "enter", + ("\x7f", "\x08") : "backspace", + ("[A", "OA") : "up", + ("[B", "OB") : "down", + ("[D", "OD") : "left", + ("[C", "OC") : "right", + "[2~" : "insert", + "[3~" : "delete", + "[H" : "home", + "[F" : "end", + "[5~" : "page_up", + "[6~" : "page_down", + "[Z" : "shift_tab", + "OP" : "f1", + "OQ" : "f2", + "OR" : "f3", + "OS" : "f4", + "[15" : "f5", + "[17" : "f6", + "[18" : "f7", + "[19" : "f8", + "[20" : "f9", + "[21" : "f10", + "[23" : "f11", + "[24" : "f12" + } new = threading.Event() idle = threading.Event() idle.set() @@ -551,34 +608,7 @@ class Key: """Get a single key from stdin, convert to readable format and save to keys list. Meant to be run in it's own thread.""" input_key: str = "" clean_key: str = "" - escape: Dict[Union[str, Tuple[str, str]], str] = { - "\n" : "enter", - ("\x7f", "\x08") : "backspace", - ("[A", "OA") : "up", - ("[B", "OB") : "down", - ("[D", "OD") : "left", - ("[C", "OC") : "right", - "[2~" : "insert", - "[3~" : "delete", - "[H" : "home", - "[F" : "end", - "[5~" : "page_up", - "[6~" : "page_down", - "[Z" : "shift_tab", - "OP" : "f1", - "OQ" : "f2", - "OR" : "f3", - "OS" : "f4", - "[15" : "f5", - "[17" : "f6", - "[18" : "f7", - "[19" : "f8", - "[20" : "f9", - "[21" : "f10", - "[23" : "f11", - "[24" : "f12" - } - + mouse_press: Tuple[int, int] try: while not cls.stopping: with Raw(sys.stdin): @@ -586,21 +616,33 @@ class Key: continue cls.idle.clear() #* Report IO block in progress to prevent Draw functions to get IO Block error input_key += sys.stdin.read(1) - if input_key == "\033": #* If first character is a escape sequence read 5 more keys + if input_key == "\033" or input_key == "0": #* If first character is a escape sequence read 5 more keys Draw.idle.wait() #* Wait for Draw function to finish if busy with Nonblocking(sys.stdin): #* Set non blocking to prevent read stall if less than 5 characters - input_key += sys.stdin.read(5) + input_key += sys.stdin.read(20) if input_key == "\033": clean_key = "escape" #* Key is escape if only containing \033 + elif input_key.startswith("\033[<0;") and input_key.endswith("m"): + try: + mouse_press = (int(input_key.split(";")[1]), int(input_key.split(";")[2].rstrip("m"))) + except: + pass + else: + errlog.debug(f'{mouse_press}') + for key_name, positions in cls.mouse.items(): + if mouse_press in positions: + clean_key = key_name + break + #errlog.debug(f'Mouse press at line={input_key.split(";")[2].rstrip("m")} col={input_key.split(";")[1]}') elif input_key == "\\": clean_key = "\\" #* Clean up "\" to not return escaped else: - for code in escape.keys(): #* Go trough dict of escape codes to get the cleaned key name + for code in cls.escape.keys(): #* Go trough dict of escape codes to get the cleaned key name if input_key.lstrip("\033").startswith(code): - clean_key = escape[code] + clean_key = cls.escape[code] break else: #* If not found in escape dict and length of key is 1, assume regular character if len(input_key) == 1: clean_key = input_key - if testing: errlog.debug(f'Input key: {repr(input_key)} Clean key: {clean_key}') #! Remove + #errlog.debug(f'Input key: {repr(input_key)} Clean key: {clean_key}') #! Remove if clean_key: cls.list.append(clean_key) #* Store up to 10 keys in input queue for later processing if len(cls.list) > 10: del cls.list[0] @@ -808,6 +850,13 @@ class Color: if len(args) > 1: return cls.escape_color(r=args[0], g=args[1], b=args[2], depth="bg") else: return cls.escape_color(hexa=args[0], depth="bg") +class Colors: + default = Color("#cc") + white = Color("#ff") + red = Color("#bf3636") + green = Color("#68bf36") + black_bg = Color("#00", depth="bg") + class Theme: '''__init__ accepts a dict containing { "color_element" : "color" }''' @@ -1070,7 +1119,8 @@ class Graph: self.out += f'{"" if not self.colors else self.colors[h]}{self.graphs[self.current][h if not self.invert else (self.height - 1) - h]}' if self.colors: self.out += f'{Term.fg}' - def add(self, value: int) -> str: + def __call__(self, value: Union[int, None] = None) -> str: + if not isinstance(value, int): return self.out self.current = not self.current if self.height == 1: if self.graphs[self.current][0].startswith(self.symbol[0.0]): @@ -1084,8 +1134,8 @@ class Graph: self._create([value]) return self.out - def __call__(self, value: int) -> str: - return self.add(value) + def add(self, value: Union[int, None] = None) -> str: + return self.__call__(value) def __str__(self): return self.out @@ -1099,7 +1149,7 @@ class Graphs: cpu: Dict[str, Graph] = {} cores: List[Graph] = [NotImplemented] * THREADS temps: List[Graph] = [NotImplemented] * (THREADS + 1) - net: Graph + net: Dict[str, Graph] = {} detailed_cpu: Graph detailed_mem: Graph pid_cpu: Dict[int, Graph] = {} @@ -1113,31 +1163,40 @@ class Meter: out: str color_gradient: List[str] color_inactive: Color + gradient_name: str width: int + theme: str saved: Dict[int, str] def __init__(self, value: int, width: int, gradient_name: str): + self.gradient_name = gradient_name self.color_gradient = THEME.gradient[gradient_name] self.color_inactive = THEME.inactive_fg self.width = width + self.theme = CONFIG.color_theme + self.saved = {} self.out = self._create(value) - self.saved = { value : self.out } - def __call__(self, value: int): - if value in self.saved.keys(): + def __call__(self, value: Union[int, None]) -> str: + if not isinstance(value, int): return self.out + if value > 100: value = 100 + elif value < 0: value = 100 + if self.theme != CONFIG.color_theme: + self.saved = {} + self.theme = CONFIG.color_theme + if value in self.saved: self.out = self.saved[value] else: self.out = self._create(value) - self.saved[value] = self.out return self.out - def __str__(self): + def __str__(self) -> str: return self.out def __repr__(self): return repr(self.out) - def _create(self, value: int): + def _create(self, value: int) -> str: if value > 100: value = 100 elif value < 0: value = 100 out: str = "" @@ -1149,6 +1208,8 @@ class Meter: break else: out += f'{Term.fg}' + if not value in self.saved: + self.saved[value] = out return out class Meters: @@ -1184,8 +1245,16 @@ class Box: @classmethod def draw_update_ms(cls, now: bool = True): - Draw.buffer("update_ms!" if now and not Menu.active else "update_ms", f'{Mv.to(CpuBox.y - 1, CpuBox.x + CpuBox.width - len(str(CONFIG.update_ms)) - 7)}{THEME.cpu_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg("+")} ' - f'{THEME.title(str(CONFIG.update_ms))} {THEME.hi_fg("+")}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}', save=True if Menu.active else False) + update_string: str = f'{CONFIG.update_ms}ms' + xpos: int = CpuBox.x + CpuBox.width - len(update_string) - 14 + Key.mouse["+"] = []; Key.mouse["-"] = [] + for i in range(3): + Key.mouse["+"].append((xpos + 7 + i, CpuBox.y)) + Key.mouse["-"].append((CpuBox.x + CpuBox.width - 4 + i, CpuBox.y)) + errlog.debug(f'{Key.mouse}') + Draw.buffer("update_ms!" if now and not Menu.active else "update_ms", + f'{Mv.to(CpuBox.y, xpos)}{THEME.cpu_box(Symbol.h_line * 7, Symbol.title_left)}{Fx.b}{THEME.hi_fg("+")} ', + f'{THEME.title(update_string)} {THEME.hi_fg("-")}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}', save=True if Menu.active else False) if now and not Menu.active: Draw.clear("update_ms") @classmethod @@ -1264,13 +1333,14 @@ class CpuBox(Box, SubBox): cx = cy = cc = 0 ccw = (bw + 1) // cls.box_columns - freq: str = f'{cpu.cpu_freq} Mhz' if cpu.cpu_freq < 1000 else f'{float(cpu.cpu_freq / 1000):.1f} GHz' - out += (f'{Mv.to(by - 1, bx + bw - 9)}{THEME.div_line(Symbol.title_left)}{Fx.b}{THEME.title(freq)}{Fx.ub}{THEME.div_line(Symbol.title_right)}' - f'{Mv.to(y, x)}{Graphs.cpu["up"](cpu.cpu_usage[0][-1])}{Mv.to(y + hh, x)}{Graphs.cpu["down"](cpu.cpu_usage[0][-1])}' + if cpu.cpu_freq: + freq: str = f'{cpu.cpu_freq} Mhz' if cpu.cpu_freq < 1000 else f'{float(cpu.cpu_freq / 1000):.1f} GHz' + out += f'{Mv.to(by - 1, bx + bw - 9)}{THEME.div_line(Symbol.title_left)}{Fx.b}{THEME.title(freq)}{Fx.ub}{THEME.div_line(Symbol.title_right)}' + out += (f'{Mv.to(y, x)}{Graphs.cpu["up"](None if cls.resized else cpu.cpu_usage[0][-1])}{Mv.to(y + hh, x)}{Graphs.cpu["down"](None if cls.resized else cpu.cpu_usage[0][-1])}' f'{THEME.main_fg}{Mv.to(by + cy, bx + cx)}{"CPU "}{Meters.cpu(cpu.cpu_usage[0][-1])}' f'{THEME.gradient["cpu"][cpu.cpu_usage[0][-1]]}{cpu.cpu_usage[0][-1]:>4}{THEME.main_fg}%') if CONFIG.check_temp: - out += (f'{THEME.inactive_fg} ⡀⡀⡀⡀⡀{Mv.l(5)}{THEME.gradient["temp"][cpu.cpu_temp[0][-1]]}{Graphs.temps[0](cpu.cpu_temp[0][-1])}' + out += (f'{THEME.inactive_fg} ⡀⡀⡀⡀⡀{Mv.l(5)}{THEME.gradient["temp"][cpu.cpu_temp[0][-1]]}{Graphs.temps[0](None if cls.resized else cpu.cpu_temp[0][-1])}' f'{cpu.cpu_temp[0][-1]:>4}{THEME.main_fg}°C') cy += 1 @@ -1278,13 +1348,13 @@ class CpuBox(Box, SubBox): for n in range(1, THREADS + 1): out += f'{THEME.main_fg}{Mv.to(by + cy, bx + cx)}{"Core" + str(n):<{7 if cls.column_size > 0 else 5}}' if cls.column_size > 0: - out += f'{THEME.inactive_fg}{"⡀" * (5 * cls.column_size)}{Mv.l(5 * cls.column_size)}{THEME.gradient["cpu"][cpu.cpu_usage[n][-1]]}{Graphs.cores[n-1](cpu.cpu_usage[n][-1])}' + out += f'{THEME.inactive_fg}{"⡀" * (5 * cls.column_size)}{Mv.l(5 * cls.column_size)}{THEME.gradient["cpu"][cpu.cpu_usage[n][-1]]}{Graphs.cores[n-1](None if cls.resized else cpu.cpu_usage[n][-1])}' else: out += f'{THEME.gradient["cpu"][cpu.cpu_usage[n][-1]]}' out += f'{cpu.cpu_usage[n][-1]:>4}{THEME.main_fg}%' if CONFIG.check_temp: if cls.column_size > 1: - out += f'{THEME.inactive_fg} ⡀⡀⡀⡀⡀{Mv.l(5)}{THEME.gradient["temp"][cpu.cpu_temp[n][-1]]}{Graphs.temps[n](cpu.cpu_temp[n][-1])}' + out += f'{THEME.inactive_fg} ⡀⡀⡀⡀⡀{Mv.l(5)}{THEME.gradient["temp"][cpu.cpu_temp[n][-1]]}{Graphs.temps[n](None if cls.resized else cpu.cpu_temp[n][-1])}' else: out += f'{THEME.gradient["temp"][cpu.cpu_temp[n][-1]]}' out += f'{cpu.cpu_temp[n][-1]:>4}{THEME.main_fg}°C' @@ -1309,9 +1379,6 @@ class CpuBox(Box, SubBox): Draw.buffer(cls.buffer, f'{out}{Term.fg}', save=Menu.active) cls.resized = False - - - class MemBox(Box): name = "mem" height_p = 40 @@ -1347,7 +1414,8 @@ class MemBox(Box): else: cls.mem_width = cls.width - 1 - if cls.height - (3 if cls.swap_on and not CONFIG.swap_disk else 2) > 2 * (6 if cls.swap_on and not CONFIG.swap_disk else 4): cls.mem_size = 3 + item_height: int = 6 if cls.swap_on and not CONFIG.swap_disk else 4 + if cls.height - (3 if cls.swap_on and not CONFIG.swap_disk else 2) > 2 * item_height: cls.mem_size = 3 elif cls.mem_width > 25: cls.mem_size = 2 else: cls.mem_size = 1 @@ -1356,7 +1424,7 @@ class MemBox(Box): if cls.mem_meter < 1: cls.mem_meter = 0 if CONFIG.mem_graphs: - cls.graph_height = round(((cls.height - (2 if cls.swap_on and not CONFIG.swap_disk else 1)) - (2 if cls.mem_size == 3 else 1) * (6 if cls.swap_on and not CONFIG.swap_disk else 4)) / (6 if cls.swap_on and not CONFIG.swap_disk else 4)) + cls.graph_height = round(((cls.height - (2 if cls.swap_on and not CONFIG.swap_disk else 1)) - (2 if cls.mem_size == 3 else 1) * item_height) / item_height) if cls.graph_height == 0: cls.graph_height = 1 if cls.graph_height > 1: cls.mem_meter += 6 else: @@ -1387,7 +1455,7 @@ class MemBox(Box): gmv: str = "" gli: str = "" x, y, h = cls.x + 1, cls.y + 1, cls.height - 2 - if cls.resized: + if cls.resized or cls.redraw: Meters.mem = {} Meters.swap = {} Meters.disks_used = {} @@ -1410,15 +1478,15 @@ class MemBox(Box): else: Meters.swap[name] = Meter(mem.swap_percent[name], cls.mem_meter, name) if cls.disk_meter > 0: - for name in mem.disks.keys(): + for n, name in enumerate(mem.disks.keys()): + if n * 2 > h: break Meters.disks_used[name] = Meter(mem.disks[name]["used_percent"], cls.disk_meter, "used") if len(mem.disks) * 3 <= h + 1: Meters.disks_free[name] = Meter(mem.disks[name]["free_percent"], cls.disk_meter, "free") + #* Mem out += f'{Mv.to(y, x+1)}{THEME.title}{Fx.b}Total:{mem.string["total"]:>{cls.mem_width - 9}}{Fx.ub}{THEME.main_fg}' cx = 1; cy = 1 - # if cls.graph_height == 1: - # gbg = f'{THEME.inactive_fg}{"⡀" * cls.mem_meter}{Mv.l(cls.mem_meter)}{THEME.main_fg}' if cls.graph_height > 0: gli = f'{Mv.l(2)}{THEME.mem_box(Symbol.title_right)}{THEME.div_line}{Symbol.h_line * (cls.mem_width - 1)}{"" if CONFIG.show_disks else THEME.mem_box}{Symbol.title_left}{Mv.l(cls.mem_width - 1)}{THEME.title}' if cls.graph_height >= 2: @@ -1426,47 +1494,55 @@ class MemBox(Box): gmv = f'{Mv.l(cls.mem_width - 2)}{Mv.u(cls.graph_height - 1)}' - + big_mem: bool = True if cls.mem_width > 21 else False for name in cls.mem_names: if cls.mem_size > 2: - out += (f'{Mv.to(y+cy, x+cx)}{gli}{name.capitalize()[:None if cls.mem_width > 21 else 5]+":":<{1 if cls.mem_width > 21 else 6.6}}{Mv.to(y+cy, x+cx + cls.mem_width - 3 - (len(mem.string[name])))}{mem.string[name]}' - f'{Mv.to(y+cy+1, x+cx)}{gbg}{Meters.mem[name](mem.percent[name])}{gmv}{str(mem.percent[name])+"%":>4}'); cy += 2 if not cls.graph_height else cls.graph_height + 1 + out += (f'{Mv.to(y+cy, x+cx)}{gli}{name.capitalize()[:None if big_mem else 5]+":":<{1 if big_mem else 6.6}}{Mv.to(y+cy, x+cx + cls.mem_width - 3 - (len(mem.string[name])))}{Fx.trans(mem.string[name])}' + f'{Mv.to(y+cy+1, x+cx)}{gbg}{Meters.mem[name](None if cls.resized else mem.percent[name])}{gmv}{str(mem.percent[name])+"%":>4}') + cy += 2 if not cls.graph_height else cls.graph_height + 1 else: - out += f'{Mv.to(y+cy, x+cx)}{name.capitalize():{5.5 if cls.mem_size > 1 else 1.1}} {gbg}{Meters.mem[name](mem.percent[name])}{mem.string[name][:None if cls.mem_size > 1 else -2]:>{9 if cls.mem_size > 1 else 7}}'; cy += 1 if not cls.graph_height else cls.graph_height + out += f'{Mv.to(y+cy, x+cx)}{name.capitalize():{5.5 if cls.mem_size > 1 else 1.1}} {gbg}{Meters.mem[name](None if cls.resized else mem.percent[name])}{mem.string[name][:None if cls.mem_size > 1 else -2]:>{9 if cls.mem_size > 1 else 7}}' + cy += 1 if not cls.graph_height else cls.graph_height #* Swap if cls.swap_on and CONFIG.show_swap and not CONFIG.swap_disk: if h - cy > 5: if cls.graph_height > 0: out += f'{Mv.to(y+cy, x+cx)}{gli}' cy += 1 - out += f'{Mv.to(y+cy, x+cx)}{THEME.title}{Fx.b}Swap:{mem.swap_string["total"]:>{cls.mem_width - 8}}{Fx.ub}{THEME.main_fg}'; cy += 1 + out += f'{Mv.to(y+cy, x+cx)}{THEME.title}{Fx.b}Swap:{mem.swap_string["total"]:>{cls.mem_width - 8}}{Fx.ub}{THEME.main_fg}' + cy += 1 for name in cls.swap_names: if cls.mem_size > 2: - out += (f'{Mv.to(y+cy, x+cx)}{gli}{name.capitalize()[:None if cls.mem_width > 21 else 5]+":":<{1 if cls.mem_width > 21 else 6.6}}{Mv.to(y+cy, x+cx + cls.mem_width - 3 - (len(mem.swap_string[name])))}{mem.swap_string[name]}' - f'{Mv.to(y+cy+1, x+cx)}{gbg}{Meters.swap[name](mem.swap_percent[name])}{gmv}{str(mem.swap_percent[name])+"%":>4}'); cy += 2 if not cls.graph_height else cls.graph_height + 1 + out += (f'{Mv.to(y+cy, x+cx)}{gli}{name.capitalize()[:None if big_mem else 5]+":":<{1 if big_mem else 6.6}}{Mv.to(y+cy, x+cx + cls.mem_width - 3 - (len(mem.swap_string[name])))}{Fx.trans(mem.swap_string[name])}' + f'{Mv.to(y+cy+1, x+cx)}{gbg}{Meters.swap[name](None if cls.resized else mem.swap_percent[name])}{gmv}{str(mem.swap_percent[name])+"%":>4}') + cy += 2 if not cls.graph_height else cls.graph_height + 1 else: - out += f'{Mv.to(y+cy, x+cx)}{name.capitalize():{5.5 if cls.mem_size > 1 else 1.1}} {gbg}{Meters.swap[name](mem.swap_percent[name])}{mem.swap_string[name][:None if cls.mem_size > 1 else -2]:>{9 if cls.mem_size > 1 else 7}}'; cy += 1 if not cls.graph_height else cls.graph_height + out += f'{Mv.to(y+cy, x+cx)}{name.capitalize():{5.5 if cls.mem_size > 1 else 1.1}} {gbg}{Meters.swap[name](None if cls.resized else mem.swap_percent[name])}{mem.swap_string[name][:None if cls.mem_size > 1 else -2]:>{9 if cls.mem_size > 1 else 7}}'; cy += 1 if not cls.graph_height else cls.graph_height if cls.graph_height > 0 and not cy == h: out += f'{Mv.to(y+cy, x+cx)}{gli}' #* Disks if CONFIG.show_disks: cx = x + cls.mem_width - 1; cy = 0 + big_disk: bool = True if cls.disks_width >= 25 else False + gli = f'{Mv.l(2)}{THEME.div_line}{Symbol.title_right}{Symbol.h_line * cls.disks_width}{THEME.mem_box}{Symbol.title_left}{Mv.l(cls.disks_width - 1)}' for name, item in mem.disks.items(): if cy > h - 2: break - out += (f'{Mv.to(y+cy, x+cx)}{THEME.title}{Fx.b}{item["name"]:{cls.disks_width - 2}.12}{Mv.to(y+cy, x + cx + cls.disks_width - 11)}{item["total"][:None if cls.disks_width >= 25 else -2]:>9}' - f'{Mv.to(y+cy, x + cx + (cls.disks_width // 2) - (len(item["io"]) // 2) - 2)}{Fx.ub}{THEME.main_fg}{item["io"]}{Fx.ub}{THEME.main_fg}{Mv.to(y+cy+1, x+cx)}') - out += f'Used:{str(item["used_percent"]) + "%":>4} ' if cls.disks_width >= 25 else "U " - out += f'{Meters.disks_used[name]}{item["used"][:None if cls.disks_width >= 25 else -2]:>{9 if cls.disks_width >= 25 else 7}}'; cy += 2 + out += Fx.trans(f'{Mv.to(y+cy, x+cx)}{gli}{THEME.title}{Fx.b}{item["name"]:{cls.disks_width - 2}.12}{Mv.to(y+cy, x + cx + cls.disks_width - 11)}{item["total"][:None if big_disk else -2]:>9}') + out += f'{Mv.to(y+cy, x + cx + (cls.disks_width // 2) - (len(item["io"]) // 2) - 2)}{Fx.ub}{THEME.main_fg}{item["io"]}{Fx.ub}{THEME.main_fg}{Mv.to(y+cy+1, x+cx)}' + out += f'Used:{str(item["used_percent"]) + "%":>4} ' if big_disk else "U " + out += f'{Meters.disks_used[name]}{item["used"][:None if big_disk else -2]:>{9 if big_disk else 7}}' + cy += 2 if len(mem.disks) * 3 <= h + 1: if cy > h - 1: break out += Mv.to(y+cy, x+cx) - out += f'Free:{str(item["free_percent"]) + "%":>4} ' if cls.disks_width >= 25 else f'{"F "}' - out += f'{Meters.disks_free[name]}{item["free"][:None if cls.disks_width >= 25 else -2]:>{9 if cls.disks_width >= 25 else 7}}'; cy += 1 + out += f'Free:{str(item["free_percent"]) + "%":>4} ' if big_disk else f'{"F "}' + out += f'{Meters.disks_free[name]}{item["free"][:None if big_disk else -2]:>{9 if big_disk else 7}}' + cy += 1 if len(mem.disks) * 4 <= h + 1: cy += 1 Draw.buffer(cls.buffer, f'{out}{Term.fg}', save=Menu.active) - cls.resized = False + cls.resized = cls.redraw = False class NetBox(Box, SubBox): name = "net" @@ -1474,8 +1550,12 @@ class NetBox(Box, SubBox): width_p = 45 x = 1 y = 1 + resized: bool = True redraw: bool = True + graph_height: Dict[str, int] = {} + symbols: Dict[str, str] = {"download" : "▼", "upload" : "▲"} buffer: str = "net" + Box.buffers.append(buffer) @classmethod @@ -1483,17 +1563,64 @@ class NetBox(Box, SubBox): cls.width = round(Term.width * cls.width_p / 100) cls.height = Term.height - Box._b_cpu_h - Box._b_mem_h cls.y = Term.height - cls.height + 1 - cls.box_width = 24 - cls.box_height = 9 if cls.height > 12 else cls.height - 2 - cls.box_x = cls.width - cls.box_width - 2 + cls.box_width = 27 + cls.box_height = 9 if cls.height > 10 else cls.height - 2 + cls.box_x = cls.width - cls.box_width - 1 cls.box_y = cls.y + ((cls.height - 2) // 2) - cls.box_height // 2 + 1 - cls.redraw_all = True + cls.graph_height["download"] = round((cls.height - 2) / 2) + cls.graph_height["upload"] = cls.height - 2 - cls.graph_height["download"] + cls.redraw = True @classmethod def _draw_bg(cls) -> str: return f'{create_box(box=cls, line_color=THEME.net_box)}\ {create_box(x=cls.box_x, y=cls.box_y, width=cls.box_width, height=cls.box_height, line_color=THEME.div_line, fill=False, title="Download", title2="Upload")}' + @classmethod + def _draw_fg(cls): + net = NetCollector + if not net.nic: return + out: str = "" + out_misc: str = "" + x, y, w, h = cls.x + 1, cls.y + 1, cls.width - 2, cls.height - 2 + bx, by, bw, bh = cls.box_x + 1, cls.box_y + 1, cls.box_width - 2, cls.box_height - 2 + + if cls.resized or cls.redraw: + cls.redraw = True + Key.mouse["b"] = []; Key.mouse["n"] = [] + for i in range(4): + Key.mouse["b"].append((x+w - len(net.nic) - 9 + i, y-1)) + Key.mouse["n"].append((x+w - 5 + i, y-1)) + + out_misc += (f'{Mv.to(y-1, x+w - 19)}{THEME.net_box}{Symbol.h_line * (10 - len(net.nic))}{Symbol.title_left}{Fx.b}{THEME.hi_fg("")}' + f'{Fx.ub}{THEME.net_box(Symbol.title_right)}{Term.fg}') + Draw.buffer("net_misc", out_misc, save=True) + + cy = 0 + for direction in ["download", "upload"]: + strings = net.strings[net.nic][direction] + stats = net.stats[net.nic][direction] + if stats["redraw"] or cls.redraw: + if cls.redraw: stats["redraw"] = True + Graphs.net[direction] = Graph(w - bw - 3, cls.graph_height[direction], THEME.gradient[direction], stats["speed"], max_value=stats["graph_top"], invert=False if direction == "download" else True) + out += f'{Mv.to(y if direction == "download" else y + cls.graph_height["download"], x)}{Graphs.net[direction](None if stats["redraw"] else stats["speed"][-1])}' + + out += f'{Mv.to(by+cy, bx)}{THEME.main_fg}{cls.symbols[direction]} {strings["byte_ps"]:<10.10}{Mv.to(by+cy, bx+bw - 12)}{cls.symbols[direction] + " " + strings["bit_ps"]:>12.12}' + cy += 1 if bh != 3 else 2 + if bh >= 6: + out += f'{Mv.to(by+cy, bx)}{cls.symbols[direction]} {"Top:"}{Mv.to(by+cy, bx+bw - 10)}{strings["top"]:>10.10}' + cy += 1 + if bh >= 4: + out += f'{Mv.to(by+cy, bx)}{cls.symbols[direction]} {"Total:"}{Mv.to(by+cy, bx+bw - 10)}{strings["total"]:>10.10}' + if bh > 2 and bh % 2: cy += 2 + else: cy += 1 + stats["redraw"] = False + + out += f'{Mv.to(y, x)}{THEME.inactive_fg(net.strings[net.nic]["download"]["graph_top"])}{Mv.to(y+h-1, x)}{THEME.inactive_fg(net.strings[net.nic]["upload"]["graph_top"])}' + + Draw.buffer(cls.buffer, f'{out}{out_misc}{Term.fg}', save=Menu.active) + cls.redraw = cls.resized = False + class ProcBox(Box): name = "proc" height_p = 68 @@ -1540,6 +1667,7 @@ class Collector: collect_done = threading.Event() collect_queue: List = [] collect_interrupt: bool = False + resize_interrupt: bool = False @classmethod def start(cls): @@ -1565,26 +1693,30 @@ class Collector: def _runner(cls): '''This is meant to run in it's own thread, collecting and drawing when collect_run is set''' draw_buffers: List[str] = [] + debugged: bool = False try: while not cls.stopping: cls.collect_run.wait(0.1) if not cls.collect_run.is_set(): continue draw_buffers = [] - #if cls.collect_interrupt and bkp_queue: - # for q in bkp_queue: - # if q not in cls.collect_queue: cls.collect_queue.append(q) - #bkp_queue = [] cls.collect_interrupt = False cls.collect_run.clear() cls.collect_idle.clear() + if DEBUG and not debugged: TimeIt.start("Collect and draw") while cls.collect_queue: collector = cls.collect_queue.pop() collector._collect() collector._draw() - #bkp_queue.append(collector) draw_buffers.append(collector.buffer) if cls.collect_interrupt: break + if cls.resize_interrupt: + while Term.resized: + sleep(0.01) + cls.resize_interrupt = False + cls.collect_run.set() + continue + if DEBUG and not debugged: TimeIt.stop("Collect and draw"); debugged = True if cls.draw_now and not Menu.active and not cls.collect_interrupt: Draw.out(*draw_buffers) cls.collect_idle.set() @@ -1652,6 +1784,7 @@ class CpuCollector(Collector): if SYSTEM == "Linux" and which("vcgencmd") and subprocess.check_output("vcgencmd measure_temp", text=True).rstrip().endswith("'C"): return "vcgencmd" except: pass + CONFIG.check_temp = False return "" sensor_method: str = _get_sensors.__func__() # type: ignore @@ -1666,7 +1799,8 @@ class CpuCollector(Collector): if len(cls.cpu_usage[n]) > Term.width * 2: del cls.cpu_usage[n][0] - cls.cpu_freq = round(psutil.cpu_freq().current) + if hasattr(psutil.cpu_freq(), "current"): + cls.cpu_freq = round(psutil.cpu_freq().current) cls.load_avg = [round(lavg, 2) for lavg in os.getloadavg()] cls.uptime = str(timedelta(seconds=round(time()-psutil.boot_time(),0)))[:-3] @@ -1752,6 +1886,9 @@ class MemCollector(Collector): disks: Dict[str, Dict] disk_hist: Dict[str, Tuple] = {} + timestamp: float = time() + + old_disks: List[str] = [] excludes: List[str] = ["squashfs"] if SYSTEM == "BSD": excludes += ["devfs", "tmpfs", "procfs", "linprocfs", "gvfs", "fusefs"] @@ -1784,8 +1921,9 @@ class MemCollector(Collector): cls.swap_values["total"], cls.swap_values["free"] = swap.total, swap.free cls.swap_values["used"] = cls.swap_values["total"] - cls.swap_values["free"] - MemBox.swap_on = bool(swap.total) if swap.total: + if not MemBox.swap_on: + MemBox.redraw = True MemBox.swap_on = True for key, value in cls.swap_values.items(): cls.swap_string[key] = floating_humanizer(value) @@ -1796,8 +1934,12 @@ class MemCollector(Collector): cls.swap_vlist[key].append(cls.swap_percent[key]) if len(cls.swap_vlist[key]) > MemBox.width: del cls.swap_vlist[key][0] else: + if MemBox.swap_on: + MemBox.redraw = True MemBox.swap_on = False else: + if MemBox.swap_on: + MemBox.redraw = True MemBox.swap_on = False @@ -1863,8 +2005,8 @@ class MemCollector(Collector): disk_io = io_counters else: raise Exception - disk_read = disk_io.read_bytes - cls.disk_hist[disk.device][0] - disk_write = disk_io.write_bytes - cls.disk_hist[disk.device][1] + disk_read = round((disk_io.read_bytes - cls.disk_hist[disk.device][0]) / (time() - cls.timestamp)) + disk_write = round((disk_io.write_bytes - cls.disk_hist[disk.device][1]) / (time() - cls.timestamp)) except: pass disk_read = disk_write = 0 @@ -1881,7 +2023,7 @@ class MemCollector(Collector): cls.disks[disk.device]["io"] = io_string - if CONFIG.swap_disk: + if CONFIG.swap_disk and MemBox.swap_on: cls.disks["__swap"] = {} cls.disks["__swap"]["name"] = "swap" cls.disks["__swap"]["used_percent"] = cls.swap_percent["used"] @@ -1898,32 +2040,134 @@ class MemCollector(Collector): except: pass + if disk_list != cls.old_disks: + MemBox.redraw = True + cls.old_disks = disk_list.copy() + + cls.timestamp = time() @classmethod def _draw(cls): MemBox._draw_fg() +class NetCollector(Collector): + buffer: str = NetBox.buffer + nics: List[str] = [] + nic_i: int = 0 + nic: str = "" + new_nic: str = "" + graph_raise: Dict[str, int] = {"download" : 5, "upload" : 5} + graph_lower: Dict[str, int] = {"download" : 5, "upload" : 5} + min_top: int = 10<<10 + #* Stats structure = stats[netword device][download, upload][total, last, top, graph_top, offset, speed, redraw, graph_raise, graph_low] = int, List[int], bool + stats: Dict[str, Dict[str, Dict[str, Any]]] = {} + #* Strings structure strings[network device][download, upload][total, byte_ps, bit_ps, top, graph_top] = str + strings: Dict[str, Dict[str, Dict[str, str]]] = {} + switched: bool = False + timestamp: float = time() + + @classmethod + def _get_nics(cls): + '''Get a list of all network devices sorted by highest throughput''' + cls.nic_i = 0 + cls.nic = "" + io_all = psutil.net_io_counters(pernic=True) + if not io_all: return + up_stat = psutil.net_if_stats() + for nic in sorted(psutil.net_if_addrs(), key=lambda nic: (io_all[nic].bytes_recv + io_all[nic].bytes_sent), reverse=True): + if nic not in up_stat or not up_stat[nic].isup: + continue + cls.nics.append(nic) + if not cls.nics: cls.nics = [""] + cls.nic = cls.nics[cls.nic_i] + + @classmethod + def switch(cls, key: str): + if len(cls.nics) < 2: return + cls.nic_i += +1 if key == "n" else -1 + if cls.nic_i >= len(cls.nics): cls.nic_i = 0 + elif cls.nic_i < 0: cls.nic_i = len(cls.nics) - 1 + cls.new_nic = cls.nics[cls.nic_i] + cls.switched = True + NetBox.redraw = True + + @classmethod + def _collect(cls): + speed: int + stat: Dict + up_stat = psutil.net_if_stats() + + if cls.switched: + cls.nic = cls.new_nic + cls.switched = False + + if not cls.nic or cls.nic not in up_stat or not up_stat[cls.nic].isup: + cls._get_nics() + if not cls.nic: return + try: + io_all = psutil.net_io_counters(pernic=True)[cls.nic] + except KeyError: + pass + return + if not cls.nic in cls.stats: + cls.stats[cls.nic] = {} + cls.strings[cls.nic] = { "download" : {}, "upload" : {}} + for direction, value in ["download", io_all.bytes_recv], ["upload", io_all.bytes_sent]: + cls.stats[cls.nic][direction] = { "total" : value, "last" : value, "top" : 0, "graph_top" : cls.min_top, "offset" : 0, "speed" : [], "redraw" : True, "graph_raise" : 5, "graph_lower" : 5 } + for v in ["total", "byte_ps", "bit_ps", "top", "graph_top"]: + cls.strings[cls.nic][direction][v] = "" + + cls.stats[cls.nic]["download"]["total"] = io_all.bytes_recv + cls.stats[cls.nic]["upload"]["total"] = io_all.bytes_sent + + for direction in ["download", "upload"]: + stat = cls.stats[cls.nic][direction] + strings = cls.strings[cls.nic][direction] + #* Calculate current speed + stat["speed"].append(round((stat["total"] - stat["last"]) / (time() - cls.timestamp))) + stat["last"] = stat["total"] + speed = stat["speed"][-1] + + if len(stat["speed"]) > NetBox.width * 2: + del stat["speed"][0] + + strings["total"] = floating_humanizer(stat["total"]) + strings["byte_ps"] = floating_humanizer(stat["speed"][-1], per_second=True) + strings["bit_ps"] = floating_humanizer(stat["speed"][-1], bit=True, per_second=True) + + if speed > stat["top"] or not stat["top"]: + stat["top"] = speed + strings["top"] = floating_humanizer(stat["top"], bit=True, per_second=True) + + if speed > stat["graph_top"]: + stat["graph_raise"] += 1 + if stat["graph_lower"] > 0: stat["graph_lower"] -= 1 + elif stat["graph_top"] > cls.min_top and speed < stat["graph_top"] // 10: + stat["graph_lower"] += 1 + if stat["graph_raise"] > 0: stat["graph_raise"] -= 1 + + if stat["graph_raise"] >= 5 or stat["graph_lower"] >= 5: + if stat["graph_raise"] >= 5: + stat["graph_top"] = round(max(stat["speed"][-10:]) / 0.8) + elif stat["graph_lower"] >= 5: + stat["graph_top"] = max(stat["speed"][-10:]) * 3 + if stat["graph_top"] < cls.min_top: stat["graph_top"] = cls.min_top + stat["graph_raise"] = 0 + stat["graph_lower"] = 0 + stat["redraw"] = True + strings["graph_top"] = floating_humanizer(stat["graph_top"], short=True) + + cls.timestamp = time() + + + + @classmethod + def _draw(cls): + NetBox._draw_fg() + + #class ProcCollector(Collector): #! add interrupt on _collect and _draw -@timerd -def testing_collectors(): - - # CONFIG.check_temp = True - # Box.calc_sizes() - Box.draw_bg() - - #for _ in range(1000): - while True: - Collector.collect() - Collector.collect_done.wait() - Draw.now(Mv.to(1, 1)) - sleep(1) - - Draw.now(Mv.to(Term.height - 5, 1)) - #Draw.now(f'Cpu usage: {CpuCollector.cpu_usage}\nCpu freq: {CpuCollector.cpu_freq}\nLoad avg: {CpuCollector.load_avg}\n\ - # Temps: {CpuCollector.cpu_temp}\n') - - class Menu: '''Holds the main menu and all submenus''' active: bool = False @@ -1931,6 +2175,27 @@ class Menu: #Draw.buffer("menubg", Draw.last_buffer, z=1000, uncolor=True) #Draw.clear("menubg", last=True) +class Timer: + timestamp: float + + @classmethod + def stamp(cls): + cls.timestamp = time() + + @classmethod + def not_zero(cls) -> bool: + if cls.timestamp + (CONFIG.update_ms / 1000) > time(): + return True + else: + return False + + @classmethod + def left(cls) -> float: + return cls.timestamp + (CONFIG.update_ms / 1000) - time() + + @classmethod + def finish(cls): + cls.timestamp = time() - (CONFIG.update_ms / 1000) @@ -1972,10 +2237,11 @@ def get_cpu_name() -> str: return name -def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: str = "", title2: str = "", line_color: Color = None, fill: bool = True, box = None) -> str: +def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: str = "", title2: str = "", line_color: Color = None, title_color: Color = None, fill: bool = True, box = None) -> str: '''Create a box from a box object or by given arguments''' out: str = f'{Term.fg}{Term.bg}' if not line_color: line_color = THEME.div_line + if not title_color: title_color = THEME.title #* Get values from box class if given if box: @@ -2011,9 +2277,9 @@ def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: s #* Draw titles if enabled if title: - out += f'{Mv.to(y, x + 2)}{Symbol.title_left}{THEME.title}{Fx.b}{title}{Fx.ub}{line_color}{Symbol.title_right}' + out += f'{Mv.to(y, x + 2)}{Symbol.title_left}{title_color}{Fx.b}{title}{Fx.ub}{line_color}{Symbol.title_right}' if title2: - out += f'{Mv.to(y + height - 1, x + 2)}{Symbol.title_left}{THEME.title}{Fx.b}{title2}{Fx.ub}{line_color}{Symbol.title_right}' + out += f'{Mv.to(y + height - 1, x + 2)}{Symbol.title_left}{title_color}{Fx.b}{title2}{Fx.ub}{line_color}{Symbol.title_right}' return f'{out}{Term.fg}{Mv.to(y + 1, x + 1)}' @@ -2021,17 +2287,22 @@ def now_sleeping(signum, frame): """Reset terminal settings and stop background input read before putting to sleep""" Key.stop() Collector.stop() - Draw.now(Term.clear, Term.normal_screen, Term.show_cursor) + Draw.now(Term.clear, Term.normal_screen, Term.show_cursor, Term.mouse_off) Term.echo(True) os.kill(os.getpid(), signal.SIGSTOP) def now_awake(signum, frame): """Set terminal settings and restart background input read""" - Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor) + Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor, Term.mouse_on) Term.echo(False) Key.start() + Term.refresh() + Box.calc_sizes() + Box.draw_bg() Collector.start() + #Draw.out() + def quit_sigint(signum, frame): """SIGINT redirection to clean_quit()""" clean_quit() @@ -2047,8 +2318,7 @@ def clean_quit(errcode: int = 0, errmsg: str = "", thread: bool = False): Key.stop() Collector.stop() if not errcode: CONFIG.save_config() - if not testing: Draw.now(Term.clear, Term.normal_screen, Term.show_cursor) #! Enable - else: Draw.now(Term.show_cursor) #! Remove + Draw.now(Term.clear, Term.normal_screen, Term.show_cursor, Term.mouse_off) Term.echo(True) if errcode == 0: errlog.info(f'Exiting. Runtime {timedelta(seconds=round(time() - SELF_START, 0))} \n') @@ -2086,19 +2356,42 @@ def floating_humanizer(value: Union[float, int], bit: bool = False, per_second: out = f'{value}'[:-2] + "." + f'{value}'[-decimals:] elif len(f'{value}') >= 2: out = f'{value}'[:-2] + else: + out = f'{value}' if short: out = out.split(".")[0] out += f'{"" if short else " "}{unit[selector][0] if short else unit[selector]}' - if per_second: out += "/s" if bit else "ps" + if per_second: out += "ps" if bit else "/s" return out +def process_keys(): + while Key.has_key(): + key = Key.get() + if key == "q": + clean_quit() + if key == "+" and CONFIG.update_ms + 100 <= 86399900: + CONFIG.update_ms += 100 + Box.draw_update_ms() + if key == "-" and CONFIG.update_ms - 100 >= 100: + CONFIG.update_ms -= 100 + Box.draw_update_ms() + if key in ["b", "n"]: + NetCollector.switch(key) + Collector.collect(NetCollector) + #? Main function ---------------------------------------------------------------------------------> def main(): - clean_quit() + Timer.stamp() + + while Timer.not_zero(): + if Key.input_wait(Timer.left()): + process_keys() + + Collector.collect() #? Pre main --------------------------------------------------------------------------------------> @@ -2106,179 +2399,11 @@ def main(): CPU_NAME: str = get_cpu_name() -testing = True #! Remove - -#! For testing -------------------------------------------------------------------------------> - - - -def waitone(t: float = 0.0): - if t > 0.0: Key.new.wait(t) - else: Key.new.wait() - Key.new.clear() - Draw.clear() - Draw.now(Term.clear) - -@timerd -def testing_gradients(): - # for theme in THEME.themes.keys(): - # THEME(theme) - # timer = time() - - for key in THEME.gradient.keys(): - Draw.now(f'{Term.fg}{key}:\n{"█".join(THEME.gradient[key])}\n') - Draw.now(f'{Term.fg}') - # Draw.now(f'Theme creation of {CONFIG.color_theme} took {time() - timer:.5f} seconds\n\n') - Draw.now("\n") - # THEME("Default") - -@timerd -def testing_humanizer(): - for i in range(1, 101, 3): - for n in range(1, 6): - Draw.now(floating_humanizer(i * (1050 * n) * (n << 10 ), bit=False, per_second=False, short=False), " ") - Draw.now("\n") - for i in range(1, 30): - for n in range(1, 8): - Draw.now(floating_humanizer((995 + i) << (n * 10), bit=False), " ") - Draw.now("\n") - - -@timerd -def testing_colors(): - for item, _ in DEFAULT_THEME.items(): - Draw.buffer("+testing", Fx.b, getattr(THEME, item)(f'{item:<20}'), Fx.ub, f'{"hex=" + getattr(THEME, item).hexa:<20} dec={getattr(THEME, item).dec}\n') - - Draw.out() - -@timerd -def testing_boxes(): - #Box.calc_sizes() - Box.draw_bg() - Draw.now(Mv.to(Term.height - 3, 1)) - #Draw.now(create_box(20, 20, 15, 10, "Hej")) - #waitone() - #Draw.now(Term.normal_screen, Term.show_cursor) - -@timerd -def testing_banner(): - Draw.buffer("banner", Banner.draw(18, center=True)) - Draw.out() - - Draw.now(Mv.to(35, 1)) - -@timerd -def testing_meter(): - Draw.clear("meters") - for _ in range(10): - Draw.buffer("+meters", "1234567890") - Draw.buffer("+meters", "\n") - - stamp = time() - test_meter = Meter(0, Term.width, "cpu") - - for i in range(0,101, 2): - Draw.buffer("+meters", test_meter(i), "\n") - - Draw.buffer("+meter", f'{time() - stamp}') - Draw.out() - -def testing_keyinput(): - line: str = "" - this_key: str = "" - count: int = 0 - while True: - count += 1 - Draw.now(f'{Mv.to(1,1)}{Fx.b}{THEME.temp_start("Count:")} {count} {THEME.cached_mid("Time:")} {strftime("%H:%M:%S", localtime())}', - f'{Color.fg("#ff")} Width: {Term.width} Height: {Term.height} Resized: {Term.resized}') - if Key.input_wait(1): - while Key.list: - #Key.new.clear() - this_key = Key.list.pop() - Draw.now(f'{Mv.to(2,1)}{Color.fg("#ff9050")}{Fx.b}Last key= {Term.fg}{Fx.ub}{repr(this_key):14}{" "}') - if this_key == "backspace": - line = line[:-1] - elif this_key == "escape": - line = "" - elif this_key == "Q": - clean_quit() - elif this_key == "R": - raise Exception("Test ERROR") - elif len(this_key) == 1: - line += this_key - Draw.now(f'{Color.fg("#90ff50")}{Fx.b}Command= {Term.fg}{Fx.ub}{line}{Fx.bl}| {Fx.ubl}\033[0K\n') - if this_key == "enter": - try: - exec(line) - except: - pass - Draw.clear() - -def testing_graphs(): - my_data = [x for x in range(0, 101)] - my_data += [x for x in range(100, -1, -1)] - - - my_graph = Graph(100, 1, THEME.main_fg, my_data) - Draw.now(f'{Fx.ub}{Mv.to(0, 0)}{my_graph}') - - return - - my_data100 = [randint(0, 100) for _ in range(Term.width * 2)] - - my_data2 = my_data[-90:] - my_data3 = my_data[:86] - - my_colors = [] - for i in range(51): - for _ in range(2): my_colors.append(Color.fg(i, i, i)) - #my_colors.reverse() - - - my_graph = Graph(Term.width, Term.height // 2, my_colors, my_data100, invert=True) - my_graph2 = Graph(Term.width, Term.height // 2, my_colors, my_data100, invert=False) - - # my_graph3 = Graph(100 // 3 + 10, 1, THEME.proc_misc, my_data2) - # my_graph4 = Graph(100 // 3 + 10, 1, THEME.proc_misc, my_data3) - # my_graph5 = Graph(100, Term.height // 3, THEME.inactive_fg, my_data) - - #pause = re.compile(r"\033\[\d+;\d?;?\d*;?\d*;?\d*m{1}") - #repl = "\033[0;37m" - - banner = Banner.draw(Term.height // 3 - 2, center=True) - - Draw.now(f'{Fx.ub}{Mv.to(0, 0)}{my_graph}\ - {Mv.to(Term.height // 2, 0)}{my_graph2}\ - {banner}') - - # {Mv.to(Term.height - (Term.height // 3), Term.width // 2 - 50)}{my_graph5}\ - # {Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 - 50)}{my_graph3}\ - # {Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 + 7)}{my_graph4}\ - - #t = 1 - x = 0 - for _ in range(200): - sleep(0.05) - x = randint(0, 100) - # x += 1 if t == 1 else -1 - # if x == 100: t = 0 - # if x == 0: t = 1 - Draw.now(f'{Fx.ub}{Mv.to(0, 0)}{my_graph.add(x)}\ - {Mv.to(Term.height // 2, 0)}{my_graph2.add(x)}\ - {banner}') - - # Draw.now(f'{Mv.to(Term.height - (Term.height // 3), Term.width // 2 - 50)}{my_graph5.add(x)}') - # Draw.now(f'{Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 - 50)}{my_graph3.add(x)}') - # Draw.now(f'{Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 + 7)}{my_graph4.add(x)}') - - Draw.now(Mv.to(Term.height -4, 0)) - -#! Remove ------------------------------------------------------------------------------------< if __name__ == "__main__": #? Init --------------------------------------------------------------------------------------> - Timer.start("Init") + if DEBUG: TimeIt.start("Init") class Init: running: bool = True @@ -2286,37 +2411,39 @@ if __name__ == "__main__": initbg_data: List[int] initbg_up: Graph initbg_down: Graph + resized = False @staticmethod def fail(err): - Draw.buffer("+init!", f'{Mv.restore}{Symbol.fail}') + if CONFIG.show_init: + Draw.buffer("+init!", f'{Mv.restore}{Symbol.fail}') + sleep(2) errlog.exception(f'{err}') - sleep(2) clean_quit(1, errmsg=f'Error during init! See {CONFIG_DIR}/error.log for more information.') @classmethod def success(cls, start: bool = False): + if not CONFIG.show_init or cls.resized: return if start: - Draw.buffer("initbg", z=10) Draw.buffer("init", z=1) + Draw.buffer("initbg", z=10) for i in range(51): for _ in range(2): cls.initbg_colors.append(Color.fg(i, i, i)) Draw.buffer("banner", f'{Banner.draw(Term.height // 2 - 10, center=True)}{Color.fg("#50")}\n', z=2) - for _i in range(10): - perc = f'{str((_i + 1) * 10) + "%":>5}' + for _i in range(7): + perc = f'{str(round((_i + 1) * 14 + 2)) + "%":>5}' Draw.buffer("+banner", f'{Mv.to(Term.height // 2 - 3 + _i, Term.width // 2 - 28)}{Fx.trans(perc)}{Symbol.v_line}') Draw.out("banner") Draw.buffer("+init!", f'{Color.fg("#cc")}{Fx.b}{Mv.to(Term.height // 2 - 3, Term.width // 2 - 21)}{Mv.save}') - if start or Term.resized: cls.initbg_data = [randint(0, 100) for _ in range(Term.width * 2)] cls.initbg_up = Graph(Term.width, Term.height // 2, cls.initbg_colors, cls.initbg_data, invert=True) cls.initbg_down = Graph(Term.width, Term.height // 2, cls.initbg_colors, cls.initbg_data, invert=False) if start: return - if not testing: - cls.draw_bg(10) + + cls.draw_bg(5) Draw.buffer("+init!", f'{Mv.restore}{Symbol.ok}\n{Mv.r(Term.width // 2 - 22)}{Mv.save}') @classmethod @@ -2324,27 +2451,34 @@ if __name__ == "__main__": for _ in range(times): sleep(0.05) x = randint(0, 100) - Draw.buffer("initbg", f'{Fx.ub}{Mv.to(0, 0)}{cls.initbg_up.add(x)}{Mv.to(Term.height // 2, 0)}{cls.initbg_down.add(x)}') + Draw.buffer("initbg", f'{Fx.ub}{Mv.to(0, 0)}{cls.initbg_up(x)}{Mv.to(Term.height // 2, 0)}{cls.initbg_down(x)}') Draw.out("initbg", "banner", "init") @classmethod def done(cls): - cls.draw_bg(20) - Draw.clear("initbg", "banner", "init") cls.running = False + if not CONFIG.show_init: return + if cls.resized: + Draw.now(Term.clear) + else: + cls.draw_bg(15) + Draw.clear("initbg", "banner", "init") del cls.initbg_up, cls.initbg_down, cls.initbg_data, cls.initbg_colors #? Switch to alternate screen, clear screen, hide cursor and disable input echo - if not testing: Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor) #! Enable - else: Draw.now(Term.clear, Term.hide_cursor) #! Disable + Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor, Term.mouse_on) + Term.echo(False) + Term.refresh() #? Draw banner and init status - if not testing: Init.success(start=True) + if CONFIG.show_init: + Init.success(start=True) #? Load theme - Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Loading theme and creating colors... ")}{Mv.save}') + if CONFIG.show_init: + Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Loading theme and creating colors... ")}{Mv.save}') try: THEME: Theme = Theme(CONFIG.color_theme) except Exception as e: @@ -2353,7 +2487,8 @@ if __name__ == "__main__": Init.success() #? Setup boxes - Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Doing some maths and drawing... ")}{Mv.save}') + if CONFIG.show_init: + Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Doing some maths and drawing... ")}{Mv.save}') try: Box.calc_sizes() Box.draw_bg(now=False) @@ -2363,7 +2498,8 @@ if __name__ == "__main__": Init.success() #? Setup signal handlers for SIGSTP, SIGCONT, SIGINT and SIGWINCH - Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Setting up signal handlers... ")}{Mv.save}') + if CONFIG.show_init: + Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Setting up signal handlers... ")}{Mv.save}') try: signal.signal(signal.SIGTSTP, now_sleeping) #* Ctrl-Z signal.signal(signal.SIGCONT, now_awake) #* Resume @@ -2375,7 +2511,8 @@ if __name__ == "__main__": Init.success() #? Start a separate thread for reading keyboard input - Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Starting input reader thread... ")}{Mv.save}') + if CONFIG.show_init: + Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Starting input reader thread... ")}{Mv.save}') try: Key.start() except Exception as e: @@ -2384,7 +2521,8 @@ if __name__ == "__main__": Init.success() #? Start a separate thread for data collection and drawing - Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Starting data collection and drawer thread... ")}{Mv.save}') + if CONFIG.show_init: + Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Starting data collection and drawer thread... ")}{Mv.save}') try: Collector.start() except Exception as e: @@ -2393,67 +2531,30 @@ if __name__ == "__main__": Init.success() #? Collect data and draw to buffer - Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Collecting data and drawing... ")}{Mv.save}') + if CONFIG.show_init: + Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Collecting data and drawing... ")}{Mv.save}') try: - #Collector.collect(draw_now=False) + Collector.collect(draw_now=False) pass except Exception as e: Init.fail(e) else: Init.success() - Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Collecting nuclear launch codes... ")}{Mv.save}') - Init.success() - Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Launching missiles... ")}{Mv.save}') - Init.success() - Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Alien invasion... ")}{Mv.save}') - Init.success() - #? Draw to screen - Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Finishing up... ")}{Mv.save}') + if CONFIG.show_init: + Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Finishing up... ")}{Mv.save}') try: - #Collector.collect_done.wait() - pass + Collector.collect_done.wait() except Exception as e: Init.fail(e) else: Init.success() - - if not testing: Init.done() #! Remove if - if not testing: Draw.out(clear=True) #! Remove if - else: Draw.clear(); Draw.now(Term.clear); Init.running = False #! Remove - Timer.stop("Init") - - - #! For testing -------------------------------------------------------------------------------> - if testing: - try: - #testing_graphs() - testing_collectors() - #testing_humanizer() - # waitone(1) - #testing_keyinput() - #testing_banner() - # waitone(1) - #testing_colors() - # waitone(1) - #testing_gradients() - # waitone(1) - #testing_boxes() - # waitone(1) - #testing_meter() - # Draw.idle.clear() - #Draw.now(f'{Mv.to(Term.height - 5, 1)}Any key to exit!') - #waitone() - # Draw.idle.set() - #sleep(2) - except Exception as e: - errlog.exception(f'{e}') - clean_quit(1) - - clean_quit() - #! Remove ------------------------------------------------------------------------------------< + Init.done() + Term.refresh() + Draw.out(clear=True) + if DEBUG: TimeIt.stop("Init") #? Start main loop while not False: