From 3a07ef4bfd94de12cbaa502fa4f51ec48685d9c6 Mon Sep 17 00:00:00 2001 From: aristocratos Date: Fri, 24 Jul 2020 03:44:11 +0200 Subject: [PATCH] Added proc collection --- bpytop.py | 972 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 814 insertions(+), 158 deletions(-) diff --git a/bpytop.py b/bpytop.py index 085fea6..5daa0de 100755 --- a/bpytop.py +++ b/bpytop.py @@ -21,6 +21,7 @@ import os, sys, threading, signal, re, subprocess, logging, logging.handlers from time import time, sleep, strftime, localtime from datetime import timedelta from _thread import interrupt_main +from collections import defaultdict from select import select from distutils.util import strtobool from string import Template @@ -64,16 +65,16 @@ BANNER_SRC: List[Tuple[str, str, str]] = [ ("#a86b00", "#006e85", "██████╔╝██║ ██║ ██║ ╚██████╔╝██║"), ("#000000", "#000000", "╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝"), ] -VERSION: str = "0.4.1" +VERSION: str = "0.6.1" #*?This is the template used to create the config file DEFAULT_CONF: Template = Template(f'#? Config file for bpytop v. {VERSION}' + ''' -#* Color theme, looks for a .theme file in "~/.config/bpytop/themes" and "~/.config/bpytop/user_themes", "Default" for builtin default theme -#* Corresponding folder with a trailing / needs to be appended, example: color_theme="user_themes/monokai" +#* Color theme, looks for a .theme file in "~/.config/bpytop/themes" and "~/.config/bpytop/user_themes", "Default" for builtin default theme. +#* Themes located in "user_themes" folder should be suffixed by a star "*", i.e. color_theme="monokai*" color_theme="$color_theme" -#* Update time in milliseconds, increases automatically if set below internal loops processing time, recommended 1000 ms or above for better sample times for graphs. +#* Update time in milliseconds, increases automatically if set below internal loops processing time, recommended 2000 ms or above for better sample times for graphs. update_ms=$update_ms #* Processes sorting, "pid" "program" "arguments" "threads" "user" "memory" "cpu lazy" "cpu responsive", @@ -86,6 +87,15 @@ proc_reversed=$proc_reversed #* Show processes as a tree proc_tree=$proc_tree +#* Use the cpu graph colors in the process list. +proc_colors=$proc_colors + +#* Use a darkening gradient in the process list. +proc_gradient=$proc_gradient + +#* If process cpu usage should be of the core it's running on or usage of the total available cpu power. +proc_per_core=$proc_per_core + #* Check cpu temperature, needs "vcgencmd" on Raspberry Pi and "osx-cpu-temp" on MacOS X. check_temp=$check_temp @@ -98,12 +108,6 @@ background_update=$background_update #* Custom cpu model name, empty string to disable. custom_cpu_name="$custom_cpu_name" -#* Show color gradient in process list, True or False. -proc_gradient=$proc_gradient - -#* If process cpu usage should be of the core it's running on or usage of the total available cpu power. -proc_per_core=$proc_per_core - #* Optional filter for shown disks, should be last folder in path of a mountpoint, "root" replaces "/", separate multiple values with comma. #* Begin line with "exclude=" to change to exclude filter, oterwise defaults to "most include" filter. Example: disks_filter="exclude=boot, home" disks_filter="$disks_filter" @@ -194,6 +198,41 @@ DEFAULT_THEME: Dict[str, str] = { "upload_end" : "#dcafde" } +MENUS: Dict[str, Dict[str, Tuple[str, ...]]] = { + "options" : { + "normal" : ( + "┌─┐┌─┐┌┬┐┬┌─┐┌┐┌┌─┐", + "│ │├─┘ │ ││ ││││└─┐", + "└─┘┴ ┴ ┴└─┘┘└┘└─┘"), + "selected" : ( + "╔═╗╔═╗╔╦╗╦╔═╗╔╗╔╔═╗", + "║ ║╠═╝ ║ ║║ ║║║║╚═╗", + "╚═╝╩ ╩ ╩╚═╝╝╚╝╚═╝") }, + "help" : { + "normal" : ( + "┬ ┬┌─┐┬ ┌─┐", + "├─┤├┤ │ ├─┘", + "┴ ┴└─┘┴─┘┴ "), + "selected" : ( + "╦ ╦╔═╗╦ ╔═╗", + "╠═╣║╣ ║ ╠═╝", + "╩ ╩╚═╝╩═╝╩ ") }, + "quit" : { + "normal" : ( + "┌─┐ ┬ ┬ ┬┌┬┐", + "│─┼┐│ │ │ │ ", + "└─┘└└─┘ ┴ ┴ "), + "selected" : ( + "╔═╗ ╦ ╦ ╦╔╦╗ ", + "║═╬╗║ ║ ║ ║ ", + "╚═╝╚╚═╝ ╩ ╩ ") } +} + +MENU_COLORS: Dict[str, Tuple[str, ...]] = { + "normal" : ("#0fd7ff", "#00bfe6", "#00a6c7", "#008ca8"), + "selected" : ("#ffa50a", "#f09800", "#db8b00", "#c27b00") +} + #? Units for floating_humanizer function UNITS: Dict[str, Tuple[str, ...]] = { "bit" : ("bit", "Kib", "Mib", "Gib", "Tib", "Pib", "Eib", "Zib", "Yib", "Bib", "GEb"), @@ -251,19 +290,20 @@ def timeit_decorator(func): class Config: '''Holds all config variables and functions for loading from and saving to disk''' - keys: List[str] = ["color_theme", "update_ms", "proc_sorting", "proc_reversed", "proc_tree", "check_temp", "draw_clock", "background_update", "custom_cpu_name", "proc_gradient", "proc_per_core", "disks_filter", "update_check", "log_level", "mem_graphs", "show_swap", "swap_disk", "show_disks", "show_init"] + keys: List[str] = ["color_theme", "update_ms", "proc_sorting", "proc_reversed", "proc_tree", "check_temp", "draw_clock", "background_update", "custom_cpu_name", "proc_colors", "proc_gradient", "proc_per_core", "disks_filter", "update_check", "log_level", "mem_graphs", "show_swap", "swap_disk", "show_disks", "show_init"] conf_dict: Dict[str, Union[str, int, bool]] = {} color_theme: str = "Default" - update_ms: int = 1000 + update_ms: int = 2000 proc_sorting: str = "cpu lazy" proc_reversed: bool = False proc_tree: bool = False + proc_colors: bool = True + proc_gradient: bool = True + proc_per_core: bool = False check_temp: bool = True draw_clock: str = "%X" background_update: bool = True custom_cpu_name: str = "" - proc_gradient: bool = True - proc_per_core: bool = False disks_filter: str = "" update_check: bool = True mem_graphs: bool = True @@ -321,7 +361,7 @@ class Config: new_config["version"] = line[line.find("v. ") + 3:] for key in self.keys: if line.startswith(key): - line = line.lstrip(key + "=") + line = line.replace(key + "=", "") if line.startswith('"'): line = line.strip('"') if type(getattr(self, key)) == int: @@ -366,7 +406,7 @@ try: errlog.setLevel(getattr(logging, CONFIG.log_level)) if CONFIG.log_level == "DEBUG": DEBUG = True errlog.info(f'New instance of bpytop version {VERSION} started with pid {os.getpid()}') - errlog.debug(f'Loglevel set to {CONFIG.log_level}') + errlog.info(f'Loglevel set to {CONFIG.log_level}') if CONFIG.info: for info in CONFIG.info: errlog.info(info) @@ -389,15 +429,17 @@ class Term: resized: bool = False _w : int = 0 _h : int = 0 - fg: str = "" #* Default foreground color - bg: str = "" #* Default background color - hide_cursor = "\033[?25l" #* Hide terminal cursor - show_cursor = "\033[?25h" #* Show terminal cursor - alt_screen = "\033[?1049h" #* Switch to alternate screen - normal_screen = "\033[?1049l" #* Switch to normal screen - clear = "\033[2J\033[0;0f" #* Clear screen and set cursor to position 0,0 - mouse_on = "\033[?1001h\033[?1015h\033[?1006h" #* Enable reporting of mouse position on click and release - mouse_off = "\033[?1001l\033[?1015l\033[?1006l" #* Disable mouse reporting + fg: str = "" #* Default foreground color + bg: str = "" #* Default background color + hide_cursor = "\033[?25l" #* Hide terminal cursor + show_cursor = "\033[?25h" #* Show terminal cursor + alt_screen = "\033[?1049h" #* Switch to alternate screen + normal_screen = "\033[?1049l" #* Switch to normal screen + clear = "\033[2J\033[0;0f" #* Clear screen and set cursor to position 0,0 + mouse_on = "\033[?1002h\033[?1015h\033[?1006h" #* Enable reporting of mouse position on click and release + mouse_off = "\033[?1002l" #* Disable mouse reporting + mouse_direct_on = "\033[?1003h" #* Enable reporting of mouse position at any movement + mouse_direct_off = "\033[?1003l" #* Disable direct mouse reporting winch = threading.Event() @classmethod @@ -405,30 +447,34 @@ class Term: """Update width, height and set resized flag if terminal has been resized""" if cls.resized: cls.winch.set(); return cls._w, cls._h = os.get_terminal_size() + if (cls._w, cls._h) == (cls.width, cls.height): return while (cls._w, cls._h) != (cls.width, cls.height) or (cls._w < 80 or cls._h < 24): if Init.running: Init.resized = True cls.resized = True - Collector.resize_interrupt = True + Collector.collect_interrupt = True cls.width, cls.height = cls._w, cls._h Draw.now(Term.clear) - Draw.now(f'{create_box(cls._w // 2 - 25, cls._h // 2 - 2, 50, 3, "resizing", line_color=Colors.red, title_color=Colors.white)}', + Draw.now(f'{create_box(cls._w // 2 - 25, cls._h // 2 - 2, 50, 3, "resizing", line_color=Colors.green, title_color=Colors.white)}', f'{Mv.r(12)}{Colors.default}{Colors.black_bg}{Fx.b}Width : {cls._w} Height: {cls._h}{Fx.ub}{Term.bg}{Term.fg}') - while cls._w < 80 or cls._h < 24: - Draw.now(Term.clear) - Draw.now(f'{create_box(cls._w // 2 - 25, cls._h // 2 - 2, 50, 4, "warning", line_color=Colors.red, title_color=Colors.white)}', - f'{Mv.r(12)}{Colors.default}{Colors.black_bg}{Fx.b}Width: {Colors.red if cls._w < 80 else Colors.green}{cls._w} ', - f'{Colors.default}Height: {Colors.red if cls._h < 24 else Colors.green}{cls._h}{Term.bg}{Term.fg}', - f'{Mv.to(cls._h // 2, cls._w // 2 - 23)}{Colors.default}{Colors.black_bg}Width and Height needs to be at least 80 x 24 !{Fx.ub}{Term.bg}{Term.fg}') + if cls._w < 80 or cls._h < 24: + while cls._w < 80 or cls._h < 24: + Draw.now(Term.clear) + Draw.now(f'{create_box(cls._w // 2 - 25, cls._h // 2 - 2, 50, 4, "warning", line_color=Colors.red, title_color=Colors.white)}', + f'{Mv.r(12)}{Colors.default}{Colors.black_bg}{Fx.b}Width: {Colors.red if cls._w < 80 else Colors.green}{cls._w} ', + f'{Colors.default}Height: {Colors.red if cls._h < 24 else Colors.green}{cls._h}{Term.bg}{Term.fg}', + f'{Mv.to(cls._h // 2, cls._w // 2 - 23)}{Colors.default}{Colors.black_bg}Width and Height needs to be at least 80 x 24 !{Fx.ub}{Term.bg}{Term.fg}') + cls.winch.wait(0.3) + cls.winch.clear() + cls._w, cls._h = os.get_terminal_size() + else: cls.winch.wait(0.3) cls.winch.clear() - cls._w, cls._h = os.get_terminal_size() - cls.winch.wait(0.3) - cls.winch.clear() cls._w, cls._h = os.get_terminal_size() Box.calc_sizes() if Init.running: cls.resized = False; return - Box.draw_bg() + if Menu.active: Menu.resized = True + Box.draw_bg(now=False) cls.resized = False Timer.finish() @@ -445,8 +491,8 @@ class Term: class Fx: """Text effects - * trans(string: str): Replace whitespace with escape move right to not paint background in whitespace. - * uncolor(string: str) : Removes all color and returns string with THEME.inactive_fg color.""" + * trans(string: str): Replace whitespace with escape move right to not overwrite background behind whitespace. + * uncolor(string: str) : Removes all 24-bit color and returns string .""" start = "\033[" #* Escape sequence start sep = ";" #* Escape sequence separator end = "m" #* Escape sequence end @@ -464,6 +510,7 @@ class Fx: strike = s = "\033[9m" #* Strike / crossed-out on unstrike = us = "\033[29m" #* Strike / crossed-out off + #* Precompiled regex for finding a 24-bit color escape sequence in a string color_re = re.compile(r"\033\[\d+;\d?;?\d*;?\d*;?\d*m") @staticmethod @@ -472,7 +519,7 @@ class Fx: @classmethod def uncolor(cls, string: str) -> str: - return f'{THEME.inactive_fg}{cls.color_re.sub("", string)}{Term.fg}' + return f'{cls.color_re.sub("", string)}' class Raw(object): """Set raw input mode for device""" @@ -523,9 +570,10 @@ class Mv: d = down class Key: - """Handles the threaded input reader""" + """Handles the threaded input reader for keypresses and mouse events""" list: List[str] = [] - mouse: Dict[str, List[Tuple[int, int]]] = {} + mouse: Dict[str, List[List[int]]] = {} + mouse_pos: Tuple[int, int] = (0, 0) escape: Dict[Union[str, Tuple[str, str]], str] = { "\n" : "enter", ("\x7f", "\x08") : "backspace", @@ -539,6 +587,7 @@ class Key: "[F" : "end", "[5~" : "page_up", "[6~" : "page_down", + "\t" : "tab", "[Z" : "shift_tab", "OP" : "f1", "OQ" : "f2", @@ -555,6 +604,8 @@ class Key: } new = threading.Event() idle = threading.Event() + mouse_move = threading.Event() + mouse_report: bool = False idle.set() stopping: bool = False started: bool = False @@ -584,6 +635,20 @@ class Key: if cls.list: return cls.list.pop(0) else: return "" + @classmethod + def get_mouse(cls) -> Tuple[int, int]: + if cls.new.is_set(): + cls.new.clear() + return cls.mouse_pos + + @classmethod + def mouse_moved(cls) -> bool: + if cls.mouse_move.is_set(): + cls.mouse_move.clear() + return True + else: + return False + @classmethod def has_key(cls) -> bool: if cls.list: return True @@ -594,62 +659,86 @@ class Key: cls.list = [] @classmethod - def input_wait(cls, sec: float = 0.0) -> bool: + def input_wait(cls, sec: float = 0.0, mouse: bool = False) -> bool: '''Returns True if key is detected else waits out timer and returns False''' + if mouse: Draw.now(Term.mouse_direct_on) cls.new.wait(sec if sec > 0 else 0.0) + if mouse: Draw.now(Term.mouse_direct_off, Term.mouse_on) + if cls.new.is_set(): cls.new.clear() return True else: return False + @classmethod + def break_wait(cls): + cls.list.append("_null") + cls.new.set() + sleep(0.01) + cls.new.clear() + @classmethod def _get_key(cls): - """Get a single key from stdin, convert to readable format and save to keys list. Meant to be run in it's own thread.""" + """Get a key or escape sequence from stdin, convert to readable format and save to keys list. Meant to be run in it's own thread.""" input_key: str = "" clean_key: str = "" - mouse_press: Tuple[int, int] try: while not cls.stopping: with Raw(sys.stdin): - if not select([sys.stdin], [], [], 0.1)[0]: #* Wait 100ms for input on stdin then restart loop to check for stop flag + if not select([sys.stdin], [], [], 0.1)[0]: #* Wait 100ms for input on stdin then restart loop to check for stop flag continue - cls.idle.clear() #* Report IO block in progress to prevent Draw functions to get IO Block error - input_key += sys.stdin.read(1) - if input_key == "\033" or input_key == "0": #* If first character is a escape sequence read 5 more keys - Draw.idle.wait() #* Wait for Draw function to finish if busy - with Nonblocking(sys.stdin): #* Set non blocking to prevent read stall if less than 5 characters + input_key += sys.stdin.read(1) #* Read 1 key safely with blocking on + if input_key == "\033": #* If first character is a escape sequence keep reading + cls.idle.clear() #* Report IO block in progress to prevent Draw functions from getting a IO Block error + Draw.idle.wait() #* Wait for Draw function to finish if busy + with Nonblocking(sys.stdin): #* Set non blocking to prevent read stall input_key += sys.stdin.read(20) - if input_key == "\033": clean_key = "escape" #* Key is escape if only containing \033 - elif input_key.startswith("\033[<0;") and input_key.endswith("m"): + if input_key.startswith("\033[<35;"): + _ = sys.stdin.read(1000) + cls.idle.set() #* Report IO blocking done + #errlog.debug(f'{repr(input_key)}') + if input_key == "\033": clean_key = "escape" #* Key is "escape" key if only containing \033 + elif input_key.startswith(("\033[<0;", "\033[<35;", "\033[<64;", "\033[<65;")): #* Detected mouse event try: - mouse_press = (int(input_key.split(";")[1]), int(input_key.split(";")[2].rstrip("m"))) + cls.mouse_pos = (int(input_key.split(";")[1]), int(input_key.split(";")[2].rstrip("mM"))) except: pass else: - errlog.debug(f'{mouse_press}') - for key_name, positions in cls.mouse.items(): - if mouse_press in positions: - clean_key = key_name - break - #errlog.debug(f'Mouse press at line={input_key.split(";")[2].rstrip("m")} col={input_key.split(";")[1]}') - elif input_key == "\\": clean_key = "\\" #* Clean up "\" to not return escaped + if input_key.startswith("\033[<35;"): #* Detected mouse move in mouse direct mode + cls.mouse_move.set() + cls.new.set() + elif input_key.startswith("\033[<64;"): #* Detected mouse scroll up + clean_key = "mouse_scroll_up" + elif input_key.startswith("\033[<65;"): #* Detected mouse scroll down + clean_key = "mouse_scroll_down" + elif input_key.startswith("\033[<0;") and input_key.endswith("m"): #* Detected mouse click release + if Menu.active: + clean_key = "mouse_click" + else: + for key_name, positions in cls.mouse.items(): #* Check if mouse position is clickable + if list(cls.mouse_pos) in positions: + clean_key = key_name + break + else: + clean_key = "mouse_click" + elif input_key == "\\": clean_key = "\\" #* Clean up "\" to not return escaped else: - for code in cls.escape.keys(): #* Go trough dict of escape codes to get the cleaned key name + for code in cls.escape.keys(): #* Go trough dict of escape codes to get the cleaned key name if input_key.lstrip("\033").startswith(code): clean_key = cls.escape[code] break - else: #* If not found in escape dict and length of key is 1, assume regular character + else: #* If not found in escape dict and length of key is 1, assume regular character if len(input_key) == 1: clean_key = input_key #errlog.debug(f'Input key: {repr(input_key)} Clean key: {clean_key}') #! Remove if clean_key: - cls.list.append(clean_key) #* Store up to 10 keys in input queue for later processing + cls.list.append(clean_key) #* Store up to 10 keys in input queue for later processing if len(cls.list) > 10: del cls.list[0] clean_key = "" - cls.new.set() #* Set threading event to interrupt main thread sleep + cls.new.set() #* Set threading event to interrupt main thread sleep input_key = "" - cls.idle.set() #* Report IO blocking done + except Exception as e: errlog.exception(f'Input thread failed with exception: {e}') @@ -669,13 +758,15 @@ class Draw: ''' strings: Dict[str, str] = {} z_order: Dict[str, int] = {} - last: Dict[str, str] = {} + saved: Dict[str, str] = {} + save: Dict[str, bool] = {} + once: Dict[str, bool] = {} idle = threading.Event() idle.set() @classmethod def now(cls, *args): - '''Wait for input reader to be idle then print to screen''' + '''Wait for input reader and self to be idle then print to screen''' Key.idle.wait() cls.idle.wait() cls.idle.clear() @@ -688,7 +779,7 @@ class Draw: cls.idle.set() @classmethod - def buffer(cls, name: str, *args: str, append: bool = False, now: bool = False, z: int = 100, save: bool = False, uncolor: bool = False): + def buffer(cls, name: str, *args: str, append: bool = False, now: bool = False, z: int = 100, only_save: bool = False, no_save: bool = False, once: bool = False): string: str = "" if name.startswith("+"): name = name.lstrip("+") @@ -696,16 +787,18 @@ class Draw: if name.endswith("!"): name = name.rstrip("!") now = True + cls.save[name] = not no_save + cls.once[name] = once if not name in cls.z_order or z != 100: cls.z_order[name] = z if args: string = "".join(args) - if uncolor: string = Fx.uncolor(string) - if name not in cls.strings or not append: cls.strings[name] = "" - if save: - if name not in cls.last or not append: cls.last[name] = "" - cls.last[name] = string + if only_save: + if name not in cls.saved or not append: cls.saved[name] = "" + cls.saved[name] += string else: + if name not in cls.strings or not append: cls.strings[name] = "" cls.strings[name] += string - if now: cls.now(string) + if now: + cls.out(name) @classmethod def out(cls, *names: str, clear = False): @@ -713,49 +806,63 @@ class Draw: if not cls.strings: return if names: for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True): - if name in names: + if name in names and name in cls.strings: out += cls.strings[name] - cls.last[name] = out - if clear: - del cls.strings[name] - del cls.z_order[name] + if cls.save[name]: + cls.saved[name] = cls.strings[name] + if clear or cls.once[name]: + cls.clear(name) cls.now(out) else: for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True): if name in cls.strings: out += cls.strings[name] - cls.last[name] = out - if clear: cls.strings = {} + if cls.save[name]: + cls.saved[name] = out + if cls.once[name] and not clear: + cls.clear(name) + if clear: + cls.clear() cls.now(out) @classmethod - def last_buffer(cls) -> str: + def saved_buffer(cls) -> str: out: str = "" for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True): - if name in cls.last: - out += cls.last[name] + if name in cls.saved: + out += cls.saved[name] return out @classmethod - def clear(cls, *names, last: bool = False): + def clear(cls, *names, saved: bool = False): if names: for name in names: if name in cls.strings: del cls.strings[name] - if last: + if name in cls.save: + del cls.save[name] + if name in cls.once: + del cls.once[name] + if saved: + if name in cls.saved: + del cls.saved[name] + if name in cls.z_order: del cls.z_order[name] - if name in cls.last: del cls.last[name] else: cls.strings = {} - if last: cls.last = {} + cls.save = {} + cls.once = {} + if saved: + cls.saved = {} + cls.z_order = {} class Color: '''Holds representations for a 24-bit color value __init__(color, depth="fg", default=False) -- color accepts 6 digit hexadecimal: string "#RRGGBB", 2 digit hexadecimal: string "#FF" or decimal RGB "255 255 255" as a string. -- depth accepts "fg" or "bg" - __call__(*args) converts arguments to a string and apply color + __call__(*args) joins str arguments to a string and apply color __str__ returns escape sequence to set color __iter__ returns iteration over red, green and blue in integer values of 0-255. * Values: .hexa: str | .dec: Tuple[int, int, int] | .red: int | .green: int | .blue: int | .depth: str | .escape: str @@ -815,7 +922,7 @@ class Color: for c in self.dec: yield c def __call__(self, *args: str) -> str: - if len(args) == 0: return "" + if len(args) < 1: return "" return f'{self.escape}{"".join(args)}{getattr(Term, self.depth)}' @staticmethod @@ -842,20 +949,24 @@ class Color: @classmethod def fg(cls, *args) -> str: - if len(args) > 1: return cls.escape_color(r=args[0], g=args[1], b=args[2], depth="fg") + if len(args) > 2: return cls.escape_color(r=args[0], g=args[1], b=args[2], depth="fg") else: return cls.escape_color(hexa=args[0], depth="fg") @classmethod def bg(cls, *args) -> str: - if len(args) > 1: return cls.escape_color(r=args[0], g=args[1], b=args[2], depth="bg") + if len(args) > 2: return cls.escape_color(r=args[0], g=args[1], b=args[2], depth="bg") else: return cls.escape_color(hexa=args[0], depth="bg") class Colors: + '''Standard colors for menus and dialogs''' default = Color("#cc") white = Color("#ff") red = Color("#bf3636") green = Color("#68bf36") + blue = Color("#0fd7ff") + yellow = Color("#db8b00") black_bg = Color("#00", depth="bg") + null = Color("") class Theme: '''__init__ accepts a dict containing { "color_element" : "color" }''' @@ -874,7 +985,9 @@ class Theme: "available" : [], "used" : [], "download" : [], - "upload" : [] + "upload" : [], + "proc" : [], + "proc_color" : [] } def __init__(self, theme: str): self.refresh() @@ -907,6 +1020,9 @@ class Theme: else: setattr(self, item, Color(value, depth=depth, default=default)) #* Create color gradients from one, two or three colors, 101 values indexed 0-100 + self.proc_start = self.main_fg; self.proc_mid = Colors.null; self.proc_end = self.inactive_fg + self.proc_color_start = self.inactive_fg; self.proc_color_mid = Colors.null; self.proc_color_end = self.cpu_start + rgb: Dict[str, Tuple[int, int, int]] colors: List[List[int]] = [] for name in self.gradient.keys(): @@ -1078,7 +1194,7 @@ class Graph: value_width = ceil(len(data) / 2) elif value_width < width: #* If the size of given data set is smaller then width of graph, fill graph with whitespace filler = self.symbol[0.0] * (width - value_width) - if len(data) % 2 == 1: data.insert(0, 0) + if len(data) % 2: data.insert(0, 0) for _ in range(height): for b in [True, False]: self.graphs[b].append(filler) @@ -1165,7 +1281,6 @@ class Meter: color_inactive: Color gradient_name: str width: int - theme: str saved: Dict[int, str] def __init__(self, value: int, width: int, gradient_name: str): @@ -1173,7 +1288,6 @@ class Meter: self.color_gradient = THEME.gradient[gradient_name] self.color_inactive = THEME.inactive_fg self.width = width - self.theme = CONFIG.color_theme self.saved = {} self.out = self._create(value) @@ -1181,9 +1295,6 @@ class Meter: if not isinstance(value, int): return self.out if value > 100: value = 100 elif value < 0: value = 100 - if self.theme != CONFIG.color_theme: - self.saved = {} - self.theme = CONFIG.color_theme if value in self.saved: self.out = self.saved[value] else: @@ -1247,21 +1358,17 @@ class Box: def draw_update_ms(cls, now: bool = True): update_string: str = f'{CONFIG.update_ms}ms' xpos: int = CpuBox.x + CpuBox.width - len(update_string) - 14 - Key.mouse["+"] = []; Key.mouse["-"] = [] - for i in range(3): - Key.mouse["+"].append((xpos + 7 + i, CpuBox.y)) - Key.mouse["-"].append((CpuBox.x + CpuBox.width - 4 + i, CpuBox.y)) - errlog.debug(f'{Key.mouse}') + Key.mouse["+"] = [[xpos + 7 + i, CpuBox.y] for i in range(3)] + Key.mouse["-"] = [[CpuBox.x + CpuBox.width - 4 + i, CpuBox.y] for i in range(3)] Draw.buffer("update_ms!" if now and not Menu.active else "update_ms", f'{Mv.to(CpuBox.y, xpos)}{THEME.cpu_box(Symbol.h_line * 7, Symbol.title_left)}{Fx.b}{THEME.hi_fg("+")} ', - f'{THEME.title(update_string)} {THEME.hi_fg("-")}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}', save=True if Menu.active else False) + f'{THEME.title(update_string)} {THEME.hi_fg("-")}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}', only_save=Menu.active, once=True) if now and not Menu.active: Draw.clear("update_ms") @classmethod def draw_bg(cls, now: bool = True): '''Draw all boxes outlines and titles''' - Draw.buffer("bg!" if now and not Menu.active else "bg", "".join(sub._draw_bg() for sub in cls.__subclasses__()), z=1000, save=True if Menu.active else False) # type: ignore - if now and not Menu.active: Draw.clear("bg") + Draw.buffer("bg", "".join(sub._draw_bg() for sub in cls.__subclasses__()), now=now, z=1000, only_save=Menu.active, once=True) # type: ignore cls.draw_update_ms(now=now) class SubBox: @@ -1279,6 +1386,7 @@ class CpuBox(Box, SubBox): height_p = 32 width_p = 100 resized: bool = True + redraw: bool = False buffer: str = "cpu" Box.buffers.append(buffer) @@ -1305,6 +1413,7 @@ class CpuBox(Box, SubBox): @classmethod def _draw_bg(cls) -> str: + Key.mouse["m"] = [[cls.x + 10 + i, cls.y] for i in range(6)] return (f'{create_box(box=cls, line_color=THEME.cpu_box)}' f'{Mv.to(cls.y, cls.x + 10)}{THEME.cpu_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg("m")}{THEME.title("enu")}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}' f'{create_box(x=cls.box_x, y=cls.box_y, width=cls.box_width, height=cls.box_height, line_color=THEME.div_line, fill=False, title=CPU_NAME[:18 if CONFIG.check_temp else 9])}') @@ -1312,13 +1421,14 @@ class CpuBox(Box, SubBox): @classmethod def _draw_fg(cls): cpu = CpuCollector + if cpu.redraw: cls.redraw = True out: str = "" lavg: str = "" x, y, w, h = cls.x + 1, cls.y + 1, cls.width - 2, cls.height - 2 bx, by, bw, bh = cls.box_x + 1, cls.box_y + 1, cls.box_width - 2, cls.box_height - 2 hh: int = ceil(h / 2) - if cls.resized: + if cls.resized or cls.redraw: Graphs.cpu["up"] = Graph(w - bw - 3, hh, THEME.gradient["cpu"], cpu.cpu_usage[0]) Graphs.cpu["down"] = Graph(w - bw - 3, h - hh, THEME.gradient["cpu"], cpu.cpu_usage[0], invert=True) Meters.cpu = Meter(cpu.cpu_usage[0][-1], (bw - 9 - 13 if CONFIG.check_temp else bw - 9), "cpu") @@ -1376,8 +1486,8 @@ class CpuBox(Box, SubBox): out += f'{Mv.to(y + h - 1, x + 1)}{THEME.inactive_fg}up {cpu.uptime}' - Draw.buffer(cls.buffer, f'{out}{Term.fg}', save=Menu.active) - cls.resized = False + Draw.buffer(cls.buffer, f'{out}{Term.fg}', only_save=Menu.active) + cls.resized = cls.redraw = False class MemBox(Box): name = "mem" @@ -1450,12 +1560,16 @@ class MemBox(Box): @classmethod def _draw_fg(cls): mem = MemCollector + if mem.redraw: cls.redraw = True out: str = "" + out_misc: str = "" gbg: str = "" gmv: str = "" gli: str = "" - x, y, h = cls.x + 1, cls.y + 1, cls.height - 2 + x, y, w, h = cls.x + 1, cls.y + 1, cls.width - 2, cls.height - 2 if cls.resized or cls.redraw: + cls._calc_size() + out_misc += cls._draw_bg() Meters.mem = {} Meters.swap = {} Meters.disks_used = {} @@ -1484,6 +1598,17 @@ class MemBox(Box): if len(mem.disks) * 3 <= h + 1: Meters.disks_free[name] = Meter(mem.disks[name]["free_percent"], cls.disk_meter, "free") + Key.mouse["h"] = [[x + cls.mem_width - 8 + i, y-1] for i in range(5)] + out_misc += (f'{Mv.to(y-1, x + cls.mem_width - 9)}{THEME.mem_box(Symbol.title_left)}{Fx.b if CONFIG.mem_graphs else ""}' + f'{THEME.title("grap")}{THEME.hi_fg("h")}{Fx.ub}{THEME.mem_box(Symbol.title_right)}') + if CONFIG.show_disks: + Key.mouse["p"] = [[x + w - 6 + i, y-1] for i in range(4)] + out_misc += (f'{Mv.to(y-1, x + w - 7)}{THEME.mem_box(Symbol.title_left)}{Fx.b if CONFIG.swap_disk else ""}' + f'{THEME.title("swa")}{THEME.hi_fg("p")}{Fx.ub}{THEME.mem_box(Symbol.title_right)}') + + + Draw.buffer("mem_misc", out_misc, only_save=True) + #* Mem out += f'{Mv.to(y, x+1)}{THEME.title}{Fx.b}Total:{mem.string["total"]:>{cls.mem_width - 9}}{Fx.ub}{THEME.main_fg}' cx = 1; cy = 1 @@ -1541,7 +1666,7 @@ class MemBox(Box): cy += 1 if len(mem.disks) * 4 <= h + 1: cy += 1 - Draw.buffer(cls.buffer, f'{out}{Term.fg}', save=Menu.active) + Draw.buffer(cls.buffer, f'{out_misc}{out}{Term.fg}', only_save=Menu.active) cls.resized = cls.redraw = False class NetBox(Box, SubBox): @@ -1579,36 +1704,38 @@ class NetBox(Box, SubBox): @classmethod def _draw_fg(cls): net = NetCollector + if net.redraw: cls.redraw = True if not net.nic: return out: str = "" out_misc: str = "" x, y, w, h = cls.x + 1, cls.y + 1, cls.width - 2, cls.height - 2 bx, by, bw, bh = cls.box_x + 1, cls.box_y + 1, cls.box_width - 2, cls.box_height - 2 + reset: bool = bool(net.stats[net.nic]["download"]["offset"]) if cls.resized or cls.redraw: - cls.redraw = True - Key.mouse["b"] = []; Key.mouse["n"] = [] - for i in range(4): - Key.mouse["b"].append((x+w - len(net.nic) - 9 + i, y-1)) - Key.mouse["n"].append((x+w - 5 + i, y-1)) + out_misc += cls._draw_bg() + Key.mouse["b"] = [[x+w - len(net.nic) - 9 + i, y-1] for i in range(4)] + Key.mouse["n"] = [[x+w - 5 + i, y-1] for i in range(4)] + Key.mouse["z"] = [[x+w - len(net.nic) - 14 + i, y-1] for i in range(4)] - out_misc += (f'{Mv.to(y-1, x+w - 19)}{THEME.net_box}{Symbol.h_line * (10 - len(net.nic))}{Symbol.title_left}{Fx.b}{THEME.hi_fg("")}' - f'{Fx.ub}{THEME.net_box(Symbol.title_right)}{Term.fg}') - Draw.buffer("net_misc", out_misc, save=True) + out_misc += (f'{Mv.to(y-1, x+w - 25)}{THEME.net_box}{Symbol.h_line * (10 - len(net.nic))}{Symbol.title_left}{Fx.b if reset else ""}{THEME.hi_fg("z")}{THEME.title("ero")}' + f'{Fx.ub}{THEME.net_box(Symbol.title_right)}{Term.fg}' + f'{THEME.net_box}{Symbol.title_left}{Fx.b}{THEME.hi_fg("")}{Fx.ub}{THEME.net_box(Symbol.title_right)}{Term.fg}') + Draw.buffer("net_misc", out_misc, only_save=True) cy = 0 for direction in ["download", "upload"]: strings = net.strings[net.nic][direction] stats = net.stats[net.nic][direction] - if stats["redraw"] or cls.redraw: + if stats["redraw"] or cls.resized: if cls.redraw: stats["redraw"] = True Graphs.net[direction] = Graph(w - bw - 3, cls.graph_height[direction], THEME.gradient[direction], stats["speed"], max_value=stats["graph_top"], invert=False if direction == "download" else True) out += f'{Mv.to(y if direction == "download" else y + cls.graph_height["download"], x)}{Graphs.net[direction](None if stats["redraw"] else stats["speed"][-1])}' - out += f'{Mv.to(by+cy, bx)}{THEME.main_fg}{cls.symbols[direction]} {strings["byte_ps"]:<10.10}{Mv.to(by+cy, bx+bw - 12)}{cls.symbols[direction] + " " + strings["bit_ps"]:>12.12}' + out += f'{Mv.to(by+cy, bx)}{THEME.main_fg}{cls.symbols[direction]} {strings["byte_ps"]:<10.10}{Mv.to(by+cy, bx+bw - 12)}{"(" + strings["bit_ps"] + ")":>12.12}' cy += 1 if bh != 3 else 2 if bh >= 6: - out += f'{Mv.to(by+cy, bx)}{cls.symbols[direction]} {"Top:"}{Mv.to(by+cy, bx+bw - 10)}{strings["top"]:>10.10}' + out += f'{Mv.to(by+cy, bx)}{cls.symbols[direction]} {"Top:"}{Mv.to(by+cy, bx+bw - 12)}{"(" + strings["top"] + ")":>12.12}' cy += 1 if bh >= 4: out += f'{Mv.to(by+cy, bx)}{cls.symbols[direction]} {"Total:"}{Mv.to(by+cy, bx+bw - 10)}{strings["total"]:>10.10}' @@ -1618,7 +1745,7 @@ class NetBox(Box, SubBox): out += f'{Mv.to(y, x)}{THEME.inactive_fg(net.strings[net.nic]["download"]["graph_top"])}{Mv.to(y+h-1, x)}{THEME.inactive_fg(net.strings[net.nic]["upload"]["graph_top"])}' - Draw.buffer(cls.buffer, f'{out}{out_misc}{Term.fg}', save=Menu.active) + Draw.buffer(cls.buffer, f'{out_misc}{out}{Term.fg}', only_save=Menu.active) cls.redraw = cls.resized = False class ProcBox(Box): @@ -1627,13 +1754,21 @@ class ProcBox(Box): width_p = 55 x = 1 y = 1 + select_max: int = 0 + selected: int = 0 + selected_pid: int + moved: bool = False + start: int = 1 + count: int = 0 detailed: bool = False detailed_x: int = 0 detailed_y: int = 0 detailed_width: int = 0 detailed_height: int = 8 + resized: bool = True redraw: bool = True buffer: str = "proc" + pid_counter: Dict[int, int] = {} Box.buffers.append(buffer) @classmethod @@ -1646,12 +1781,200 @@ class ProcBox(Box): cls.detailed_y = cls.y cls.detailed_height = 8 cls.detailed_width = cls.width - cls.redraw_all = True + cls.select_max = cls.height - 3 + cls.redraw = True + cls.resized = True @classmethod def _draw_bg(cls) -> str: return create_box(box=cls, line_color=THEME.proc_box) + @classmethod + def selector(cls, key: str) -> bool: + old: Tuple[int, int] = (cls.start, cls.selected) + if key == "up": + if cls.selected == 1 and cls.start > 1: + cls.start -= 1 + elif cls.selected == 1: + cls.selected = 0 + elif cls.selected > 1: + cls.selected -= 1 + elif key == "down": + if cls.selected == cls.select_max and cls.start < ProcCollector.num_procs - cls.select_max + 1: + cls.start += 1 + elif cls.selected < cls.select_max: + cls.selected += 1 + elif key == "mouse_scroll_up" and cls.start > 1: + cls.start -= 5 + elif key == "mouse_scroll_down" and cls.start < ProcCollector.num_procs - cls.select_max + 1: + cls.start += 5 + elif key == "page_up" and cls.start > 1: + cls.start -= cls.select_max + elif key == "page_down" and cls.start < ProcCollector.num_procs - cls.select_max + 1: + cls.start += cls.select_max + elif key == "home": + if cls.start > 1: cls.start = 1 + elif cls.selected > 0: cls.selected = 0 + elif key == "end": + if cls.start < ProcCollector.num_procs - cls.select_max + 1: cls.start = ProcCollector.num_procs - cls.select_max + 1 + elif cls.selected < cls.select_max: cls.selected = cls.select_max + + if cls.start < 1: cls.start = 1 + if cls.start > ProcCollector.num_procs - cls.select_max + 1: cls.start = ProcCollector.num_procs - cls.select_max + 1 + if old != (cls.start, cls.selected): + cls.moved = True + return True + else: + return False + + @classmethod + def _draw_fg(cls): + proc = ProcCollector + if proc.redraw: cls.redraw = True + out: str = "" + out_misc: str = "" + n: int = 0 + x, y, w, h = cls.x + 1, cls.y + 1, cls.width - 2, cls.height - 2 + prog_len: int; arg_len: int; val: int; c_color: str; m_color: str; t_color: str; sort_pos: int; tree_len: int; is_selected: bool; calc: int + l_count: int = 0 + loc_string: str + indent: str = "" + offset: int = 0 + vals: List[str] + c_color = m_color = t_color = Fx.b + g_color: str = "" + end: str = "" + + if w > 67: arg_len = w - 53; prog_len = 15 + else: arg_len = 0; prog_len = w - 38 + if CONFIG.proc_tree: + tree_len = arg_len + prog_len + 6 + arg_len = 0 + + if cls.resized or cls.redraw: + sort_pos = x + w - len(CONFIG.proc_sorting) - 7 + Key.mouse["left"] = [[sort_pos + i, y-1] for i in range(3)] + Key.mouse["right"] = [[sort_pos + len(CONFIG.proc_sorting) + 3 + i, y-1] for i in range(3)] + Key.mouse["e"] = [[sort_pos - 5 + i, y-1] for i in range(4)] + + + out_misc += (f'{Mv.to(y-1, x + 15)}{THEME.proc_box(Symbol.h_line * (w - 16))}' + f'{Mv.to(y-1, sort_pos)}{THEME.proc_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg("<")} {THEME.title(CONFIG.proc_sorting)} ' + f'{THEME.hi_fg(">")}{Fx.ub}{THEME.proc_box(Symbol.title_right)}' + f'{Mv.to(y-1, sort_pos - 6)}{THEME.proc_box(Symbol.title_left)}{Fx.b if CONFIG.proc_tree else ""}' + f'{THEME.title("tre")}{THEME.hi_fg("e")}{Fx.ub}{THEME.proc_box(Symbol.title_right)}') + if w > 50: + Key.mouse["r"] = [[sort_pos - 14 + i, y-1] for i in range(7)] + out_misc += (f'{Mv.to(y-1, sort_pos - 15)}{THEME.proc_box(Symbol.title_left)}{Fx.b if CONFIG.proc_reversed else ""}' + f'{THEME.hi_fg("r")}{THEME.title("everse")}{Fx.ub}{THEME.proc_box(Symbol.title_right)}') + if w > 58: + Key.mouse["c"] = [[sort_pos - 22 + i, y-1] for i in range(6)] + out_misc += (f'{Mv.to(y-1, sort_pos - 23)}{THEME.proc_box(Symbol.title_left)}{Fx.b if CONFIG.proc_colors else ""}' + f'{THEME.hi_fg("c")}{THEME.title("olors")}{Fx.ub}{THEME.proc_box(Symbol.title_right)}') + if w > 68: + Key.mouse["g"] = [[sort_pos - 32 + i, y-1] for i in range(8)] + out_misc += (f'{Mv.to(y-1, sort_pos - 33)}{THEME.proc_box(Symbol.title_left)}{Fx.b if CONFIG.proc_gradient else ""}{THEME.hi_fg("g")}' + f'{THEME.title("radient")}{Fx.ub}{THEME.proc_box(Symbol.title_right)}') + if w > 78: + Key.mouse["o"] = [[sort_pos - 42 + i, y-1] for i in range(8)] + out_misc += (f'{Mv.to(y-1, sort_pos - 43)}{THEME.proc_box(Symbol.title_left)}{Fx.b if CONFIG.proc_per_core else ""}' + f'{THEME.title("per-c")}{THEME.hi_fg("o")}{THEME.title("re")}{Fx.ub}{THEME.proc_box(Symbol.title_right)}') + + Draw.buffer("proc_misc", out_misc, only_save=True) + + selected: str = CONFIG.proc_sorting + if selected == "memory": selected = "mem" + if selected == "threads" and not CONFIG.proc_tree and not arg_len: selected = "tr" + if CONFIG.proc_tree: + out += f'{THEME.title}{Fx.b}{Mv.to(y, x)}{" Tree:":<{tree_len-2}}' "Threads: " f'{"User:":<9}Mem%{"Cpu%":>11}{Fx.ub}{THEME.main_fg}' + if selected in ["program", "arguments"]: selected = "tree" + else: + out += (f'{THEME.title}{Fx.b}{Mv.to(y, x)}{"Pid:":>7} {"Program:" if prog_len > 8 else "Prg:":<{prog_len}}' + (f'{"Arguments:":<{arg_len-4}}' if arg_len else "") + + f'{"Threads:" if arg_len else " Tr:"} {"User:":<9}Mem%{"Cpu%":>11}{Fx.ub}{THEME.main_fg}') + if selected == "program" and prog_len <= 8: selected = "prg" + selected = selected.split(" ")[0].capitalize() + out = out.replace(selected, f'{Fx.u}{selected}{Fx.uu}') + cy = 1 + + if cls.start > proc.num_procs - cls.select_max + 1: cls.start = proc.num_procs - cls.select_max + 1 if proc.num_procs > cls.select_max else 0 + if cls.selected > cls.select_max: cls.selected = cls.select_max + + for n, (pid, items) in enumerate(proc.processes.items(), start=1): + if n < cls.start: continue + l_count += 1 + if l_count == cls.selected: + is_selected = True + cls.selected_pid = pid + else: is_selected = False + name, cmd, threads, username, mem, cpu = items.values() + if cpu > 1.0 or pid in Graphs.pid_cpu: + if pid not in Graphs.pid_cpu: + Graphs.pid_cpu[pid] = Graph(5, 1, None, [0]) + cls.pid_counter[pid] = 0 + elif cpu < 1.0: + cls.pid_counter[pid] += 1 + if cls.pid_counter[pid] > 10: + del cls.pid_counter[pid], Graphs.pid_cpu[pid] + else: + cls.pid_counter[pid] = 0 + + if CONFIG.proc_tree: + indent = name + name = cmd + offset = tree_len - len(f'{indent}{pid}') + if offset < 1: offset = 0 + indent = f'{indent:.{tree_len - len(str(pid))}}' + else: + offset = prog_len - 1 + end = f'{THEME.main_fg}{Fx.ub}' if CONFIG.proc_colors else Fx.ub + if cls.selected > cy: calc = cls.selected - cy + elif cls.selected > 0 and cls.selected <= cy: calc = cy - cls.selected + else: calc = cy + if CONFIG.proc_colors and not is_selected: + vals = [] + for v in [int(cpu), int(mem), int(threads // 3)]: + if CONFIG.proc_gradient: + val = ((v if v <= 100 else 100) + 100) - calc * 100 // cls.select_max + vals += [f'{THEME.gradient["proc_color" if val < 100 else "cpu"][val if val < 100 else val - 100]}'] + else: + vals += [f'{THEME.gradient["cpu"][v if v <= 100 else 100]}'] + c_color, m_color, t_color = vals + if CONFIG.proc_gradient and not is_selected: + g_color = f'{THEME.gradient["proc"][calc * 100 // cls.select_max]}' + if is_selected: + c_color = m_color = t_color = g_color = end = "" + out += f'{THEME.selected_bg}{THEME.selected_fg}{Fx.b}' + + out += (f'{Mv.to(y+cy, x)}{g_color}{indent}{pid:>{(1 if CONFIG.proc_tree else 7)}} ' + + f'{c_color}{name:<{offset}.{offset}} {end}' + + (f'{g_color}{cmd:<{arg_len}.{arg_len-1}}' if arg_len else "") + + t_color + (f'{threads:>4} ' if threads < 1000 else "999> ") + end + + g_color + (f'{username:<9.9}' if len(username) < 10 else f'{username[:8]:<8}+') + + m_color + (f'{mem:>4.1f}' if mem < 100 else f'{mem:>4.0f} ') + end + + f' {THEME.inactive_fg}{"⡀"*5}{THEME.main_fg}{g_color}{c_color}' + (f' {cpu:>4.1f} ' if cpu < 100 else f'{cpu:>5.0f} ') + end) + if pid in Graphs.pid_cpu: + if is_selected: c_color = THEME.proc_misc + out += f'{Mv.to(y+cy, x + w - 11)}{c_color if CONFIG.proc_colors else THEME.proc_misc}{Graphs.pid_cpu[pid](None if cls.moved else round(cpu))}{THEME.main_fg}' + if selected: out += f'{Fx.ub}{Term.fg}{Term.bg}' + + cy += 1 + if cy == h: break + if cy < h: + for i in range(h-cy): + out += f'{Mv.to(y+cy+i, x)}{" " * w}' + loc_string = f'{cls.start + cls.selected - 1}/{len(proc.processes)}' + out += (f'{Mv.to(y+h, x + w - 15 - len(loc_string))}{THEME.proc_box}{Symbol.h_line*10}{Symbol.title_left}{THEME.title}' + f'{Fx.b}{loc_string}{Fx.ub}{THEME.proc_box(Symbol.title_right)}') + + cls.count += 1 + if cls.count == 100: + cls.count == 0 + for p in list(cls.pid_counter): + if not psutil.pid_exists(p): + del cls.pid_counter[p], Graphs.pid_cpu[p] + Draw.buffer(cls.buffer, f'{out}{out_misc}{Term.fg}', only_save=Menu.active) + cls.redraw = cls.resized = cls.moved = False + class Collector: '''Data collector master class * .start(): Starts collector thread @@ -1660,6 +1983,8 @@ class Collector: stopping: bool = False started: bool = False draw_now: bool = False + redraw: bool = False + only_draw: bool = False thread: threading.Thread collect_run = threading.Event() collect_idle = threading.Event() @@ -1667,7 +1992,7 @@ class Collector: collect_done = threading.Event() collect_queue: List = [] collect_interrupt: bool = False - resize_interrupt: bool = False + use_draw_list: bool = False @classmethod def start(cls): @@ -1703,22 +2028,19 @@ class Collector: cls.collect_interrupt = False cls.collect_run.clear() cls.collect_idle.clear() + cls.collect_done.clear() if DEBUG and not debugged: TimeIt.start("Collect and draw") while cls.collect_queue: collector = cls.collect_queue.pop() - collector._collect() + if not cls.only_draw: + collector._collect() collector._draw() - draw_buffers.append(collector.buffer) + if cls.use_draw_list: draw_buffers.append(collector.buffer) if cls.collect_interrupt: break - if cls.resize_interrupt: - while Term.resized: - sleep(0.01) - cls.resize_interrupt = False - cls.collect_run.set() - continue if DEBUG and not debugged: TimeIt.stop("Collect and draw"); debugged = True if cls.draw_now and not Menu.active and not cls.collect_interrupt: - Draw.out(*draw_buffers) + if cls.use_draw_list: Draw.out(*draw_buffers) + else: Draw.out() cls.collect_idle.set() cls.collect_done.set() except Exception as e: @@ -1727,25 +2049,23 @@ class Collector: clean_quit(1, thread=True) @classmethod - def collect(cls, *collectors, draw_now: bool = True, interrupt: bool = False): + def collect(cls, *collectors, draw_now: bool = True, interrupt: bool = False, redraw: bool = False, only_draw: bool = False): '''Setup collect queue for _runner''' - #* Set interrupt flag if True to stop _runner prematurely cls.collect_interrupt = interrupt - #* Wait for _runner to finish cls.collect_idle.wait() - #* Reset interrupt flag cls.collect_interrupt = False - #* Set draw_now flag if True to draw to screen instead of buffer + cls.use_draw_list = False cls.draw_now = draw_now - #* Append any collector given as argument to _runner queue + cls.redraw = redraw + cls.only_draw = only_draw + if collectors: cls.collect_queue = [*collectors] + cls.use_draw_list = True - #* Add all collectors to _runner queue if no collectors in argument else: cls.collect_queue = list(cls.__subclasses__()) - #* Set run flag to start _runner cls.collect_run.set() @@ -2051,11 +2371,13 @@ class MemCollector(Collector): MemBox._draw_fg() class NetCollector(Collector): + '''Collects network stats''' buffer: str = NetBox.buffer nics: List[str] = [] nic_i: int = 0 nic: str = "" new_nic: str = "" + reset: bool = False graph_raise: Dict[str, int] = {"download" : 5, "upload" : 5} graph_lower: Dict[str, int] = {"download" : 5, "upload" : 5} min_top: int = 10<<10 @@ -2089,7 +2411,6 @@ class NetCollector(Collector): elif cls.nic_i < 0: cls.nic_i = len(cls.nics) - 1 cls.new_nic = cls.nics[cls.nic_i] cls.switched = True - NetBox.redraw = True @classmethod def _collect(cls): @@ -2128,10 +2449,22 @@ class NetCollector(Collector): stat["last"] = stat["total"] speed = stat["speed"][-1] + if stat["offset"] and stat["offset"] > stat["total"]: + cls.reset = True + + if cls.reset: + if not stat["offset"]: + stat["offset"] = stat["total"] + else: + stat["offset"] = 0 + if direction == "upload": + cls.reset = False + NetBox.redraw = True + if len(stat["speed"]) > NetBox.width * 2: del stat["speed"][0] - strings["total"] = floating_humanizer(stat["total"]) + strings["total"] = floating_humanizer(stat["total"] - stat["offset"]) strings["byte_ps"] = floating_humanizer(stat["speed"][-1], per_second=True) strings["bit_ps"] = floating_humanizer(stat["speed"][-1], bit=True, per_second=True) @@ -2166,17 +2499,291 @@ class NetCollector(Collector): NetBox._draw_fg() -#class ProcCollector(Collector): #! add interrupt on _collect and _draw +class ProcCollector(Collector): #! add interrupt on _collect and _draw + '''Collects process stats''' + buffer: str = ProcBox.buffer + search_filter: str = "" + processes: Dict = {} + num_procs: int = 0 + proc_dict: Dict = {} + p_values: List[str] = ["pid", "name", "cmdline", "num_threads", "username", "memory_percent", "cpu_percent", "cpu_times", "create_time"] + sorting_index: int = Config.sorting_options.index(CONFIG.proc_sorting) + sort_expr: Dict = {} + sort_expr["pid"] = compile("p.info['pid']", "str", "eval") + sort_expr["program"] = compile("p.info['name']", "str", "eval") + sort_expr["arguments"] = compile("' '.join(str(p.info['cmdline'])) or p.info['name']", "str", "eval") + sort_expr["threads"] = compile("str(p.info['num_threads'])", "str", "eval") + sort_expr["user"] = compile("p.info['username']", "str", "eval") + sort_expr["memory"] = compile("str(p.info['memory_percent'])", "str", "eval") + sort_expr["cpu lazy"] = compile("(sum(p.info['cpu_times'][:2] if not p.info['cpu_times'] == 0.0 else [0.0, 0.0]) * 1000 / (time() - p.info['create_time']))", "str", "eval") + sort_expr["cpu responsive"] = compile("(p.info['cpu_percent'] if CONFIG.proc_per_core else (p.info['cpu_percent'] / THREADS))", "str", "eval") + + @classmethod + def _collect(cls): + '''List all processess with pid, name, arguments, threads, username, memory percent and cpu percent''' + out: Dict = {} + sorting: str = CONFIG.proc_sorting + reverse: bool = not CONFIG.proc_reversed + proc_per_cpu: bool = CONFIG.proc_per_core + search: str = cls.search_filter + err: float = 0.0 + n: int = 0 + + if CONFIG.proc_tree and sorting == "arguments": + sorting = "program" + + sort_cmd = cls.sort_expr[sorting] + + if CONFIG.proc_tree: + cls._tree(sort_cmd=sort_cmd, reverse=reverse, proc_per_cpu=proc_per_cpu, search=search) + return + + for p in sorted(psutil.process_iter(cls.p_values, err), key=lambda p: eval(sort_cmd), reverse=reverse): + if cls.collect_interrupt: + return + if p.info["name"] == "idle" or p.info["name"] == err or p.info["pid"] == err: + continue + if p.info["cmdline"] == err: + p.info["cmdline"] = "" + if p.info["username"] == err: + p.info["username"] = "" + if p.info["num_threads"] == err: + p.info["num_threads"] = 0 + if search: + for value in [ p.info["name"], " ".join(p.info["cmdline"]), str(p.info["pid"]), p.info["username"] ]: + for s in search.split(","): + if s.strip() in value: + break + else: continue + break + else: continue + + cpu = p.info["cpu_percent"] if proc_per_cpu else (p.info["cpu_percent"] / psutil.cpu_count()) + mem = p.info["memory_percent"] + + cmd = " ".join(p.info["cmdline"]) or "[" + p.info["name"] + "]" + + out[p.info["pid"]] = { + "name" : p.info["name"], + "cmd" : cmd, + "threads" : p.info["num_threads"], + "username" : p.info["username"], + "mem" : mem, + "cpu" : cpu } + + n += 1 + + cls.num_procs = n + cls.processes = out.copy() + + @classmethod + def _tree(cls, sort_cmd, reverse: bool, proc_per_cpu: bool, search: str): + '''List all processess in a tree view with pid, name, threads, username, memory percent and cpu percent''' + out: Dict = {} + err: float = 0.0 + infolist: Dict = {} + tree = defaultdict(list) + n: int = 0 + for p in sorted(psutil.process_iter(cls.p_values, err), key=lambda p: eval(sort_cmd), reverse=reverse): + if cls.collect_interrupt: return + try: + tree[p.ppid()].append(p.pid) + except (psutil.NoSuchProcess, psutil.ZombieProcess): + pass + else: + infolist[p.pid] = p.info + n += 1 + cls.num_procs = n + if 0 in tree and 0 in tree[0]: + tree[0].remove(0) + + def create_tree(pid: int, tree: defaultdict, indent: str = "", inindent: str = " ", found: bool = False): + nonlocal infolist, proc_per_cpu, search, out + name: str; threads: int; username: str; mem: float; cpu: float + cont: bool = True + getinfo: Union[Dict, None] + if cls.collect_interrupt: return + try: + name = psutil.Process(pid).name() + if name == "idle": return + except psutil.Error: + pass + cont = False + if pid in infolist: + getinfo = infolist[pid] + else: + getinfo = None + + if search and not found: + for value in [ name, str(pid), getinfo["username"] if getinfo else "" ]: + for s in search.split(","): + if s.strip() in value: + found = True + break + else: continue + break + else: cont = False + if cont: + if getinfo: + if getinfo["num_threads"] == err: threads = 0 + else: threads = getinfo["num_threads"] + if getinfo["username"] == err: username = "" + else: username = getinfo["username"] + cpu = getinfo["cpu_percent"] if proc_per_cpu else (getinfo["cpu_percent"] / psutil.cpu_count()) + mem = p.info["memory_percent"] + else: + threads = 0 + username = "" + mem = cpu = 0.0 + + out[pid] = { + "indent" : inindent, + "name": name, + "threads" : threads, + "username" : username, + "mem" : mem, + "cpu" : cpu } + + if pid not in tree: + return + children = tree[pid][:-1] + for child in children: + create_tree(child, tree, indent + " │ ", indent + " ├─ ", found=found) + child = tree[pid][-1] + create_tree(child, tree, indent + " ", indent + " └─ ") + + create_tree(min(tree), tree) + + if cls.collect_interrupt: return + cls.processes = out.copy() + + @classmethod + def sorting(cls, key: str): + cls.sorting_index += 1 if key == "right" else -1 + if cls.sorting_index >= len(CONFIG.sorting_options): cls.sorting_index = 0 + elif cls.sorting_index < 0: cls.sorting_index = len(CONFIG.sorting_options) - 1 + CONFIG.proc_sorting = CONFIG.sorting_options[cls.sorting_index] + ProcBox.redraw = True + + @classmethod + def _draw(cls): + ProcBox._draw_fg() class Menu: - '''Holds the main menu and all submenus''' + '''Holds all menus''' active: bool = False + close: bool = False + resized: bool = True + menus: Dict[str, Dict[str, str]] = {} + menu_length: Dict[str, int] = {} + for name, menu in MENUS.items(): + menu_length[name] = len(menu["normal"][0]) + menus[name] = {} + for sel in ["normal", "selected"]: + menus[name][sel] = "" + for i in range(len(menu[sel])): + menus[name][sel] += Fx.trans(f'{Color.fg(MENU_COLORS[sel][i])}{menu[sel][i]}') + if i < len(menu[sel]) - 1: menus[name][sel] += f'{Mv.d(1)}{Mv.l(len(menu[sel][i]))}' + + @classmethod + def main(cls): + out: str = "" + banner: str = "" + redraw: bool = True + key: str = "" + mx: int = 0 + my: int = 0 + skip: bool = False + mouse_over: bool = False + mouse_items: Dict[str, Dict[str, int]] = {} + cls.active = True + cls.resized = True + menu_names: List[str] = list(cls.menus.keys()) + menu_index: int = 0 + menu_current: str = menu_names[0] + background = f'{THEME.inactive_fg}' + Fx.uncolor(f'{Draw.saved_buffer()}') + f'{Term.fg}' + + while not cls.close: + key = "" + if cls.resized: + banner = (f'{Banner.draw(Term.height // 2 - 10, center=True)}{Mv.d(1)}{Mv.l(46)}{Colors.black_bg}{Colors.default}{Fx.b}← esc' + f'{Mv.r(30)}{Fx.i}Version: {VERSION}{Fx.ui}{Fx.ub}{Term.bg}{Term.fg}') + cy = 0 + for name, menu in cls.menus.items(): + ypos = Term.height // 2 - 2 + cy + xpos = Term.width // 2 - (cls.menu_length[name] // 2) + mouse_items[name] = { "x1" : xpos, "x2" : xpos + cls.menu_length[name] - 1, "y1" : ypos, "y2" : ypos + 2 } + cy += 3 + redraw = True + cls.resized = False + + if redraw: + out = "" + for name, menu in cls.menus.items(): + out += f'{Mv.to(mouse_items[name]["y1"], mouse_items[name]["x1"])}{menu["selected" if name == menu_current else "normal"]}' + + if skip and redraw: + Draw.now(out) + elif not skip: + Draw.now(f'{background}{banner}{out}') + skip = redraw = False + + if Key.has_key() or Key.input_wait(Timer.left(), mouse=True): + if Key.mouse_moved(): + mx, my = Key.get_mouse() + for name, pos in mouse_items.items(): + if mx >= pos["x1"] and mx <= pos["x2"] and my >= pos["y1"] and my <= pos["y2"]: + mouse_over = True + if name != menu_current: + menu_current = name + menu_index = menu_names.index(name) + redraw = True + break + else: + mouse_over = False + else: + key = Key.get() + + if key == "mouse_click" and not mouse_over: + key = "m" + + if key == "q": + clean_quit() + elif key in ["escape", "m"]: + cls.close = True + break + elif key in ["up", "mouse_scroll_up", "shift_tab"]: + menu_index -= 1 + if menu_index < 0: menu_index = len(menu_names) - 1 + menu_current = menu_names[menu_index] + redraw = True + elif key in ["down", "mouse_scroll_down", "tab"]: + menu_index += 1 + if menu_index > len(menu_names) - 1: menu_index = 0 + menu_current = menu_names[menu_index] + redraw = True + elif key == "enter" or (key == "mouse_click" and mouse_over): + if menu_current == "quit": + clean_quit() + + if Timer.not_zero() and not cls.resized: + skip = True + else: + Collector.collect() + Collector.collect_done.wait(1) + background = f'{THEME.inactive_fg}' + Fx.uncolor(f'{Draw.saved_buffer()}') + f'{Term.fg}' + Timer.stamp() + + + Draw.now(f'{Draw.saved_buffer()}') + cls.active = False + cls.close = False + - #Draw.buffer("menubg", Draw.last_buffer, z=1000, uncolor=True) - #Draw.clear("menubg", last=True) class Timer: timestamp: float + return_zero = False @classmethod def stamp(cls): @@ -2184,6 +2791,9 @@ class Timer: @classmethod def not_zero(cls) -> bool: + if cls.return_zero: + cls.return_zero = False + return False if cls.timestamp + (CONFIG.update_ms / 1000) > time(): return True else: @@ -2195,7 +2805,9 @@ class Timer: @classmethod def finish(cls): + cls.return_zero = True cls.timestamp = time() - (CONFIG.update_ms / 1000) + Key.break_wait() @@ -2287,7 +2899,7 @@ def now_sleeping(signum, frame): """Reset terminal settings and stop background input read before putting to sleep""" Key.stop() Collector.stop() - Draw.now(Term.clear, Term.normal_screen, Term.show_cursor, Term.mouse_off) + Draw.now(Term.clear, Term.normal_screen, Term.show_cursor, Term.mouse_off, Term.mouse_direct_off) Term.echo(True) os.kill(os.getpid(), signal.SIGSTOP) @@ -2318,7 +2930,7 @@ def clean_quit(errcode: int = 0, errmsg: str = "", thread: bool = False): Key.stop() Collector.stop() if not errcode: CONFIG.save_config() - Draw.now(Term.clear, Term.normal_screen, Term.show_cursor, Term.mouse_off) + Draw.now(Term.clear, Term.normal_screen, Term.show_cursor, Term.mouse_off, Term.mouse_direct_off) Term.echo(True) if errcode == 0: errlog.info(f'Exiting. Runtime {timedelta(seconds=round(time() - SELF_START, 0))} \n') @@ -2367,32 +2979,76 @@ def floating_humanizer(value: Union[float, int], bit: bool = False, per_second: return out def process_keys(): + mouse_pos: Tuple[int, int] while Key.has_key(): key = Key.get() - if key == "q": + if key in ["mouse_scroll_up", "mouse_scroll_down"]: + mouse_pos = Key.get_mouse() + if mouse_pos[0] >= ProcBox.x and mouse_pos[1] >= ProcBox.y: + pass + else: key = "_null" + + + if key == "_null": + continue + elif key == "q": clean_quit() - if key == "+" and CONFIG.update_ms + 100 <= 86399900: + elif key == "+" and CONFIG.update_ms + 100 <= 86399900: CONFIG.update_ms += 100 Box.draw_update_ms() - if key == "-" and CONFIG.update_ms - 100 >= 100: + elif key == "-" and CONFIG.update_ms - 100 >= 100: CONFIG.update_ms -= 100 Box.draw_update_ms() - if key in ["b", "n"]: + elif key in ["b", "n"]: NetCollector.switch(key) + Collector.collect(NetCollector, redraw=True) + elif key in ["m", "escape"]: + Menu.main() + elif key == "z": + NetCollector.reset = not NetCollector.reset Collector.collect(NetCollector) - + elif key in ["left", "right"]: + ProcCollector.sorting(key) + Collector.collect(ProcCollector, interrupt=True, redraw=True) + elif key == "e": + CONFIG.proc_tree = not CONFIG.proc_tree + Collector.collect(ProcCollector, interrupt=True, redraw=True) + elif key == "r": + CONFIG.proc_reversed = not CONFIG.proc_reversed + Collector.collect(ProcCollector, interrupt=True, redraw=True) + elif key == "c": + CONFIG.proc_colors = not CONFIG.proc_colors + Collector.collect(ProcCollector, redraw=True, only_draw=True) + elif key == "g": + CONFIG.proc_gradient = not CONFIG.proc_gradient + Collector.collect(ProcCollector, redraw=True, only_draw=True) + elif key == "o": + CONFIG.proc_per_core = not CONFIG.proc_per_core + Collector.collect(ProcCollector, interrupt=True, redraw=True) + elif key == "h": + CONFIG.mem_graphs = not CONFIG.mem_graphs + Collector.collect(MemCollector, redraw=True) + elif key == "p": + CONFIG.swap_disk = not CONFIG.swap_disk + Collector.collect(MemCollector, redraw=True) + elif key in ["up", "down", "mouse_scroll_up", "mouse_scroll_down", "page_up", "page_down", "home", "end"]: + if ProcBox.selector(key): + Collector.collect(ProcCollector, only_draw=True) #? Main function ---------------------------------------------------------------------------------> def main(): + Term.refresh() Timer.stamp() while Timer.not_zero(): - if Key.input_wait(Timer.left()): + if Key.input_wait(Timer.left()) and not Menu.active: process_keys() Collector.collect() + # Draw.buffer("!mouse_pos", f'{Key.mouse_pos}', z=0) + #? Pre main -------------------------------------------------------------------------------------->