From 443a4b1a9d1086d828aa68819f29fb045820b4d6 Mon Sep 17 00:00:00 2001 From: aristocratos Date: Wed, 1 Jul 2020 20:09:40 +0200 Subject: [PATCH] name... --- bpytop | 930 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 930 insertions(+) create mode 100755 bpytop diff --git a/bpytop b/bpytop new file mode 100755 index 0000000..5649935 --- /dev/null +++ b/bpytop @@ -0,0 +1,930 @@ +#!/usr/bin/env python3 +# pylint: disable=not-callable, no-member + +# Copyright 2020 Aristocratos (jakob@qvantnet.com) + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os, sys, time, threading, signal, re, subprocess, logging, logging.handlers +from select import select +from pathlib import Path +from distutils.util import strtobool +from typing import List, Set, Dict, Tuple, Optional, Union, Any, Callable, ContextManager, Iterable + +errors: List[str] = [] +try: import fcntl, termios, tty +except Exception as e: errors.append(f'{e}') + +try: import psutil +except Exception as e: errors.append(f'{e}') + + +SYSTEM: str +if "linux" in sys.platform: SYSTEM = "Linux" +elif "bsd" in sys.platform: SYSTEM = "BSD" +elif "darwin" in sys.platform: SYSTEM = "MacOS" +else: SYSTEM = "Other" + +if errors: + print ("ERROR!") + for error in errors: + print(error) + if SYSTEM == "Other": + print("\nUnsupported platform!\n") + else: + print("\nInstall required modules!\n") + quit(1) + +from functools import partial + +print: partial = partial(print, sep="", end="", flush=True) #* Setup print function to default to empty seperator and no new line + +#? Variables -------------------------------------------------------------------------------------> + +BANNER_SRC: Dict[str, str] = { +"#E62525" : "██████╗ ██████╗ ██╗ ██╗████████╗ ██████╗ ██████╗", +"#CD2121" : "██╔══██╗██╔══██╗╚██╗ ██╔╝╚══██╔══╝██╔═══██╗██╔══██╗", +"#B31D1D" : "██████╔╝██████╔╝ ╚████╔╝ ██║ ██║ ██║██████╔╝", +"#9A1919" : "██╔══██╗██╔═══╝ ╚██╔╝ ██║ ██║ ██║██╔═══╝ ", +"#801414" : "██████╔╝██║ ██║ ██║ ╚██████╔╝██║", +"#000000" : "╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝", +} + +VERSION: str = "0.0.1" + +DEFAULT_CONF: str = f'#? Config file for bpytop v. {VERSION}\n' +DEFAULT_CONF += ''' +#* Color theme, looks for a .theme file in "~/.config/bpytop/themes" and "~/.config/bpytop/user_themes", "Default" for builtin default theme +color_theme = "Default" + +#* Update time in milliseconds, increases automatically if set below internal loops processing time, recommended 2000 ms or above for better sample times for graphs +update_ms = 2500 + +#* Processes sorting, "pid" "program" "arguments" "threads" "user" "memory" "cpu lazy" "cpu responsive" +#* "cpu lazy" updates sorting over time, "cpu responsive" updates sorting directly +proc_sorting = "cpu lazy" + +#* Reverse sorting order, True or False +proc_reversed = False + +#* Show processes as a tree +proc_tree = False + +#* Check cpu temperature, only works if "sensors", "vcgencmd" or "osx-cpu-temp" commands is available +check_temp = True + +#* Draw a clock at top of screen, formatting according to strftime, empty string to disable +draw_clock = "%X" + +#* Update main ui when menus are showing, set this to false if the menus is flickering too much for comfort +background_update = True + +#* Custom cpu model name, empty string to disable +custom_cpu_name = "" + +#* Show color gradient in process list, True or False +proc_gradient = True + +#* If process cpu usage should be of the core it's running on or usage of the total available cpu power +proc_per_core = False + +#* Optional filter for shown disks, should be names of mountpoints, "root" replaces "/", separate multiple values with space +disks_filter = "" + +#* Enable check for new version from github.com/aristocratos/bpytop at start +update_check = True + +#* Enable graphs with double the horizontal resolution, increases cpu usage +hires_graphs = False + +''' + +conf: Path = Path(f'{Path.home()}/.config/bpytop') +if not conf.is_dir(): + try: + conf.mkdir(mode=0o777, parents=True) + except PermissionError: + print(f'ERROR!\nNo permission to write to "{conf}" directory!') + quit(1) + +CONFIG_DIR: str = str(conf) +del conf + +CORES: int = psutil.cpu_count(logical=False) or 1 +THREADS: int = psutil.cpu_count(logical=True) or 1 + +DEFAULT_THEME: Dict[str, str] = { + "main_bg" : "", + "main_fg" : "#cc", + "title" : "#ee", + "hi_fg" : "#90", + "selected_bg" : "#7e2626", + "selected_fg" : "#ee", + "inactive_fg" : "#40", + "proc_misc" : "#0de756", + "cpu_box" : "#3d7b46", + "mem_box" : "#8a882e", + "net_box" : "#423ba5", + "proc_box" : "#923535", + "div_line" : "#30", + "temp_start" : "#4897d4", + "temp_mid" : "#5474e8", + "temp_end" : "#ff40b6", + "cpu_start" : "#50f095", + "cpu_mid" : "#f2e266", + "cpu_end" : "#fa1e1e", + "free_start" : "#223014", + "free_mid" : "#b5e685", + "free_end" : "#dcff85", + "cached_start" : "#0b1a29", + "cached_mid" : "#74e6fc", + "cached_end" : "#26c5ff", + "available_start" : "#292107", + "available_mid" : "#ffd77a", + "available_end" : "#ffb814", + "used_start" : "#3b1f1c", + "used_mid" : "#d9626d", + "used_end" : "#ff4769", + "download_start" : "#231a63", + "download_mid" : "#4f43a3", + "download_end" : "#b0a9de", + "upload_start" : "#510554", + "upload_mid" : "#7d4180", + "upload_end" : "#dcafde" +} + + +#? Setup error logger ----------------------------------------------------------------------------> + +try: + errlog = logging.getLogger("ErrorLogger") + errlog.setLevel(logging.DEBUG) + eh = logging.handlers.RotatingFileHandler(f'{CONFIG_DIR}/error.log', maxBytes=1048576, backupCount=4) + eh.setLevel(logging.DEBUG) + eh.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s: %(message)s", datefmt="%d/%m/%y (%X)")) + errlog.addHandler(eh) +except PermissionError: + print(f'ERROR!\nNo permission to write to "{CONFIG_DIR}" directory!') + quit(1) + +errlog.info(f'New instance of bpytop version {VERSION} started with pid {os.getpid()}') + +#? Classes ---------------------------------------------------------------------------------------> + +class Term: + """Terminal info and commands""" + width: int = os.get_terminal_size().columns #* Current terminal width in columns + height: int = os.get_terminal_size().lines #* Current terminal height in lines + resized: bool = False #* Flag indicating if terminal was recently resized + fg: str = "" #* Default foreground color + bg: str = "" #* Default background color + hide_cursor = "\033[?25l" #* Hide terminal cursor + show_cursor = "\033[?25h" #* Show terminal cursor + alt_screen = "\033[?1049h" #* Switch to alternate screen + normal_screen = "\033[?1049l" #* Switch to normal screen + clear = "\033[2J\033[0;0f" #* Clear screen and set cursor to position 0,0 + @classmethod + def refresh(cls, *args): + """Update width, height and set resized flag if terminal has been resized""" + w: int; h: int + w, h = os.get_terminal_size() + if (w, h) != (cls.width, cls.height): + cls.resized = True + cls.width, cls.height = w, h + @staticmethod + def echo(on: bool): + """Toggle input echo""" + (iflag, oflag, cflag, lflag, ispeed, ospeed, cc) = termios.tcgetattr(sys.stdin.fileno()) + if on: + lflag |= termios.ECHO # type: ignore + else: + lflag &= ~termios.ECHO # type: ignore + new_attr = [iflag, oflag, cflag, lflag, ispeed, ospeed, cc] + termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, new_attr) + +class Fx: + """Text effects""" + start = "\033[" #* Escape sequence start + sep = ";" #* Escape sequence separator + end = "m" #* Escape sequence end + reset = rs = "\033[0m" #* Reset foreground/background color and text effects + bold = b = "\033[1m" #* Bold on + unbold = ub = "\033[22m" #* Bold off + dark = d = "\033[2m" #* Dark on + undark = ud = "\033[22m" #* Dark off + italic = i = "\033[3m" #* Italic on + unitalic = ui = "\033[23m" #* Italic off + underline = u = "\033[4m" #* Underline on + ununderline = uu = "\033[24m" #* Underline off + blink = bl = "\033[5m" #* Blink on + unblink = ubl = "\033[25m" #* Blink off + strike = s = "\033[9m" #* Strike / crossed-out on + unstrike = us = "\033[29m" #* Strike / crossed-out off + +class Raw(object): + """Set raw input mode for device""" + def __init__(self, stream): + self.stream = stream + self.fd = self.stream.fileno() + def __enter__(self): + self.original_stty = termios.tcgetattr(self.stream) + tty.setcbreak(self.stream) + def __exit__(self, type, value, traceback): + termios.tcsetattr(self.stream, termios.TCSANOW, self.original_stty) + +class Nonblocking(object): + """Set nonblocking mode for device""" + def __init__(self, stream): + self.stream = stream + self.fd = self.stream.fileno() + def __enter__(self): + self.orig_fl = fcntl.fcntl(self.fd, fcntl.F_GETFL) + fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl | os.O_NONBLOCK) + def __exit__(self, *args): + fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl) + +class Mv: + """Class with collection of cursor movement functions: .t[o](line, column) | .r[ight](columns) | .l[eft](columns) | .u[p](lines) | .d[own](lines) | .save() | .restore()""" + @staticmethod + def to(line: int, col: int) -> str: + return f'\033[{line};{col}f' #* Move cursor to line, column + @staticmethod + def right(x: int) -> str: #* Move cursor right x columns + return f'\033[{x}C' + @staticmethod + def left(x: int) -> str: #* Move cursor left x columns + return f'\033[{x}D' + @staticmethod + def up(x: int) -> str: #* Move cursor up x lines + return f'\033[{x}A' + @staticmethod + def down(x: int) -> str: #* Move cursor down x lines + return f'\033[{x}B' + @staticmethod + def save() -> str: #* Save cursor position + return "\033[s" + @staticmethod + def restore() -> str: #* Restore saved cursor postion + return "\033[u" + t = to + r = right + l = left + u = up + d = down + +class Key: + """.list = Input queue | .new = Input threading event | .reader = Input reader thread | .start() = Start input reader | .stop() = Stop input reader""" + list: List[str] = [] + new = threading.Event() + stopping: bool = False + @classmethod + def start(cls): + cls.stopping = False + cls.reader = threading.Thread(target=get_key) + cls.reader.start() + @classmethod + def stop(cls): + if cls.reader.is_alive(): + cls.stopping = True + try: + cls.reader.join() + except: + pass + +class Color: + '''self.__init__ accepts 6 digit hexadecimal: string "#RRGGBB", 2 digit hexadecimal: string "#FF" and decimal RGB "0-255 0-255 0-255" as a string.\n + self.__init__ also accepts depth="fg" or "bg" | default=bool\n + self.__call__(*args) converts arguments to a string and apply color\n + self.__str__ returns escape sequence to set color. __iter__ returns iteration over red, green and blue in integer values of 0-255.\n + Values: .hex: str | .dec: Tuple[int] | .red: int | .green: int | .blue: int | .depth: str | .escape: str\n + ''' + hex: str; dec: Tuple[int, int, int]; red: int; green: int; blue: int; depth: str; escape: str; default: bool + + def __init__(self, color: str, depth: str = "fg", default: bool = False): + self.depth = depth + self.default = default + try: + if not color: + if depth != "bg" and not default: raise ValueError("No RGB values given") + self.dec = (0, 0, 0) + self.hex = "" + self.red = self.green = self.blue = 0 + self.escape = "\033[49m" + return + + elif color.startswith("#"): + self.hex = color + if len(self.hex) == 3: + self.hex += self.hex[1:3] + self.hex[1:3] + c = int(self.hex[1:3], base=16) + self.dec = (c, c, c) + elif len(self.hex) == 7: + self.dec = (int(self.hex[1:3], base=16), int(self.hex[3:5], base=16), int(self.hex[5:7], base=16)) + else: + raise ValueError(f'Incorrectly formatted hexadeciaml rgb string: {self.hex}') + + else: + c_t = tuple(map(int, color.split(" "))) + if len(c_t) == 3: + self.dec = c_t #type: ignore + else: + raise ValueError(f'RGB dec should be "0-255 0-255 0-255"') + + ct = self.dec[0] + self.dec[1] + self.dec[2] + if ct > 255*3 or ct < 0: + raise ValueError(f'RGB values out of range: {color}') + except Exception as e: + errlog.exception(str(e)) + self.escape = "" + return + + if self.dec and not self.hex: self.hex = f'{hex(self.dec[0]).lstrip("0x").zfill(2)}{hex(self.dec[1]).lstrip("0x").zfill(2)}{hex(self.dec[2]).lstrip("0x").zfill(2)}' + + if self.dec and self.hex: + self.red, self.green, self.blue = self.dec + self.escape = f'\033[{38 if self.depth == "fg" else 48};2;{";".join(str(c) for c in self.dec)}m' + + def __str__(self) -> str: + return self.escape + + def __repr__(self) -> str: + return repr(self.escape) + + def __iter__(self) -> Iterable: + for c in self.dec: yield c + + def __call__(self, *args) -> str: + if len(args) == 0: return "" + return f'{self.escape}{"".join(map(str, args))}{getattr(Term, self.depth)}' + +class Theme: + '''__init__ accepts a dict containing { "color_element" : "color" } , errors defaults to default theme color''' + + main_bg = main_fg = title = hi_fg = selected_bg = selected_fg = inactive_fg = proc_misc = cpu_box = mem_box = net_box = proc_box = div_line = temp_start = temp_mid = temp_end = cpu_start = cpu_mid = cpu_end = free_start = free_mid = free_end = cached_start = cached_mid = cached_end = available_start = available_mid = available_end = used_start = used_mid = used_end = download_start = download_mid = download_end = upload_start = upload_mid = upload_end = NotImplemented + + def __init__(self, tdict: Dict[str, str]): + for item, value in tdict.items(): + if hasattr(self, item): + default = False if item not in ["main_fg", "main_bg"] else True + depth = "fg" if item not in ["main_bg", "selected_bg"] else "bg" + setattr(self, item, Color(value, depth=depth, default=default)) + if getattr(self, item).escape == "": + setattr(self, item, Color(DEFAULT_THEME[item], depth=depth, default=default)) + + Term.fg, Term.bg = self.main_fg, self.main_bg + print(self.main_fg, self.main_bg) #* Set terminal colors + +class Draw: + '''Holds the draw buffer\n + Add to buffer: .buffer(name, *args, now=False, clear=False)\n + Print buffer: .out()\n + ''' + strings: Dict[str, str] = {} + last_screen: str = "" + + @classmethod + def buffer(cls, name: str, *args, now: bool = False, clear: bool = False): + string: str = "" + if args: string = "".join(map(str, args)) + if name not in cls.strings or clear: cls.strings[name] = "" + cls.strings[name] += string + if now: print(string) + + @classmethod + def out(cls): + cls.last_screen = "".join(cls.strings.values()) + #cls.strings = {} + print(cls.last_screen) + +class Symbol: + h_line: str = "─" + v_line: str = "│" + left_up: str = "┌" + right_up: str = "┐" + left_down: str = "└" + right_down: str = "┘" + title_left: str = "┤" + title_right: str = "├" + div_up: str = "┬" + div_down: str = "┴" + graph_up: Dict[float, str] = { + 0.0 : "⠀", 0.1 : "⢀", 0.2 : "⢠", 0.3 : "⢰", 0.4 : "⢸", + 1.0 : "⡀", 1.1 : "⣀", 1.2 : "⣠", 1.3 : "⣰", 1.4 : "⣸", + 2.0 : "⡄", 2.1 : "⣄", 2.2 : "⣤", 2.3 : "⣴", 2.4 : "⣼", + 3.0 : "⡆", 3.1 : "⣆", 3.2 : "⣦", 3.3 : "⣶", 3.4 : "⣾", + 4.0 : "⡇", 4.1 : "⣇", 4.2 : "⣧", 4.3 : "⣷", 4.4 : "⣿" + } + graph_down: Dict[float, str] = { + 0.0 : "⠀", 0.1 : "⠈", 0.2 : "⠘", 0.3 : "⠸", 0.4 : "⢸", + 1.0 : "⠁", 1.1 : "⠉", 1.2 : "⠙", 1.3 : "⠹", 1.4 : "⢹", + 2.0 : "⠃", 2.1 : "⠋", 2.2 : "⠛", 2.3 : "⠻", 2.4 : "⢻", + 3.0 : "⠇", 3.1 : "⠏", 3.2 : "⠟", 3.3 : "⠿", 3.4 : "⢿", + 4.0 : "⡇", 4.1 : "⡏", 4.2 : "⡟", 4.3 : "⡿", 4.4 : "⣿" + } + + + +#? Functions -------------------------------------------------------------------------------------> + +def get_cpu_name(): + '''Fetch a suitable CPU identifier from the CPU model name string''' + name: str = "" + nlist: List = [] + command: str = "" + cmd_out: str = "" + rem_line: str = "" + if SYSTEM == "Linux": + command = "cat /proc/cpuinfo" + rem_line = "model name" + elif SYSTEM == "MacOS": + command ="sysctl -n machdep.cpu.brand_string" + elif SYSTEM == "BSD": + command ="sysctl hw.model" + rem_line = "hw.model" + + cmd_out = subprocess.check_output("LANG=C " + command, shell=True, universal_newlines=True) + if rem_line: + for line in cmd_out.split("\n"): + if rem_line in line: + name = re.sub( ".*" + rem_line + ".*:", "", line,1).lstrip() + else: + name = cmd_out + nlist = name.split(" ") + if "Xeon" in name: + name = nlist[nlist.index("CPU")+1] + elif "Ryzen" in name: + name = " ".join(nlist[nlist.index("Ryzen"):nlist.index("Ryzen")+3]) + elif "CPU" in name: + name = nlist[nlist.index("CPU")-1] + + return name + +def load_theme(path: str) -> Dict[str, str]: + '''Load a bashtop formatted theme file''' + new_theme: Dict[str, str] = {} + try: + with open(path) as f: + for line in f: + if not line.startswith("theme["): continue + key = line[6:line.find("]")] + s = line.find('"') + value = line[s + 1:line.find('"', s + 1)] + new_theme[key] = value + except Exception as e: + errlog.exception(str(e)) + + return new_theme + +def fg(h_r: Union[str, int], g: int = 0, b: int = 0) -> str: + """Returns escape sequence to set foreground color, accepts either 6 digit hexadecimal: "#RRGGBB", 2 digit hexadecimal: "#FF" or decimal RGB: 0-255, 0-255, 0-255""" + color: str = "" + if isinstance(h_r, int): #* Check for decimal RGB + color = f'\033[38;2;{h_r};{g};{b}m' + + else: #* Check for 2 or 6 digit hexadecimal RGB + if h_r[0] == "#": h_r = h_r[1:] + try: + if len(h_r) == 2: + c = int(h_r, base=16) + color = f'\033[38;2;{c};{c};{c}m' + elif len(h_r) == 6: + color = f'\033[38;2;{int(h_r[0:2], base=16)};{int(h_r[2:4], base=16)};{int(h_r[4:6], base=16)}m' + except ValueError: + pass + return color + +def bg(h_r: Union[str, int], g: int = 0, b: int = 0) -> str: + """Returns escape sequence to set background color, accepts either 6 digit hexadecimal: "#RRGGBB", 2 digit hexadecimal: "#FF" or decimal RGB: 0-255, 0-255, 0-255""" + color: str = "" + if isinstance(h_r, int): #* Check for decimal RGB + color = f'\033[48;2;{h_r};{g};{b}m' + + else: #* Check for 2 or 6 digit hexadecimal RGB + if h_r[0] == "#": h_r = h_r[1:] + try: + if len(h_r) == 2: + c = int(h_r, base=16) + color = f'\033[48;2;{c};{c};{c}m' + elif len(h_r) == 6: + color = f'\033[48;2;{int(h_r[0:2], base=16)};{int(h_r[2:4], base=16)};{int(h_r[4:6], base=16)}m' + except ValueError: + pass + return color + +def get_key(): + """Get a single key from stdin, convert to readable format and save to keys list""" + input_key: str = "" + clean_key: str = "" + with Raw(sys.stdin): #* Set raw mode + with Nonblocking(sys.stdin): #* Set nonblocking mode + while not Key.stopping: + if not select([sys.stdin], [], [], 0.1)[0]: #* Wait 100ms for input then restart loop to check for stop signal + continue + try: + input_key = sys.stdin.read(1) #* Read 1 character from stdin + if input_key == "\033": #* Read 3 additional characters if first is escape character + input_key += sys.stdin.read(3) + except: + pass + if input_key == "\033": clean_key = "escape" + elif input_key == "\n": clean_key = "enter" + elif input_key == "\x7f" or input_key == "\x08": clean_key = "backspace" + + elif input_key.isprintable(): clean_key = input_key #* Return character if input key is printable + + if clean_key: + Key.list.append(clean_key) #* Store keys in input queue for later processing + clean_key = "" + Key.new.set() #* Set threading event to interrupt main thread sleep + input_key = "" + sys.stdin.read(100) #* Clear stdin + +def now_sleeping(signum, frame): + """Reset terminal settings and stop background input read before putting to sleep""" + Key.stop() + print(Term.clear, Term.normal_screen, Term.show_cursor) + os.kill(os.getpid(), signal.SIGSTOP) + +def now_awake(signum, frame): + """Set terminal settings and restart background input read""" + print(Term.alt_screen, Term.clear, Term.hide_cursor) + Key.start() + +def quit_sigint(signum, frame): + """SIGINT redirection to clean_quit()""" + clean_quit() + +def clean_quit(errcode: int = 0): + """Reset terminal settings, save settings to config and stop background input read before quitting""" + Key.stop() + print(Term.clear, Term.normal_screen, Term.show_cursor) + raise SystemExit(errcode) + +def calc_sizes(): + '''Calculate sizes of boxes''' + + #* Calculate lines and columns from percentage values + for box in boxes: + setattr(box, "height", round(Term.height * getattr(box, "height_p") / 100)) + setattr(box, "width", round(Term.width * getattr(box, "width_p") / 100)) + + #* Set position values + proc.x = mem.width + 1 + mem.y = proc.y = cpu.height + 1 + net.y = cpu.height + mem.height + 2 + + #* Set values for detailed view in process box + proc.detailed_x = proc.x + proc.detailed_y = proc.y + proc.detailed_height = 8 + proc.detailed_width = proc.width + + #THREADS = 8 + + #* Set values for cpu info sub box + if THREADS > (cpu.height - 5) * 3 and cpu.width > 200: cpu.box_width = 24 * 4; cpu.box_height = round(THREADS / 4) + 4; cpu.box_columns = 4 + elif THREADS > (cpu.height - 5) * 2 and cpu.width > 150: cpu.box_width = 24 * 3; cpu.box_height = round(THREADS / 3) + 5; cpu.box_columns = 3 + elif THREADS > cpu.height - 5 and cpu.width > 100: cpu.box_width = 24 * 2; cpu.box_height = round(THREADS / 2) + 4; cpu.box_columns = 2 + else: cpu.box_width = 24; cpu.box_height = THREADS + 4; cpu.box_columns = 1 + + if Config.check_temp: cpu.box_width += 13 * cpu.box_columns + if cpu.box_height > cpu.height - 3: cpu.box_height = cpu.height - 3 + cpu.box_x = (cpu.width - 2) - cpu.box_width + cpu.box_y = cpu.y + ((cpu.height - 2) // 2) - round(cpu.box_height / 2) + 2 + + #* Set value for mem box divider + mem.mem_width = mem.disks_width = round((mem.width-2) / 2) + if mem.mem_width + mem.disks_width < mem.width - 2: mem.mem_width += 1 + mem.divider = mem.x + mem.mem_width + 2 + + #* Set values for net sub box + net.box_width = 24 + net.box_height = 9 if net.height > 12 else net.height - 4 + net.box_x = net.width - net.box_width - 2 + net.box_y = net.y + ((net.height - 2) // 2) - round(net.box_height / 2) + +def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: str = "", line_color: Color = None, fill: bool = False, box_object: object = None): + '''Create a box from a box object or by given arguments''' + out: str = f'{Term.fg}{Term.bg}' + if not line_color: line_color = theme.div_line + + #* Get values from box object if given + if box_object: + x = getattr(box_object, "x") + y = getattr(box_object, "y") + width = getattr(box_object, "width") + height = getattr(box_object, "height") + title = getattr(box_object, "name") + vlines: Tuple[int, int] = (x, x + width) + hlines: Tuple[int, int] = (y, y + height) + + #* Fill box if enabled + if fill: + i: int + for i in range(y + 1, y + height): + out += f'{Mv.to(i, x)}{" " * (width - 1)}' + + out += f'{line_color}' + + #* Draw all horizontal lines + hpos: int + for hpos in hlines: + out += f'{Mv.to(hpos, x)}{Symbol.h_line * width}' + + #* Draw all vertical lines + vpos: int + for vpos in vlines: + for hpos in range(y, y + height): + out += f'{Mv.to(hpos, vpos)}{Symbol.v_line}' + + #* Draw corners + out += f'{Mv.to(y, x)}{Symbol.left_up}\ + {Mv.to(y, x + width)}{Symbol.right_up}\ + {Mv.to(y + height, x)}{Symbol.left_down}\ + {Mv.to(y + height, x + width)}{Symbol.right_down}' + + #* Draw title if enabled + if title: + out += f'{Mv.to(y, x + 2)}{Symbol.title_left}{theme.title}{Fx.b}{title}{Fx.ub}{line_color}{Symbol.title_right}' + + return out + +#? Main function ---------------------------------------------------------------------------------> + +def main(): + line: str = "" + this_key: str = "" + count: int = 0 + while True: + count += 1 + print(f'{Mv.to(1,1)}{Fx.b}{blue("Count:")} {count} {lime("Time:")} {time.strftime("%H:%M:%S", time.localtime())}') + print(f'{fg("#ff")} Width: {Term.width} Height: {Term.height} Resized: {Term.resized}') + while Key.list: + Key.new.clear() + this_key = Key.list.pop() + print(f'{Mv.to(2,1)}{fg("#ff9050")}{Fx.b}Last key= {Term.fg}{Fx.ub}{repr(this_key)}{" " * 40}') + if this_key == "backspace": + line = line[:-1] + elif this_key == "enter": + line += "\n" + else: + line += this_key + print(f'{Mv.to(3,1)}{fg("#90ff50")}{Fx.b}Full line= {Term.fg}{Fx.ub}{line}{Fx.bl}| {Fx.ubl}') + if this_key == "q": + clean_quit() + if this_key == "R": + raise Exception("Test ERROR") + if not Key.reader.is_alive(): + clean_quit(1) + Key.new.wait(1.0) + + +#? Init Classes ----------------------------------------------------------------------------------> + +class Banner: + out: List[str] = [] + c_color: str = "" + length: int = 0 + if not out: + for num, (color, line) in enumerate(BANNER_SRC.items()): + if len(line) > length: length = len(line) + out += [""] + line_color = fg(color) + line_dark = fg(f'#{80 - num * 6}') + for letter in line: + if letter == "█" and c_color != line_color: + c_color = line_color + out[num] += line_color + elif letter == " ": + letter = f'{Mv.r(1)}' + elif letter != "█" and c_color != line_dark: + c_color = line_dark + out[num] += line_dark + out[num] += letter + + @classmethod + def draw(cls, line: int, col: int = 0, center: bool = False, now: bool = False): + out: str = "" + if center: col = Term.width // 2 - cls.length // 2 + for n, o in enumerate(cls.out): + out += f'{Mv.to(line + n, col)}{o}' + out += f'{Term.fg}' + if now: print(out) + else: return out + +class Config: + '''Holds all config variables''' + check_temp: bool = True + +class Graphs: + '''Holds all graph objects and dicts for dynamically created graphs''' + cpu: object = None + cores: Dict[int, object] = {} + temps: Dict[int, object] = {} + net: object = None + detailed_cpu: object = None + detailed_mem: object = None + pid_cpu: Dict[int, object] = {} + +class Meters: + '''Holds created meters to reuse instead of recreating meters of past values''' + cpu: Dict[int, str] = {} + mem_used: Dict[int, str] = {} + mem_available: Dict[int, str] = {} + mem_cached: Dict[int, str] = {} + mem_free: Dict[int, str] = {} + swap_used: Dict[int, str] = {} + swap_free: Dict[int, str] = {} + disks_used: Dict[int, str] = {} + disks_free: Dict[int, str] = {} + +class Box: + '''Box object with all needed attributes for create_box() function''' + def __init__(self, name: str, height_p: int, width_p: int): + self.name: str = name + self.height_p: int = height_p + self.width_p: int = width_p + self.x: int = 0 + self.y: int = 0 + self.width: int = 0 + self.height: int = 0 + self.out: str = "" + if name == "proc": + self.detailed: bool = False + self.detailed_x: int = 0 + self.detailed_y: int = 0 + self.detailed_width: int = 0 + self.detailed_height: int = 8 + if name == "mem": + self.divider: int = 0 + self.mem_width: int = 0 + self.disks_width: int = 0 + if name in ("cpu", "net"): + self.box_x: int = 0 + self.box_y: int = 0 + self.box_width: int = 0 + self.box_height: int = 0 + self.box_columns: int = 0 + + +#? Init variables --------------------------------------------------------------------------------> + +CPU_NAME: str = get_cpu_name() + + +#theme = Theme(load_theme("/home/gnm/.config/bashtop/themes/monokai.theme")) +theme = Theme(DEFAULT_THEME) + +cpu = Box("cpu", height_p=32, width_p=100) +mem = Box("mem", height_p=40, width_p=45) +net = Box("net", height_p=28, width_p=mem.width_p) +proc = Box("proc", height_p=100 - cpu.height_p, width_p=100 - mem.width_p) + +boxes: List[object] = [cpu, mem, net, proc] + +blue = theme.temp_start +lime = theme.cached_mid +orange = theme.available_end +green = theme.cpu_start +dfg = theme.main_fg + + + +def draw_bg(now: bool = True): + '''Draw all boxes to buffer and print to screen if now=True''' + Draw.buffer("bg", clear=True) + + #* Draw cpu box and cpu sub box + cpu_box = f'{create_box(box_object=cpu, line_color=theme.cpu_box, fill=True)}\ + {Mv.to(cpu.y, cpu.x + 10)}{theme.cpu_box(Symbol.title_left)}{Fx.b}{theme.hi_fg("m")}{theme.title("enu")}{Fx.ub}{theme.cpu_box(Symbol.title_right)}\ + {create_box(x=cpu.box_x, y=cpu.box_y, width=cpu.box_width, height=cpu.box_height, line_color=theme.div_line, title=CPU_NAME[:18 if Config.check_temp else 9])}' + + #* Draw mem/disk box and divider + mem_box = f'{create_box(box_object=mem, line_color=theme.mem_box, fill=True)}\ + {Mv.to(mem.y, mem.divider + 2)}{theme.mem_box(Symbol.title_left)}{Fx.b}{theme.title("disks")}{Fx.ub}{theme.mem_box(Symbol.title_right)}\ + {Mv.to(mem.y, mem.divider)}{theme.mem_box(Symbol.div_up)}\ + {Mv.to(mem.y + mem.height, mem.divider)}{theme.mem_box(Symbol.div_down)}{theme.div_line}' + for i in range(1, mem.height): + mem_box += f'{Mv.to(mem.y + i, mem.divider)}{Symbol.v_line}' + + #* Draw net box and net sub box + net_box = f'{create_box(box_object=net, line_color=theme.net_box, fill=True)}\ + {create_box(x=net.box_x, y=net.box_y, width=net.box_width, height=net.box_height, line_color=theme.div_line, title="Download")}\ + {Mv.to(net.box_y + net.box_height, net.box_x + 1)}{theme.div_line(Symbol.title_left)}{Fx.b}{theme.title("Upload")}{Fx.ub}{theme.div_line(Symbol.title_right)}' + + #* Draw proc box + proc_box = create_box(box_object=proc, line_color=theme.proc_box, fill=True) + + Draw.buffer("bg", cpu_box, mem_box, net_box, proc_box) + +def testing_colors(): + for item, _ in DEFAULT_THEME.items(): + print(Fx.b, getattr(theme, item)(f'{item:<20}'), Fx.ub, f'{"hex=" + getattr(theme, item).hex:<20} dec={getattr(theme, item).dec}', end="\n") + + print() + print(theme.temp_start, "Hej!\n") + print(Term.fg, "\nHEJ\n") + print(repr(Term.fg), repr(Term.bg)) + + quit() + +def testing_banner(): + print(Term.normal_screen, Term.alt_screen) + Key.start() + #try: + #sad + #except Exception as e: + # errlog.exception(f'{e}') + + + + #eprint("Test") + calc_sizes() + + draw_bg() + + Draw.buffer("banner", Banner.draw(18, 45), clear=True) + Draw.out() + + print(Mv.to(35, 1), repr(Term.fg), " ", repr(Term.bg), "\n") + + # quit() + + # global theme + + # path = "/home/gnm/.config/bashtop/themes/" + # for file in os.listdir(path): + # if file.endswith(".theme"): + # theme = Theme(load_theme(path + file)) + # draw_bg() + # Draw.out() + # time.sleep(1) + + #draw_bg() + #Draw.buffer("banner", Banner.draw(5, center=True)) + + #Draw.out() + + # print(f'\n{Fx.b}Terminal Height={Term.height} Width={Term.width}') + # total_h = total_w = 0 + # for box in boxes: + # print(f'\n{getattr(box, "name")} Height={getattr(box, "height")} Width={getattr(box, "width")}') + # total_h += getattr(box, "height") + # total_w += getattr(box, "width") + # print(f'\nTotal Height={cpu.height + net.height + mem.height} Width={net.width + proc.width}') + Key.stop() + quit() + + + +#testing_colors() +#error_log("/home/gnm/bashtop/misc/error.log") + +try: + testing_banner() +except Exception as e: + errlog.exception(f'{e}') + clean_quit(1) + + +quit() + +if __name__ == "__main__": + + #? Setup signal handlers for SIGSTP, SIGCONT, SIGINT and SIGWINCH + signal.signal(signal.SIGTSTP, now_sleeping) #* Ctrl-Z + signal.signal(signal.SIGCONT, now_awake) #* Resume + signal.signal(signal.SIGINT, quit_sigint) #* Ctrl-C + signal.signal(signal.SIGWINCH, Term.refresh) #* Terminal resized + + #? Switch to alternate screen, clear screen and hide cursor + print(Term.alt_screen, Term.clear, Term.hide_cursor) + + #? Start a separate thread for reading keyboard input + try: + Key.start() + except Exception as e: + errlog.exception(f'{e}') + clean_quit(1) + + while True: + try: + main() + except Exception as e: + errlog.exception(f'{e}') + clean_quit(1) + + + + + clean_quit()