#!/usr/bin/env python3 # pylint: disable=not-callable, no-member # indent = tab # tab-size = 4 # Copyright 2020 Aristocratos (jakob@qvantnet.com) # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os, sys, threading, signal, re, subprocess, logging, logging.handlers from time import time, sleep, strftime, localtime from datetime import timedelta from _thread import interrupt_main from select import select from distutils.util import strtobool from string import Template from math import ceil, floor from random import randint from shutil import which from typing import List, Set, Dict, Tuple, Optional, Union, Any, Callable, ContextManager, Iterable, Type errors: List[str] = [] try: import fcntl, termios, tty except Exception as e: errors.append(f'{e}') try: import psutil # type: ignore except Exception as e: errors.append(f'{e}') SELF_START = time() SYSTEM: str if "linux" in sys.platform: SYSTEM = "Linux" elif "bsd" in sys.platform: SYSTEM = "BSD" elif "darwin" in sys.platform: SYSTEM = "MacOS" else: SYSTEM = "Other" if errors: print ("ERROR!") for error in errors: print(error) if SYSTEM == "Other": print("\nUnsupported platform!\n") else: print("\nInstall required modules!\n") quit(1) #? Variables -------------------------------------------------------------------------------------> BANNER_SRC: List[Tuple[str, str, str]] = [ ("#ffa50a", "#0fd7ff", "██████╗ ██████╗ ██╗ ██╗████████╗ ██████╗ ██████╗"), ("#f09800", "#00bfe6", "██╔══██╗██╔══██╗╚██╗ ██╔╝╚══██╔══╝██╔═══██╗██╔══██╗"), ("#db8b00", "#00a6c7", "██████╔╝██████╔╝ ╚████╔╝ ██║ ██║ ██║██████╔╝"), ("#c27b00", "#008ca8", "██╔══██╗██╔═══╝ ╚██╔╝ ██║ ██║ ██║██╔═══╝ "), ("#a86b00", "#006e85", "██████╔╝██║ ██║ ██║ ╚██████╔╝██║"), ("#000000", "#000000", "╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝"), ] VERSION: str = "0.4.1" #*?This is the template used to create the config file DEFAULT_CONF: Template = Template(f'#? Config file for bpytop v. {VERSION}' + ''' #* Color theme, looks for a .theme file in "~/.config/bpytop/themes" and "~/.config/bpytop/user_themes", "Default" for builtin default theme #* Corresponding folder with a trailing / needs to be appended, example: color_theme="user_themes/monokai" color_theme="$color_theme" #* Update time in milliseconds, increases automatically if set below internal loops processing time, recommended 2000 ms or above for better sample times for graphs. update_ms=$update_ms #* Processes sorting, "pid" "program" "arguments" "threads" "user" "memory" "cpu lazy" "cpu responsive", #* "cpu lazy" updates top process over time, "cpu responsive" updates top process directly. proc_sorting="$proc_sorting" #* Reverse sorting order, True or False. proc_reversed=$proc_reversed #* Show processes as a tree proc_tree=$proc_tree #* Check cpu temperature, needs "vcgencmd" on Raspberry Pi and "osx-cpu-temp" on MacOS X. check_temp=$check_temp #* Draw a clock at top of screen, formatting according to strftime, empty string to disable. draw_clock="$draw_clock" #* Update main ui in background when menus are showing, set this to false if the menus is flickering too much for comfort. background_update=$background_update #* Custom cpu model name, empty string to disable. custom_cpu_name="$custom_cpu_name" #* Show color gradient in process list, True or False. proc_gradient=$proc_gradient #* If process cpu usage should be of the core it's running on or usage of the total available cpu power. proc_per_core=$proc_per_core #* Optional filter for shown disks, should be last folder in path of a mountpoint, "root" replaces "/", separate multiple values with comma. #* Begin line with "exclude=" to change to exclude filter, oterwise defaults to "most include" filter. Example: disks_filter="exclude=boot, home" disks_filter="$disks_filter" #* Show graphs instead of meters for memory values. mem_graphs=$mem_graphs #* If swap memory should be shown in memory box. show_swap=$show_swap #* Show swap as a disk, ignores show_swap value above, inserts itself after first disk. swap_disk=$swap_disk #* If mem box should be split to also show disks info. show_disks=$show_disks #* Enable check for new version from github.com/aristocratos/bpytop at start. update_check=$update_check #* Set loglevel for "~/.config/bpytop/error.log" levels are: "CRITICAL" "ERROR" "WARNING" "INFO" "DEBUG". #* The level set includes all lower levels, i.e. "DEBUG" will show all logging info. log_level="$log_level" ''') CONFIG_DIR: str = f'{os.path.expanduser("~")}/.config/bpytop' if not os.path.isdir(CONFIG_DIR): try: os.makedirs(CONFIG_DIR) os.mkdir(f'{CONFIG_DIR}/themes') os.mkdir(f'{CONFIG_DIR}/user_themes') except PermissionError: print(f'ERROR!\nNo permission to write to "{CONFIG_DIR}" directory!') quit(1) CONFIG_FILE: str = f'{CONFIG_DIR}/bpytop.conf' THEME_DIR: str = f'{CONFIG_DIR}/themes' USER_THEME_DIR: str = f'{CONFIG_DIR}/user_themes' CORES: int = psutil.cpu_count(logical=False) or 1 THREADS: int = psutil.cpu_count(logical=True) or 1 THREAD_ERROR: int = 0 DEFAULT_THEME: Dict[str, str] = { "main_bg" : "", "main_fg" : "#cc", "title" : "#ee", "hi_fg" : "#90", "selected_bg" : "#7e2626", "selected_fg" : "#ee", "inactive_fg" : "#40", "proc_misc" : "#0de756", "cpu_box" : "#3d7b46", "mem_box" : "#8a882e", "net_box" : "#423ba5", "proc_box" : "#923535", "div_line" : "#30", "temp_start" : "#4897d4", "temp_mid" : "#5474e8", "temp_end" : "#ff40b6", "cpu_start" : "#50f095", "cpu_mid" : "#f2e266", "cpu_end" : "#fa1e1e", "free_start" : "#223014", "free_mid" : "#b5e685", "free_end" : "#dcff85", "cached_start" : "#0b1a29", "cached_mid" : "#74e6fc", "cached_end" : "#26c5ff", "available_start" : "#292107", "available_mid" : "#ffd77a", "available_end" : "#ffb814", "used_start" : "#3b1f1c", "used_mid" : "#d9626d", "used_end" : "#ff4769", "download_start" : "#231a63", "download_mid" : "#4f43a3", "download_end" : "#b0a9de", "upload_start" : "#510554", "upload_mid" : "#7d4180", "upload_end" : "#dcafde" } #? Units for floating_humanizer function UNITS: Dict[str, Tuple[str, ...]] = { "bit" : ("bit", "Kib", "Mib", "Gib", "Tib", "Pib", "Eib", "Zib", "Yib", "Bib", "GEb"), "byte" : ("Byte", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB", "BiB", "GEB") } #? Setup error logger ----------------------------------------------------------------> try: errlog = logging.getLogger("ErrorLogger") errlog.setLevel(logging.DEBUG) eh = logging.handlers.RotatingFileHandler(f'{CONFIG_DIR}/error.log', maxBytes=1048576, backupCount=4) eh.setLevel(logging.DEBUG) eh.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s: %(message)s", datefmt="%d/%m/%y (%X)")) errlog.addHandler(eh) except PermissionError: print(f'ERROR!\nNo permission to write to "{CONFIG_DIR}" directory!') quit(1) #! Timers, remove -----------------------------------------------------------------------> class Timer: timers: Dict[str, float] = {} paused: Dict[str, float] = {} @classmethod def start(cls, name): cls.timers[name] = time() @classmethod def pause(cls, name): if name in cls.timers: cls.paused[name] = time() - cls.timers[name] del cls.timers[name] @classmethod def stop(cls, name): if name in cls.timers: total: float = time() - cls.timers[name] del cls.timers[name] if name in cls.paused: total += cls.paused[name] del cls.paused[name] errlog.debug(f'{name} completed in {total:.6f} seconds') def timerd(func): def timed(*args, **kw): ts = time() out = func(*args, **kw) te = time() errlog.debug(f'{func.__name__} completed in {te - ts:.6f} seconds') return out return timed #! Timers, remove -----------------------------------------------------------------------< #? Set up config class and load config ------------------------------------------------> class Config: '''Holds all config variables and functions for loading from and saving to disk''' keys: List[str] = ["color_theme", "update_ms", "proc_sorting", "proc_reversed", "proc_tree", "check_temp", "draw_clock", "background_update", "custom_cpu_name", "proc_gradient", "proc_per_core", "disks_filter", "update_check", "log_level", "mem_graphs", "show_swap", "swap_disk", "show_disks"] conf_dict: Dict[str, Union[str, int, bool]] = {} color_theme: str = "Default" update_ms: int = 2500 proc_sorting: str = "cpu lazy" proc_reversed: bool = False proc_tree: bool = False check_temp: bool = True draw_clock: str = "%X" background_update: bool = True custom_cpu_name: str = "" proc_gradient: bool = True proc_per_core: bool = False disks_filter: str = "" update_check: bool = True mem_graphs: bool = True show_swap: bool = False swap_disk: bool = True show_disks: bool = True log_level: str = "WARNING" warnings: List[str] = [] sorting_options: List[str] = ["pid", "program", "arguments", "threads", "user", "memory", "cpu lazy", "cpu responsive"] log_levels: List[str] = ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"] changed: bool = False recreate: bool = False config_file: str = "" _initialized: bool = False def __init__(self, path: str): self.config_file = path conf: Dict[str, Union[str, int, bool]] = self.load_config() if not "version" in conf.keys(): self.recreate = True self.warnings.append(f'Config file malformatted or missing, will be recreated on exit!') elif conf["version"] != VERSION: self.recreate = True self.warnings.append(f'Config file version and bpytop version missmatch, will be recreated on exit!') for key in self.keys: if key in conf.keys() and conf[key] != "_error_": setattr(self, key, conf[key]) else: self.recreate = True self.conf_dict[key] = getattr(self, key) self._initialized = True def __setattr__(self, name, value): if self._initialized: object.__setattr__(self, "changed", True) object.__setattr__(self, name, value) if name not in ["_initialized", "recreate", "changed"]: self.conf_dict[name] = value def load_config(self) -> Dict[str, Union[str, int, bool]]: '''Load config from file, set correct types for values and return a dict''' new_config: Dict[str,Union[str, int, bool]] = {} if not os.path.isfile(self.config_file): return new_config try: with open(self.config_file, "r") as f: for line in f: line = line.strip() if line.startswith("#? Config"): new_config["version"] = line[line.find("v. ") + 3:] for key in self.keys: if line.startswith(key): line = line.lstrip(key + "=") if line.startswith('"'): line = line.strip('"') if type(getattr(self, key)) == int: try: new_config[key] = int(line) except ValueError: self.warnings.append(f'Config key "{key}" should be an integer!') if type(getattr(self, key)) == bool: try: new_config[key] = bool(strtobool(line)) except ValueError: self.warnings.append(f'Config key "{key}" can only be True or False!') if type(getattr(self, key)) == str: new_config[key] = str(line) except Exception as e: errlog.exception(str(e)) if "proc_sorting" in new_config and not new_config["proc_sorting"] in self.sorting_options: new_config["proc_sorting"] = "_error_" self.warnings.append(f'Config key "proc_sorted" didn\'t get an acceptable value!') if "log_level" in new_config and not new_config["log_level"] in self.log_levels: new_config["log_level"] = "_error_" self.warnings.append(f'Config key "log_level" didn\'t get an acceptable value!') return new_config def save_config(self): '''Save current config to config file if difference in values or version, creates a new file if not found''' if not self.changed and not self.recreate: return try: with open(self.config_file, "w" if os.path.isfile(self.config_file) else "x") as f: f.write(DEFAULT_CONF.substitute(self.conf_dict)) except Exception as e: errlog.exception(str(e)) try: CONFIG: Config = Config(CONFIG_FILE) errlog.setLevel(getattr(logging, CONFIG.log_level)) errlog.info(f'New instance of bpytop version {VERSION} started with pid {os.getpid()}') errlog.debug(f'Loglevel set to {CONFIG.log_level}') if CONFIG.warnings: for warning in CONFIG.warnings: errlog.warning(warning) CONFIG.warnings = [] except Exception as e: errlog.exception(f'{e}') quit(1) #? Classes ---------------------------------------------------------------------------------------> class Term: """Terminal info and commands""" width: int = os.get_terminal_size().columns #* Current terminal width in columns height: int = os.get_terminal_size().lines #* Current terminal height in lines resized: bool = False _w : int = 0 _h : int = 0 fg: str = "" #* Default foreground color bg: str = "" #* Default background color hide_cursor = "\033[?25l" #* Hide terminal cursor show_cursor = "\033[?25h" #* Show terminal cursor alt_screen = "\033[?1049h" #* Switch to alternate screen normal_screen = "\033[?1049l" #* Switch to normal screen clear = "\033[2J\033[0;0f" #* Clear screen and set cursor to position 0,0 @classmethod def refresh(cls, *args): """Update width, height and set resized flag if terminal has been resized""" if cls.resized == True: return cls._w, cls._h = os.get_terminal_size() while (cls._w, cls._h) != (cls.width, cls.height): cls.resized = True Collector.collect_interrupt = True cls.width, cls.height = cls._w, cls._h Draw.now(Term.clear) Draw.now(f'{create_box(cls._w // 2 - 25, cls._h // 2 - 2, 50, 3, "resizing")}{THEME.main_fg}{Fx.b}{Mv.r(12)}Width : {cls._w} Height: {cls._h}') while cls._w < 80 or cls._h < 24: Draw.now(Term.clear) Draw.now(f'{create_box(cls._w // 2 - 25, cls._h // 2 - 2, 50, 4, "warning")}{THEME.main_fg}{Fx.b}{Mv.r(12)}Width: {cls._w} Height: {cls._h}\ {Mv.to(cls._h // 2, cls._w // 2 - 23)}Width and Height needs to be at least 80x24!') sleep(0.3) cls._w, cls._h = os.get_terminal_size() sleep(0.3) cls._w, cls._h = os.get_terminal_size() cls.resized = False Box.calc_sizes() Box.draw_bg(now=True if not Init.running else False) @staticmethod def echo(on: bool): """Toggle input echo""" (iflag, oflag, cflag, lflag, ispeed, ospeed, cc) = termios.tcgetattr(sys.stdin.fileno()) if on: lflag |= termios.ECHO # type: ignore else: lflag &= ~termios.ECHO # type: ignore new_attr = [iflag, oflag, cflag, lflag, ispeed, ospeed, cc] termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, new_attr) class Fx: """Text effects * trans(string: str): Replace whitespace with escape move right to not paint background in whitespace. * uncolor(string: str) : Removes all color and returns string with THEME.inactive_fg color.""" start = "\033[" #* Escape sequence start sep = ";" #* Escape sequence separator end = "m" #* Escape sequence end reset = rs = "\033[0m" #* Reset foreground/background color and text effects bold = b = "\033[1m" #* Bold on unbold = ub = "\033[22m" #* Bold off dark = d = "\033[2m" #* Dark on undark = ud = "\033[22m" #* Dark off italic = i = "\033[3m" #* Italic on unitalic = ui = "\033[23m" #* Italic off underline = u = "\033[4m" #* Underline on ununderline = uu = "\033[24m" #* Underline off blink = bl = "\033[5m" #* Blink on unblink = ubl = "\033[25m" #* Blink off strike = s = "\033[9m" #* Strike / crossed-out on unstrike = us = "\033[29m" #* Strike / crossed-out off color_re = re.compile(r"\033\[\d+;\d?;?\d*;?\d*;?\d*m") @staticmethod def trans(string: str): return string.replace(" ", "\033[1C") @classmethod def uncolor(cls, string: str) -> str: return f'{THEME.inactive_fg}{cls.color_re.sub("", string)}{Term.fg}' class Raw(object): """Set raw input mode for device""" def __init__(self, stream): self.stream = stream self.fd = self.stream.fileno() def __enter__(self): self.original_stty = termios.tcgetattr(self.stream) tty.setcbreak(self.stream) def __exit__(self, type, value, traceback): termios.tcsetattr(self.stream, termios.TCSANOW, self.original_stty) class Nonblocking(object): """Set nonblocking mode for device""" def __init__(self, stream): self.stream = stream self.fd = self.stream.fileno() def __enter__(self): self.orig_fl = fcntl.fcntl(self.fd, fcntl.F_GETFL) fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl | os.O_NONBLOCK) def __exit__(self, *args): fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl) class Mv: """Class with collection of cursor movement functions: .t[o](line, column) | .r[ight](columns) | .l[eft](columns) | .u[p](lines) | .d[own](lines) | .save() | .restore()""" @staticmethod def to(line: int, col: int) -> str: return f'\033[{line};{col}f' #* Move cursor to line, column @staticmethod def right(x: int) -> str: #* Move cursor right x columns return f'\033[{x}C' @staticmethod def left(x: int) -> str: #* Move cursor left x columns return f'\033[{x}D' @staticmethod def up(x: int) -> str: #* Move cursor up x lines return f'\033[{x}A' @staticmethod def down(x: int) -> str: #* Move cursor down x lines return f'\033[{x}B' save: str = "\033[s" #* Save cursor position restore: str = "\033[u" #* Restore saved cursor postion t = to r = right l = left u = up d = down class Key: """Handles the threaded input reader""" list: List[str] = [] new = threading.Event() idle = threading.Event() idle.set() stopping: bool = False started: bool = False reader: threading.Thread @classmethod def start(cls): cls.stopping = False cls.reader = threading.Thread(target=cls._get_key) cls.reader.start() cls.started = True @classmethod def stop(cls): if cls.started and cls.reader.is_alive(): cls.stopping = True try: cls.reader.join() except: pass @classmethod def last(cls) -> str: if cls.list: return cls.list.pop() else: return "" @classmethod def get(cls) -> str: if cls.list: return cls.list.pop(0) else: return "" @classmethod def has_key(cls) -> bool: if cls.list: return True else: return False @classmethod def clear(cls): cls.list = [] @classmethod def input_wait(cls, sec: float = 0.0) -> bool: '''Returns True if key is detected else waits out timer and returns False''' cls.new.wait(sec if sec > 0 else 0.0) if cls.new.is_set(): cls.new.clear() return True else: return False @classmethod def _get_key(cls): """Get a single key from stdin, convert to readable format and save to keys list. Meant to be run in it's own thread.""" input_key: str = "" clean_key: str = "" escape: Dict[Union[str, Tuple[str, str]], str] = { "\n" : "enter", ("\x7f", "\x08") : "backspace", ("[A", "OA") : "up", ("[B", "OB") : "down", ("[D", "OD") : "left", ("[C", "OC") : "right", "[2~" : "insert", "[3~" : "delete", "[H" : "home", "[F" : "end", "[5~" : "page_up", "[6~" : "page_down", "[Z" : "shift_tab", "OP" : "f1", "OQ" : "f2", "OR" : "f3", "OS" : "f4", "[15" : "f5", "[17" : "f6", "[18" : "f7", "[19" : "f8", "[20" : "f9", "[21" : "f10", "[23" : "f11", "[24" : "f12" } try: while not cls.stopping: with Raw(sys.stdin): if not select([sys.stdin], [], [], 0.1)[0]: #* Wait 100ms for input on stdin then restart loop to check for stop flag continue cls.idle.clear() #* Report IO block in progress to prevent Draw functions to get IO Block error input_key += sys.stdin.read(1) if input_key == "\033": #* If first character is a escape sequence read 5 more keys Draw.idle.wait() #* Wait for Draw function to finish if busy with Nonblocking(sys.stdin): #* Set non blocking to prevent read stall if less than 5 characters input_key += sys.stdin.read(5) if input_key == "\033": clean_key = "escape" #* Key is escape if only containing \033 elif input_key == "\\": clean_key = "\\" #* Clean up "\" to not return escaped else: for code in escape.keys(): #* Go trough dict of escape codes to get the cleaned key name if input_key.lstrip("\033").startswith(code): clean_key = escape[code] break else: #* If not found in escape dict and length of key is 1, assume regular character if len(input_key) == 1: clean_key = input_key if testing: errlog.debug(f'Input key: {repr(input_key)} Clean key: {clean_key}') #! Remove if clean_key: cls.list.append(clean_key) #* Store up to 10 keys in input queue for later processing if len(cls.list) > 10: del cls.list[0] clean_key = "" cls.new.set() #* Set threading event to interrupt main thread sleep input_key = "" cls.idle.set() #* Report IO blocking done except Exception as e: errlog.exception(f'Input thread failed with exception: {e}') cls.idle.set() cls.list.clear() clean_quit(1, thread=True) class Draw: '''Holds the draw buffer and manages IO blocking queue * .buffer([+]name[!], *args, append=False, now=False, z=100) : Add *args to buffer * - Adding "+" prefix to name sets append to True and appends to name's current string * - Adding "!" suffix to name sets now to True and print name's current string * .out(clear=False) : Print all strings in buffer, clear=True clear all buffers after * .now(*args) : Prints all arguments as a string * .clear(*names) : Clear named buffers, all if no argument * .last_screen() : Prints all saved buffers ''' strings: Dict[str, str] = {} z_order: Dict[str, int] = {} last: Dict[str, str] = {} idle = threading.Event() idle.set() @classmethod def now(cls, *args): '''Wait for input reader to be idle then print to screen''' Key.idle.wait() cls.idle.wait() cls.idle.clear() try: print(*args, sep="", end="", flush=True) except BlockingIOError: pass Key.idle.wait() print(*args, sep="", end="", flush=True) cls.idle.set() @classmethod def buffer(cls, name: str, *args: str, append: bool = False, now: bool = False, z: int = 100, save: bool = False, uncolor: bool = False): string: str = "" if name.startswith("+"): name = name.lstrip("+") append = True if name.endswith("!"): name = name.rstrip("!") now = True if not name in cls.z_order or z != 100: cls.z_order[name] = z if args: string = "".join(args) if uncolor: string = Fx.uncolor(string) if name not in cls.strings or not append: cls.strings[name] = "" if save: if name not in cls.last or not append: cls.last[name] = "" cls.last[name] = string else: cls.strings[name] += string if now: cls.now(string) @classmethod def out(cls, *names: str, clear = False): out: str = "" if not cls.strings: return if names: for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True): if name in names: out += cls.strings[name] cls.last[name] = out if clear: del cls.strings[name] del cls.z_order[name] cls.now(out) else: for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True): if name in cls.strings: out += cls.strings[name] cls.last[name] = out if clear: cls.strings = {} cls.now(out) @classmethod def last_buffer(cls) -> str: out: str = "" for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True): if name in cls.last: out += cls.last[name] return out @classmethod def clear(cls, *names, last: bool = False): if names: for name in names: if name in cls.strings: del cls.strings[name] if last: del cls.z_order[name] if name in cls.last: del cls.last[name] else: cls.strings = {} if last: cls.last = {} class Color: '''Holds representations for a 24-bit color value __init__(color, depth="fg", default=False) -- color accepts 6 digit hexadecimal: string "#RRGGBB", 2 digit hexadecimal: string "#FF" or decimal RGB "255 255 255" as a string. -- depth accepts "fg" or "bg" __call__(*args) converts arguments to a string and apply color __str__ returns escape sequence to set color __iter__ returns iteration over red, green and blue in integer values of 0-255. * Values: .hexa: str | .dec: Tuple[int, int, int] | .red: int | .green: int | .blue: int | .depth: str | .escape: str ''' hexa: str; dec: Tuple[int, int, int]; red: int; green: int; blue: int; depth: str; escape: str; default: bool def __init__(self, color: str, depth: str = "fg", default: bool = False): self.depth = depth self.default = default try: if not color: self.dec = (-1, -1, -1) self.hexa = "" self.red = self.green = self.blue = -1 self.escape = "\033[49m" if depth == "bg" and default else "" return elif color.startswith("#"): self.hexa = color if len(self.hexa) == 3: self.hexa += self.hexa[1:3] + self.hexa[1:3] c = int(self.hexa[1:3], base=16) self.dec = (c, c, c) elif len(self.hexa) == 7: self.dec = (int(self.hexa[1:3], base=16), int(self.hexa[3:5], base=16), int(self.hexa[5:7], base=16)) else: raise ValueError(f'Incorrectly formatted hexadeciaml rgb string: {self.hexa}') else: c_t = tuple(map(int, color.split(" "))) if len(c_t) == 3: self.dec = c_t #type: ignore else: raise ValueError(f'RGB dec should be "0-255 0-255 0-255"') ct = self.dec[0] + self.dec[1] + self.dec[2] if ct > 255*3 or ct < 0: raise ValueError(f'RGB values out of range: {color}') except Exception as e: errlog.exception(str(e)) self.escape = "" return if self.dec and not self.hexa: self.hexa = f'{hex(self.dec[0]).lstrip("0x").zfill(2)}{hex(self.dec[1]).lstrip("0x").zfill(2)}{hex(self.dec[2]).lstrip("0x").zfill(2)}' if self.dec and self.hexa: self.red, self.green, self.blue = self.dec self.escape = f'\033[{38 if self.depth == "fg" else 48};2;{";".join(str(c) for c in self.dec)}m' def __str__(self) -> str: return self.escape def __repr__(self) -> str: return repr(self.escape) def __iter__(self) -> Iterable: for c in self.dec: yield c def __call__(self, *args: str) -> str: if len(args) == 0: return "" return f'{self.escape}{"".join(args)}{getattr(Term, self.depth)}' @staticmethod def escape_color(hexa: str = "", r: int = 0, g: int = 0, b: int = 0, depth: str = "fg") -> str: """Returns escape sequence to set color * accepts either 6 digit hexadecimal hexa="#RRGGBB", 2 digit hexadecimal: hexa="#FF" * or decimal RGB: r=0-255, g=0-255, b=0-255 * depth="fg" or "bg" """ dint: int = 38 if depth == "fg" else 48 color: str = "" if hexa: try: if len(hexa) == 3: c = int(hexa[1:], base=16) color = f'\033[{dint};2;{c};{c};{c}m' elif len(hexa) == 7: color = f'\033[{dint};2;{int(hexa[1:3], base=16)};{int(hexa[3:5], base=16)};{int(hexa[5:7], base=16)}m' except ValueError as e: errlog.exception(f'{e}') else: color = f'\033[{dint};2;{r};{g};{b}m' return color @classmethod def fg(cls, *args) -> str: if len(args) > 1: return cls.escape_color(r=args[0], g=args[1], b=args[2], depth="fg") else: return cls.escape_color(hexa=args[0], depth="fg") @classmethod def bg(cls, *args) -> str: if len(args) > 1: return cls.escape_color(r=args[0], g=args[1], b=args[2], depth="bg") else: return cls.escape_color(hexa=args[0], depth="bg") class Theme: '''__init__ accepts a dict containing { "color_element" : "color" }''' themes: Dict[str, str] = {} cached: Dict[str, Dict[str, str]] = { "Default" : DEFAULT_THEME } current: str = "" main_bg = main_fg = title = hi_fg = selected_bg = selected_fg = inactive_fg = proc_misc = cpu_box = mem_box = net_box = proc_box = div_line = temp_start = temp_mid = temp_end = cpu_start = cpu_mid = cpu_end = free_start = free_mid = free_end = cached_start = cached_mid = cached_end = available_start = available_mid = available_end = used_start = used_mid = used_end = download_start = download_mid = download_end = upload_start = upload_mid = upload_end = NotImplemented gradient: Dict[str, List[str]] = { "temp" : [], "cpu" : [], "free" : [], "cached" : [], "available" : [], "used" : [], "download" : [], "upload" : [] } def __init__(self, theme: str): self.refresh() self._load_theme(theme) def __call__(self, theme: str): for k in self.gradient.keys(): self.gradient[k] = [] self._load_theme(theme) def _load_theme(self, theme: str): tdict: Dict[str, str] if theme in self.cached: tdict = self.cached[theme] elif theme in self.themes: tdict = self._load_file(self.themes[theme]) self.cached[theme] = tdict else: errlog.warning(f'No theme named "{theme}" found!') theme = "Default" CONFIG.color_theme = theme tdict = DEFAULT_THEME self.current = theme #if CONFIG.color_theme != theme: CONFIG.color_theme = theme #* Get key names from DEFAULT_THEME dict to not leave any color unset if missing from theme dict for item, value in DEFAULT_THEME.items(): default = False if item not in ["main_fg", "main_bg"] else True depth = "fg" if item not in ["main_bg", "selected_bg"] else "bg" if item in tdict.keys(): setattr(self, item, Color(tdict[item], depth=depth, default=default)) else: setattr(self, item, Color(value, depth=depth, default=default)) #* Create color gradients from one, two or three colors, 101 values indexed 0-100 rgb: Dict[str, Tuple[int, int, int]] colors: List[List[int]] = [] for name in self.gradient.keys(): rgb = { "start" : getattr(self, f'{name}_start').dec, "mid" : getattr(self, f'{name}_mid').dec, "end" : getattr(self, f'{name}_end').dec } colors = [ list(getattr(self, f'{name}_start')) ] if rgb["end"][0] >= 0: r = 50 if rgb["mid"][0] >= 0 else 100 for first, second in ["start", "mid" if r == 50 else "end"], ["mid", "end"]: for i in range(r): colors += [[rgb[first][n] + i * (rgb[second][n] - rgb[first][n]) // r for n in range(3)]] if r == 100: break self.gradient[name] += [ Color.fg(*color) for color in colors ] else: c = Color.fg(*rgb["start"]) for _ in range(100): self.gradient[name] += [c] #* Set terminal colors Term.fg, Term.bg = self.main_fg, self.main_bg Draw.now(self.main_fg, self.main_bg) def refresh(self): '''Sets themes dict with names and paths to all found themes''' self.themes = { "Default" : "Default" } try: for d in (THEME_DIR, USER_THEME_DIR): for f in os.listdir(d): if f.endswith(".theme"): self.themes[f'{f[:-6] if d == THEME_DIR else f[:-6] + "*"}'] = f'{d}/{f}' except Exception as e: errlog.exception(str(e)) @staticmethod def _load_file(path: str) -> Dict[str, str]: '''Load a bashtop formatted theme file and return a dict''' new_theme: Dict[str, str] = {} try: with open(path) as f: for line in f: if not line.startswith("theme["): continue key = line[6:line.find("]")] s = line.find('"') value = line[s + 1:line.find('"', s + 1)] new_theme[key] = value except Exception as e: errlog.exception(str(e)) return new_theme class Banner: '''Holds the bpytop banner, .draw(line, [col=0], [center=False], [now=False])''' out: List[str] = [] c_color: str = "" length: int = 0 if not out: for num, (color, color2, line) in enumerate(BANNER_SRC): if len(line) > length: length = len(line) out_var = "" line_color = Color.fg(color) line_color2 = Color.fg(color2) line_dark = Color.fg(f'#{80 - num * 6}') for n, letter in enumerate(line): if letter == "█" and c_color != line_color: if n > 5 and n < 25: c_color = line_color2 else: c_color = line_color out_var += c_color elif letter == " ": letter = f'{Mv.r(1)}' c_color = "" elif letter != "█" and c_color != line_dark: c_color = line_dark out_var += line_dark out_var += letter out.append(out_var) @classmethod def draw(cls, line: int, col: int = 0, center: bool = False, now: bool = False): out: str = "" if center: col = Term.width // 2 - cls.length // 2 for n, o in enumerate(cls.out): out += f'{Mv.to(line + n, col)}{o}' out += f'{Term.fg}' if now: Draw.out(out) else: return out class Symbol: h_line: str = "─" v_line: str = "│" left_up: str = "┌" right_up: str = "┐" left_down: str = "└" right_down: str = "┘" title_left: str = "┤" title_right: str = "├" div_up: str = "┬" div_down: str = "┴" graph_up: Dict[float, str] = { 0.0 : " ", 0.1 : "⢀", 0.2 : "⢠", 0.3 : "⢰", 0.4 : "⢸", 1.0 : "⡀", 1.1 : "⣀", 1.2 : "⣠", 1.3 : "⣰", 1.4 : "⣸", 2.0 : "⡄", 2.1 : "⣄", 2.2 : "⣤", 2.3 : "⣴", 2.4 : "⣼", 3.0 : "⡆", 3.1 : "⣆", 3.2 : "⣦", 3.3 : "⣶", 3.4 : "⣾", 4.0 : "⡇", 4.1 : "⣇", 4.2 : "⣧", 4.3 : "⣷", 4.4 : "⣿" } graph_up_small = graph_up.copy() graph_up_small[0.0] = "\033[1C" graph_down: Dict[float, str] = { 0.0 : " ", 0.1 : "⠈", 0.2 : "⠘", 0.3 : "⠸", 0.4 : "⢸", 1.0 : "⠁", 1.1 : "⠉", 1.2 : "⠙", 1.3 : "⠹", 1.4 : "⢹", 2.0 : "⠃", 2.1 : "⠋", 2.2 : "⠛", 2.3 : "⠻", 2.4 : "⢻", 3.0 : "⠇", 3.1 : "⠏", 3.2 : "⠟", 3.3 : "⠿", 3.4 : "⢿", 4.0 : "⡇", 4.1 : "⡏", 4.2 : "⡟", 4.3 : "⡿", 4.4 : "⣿" } graph_down_small = graph_down.copy() graph_down_small[0.0] = "\033[1C" meter: str = "■" ok: str = f'{Color.fg("#30ff50")}√{Color.fg("#cc")}' fail: str = f'{Color.fg("#ff3050")}!{Color.fg("#cc")}' class Graph: '''Class for creating and adding to graphs * __str__ : returns graph as a string * add(value: int) : adds a value to graph and returns it as a string * __call__ : same as add ''' out: str width: int height: int graphs: Dict[bool, List[str]] colors: List[str] invert: bool max_value: int offset: int current: bool last: int symbol: Dict[float, str] def __init__(self, width: int, height: int, color: Union[List[str], Color, None], data: List[int], invert: bool = False, max_value: int = 0, offset: int = 0): self.graphs: Dict[bool, List[str]] = {False : [], True : []} self.current: bool = True self.colors: List[str] = [] if isinstance(color, list) and height > 1: for i in range(1, height + 1): self.colors.insert(0, color[i * 100 // height]) #* Calculate colors of graph if invert: self.colors.reverse() elif isinstance(color, Color) and height > 1: self.colors = [ f'{color}' for _ in range(height) ] else: if isinstance(color, list): self.colors = color elif isinstance(color, Color): self.colors = [ f'{color}' for _ in range(101) ] self.width = width self.height = height self.invert = invert self.offset = offset if not data: data = [0] if max_value: self.max_value = max_value data = [ (v + offset) * 100 // (max_value + offset) if v < max_value else 100 for v in data ] #* Convert values to percentage values of max_value with max_value as ceiling else: self.max_value = 0 if self.height == 1: self.symbol = Symbol.graph_down_small if invert else Symbol.graph_up_small else: self.symbol = Symbol.graph_down if invert else Symbol.graph_up value_width: int = ceil(len(data) / 2) filler: str = "" if value_width > width: #* If the size of given data set is bigger then width of graph, shrink data set data = data[-(width*2):] value_width = ceil(len(data) / 2) elif value_width < width: #* If the size of given data set is smaller then width of graph, fill graph with whitespace filler = self.symbol[0.0] * (width - value_width) if len(data) % 2 == 1: data.insert(0, 0) for _ in range(height): for b in [True, False]: self.graphs[b].append(filler) self._create(data, new=True) def _create(self, data: List[int], new: bool = False): h_high: int h_low: int value: Dict[str, int] = { "left" : 0, "right" : 0 } val: int side: str #* Create the graph for h in range(self.height): h_high = round(100 * (self.height - h) / self.height) if self.height > 1 else 100 h_low = round(100 * (self.height - (h + 1)) / self.height) if self.height > 1 else 0 for v in range(len(data)): if new: self.current = bool(v % 2) #* Switch between True and False graphs if new and v == 0: self.last = 0 for val, side in [self.last, "left"], [data[v], "right"]: # type: ignore if val >= h_high: value[side] = 4 elif val <= h_low: value[side] = 0 else: if self.height == 1: value[side] = round(val * 4 / 100 + 0.5) else: value[side] = round((val - h_low) * 4 / (h_high - h_low) + 0.1) if new: self.last = data[v] self.graphs[self.current][h] += self.symbol[float(value["left"] + value["right"] / 10)] if data: self.last = data[-1] self.out = "" if self.height == 1: self.out += f'{"" if not self.colors else self.colors[self.last]}{self.graphs[self.current][0]}' elif self.height > 1: for h in range(self.height): if h > 0: self.out += f'{Mv.d(1)}{Mv.l(self.width)}' self.out += f'{"" if not self.colors else self.colors[h]}{self.graphs[self.current][h if not self.invert else (self.height - 1) - h]}' if self.colors: self.out += f'{Term.fg}' def add(self, value: int) -> str: self.current = not self.current if self.height == 1: if self.graphs[self.current][0].startswith(self.symbol[0.0]): self.graphs[self.current][0] = self.graphs[self.current][0].replace(self.symbol[0.0], "", 1) else: self.graphs[self.current][0] = self.graphs[self.current][0][1:] else: for n in range(self.height): self.graphs[self.current][n] = self.graphs[self.current][n][1:] if self.max_value: value = (value + self.offset) * 100 // (self.max_value + self.offset) if value < self.max_value else 100 self._create([value]) return self.out def __call__(self, value: int) -> str: return self.add(value) def __str__(self): return self.out def __repr__(self): return repr(self.out) class Graphs: '''Holds all graphs and lists of graphs for dynamically created graphs''' cpu: Dict[str, Graph] = {} cores: List[Graph] = [NotImplemented] * THREADS temps: List[Graph] = [NotImplemented] * (THREADS + 1) net: Graph detailed_cpu: Graph detailed_mem: Graph pid_cpu: Dict[int, Graph] = {} class Meter: '''Creates a percentage meter __init__(value, width, theme, gradient_name) to create new meter __call__(value) to set value and return meter as a string __str__ returns last set meter as a string ''' out: str color_gradient: List[str] color_inactive: Color width: int saved: Dict[int, str] def __init__(self, value: int, width: int, gradient_name: str): self.color_gradient = THEME.gradient[gradient_name] self.color_inactive = THEME.inactive_fg self.width = width self.out = self._create(value) self.saved = { value : self.out } def __call__(self, value: int): if value in self.saved.keys(): self.out = self.saved[value] else: self.out = self._create(value) self.saved[value] = self.out return self.out def __str__(self): return self.out def __repr__(self): return repr(self.out) def _create(self, value: int): if value > 100: value = 100 elif value < 0: value = 100 out: str = "" for i in range(1, self.width + 1): if value >= round(i * 100 / self.width): out += f'{self.color_gradient[round(i * 100 / self.width)]}{Symbol.meter}' else: out += self.color_inactive(Symbol.meter * (self.width + 1 - i)) break else: out += f'{Term.fg}' return out class Meters: cpu: Meter mem: Dict[str, Union[Meter, Graph]] = {} swap: Dict[str, Union[Meter, Graph]] = {} disks_used: Dict[str, Meter] = {} disks_free: Dict[str, Meter] = {} class Box: '''Box class with all needed attributes for create_box() function''' name: str height_p: int width_p: int x: int y: int width: int height: int out: str bg: str _b_cpu_h: int _b_mem_h: int redraw_all: bool buffers: List[str] = [] resized: bool = False @classmethod def calc_sizes(cls): '''Calculate sizes of boxes''' for sub in cls.__subclasses__(): sub._calc_size() # type: ignore sub.resized = True # type: ignore @classmethod def draw_update_ms(cls, now: bool = True): Draw.buffer("update_ms!" if now and not Menu.active else "update_ms", f'{Mv.to(CpuBox.y - 1, CpuBox.x + CpuBox.width - len(str(CONFIG.update_ms)) - 7)}{THEME.cpu_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg("+")} ' f'{THEME.title(str(CONFIG.update_ms))} {THEME.hi_fg("+")}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}', save=True if Menu.active else False) if now and not Menu.active: Draw.clear("update_ms") @classmethod def draw_bg(cls, now: bool = True): '''Draw all boxes outlines and titles''' Draw.buffer("bg!" if now and not Menu.active else "bg", "".join(sub._draw_bg() for sub in cls.__subclasses__()), z=1000, save=True if Menu.active else False) # type: ignore if now and not Menu.active: Draw.clear("bg") cls.draw_update_ms(now=now) class SubBox: box_x: int = 0 box_y: int = 0 box_width: int = 0 box_height: int = 0 box_columns: int = 0 column_size: int = 0 class CpuBox(Box, SubBox): name = "cpu" x = 1 y = 1 height_p = 32 width_p = 100 resized: bool = True buffer: str = "cpu" Box.buffers.append(buffer) @classmethod def _calc_size(cls): cls.width = round(Term.width * cls.width_p / 100) cls.height = round(Term.height * cls.height_p / 100) Box._b_cpu_h = cls.height #THREADS = 64 cls.box_columns = ceil((THREADS + 1) / (cls.height - 5)) if cls.box_columns * (24 + 13 if CONFIG.check_temp else 24) < cls.width - (cls.width // 4): cls.column_size = 2 elif cls.box_columns * (19 + 6 if CONFIG.check_temp else 19) < cls.width - (cls.width // 4): cls.column_size = 1 elif cls.box_columns * (10 + 6 if CONFIG.check_temp else 10) < cls.width - (cls.width // 4): cls.column_size = 0 else: cls.box_columns = (cls.width - cls.width // 4) // (10 + 6 if CONFIG.check_temp else 10); cls.column_size = 0 if cls.column_size == 2: cls.box_width = (24 + 13 if CONFIG.check_temp else 24) * cls.box_columns - ((cls.box_columns - 1) * 1) elif cls.column_size == 1: cls.box_width = (19 + 6 if CONFIG.check_temp else 19) * cls.box_columns - ((cls.box_columns - 1) * 1) else: cls.box_width = (11 + 6 if CONFIG.check_temp else 11) * cls.box_columns + 1 cls.box_height = ceil(THREADS / cls.box_columns) + 4 if cls.box_height > cls.height - 2: cls.box_height = cls.height - 2 cls.box_x = (cls.width - 1) - cls.box_width cls.box_y = cls.y + ceil((cls.height - 2) / 2) - ceil(cls.box_height / 2) + 1 @classmethod def _draw_bg(cls) -> str: return (f'{create_box(box=cls, line_color=THEME.cpu_box)}' f'{Mv.to(cls.y, cls.x + 10)}{THEME.cpu_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg("m")}{THEME.title("enu")}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}' f'{create_box(x=cls.box_x, y=cls.box_y, width=cls.box_width, height=cls.box_height, line_color=THEME.div_line, fill=False, title=CPU_NAME[:18 if CONFIG.check_temp else 9])}') @classmethod def _draw_fg(cls): cpu = CpuCollector out: str = "" lavg: str = "" x, y, w, h = cls.x + 1, cls.y + 1, cls.width - 2, cls.height - 2 bx, by, bw, bh = cls.box_x + 1, cls.box_y + 1, cls.box_width - 2, cls.box_height - 2 hh: int = ceil(h / 2) if cls.resized: Graphs.cpu["up"] = Graph(w - bw - 3, hh, THEME.gradient["cpu"], cpu.cpu_usage[0]) Graphs.cpu["down"] = Graph(w - bw - 3, h - hh, THEME.gradient["cpu"], cpu.cpu_usage[0], invert=True) Meters.cpu = Meter(cpu.cpu_usage[0][-1], (bw - 9 - 13 if CONFIG.check_temp else bw - 9), "cpu") if cls.column_size > 0: for n in range(THREADS): Graphs.cores[n] = Graph(5 * cls.column_size, 1, None, cpu.cpu_usage[n + 1]) if CONFIG.check_temp: Graphs.temps[0] = Graph(5, 1, None, cpu.cpu_temp[0], max_value=cpu.cpu_temp_crit, offset=-23) if cls.column_size > 1: for n in range(1, THREADS + 1): Graphs.temps[n] = Graph(5, 1, None, cpu.cpu_temp[n], max_value=cpu.cpu_temp_crit, offset=-23) cx = cy = cc = 0 ccw = (bw + 1) // cls.box_columns freq: str = f'{cpu.cpu_freq} Mhz' if cpu.cpu_freq < 1000 else f'{float(cpu.cpu_freq / 1000):.1f} GHz' out += (f'{Mv.to(by - 1, bx + bw - 9)}{THEME.div_line(Symbol.title_left)}{Fx.b}{THEME.title(freq)}{Fx.ub}{THEME.div_line(Symbol.title_right)}' f'{Mv.to(y, x)}{Graphs.cpu["up"](cpu.cpu_usage[0][-1])}{Mv.to(y + hh, x)}{Graphs.cpu["down"](cpu.cpu_usage[0][-1])}' f'{THEME.main_fg}{Mv.to(by + cy, bx + cx)}{"CPU "}{Meters.cpu(cpu.cpu_usage[0][-1])}' f'{THEME.gradient["cpu"][cpu.cpu_usage[0][-1]]}{cpu.cpu_usage[0][-1]:>4}{THEME.main_fg}%') if CONFIG.check_temp: out += (f'{THEME.inactive_fg} ⡀⡀⡀⡀⡀{Mv.l(5)}{THEME.gradient["temp"][cpu.cpu_temp[0][-1]]}{Graphs.temps[0](cpu.cpu_temp[0][-1])}' f'{cpu.cpu_temp[0][-1]:>4}{THEME.main_fg}°C') cy += 1 for n in range(1, THREADS + 1): out += f'{THEME.main_fg}{Mv.to(by + cy, bx + cx)}{"Core" + str(n):<{7 if cls.column_size > 0 else 5}}' if cls.column_size > 0: out += f'{THEME.inactive_fg}{"⡀" * (5 * cls.column_size)}{Mv.l(5 * cls.column_size)}{THEME.gradient["cpu"][cpu.cpu_usage[n][-1]]}{Graphs.cores[n-1](cpu.cpu_usage[n][-1])}' else: out += f'{THEME.gradient["cpu"][cpu.cpu_usage[n][-1]]}' out += f'{cpu.cpu_usage[n][-1]:>4}{THEME.main_fg}%' if CONFIG.check_temp: if cls.column_size > 1: out += f'{THEME.inactive_fg} ⡀⡀⡀⡀⡀{Mv.l(5)}{THEME.gradient["temp"][cpu.cpu_temp[n][-1]]}{Graphs.temps[n](cpu.cpu_temp[n][-1])}' else: out += f'{THEME.gradient["temp"][cpu.cpu_temp[n][-1]]}' out += f'{cpu.cpu_temp[n][-1]:>4}{THEME.main_fg}°C' cy += 1 if cy == bh: cc += 1; cy = 1; cx = ccw * cc if cc == cls.box_columns: break if cy < bh - 1: cy = bh - 1 if cls.column_size == 2 and CONFIG.check_temp: lavg = f'Load Average: {" ".join(str(l) for l in cpu.load_avg):^18.18}' elif cls.column_size == 2 or (cls.column_size == 1 and CONFIG.check_temp): lavg = f'L-AVG: {" ".join(str(l) for l in cpu.load_avg):^14.14}' else: lavg = f'{" ".join(str(round(l, 1)) for l in cpu.load_avg):^11.11}' out += f'{Mv.to(by + cy, bx + cx)}{THEME.main_fg}{lavg}' out += f'{Mv.to(y + h - 1, x + 1)}{THEME.inactive_fg}up {cpu.uptime}' Draw.buffer(cls.buffer, f'{out}{Term.fg}', save=Menu.active) cls.resized = False class MemBox(Box): name = "mem" height_p = 40 width_p = 45 x = 1 y = 1 mem_meter: int = 0 mem_size: int = 0 disk_meter: int = 0 divider: int = 0 mem_width: int = 0 disks_width: int = 0 graph_height: int resized: bool = True redraw: bool = False buffer: str = "mem" swap_on: bool = CONFIG.show_swap Box.buffers.append(buffer) mem_names: List[str] = ["used", "available", "cached", "free"] swap_names: List[str] = ["used", "free"] @classmethod def _calc_size(cls): cls.width = round(Term.width * cls.width_p / 100) cls.height = round(Term.height * cls.height_p / 100) + 1 Box._b_mem_h = cls.height cls.y = Box._b_cpu_h + 1 if CONFIG.show_disks: cls.mem_width = ceil((cls.width - 3) / 2) cls.disks_width = cls.width - cls.mem_width - 3 if cls.mem_width + cls.disks_width < cls.width - 2: cls.mem_width += 1 cls.divider = cls.x + cls.mem_width else: cls.mem_width = cls.width - 1 if cls.height - (3 if cls.swap_on and not CONFIG.swap_disk else 2) > 2 * (6 if cls.swap_on and not CONFIG.swap_disk else 4): cls.mem_size = 3 elif cls.mem_width > 25: cls.mem_size = 2 else: cls.mem_size = 1 cls.mem_meter = cls.width - (cls.disks_width if CONFIG.show_disks else 0) - (9 if cls.mem_size > 2 else 20) if cls.mem_size == 1: cls.mem_meter += 6 if cls.mem_meter < 1: cls.mem_meter = 0 if CONFIG.mem_graphs: cls.graph_height = round(((cls.height - (2 if cls.swap_on and not CONFIG.swap_disk else 1)) - (2 if cls.mem_size == 3 else 1) * (6 if cls.swap_on and not CONFIG.swap_disk else 4)) / (6 if cls.swap_on and not CONFIG.swap_disk else 4)) if cls.graph_height == 0: cls.graph_height = 1 if cls.graph_height > 1: cls.mem_meter += 6 else: cls.graph_height = 0 if CONFIG.show_disks: cls.disk_meter = cls.width - cls.mem_width - 23 if cls.disks_width < 25: cls.disk_meter += 10 if cls.disk_meter < 1: cls.disk_meter = 0 @classmethod def _draw_bg(cls) -> str: out: str = "" out += f'{create_box(box=cls, line_color=THEME.mem_box)}' if CONFIG.show_disks: out += (f'{Mv.to(cls.y, cls.divider + 2)}{THEME.mem_box(Symbol.title_left)}{Fx.b}{THEME.title("disks")}{Fx.ub}{THEME.mem_box(Symbol.title_right)}' f'{Mv.to(cls.y, cls.divider)}{THEME.mem_box(Symbol.div_up)}' f'{Mv.to(cls.y + cls.height - 1, cls.divider)}{THEME.mem_box(Symbol.div_down)}{THEME.div_line}' f'{"".join(f"{Mv.to(cls.y + i, cls.divider)}{Symbol.v_line}" for i in range(1, cls.height - 1))}') return out @classmethod def _draw_fg(cls): mem = MemCollector out: str = "" gbg: str = "" gmv: str = "" gli: str = "" x, y, h = cls.x + 1, cls.y + 1, cls.height - 2 if cls.resized: Meters.mem = {} Meters.swap = {} Meters.disks_used = {} Meters.disks_free = {} if cls.mem_meter > 0: for name in cls.mem_names: if CONFIG.mem_graphs: Meters.mem[name] = Graph(cls.mem_meter, cls.graph_height, THEME.gradient[name], mem.vlist[name]) else: Meters.mem[name] = Meter(mem.percent[name], cls.mem_meter, name) if cls.swap_on: for name in cls.swap_names: if CONFIG.mem_graphs and not CONFIG.swap_disk: Meters.swap[name] = Graph(cls.mem_meter, cls.graph_height, THEME.gradient[name], mem.swap_vlist[name]) elif CONFIG.swap_disk: Meters.disks_used["__swap"] = Meter(mem.swap_percent["used"], cls.disk_meter, "used") if len(mem.disks) * 3 <= h + 1: Meters.disks_free["__swap"] = Meter(mem.swap_percent["free"], cls.disk_meter, "free") break else: Meters.swap[name] = Meter(mem.swap_percent[name], cls.mem_meter, name) if cls.disk_meter > 0: for name in mem.disks.keys(): Meters.disks_used[name] = Meter(mem.disks[name]["used_percent"], cls.disk_meter, "used") if len(mem.disks) * 3 <= h + 1: Meters.disks_free[name] = Meter(mem.disks[name]["free_percent"], cls.disk_meter, "free") #* Mem out += f'{Mv.to(y, x+1)}{THEME.title}{Fx.b}Total:{mem.string["total"]:>{cls.mem_width - 9}}{Fx.ub}{THEME.main_fg}' cx = 1; cy = 1 # if cls.graph_height == 1: # gbg = f'{THEME.inactive_fg}{"⡀" * cls.mem_meter}{Mv.l(cls.mem_meter)}{THEME.main_fg}' if cls.graph_height > 0: gli = f'{Mv.l(2)}{THEME.mem_box(Symbol.title_right)}{THEME.div_line}{Symbol.h_line * (cls.mem_width - 1)}{"" if CONFIG.show_disks else THEME.mem_box}{Symbol.title_left}{Mv.l(cls.mem_width - 1)}{THEME.title}' if cls.graph_height >= 2: gbg = f'{Mv.l(1)}' gmv = f'{Mv.l(cls.mem_width - 2)}{Mv.u(cls.graph_height - 1)}' for name in cls.mem_names: if cls.mem_size > 2: out += (f'{Mv.to(y+cy, x+cx)}{gli}{name.capitalize()[:None if cls.mem_width > 21 else 5]+":":<{1 if cls.mem_width > 21 else 6.6}}{Mv.to(y+cy, x+cx + cls.mem_width - 3 - (len(mem.string[name])))}{mem.string[name]}' f'{Mv.to(y+cy+1, x+cx)}{gbg}{Meters.mem[name](mem.percent[name])}{gmv}{str(mem.percent[name])+"%":>4}'); cy += 2 if not cls.graph_height else cls.graph_height + 1 else: out += f'{Mv.to(y+cy, x+cx)}{name.capitalize():{5.5 if cls.mem_size > 1 else 1.1}} {gbg}{Meters.mem[name](mem.percent[name])}{mem.string[name][:None if cls.mem_size > 1 else -2]:>{9 if cls.mem_size > 1 else 7}}'; cy += 1 if not cls.graph_height else cls.graph_height #* Swap if cls.swap_on and CONFIG.show_swap and not CONFIG.swap_disk: if h - cy > 5: if cls.graph_height > 0: out += f'{Mv.to(y+cy, x+cx)}{gli}' cy += 1 out += f'{Mv.to(y+cy, x+cx)}{THEME.title}{Fx.b}Swap:{mem.swap_string["total"]:>{cls.mem_width - 8}}{Fx.ub}{THEME.main_fg}'; cy += 1 for name in cls.swap_names: if cls.mem_size > 2: out += (f'{Mv.to(y+cy, x+cx)}{gli}{name.capitalize()[:None if cls.mem_width > 21 else 5]+":":<{1 if cls.mem_width > 21 else 6.6}}{Mv.to(y+cy, x+cx + cls.mem_width - 3 - (len(mem.swap_string[name])))}{mem.swap_string[name]}' f'{Mv.to(y+cy+1, x+cx)}{gbg}{Meters.swap[name](mem.swap_percent[name])}{gmv}{str(mem.swap_percent[name])+"%":>4}'); cy += 2 if not cls.graph_height else cls.graph_height + 1 else: out += f'{Mv.to(y+cy, x+cx)}{name.capitalize():{5.5 if cls.mem_size > 1 else 1.1}} {gbg}{Meters.swap[name](mem.swap_percent[name])}{mem.swap_string[name][:None if cls.mem_size > 1 else -2]:>{9 if cls.mem_size > 1 else 7}}'; cy += 1 if not cls.graph_height else cls.graph_height if cls.graph_height > 0 and not cy == h: out += f'{Mv.to(y+cy, x+cx)}{gli}' #* Disks if CONFIG.show_disks: cx = x + cls.mem_width - 1; cy = 0 for name, item in mem.disks.items(): if cy > h - 2: break out += (f'{Mv.to(y+cy, x+cx)}{THEME.title}{Fx.b}{item["name"]:{cls.disks_width - 2}.12}{Mv.to(y+cy, x + cx + cls.disks_width - 11)}{item["total"][:None if cls.disks_width >= 25 else -2]:>9}' f'{Mv.to(y+cy, x + cx + (cls.disks_width // 2) - (len(item["io"]) // 2) - 2)}{Fx.ub}{THEME.main_fg}{item["io"]}{Fx.ub}{THEME.main_fg}{Mv.to(y+cy+1, x+cx)}') out += f'Used:{str(item["used_percent"]) + "%":>4} ' if cls.disks_width >= 25 else "U " out += f'{Meters.disks_used[name]}{item["used"][:None if cls.disks_width >= 25 else -2]:>{9 if cls.disks_width >= 25 else 7}}'; cy += 2 if len(mem.disks) * 3 <= h + 1: if cy > h - 1: break out += Mv.to(y+cy, x+cx) out += f'Free:{str(item["free_percent"]) + "%":>4} ' if cls.disks_width >= 25 else f'{"F "}' out += f'{Meters.disks_free[name]}{item["free"][:None if cls.disks_width >= 25 else -2]:>{9 if cls.disks_width >= 25 else 7}}'; cy += 1 if len(mem.disks) * 4 <= h + 1: cy += 1 Draw.buffer(cls.buffer, f'{out}{Term.fg}', save=Menu.active) cls.resized = False class NetBox(Box, SubBox): name = "net" height_p = 28 width_p = 45 x = 1 y = 1 redraw: bool = True buffer: str = "net" Box.buffers.append(buffer) @classmethod def _calc_size(cls): cls.width = round(Term.width * cls.width_p / 100) cls.height = Term.height - Box._b_cpu_h - Box._b_mem_h cls.y = Term.height - cls.height + 1 cls.box_width = 24 cls.box_height = 9 if cls.height > 12 else cls.height - 2 cls.box_x = cls.width - cls.box_width - 2 cls.box_y = cls.y + ((cls.height - 2) // 2) - cls.box_height // 2 + 1 cls.redraw_all = True @classmethod def _draw_bg(cls) -> str: return f'{create_box(box=cls, line_color=THEME.net_box)}\ {create_box(x=cls.box_x, y=cls.box_y, width=cls.box_width, height=cls.box_height, line_color=THEME.div_line, fill=False, title="Download", title2="Upload")}' class ProcBox(Box): name = "proc" height_p = 68 width_p = 55 x = 1 y = 1 detailed: bool = False detailed_x: int = 0 detailed_y: int = 0 detailed_width: int = 0 detailed_height: int = 8 redraw: bool = True buffer: str = "proc" Box.buffers.append(buffer) @classmethod def _calc_size(cls): cls.width = round(Term.width * cls.width_p / 100) cls.height = round(Term.height * cls.height_p / 100) cls.x = Term.width - cls.width + 1 cls.y = Box._b_cpu_h + 1 cls.detailed_x = cls.x cls.detailed_y = cls.y cls.detailed_height = 8 cls.detailed_width = cls.width cls.redraw_all = True @classmethod def _draw_bg(cls) -> str: return create_box(box=cls, line_color=THEME.proc_box) class Collector: '''Data collector master class * .start(): Starts collector thread * .stop(): Stops collector thread * .collect(*collectors: Collector, draw_now: bool = True, interrupt: bool = False): queues up collectors to run''' stopping: bool = False started: bool = False draw_now: bool = False thread: threading.Thread collect_run = threading.Event() collect_idle = threading.Event() collect_idle.set() collect_done = threading.Event() collect_queue: List = [] collect_interrupt: bool = False @classmethod def start(cls): cls.stopping = False cls.thread = threading.Thread(target=cls._runner, args=()) cls.thread.start() cls.started = True @classmethod def stop(cls): if cls.started and cls.thread.is_alive(): cls.stopping = True cls.started = False cls.collect_queue = [] cls.collect_idle.set() cls.collect_done.set() try: cls.thread.join() except: pass @classmethod def _runner(cls): '''This is meant to run in it's own thread, collecting and drawing when collect_run is set''' draw_buffers: List[str] = [] try: while not cls.stopping: cls.collect_run.wait(0.1) if not cls.collect_run.is_set(): continue draw_buffers = [] #if cls.collect_interrupt and bkp_queue: # for q in bkp_queue: # if q not in cls.collect_queue: cls.collect_queue.append(q) #bkp_queue = [] cls.collect_interrupt = False cls.collect_run.clear() cls.collect_idle.clear() while cls.collect_queue: collector = cls.collect_queue.pop() collector._collect() collector._draw() #bkp_queue.append(collector) draw_buffers.append(collector.buffer) if cls.collect_interrupt: break if cls.draw_now and not Menu.active and not cls.collect_interrupt: Draw.out(*draw_buffers) cls.collect_idle.set() cls.collect_done.set() except Exception as e: errlog.exception(f'Data collection thread failed with exception: {e}') cls.collect_done.set() clean_quit(1, thread=True) @classmethod def collect(cls, *collectors, draw_now: bool = True, interrupt: bool = False): '''Setup collect queue for _runner''' #* Set interrupt flag if True to stop _runner prematurely cls.collect_interrupt = interrupt #* Wait for _runner to finish cls.collect_idle.wait() #* Reset interrupt flag cls.collect_interrupt = False #* Set draw_now flag if True to draw to screen instead of buffer cls.draw_now = draw_now #* Append any collector given as argument to _runner queue if collectors: cls.collect_queue = [*collectors] #* Add all collectors to _runner queue if no collectors in argument else: cls.collect_queue = list(cls.__subclasses__()) #* Set run flag to start _runner cls.collect_run.set() class CpuCollector(Collector): '''Collects cpu usage for cpu and cores, cpu frequency, load_avg, uptime and cpu temps''' cpu_usage: List[List[int]] = [] cpu_temp: List[List[int]] = [] cpu_temp_high: int = 0 cpu_temp_crit: int = 0 for _ in range(THREADS + 1): cpu_usage.append([]) cpu_temp.append([]) cpu_freq: int = 0 load_avg: List[float] = [] uptime: str = "" buffer: str = CpuBox.buffer @staticmethod def _get_sensors() -> str: '''Check if we can get cpu temps and return method of getting temps''' if SYSTEM == "MacOS": try: if which("osx-cpu-temp") and subprocess.check_output("osx-cpu-temp", text=True).rstrip().endswith("°C"): return "osx-cpu-temp" except: pass elif hasattr(psutil, "sensors_temperatures"): try: temps = psutil.sensors_temperatures() if temps: for _, entries in temps.items(): for entry in entries: if entry.label.startswith(("Package", "Core 0", "Tdie")): return "psutil" except: pass try: if SYSTEM == "Linux" and which("vcgencmd") and subprocess.check_output("vcgencmd measure_temp", text=True).rstrip().endswith("'C"): return "vcgencmd" except: pass return "" sensor_method: str = _get_sensors.__func__() # type: ignore got_sensors: bool = True if sensor_method else False @classmethod def _collect(cls): cls.cpu_usage[0].append(round(psutil.cpu_percent(percpu=False))) for n, thread in enumerate(psutil.cpu_percent(percpu=True), start=1): cls.cpu_usage[n].append(round(thread)) if len(cls.cpu_usage[n]) > Term.width * 2: del cls.cpu_usage[n][0] cls.cpu_freq = round(psutil.cpu_freq().current) cls.load_avg = [round(lavg, 2) for lavg in os.getloadavg()] cls.uptime = str(timedelta(seconds=round(time()-psutil.boot_time(),0)))[:-3] if CONFIG.check_temp and cls.got_sensors: cls._collect_temps() @classmethod def _collect_temps(cls): temp: int cores: List[int] = [] cpu_type: str = "" if cls.sensor_method == "psutil": for _, entries in psutil.sensors_temperatures().items(): for entry in entries: if entry.label.startswith(("Package", "Tdie")): cpu_type = "ryzen" if entry.label.startswith("Package") else "ryzen" if not cls.cpu_temp_high: cls.cpu_temp_high, cls.cpu_temp_crit = round(entry.high), round(entry.critical) temp = round(entry.current) elif entry.label.startswith(("Core", "Tccd")): if not cpu_type: cpu_type = "other" if not cls.cpu_temp_high: cls.cpu_temp_high, cls.cpu_temp_crit = round(entry.high), round(entry.critical) temp = round(entry.current) cores.append(round(entry.current)) if len(cores) < THREADS: if cpu_type == "intel" or (cpu_type == "other" and len(cores) == THREADS // 2): cls.cpu_temp[0].append(temp) for n, t in enumerate(cores, start=1): cls.cpu_temp[n].append(t) cls.cpu_temp[THREADS // 2 + n].append(t) elif cpu_type == "ryzen" or cpu_type == "other": cls.cpu_temp[0].append(temp) if len(cores) < 1: cores.append(temp) z = 1 for t in cores: for i in range(THREADS // len(cores)): cls.cpu_temp[z + i].append(t) z += i else: cores.insert(0, temp) for n, t in enumerate(cores): cls.cpu_temp[n].append(t) else: try: if cls.sensor_method == "osx-cpu-temp": temp = round(float(subprocess.check_output("osx-cpu-temp", text=True).rstrip().rstrip("°C"))) elif cls.sensor_method == "vcgencmd": temp = round(float(subprocess.check_output("vcgencmd measure_temp", text=True).rstrip().rstrip("'C"))) except Exception as e: errlog.exception(f'{e}') cls.got_sensors = False CONFIG.check_temp = False CpuBox._calc_size() else: for n in range(THREADS + 1): cls.cpu_temp[n].append(temp) if len(cls.cpu_temp[0]) > 5: for n in range(len(cls.cpu_temp)): del cls.cpu_temp[n][0] @classmethod def _draw(cls): CpuBox._draw_fg() class MemCollector(Collector): '''Collects memory and disks information''' values: Dict[str, int] = {} vlist: Dict[str, List[int]] = {} percent: Dict[str, int] = {} string: Dict[str, str] = {} swap_values: Dict[str, int] = {} swap_vlist: Dict[str, List[int]] = {} swap_percent: Dict[str, int] = {} swap_string: Dict[str, str] = {} disks: Dict[str, Dict] disk_hist: Dict[str, Tuple] = {} excludes: List[str] = ["squashfs"] if SYSTEM == "BSD": excludes += ["devfs", "tmpfs", "procfs", "linprocfs", "gvfs", "fusefs"] buffer: str = MemBox.buffer @classmethod def _collect(cls): #* Collect memory mem = psutil.virtual_memory() if hasattr(mem, "cached"): cls.values["cached"] = mem.cached else: cls.values["cached"] = mem.active cls.values["total"], cls.values["free"], cls.values["available"] = mem.total, mem.free, mem.available cls.values["used"] = cls.values["total"] - cls.values["available"] for key, value in cls.values.items(): cls.string[key] = floating_humanizer(value) if key == "total": continue cls.percent[key] = round(value * 100 / cls.values["total"]) if CONFIG.mem_graphs: if not key in cls.vlist: cls.vlist[key] = [] cls.vlist[key].append(cls.percent[key]) if len(cls.vlist[key]) > MemBox.width: del cls.vlist[key][0] #* Collect swap if CONFIG.show_swap or CONFIG.swap_disk: swap = psutil.swap_memory() cls.swap_values["total"], cls.swap_values["free"] = swap.total, swap.free cls.swap_values["used"] = cls.swap_values["total"] - cls.swap_values["free"] MemBox.swap_on = bool(swap.total) if swap.total: MemBox.swap_on = True for key, value in cls.swap_values.items(): cls.swap_string[key] = floating_humanizer(value) if key == "total": continue cls.swap_percent[key] = round(value * 100 / cls.swap_values["total"]) if CONFIG.mem_graphs: if not key in cls.swap_vlist: cls.swap_vlist[key] = [] cls.swap_vlist[key].append(cls.swap_percent[key]) if len(cls.swap_vlist[key]) > MemBox.width: del cls.swap_vlist[key][0] else: MemBox.swap_on = False else: MemBox.swap_on = False if not CONFIG.show_disks: return #* Collect disks usage disk_read: int = 0 disk_write: int = 0 dev_name: str disk_name: str filtering: Tuple = () filter_exclude: bool = False io_string: str u_percent: int disk_list: List[str] = [] cls.disks = {} if CONFIG.disks_filter: if CONFIG.disks_filter.startswith("exclude="): filter_exclude = True filtering = tuple(v.strip() for v in CONFIG.disks_filter.replace("exclude=", "").strip().split(",")) else: filtering = tuple(v.strip() for v in CONFIG.disks_filter.strip().split(",")) io_counters = psutil.disk_io_counters(perdisk=True if SYSTEM == "Linux" else False, nowrap=True) for disk in psutil.disk_partitions(): disk_io = None io_string = "" disk_name = disk.mountpoint.rsplit('/', 1)[-1] if not disk.mountpoint == "/" else "root" while disk_name in disk_list: disk_name += "_" disk_list += [disk_name] if cls.excludes and disk.fstype in cls.excludes: continue if filtering and ((not filter_exclude and not disk_name.endswith(filtering)) or (filter_exclude and disk_name.endswith(filtering))): continue #elif filtering and disk_name.endswith(filtering) if SYSTEM == "MacOS" and disk.mountpoint == "/private/var/vm": continue try: disk_u = psutil.disk_usage(disk.mountpoint) except: pass u_percent = round(disk_u.percent) cls.disks[disk.device] = {} cls.disks[disk.device]["name"] = disk_name cls.disks[disk.device]["used_percent"] = u_percent cls.disks[disk.device]["free_percent"] = 100 - u_percent for name in ["total", "used", "free"]: cls.disks[disk.device][name] = floating_humanizer(getattr(disk_u, name, 0)) #* Collect disk io try: if SYSTEM == "Linux": dev_name = os.path.realpath(disk.device).rsplit('/', 1)[-1] if dev_name.startswith("md"): try: dev_name = dev_name[:dev_name.index("p")] except: pass disk_io = io_counters[dev_name] elif disk.mountpoint == "/": disk_io = io_counters else: raise Exception disk_read = disk_io.read_bytes - cls.disk_hist[disk.device][0] disk_write = disk_io.write_bytes - cls.disk_hist[disk.device][1] except: pass disk_read = disk_write = 0 if disk_io: cls.disk_hist[disk.device] = (disk_io.read_bytes, disk_io.write_bytes) if MemBox.disks_width > 30: if disk_read > 0: io_string += f'▲{floating_humanizer(disk_read, short=True)} ' if disk_write > 0: io_string += f'▼{floating_humanizer(disk_write, short=True)}' elif disk_read + disk_write > 0: io_string += f'▼▲{floating_humanizer(disk_read + disk_write, short=True)}' cls.disks[disk.device]["io"] = io_string if CONFIG.swap_disk: cls.disks["__swap"] = {} cls.disks["__swap"]["name"] = "swap" cls.disks["__swap"]["used_percent"] = cls.swap_percent["used"] cls.disks["__swap"]["free_percent"] = cls.swap_percent["free"] for name in ["total", "used", "free"]: cls.disks["__swap"][name] = cls.swap_string[name] cls.disks["__swap"]["io"] = "" if len(cls.disks) > 2: try: new = { list(cls.disks)[0] : cls.disks.pop(list(cls.disks)[0])} new["__swap"] = cls.disks.pop("__swap") new.update(cls.disks) cls.disks = new except: pass @classmethod def _draw(cls): MemBox._draw_fg() #class ProcCollector(Collector): #! add interrupt on _collect and _draw @timerd def testing_collectors(): # CONFIG.check_temp = True # Box.calc_sizes() Box.draw_bg() #for _ in range(1000): while True: Collector.collect() Collector.collect_done.wait() Draw.now(Mv.to(1, 1)) sleep(1) Draw.now(Mv.to(Term.height - 5, 1)) #Draw.now(f'Cpu usage: {CpuCollector.cpu_usage}\nCpu freq: {CpuCollector.cpu_freq}\nLoad avg: {CpuCollector.load_avg}\n\ # Temps: {CpuCollector.cpu_temp}\n') class Menu: '''Holds the main menu and all submenus''' active: bool = False #Draw.buffer("menubg", Draw.last_buffer, z=1000, uncolor=True) #Draw.clear("menubg", last=True) #? Functions -------------------------------------------------------------------------------------> def get_cpu_name() -> str: '''Fetch a suitable CPU identifier from the CPU model name string''' name: str = "" nlist: List = [] command: str = "" cmd_out: str = "" rem_line: str = "" if SYSTEM == "Linux": command = "cat /proc/cpuinfo" rem_line = "model name" elif SYSTEM == "MacOS": command ="sysctl -n machdep.cpu.brand_string" elif SYSTEM == "BSD": command ="sysctl hw.model" rem_line = "hw.model" try: cmd_out = subprocess.check_output("LANG=C " + command, shell=True, universal_newlines=True) except: pass if rem_line: for line in cmd_out.split("\n"): if rem_line in line: name = re.sub( ".*" + rem_line + ".*:", "", line,1).lstrip() else: name = cmd_out nlist = name.split(" ") if "Xeon" in name: name = nlist[nlist.index("CPU")+1] elif "Ryzen" in name: name = " ".join(nlist[nlist.index("Ryzen"):nlist.index("Ryzen")+3]) elif "CPU" in name: name = nlist[nlist.index("CPU")-1] return name def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: str = "", title2: str = "", line_color: Color = None, fill: bool = True, box = None) -> str: '''Create a box from a box object or by given arguments''' out: str = f'{Term.fg}{Term.bg}' if not line_color: line_color = THEME.div_line #* Get values from box class if given if box: x = box.x y = box.y width = box.width height =box.height title = box.name vlines: Tuple[int, int] = (x, x + width - 1) hlines: Tuple[int, int] = (y, y + height - 1) #* Fill box if enabled if fill: for i in range(y + 1, y + height - 1): out += f'{Mv.to(i, x)}{" " * (width - 1)}' out += f'{line_color}' #* Draw all horizontal lines for hpos in hlines: out += f'{Mv.to(hpos, x)}{Symbol.h_line * (width - 1)}' #* Draw all vertical lines for vpos in vlines: for hpos in range(y, y + height - 1): out += f'{Mv.to(hpos, vpos)}{Symbol.v_line}' #* Draw corners out += f'{Mv.to(y, x)}{Symbol.left_up}\ {Mv.to(y, x + width - 1)}{Symbol.right_up}\ {Mv.to(y + height - 1, x)}{Symbol.left_down}\ {Mv.to(y + height - 1, x + width - 1)}{Symbol.right_down}' #* Draw titles if enabled if title: out += f'{Mv.to(y, x + 2)}{Symbol.title_left}{THEME.title}{Fx.b}{title}{Fx.ub}{line_color}{Symbol.title_right}' if title2: out += f'{Mv.to(y + height - 1, x + 2)}{Symbol.title_left}{THEME.title}{Fx.b}{title2}{Fx.ub}{line_color}{Symbol.title_right}' return f'{out}{Term.fg}{Mv.to(y + 1, x + 1)}' def now_sleeping(signum, frame): """Reset terminal settings and stop background input read before putting to sleep""" Key.stop() Collector.stop() Draw.now(Term.clear, Term.normal_screen, Term.show_cursor) Term.echo(True) os.kill(os.getpid(), signal.SIGSTOP) def now_awake(signum, frame): """Set terminal settings and restart background input read""" Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor) Term.echo(False) Key.start() Collector.start() def quit_sigint(signum, frame): """SIGINT redirection to clean_quit()""" clean_quit() def clean_quit(errcode: int = 0, errmsg: str = "", thread: bool = False): """Stop background input read, save current config and reset terminal settings before quitting""" global THREAD_ERROR if thread: THREAD_ERROR = errcode interrupt_main() return if THREAD_ERROR: errcode = THREAD_ERROR Key.stop() Collector.stop() if not errcode: CONFIG.save_config() if not testing: Draw.now(Term.clear, Term.normal_screen, Term.show_cursor) #! Enable else: Draw.now(Term.show_cursor) #! Remove Term.echo(True) if errcode == 0: errlog.info(f'Exiting. Runtime {timedelta(seconds=round(time() - SELF_START, 0))} \n') else: errlog.warning(f'Exiting with errorcode ({errcode}). Runtime {timedelta(seconds=round(time() - SELF_START, 0))} \n') if not errmsg: errmsg = f'Bpytop exited with errorcode ({errcode}). See {CONFIG_DIR}/error.log for more information!' if errmsg: print(errmsg) raise SystemExit(errcode) def floating_humanizer(value: Union[float, int], bit: bool = False, per_second: bool = False, start: int = 0, short: bool = False) -> str: '''Scales up in steps of 1024 to highest possible unit and returns string with unit suffixed * bit=True or defaults to bytes * start=int to set 1024 multiplier starting unit * short=True always returns 0 decimals and shortens unit to 1 character ''' out: str = "" unit: Tuple[str, ...] = UNITS["bit"] if bit else UNITS["byte"] selector: int = start if start else 0 mult: int = 8 if bit else 1 if value <= 0: value = 0 if isinstance(value, float): value = round(value * 100 * mult) elif value > 0: value *= 100 * mult while len(f'{value}') > 5 and value >= 102400: value >>= 10 if value < 100: out = f'{value}' break selector += 1 else: if len(f'{value}') < 5 and len(f'{value}') >= 2 and selector > 0: decimals = 5 - len(f'{value}') out = f'{value}'[:-2] + "." + f'{value}'[-decimals:] elif len(f'{value}') >= 2: out = f'{value}'[:-2] if short: out = out.split(".")[0] out += f'{"" if short else " "}{unit[selector][0] if short else unit[selector]}' if per_second: out += "/s" if bit else "ps" return out #? Main function ---------------------------------------------------------------------------------> def main(): clean_quit() #? Pre main --------------------------------------------------------------------------------------> CPU_NAME: str = get_cpu_name() testing = True #! Remove #! For testing -------------------------------------------------------------------------------> def waitone(t: float = 0.0): if t > 0.0: Key.new.wait(t) else: Key.new.wait() Key.new.clear() Draw.clear() Draw.now(Term.clear) @timerd def testing_gradients(): # for theme in THEME.themes.keys(): # THEME(theme) # timer = time() for key in THEME.gradient.keys(): Draw.now(f'{Term.fg}{key}:\n{"█".join(THEME.gradient[key])}\n') Draw.now(f'{Term.fg}') # Draw.now(f'Theme creation of {CONFIG.color_theme} took {time() - timer:.5f} seconds\n\n') Draw.now("\n") # THEME("Default") @timerd def testing_humanizer(): for i in range(1, 101, 3): for n in range(1, 6): Draw.now(floating_humanizer(i * (1050 * n) * (n << 10 ), bit=False, per_second=False, short=False), " ") Draw.now("\n") for i in range(1, 30): for n in range(1, 8): Draw.now(floating_humanizer((995 + i) << (n * 10), bit=False), " ") Draw.now("\n") @timerd def testing_colors(): for item, _ in DEFAULT_THEME.items(): Draw.buffer("+testing", Fx.b, getattr(THEME, item)(f'{item:<20}'), Fx.ub, f'{"hex=" + getattr(THEME, item).hexa:<20} dec={getattr(THEME, item).dec}\n') Draw.out() @timerd def testing_boxes(): #Box.calc_sizes() Box.draw_bg() Draw.now(Mv.to(Term.height - 3, 1)) #Draw.now(create_box(20, 20, 15, 10, "Hej")) #waitone() #Draw.now(Term.normal_screen, Term.show_cursor) @timerd def testing_banner(): Draw.buffer("banner", Banner.draw(18, center=True)) Draw.out() Draw.now(Mv.to(35, 1)) @timerd def testing_meter(): Draw.clear("meters") for _ in range(10): Draw.buffer("+meters", "1234567890") Draw.buffer("+meters", "\n") stamp = time() test_meter = Meter(0, Term.width, "cpu") for i in range(0,101, 2): Draw.buffer("+meters", test_meter(i), "\n") Draw.buffer("+meter", f'{time() - stamp}') Draw.out() def testing_keyinput(): line: str = "" this_key: str = "" count: int = 0 while True: count += 1 Draw.now(f'{Mv.to(1,1)}{Fx.b}{THEME.temp_start("Count:")} {count} {THEME.cached_mid("Time:")} {strftime("%H:%M:%S", localtime())}', f'{Color.fg("#ff")} Width: {Term.width} Height: {Term.height} Resized: {Term.resized}') if Key.input_wait(1): while Key.list: #Key.new.clear() this_key = Key.list.pop() Draw.now(f'{Mv.to(2,1)}{Color.fg("#ff9050")}{Fx.b}Last key= {Term.fg}{Fx.ub}{repr(this_key):14}{" "}') if this_key == "backspace": line = line[:-1] elif this_key == "escape": line = "" elif this_key == "Q": clean_quit() elif this_key == "R": raise Exception("Test ERROR") elif len(this_key) == 1: line += this_key Draw.now(f'{Color.fg("#90ff50")}{Fx.b}Command= {Term.fg}{Fx.ub}{line}{Fx.bl}| {Fx.ubl}\033[0K\n') if this_key == "enter": try: exec(line) except: pass Draw.clear() def testing_graphs(): my_data = [x for x in range(0, 101)] my_data += [x for x in range(100, -1, -1)] my_graph = Graph(100, 1, THEME.main_fg, my_data) Draw.now(f'{Fx.ub}{Mv.to(0, 0)}{my_graph}') return my_data100 = [randint(0, 100) for _ in range(Term.width * 2)] my_data2 = my_data[-90:] my_data3 = my_data[:86] my_colors = [] for i in range(51): for _ in range(2): my_colors.append(Color.fg(i, i, i)) #my_colors.reverse() my_graph = Graph(Term.width, Term.height // 2, my_colors, my_data100, invert=True) my_graph2 = Graph(Term.width, Term.height // 2, my_colors, my_data100, invert=False) # my_graph3 = Graph(100 // 3 + 10, 1, THEME.proc_misc, my_data2) # my_graph4 = Graph(100 // 3 + 10, 1, THEME.proc_misc, my_data3) # my_graph5 = Graph(100, Term.height // 3, THEME.inactive_fg, my_data) #pause = re.compile(r"\033\[\d+;\d?;?\d*;?\d*;?\d*m{1}") #repl = "\033[0;37m" banner = Banner.draw(Term.height // 3 - 2, center=True) Draw.now(f'{Fx.ub}{Mv.to(0, 0)}{my_graph}\ {Mv.to(Term.height // 2, 0)}{my_graph2}\ {banner}') # {Mv.to(Term.height - (Term.height // 3), Term.width // 2 - 50)}{my_graph5}\ # {Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 - 50)}{my_graph3}\ # {Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 + 7)}{my_graph4}\ #t = 1 x = 0 for _ in range(200): sleep(0.05) x = randint(0, 100) # x += 1 if t == 1 else -1 # if x == 100: t = 0 # if x == 0: t = 1 Draw.now(f'{Fx.ub}{Mv.to(0, 0)}{my_graph.add(x)}\ {Mv.to(Term.height // 2, 0)}{my_graph2.add(x)}\ {banner}') # Draw.now(f'{Mv.to(Term.height - (Term.height // 3), Term.width // 2 - 50)}{my_graph5.add(x)}') # Draw.now(f'{Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 - 50)}{my_graph3.add(x)}') # Draw.now(f'{Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 + 7)}{my_graph4.add(x)}') Draw.now(Mv.to(Term.height -4, 0)) #! Remove ------------------------------------------------------------------------------------< if __name__ == "__main__": #? Init --------------------------------------------------------------------------------------> Timer.start("Init") class Init: running: bool = True initbg_colors: List[str] = [] initbg_data: List[int] initbg_up: Graph initbg_down: Graph @staticmethod def fail(err): Draw.buffer("+init!", f'{Mv.restore}{Symbol.fail}') errlog.exception(f'{err}') sleep(2) clean_quit(1, errmsg=f'Error during init! See {CONFIG_DIR}/error.log for more information.') @classmethod def success(cls, start: bool = False): if start: Draw.buffer("initbg", z=10) Draw.buffer("init", z=1) for i in range(51): for _ in range(2): cls.initbg_colors.append(Color.fg(i, i, i)) Draw.buffer("banner", f'{Banner.draw(Term.height // 2 - 10, center=True)}{Color.fg("#50")}\n', z=2) for _i in range(10): perc = f'{str((_i + 1) * 10) + "%":>5}' Draw.buffer("+banner", f'{Mv.to(Term.height // 2 - 3 + _i, Term.width // 2 - 28)}{Fx.trans(perc)}{Symbol.v_line}') Draw.out("banner") Draw.buffer("+init!", f'{Color.fg("#cc")}{Fx.b}{Mv.to(Term.height // 2 - 3, Term.width // 2 - 21)}{Mv.save}') if start or Term.resized: cls.initbg_data = [randint(0, 100) for _ in range(Term.width * 2)] cls.initbg_up = Graph(Term.width, Term.height // 2, cls.initbg_colors, cls.initbg_data, invert=True) cls.initbg_down = Graph(Term.width, Term.height // 2, cls.initbg_colors, cls.initbg_data, invert=False) if start: return if not testing: cls.draw_bg(10) Draw.buffer("+init!", f'{Mv.restore}{Symbol.ok}\n{Mv.r(Term.width // 2 - 22)}{Mv.save}') @classmethod def draw_bg(cls, times: int = 10): for _ in range(times): sleep(0.05) x = randint(0, 100) Draw.buffer("initbg", f'{Fx.ub}{Mv.to(0, 0)}{cls.initbg_up.add(x)}{Mv.to(Term.height // 2, 0)}{cls.initbg_down.add(x)}') Draw.out("initbg", "banner", "init") @classmethod def done(cls): cls.draw_bg(20) Draw.clear("initbg", "banner", "init") cls.running = False del cls.initbg_up, cls.initbg_down, cls.initbg_data, cls.initbg_colors #? Switch to alternate screen, clear screen, hide cursor and disable input echo if not testing: Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor) #! Enable else: Draw.now(Term.clear, Term.hide_cursor) #! Disable Term.echo(False) #? Draw banner and init status if not testing: Init.success(start=True) #? Load theme Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Loading theme and creating colors... ")}{Mv.save}') try: THEME: Theme = Theme(CONFIG.color_theme) except Exception as e: Init.fail(e) else: Init.success() #? Setup boxes Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Doing some maths and drawing... ")}{Mv.save}') try: Box.calc_sizes() Box.draw_bg(now=False) except Exception as e: Init.fail(e) else: Init.success() #? Setup signal handlers for SIGSTP, SIGCONT, SIGINT and SIGWINCH Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Setting up signal handlers... ")}{Mv.save}') try: signal.signal(signal.SIGTSTP, now_sleeping) #* Ctrl-Z signal.signal(signal.SIGCONT, now_awake) #* Resume signal.signal(signal.SIGINT, quit_sigint) #* Ctrl-C signal.signal(signal.SIGWINCH, Term.refresh) #* Terminal resized except Exception as e: Init.fail(e) else: Init.success() #? Start a separate thread for reading keyboard input Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Starting input reader thread... ")}{Mv.save}') try: Key.start() except Exception as e: Init.fail(e) else: Init.success() #? Start a separate thread for data collection and drawing Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Starting data collection and drawer thread... ")}{Mv.save}') try: Collector.start() except Exception as e: Init.fail(e) else: Init.success() #? Collect data and draw to buffer Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Collecting data and drawing... ")}{Mv.save}') try: #Collector.collect(draw_now=False) pass except Exception as e: Init.fail(e) else: Init.success() Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Collecting nuclear launch codes... ")}{Mv.save}') Init.success() Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Launching missiles... ")}{Mv.save}') Init.success() Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Alien invasion... ")}{Mv.save}') Init.success() #? Draw to screen Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Finishing up... ")}{Mv.save}') try: #Collector.collect_done.wait() pass except Exception as e: Init.fail(e) else: Init.success() if not testing: Init.done() #! Remove if if not testing: Draw.out(clear=True) #! Remove if else: Draw.clear(); Draw.now(Term.clear); Init.running = False #! Remove Timer.stop("Init") #! For testing -------------------------------------------------------------------------------> if testing: try: #testing_graphs() testing_collectors() #testing_humanizer() # waitone(1) #testing_keyinput() #testing_banner() # waitone(1) #testing_colors() # waitone(1) #testing_gradients() # waitone(1) #testing_boxes() # waitone(1) #testing_meter() # Draw.idle.clear() #Draw.now(f'{Mv.to(Term.height - 5, 1)}Any key to exit!') #waitone() # Draw.idle.set() #sleep(2) except Exception as e: errlog.exception(f'{e}') clean_quit(1) clean_quit() #! Remove ------------------------------------------------------------------------------------< #? Start main loop while not False: try: main() except Exception as e: errlog.exception(f'{e}') clean_quit(1) else: #? Quit cleanly even if false starts being true... clean_quit()