bpytop/bpytop.py

1743 lines
55 KiB
Python
Raw Normal View History

2020-07-09 00:56:03 +00:00
#!/usr/bin/env python3
# pylint: disable=not-callable, no-member
# indent = tab
# tab-size = 4
# Copyright 2020 Aristocratos (jakob@qvantnet.com)
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os, sys, threading, signal, re, subprocess, logging, logging.handlers
from time import time, sleep, strftime, localtime
from datetime import timedelta
from select import select
from distutils.util import strtobool
from string import Template
from math import ceil
from typing import List, Set, Dict, Tuple, Optional, Union, Any, Callable, ContextManager, Iterable
errors: List[str] = []
try: import fcntl, termios, tty
except Exception as e: errors.append(f'{e}')
try: import psutil # type: ignore
except Exception as e: errors.append(f'{e}')
SELF_START = time()
SYSTEM: str
if "linux" in sys.platform: SYSTEM = "Linux"
elif "bsd" in sys.platform: SYSTEM = "BSD"
elif "darwin" in sys.platform: SYSTEM = "MacOS"
else: SYSTEM = "Other"
if errors:
print ("ERROR!")
for error in errors:
print(error)
if SYSTEM == "Other":
print("\nUnsupported platform!\n")
else:
print("\nInstall required modules!\n")
quit(1)
#? Variables ------------------------------------------------------------------------------------->
BANNER_SRC: Dict[str, str] = {
"#E62525" : "██████╗ ██████╗ ██╗ ██╗████████╗ ██████╗ ██████╗",
"#CD2121" : "██╔══██╗██╔══██╗╚██╗ ██╔╝╚══██╔══╝██╔═══██╗██╔══██╗",
"#B31D1D" : "██████╔╝██████╔╝ ╚████╔╝ ██║ ██║ ██║██████╔╝",
"#9A1919" : "██╔══██╗██╔═══╝ ╚██╔╝ ██║ ██║ ██║██╔═══╝ ",
"#801414" : "██████╔╝██║ ██║ ██║ ╚██████╔╝██║",
"#000000" : "╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝",
}
VERSION: str = "0.0.1"
#*?This is the template used to create the config file
DEFAULT_CONF: Template = Template(f'#? Config file for bpytop v. {VERSION}' + '''
#* Color theme, looks for a .theme file in "~/.config/bpytop/themes" and "~/.config/bpytop/user_themes", "Default" for builtin default theme
color_theme="$color_theme"
#* Update time in milliseconds, increases automatically if set below internal loops processing time, recommended 2000 ms or above for better sample times for graphs
update_ms=$update_ms
#* Processes sorting, "pid" "program" "arguments" "threads" "user" "memory" "cpu lazy" "cpu responsive"
#* "cpu lazy" updates sorting over time, "cpu responsive" updates sorting directly
proc_sorting="$proc_sorting"
#* Reverse sorting order, True or False
proc_reversed=$proc_reversed
#* Show processes as a tree
proc_tree=$proc_tree
#* Check cpu temperature, needs "vcgencmd" on Raspberry Pi and "osx-cpu-temp" on MacOS X
check_temp=$check_temp
#* Draw a clock at top of screen, formatting according to strftime, empty string to disable
draw_clock="$draw_clock"
#* Update main ui in background when menus are showing, set this to false if the menus is flickering too much for comfort
background_update=$background_update
#* Custom cpu model name, empty string to disable
custom_cpu_name="$custom_cpu_name"
#* Show color gradient in process list, True or False
proc_gradient=$proc_gradient
#* If process cpu usage should be of the core it's running on or usage of the total available cpu power
proc_per_core=$proc_per_core
#* Optional filter for shown disks, should be names of mountpoints, "root" replaces "/", separate multiple values with space
disks_filter="$disks_filter"
#* Enable check for new version from github.com/aristocratos/bpytop at start
update_check=$update_check
#* Enable graphs with double the horizontal resolution, increases cpu usage
hires_graphs=$hires_graphs
''')
CONFIG_DIR: str = f'{os.path.expanduser("~")}/.config/bpytop'
if not os.path.isdir(CONFIG_DIR):
try:
os.makedirs(CONFIG_DIR)
os.mkdir(f'{CONFIG_DIR}/themes')
os.mkdir(f'{CONFIG_DIR}/user_themes')
except PermissionError:
print(f'ERROR!\nNo permission to write to "{CONFIG_DIR}" directory!')
quit(1)
CONFIG_FILE: str = f'{CONFIG_DIR}/bpytop.conf'
THEME_DIR: str = f'{CONFIG_DIR}/themes'
USER_THEME_DIR: str = f'{CONFIG_DIR}/user_themes'
CORES: int = psutil.cpu_count(logical=False) or 1
THREADS: int = psutil.cpu_count(logical=True) or 1
DEFAULT_THEME: Dict[str, str] = {
"main_bg" : "",
"main_fg" : "#cc",
"title" : "#ee",
"hi_fg" : "#90",
"selected_bg" : "#7e2626",
"selected_fg" : "#ee",
"inactive_fg" : "#40",
"proc_misc" : "#0de756",
"cpu_box" : "#3d7b46",
"mem_box" : "#8a882e",
"net_box" : "#423ba5",
"proc_box" : "#923535",
"div_line" : "#30",
"temp_start" : "#4897d4",
"temp_mid" : "#5474e8",
"temp_end" : "#ff40b6",
"cpu_start" : "#50f095",
"cpu_mid" : "#f2e266",
"cpu_end" : "#fa1e1e",
"free_start" : "#223014",
"free_mid" : "#b5e685",
"free_end" : "#dcff85",
"cached_start" : "#0b1a29",
"cached_mid" : "#74e6fc",
"cached_end" : "#26c5ff",
"available_start" : "#292107",
"available_mid" : "#ffd77a",
"available_end" : "#ffb814",
"used_start" : "#3b1f1c",
"used_mid" : "#d9626d",
"used_end" : "#ff4769",
"download_start" : "#231a63",
"download_mid" : "#4f43a3",
"download_end" : "#b0a9de",
"upload_start" : "#510554",
"upload_mid" : "#7d4180",
"upload_end" : "#dcafde"
}
#? Units for floating_humanizer function
UNITS: Dict[str, Tuple[str, ...]] = {
"bit" : ("bit", "Kib", "Mib", "Gib", "Tib", "Pib", "Eib", "Zib", "Yib", "Bib", "GEb"),
"byte" : ("Byte", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB", "BiB", "GEB")
}
#? Setup error logger ---------------------------------------------------------------->
try:
errlog = logging.getLogger("ErrorLogger")
errlog.setLevel(logging.DEBUG)
eh = logging.handlers.RotatingFileHandler(f'{CONFIG_DIR}/error.log', maxBytes=1048576, backupCount=4)
eh.setLevel(logging.DEBUG)
eh.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s: %(message)s", datefmt="%d/%m/%y (%X)"))
errlog.addHandler(eh)
except PermissionError:
print(f'ERROR!\nNo permission to write to "{CONFIG_DIR}" directory!')
quit(1)
errlog.info(f'New instance of bpytop version {VERSION} started with pid {os.getpid()}')
#! Timers, remove ----------------------------------------------------------------------->
class Timer:
timers: Dict[str, float] = {}
paused: Dict[str, float] = {}
@classmethod
def start(cls, name):
cls.timers[name] = time()
@classmethod
def pause(cls, name):
if name in cls.timers:
cls.paused[name] = time() - cls.timers[name]
del cls.timers[name]
@classmethod
def stop(cls, name):
if name in cls.timers:
total: float = time() - cls.timers[name]
del cls.timers[name]
if name in cls.paused:
total += cls.paused[name]
del cls.paused[name]
errlog.debug(f'{name} completed in {total:.6f} seconds')
def timerd(func):
def timed(*args, **kw):
ts = time()
out = func(*args, **kw)
te = time()
errlog.debug(f'{func.__name__} completed in {te - ts:.6f} seconds')
return out
return timed
#! Timers, remove -----------------------------------------------------------------------<
#? Set up config class and load config ------------------------------------------------>
class Config:
'''Holds all config variables and functions for loading from and saving to disk'''
keys: List[str] = ["color_theme", "update_ms", "proc_sorting", "proc_reversed", "proc_tree", "check_temp", "draw_clock", "background_update", "custom_cpu_name", "proc_gradient", "proc_per_core", "disks_filter", "update_check", "hires_graphs"]
conf_dict: Dict[str, Union[str, int, bool]] = {}
color_theme: str = "Default"
update_ms: int = 2500
proc_sorting: str = "cpu lazy"
proc_reversed: bool = False
proc_tree: bool = False
check_temp: bool = True
draw_clock: str = "%X"
background_update: bool = True
custom_cpu_name: str = ""
proc_gradient: bool = True
proc_per_core: bool = False
disks_filter: str = ""
update_check: bool = True
hires_graphs: bool = False
changed: bool = False
recreate: bool = False
config_file: str = ""
_initialized: bool = False
def __init__(self, path: str):
self.config_file = path
conf: Dict[str, Union[str, int, bool]] = self.load_config()
if not "version" in conf.keys():
self.recreate = True
errlog.warning(f'Config file malformatted or missing, will be recreated on exit!')
elif conf["version"] != VERSION:
self.recreate = True
errlog.warning(f'Config file version and bpytop version missmatch, will be recreated on exit!')
for key in self.keys:
if key in conf.keys() and conf[key] != "_error_":
setattr(self, key, conf[key])
else:
self.recreate = True
self.conf_dict[key] = getattr(self, key)
self._initialized = True
def __setattr__(self, name, value):
if self._initialized:
object.__setattr__(self, "changed", True)
object.__setattr__(self, name, value)
if name not in ["_initialized", "recreate", "changed"]:
self.conf_dict[name] = value
def load_config(self) -> Dict[str, Union[str, int, bool]]:
'''Load config from file, set correct types for values and return a dict'''
new_config: Dict[str,Union[str, int, bool]] = {}
if not os.path.isfile(self.config_file): return new_config
try:
with open(self.config_file, "r") as f:
for line in f:
line = line.strip()
if line.startswith("#? Config"):
new_config["version"] = line[line.find("v. ") + 3:]
for key in self.keys:
if line.startswith(key):
line = line.lstrip(key + "=")
if line.startswith('"'):
line = line.strip('"')
if type(getattr(self, key)) == int:
try:
new_config[key] = int(line)
except ValueError:
errlog.warning(f'Config key "{key}" should be an integer!')
if type(getattr(self, key)) == bool:
try:
new_config[key] = bool(strtobool(line))
except ValueError:
errlog.warning(f'Config key "{key}" can only be True or False!')
if type(getattr(self, key)) == str:
new_config[key] = str(line)
except Exception as e:
errlog.exception(str(e))
if "proc_sorting" in new_config and not new_config["proc_sorting"] in ["pid", "program", "arguments", "threads", "user", "memory", "cpu lazy", "cpu responsive"]:
new_config["proc_sorting"] = "_error_"
errlog.warning(f'Config key "proc_sorted" didn\'t get an acceptable value!')
return new_config
def save_config(self):
'''Save current config to config file if difference in values or version, creates a new file if not found'''
if not self.changed and not self.recreate: return
try:
with open(self.config_file, "w" if os.path.isfile(self.config_file) else "x") as f:
f.write(DEFAULT_CONF.substitute(self.conf_dict))
except Exception as e:
errlog.exception(str(e))
try:
CONFIG: Config = Config(CONFIG_FILE)
except Exception as e:
errlog.exception(f'{e}')
quit(1)
#? Classes --------------------------------------------------------------------------------------->
class Term:
"""Terminal info and commands"""
width: int = os.get_terminal_size().columns #* Current terminal width in columns
height: int = os.get_terminal_size().lines #* Current terminal height in lines
resized: bool = False #* Flag indicating if terminal was recently resized
_resizing: bool = False
_w : int = 0
_h : int = 0
fg: str = "" #* Default foreground color
bg: str = "" #* Default background color
hide_cursor = "\033[?25l" #* Hide terminal cursor
show_cursor = "\033[?25h" #* Show terminal cursor
alt_screen = "\033[?1049h" #* Switch to alternate screen
normal_screen = "\033[?1049l" #* Switch to normal screen
clear = "\033[2J\033[0;0f" #* Clear screen and set cursor to position 0,0
@classmethod
def refresh(cls, *args):
"""Update width, height and set resized flag if terminal has been resized"""
if cls._resizing == True: return
cls._w, cls._h = os.get_terminal_size()
while (cls._w, cls._h) != (cls.width, cls.height):
cls._resizing = True
cls.resized = True
cls.width, cls.height = cls._w, cls._h
Draw.now(Term.clear)
Draw.now(f'{create_box(cls._w // 2 - 25, cls._h // 2 - 2, 50, 3, "resizing")}{THEME.main_fg}{Fx.b}{Mv.r(12)}Width : {cls._w} Height: {cls._h}')
while cls._w < 80 or cls._h < 24:
Draw.now(Term.clear)
Draw.now(f'{create_box(cls._w // 2 - 25, cls._h // 2 - 2, 50, 4, "warning")}{THEME.main_fg}{Fx.b}{Mv.r(12)}Width: {cls._w} Height: {cls._h}\
{Mv.to(cls._h // 2, cls._w // 2 - 23)}Width and Height needs to be at least 80x24!')
sleep(0.3)
cls._w, cls._h = os.get_terminal_size()
sleep(0.3)
cls._w, cls._h = os.get_terminal_size()
cls._resizing = False
Box.calc_sizes()
Box.draw_bg()
@staticmethod
def echo(on: bool):
"""Toggle input echo"""
(iflag, oflag, cflag, lflag, ispeed, ospeed, cc) = termios.tcgetattr(sys.stdin.fileno())
if on:
lflag |= termios.ECHO # type: ignore
else:
lflag &= ~termios.ECHO # type: ignore
new_attr = [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, new_attr)
class Fx:
"""Text effects"""
start = "\033[" #* Escape sequence start
sep = ";" #* Escape sequence separator
end = "m" #* Escape sequence end
reset = rs = "\033[0m" #* Reset foreground/background color and text effects
bold = b = "\033[1m" #* Bold on
unbold = ub = "\033[22m" #* Bold off
dark = d = "\033[2m" #* Dark on
undark = ud = "\033[22m" #* Dark off
italic = i = "\033[3m" #* Italic on
unitalic = ui = "\033[23m" #* Italic off
underline = u = "\033[4m" #* Underline on
ununderline = uu = "\033[24m" #* Underline off
blink = bl = "\033[5m" #* Blink on
unblink = ubl = "\033[25m" #* Blink off
strike = s = "\033[9m" #* Strike / crossed-out on
unstrike = us = "\033[29m" #* Strike / crossed-out off
@staticmethod
def trans(string: str):
'''Replace white space with escape move right to not paint background in whitespace'''
return string.replace(" ", "\033[1C")
class Raw(object):
"""Set raw input mode for device"""
def __init__(self, stream):
self.stream = stream
self.fd = self.stream.fileno()
def __enter__(self):
self.original_stty = termios.tcgetattr(self.stream)
tty.setcbreak(self.stream)
def __exit__(self, type, value, traceback):
termios.tcsetattr(self.stream, termios.TCSANOW, self.original_stty)
class Nonblocking(object):
"""Set nonblocking mode for device"""
def __init__(self, stream):
self.stream = stream
self.fd = self.stream.fileno()
def __enter__(self):
self.orig_fl = fcntl.fcntl(self.fd, fcntl.F_GETFL)
fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl | os.O_NONBLOCK)
def __exit__(self, *args):
fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl)
class Mv:
"""Class with collection of cursor movement functions: .t[o](line, column) | .r[ight](columns) | .l[eft](columns) | .u[p](lines) | .d[own](lines) | .save() | .restore()"""
@staticmethod
def to(line: int, col: int) -> str:
return f'\033[{line};{col}f' #* Move cursor to line, column
@staticmethod
def right(x: int) -> str: #* Move cursor right x columns
return f'\033[{x}C'
@staticmethod
def left(x: int) -> str: #* Move cursor left x columns
return f'\033[{x}D'
@staticmethod
def up(x: int) -> str: #* Move cursor up x lines
return f'\033[{x}A'
@staticmethod
def down(x: int) -> str: #* Move cursor down x lines
return f'\033[{x}B'
@staticmethod
def save() -> str: #* Save cursor position
return "\033[s"
@staticmethod
def restore() -> str: #* Restore saved cursor postion
return "\033[u"
t = to
r = right
l = left
u = up
d = down
class Key:
"""Handles the threaded input reader"""
list: List[str] = []
new = threading.Event()
idle = threading.Event()
idle.set()
stopping: bool = False
started: bool = False
reader: threading.Thread
@classmethod
def start(cls):
cls.stopping = False
cls.reader = threading.Thread(target=cls._get_key)
cls.reader.start()
cls.started = True
@classmethod
def stop(cls):
if cls.started and cls.reader.is_alive():
cls.stopping = True
try:
cls.reader.join()
except:
pass
@classmethod
def input_wait(cls, sec: float = 0.0) -> bool:
'''Returns True if key is detected else waits out timer and returns False'''
cls.new.wait(sec if sec > 0 else 0.0)
if cls.new.is_set():
cls.new.clear()
return True
else:
return False
@classmethod
def _get_key(cls):
"""Get a single key from stdin, convert to readable format and save to keys list. Meant to be run in it's own thread."""
input_key: str = ""
clean_key: str = ""
escape: Dict[Union[str, Tuple[str, str]], str] = {
"\n" : "enter",
("\x7f", "\x08") : "backspace",
("[A", "OA") : "up",
("[B", "OB") : "down",
("[D", "OD") : "left",
("[C", "OC") : "right",
"[2~" : "insert",
"[3~" : "delete",
"[H" : "home",
"[F" : "end",
"[5~" : "page_up",
"[6~" : "page_down",
"[Z" : "shift_tab",
"OP" : "f1",
"OQ" : "f2",
"OR" : "f3",
"OS" : "f4",
"[15" : "f5",
"[17" : "f6",
"[18" : "f7",
"[19" : "f8",
"[20" : "f9",
"[21" : "f10",
"[23" : "f11",
"[24" : "f12"
}
try:
while not cls.stopping:
with Raw(sys.stdin):
if not select([sys.stdin], [], [], 0.1)[0]: #* Wait 100ms for input on stdin then restart loop to check for stop flag
continue
cls.idle.clear() #* Report IO block in progress to prevent Draw functions to get IO Block error
input_key += sys.stdin.read(1)
if input_key == "\033": #* If first character is a escape sequence read 5 more keys
Draw.idle.wait() #* Wait for Draw function to finish if busy
with Nonblocking(sys.stdin): #* Set non blocking to prevent read stall if less than 5 characters
input_key += sys.stdin.read(5)
if input_key == "\033": clean_key = "escape" #* Key is escape if only containing \033
if input_key == "\\": clean_key = "\\" #* Clean up "\" to not return escaped
else:
for code in escape.keys(): #* Go trough dict of escape codes to get the cleaned key name
if input_key.lstrip("\033").startswith(code):
clean_key = escape[code]
break
else: #* If not found in escape dict and length of key is 1, assume regular character
if len(input_key) == 1:
clean_key = input_key
#if testing: errlog.debug(f'Input key: {repr(input_key)} Clean key: {clean_key}') #! Remove
if clean_key:
cls.list.append(clean_key) #* Store keys in input queue for later processing
clean_key = ""
cls.new.set() #* Set threading event to interrupt main thread sleep
input_key = ""
cls.idle.set() #* Report IO blocking done
except Exception as e:
errlog.exception(f'Input reader failed with exception: {e}')
cls.idle.set()
cls.list.clear()
class Draw:
'''Holds the draw buffer and manages IO blocking queue
* .buffer([+]name[!], *args, append=False, now=False) : Add *args to buffer
* - Adding "+" prefix to name sets append to True and appends to name's current string
* - Adding "!" suffix to name sets now to True and print name's current string
* .out(clear=False) : Print all strings in buffer, clear=True clear all buffers after
* .now(*args) : Prints all arguments as a string
* .clear(*names) : Clear named buffers, all if no argument
'''
strings: Dict[str, str] = {}
last_screen: str = ""
idle = threading.Event()
idle.set()
@classmethod
def now(cls, *args):
'''Wait for input reader to be idle then print to screen'''
Key.idle.wait()
cls.idle.wait()
cls.idle.clear()
try:
print(*args, sep="", end="", flush=True)
except BlockingIOError:
pass
Key.idle.wait()
print(*args, sep="", end="", flush=True)
cls.idle.set()
@classmethod
def buffer(cls, name: str, *args: str, append: bool = False, now: bool = False):
string: str = ""
if name.startswith("+"):
name = name.lstrip("+")
append = True
if name.endswith("!"):
name = name.rstrip("!")
now = True
if name == "": name = "_null"
if args: string = "".join(args)
if name not in cls.strings or not append: cls.strings[name] = ""
cls.strings[name] += string
if now: cls.now(string)
@classmethod
def out(cls, clear = False):
cls.last_screen = "".join(cls.strings.values())
if clear: cls.strings = {}
cls.now(cls.last_screen)
@classmethod
def clear(cls, *names):
if names:
for name in names:
if name in cls.strings:
del cls.strings["name"]
else:
cls.strings = {}
class Color:
'''Holds representations for a 24-bit color
__init__(color, depth="fg", default=False)
-- color accepts 6 digit hexadecimal: string "#RRGGBB", 2 digit hexadecimal: string "#FF" or decimal RGB "255 255 255" as a string.
-- depth accepts "fg" or "bg"
__call__(*args) converts arguments to a string and apply color
__str__ returns escape sequence to set color
__iter__ returns iteration over red, green and blue in integer values of 0-255.
* Values: .hexa: str | .dec: Tuple[int, int, int] | .red: int | .green: int | .blue: int | .depth: str | .escape: str
'''
hex: str; dec: Tuple[int, int, int]; red: int; green: int; blue: int; depth: str; escape: str; default: bool
def __init__(self, color: str, depth: str = "fg", default: bool = False):
self.depth = depth
self.default = default
try:
if not color:
self.dec = (-1, -1, -1)
self.hexa = ""
self.red = self.green = self.blue = -1
self.escape = "\033[49m" if depth == "bg" and default else ""
return
elif color.startswith("#"):
self.hexa = color
if len(self.hexa) == 3:
self.hexa += self.hexa[1:3] + self.hexa[1:3]
c = int(self.hexa[1:3], base=16)
self.dec = (c, c, c)
elif len(self.hexa) == 7:
self.dec = (int(self.hexa[1:3], base=16), int(self.hexa[3:5], base=16), int(self.hexa[5:7], base=16))
else:
raise ValueError(f'Incorrectly formatted hexadeciaml rgb string: {self.hexa}')
else:
c_t = tuple(map(int, color.split(" ")))
if len(c_t) == 3:
self.dec = c_t #type: ignore
else:
raise ValueError(f'RGB dec should be "0-255 0-255 0-255"')
ct = self.dec[0] + self.dec[1] + self.dec[2]
if ct > 255*3 or ct < 0:
raise ValueError(f'RGB values out of range: {color}')
except Exception as e:
errlog.exception(str(e))
self.escape = ""
return
if self.dec and not self.hexa: self.hexa = f'{hex(self.dec[0]).lstrip("0x").zfill(2)}{hex(self.dec[1]).lstrip("0x").zfill(2)}{hex(self.dec[2]).lstrip("0x").zfill(2)}'
if self.dec and self.hexa:
self.red, self.green, self.blue = self.dec
self.escape = f'\033[{38 if self.depth == "fg" else 48};2;{";".join(str(c) for c in self.dec)}m'
def __str__(self) -> str:
return self.escape
def __repr__(self) -> str:
return repr(self.escape)
def __iter__(self) -> Iterable:
for c in self.dec: yield c
def __call__(self, *args: str) -> str:
if len(args) == 0: return ""
return f'{self.escape}{"".join(args)}{getattr(Term, self.depth)}'
@staticmethod
def escape_color(hexa: str = "", r: int = 0, g: int = 0, b: int = 0, depth: str = "fg") -> str:
"""Returns escape sequence to set color
* accepts either 6 digit hexadecimal hexa="#RRGGBB", 2 digit hexadecimal: hexa="#FF"
* or decimal RGB: r=0-255, g=0-255, b=0-255
* depth="fg" or "bg"
"""
dint: int = 38 if depth == "fg" else 48
color: str = ""
if hexa:
try:
if len(hexa) == 3:
c = int(hexa[1:], base=16)
color = f'\033[{dint};2;{c};{c};{c}m'
elif len(hexa) == 7:
color = f'\033[{dint};2;{int(hexa[1:3], base=16)};{int(hexa[3:5], base=16)};{int(hexa[5:7], base=16)}m'
except ValueError as e:
errlog.exception(f'{e}')
else:
color = f'\033[{dint};2;{r};{g};{b}m'
return color
@classmethod
def fg(cls, *args) -> str:
if len(args) > 1: return cls.escape_color(r=args[0], g=args[1], b=args[2], depth="fg")
else: return cls.escape_color(hexa=args[0], depth="fg")
@classmethod
def bg(cls, *args) -> str:
if len(args) > 1: return cls.escape_color(r=args[0], g=args[1], b=args[2], depth="bg")
else: return cls.escape_color(hexa=args[0], depth="bg")
class Theme:
'''__init__ accepts a dict containing { "color_element" : "color" }'''
themes: Dict[str, str] = {}
cached: Dict[str, Dict[str, str]] = { "Default" : DEFAULT_THEME }
current: str = ""
main_bg = main_fg = title = hi_fg = selected_bg = selected_fg = inactive_fg = proc_misc = cpu_box = mem_box = net_box = proc_box = div_line = temp_start = temp_mid = temp_end = cpu_start = cpu_mid = cpu_end = free_start = free_mid = free_end = cached_start = cached_mid = cached_end = available_start = available_mid = available_end = used_start = used_mid = used_end = download_start = download_mid = download_end = upload_start = upload_mid = upload_end = NotImplemented
gradient: Dict[str, List[str]] = {
"temp" : [],
"cpu" : [],
"free" : [],
"cached" : [],
"available" : [],
"used" : [],
"download" : [],
"upload" : []
}
def __init__(self, theme: str):
self.refresh()
self._load_theme(theme)
def __call__(self, theme: str):
for k in self.gradient.keys(): self.gradient[k] = []
self._load_theme(theme)
def _load_theme(self, theme: str):
tdict: Dict[str, str]
if theme in self.cached:
tdict = self.cached[theme]
elif theme in self.themes:
tdict = self._load_file(self.themes[theme])
self.cached[theme] = tdict
else:
theme = "Default"
tdict = DEFAULT_THEME
self.current = theme
#if CONFIG.color_theme != theme: CONFIG.color_theme = theme
#* Get key names from DEFAULT_THEME dict to not leave any color unset if missing from theme dict
for item, value in DEFAULT_THEME.items():
default = False if item not in ["main_fg", "main_bg"] else True
depth = "fg" if item not in ["main_bg", "selected_bg"] else "bg"
if item in tdict.keys():
setattr(self, item, Color(tdict[item], depth=depth, default=default))
else:
setattr(self, item, Color(value, depth=depth, default=default))
#* Create color gradients from one, two or three colors, 101 values indexed 0-100
2020-07-09 00:56:03 +00:00
rgb: Dict[str, Tuple[int, int, int]]
colors: List[List[int]] = []
2020-07-09 00:56:03 +00:00
for name in self.gradient.keys():
rgb = { "start" : getattr(self, f'{name}_start').dec, "mid" : getattr(self, f'{name}_mid').dec, "end" : getattr(self, f'{name}_end').dec }
colors = [ list(getattr(self, f'{name}_start')) ]
2020-07-09 00:56:03 +00:00
if rgb["end"][0] >= 0:
r = 50 if rgb["mid"][0] >= 0 else 100
for first, second in ["start", "mid" if r == 50 else "end"], ["mid", "end"]:
for i in range(r):
colors += [[rgb[first][n] + i * (rgb[second][n] - rgb[first][n]) // r for n in range(3)]]
2020-07-09 00:56:03 +00:00
if r == 100:
break
self.gradient[name] += [ Color.fg(*color) for color in colors ]
2020-07-09 00:56:03 +00:00
else:
c = Color.fg(*rgb["start"])
for _ in range(100):
self.gradient[name] += [c]
#* Set terminal colors
Term.fg, Term.bg = self.main_fg, self.main_bg
Draw.now(self.main_fg, self.main_bg)
def refresh(self):
'''Sets themes dict with names and paths to all found themes'''
self.themes = { "Default" : "Default" }
try:
for d in (THEME_DIR, USER_THEME_DIR):
for f in os.listdir(d):
if f.endswith(".theme"):
self.themes[f'{f[:-6] if d == THEME_DIR else f[:-6] + "*"}'] = f'{d}/{f}'
except Exception as e:
errlog.exception(str(e))
@staticmethod
def _load_file(path: str) -> Dict[str, str]:
'''Load a bashtop formatted theme file and return a dict'''
new_theme: Dict[str, str] = {}
try:
with open(path) as f:
for line in f:
if not line.startswith("theme["): continue
key = line[6:line.find("]")]
s = line.find('"')
value = line[s + 1:line.find('"', s + 1)]
new_theme[key] = value
except Exception as e:
errlog.exception(str(e))
return new_theme
class Banner:
'''Holds the bpytop banner, .draw(line, [col=0], [center=False], [now=False])'''
out: List[str] = []
c_color: str = ""
length: int = 0
if not out:
for num, (color, line) in enumerate(BANNER_SRC.items()):
if len(line) > length: length = len(line)
out += [""]
line_color = Color.fg(color)
line_dark = Color.fg(f'#{80 - num * 6}')
for letter in line:
if letter == "" and c_color != line_color:
c_color = line_color
out[num] += line_color
elif letter == " ":
letter = f'{Mv.r(1)}'
elif letter != "" and c_color != line_dark:
c_color = line_dark
out[num] += line_dark
out[num] += letter
@classmethod
def draw(cls, line: int, col: int = 0, center: bool = False, now: bool = False):
out: str = ""
if center: col = Term.width // 2 - cls.length // 2
for n, o in enumerate(cls.out):
out += f'{Mv.to(line + n, col)}{o}'
out += f'{Term.fg}'
if now: Draw.out(out)
else: return out
class Symbol:
h_line: str = ""
v_line: str = ""
left_up: str = ""
right_up: str = ""
left_down: str = ""
right_down: str = ""
title_left: str = ""
title_right: str = ""
div_up: str = ""
div_down: str = ""
graph_up: Dict[float, str] = {
0.0 : "", 0.1 : "", 0.2 : "", 0.3 : "", 0.4 : "",
1.0 : "", 1.1 : "", 1.2 : "", 1.3 : "", 1.4 : "",
2.0 : "", 2.1 : "", 2.2 : "", 2.3 : "", 2.4 : "",
3.0 : "", 3.1 : "", 3.2 : "", 3.3 : "", 3.4 : "",
4.0 : "", 4.1 : "", 4.2 : "", 4.3 : "", 4.4 : ""
}
graph_down: Dict[float, str] = {
0.0 : "", 0.1 : "", 0.2 : "", 0.3 : "", 0.4 : "",
1.0 : "", 1.1 : "", 1.2 : "", 1.3 : "", 1.4 : "",
2.0 : "", 2.1 : "", 2.2 : "", 2.3 : "", 2.4 : "",
3.0 : "", 3.1 : "", 3.2 : "", 3.3 : "", 3.4 : "",
4.0 : "", 4.1 : "", 4.2 : "", 4.3 : "", 4.4 : ""
}
meter: str = ""
ok: str = f'{Color.fg("#30ff50")}{Color.fg("#cc")}'
fail: str = f'{Color.fg("#ff3050")}!{Color.fg("#cc")}'
class Graph:
out: str
width: int
height: int
graphs: Dict[bool, List[str]]
2020-07-09 00:56:03 +00:00
colors: List[str]
invert: bool
max_value: int
current: bool
last: int
2020-07-09 00:56:03 +00:00
symbol: Dict[float, str]
def __init__(self, width: int, height: int, color: Union[List[str], Color, None], data: List[int], invert: bool = False, max_value: int = 0):
self.graphs: Dict[bool, List[str]] = {False : [], True : []}
self.current: bool = True
self.colors: List[str] = []
if isinstance(color, list):
for i in range(1, height + 1): self.colors.insert(0, color[i * 100 // height]) #* Calculate colors of graph
if invert: self.colors.reverse()
elif isinstance(color, Color):
self.colors = [ f'{color}' for _ in range(height) ]
self.width = width
self.height = height
self.invert = invert
if not data: data = [0]
2020-07-09 00:56:03 +00:00
if max_value:
self.max_value = max_value
data = [ v * 100 // max_value if v < max_value else 100 for v in data ] #* Convert values to percentage values of max_value with max_value as ceiling
else:
self.max_value = 0
2020-07-09 00:56:03 +00:00
self.symbol = Symbol.graph_down if invert else Symbol.graph_up
value_width: int = ceil(len(data) / 2)
filler: str = ""
if value_width > width: #* If the size of given data set is bigger then width of graph, shrink data set
data = data[-(width*2):]
value_width = ceil(len(data) / 2)
elif value_width < width: #* If the size of given data set is smaller then width of graph, fill graph with whitespace
filler = " " * (width - value_width)
for _ in range(height):
for b in [True, False]:
self.graphs[b].append(filler if filler else "")
self._create(data, new=True)
def _create(self, data: List[int], new: bool = False):
h_high: int
h_low: int
value: Dict[str, int] = { "left" : 0, "right" : 0 }
val: int
side: str
#* Create the graph
for h in range(self.height):
h_high = round(100 * (self.height - h) / self.height) if self.height > 1 else 100
h_low = round(100 * (self.height - (h + 1)) / self.height) if self.height > 1 else 0
for v in range(len(data)):
if new: self.current = bool(v % 2) #* Switch between True and False graphs
if new and v == 0: self.last = 0
for val, side in [self.last, "left"], [data[v], "right"]: # type: ignore
if val >= h_high:
value[side] = 4
elif val <= h_low:
value[side] = 0
else:
if self.height == 1: value[side] = round(val * 4 / 100 + 0.5)
else: value[side] = round((val - h_low) * 4 / (h_high - h_low) + 0.1)
if new: self.last = data[v]
self.graphs[self.current][h] += self.symbol[float(value["left"] + value["right"] / 10)]
self.last = data[-1]
self.out = ""
for h in range(self.height):
if h > 0: self.out += f'{Mv.d(1)}{Mv.l(self.width)}'
self.out += f'{"" if not self.colors else self.colors[h]}{self.graphs[self.current][h if not self.invert else (self.height - 1) - h]}'
self.out += f'{Term.fg}'
2020-07-09 00:56:03 +00:00
def add(self, value: int):
self.current = not self.current
for n in range(self.height):
self.graphs[self.current][n] = self.graphs[self.current][n][1:]
2020-07-09 00:56:03 +00:00
if self.max_value: value = value * 100 // self.max_value if value < self.max_value else 100
self._create([value])
return self.out
2020-07-09 00:56:03 +00:00
def __str__(self):
return self.out
def __repr__(self):
return repr(self.out)
2020-07-09 00:56:03 +00:00
class Graphs:
'''Holds all graphs and lists of graphs for dynamically created graphs'''
cpu: Graph
cores: List[Graph] = []
temps: List[Graph] = []
net: Graph
detailed_cpu: Graph
detailed_mem: Graph
pid_cpu: List[Graph] = []
class Meter:
'''Creates a percentage meter
__init__(value, width, theme, gradient_name) to create new meter
__call__(value) to set value and return meter as a string
__str__ returns last set meter as a string
'''
out: str
2020-07-09 00:56:03 +00:00
color_gradient: List[str]
color_inactive: Color
width: int
saved: Dict[int, str]
2020-07-09 00:56:03 +00:00
def __init__(self, value: int, width: int, gradient_name: str):
self.color_gradient = THEME.gradient[gradient_name]
self.color_inactive = THEME.inactive_fg
self.width = width
self.out = self._create(value)
self.saved = { value : self.out }
2020-07-09 00:56:03 +00:00
def __call__(self, value: int):
if value in self.saved.keys():
self.out = self.saved[value]
else:
self.out = self._create(value)
self.saved[value] = self.out
return self.out
def __str__(self):
return self.out
def __repr__(self):
return repr(self.out)
def _create(self, value: int):
if value > 100: value = 100
elif value < 0: value = 100
out: str = ""
for i in range(1, self.width + 1):
if value >= round(i * 100 / self.width):
out += f'{self.color_gradient[round(i * 100 / self.width)]}{Symbol.meter}'
else:
out += self.color_inactive(Symbol.meter * (self.width + 1 - i))
break
else:
out += f'{Term.fg}'
return out
class Meters:
cpu: Meter
mem_used: Meter
mem_available: Meter
mem_cached: Meter
mem_free: Meter
swap_used: Meter
swap_free: Meter
disks_used: Meter
disks_free: Meter
class Box:
'''Box class with all needed attributes for create_box() function'''
name: str
height_p: int
width_p: int
x: int
y: int
width: int
height: int
out: str
bg: str
_b_cpu_h: int
_b_mem_h: int
redraw_all: bool
2020-07-09 00:56:03 +00:00
@classmethod
def calc_sizes(cls):
'''Calculate sizes of boxes'''
for sub in cls.__subclasses__():
sub._calc_size() # type: ignore
@classmethod
def draw_bg(cls, now: bool = True):
'''Draw all boxes outlines and titles'''
Draw.buffer("bg!" if now else "bg", "".join(sub._draw_bg() for sub in cls.__subclasses__())) # type: ignore
class SubBox:
box_x: int = 0
box_y: int = 0
box_width: int = 0
box_height: int = 0
box_columns: int = 0
class CpuBox(Box, SubBox):
name = "cpu"
x = 0
y = 0
2020-07-09 00:56:03 +00:00
height_p = 32
width_p = 100
@classmethod
def _calc_size(cls):
cls.width = round(Term.width * cls.width_p / 100)
cls.height = round(Term.height * cls.height_p / 100)
Box._b_cpu_h = cls.height
#THREADS = 25
if THREADS > (cls.height - 6) * 3 and cls.width > 200: cls.box_width = 24 * 4; cls.box_height = ceil(THREADS / 4) + 4; cls.box_columns = 4
elif THREADS > (cls.height - 6) * 2 and cls.width > 150: cls.box_width = 24 * 3; cls.box_height = ceil(THREADS / 3) + 4; cls.box_columns = 3
elif THREADS > cls.height - 6 and cls.width > 100: cls.box_width = 24 * 2; cls.box_height = ceil(THREADS / 2) + 4; cls.box_columns = 2
else: cls.box_width = 24; cls.box_height = THREADS + 4; cls.box_columns = 1
if CONFIG.check_temp: cls.box_width += 13 * cls.box_columns
if cls.box_height > cls.height - 2: cls.box_height = cls.height - 2
cls.box_x = (cls.width - 2) - cls.box_width
cls.box_y = cls.y + ((cls.height - 2) // 2) - cls.box_height // 2 + 1
cls.redraw_all = True
@classmethod
def _draw_bg(cls) -> str:
return f'{create_box(box=cls, line_color=THEME.cpu_box)}\
{Mv.to(cls.y, cls.x + 10)}{THEME.cpu_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg("m")}{THEME.title("enu")}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}\
{create_box(x=cls.box_x, y=cls.box_y, width=cls.box_width, height=cls.box_height, line_color=THEME.div_line, fill=False, title=CPU_NAME[:18 if CONFIG.check_temp else 9])}'
@classmethod
def _draw_fg(cls, cpu):
Draw.buffer("cpu", f'Cpu usage: {cpu.cpu_usage}\nCpu freq: {cpu.cpu_freq}\nLoad avg: {cpu.load_avg}\n')
class MemBox(Box):
name = "mem"
height_p = 40
width_p = 45
x = 0
y = 0
2020-07-09 00:56:03 +00:00
divider: int = 0
mem_width: int = 0
disks_width: int = 0
@classmethod
def _calc_size(cls):
cls.width = round(Term.width * cls.width_p / 100)
cls.height = round(Term.height * cls.height_p / 100) + 1
Box._b_mem_h = cls.height
cls.y = Box._b_cpu_h + 1
cls.mem_width = cls.disks_width = round((cls.width-1) / 2)
if cls.mem_width + cls.disks_width < cls.width - 2: cls.mem_width += 1
cls.divider = cls.x + cls.mem_width + 1
cls.redraw_all = True
@classmethod
def _draw_bg(cls) -> str:
return f'{create_box(box=cls, line_color=THEME.mem_box)}\
{Mv.to(cls.y, cls.divider + 2)}{THEME.mem_box(Symbol.title_left)}{Fx.b}{THEME.title("disks")}{Fx.ub}{THEME.mem_box(Symbol.title_right)}\
{Mv.to(cls.y, cls.divider)}{THEME.mem_box(Symbol.div_up)}\
{Mv.to(cls.y + cls.height - 1, cls.divider)}{THEME.mem_box(Symbol.div_down)}{THEME.div_line}\
{"".join(f"{Mv.to(cls.y + i, cls.divider)}{Symbol.v_line}" for i in range(1, cls.height - 1))}'
class NetBox(Box, SubBox):
name = "net"
height_p = 28
width_p = 45
x = 0
y = 0
2020-07-09 00:56:03 +00:00
@classmethod
def _calc_size(cls):
cls.width = round(Term.width * cls.width_p / 100)
cls.height = Term.height - Box._b_cpu_h - Box._b_mem_h
cls.y = Term.height - cls.height + 1
cls.box_width = 24
cls.box_height = 9 if cls.height > 12 else cls.height - 2
cls.box_x = cls.width - cls.box_width - 2
cls.box_y = cls.y + ((cls.height - 2) // 2) - cls.box_height // 2 + 1
cls.redraw_all = True
@classmethod
def _draw_bg(cls) -> str:
return f'{create_box(box=cls, line_color=THEME.net_box)}\
{create_box(x=cls.box_x, y=cls.box_y, width=cls.box_width, height=cls.box_height, line_color=THEME.div_line, fill=False, title="Download", title2="Upload")}'
class ProcBox(Box):
name = "proc"
height_p = 68
width_p = 55
x = 0
y = 0
2020-07-09 00:56:03 +00:00
detailed: bool = False
detailed_x: int = 0
detailed_y: int = 0
detailed_width: int = 0
detailed_height: int = 8
@classmethod
def _calc_size(cls):
cls.width = round(Term.width * cls.width_p / 100)
cls.height = round(Term.height * cls.height_p / 100)
cls.x = Term.width - cls.width + 1
cls.y = Box._b_cpu_h + 1
cls.detailed_x = cls.x
cls.detailed_y = cls.y
cls.detailed_height = 8
cls.detailed_width = cls.width
cls.redraw_all = True
@classmethod
def _draw_bg(cls) -> str:
return create_box(box=cls, line_color=THEME.proc_box)
class Collector:
'''Data collector master class
* .start(): Starts collector thread
* .stop(): Stops collector thread
* .collect(*collectors: Collector, draw_now: bool = True, interrupt: bool = False): queues up collectors to run'''
stopping: bool = False
started: bool = False
draw_now: bool = False
thread: threading.Thread
collect_run = threading.Event()
collect_idle = threading.Event()
collect_idle.set()
collect_done = threading.Event()
collect_queue: List = []
collect_interrupt: bool = False
@classmethod
def start(cls):
cls.stopping = False
cls.thread = threading.Thread(target=cls._runner, args=())
cls.thread.start()
cls.started = True
@classmethod
def stop(cls):
if cls.started and cls.thread.is_alive():
cls.stopping = True
try:
cls.thread.join()
except:
pass
@classmethod
def _runner(cls):
'''This is meant to run in it's own thread, collecting and drawing when collect_run is set'''
while not cls.stopping:
cls.collect_run.wait(0.1)
if not cls.collect_run.is_set():
continue
cls.collect_run.clear()
cls.collect_idle.clear()
while cls.collect_queue:
collector = cls.collect_queue.pop()
collector._collect()
collector._draw()
if cls.draw_now:
Draw.out()
cls.collect_idle.set()
cls.collect_done.set()
@classmethod
def collect(cls, *collectors: object, draw_now: bool = True, interrupt: bool = False):
'''Setup collect queue for _runner'''
#* Set interrupt flag if True to stop _runner prematurely
cls.collect_interrupt = interrupt
#* Wait for _runner to finish
cls.collect_idle.wait()
#* Reset interrupt flag
cls.collect_interrupt = False
#* Set draw_now flag if True to draw to screen instead of buffer
cls.draw_now = draw_now
#* Append any collector given as argument to _runner queue
if collectors:
cls.collect_queue = [*collectors]
#* Add all collectors to _runner queue if no collectors in argument
else:
cls.collect_queue = list(cls.__subclasses__())
#* Set run flag to start _runner
cls.collect_run.set()
class CpuCollector(Collector):
'''Collects cpu usage for cpu and cores, cpu frequency, load_avg
_collect(): Collects data
_draw(): calls CpuBox._draw_fg()'''
cpu_usage: List[List[float]] = [[]]
for i in range(THREADS):
cpu_usage.append([])
cpu_freq: int = 0
load_avg: List[float] = []
@classmethod
def _collect(cls):
cls.cpu_usage[0].append(round(psutil.cpu_percent(percpu=False)))
for n, thread in enumerate(psutil.cpu_percent(percpu=True), start=1):
cls.cpu_usage[n].append(round(thread))
if len(cls.cpu_usage[0]) > Term.width:
try:
for n in range(THREADS + 1):
del cls.cpu_usage[n][0]
except IndexError:
pass
cls.cpu_freq = round(psutil.cpu_freq().current)
cls.load_avg = [round(lavg, 2) for lavg in os.getloadavg()]
@classmethod
def _draw(cls):
CpuBox._draw_fg(cls)
#class ProcCollector(Collector): #! add interrupt on _collect and _draw
@timerd
def testing_collectors():
Collector.collect()
Collector.collect_done.wait()
#sleep(2)
#Collector.collect(CpuCollector)
#Draw.now(f'Cpu usage: {CpuCollector.cpu_usage}\nCpu freq: {CpuCollector.cpu_freq}\nLoad avg: {CpuCollector.load_avg}\n')
#? Functions ------------------------------------------------------------------------------------->
def get_cpu_name() -> str:
'''Fetch a suitable CPU identifier from the CPU model name string'''
name: str = ""
nlist: List = []
command: str = ""
cmd_out: str = ""
rem_line: str = ""
if SYSTEM == "Linux":
command = "cat /proc/cpuinfo"
rem_line = "model name"
elif SYSTEM == "MacOS":
command ="sysctl -n machdep.cpu.brand_string"
elif SYSTEM == "BSD":
command ="sysctl hw.model"
rem_line = "hw.model"
cmd_out = subprocess.check_output("LANG=C " + command, shell=True, universal_newlines=True)
if rem_line:
for line in cmd_out.split("\n"):
if rem_line in line:
name = re.sub( ".*" + rem_line + ".*:", "", line,1).lstrip()
else:
name = cmd_out
nlist = name.split(" ")
if "Xeon" in name:
name = nlist[nlist.index("CPU")+1]
elif "Ryzen" in name:
name = " ".join(nlist[nlist.index("Ryzen"):nlist.index("Ryzen")+3])
elif "CPU" in name:
name = nlist[nlist.index("CPU")-1]
return name
def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: str = "", title2: str = "", line_color: Color = None, fill: bool = True, box = None) -> str:
'''Create a box from a box object or by given arguments'''
out: str = f'{Term.fg}{Term.bg}'
if not line_color: line_color = THEME.div_line
#* Get values from box class if given
if box:
x = box.x
y = box.y
width = box.width
height =box.height
title = box.name
vlines: Tuple[int, int] = (x, x + width - 1)
hlines: Tuple[int, int] = (y, y + height - 1)
#* Fill box if enabled
if fill:
for i in range(y + 1, y + height - 1):
out += f'{Mv.to(i, x)}{" " * (width - 1)}'
out += f'{line_color}'
#* Draw all horizontal lines
for hpos in hlines:
out += f'{Mv.to(hpos, x)}{Symbol.h_line * (width - 1)}'
#* Draw all vertical lines
for vpos in vlines:
for hpos in range(y, y + height - 1):
out += f'{Mv.to(hpos, vpos)}{Symbol.v_line}'
#* Draw corners
out += f'{Mv.to(y, x)}{Symbol.left_up}\
{Mv.to(y, x + width - 1)}{Symbol.right_up}\
{Mv.to(y + height - 1, x)}{Symbol.left_down}\
{Mv.to(y + height - 1, x + width - 1)}{Symbol.right_down}'
#* Draw titles if enabled
if title:
out += f'{Mv.to(y, x + 2)}{Symbol.title_left}{THEME.title}{Fx.b}{title}{Fx.ub}{line_color}{Symbol.title_right}'
if title2:
out += f'{Mv.to(y + height - 1, x + 2)}{Symbol.title_left}{THEME.title}{Fx.b}{title2}{Fx.ub}{line_color}{Symbol.title_right}'
return f'{out}{Term.fg}{Mv.to(y + 1, x + 1)}'
def now_sleeping(signum, frame):
"""Reset terminal settings and stop background input read before putting to sleep"""
Key.stop()
Collector.stop()
Draw.now(Term.clear, Term.normal_screen, Term.show_cursor)
Term.echo(True)
os.kill(os.getpid(), signal.SIGSTOP)
def now_awake(signum, frame):
"""Set terminal settings and restart background input read"""
Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor)
Term.echo(False)
Key.start()
Collector.start()
def quit_sigint(signum, frame):
"""SIGINT redirection to clean_quit()"""
clean_quit()
def clean_quit(errcode: int = 0, errmsg: str = ""):
"""Stop background input read, save current config and reset terminal settings before quitting"""
Key.stop()
Collector.stop()
if not errcode: CONFIG.save_config()
#Draw.now(Term.clear, Term.normal_screen, Term.show_cursor) #! Enable
Draw.now(Term.show_cursor) #! Remove
Term.echo(True)
if errcode == 0:
errlog.info(f'Exiting. Runtime {timedelta(seconds=round(time() - SELF_START, 0))} \n')
else:
errlog.warning(f'Exiting with errorcode ({errcode}). Runtime {timedelta(seconds=round(time() - SELF_START, 0))} \n')
if errmsg: print(errmsg)
raise SystemExit(errcode)
def floating_humanizer(value: Union[float, int], bit: bool = False, per_second: bool = False, start: int = 0, short: bool = False) -> str:
'''Scales up in steps of 1024 to highest possible unit and returns string with unit suffixed
* bit=True or defaults to bytes
* start=int to set 1024 multiplier starting unit
* short=True always returns 0 decimals and shortens unit to 1 character
'''
out: str = ""
unit: Tuple[str, ...] = UNITS["bit"] if bit else UNITS["byte"]
selector: int = start if start else 0
mult: int = 8 if bit else 1
if value <= 0: value = 0
if isinstance(value, float): value = round(value * 100 * mult)
elif value > 0: value *= 100 * mult
while len(f'{value}') > 5 and value >= 102400:
value >>= 10
if value < 100:
out = f'{value}'
break
selector += 1
else:
if len(f'{value}') < 5 and len(f'{value}') >= 2 and selector > 0:
decimals = 5 - len(f'{value}')
out = f'{value}'[:-2] + "." + f'{value}'[-decimals:]
elif len(f'{value}') >= 2:
out = f'{value}'[:-2]
if short: out = out.split(".")[0]
out += f'{"" if short else " "}{unit[selector][0] if short else unit[selector]}'
if per_second: out += "/s" if bit else "ps"
return out
#? Main function --------------------------------------------------------------------------------->
def main():
clean_quit()
#? Pre main -------------------------------------------------------------------------------------->
CPU_NAME: str = get_cpu_name()
testing = True #! Remove
#! For testing ------------------------------------------------------------------------------->
def waitone(t: float = 0.0):
if t > 0.0: Key.new.wait(t)
else: Key.new.wait()
Key.new.clear()
Draw.clear()
Draw.now(Term.clear)
@timerd
def testing_gradients():
# for theme in THEME.themes.keys():
# THEME(theme)
# timer = time()
for key in THEME.gradient.keys():
Draw.now(f'{Term.fg}{key}:\n{"".join(THEME.gradient[key])}\n')
Draw.now(f'{Term.fg}')
# Draw.now(f'Theme creation of {CONFIG.color_theme} took {time() - timer:.5f} seconds\n\n')
Draw.now("\n")
# THEME("Default")
@timerd
def testing_humanizer():
for i in range(1, 101, 3):
for n in range(1, 6):
Draw.now(floating_humanizer(i * (1050 * n) * (n << 10 ), bit=False, per_second=False, short=False), " ")
Draw.now("\n")
for i in range(1, 30):
for n in range(1, 8):
Draw.now(floating_humanizer((995 + i) << (n * 10), bit=False), " ")
Draw.now("\n")
@timerd
def testing_colors():
for item, _ in DEFAULT_THEME.items():
Draw.buffer("+testing", Fx.b, getattr(THEME, item)(f'{item:<20}'), Fx.ub, f'{"hex=" + getattr(THEME, item).hexa:<20} dec={getattr(THEME, item).dec}\n')
Draw.out()
@timerd
def testing_boxes():
#Box.calc_sizes()
Box.draw_bg()
Draw.now(Mv.to(Term.height - 3, 1))
#Draw.now(create_box(20, 20, 15, 10, "Hej"))
#waitone()
#Draw.now(Term.normal_screen, Term.show_cursor)
@timerd
def testing_banner():
Draw.buffer("banner", Banner.draw(18, center=True))
Draw.out()
Draw.now(Mv.to(35, 1))
@timerd
def testing_meter():
Draw.clear("meters")
for _ in range(10):
Draw.buffer("+meters", "1234567890")
Draw.buffer("+meters", "\n")
stamp = time()
test_meter = Meter(0, Term.width, "cpu")
for i in range(0,101, 2):
Draw.buffer("+meters", test_meter(i), "\n")
Draw.buffer("+meter", f'{time() - stamp}')
Draw.out()
def testing_keyinput():
line: str = ""
this_key: str = ""
count: int = 0
while True:
count += 1
Draw.now(f'{Mv.to(1,1)}{Fx.b}{THEME.temp_start("Count:")} {count} {THEME.cached_mid("Time:")} {strftime("%H:%M:%S", localtime())}',
f'{Color.fg("#ff")} Width: {Term.width} Height: {Term.height} Resized: {Term.resized}')
if Key.input_wait(1):
while Key.list:
#Key.new.clear()
this_key = Key.list.pop()
Draw.now(f'{Mv.to(2,1)}{Color.fg("#ff9050")}{Fx.b}Last key= {Term.fg}{Fx.ub}{repr(this_key):14}{" "}')
if this_key == "backspace":
line = line[:-1]
elif this_key == "escape":
line = ""
elif this_key == "Q":
clean_quit()
elif this_key == "R":
raise Exception("Test ERROR")
elif len(this_key) == 1:
line += this_key
Draw.now(f'{Color.fg("#90ff50")}{Fx.b}Command= {Term.fg}{Fx.ub}{line}{Fx.bl}| {Fx.ubl}\033[0K\n')
if this_key == "enter":
try:
exec(line)
except:
pass
Draw.clear()
def testing_graphs():
from random import randint
my_data = [x for x in range(0, 101)]
my_data += [x for x in range(100, -1, -1)]
my_graph = Graph(Term.width, Term.height // 3, THEME.gradient['cpu'], my_data, invert=False)
my_graph2 = Graph(Term.width, Term.height // 3, THEME.gradient['cpu'], my_data, invert=True)
my_graph3 = Graph(Term.width, 1, THEME.proc_misc, my_data)
my_graph4 = Graph(Term.width, 2, None, my_data)
Draw.now(f'{Mv.to(0, 0)}{my_graph}')
Draw.now(f'{Mv.to(Term.height // 3 + 1, 0)}{my_graph2}')
Draw.now(f'{Mv.to(Term.height - (Term.height // 3) + 2, 0)}{my_graph3}')
Draw.now(f'{Mv.to(Term.height - (Term.height // 3) + 4, 0)}{my_graph4}')
for _ in range(100):
sleep(0.1)
x = randint(0, 100)
Draw.now(f'{Mv.to(0, 0)}{my_graph.add(x)}')
Draw.now(f'{Mv.to(Term.height // 3 + 1, 0)}{my_graph2.add(x)}')
Draw.now(f'{Mv.to(Term.height - (Term.height // 3) + 2, 0)}{my_graph3.add(x)}')
Draw.now(f'{Mv.to(Term.height - (Term.height // 3) + 4, 0)}{my_graph4.add(x)}')
Draw.now(Mv.to(Term.height -4, 0))
2020-07-09 00:56:03 +00:00
# if not Key.reader.is_alive():
# clean_quit(1)
# Key.new.wait(1.0)
#! Remove ------------------------------------------------------------------------------------<
if __name__ == "__main__":
#? Init -------------------------------------------------------------------------------------->
Timer.start("Init")
#? Temporary functions for init
def _fail(err):
Draw.now(f'{Symbol.fail}')
errlog.exception(f'{err}')
sleep(2)
clean_quit(1, errmsg=f'Error during init! See {CONFIG_DIR}/error.log for more information.')
def _success():
if not testing: sleep(0.1) #! Remove if
Draw.now(f'{Symbol.ok}\n{Mv.r(Term.width // 2 - 19)}')
#? Switch to alternate screen, clear screen, hide cursor and disable input echo
Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor) #! Enable
#Draw.now(Term.clear, Term.hide_cursor) #! Disable
Term.echo(False)
#? Draw banner and init status
Draw.now(Banner.draw(Term.height // 2 - 10, center=True), "\n")
Draw.now(Color.fg("#50"))
for _i in range(10):
Draw.now(f'{Mv.to(Term.height // 2 - 3 + _i, Term.width // 2 - 25)}{str((_i + 1) * 10) + "%":>5}{Symbol.v_line}')
Draw.now(f'{Color.fg("#cc")}{Fx.b}{Mv.to(Term.height // 2 - 3, Term.width // 2 - 18)}')
#? Load theme
Draw.now("Loading theme and creating colors... ")
try:
THEME: Theme = Theme(CONFIG.color_theme)
except Exception as e:
_fail(e)
else:
_success()
#? Setup boxes
Draw.now("Doing some maths and drawing... ")
try:
Box.calc_sizes()
Box.draw_bg(now=False)
except Exception as e:
_fail(e)
else:
_success()
#? Setup signal handlers for SIGSTP, SIGCONT, SIGINT and SIGWINCH
Draw.now("Setting up signal handlers... ")
try:
signal.signal(signal.SIGTSTP, now_sleeping) #* Ctrl-Z
signal.signal(signal.SIGCONT, now_awake) #* Resume
signal.signal(signal.SIGINT, quit_sigint) #* Ctrl-C
signal.signal(signal.SIGWINCH, Term.refresh) #* Terminal resized
except Exception as e:
_fail(e)
else:
_success()
#? Start a separate thread for reading keyboard input
Draw.now("Starting input reader thread... ")
try:
Key.start()
except Exception as e:
_fail(e)
else:
_success()
#? Start a separate thread for data collection and drawing
Draw.now("Starting data collection and drawer thread... ")
try:
Collector.start()
except Exception as e:
_fail(e)
else:
_success()
#? Collect data and draw to buffer
Draw.now("Collecting data and drawing...")
try:
#Collector.collect(draw_now=False)
pass
except Exception as e:
_fail(e)
else:
_success()
#? Draw to screen
Draw.now("Finishing up...")
try:
#Collector.collect_done.wait()
pass
except Exception as e:
_fail(e)
else:
_success()
if not testing: sleep(1) #! Remove if
del _fail, _success
if not testing: Draw.out(clear=True) #! Remove if
else: Draw.clear(); Draw.now(Term.clear) #! Remove
Timer.stop("Init")
#! For testing ------------------------------------------------------------------------------->
if testing:
try:
testing_graphs()
#testing_collectors()
#testing_humanizer()
2020-07-09 00:56:03 +00:00
# waitone(1)
#testing_keyinput()
#testing_banner()
2020-07-09 00:56:03 +00:00
# waitone(1)
#testing_colors()
2020-07-09 00:56:03 +00:00
# waitone(1)
#testing_gradients()
2020-07-09 00:56:03 +00:00
# waitone(1)
#testing_boxes()
2020-07-09 00:56:03 +00:00
# waitone(1)
#testing_meter()
2020-07-09 00:56:03 +00:00
# Draw.idle.clear()
#Draw.now(f'{Mv.to(Term.height - 5, 1)}Any key to exit!')
#waitone()
# Draw.idle.set()
#sleep(2)
except Exception as e:
errlog.exception(f'{e}')
clean_quit(1)
clean_quit()
#! Remove ------------------------------------------------------------------------------------<
#? Start main loop
while not False:
try:
main()
except Exception as e:
errlog.exception(f'{e}')
clean_quit(1)
else:
#? Quit cleanly even if false starts being true...
clean_quit()