mirror of https://github.com/aristocratos/bpytop
1921 lines
61 KiB
Python
Executable File
1921 lines
61 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
# pylint: disable=not-callable, no-member
|
||
# indent = tab
|
||
# tab-size = 4
|
||
|
||
# Copyright 2020 Aristocratos (jakob@qvantnet.com)
|
||
|
||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
# you may not use this file except in compliance with the License.
|
||
# You may obtain a copy of the License at
|
||
|
||
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
||
# Unless required by applicable law or agreed to in writing, software
|
||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
# See the License for the specific language governing permissions and
|
||
# limitations under the License.
|
||
|
||
import os, sys, threading, signal, re, subprocess, logging, logging.handlers
|
||
from time import time, sleep, strftime, localtime
|
||
from datetime import timedelta
|
||
from select import select
|
||
from distutils.util import strtobool
|
||
from string import Template
|
||
from math import ceil
|
||
from random import randint
|
||
from typing import List, Set, Dict, Tuple, Optional, Union, Any, Callable, ContextManager, Iterable
|
||
|
||
errors: List[str] = []
|
||
try: import fcntl, termios, tty
|
||
except Exception as e: errors.append(f'{e}')
|
||
|
||
try: import psutil # type: ignore
|
||
except Exception as e: errors.append(f'{e}')
|
||
|
||
SELF_START = time()
|
||
|
||
SYSTEM: str
|
||
if "linux" in sys.platform: SYSTEM = "Linux"
|
||
elif "bsd" in sys.platform: SYSTEM = "BSD"
|
||
elif "darwin" in sys.platform: SYSTEM = "MacOS"
|
||
else: SYSTEM = "Other"
|
||
|
||
if errors:
|
||
print ("ERROR!")
|
||
for error in errors:
|
||
print(error)
|
||
if SYSTEM == "Other":
|
||
print("\nUnsupported platform!\n")
|
||
else:
|
||
print("\nInstall required modules!\n")
|
||
quit(1)
|
||
|
||
#? Variables ------------------------------------------------------------------------------------->
|
||
|
||
BANNER_SRC: List[Tuple[str, str, str]] = [
|
||
("#ffa50a", "#0fd7ff", "██████╗ ██████╗ ██╗ ██╗████████╗ ██████╗ ██████╗"),
|
||
("#f09800", "#00bfe6", "██╔══██╗██╔══██╗╚██╗ ██╔╝╚══██╔══╝██╔═══██╗██╔══██╗"),
|
||
("#db8b00", "#00a6c7", "██████╔╝██████╔╝ ╚████╔╝ ██║ ██║ ██║██████╔╝"),
|
||
("#c27b00", "#008ca8", "██╔══██╗██╔═══╝ ╚██╔╝ ██║ ██║ ██║██╔═══╝ "),
|
||
("#a86b00", "#006e85", "██████╔╝██║ ██║ ██║ ╚██████╔╝██║"),
|
||
("#000000", "#000000", "╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝"),
|
||
]
|
||
|
||
#RED
|
||
# "#E62525"
|
||
# "#CD2121"
|
||
# "#B31D1D"
|
||
# "#9A1919"
|
||
# "#801414"
|
||
# "#000000"
|
||
|
||
#GREEN
|
||
# "#75e12d"
|
||
# "#5bbb1b"
|
||
# "#499914"
|
||
# "#3b7811"
|
||
# "#30620e"
|
||
# "#000000"
|
||
|
||
#YELLOW
|
||
# "#fadd00"
|
||
# "#dbc200"
|
||
# "#b8a200"
|
||
# "#998700"
|
||
# "#807100"
|
||
# "#000000"
|
||
|
||
#BLUE
|
||
# "#0084ff"
|
||
# "#016cd0"
|
||
# "#0157a7"
|
||
# "#014484"
|
||
# "#003261"
|
||
# "#000000"
|
||
|
||
VERSION: str = "0.0.3"
|
||
|
||
#*?This is the template used to create the config file
|
||
DEFAULT_CONF: Template = Template(f'#? Config file for bpytop v. {VERSION}' + '''
|
||
|
||
#* Color theme, looks for a .theme file in "~/.config/bpytop/themes" and "~/.config/bpytop/user_themes", "Default" for builtin default theme
|
||
color_theme="$color_theme"
|
||
|
||
#* Update time in milliseconds, increases automatically if set below internal loops processing time, recommended 2000 ms or above for better sample times for graphs.
|
||
update_ms=$update_ms
|
||
|
||
#* Processes sorting, "pid" "program" "arguments" "threads" "user" "memory" "cpu lazy" "cpu responsive",
|
||
#* "cpu lazy" updates sorting over time, "cpu responsive" updates sorting directly.
|
||
proc_sorting="$proc_sorting"
|
||
|
||
#* Reverse sorting order, True or False.
|
||
proc_reversed=$proc_reversed
|
||
|
||
#* Show processes as a tree
|
||
proc_tree=$proc_tree
|
||
|
||
#* Check cpu temperature, needs "vcgencmd" on Raspberry Pi and "osx-cpu-temp" on MacOS X.
|
||
check_temp=$check_temp
|
||
|
||
#* Draw a clock at top of screen, formatting according to strftime, empty string to disable.
|
||
draw_clock="$draw_clock"
|
||
|
||
#* Update main ui in background when menus are showing, set this to false if the menus is flickering too much for comfort.
|
||
background_update=$background_update
|
||
|
||
#* Custom cpu model name, empty string to disable.
|
||
custom_cpu_name="$custom_cpu_name"
|
||
|
||
#* Show color gradient in process list, True or False.
|
||
proc_gradient=$proc_gradient
|
||
|
||
#* If process cpu usage should be of the core it's running on or usage of the total available cpu power.
|
||
proc_per_core=$proc_per_core
|
||
|
||
#* Optional filter for shown disks, should be names of mountpoints, "root" replaces "/", separate multiple values with space.
|
||
disks_filter="$disks_filter"
|
||
|
||
#* Enable check for new version from github.com/aristocratos/bpytop at start.
|
||
update_check=$update_check
|
||
|
||
#* Set loglevel for "~/.config/bpytop/error.log" levels are: "CRITICAL" "ERROR" "WARNING" "INFO" "DEBUG".
|
||
#* The level set includes all lower levels, i.e. "DEBUG" will show all logging info.
|
||
log_level="$log_level"
|
||
''')
|
||
|
||
CONFIG_DIR: str = f'{os.path.expanduser("~")}/.config/bpytop'
|
||
if not os.path.isdir(CONFIG_DIR):
|
||
try:
|
||
os.makedirs(CONFIG_DIR)
|
||
os.mkdir(f'{CONFIG_DIR}/themes')
|
||
os.mkdir(f'{CONFIG_DIR}/user_themes')
|
||
except PermissionError:
|
||
print(f'ERROR!\nNo permission to write to "{CONFIG_DIR}" directory!')
|
||
quit(1)
|
||
CONFIG_FILE: str = f'{CONFIG_DIR}/bpytop.conf'
|
||
THEME_DIR: str = f'{CONFIG_DIR}/themes'
|
||
USER_THEME_DIR: str = f'{CONFIG_DIR}/user_themes'
|
||
|
||
CORES: int = psutil.cpu_count(logical=False) or 1
|
||
THREADS: int = psutil.cpu_count(logical=True) or 1
|
||
|
||
DEFAULT_THEME: Dict[str, str] = {
|
||
"main_bg" : "",
|
||
"main_fg" : "#cc",
|
||
"title" : "#ee",
|
||
"hi_fg" : "#90",
|
||
"selected_bg" : "#7e2626",
|
||
"selected_fg" : "#ee",
|
||
"inactive_fg" : "#40",
|
||
"proc_misc" : "#0de756",
|
||
"cpu_box" : "#3d7b46",
|
||
"mem_box" : "#8a882e",
|
||
"net_box" : "#423ba5",
|
||
"proc_box" : "#923535",
|
||
"div_line" : "#30",
|
||
"temp_start" : "#4897d4",
|
||
"temp_mid" : "#5474e8",
|
||
"temp_end" : "#ff40b6",
|
||
"cpu_start" : "#50f095",
|
||
"cpu_mid" : "#f2e266",
|
||
"cpu_end" : "#fa1e1e",
|
||
"free_start" : "#223014",
|
||
"free_mid" : "#b5e685",
|
||
"free_end" : "#dcff85",
|
||
"cached_start" : "#0b1a29",
|
||
"cached_mid" : "#74e6fc",
|
||
"cached_end" : "#26c5ff",
|
||
"available_start" : "#292107",
|
||
"available_mid" : "#ffd77a",
|
||
"available_end" : "#ffb814",
|
||
"used_start" : "#3b1f1c",
|
||
"used_mid" : "#d9626d",
|
||
"used_end" : "#ff4769",
|
||
"download_start" : "#231a63",
|
||
"download_mid" : "#4f43a3",
|
||
"download_end" : "#b0a9de",
|
||
"upload_start" : "#510554",
|
||
"upload_mid" : "#7d4180",
|
||
"upload_end" : "#dcafde"
|
||
}
|
||
|
||
#? Units for floating_humanizer function
|
||
UNITS: Dict[str, Tuple[str, ...]] = {
|
||
"bit" : ("bit", "Kib", "Mib", "Gib", "Tib", "Pib", "Eib", "Zib", "Yib", "Bib", "GEb"),
|
||
"byte" : ("Byte", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB", "BiB", "GEB")
|
||
}
|
||
|
||
#? Setup error logger ---------------------------------------------------------------->
|
||
|
||
try:
|
||
errlog = logging.getLogger("ErrorLogger")
|
||
errlog.setLevel(logging.DEBUG)
|
||
eh = logging.handlers.RotatingFileHandler(f'{CONFIG_DIR}/error.log', maxBytes=1048576, backupCount=4)
|
||
eh.setLevel(logging.DEBUG)
|
||
eh.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s: %(message)s", datefmt="%d/%m/%y (%X)"))
|
||
errlog.addHandler(eh)
|
||
except PermissionError:
|
||
print(f'ERROR!\nNo permission to write to "{CONFIG_DIR}" directory!')
|
||
quit(1)
|
||
|
||
#! Timers, remove ----------------------------------------------------------------------->
|
||
|
||
class Timer:
|
||
timers: Dict[str, float] = {}
|
||
paused: Dict[str, float] = {}
|
||
|
||
@classmethod
|
||
def start(cls, name):
|
||
cls.timers[name] = time()
|
||
|
||
@classmethod
|
||
def pause(cls, name):
|
||
if name in cls.timers:
|
||
cls.paused[name] = time() - cls.timers[name]
|
||
del cls.timers[name]
|
||
|
||
@classmethod
|
||
def stop(cls, name):
|
||
if name in cls.timers:
|
||
total: float = time() - cls.timers[name]
|
||
del cls.timers[name]
|
||
if name in cls.paused:
|
||
total += cls.paused[name]
|
||
del cls.paused[name]
|
||
errlog.debug(f'{name} completed in {total:.6f} seconds')
|
||
|
||
def timerd(func):
|
||
def timed(*args, **kw):
|
||
ts = time()
|
||
out = func(*args, **kw)
|
||
te = time()
|
||
errlog.debug(f'{func.__name__} completed in {te - ts:.6f} seconds')
|
||
return out
|
||
return timed
|
||
|
||
#! Timers, remove -----------------------------------------------------------------------<
|
||
|
||
#? Set up config class and load config ------------------------------------------------>
|
||
|
||
class Config:
|
||
'''Holds all config variables and functions for loading from and saving to disk'''
|
||
keys: List[str] = ["color_theme", "update_ms", "proc_sorting", "proc_reversed", "proc_tree", "check_temp", "draw_clock", "background_update", "custom_cpu_name", "proc_gradient", "proc_per_core", "disks_filter", "update_check", "log_level"]
|
||
conf_dict: Dict[str, Union[str, int, bool]] = {}
|
||
color_theme: str = "Default"
|
||
update_ms: int = 2500
|
||
proc_sorting: str = "cpu lazy"
|
||
proc_reversed: bool = False
|
||
proc_tree: bool = False
|
||
check_temp: bool = True
|
||
draw_clock: str = "%X"
|
||
background_update: bool = True
|
||
custom_cpu_name: str = ""
|
||
proc_gradient: bool = True
|
||
proc_per_core: bool = False
|
||
disks_filter: str = ""
|
||
update_check: bool = True
|
||
log_level: str = "WARNING"
|
||
|
||
warnings: List[str] = []
|
||
|
||
sorting_options: List[str] = ["pid", "program", "arguments", "threads", "user", "memory", "cpu lazy", "cpu responsive"]
|
||
log_levels: List[str] = ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]
|
||
|
||
changed: bool = False
|
||
recreate: bool = False
|
||
config_file: str = ""
|
||
|
||
_initialized: bool = False
|
||
|
||
def __init__(self, path: str):
|
||
self.config_file = path
|
||
conf: Dict[str, Union[str, int, bool]] = self.load_config()
|
||
if not "version" in conf.keys():
|
||
self.recreate = True
|
||
self.warnings.append(f'Config file malformatted or missing, will be recreated on exit!')
|
||
elif conf["version"] != VERSION:
|
||
self.recreate = True
|
||
self.warnings.append(f'Config file version and bpytop version missmatch, will be recreated on exit!')
|
||
for key in self.keys:
|
||
if key in conf.keys() and conf[key] != "_error_":
|
||
setattr(self, key, conf[key])
|
||
else:
|
||
self.recreate = True
|
||
self.conf_dict[key] = getattr(self, key)
|
||
self._initialized = True
|
||
|
||
def __setattr__(self, name, value):
|
||
if self._initialized:
|
||
object.__setattr__(self, "changed", True)
|
||
object.__setattr__(self, name, value)
|
||
if name not in ["_initialized", "recreate", "changed"]:
|
||
self.conf_dict[name] = value
|
||
|
||
def load_config(self) -> Dict[str, Union[str, int, bool]]:
|
||
'''Load config from file, set correct types for values and return a dict'''
|
||
new_config: Dict[str,Union[str, int, bool]] = {}
|
||
if not os.path.isfile(self.config_file): return new_config
|
||
try:
|
||
with open(self.config_file, "r") as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if line.startswith("#? Config"):
|
||
new_config["version"] = line[line.find("v. ") + 3:]
|
||
for key in self.keys:
|
||
if line.startswith(key):
|
||
line = line.lstrip(key + "=")
|
||
if line.startswith('"'):
|
||
line = line.strip('"')
|
||
if type(getattr(self, key)) == int:
|
||
try:
|
||
new_config[key] = int(line)
|
||
except ValueError:
|
||
self.warnings.append(f'Config key "{key}" should be an integer!')
|
||
if type(getattr(self, key)) == bool:
|
||
try:
|
||
new_config[key] = bool(strtobool(line))
|
||
except ValueError:
|
||
self.warnings.append(f'Config key "{key}" can only be True or False!')
|
||
if type(getattr(self, key)) == str:
|
||
new_config[key] = str(line)
|
||
except Exception as e:
|
||
errlog.exception(str(e))
|
||
if "proc_sorting" in new_config and not new_config["proc_sorting"] in self.sorting_options:
|
||
new_config["proc_sorting"] = "_error_"
|
||
self.warnings.append(f'Config key "proc_sorted" didn\'t get an acceptable value!')
|
||
if "log_level" in new_config and not new_config["log_level"] in self.log_levels:
|
||
new_config["log_level"] = "_error_"
|
||
self.warnings.append(f'Config key "log_level" didn\'t get an acceptable value!')
|
||
return new_config
|
||
|
||
def save_config(self):
|
||
'''Save current config to config file if difference in values or version, creates a new file if not found'''
|
||
if not self.changed and not self.recreate: return
|
||
try:
|
||
with open(self.config_file, "w" if os.path.isfile(self.config_file) else "x") as f:
|
||
f.write(DEFAULT_CONF.substitute(self.conf_dict))
|
||
except Exception as e:
|
||
errlog.exception(str(e))
|
||
|
||
try:
|
||
CONFIG: Config = Config(CONFIG_FILE)
|
||
errlog.setLevel(getattr(logging, CONFIG.log_level))
|
||
errlog.info(f'New instance of bpytop version {VERSION} started with pid {os.getpid()}')
|
||
errlog.debug(f'Loglevel set to {CONFIG.log_level}')
|
||
if CONFIG.warnings:
|
||
for warning in CONFIG.warnings:
|
||
errlog.warning(warning)
|
||
CONFIG.warnings = []
|
||
except Exception as e:
|
||
errlog.exception(f'{e}')
|
||
quit(1)
|
||
|
||
|
||
#? Classes --------------------------------------------------------------------------------------->
|
||
|
||
class Term:
|
||
"""Terminal info and commands"""
|
||
width: int = os.get_terminal_size().columns #* Current terminal width in columns
|
||
height: int = os.get_terminal_size().lines #* Current terminal height in lines
|
||
resized: bool = False #* Flag indicating if terminal was recently resized
|
||
_resizing: bool = False
|
||
_w : int = 0
|
||
_h : int = 0
|
||
fg: str = "" #* Default foreground color
|
||
bg: str = "" #* Default background color
|
||
hide_cursor = "\033[?25l" #* Hide terminal cursor
|
||
show_cursor = "\033[?25h" #* Show terminal cursor
|
||
alt_screen = "\033[?1049h" #* Switch to alternate screen
|
||
normal_screen = "\033[?1049l" #* Switch to normal screen
|
||
clear = "\033[2J\033[0;0f" #* Clear screen and set cursor to position 0,0
|
||
|
||
@classmethod
|
||
def refresh(cls, *args):
|
||
"""Update width, height and set resized flag if terminal has been resized"""
|
||
if cls._resizing == True: return
|
||
cls._w, cls._h = os.get_terminal_size()
|
||
while (cls._w, cls._h) != (cls.width, cls.height):
|
||
cls._resizing = True
|
||
cls.resized = True
|
||
cls.width, cls.height = cls._w, cls._h
|
||
Draw.now(Term.clear)
|
||
Draw.now(f'{create_box(cls._w // 2 - 25, cls._h // 2 - 2, 50, 3, "resizing")}{THEME.main_fg}{Fx.b}{Mv.r(12)}Width : {cls._w} Height: {cls._h}')
|
||
while cls._w < 80 or cls._h < 24:
|
||
Draw.now(Term.clear)
|
||
Draw.now(f'{create_box(cls._w // 2 - 25, cls._h // 2 - 2, 50, 4, "warning")}{THEME.main_fg}{Fx.b}{Mv.r(12)}Width: {cls._w} Height: {cls._h}\
|
||
{Mv.to(cls._h // 2, cls._w // 2 - 23)}Width and Height needs to be at least 80x24!')
|
||
sleep(0.3)
|
||
cls._w, cls._h = os.get_terminal_size()
|
||
sleep(0.3)
|
||
cls._w, cls._h = os.get_terminal_size()
|
||
cls._resizing = False
|
||
Box.calc_sizes()
|
||
Box.draw_bg(now=True if not Init.running else False)
|
||
|
||
|
||
@staticmethod
|
||
def echo(on: bool):
|
||
"""Toggle input echo"""
|
||
(iflag, oflag, cflag, lflag, ispeed, ospeed, cc) = termios.tcgetattr(sys.stdin.fileno())
|
||
if on:
|
||
lflag |= termios.ECHO # type: ignore
|
||
else:
|
||
lflag &= ~termios.ECHO # type: ignore
|
||
new_attr = [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
|
||
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, new_attr)
|
||
|
||
class Fx:
|
||
"""Text effects"""
|
||
start = "\033[" #* Escape sequence start
|
||
sep = ";" #* Escape sequence separator
|
||
end = "m" #* Escape sequence end
|
||
reset = rs = "\033[0m" #* Reset foreground/background color and text effects
|
||
bold = b = "\033[1m" #* Bold on
|
||
unbold = ub = "\033[22m" #* Bold off
|
||
dark = d = "\033[2m" #* Dark on
|
||
undark = ud = "\033[22m" #* Dark off
|
||
italic = i = "\033[3m" #* Italic on
|
||
unitalic = ui = "\033[23m" #* Italic off
|
||
underline = u = "\033[4m" #* Underline on
|
||
ununderline = uu = "\033[24m" #* Underline off
|
||
blink = bl = "\033[5m" #* Blink on
|
||
unblink = ubl = "\033[25m" #* Blink off
|
||
strike = s = "\033[9m" #* Strike / crossed-out on
|
||
unstrike = us = "\033[29m" #* Strike / crossed-out off
|
||
|
||
@staticmethod
|
||
def trans(string: str):
|
||
'''Replace white space with escape move right to not paint background in whitespace'''
|
||
return string.replace(" ", "\033[1C")
|
||
|
||
class Raw(object):
|
||
"""Set raw input mode for device"""
|
||
def __init__(self, stream):
|
||
self.stream = stream
|
||
self.fd = self.stream.fileno()
|
||
def __enter__(self):
|
||
self.original_stty = termios.tcgetattr(self.stream)
|
||
tty.setcbreak(self.stream)
|
||
def __exit__(self, type, value, traceback):
|
||
termios.tcsetattr(self.stream, termios.TCSANOW, self.original_stty)
|
||
|
||
class Nonblocking(object):
|
||
"""Set nonblocking mode for device"""
|
||
def __init__(self, stream):
|
||
self.stream = stream
|
||
self.fd = self.stream.fileno()
|
||
def __enter__(self):
|
||
self.orig_fl = fcntl.fcntl(self.fd, fcntl.F_GETFL)
|
||
fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl | os.O_NONBLOCK)
|
||
def __exit__(self, *args):
|
||
fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl)
|
||
|
||
class Mv:
|
||
"""Class with collection of cursor movement functions: .t[o](line, column) | .r[ight](columns) | .l[eft](columns) | .u[p](lines) | .d[own](lines) | .save() | .restore()"""
|
||
@staticmethod
|
||
def to(line: int, col: int) -> str:
|
||
return f'\033[{line};{col}f' #* Move cursor to line, column
|
||
@staticmethod
|
||
def right(x: int) -> str: #* Move cursor right x columns
|
||
return f'\033[{x}C'
|
||
@staticmethod
|
||
def left(x: int) -> str: #* Move cursor left x columns
|
||
return f'\033[{x}D'
|
||
@staticmethod
|
||
def up(x: int) -> str: #* Move cursor up x lines
|
||
return f'\033[{x}A'
|
||
@staticmethod
|
||
def down(x: int) -> str: #* Move cursor down x lines
|
||
return f'\033[{x}B'
|
||
@staticmethod
|
||
def save() -> str: #* Save cursor position
|
||
return "\033[s"
|
||
@staticmethod
|
||
def restore() -> str: #* Restore saved cursor postion
|
||
return "\033[u"
|
||
t = to
|
||
r = right
|
||
l = left
|
||
u = up
|
||
d = down
|
||
|
||
class Key:
|
||
"""Handles the threaded input reader"""
|
||
list: List[str] = []
|
||
new = threading.Event()
|
||
idle = threading.Event()
|
||
idle.set()
|
||
stopping: bool = False
|
||
started: bool = False
|
||
reader: threading.Thread
|
||
@classmethod
|
||
def start(cls):
|
||
cls.stopping = False
|
||
cls.reader = threading.Thread(target=cls._get_key)
|
||
cls.reader.start()
|
||
cls.started = True
|
||
@classmethod
|
||
def stop(cls):
|
||
if cls.started and cls.reader.is_alive():
|
||
cls.stopping = True
|
||
try:
|
||
cls.reader.join()
|
||
except:
|
||
pass
|
||
|
||
@classmethod
|
||
def last(cls) -> str:
|
||
if cls.list: return cls.list.pop()
|
||
else: return ""
|
||
|
||
@classmethod
|
||
def get(cls) -> str:
|
||
if cls.list: return cls.list.pop(0)
|
||
else: return ""
|
||
|
||
@classmethod
|
||
def has_key(cls) -> bool:
|
||
if cls.list: return True
|
||
else: return False
|
||
|
||
@classmethod
|
||
def clear(cls):
|
||
cls.list = []
|
||
|
||
@classmethod
|
||
def input_wait(cls, sec: float = 0.0) -> bool:
|
||
'''Returns True if key is detected else waits out timer and returns False'''
|
||
cls.new.wait(sec if sec > 0 else 0.0)
|
||
if cls.new.is_set():
|
||
cls.new.clear()
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
@classmethod
|
||
def _get_key(cls):
|
||
"""Get a single key from stdin, convert to readable format and save to keys list. Meant to be run in it's own thread."""
|
||
input_key: str = ""
|
||
clean_key: str = ""
|
||
escape: Dict[Union[str, Tuple[str, str]], str] = {
|
||
"\n" : "enter",
|
||
("\x7f", "\x08") : "backspace",
|
||
("[A", "OA") : "up",
|
||
("[B", "OB") : "down",
|
||
("[D", "OD") : "left",
|
||
("[C", "OC") : "right",
|
||
"[2~" : "insert",
|
||
"[3~" : "delete",
|
||
"[H" : "home",
|
||
"[F" : "end",
|
||
"[5~" : "page_up",
|
||
"[6~" : "page_down",
|
||
"[Z" : "shift_tab",
|
||
"OP" : "f1",
|
||
"OQ" : "f2",
|
||
"OR" : "f3",
|
||
"OS" : "f4",
|
||
"[15" : "f5",
|
||
"[17" : "f6",
|
||
"[18" : "f7",
|
||
"[19" : "f8",
|
||
"[20" : "f9",
|
||
"[21" : "f10",
|
||
"[23" : "f11",
|
||
"[24" : "f12"
|
||
}
|
||
|
||
try:
|
||
while not cls.stopping:
|
||
with Raw(sys.stdin):
|
||
if not select([sys.stdin], [], [], 0.1)[0]: #* Wait 100ms for input on stdin then restart loop to check for stop flag
|
||
continue
|
||
cls.idle.clear() #* Report IO block in progress to prevent Draw functions to get IO Block error
|
||
input_key += sys.stdin.read(1)
|
||
if input_key == "\033": #* If first character is a escape sequence read 5 more keys
|
||
Draw.idle.wait() #* Wait for Draw function to finish if busy
|
||
with Nonblocking(sys.stdin): #* Set non blocking to prevent read stall if less than 5 characters
|
||
input_key += sys.stdin.read(5)
|
||
if input_key == "\033": clean_key = "escape" #* Key is escape if only containing \033
|
||
elif input_key == "\\": clean_key = "\\" #* Clean up "\" to not return escaped
|
||
else:
|
||
for code in escape.keys(): #* Go trough dict of escape codes to get the cleaned key name
|
||
if input_key.lstrip("\033").startswith(code):
|
||
clean_key = escape[code]
|
||
break
|
||
else: #* If not found in escape dict and length of key is 1, assume regular character
|
||
if len(input_key) == 1:
|
||
clean_key = input_key
|
||
if testing: errlog.debug(f'Input key: {repr(input_key)} Clean key: {clean_key}') #! Remove
|
||
if clean_key:
|
||
cls.list.append(clean_key) #* Store up to 10 keys in input queue for later processing
|
||
if len(cls.list) > 10: del cls.list[0]
|
||
clean_key = ""
|
||
cls.new.set() #* Set threading event to interrupt main thread sleep
|
||
input_key = ""
|
||
cls.idle.set() #* Report IO blocking done
|
||
|
||
except Exception as e:
|
||
errlog.exception(f'Input reader failed with exception: {e}')
|
||
cls.idle.set()
|
||
cls.list.clear()
|
||
|
||
class Draw:
|
||
'''Holds the draw buffer and manages IO blocking queue
|
||
* .buffer([+]name[!], *args, append=False, now=False, z=100) : Add *args to buffer
|
||
* - Adding "+" prefix to name sets append to True and appends to name's current string
|
||
* - Adding "!" suffix to name sets now to True and print name's current string
|
||
* .out(clear=False) : Print all strings in buffer, clear=True clear all buffers after
|
||
* .now(*args) : Prints all arguments as a string
|
||
* .clear(*names) : Clear named buffers, all if no argument
|
||
'''
|
||
strings: Dict[str, str] = {}
|
||
z_order: Dict[str, int] = {}
|
||
last_screen: str = ""
|
||
idle = threading.Event()
|
||
idle.set()
|
||
|
||
@classmethod
|
||
def now(cls, *args):
|
||
'''Wait for input reader to be idle then print to screen'''
|
||
Key.idle.wait()
|
||
cls.idle.wait()
|
||
cls.idle.clear()
|
||
try:
|
||
print(*args, sep="", end="", flush=True)
|
||
except BlockingIOError:
|
||
pass
|
||
Key.idle.wait()
|
||
print(*args, sep="", end="", flush=True)
|
||
cls.idle.set()
|
||
|
||
@classmethod
|
||
def buffer(cls, name: str, *args: str, append: bool = False, now: bool = False, z: int = 100):
|
||
string: str = ""
|
||
if name.startswith("+"):
|
||
name = name.lstrip("+")
|
||
append = True
|
||
if name.endswith("!"):
|
||
name = name.rstrip("!")
|
||
now = True
|
||
if not name in cls.z_order or z != 100: cls.z_order[name] = z
|
||
if args: string = "".join(args)
|
||
if name not in cls.strings or not append: cls.strings[name] = ""
|
||
cls.strings[name] += string
|
||
if now: cls.now(string)
|
||
|
||
@classmethod
|
||
def out(cls, *names: str, clear = False, no_order: bool = False):
|
||
out: str = ""
|
||
if not cls.strings: return
|
||
if no_order:
|
||
cls.last_screen = "".join(cls.strings.values())
|
||
if clear: cls.strings = {}
|
||
cls.now(cls.last_screen)
|
||
elif names:
|
||
for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True):
|
||
if name in names:
|
||
out += cls.strings[name]
|
||
if clear:
|
||
del cls.strings[name]
|
||
del cls.z_order[name]
|
||
cls.now(out)
|
||
else:
|
||
for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True):
|
||
if name in cls.strings: out += cls.strings[name]
|
||
if clear: cls.strings = {}
|
||
cls.last_screen = out
|
||
cls.now(cls.last_screen)
|
||
|
||
@classmethod
|
||
def clear(cls, *names):
|
||
if names:
|
||
for name in names:
|
||
if name in cls.strings:
|
||
del cls.strings[name]
|
||
del cls.z_order[name]
|
||
else:
|
||
cls.strings = {}
|
||
|
||
class Color:
|
||
'''Holds representations for a 24-bit color value
|
||
__init__(color, depth="fg", default=False)
|
||
-- color accepts 6 digit hexadecimal: string "#RRGGBB", 2 digit hexadecimal: string "#FF" or decimal RGB "255 255 255" as a string.
|
||
-- depth accepts "fg" or "bg"
|
||
__call__(*args) converts arguments to a string and apply color
|
||
__str__ returns escape sequence to set color
|
||
__iter__ returns iteration over red, green and blue in integer values of 0-255.
|
||
* Values: .hexa: str | .dec: Tuple[int, int, int] | .red: int | .green: int | .blue: int | .depth: str | .escape: str
|
||
'''
|
||
hexa: str; dec: Tuple[int, int, int]; red: int; green: int; blue: int; depth: str; escape: str; default: bool
|
||
|
||
def __init__(self, color: str, depth: str = "fg", default: bool = False):
|
||
self.depth = depth
|
||
self.default = default
|
||
try:
|
||
if not color:
|
||
self.dec = (-1, -1, -1)
|
||
self.hexa = ""
|
||
self.red = self.green = self.blue = -1
|
||
self.escape = "\033[49m" if depth == "bg" and default else ""
|
||
return
|
||
|
||
elif color.startswith("#"):
|
||
self.hexa = color
|
||
if len(self.hexa) == 3:
|
||
self.hexa += self.hexa[1:3] + self.hexa[1:3]
|
||
c = int(self.hexa[1:3], base=16)
|
||
self.dec = (c, c, c)
|
||
elif len(self.hexa) == 7:
|
||
self.dec = (int(self.hexa[1:3], base=16), int(self.hexa[3:5], base=16), int(self.hexa[5:7], base=16))
|
||
else:
|
||
raise ValueError(f'Incorrectly formatted hexadeciaml rgb string: {self.hexa}')
|
||
|
||
else:
|
||
c_t = tuple(map(int, color.split(" ")))
|
||
if len(c_t) == 3:
|
||
self.dec = c_t #type: ignore
|
||
else:
|
||
raise ValueError(f'RGB dec should be "0-255 0-255 0-255"')
|
||
|
||
ct = self.dec[0] + self.dec[1] + self.dec[2]
|
||
if ct > 255*3 or ct < 0:
|
||
raise ValueError(f'RGB values out of range: {color}')
|
||
except Exception as e:
|
||
errlog.exception(str(e))
|
||
self.escape = ""
|
||
return
|
||
|
||
if self.dec and not self.hexa: self.hexa = f'{hex(self.dec[0]).lstrip("0x").zfill(2)}{hex(self.dec[1]).lstrip("0x").zfill(2)}{hex(self.dec[2]).lstrip("0x").zfill(2)}'
|
||
|
||
if self.dec and self.hexa:
|
||
self.red, self.green, self.blue = self.dec
|
||
self.escape = f'\033[{38 if self.depth == "fg" else 48};2;{";".join(str(c) for c in self.dec)}m'
|
||
|
||
def __str__(self) -> str:
|
||
return self.escape
|
||
|
||
def __repr__(self) -> str:
|
||
return repr(self.escape)
|
||
|
||
def __iter__(self) -> Iterable:
|
||
for c in self.dec: yield c
|
||
|
||
def __call__(self, *args: str) -> str:
|
||
if len(args) == 0: return ""
|
||
return f'{self.escape}{"".join(args)}{getattr(Term, self.depth)}'
|
||
|
||
@staticmethod
|
||
def escape_color(hexa: str = "", r: int = 0, g: int = 0, b: int = 0, depth: str = "fg") -> str:
|
||
"""Returns escape sequence to set color
|
||
* accepts either 6 digit hexadecimal hexa="#RRGGBB", 2 digit hexadecimal: hexa="#FF"
|
||
* or decimal RGB: r=0-255, g=0-255, b=0-255
|
||
* depth="fg" or "bg"
|
||
"""
|
||
dint: int = 38 if depth == "fg" else 48
|
||
color: str = ""
|
||
if hexa:
|
||
try:
|
||
if len(hexa) == 3:
|
||
c = int(hexa[1:], base=16)
|
||
color = f'\033[{dint};2;{c};{c};{c}m'
|
||
elif len(hexa) == 7:
|
||
color = f'\033[{dint};2;{int(hexa[1:3], base=16)};{int(hexa[3:5], base=16)};{int(hexa[5:7], base=16)}m'
|
||
except ValueError as e:
|
||
errlog.exception(f'{e}')
|
||
else:
|
||
color = f'\033[{dint};2;{r};{g};{b}m'
|
||
return color
|
||
|
||
@classmethod
|
||
def fg(cls, *args) -> str:
|
||
if len(args) > 1: return cls.escape_color(r=args[0], g=args[1], b=args[2], depth="fg")
|
||
else: return cls.escape_color(hexa=args[0], depth="fg")
|
||
|
||
@classmethod
|
||
def bg(cls, *args) -> str:
|
||
if len(args) > 1: return cls.escape_color(r=args[0], g=args[1], b=args[2], depth="bg")
|
||
else: return cls.escape_color(hexa=args[0], depth="bg")
|
||
|
||
class Theme:
|
||
'''__init__ accepts a dict containing { "color_element" : "color" }'''
|
||
|
||
themes: Dict[str, str] = {}
|
||
cached: Dict[str, Dict[str, str]] = { "Default" : DEFAULT_THEME }
|
||
current: str = ""
|
||
|
||
main_bg = main_fg = title = hi_fg = selected_bg = selected_fg = inactive_fg = proc_misc = cpu_box = mem_box = net_box = proc_box = div_line = temp_start = temp_mid = temp_end = cpu_start = cpu_mid = cpu_end = free_start = free_mid = free_end = cached_start = cached_mid = cached_end = available_start = available_mid = available_end = used_start = used_mid = used_end = download_start = download_mid = download_end = upload_start = upload_mid = upload_end = NotImplemented
|
||
|
||
gradient: Dict[str, List[str]] = {
|
||
"temp" : [],
|
||
"cpu" : [],
|
||
"free" : [],
|
||
"cached" : [],
|
||
"available" : [],
|
||
"used" : [],
|
||
"download" : [],
|
||
"upload" : []
|
||
}
|
||
def __init__(self, theme: str):
|
||
self.refresh()
|
||
self._load_theme(theme)
|
||
|
||
def __call__(self, theme: str):
|
||
for k in self.gradient.keys(): self.gradient[k] = []
|
||
self._load_theme(theme)
|
||
|
||
def _load_theme(self, theme: str):
|
||
tdict: Dict[str, str]
|
||
if theme in self.cached:
|
||
tdict = self.cached[theme]
|
||
elif theme in self.themes:
|
||
tdict = self._load_file(self.themes[theme])
|
||
self.cached[theme] = tdict
|
||
else:
|
||
errlog.warning(f'No theme named "{theme}" found!')
|
||
theme = "Default"
|
||
CONFIG.color_theme = theme
|
||
tdict = DEFAULT_THEME
|
||
self.current = theme
|
||
#if CONFIG.color_theme != theme: CONFIG.color_theme = theme
|
||
#* Get key names from DEFAULT_THEME dict to not leave any color unset if missing from theme dict
|
||
for item, value in DEFAULT_THEME.items():
|
||
default = False if item not in ["main_fg", "main_bg"] else True
|
||
depth = "fg" if item not in ["main_bg", "selected_bg"] else "bg"
|
||
if item in tdict.keys():
|
||
setattr(self, item, Color(tdict[item], depth=depth, default=default))
|
||
else:
|
||
setattr(self, item, Color(value, depth=depth, default=default))
|
||
#* Create color gradients from one, two or three colors, 101 values indexed 0-100
|
||
rgb: Dict[str, Tuple[int, int, int]]
|
||
colors: List[List[int]] = []
|
||
for name in self.gradient.keys():
|
||
rgb = { "start" : getattr(self, f'{name}_start').dec, "mid" : getattr(self, f'{name}_mid').dec, "end" : getattr(self, f'{name}_end').dec }
|
||
colors = [ list(getattr(self, f'{name}_start')) ]
|
||
if rgb["end"][0] >= 0:
|
||
r = 50 if rgb["mid"][0] >= 0 else 100
|
||
for first, second in ["start", "mid" if r == 50 else "end"], ["mid", "end"]:
|
||
for i in range(r):
|
||
colors += [[rgb[first][n] + i * (rgb[second][n] - rgb[first][n]) // r for n in range(3)]]
|
||
if r == 100:
|
||
break
|
||
self.gradient[name] += [ Color.fg(*color) for color in colors ]
|
||
|
||
else:
|
||
c = Color.fg(*rgb["start"])
|
||
for _ in range(100):
|
||
self.gradient[name] += [c]
|
||
#* Set terminal colors
|
||
Term.fg, Term.bg = self.main_fg, self.main_bg
|
||
Draw.now(self.main_fg, self.main_bg)
|
||
|
||
def refresh(self):
|
||
'''Sets themes dict with names and paths to all found themes'''
|
||
self.themes = { "Default" : "Default" }
|
||
try:
|
||
for d in (THEME_DIR, USER_THEME_DIR):
|
||
for f in os.listdir(d):
|
||
if f.endswith(".theme"):
|
||
self.themes[f'{f[:-6] if d == THEME_DIR else f[:-6] + "*"}'] = f'{d}/{f}'
|
||
except Exception as e:
|
||
errlog.exception(str(e))
|
||
|
||
@staticmethod
|
||
def _load_file(path: str) -> Dict[str, str]:
|
||
'''Load a bashtop formatted theme file and return a dict'''
|
||
new_theme: Dict[str, str] = {}
|
||
try:
|
||
with open(path) as f:
|
||
for line in f:
|
||
if not line.startswith("theme["): continue
|
||
key = line[6:line.find("]")]
|
||
s = line.find('"')
|
||
value = line[s + 1:line.find('"', s + 1)]
|
||
new_theme[key] = value
|
||
except Exception as e:
|
||
errlog.exception(str(e))
|
||
|
||
return new_theme
|
||
|
||
class Banner:
|
||
'''Holds the bpytop banner, .draw(line, [col=0], [center=False], [now=False])'''
|
||
out: List[str] = []
|
||
c_color: str = ""
|
||
length: int = 0
|
||
if not out:
|
||
for num, (color, color2, line) in enumerate(BANNER_SRC):
|
||
if len(line) > length: length = len(line)
|
||
out_var = ""
|
||
line_color = Color.fg(color)
|
||
line_color2 = Color.fg(color2)
|
||
line_dark = Color.fg(f'#{80 - num * 6}')
|
||
for n, letter in enumerate(line):
|
||
if letter == "█" and c_color != line_color:
|
||
if n > 5 and n < 25: c_color = line_color2
|
||
else: c_color = line_color
|
||
out_var += c_color
|
||
elif letter == " ":
|
||
letter = f'{Mv.r(1)}'
|
||
c_color = ""
|
||
elif letter != "█" and c_color != line_dark:
|
||
c_color = line_dark
|
||
out_var += line_dark
|
||
out_var += letter
|
||
out.append(out_var)
|
||
|
||
@classmethod
|
||
def draw(cls, line: int, col: int = 0, center: bool = False, now: bool = False):
|
||
out: str = ""
|
||
if center: col = Term.width // 2 - cls.length // 2
|
||
for n, o in enumerate(cls.out):
|
||
out += f'{Mv.to(line + n, col)}{o}'
|
||
out += f'{Term.fg}'
|
||
if now: Draw.out(out)
|
||
else: return out
|
||
|
||
class Symbol:
|
||
h_line: str = "─"
|
||
v_line: str = "│"
|
||
left_up: str = "┌"
|
||
right_up: str = "┐"
|
||
left_down: str = "└"
|
||
right_down: str = "┘"
|
||
title_left: str = "┤"
|
||
title_right: str = "├"
|
||
div_up: str = "┬"
|
||
div_down: str = "┴"
|
||
graph_up: Dict[float, str] = {
|
||
0.0 : "⠀", 0.1 : "⢀", 0.2 : "⢠", 0.3 : "⢰", 0.4 : "⢸",
|
||
1.0 : "⡀", 1.1 : "⣀", 1.2 : "⣠", 1.3 : "⣰", 1.4 : "⣸",
|
||
2.0 : "⡄", 2.1 : "⣄", 2.2 : "⣤", 2.3 : "⣴", 2.4 : "⣼",
|
||
3.0 : "⡆", 3.1 : "⣆", 3.2 : "⣦", 3.3 : "⣶", 3.4 : "⣾",
|
||
4.0 : "⡇", 4.1 : "⣇", 4.2 : "⣧", 4.3 : "⣷", 4.4 : "⣿"
|
||
}
|
||
graph_down: Dict[float, str] = {
|
||
0.0 : "⠀", 0.1 : "⠈", 0.2 : "⠘", 0.3 : "⠸", 0.4 : "⢸",
|
||
1.0 : "⠁", 1.1 : "⠉", 1.2 : "⠙", 1.3 : "⠹", 1.4 : "⢹",
|
||
2.0 : "⠃", 2.1 : "⠋", 2.2 : "⠛", 2.3 : "⠻", 2.4 : "⢻",
|
||
3.0 : "⠇", 3.1 : "⠏", 3.2 : "⠟", 3.3 : "⠿", 3.4 : "⢿",
|
||
4.0 : "⡇", 4.1 : "⡏", 4.2 : "⡟", 4.3 : "⡿", 4.4 : "⣿"
|
||
}
|
||
meter: str = "■"
|
||
ok: str = f'{Color.fg("#30ff50")}√{Color.fg("#cc")}'
|
||
fail: str = f'{Color.fg("#ff3050")}!{Color.fg("#cc")}'
|
||
|
||
class Graph:
|
||
'''Class for creating and adding to graphs
|
||
* __str__ : returns graph as a string
|
||
* add(value: int) : adds a value to graph and returns it as a string
|
||
'''
|
||
out: str
|
||
width: int
|
||
height: int
|
||
graphs: Dict[bool, List[str]]
|
||
colors: List[str]
|
||
invert: bool
|
||
max_value: int
|
||
current: bool
|
||
last: int
|
||
symbol: Dict[float, str]
|
||
|
||
def __init__(self, width: int, height: int, color: Union[List[str], Color, None], data: List[int], invert: bool = False, max_value: int = 0):
|
||
self.graphs: Dict[bool, List[str]] = {False : [], True : []}
|
||
self.current: bool = True
|
||
self.colors: List[str] = []
|
||
if isinstance(color, list):
|
||
for i in range(1, height + 1): self.colors.insert(0, color[i * 100 // height]) #* Calculate colors of graph
|
||
if invert: self.colors.reverse()
|
||
elif isinstance(color, Color):
|
||
self.colors = [ f'{color}' for _ in range(height) ]
|
||
self.width = width
|
||
self.height = height
|
||
self.invert = invert
|
||
if not data: data = [0]
|
||
if max_value:
|
||
self.max_value = max_value
|
||
data = [ v * 100 // max_value if v < max_value else 100 for v in data ] #* Convert values to percentage values of max_value with max_value as ceiling
|
||
else:
|
||
self.max_value = 0
|
||
self.symbol = Symbol.graph_down if invert else Symbol.graph_up
|
||
value_width: int = ceil(len(data) / 2)
|
||
filler: str = ""
|
||
if value_width > width: #* If the size of given data set is bigger then width of graph, shrink data set
|
||
data = data[-(width*2):]
|
||
value_width = ceil(len(data) / 2)
|
||
elif value_width < width: #* If the size of given data set is smaller then width of graph, fill graph with whitespace
|
||
filler = " " * (width - value_width)
|
||
if len(data) % 2 == 1: data.insert(0, 0)
|
||
for _ in range(height):
|
||
for b in [True, False]:
|
||
self.graphs[b].append(filler)
|
||
self._create(data, new=True)
|
||
|
||
def _create(self, data: List[int], new: bool = False):
|
||
h_high: int
|
||
h_low: int
|
||
value: Dict[str, int] = { "left" : 0, "right" : 0 }
|
||
val: int
|
||
side: str
|
||
|
||
#* Create the graph
|
||
for h in range(self.height):
|
||
h_high = round(100 * (self.height - h) / self.height) if self.height > 1 else 100
|
||
h_low = round(100 * (self.height - (h + 1)) / self.height) if self.height > 1 else 0
|
||
for v in range(len(data)):
|
||
if new: self.current = bool(v % 2) #* Switch between True and False graphs
|
||
if new and v == 0: self.last = 0
|
||
for val, side in [self.last, "left"], [data[v], "right"]: # type: ignore
|
||
if val >= h_high:
|
||
value[side] = 4
|
||
elif val <= h_low:
|
||
value[side] = 0
|
||
else:
|
||
if self.height == 1: value[side] = round(val * 4 / 100 + 0.5)
|
||
else: value[side] = round((val - h_low) * 4 / (h_high - h_low) + 0.1)
|
||
if new: self.last = data[v]
|
||
self.graphs[self.current][h] += self.symbol[float(value["left"] + value["right"] / 10)]
|
||
self.last = data[-1]
|
||
self.out = ""
|
||
|
||
for h in range(self.height):
|
||
if h > 0: self.out += f'{Mv.d(1)}{Mv.l(self.width)}'
|
||
self.out += f'{"" if not self.colors else self.colors[h]}{self.graphs[self.current][h if not self.invert else (self.height - 1) - h]}'
|
||
self.out += f'{Term.fg}'
|
||
|
||
def add(self, value: int):
|
||
self.current = not self.current
|
||
for n in range(self.height):
|
||
self.graphs[self.current][n] = self.graphs[self.current][n][1:]
|
||
if self.max_value: value = value * 100 // self.max_value if value < self.max_value else 100
|
||
self._create([value])
|
||
return self.out
|
||
|
||
def __str__(self):
|
||
return self.out
|
||
|
||
def __repr__(self):
|
||
return repr(self.out)
|
||
|
||
|
||
class Graphs:
|
||
'''Holds all graphs and lists of graphs for dynamically created graphs'''
|
||
cpu: Graph
|
||
cores: List[Graph] = []
|
||
temps: List[Graph] = []
|
||
net: Graph
|
||
detailed_cpu: Graph
|
||
detailed_mem: Graph
|
||
pid_cpu: Dict[int, Graph] = {}
|
||
|
||
class Meter:
|
||
'''Creates a percentage meter
|
||
__init__(value, width, theme, gradient_name) to create new meter
|
||
__call__(value) to set value and return meter as a string
|
||
__str__ returns last set meter as a string
|
||
'''
|
||
out: str
|
||
color_gradient: List[str]
|
||
color_inactive: Color
|
||
width: int
|
||
saved: Dict[int, str]
|
||
|
||
def __init__(self, value: int, width: int, gradient_name: str):
|
||
self.color_gradient = THEME.gradient[gradient_name]
|
||
self.color_inactive = THEME.inactive_fg
|
||
self.width = width
|
||
self.out = self._create(value)
|
||
self.saved = { value : self.out }
|
||
|
||
def __call__(self, value: int):
|
||
if value in self.saved.keys():
|
||
self.out = self.saved[value]
|
||
else:
|
||
self.out = self._create(value)
|
||
self.saved[value] = self.out
|
||
return self.out
|
||
|
||
def __str__(self):
|
||
return self.out
|
||
|
||
def __repr__(self):
|
||
return repr(self.out)
|
||
|
||
def _create(self, value: int):
|
||
if value > 100: value = 100
|
||
elif value < 0: value = 100
|
||
out: str = ""
|
||
for i in range(1, self.width + 1):
|
||
if value >= round(i * 100 / self.width):
|
||
out += f'{self.color_gradient[round(i * 100 / self.width)]}{Symbol.meter}'
|
||
else:
|
||
out += self.color_inactive(Symbol.meter * (self.width + 1 - i))
|
||
break
|
||
else:
|
||
out += f'{Term.fg}'
|
||
return out
|
||
|
||
class Meters:
|
||
cpu: Meter
|
||
mem_used: Meter
|
||
mem_available: Meter
|
||
mem_cached: Meter
|
||
mem_free: Meter
|
||
swap_used: Meter
|
||
swap_free: Meter
|
||
disks_used: Meter
|
||
disks_free: Meter
|
||
|
||
class Box:
|
||
'''Box class with all needed attributes for create_box() function'''
|
||
name: str
|
||
height_p: int
|
||
width_p: int
|
||
x: int
|
||
y: int
|
||
width: int
|
||
height: int
|
||
out: str
|
||
bg: str
|
||
_b_cpu_h: int
|
||
_b_mem_h: int
|
||
redraw_all: bool
|
||
|
||
@classmethod
|
||
def calc_sizes(cls):
|
||
'''Calculate sizes of boxes'''
|
||
for sub in cls.__subclasses__():
|
||
sub._calc_size() # type: ignore
|
||
|
||
@classmethod
|
||
def draw_bg(cls, now: bool = True):
|
||
'''Draw all boxes outlines and titles'''
|
||
Draw.buffer("bg!" if now else "bg", "".join(sub._draw_bg() for sub in cls.__subclasses__())) # type: ignore
|
||
|
||
class SubBox:
|
||
box_x: int = 0
|
||
box_y: int = 0
|
||
box_width: int = 0
|
||
box_height: int = 0
|
||
box_columns: int = 0
|
||
|
||
class CpuBox(Box, SubBox):
|
||
name = "cpu"
|
||
x = 1
|
||
y = 1
|
||
height_p = 32
|
||
width_p = 100
|
||
|
||
@classmethod
|
||
def _calc_size(cls):
|
||
cls.width = round(Term.width * cls.width_p / 100)
|
||
cls.height = round(Term.height * cls.height_p / 100)
|
||
Box._b_cpu_h = cls.height
|
||
#THREADS = 25
|
||
if THREADS > (cls.height - 6) * 3 and cls.width > 200: cls.box_width = 24 * 4; cls.box_height = ceil(THREADS / 4) + 4; cls.box_columns = 4
|
||
elif THREADS > (cls.height - 6) * 2 and cls.width > 150: cls.box_width = 24 * 3; cls.box_height = ceil(THREADS / 3) + 4; cls.box_columns = 3
|
||
elif THREADS > cls.height - 6 and cls.width > 100: cls.box_width = 24 * 2; cls.box_height = ceil(THREADS / 2) + 4; cls.box_columns = 2
|
||
else: cls.box_width = 24; cls.box_height = THREADS + 4; cls.box_columns = 1
|
||
|
||
if CONFIG.check_temp: cls.box_width += 13 * cls.box_columns
|
||
if cls.box_height > cls.height - 2: cls.box_height = cls.height - 2
|
||
cls.box_x = (cls.width - 2) - cls.box_width
|
||
cls.box_y = cls.y + ((cls.height - 2) // 2) - cls.box_height // 2 + 1
|
||
cls.redraw_all = True
|
||
|
||
|
||
@classmethod
|
||
def _draw_bg(cls) -> str:
|
||
return f'{create_box(box=cls, line_color=THEME.cpu_box)}\
|
||
{Mv.to(cls.y, cls.x + 10)}{THEME.cpu_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg("m")}{THEME.title("enu")}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}\
|
||
{create_box(x=cls.box_x, y=cls.box_y, width=cls.box_width, height=cls.box_height, line_color=THEME.div_line, fill=False, title=CPU_NAME[:18 if CONFIG.check_temp else 9])}'
|
||
|
||
@classmethod
|
||
def _draw_fg(cls, cpu):
|
||
Draw.buffer("cpu", f'Cpu usage: {cpu.cpu_usage}\nCpu freq: {cpu.cpu_freq}\nLoad avg: {cpu.load_avg}\n')
|
||
|
||
class MemBox(Box):
|
||
name = "mem"
|
||
height_p = 40
|
||
width_p = 45
|
||
x = 1
|
||
y = 1
|
||
divider: int = 0
|
||
mem_width: int = 0
|
||
disks_width: int = 0
|
||
|
||
@classmethod
|
||
def _calc_size(cls):
|
||
cls.width = round(Term.width * cls.width_p / 100)
|
||
cls.height = round(Term.height * cls.height_p / 100) + 1
|
||
Box._b_mem_h = cls.height
|
||
cls.y = Box._b_cpu_h + 1
|
||
cls.mem_width = cls.disks_width = round((cls.width-1) / 2)
|
||
if cls.mem_width + cls.disks_width < cls.width - 2: cls.mem_width += 1
|
||
cls.divider = cls.x + cls.mem_width + 1
|
||
cls.redraw_all = True
|
||
|
||
@classmethod
|
||
def _draw_bg(cls) -> str:
|
||
return f'{create_box(box=cls, line_color=THEME.mem_box)}\
|
||
{Mv.to(cls.y, cls.divider + 2)}{THEME.mem_box(Symbol.title_left)}{Fx.b}{THEME.title("disks")}{Fx.ub}{THEME.mem_box(Symbol.title_right)}\
|
||
{Mv.to(cls.y, cls.divider)}{THEME.mem_box(Symbol.div_up)}\
|
||
{Mv.to(cls.y + cls.height - 1, cls.divider)}{THEME.mem_box(Symbol.div_down)}{THEME.div_line}\
|
||
{"".join(f"{Mv.to(cls.y + i, cls.divider)}{Symbol.v_line}" for i in range(1, cls.height - 1))}'
|
||
|
||
class NetBox(Box, SubBox):
|
||
name = "net"
|
||
height_p = 28
|
||
width_p = 45
|
||
x = 1
|
||
y = 1
|
||
|
||
@classmethod
|
||
def _calc_size(cls):
|
||
cls.width = round(Term.width * cls.width_p / 100)
|
||
cls.height = Term.height - Box._b_cpu_h - Box._b_mem_h
|
||
cls.y = Term.height - cls.height + 1
|
||
cls.box_width = 24
|
||
cls.box_height = 9 if cls.height > 12 else cls.height - 2
|
||
cls.box_x = cls.width - cls.box_width - 2
|
||
cls.box_y = cls.y + ((cls.height - 2) // 2) - cls.box_height // 2 + 1
|
||
cls.redraw_all = True
|
||
|
||
@classmethod
|
||
def _draw_bg(cls) -> str:
|
||
return f'{create_box(box=cls, line_color=THEME.net_box)}\
|
||
{create_box(x=cls.box_x, y=cls.box_y, width=cls.box_width, height=cls.box_height, line_color=THEME.div_line, fill=False, title="Download", title2="Upload")}'
|
||
|
||
class ProcBox(Box):
|
||
name = "proc"
|
||
height_p = 68
|
||
width_p = 55
|
||
x = 1
|
||
y = 1
|
||
detailed: bool = False
|
||
detailed_x: int = 0
|
||
detailed_y: int = 0
|
||
detailed_width: int = 0
|
||
detailed_height: int = 8
|
||
|
||
@classmethod
|
||
def _calc_size(cls):
|
||
cls.width = round(Term.width * cls.width_p / 100)
|
||
cls.height = round(Term.height * cls.height_p / 100)
|
||
cls.x = Term.width - cls.width + 1
|
||
cls.y = Box._b_cpu_h + 1
|
||
cls.detailed_x = cls.x
|
||
cls.detailed_y = cls.y
|
||
cls.detailed_height = 8
|
||
cls.detailed_width = cls.width
|
||
cls.redraw_all = True
|
||
|
||
@classmethod
|
||
def _draw_bg(cls) -> str:
|
||
return create_box(box=cls, line_color=THEME.proc_box)
|
||
|
||
class Collector:
|
||
'''Data collector master class
|
||
* .start(): Starts collector thread
|
||
* .stop(): Stops collector thread
|
||
* .collect(*collectors: Collector, draw_now: bool = True, interrupt: bool = False): queues up collectors to run'''
|
||
stopping: bool = False
|
||
started: bool = False
|
||
draw_now: bool = False
|
||
thread: threading.Thread
|
||
collect_run = threading.Event()
|
||
collect_idle = threading.Event()
|
||
collect_idle.set()
|
||
collect_done = threading.Event()
|
||
collect_queue: List = []
|
||
collect_interrupt: bool = False
|
||
|
||
@classmethod
|
||
def start(cls):
|
||
cls.stopping = False
|
||
cls.thread = threading.Thread(target=cls._runner, args=())
|
||
cls.thread.start()
|
||
cls.started = True
|
||
|
||
@classmethod
|
||
def stop(cls):
|
||
if cls.started and cls.thread.is_alive():
|
||
cls.stopping = True
|
||
try:
|
||
cls.thread.join()
|
||
except:
|
||
pass
|
||
|
||
@classmethod
|
||
def _runner(cls):
|
||
'''This is meant to run in it's own thread, collecting and drawing when collect_run is set'''
|
||
while not cls.stopping:
|
||
cls.collect_run.wait(0.1)
|
||
if not cls.collect_run.is_set():
|
||
continue
|
||
cls.collect_run.clear()
|
||
cls.collect_idle.clear()
|
||
while cls.collect_queue:
|
||
collector = cls.collect_queue.pop()
|
||
collector._collect()
|
||
collector._draw()
|
||
if cls.draw_now:
|
||
Draw.out()
|
||
cls.collect_idle.set()
|
||
cls.collect_done.set()
|
||
|
||
@classmethod
|
||
def collect(cls, *collectors: object, draw_now: bool = True, interrupt: bool = False):
|
||
'''Setup collect queue for _runner'''
|
||
#* Set interrupt flag if True to stop _runner prematurely
|
||
cls.collect_interrupt = interrupt
|
||
#* Wait for _runner to finish
|
||
cls.collect_idle.wait()
|
||
#* Reset interrupt flag
|
||
cls.collect_interrupt = False
|
||
#* Set draw_now flag if True to draw to screen instead of buffer
|
||
cls.draw_now = draw_now
|
||
|
||
#* Append any collector given as argument to _runner queue
|
||
if collectors:
|
||
cls.collect_queue = [*collectors]
|
||
|
||
#* Add all collectors to _runner queue if no collectors in argument
|
||
else:
|
||
cls.collect_queue = list(cls.__subclasses__())
|
||
|
||
#* Set run flag to start _runner
|
||
cls.collect_run.set()
|
||
|
||
|
||
class CpuCollector(Collector):
|
||
'''Collects cpu usage for cpu and cores, cpu frequency, load_avg
|
||
_collect(): Collects data
|
||
_draw(): calls CpuBox._draw_fg()'''
|
||
cpu_usage: List[List[float]] = [[]]
|
||
for i in range(THREADS):
|
||
cpu_usage.append([])
|
||
cpu_freq: int = 0
|
||
load_avg: List[float] = []
|
||
|
||
@classmethod
|
||
def _collect(cls):
|
||
cls.cpu_usage[0].append(round(psutil.cpu_percent(percpu=False)))
|
||
|
||
for n, thread in enumerate(psutil.cpu_percent(percpu=True), start=1):
|
||
cls.cpu_usage[n].append(round(thread))
|
||
|
||
if len(cls.cpu_usage[0]) > Term.width:
|
||
try:
|
||
for n in range(THREADS + 1):
|
||
del cls.cpu_usage[n][0]
|
||
except IndexError:
|
||
pass
|
||
|
||
cls.cpu_freq = round(psutil.cpu_freq().current)
|
||
cls.load_avg = [round(lavg, 2) for lavg in os.getloadavg()]
|
||
|
||
@classmethod
|
||
def _draw(cls):
|
||
CpuBox._draw_fg(cls)
|
||
|
||
#class ProcCollector(Collector): #! add interrupt on _collect and _draw
|
||
|
||
@timerd
|
||
def testing_collectors():
|
||
Collector.collect()
|
||
Collector.collect_done.wait()
|
||
#sleep(2)
|
||
#Collector.collect(CpuCollector)
|
||
#Draw.now(f'Cpu usage: {CpuCollector.cpu_usage}\nCpu freq: {CpuCollector.cpu_freq}\nLoad avg: {CpuCollector.load_avg}\n')
|
||
|
||
|
||
class Menu:
|
||
'''Holds the main menu and all submenus
|
||
* uncolor(string: str) : removes all color and returns string with THEME.inactive_fg color
|
||
'''
|
||
color_re = re.compile(r"\033\[\d+;\d?;?\d*;?\d*;?\d*m{1}")
|
||
|
||
@classmethod
|
||
def uncolor(cls, string: str) -> str:
|
||
return f'{THEME.inactive_fg}{cls.color_re.sub("", string)}{Term.fg}'
|
||
|
||
|
||
|
||
#? Functions ------------------------------------------------------------------------------------->
|
||
|
||
def get_cpu_name() -> str:
|
||
'''Fetch a suitable CPU identifier from the CPU model name string'''
|
||
name: str = ""
|
||
nlist: List = []
|
||
command: str = ""
|
||
cmd_out: str = ""
|
||
rem_line: str = ""
|
||
if SYSTEM == "Linux":
|
||
command = "cat /proc/cpuinfo"
|
||
rem_line = "model name"
|
||
elif SYSTEM == "MacOS":
|
||
command ="sysctl -n machdep.cpu.brand_string"
|
||
elif SYSTEM == "BSD":
|
||
command ="sysctl hw.model"
|
||
rem_line = "hw.model"
|
||
|
||
cmd_out = subprocess.check_output("LANG=C " + command, shell=True, universal_newlines=True)
|
||
if rem_line:
|
||
for line in cmd_out.split("\n"):
|
||
if rem_line in line:
|
||
name = re.sub( ".*" + rem_line + ".*:", "", line,1).lstrip()
|
||
else:
|
||
name = cmd_out
|
||
nlist = name.split(" ")
|
||
if "Xeon" in name:
|
||
name = nlist[nlist.index("CPU")+1]
|
||
elif "Ryzen" in name:
|
||
name = " ".join(nlist[nlist.index("Ryzen"):nlist.index("Ryzen")+3])
|
||
elif "CPU" in name:
|
||
name = nlist[nlist.index("CPU")-1]
|
||
|
||
return name
|
||
|
||
def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: str = "", title2: str = "", line_color: Color = None, fill: bool = True, box = None) -> str:
|
||
'''Create a box from a box object or by given arguments'''
|
||
out: str = f'{Term.fg}{Term.bg}'
|
||
if not line_color: line_color = THEME.div_line
|
||
|
||
#* Get values from box class if given
|
||
if box:
|
||
x = box.x
|
||
y = box.y
|
||
width = box.width
|
||
height =box.height
|
||
title = box.name
|
||
vlines: Tuple[int, int] = (x, x + width - 1)
|
||
hlines: Tuple[int, int] = (y, y + height - 1)
|
||
|
||
#* Fill box if enabled
|
||
if fill:
|
||
for i in range(y + 1, y + height - 1):
|
||
out += f'{Mv.to(i, x)}{" " * (width - 1)}'
|
||
|
||
out += f'{line_color}'
|
||
|
||
#* Draw all horizontal lines
|
||
for hpos in hlines:
|
||
out += f'{Mv.to(hpos, x)}{Symbol.h_line * (width - 1)}'
|
||
|
||
#* Draw all vertical lines
|
||
for vpos in vlines:
|
||
for hpos in range(y, y + height - 1):
|
||
out += f'{Mv.to(hpos, vpos)}{Symbol.v_line}'
|
||
|
||
#* Draw corners
|
||
out += f'{Mv.to(y, x)}{Symbol.left_up}\
|
||
{Mv.to(y, x + width - 1)}{Symbol.right_up}\
|
||
{Mv.to(y + height - 1, x)}{Symbol.left_down}\
|
||
{Mv.to(y + height - 1, x + width - 1)}{Symbol.right_down}'
|
||
|
||
#* Draw titles if enabled
|
||
if title:
|
||
out += f'{Mv.to(y, x + 2)}{Symbol.title_left}{THEME.title}{Fx.b}{title}{Fx.ub}{line_color}{Symbol.title_right}'
|
||
if title2:
|
||
out += f'{Mv.to(y + height - 1, x + 2)}{Symbol.title_left}{THEME.title}{Fx.b}{title2}{Fx.ub}{line_color}{Symbol.title_right}'
|
||
|
||
return f'{out}{Term.fg}{Mv.to(y + 1, x + 1)}'
|
||
|
||
def now_sleeping(signum, frame):
|
||
"""Reset terminal settings and stop background input read before putting to sleep"""
|
||
Key.stop()
|
||
Collector.stop()
|
||
Draw.now(Term.clear, Term.normal_screen, Term.show_cursor)
|
||
Term.echo(True)
|
||
os.kill(os.getpid(), signal.SIGSTOP)
|
||
|
||
def now_awake(signum, frame):
|
||
"""Set terminal settings and restart background input read"""
|
||
Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor)
|
||
Term.echo(False)
|
||
Key.start()
|
||
Collector.start()
|
||
|
||
def quit_sigint(signum, frame):
|
||
"""SIGINT redirection to clean_quit()"""
|
||
clean_quit()
|
||
|
||
def clean_quit(errcode: int = 0, errmsg: str = ""):
|
||
"""Stop background input read, save current config and reset terminal settings before quitting"""
|
||
Key.stop()
|
||
Collector.stop()
|
||
if not errcode: CONFIG.save_config()
|
||
if not testing: Draw.now(Term.clear, Term.normal_screen, Term.show_cursor) #! Enable
|
||
else: Draw.now(Term.show_cursor) #! Remove
|
||
Term.echo(True)
|
||
if errcode == 0:
|
||
errlog.info(f'Exiting. Runtime {timedelta(seconds=round(time() - SELF_START, 0))} \n')
|
||
else:
|
||
errlog.warning(f'Exiting with errorcode ({errcode}). Runtime {timedelta(seconds=round(time() - SELF_START, 0))} \n')
|
||
if errmsg: print(errmsg)
|
||
raise SystemExit(errcode)
|
||
|
||
def floating_humanizer(value: Union[float, int], bit: bool = False, per_second: bool = False, start: int = 0, short: bool = False) -> str:
|
||
'''Scales up in steps of 1024 to highest possible unit and returns string with unit suffixed
|
||
* bit=True or defaults to bytes
|
||
* start=int to set 1024 multiplier starting unit
|
||
* short=True always returns 0 decimals and shortens unit to 1 character
|
||
'''
|
||
out: str = ""
|
||
unit: Tuple[str, ...] = UNITS["bit"] if bit else UNITS["byte"]
|
||
selector: int = start if start else 0
|
||
mult: int = 8 if bit else 1
|
||
if value <= 0: value = 0
|
||
|
||
if isinstance(value, float): value = round(value * 100 * mult)
|
||
elif value > 0: value *= 100 * mult
|
||
|
||
while len(f'{value}') > 5 and value >= 102400:
|
||
value >>= 10
|
||
if value < 100:
|
||
out = f'{value}'
|
||
break
|
||
selector += 1
|
||
else:
|
||
if len(f'{value}') < 5 and len(f'{value}') >= 2 and selector > 0:
|
||
decimals = 5 - len(f'{value}')
|
||
out = f'{value}'[:-2] + "." + f'{value}'[-decimals:]
|
||
elif len(f'{value}') >= 2:
|
||
out = f'{value}'[:-2]
|
||
|
||
|
||
if short: out = out.split(".")[0]
|
||
out += f'{"" if short else " "}{unit[selector][0] if short else unit[selector]}'
|
||
if per_second: out += "/s" if bit else "ps"
|
||
|
||
return out
|
||
|
||
|
||
#? Main function --------------------------------------------------------------------------------->
|
||
|
||
def main():
|
||
clean_quit()
|
||
|
||
|
||
#? Pre main -------------------------------------------------------------------------------------->
|
||
|
||
|
||
CPU_NAME: str = get_cpu_name()
|
||
|
||
testing = False #! Remove
|
||
|
||
#! For testing ------------------------------------------------------------------------------->
|
||
|
||
|
||
|
||
def waitone(t: float = 0.0):
|
||
if t > 0.0: Key.new.wait(t)
|
||
else: Key.new.wait()
|
||
Key.new.clear()
|
||
Draw.clear()
|
||
Draw.now(Term.clear)
|
||
|
||
@timerd
|
||
def testing_gradients():
|
||
# for theme in THEME.themes.keys():
|
||
# THEME(theme)
|
||
# timer = time()
|
||
|
||
for key in THEME.gradient.keys():
|
||
Draw.now(f'{Term.fg}{key}:\n{"█".join(THEME.gradient[key])}\n')
|
||
Draw.now(f'{Term.fg}')
|
||
# Draw.now(f'Theme creation of {CONFIG.color_theme} took {time() - timer:.5f} seconds\n\n')
|
||
Draw.now("\n")
|
||
# THEME("Default")
|
||
|
||
@timerd
|
||
def testing_humanizer():
|
||
for i in range(1, 101, 3):
|
||
for n in range(1, 6):
|
||
Draw.now(floating_humanizer(i * (1050 * n) * (n << 10 ), bit=False, per_second=False, short=False), " ")
|
||
Draw.now("\n")
|
||
for i in range(1, 30):
|
||
for n in range(1, 8):
|
||
Draw.now(floating_humanizer((995 + i) << (n * 10), bit=False), " ")
|
||
Draw.now("\n")
|
||
|
||
|
||
@timerd
|
||
def testing_colors():
|
||
for item, _ in DEFAULT_THEME.items():
|
||
Draw.buffer("+testing", Fx.b, getattr(THEME, item)(f'{item:<20}'), Fx.ub, f'{"hex=" + getattr(THEME, item).hexa:<20} dec={getattr(THEME, item).dec}\n')
|
||
|
||
Draw.out()
|
||
|
||
@timerd
|
||
def testing_boxes():
|
||
#Box.calc_sizes()
|
||
Box.draw_bg()
|
||
Draw.now(Mv.to(Term.height - 3, 1))
|
||
#Draw.now(create_box(20, 20, 15, 10, "Hej"))
|
||
#waitone()
|
||
#Draw.now(Term.normal_screen, Term.show_cursor)
|
||
|
||
@timerd
|
||
def testing_banner():
|
||
Draw.buffer("banner", Banner.draw(18, center=True))
|
||
Draw.out()
|
||
|
||
Draw.now(Mv.to(35, 1))
|
||
|
||
@timerd
|
||
def testing_meter():
|
||
Draw.clear("meters")
|
||
for _ in range(10):
|
||
Draw.buffer("+meters", "1234567890")
|
||
Draw.buffer("+meters", "\n")
|
||
|
||
stamp = time()
|
||
test_meter = Meter(0, Term.width, "cpu")
|
||
|
||
for i in range(0,101, 2):
|
||
Draw.buffer("+meters", test_meter(i), "\n")
|
||
|
||
Draw.buffer("+meter", f'{time() - stamp}')
|
||
Draw.out()
|
||
|
||
def testing_keyinput():
|
||
line: str = ""
|
||
this_key: str = ""
|
||
count: int = 0
|
||
while True:
|
||
count += 1
|
||
Draw.now(f'{Mv.to(1,1)}{Fx.b}{THEME.temp_start("Count:")} {count} {THEME.cached_mid("Time:")} {strftime("%H:%M:%S", localtime())}',
|
||
f'{Color.fg("#ff")} Width: {Term.width} Height: {Term.height} Resized: {Term.resized}')
|
||
if Key.input_wait(1):
|
||
while Key.list:
|
||
#Key.new.clear()
|
||
this_key = Key.list.pop()
|
||
Draw.now(f'{Mv.to(2,1)}{Color.fg("#ff9050")}{Fx.b}Last key= {Term.fg}{Fx.ub}{repr(this_key):14}{" "}')
|
||
if this_key == "backspace":
|
||
line = line[:-1]
|
||
elif this_key == "escape":
|
||
line = ""
|
||
elif this_key == "Q":
|
||
clean_quit()
|
||
elif this_key == "R":
|
||
raise Exception("Test ERROR")
|
||
elif len(this_key) == 1:
|
||
line += this_key
|
||
Draw.now(f'{Color.fg("#90ff50")}{Fx.b}Command= {Term.fg}{Fx.ub}{line}{Fx.bl}| {Fx.ubl}\033[0K\n')
|
||
if this_key == "enter":
|
||
try:
|
||
exec(line)
|
||
except:
|
||
pass
|
||
Draw.clear()
|
||
|
||
def testing_graphs():
|
||
my_data = [x for x in range(0, 101)]
|
||
my_data += [x for x in range(100, -1, -1)]
|
||
|
||
#my_data100 = [100 for _ in range(200)]
|
||
|
||
my_data100 = [randint(0, 100) for _ in range(Term.width * 2)]
|
||
|
||
my_data2 = my_data[-90:]
|
||
my_data3 = my_data[:86]
|
||
|
||
my_colors = []
|
||
for i in range(51):
|
||
for _ in range(2): my_colors.append(Color.fg(i, i, i))
|
||
#my_colors.reverse()
|
||
|
||
|
||
my_graph = Graph(Term.width, Term.height // 2, my_colors, my_data100, invert=True)
|
||
my_graph2 = Graph(Term.width, Term.height // 2, my_colors, my_data100, invert=False)
|
||
# my_graph3 = Graph(100 // 3 + 10, 1, THEME.proc_misc, my_data2)
|
||
# my_graph4 = Graph(100 // 3 + 10, 1, THEME.proc_misc, my_data3)
|
||
# my_graph5 = Graph(100, Term.height // 3, THEME.inactive_fg, my_data)
|
||
|
||
#pause = re.compile(r"\033\[\d+;\d?;?\d*;?\d*;?\d*m{1}")
|
||
#repl = "\033[0;37m"
|
||
|
||
banner = Banner.draw(Term.height // 3 - 2, center=True)
|
||
|
||
Draw.now(f'{Fx.ub}{Mv.to(0, 0)}{my_graph}\
|
||
{Mv.to(Term.height // 2, 0)}{my_graph2}\
|
||
{banner}')
|
||
|
||
# {Mv.to(Term.height - (Term.height // 3), Term.width // 2 - 50)}{my_graph5}\
|
||
# {Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 - 50)}{my_graph3}\
|
||
# {Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 + 7)}{my_graph4}\
|
||
|
||
#t = 1
|
||
x = 0
|
||
for _ in range(200):
|
||
sleep(0.05)
|
||
x = randint(0, 100)
|
||
# x += 1 if t == 1 else -1
|
||
# if x == 100: t = 0
|
||
# if x == 0: t = 1
|
||
Draw.now(f'{Fx.ub}{Mv.to(0, 0)}{my_graph.add(x)}\
|
||
{Mv.to(Term.height // 2, 0)}{my_graph2.add(x)}\
|
||
{banner}')
|
||
|
||
# Draw.now(f'{Mv.to(Term.height - (Term.height // 3), Term.width // 2 - 50)}{my_graph5.add(x)}')
|
||
# Draw.now(f'{Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 - 50)}{my_graph3.add(x)}')
|
||
# Draw.now(f'{Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 + 7)}{my_graph4.add(x)}')
|
||
|
||
Draw.now(Mv.to(Term.height -4, 0))
|
||
|
||
#! Remove ------------------------------------------------------------------------------------<
|
||
|
||
if __name__ == "__main__":
|
||
|
||
#? Init -------------------------------------------------------------------------------------->
|
||
Timer.start("Init")
|
||
|
||
class Init:
|
||
running: bool = True
|
||
initbg_colors: List[str] = []
|
||
initbg_data: List[int]
|
||
initbg_up: Graph
|
||
initbg_down: Graph
|
||
|
||
@staticmethod
|
||
def fail(err):
|
||
Draw.buffer("+init!", f'{Mv.restore()}{Symbol.fail}')
|
||
errlog.exception(f'{err}')
|
||
sleep(2)
|
||
clean_quit(1, errmsg=f'Error during init! See {CONFIG_DIR}/error.log for more information.')
|
||
|
||
@classmethod
|
||
def success(cls, start: bool = False):
|
||
if start:
|
||
Draw.buffer("initbg", z=10)
|
||
Draw.buffer("init", z=1)
|
||
for i in range(51):
|
||
for _ in range(2): cls.initbg_colors.append(Color.fg(i, i, i))
|
||
Draw.buffer("banner", f'{Banner.draw(Term.height // 2 - 10, center=True)}{Color.fg("#50")}\n', z=2)
|
||
for _i in range(10):
|
||
perc = f'{str((_i + 1) * 10) + "%":>5}'
|
||
Draw.buffer("+banner", f'{Mv.to(Term.height // 2 - 3 + _i, Term.width // 2 - 28)}{Fx.trans(perc)}{Symbol.v_line}')
|
||
Draw.out("banner")
|
||
Draw.buffer("+init!", f'{Color.fg("#cc")}{Fx.b}{Mv.to(Term.height // 2 - 3, Term.width // 2 - 21)}{Mv.save()}')
|
||
|
||
if start or Term.resized:
|
||
cls.initbg_data = [randint(0, 100) for _ in range(Term.width * 2)]
|
||
cls.initbg_up = Graph(Term.width, Term.height // 2, cls.initbg_colors, cls.initbg_data, invert=True)
|
||
cls.initbg_down = Graph(Term.width, Term.height // 2, cls.initbg_colors, cls.initbg_data, invert=False)
|
||
|
||
if start: return
|
||
|
||
if not testing:
|
||
cls.draw_bg(10)
|
||
Draw.buffer("+init!", f'{Mv.restore()}{Symbol.ok}\n{Mv.r(Term.width // 2 - 22)}{Mv.save()}')
|
||
|
||
@classmethod
|
||
def draw_bg(cls, times: int = 10):
|
||
for _ in range(times):
|
||
sleep(0.05)
|
||
x = randint(0, 100)
|
||
Draw.buffer("initbg", f'{Fx.ub}{Mv.to(0, 0)}{cls.initbg_up.add(x)}{Mv.to(Term.height // 2, 0)}{cls.initbg_down.add(x)}')
|
||
Draw.out("initbg", "banner", "init")
|
||
|
||
@classmethod
|
||
def done(cls):
|
||
cls.draw_bg(20)
|
||
Draw.clear("initbg", "banner", "init")
|
||
cls.running = False
|
||
del cls.initbg_up, cls.initbg_down, cls.initbg_data, cls.initbg_colors
|
||
|
||
|
||
#? Switch to alternate screen, clear screen, hide cursor and disable input echo
|
||
if not testing: Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor) #! Enable
|
||
else: Draw.now(Term.clear, Term.hide_cursor) #! Disable
|
||
Term.echo(False)
|
||
|
||
#? Draw banner and init status
|
||
if not testing: Init.success(start=True)
|
||
|
||
#? Load theme
|
||
Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Loading theme and creating colors... ")}{Mv.save()}')
|
||
try:
|
||
THEME: Theme = Theme(CONFIG.color_theme)
|
||
except Exception as e:
|
||
Init.fail(e)
|
||
else:
|
||
Init.success()
|
||
|
||
#? Setup boxes
|
||
Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Doing some maths and drawing... ")}{Mv.save()}')
|
||
try:
|
||
Box.calc_sizes()
|
||
Box.draw_bg(now=False)
|
||
except Exception as e:
|
||
Init.fail(e)
|
||
else:
|
||
Init.success()
|
||
|
||
#? Setup signal handlers for SIGSTP, SIGCONT, SIGINT and SIGWINCH
|
||
Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Setting up signal handlers... ")}{Mv.save()}')
|
||
try:
|
||
signal.signal(signal.SIGTSTP, now_sleeping) #* Ctrl-Z
|
||
signal.signal(signal.SIGCONT, now_awake) #* Resume
|
||
signal.signal(signal.SIGINT, quit_sigint) #* Ctrl-C
|
||
signal.signal(signal.SIGWINCH, Term.refresh) #* Terminal resized
|
||
except Exception as e:
|
||
Init.fail(e)
|
||
else:
|
||
Init.success()
|
||
|
||
#? Start a separate thread for reading keyboard input
|
||
Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Starting input reader thread... ")}{Mv.save()}')
|
||
try:
|
||
Key.start()
|
||
except Exception as e:
|
||
Init.fail(e)
|
||
else:
|
||
Init.success()
|
||
|
||
#? Start a separate thread for data collection and drawing
|
||
Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Starting data collection and drawer thread... ")}{Mv.save()}')
|
||
try:
|
||
Collector.start()
|
||
except Exception as e:
|
||
Init.fail(e)
|
||
else:
|
||
Init.success()
|
||
|
||
#? Collect data and draw to buffer
|
||
Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Collecting data and drawing... ")}{Mv.save()}')
|
||
try:
|
||
#Collector.collect(draw_now=False)
|
||
pass
|
||
except Exception as e:
|
||
Init.fail(e)
|
||
else:
|
||
Init.success()
|
||
|
||
Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Collecting nuclear launch codes... ")}{Mv.save()}')
|
||
Init.success()
|
||
Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Launching missiles... ")}{Mv.save()}')
|
||
Init.success()
|
||
Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Alien invasion... ")}{Mv.save()}')
|
||
Init.success()
|
||
|
||
#? Draw to screen
|
||
Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Finishing up... ")}{Mv.save()}')
|
||
try:
|
||
#Collector.collect_done.wait()
|
||
pass
|
||
except Exception as e:
|
||
Init.fail(e)
|
||
else:
|
||
Init.success()
|
||
|
||
|
||
if not testing: Init.done() #! Remove if
|
||
if not testing: Draw.out(clear=True) #! Remove if
|
||
else: Draw.clear(); Draw.now(Term.clear) #! Remove
|
||
Timer.stop("Init")
|
||
|
||
|
||
#! For testing ------------------------------------------------------------------------------->
|
||
if testing:
|
||
try:
|
||
#testing_graphs()
|
||
#testing_collectors()
|
||
#testing_humanizer()
|
||
# waitone(1)
|
||
#testing_keyinput()
|
||
testing_banner()
|
||
# waitone(1)
|
||
#testing_colors()
|
||
# waitone(1)
|
||
#testing_gradients()
|
||
# waitone(1)
|
||
#testing_boxes()
|
||
# waitone(1)
|
||
#testing_meter()
|
||
# Draw.idle.clear()
|
||
#Draw.now(f'{Mv.to(Term.height - 5, 1)}Any key to exit!')
|
||
#waitone()
|
||
# Draw.idle.set()
|
||
#sleep(2)
|
||
except Exception as e:
|
||
errlog.exception(f'{e}')
|
||
clean_quit(1)
|
||
|
||
clean_quit()
|
||
#! Remove ------------------------------------------------------------------------------------<
|
||
|
||
#? Start main loop
|
||
while not False:
|
||
try:
|
||
main()
|
||
except Exception as e:
|
||
errlog.exception(f'{e}')
|
||
clean_quit(1)
|
||
else:
|
||
#? Quit cleanly even if false starts being true...
|
||
clean_quit()
|