2020-07-09 00:56:03 +00:00
#!/usr/bin/env python3
# pylint: disable=not-callable, no-member
# indent = tab
# tab-size = 4
# Copyright 2020 Aristocratos (jakob@qvantnet.com)
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os , sys , threading , signal , re , subprocess , logging , logging . handlers
from time import time , sleep , strftime , localtime
from datetime import timedelta
from select import select
from distutils . util import strtobool
from string import Template
from math import ceil
2020-07-11 22:54:52 +00:00
from random import randint
2020-07-14 00:20:40 +00:00
from shutil import which
2020-07-09 00:56:03 +00:00
from typing import List , Set , Dict , Tuple , Optional , Union , Any , Callable , ContextManager , Iterable
#? Collect import failures so every missing module is reported together instead of one traceback at a time
errors: List[str] = []
try:
    import fcntl, termios, tty
except Exception as e:
    errors.append(f'{e}')

try:
    import psutil  # type: ignore
except Exception as e:
    errors.append(f'{e}')

#* Timestamp taken as early as possible during startup
SELF_START = time()

#* Coarse platform name derived from sys.platform
SYSTEM: str
if "linux" in sys.platform: SYSTEM = "Linux"
elif "bsd" in sys.platform: SYSTEM = "BSD"
elif "darwin" in sys.platform: SYSTEM = "MacOS"
else: SYSTEM = "Other"

if errors:
    print("ERROR!")
    for error in errors:
        print(error)
    if SYSTEM == "Other":
        print("\nUnsupported platform!\n")
    else:
        print("\nInstall required modules!\n")
    #* sys.exit instead of quit(): quit is injected by the site module and is missing under "python -S"
    sys.exit(1)
#? Variables ------------------------------------------------------------------------------------->
2020-07-11 22:54:52 +00:00
#? Banner rows, one tuple per text row: (hex color, hex color, row text)
#* NOTE(review): how the two hex colors are used is not visible in this part of the file, confirm against the banner drawing code
#* NOTE(review): runs of spaces inside the row text look collapsed in this copy, verify the art against a pristine source before editing it
BANNER_SRC : List [ Tuple [ str , str , str ] ] = [
( " #ffa50a " , " #0fd7ff " , " ██████╗ ██████╗ ██╗ ██╗████████╗ ██████╗ ██████╗ " ) ,
( " #f09800 " , " #00bfe6 " , " ██╔══██╗██╔══██╗╚██╗ ██╔╝╚══██╔══╝██╔═══██╗██╔══██╗ " ) ,
( " #db8b00 " , " #00a6c7 " , " ██████╔╝██████╔╝ ╚████╔╝ ██║ ██║ ██║██████╔╝ " ) ,
( " #c27b00 " , " #008ca8 " , " ██╔══██╗██╔═══╝ ╚██╔╝ ██║ ██║ ██║██╔═══╝ " ) ,
( " #a86b00 " , " #006e85 " , " ██████╔╝██║ ██║ ██║ ╚██████╔╝██║ " ) ,
( " #000000 " , " #000000 " , " ╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ " ) ,
]
#RED
# "#E62525"
# "#CD2121"
# "#B31D1D"
# "#9A1919"
# "#801414"
# "#000000"
#GREEN
# "#75e12d"
# "#5bbb1b"
# "#499914"
# "#3b7811"
# "#30620e"
# "#000000"
#YELLOW
# "#fadd00"
# "#dbc200"
# "#b8a200"
# "#998700"
# "#807100"
# "#000000"
#BLUE
# "#0084ff"
# "#016cd0"
# "#0157a7"
# "#014484"
# "#003261"
# "#000000"
VERSION: str = "0.0.3"

#*?This is the template used to create the config file
#* Every "$name" placeholder must be written without whitespace after the "$",
#* otherwise string.Template.substitute() raises ValueError (invalid placeholder).
DEFAULT_CONF: Template = Template(f'#? Config file for bpytop v. {VERSION}' + '''
#* Color theme, looks for a .theme file in "~/.config/bpytop/themes" and "~/.config/bpytop/user_themes", "Default" for builtin default theme
color_theme="$color_theme"
#* Update time in milliseconds, increases automatically if set below internal loops processing time, recommended 2000 ms or above for better sample times for graphs.
update_ms=$update_ms
#* Processes sorting, "pid" "program" "arguments" "threads" "user" "memory" "cpu lazy" "cpu responsive",
#* "cpu lazy" updates sorting over time, "cpu responsive" updates sorting directly.
proc_sorting="$proc_sorting"
#* Reverse sorting order, True or False.
proc_reversed=$proc_reversed
#* Show processes as a tree
proc_tree=$proc_tree
#* Check cpu temperature, needs "vcgencmd" on Raspberry Pi and "osx-cpu-temp" on MacOS X.
check_temp=$check_temp
#* Draw a clock at top of screen, formatting according to strftime, empty string to disable.
draw_clock="$draw_clock"
#* Update main ui in background when menus are showing, set this to false if the menus is flickering too much for comfort.
background_update=$background_update
#* Custom cpu model name, empty string to disable.
custom_cpu_name="$custom_cpu_name"
#* Show color gradient in process list, True or False.
proc_gradient=$proc_gradient
#* If process cpu usage should be of the core it's running on or usage of the total available cpu power.
proc_per_core=$proc_per_core
#* Optional filter for shown disks, should be names of mountpoints, "root" replaces "/", separate multiple values with space.
disks_filter="$disks_filter"
#* Enable check for new version from github.com/aristocratos/bpytop at start.
update_check=$update_check
#* Set loglevel for "~/.config/bpytop/error.log" levels are: "CRITICAL" "ERROR" "WARNING" "INFO" "DEBUG".
#* The level set includes all lower levels, i.e. "DEBUG" will show all logging info.
log_level="$log_level"
''')
#? Config locations, created on first start
CONFIG_DIR: str = f'{os.path.expanduser("~")}/.config/bpytop'
try:
    #* makedirs(exist_ok=True) also creates CONFIG_DIR itself and repairs an existing
    #* config dir that is missing one of its theme sub-directories
    os.makedirs(f'{CONFIG_DIR}/themes', exist_ok=True)
    os.makedirs(f'{CONFIG_DIR}/user_themes', exist_ok=True)
except PermissionError:
    print(f'ERROR!\nNo permission to write to "{CONFIG_DIR}" directory!')
    sys.exit(1)
CONFIG_FILE: str = f'{CONFIG_DIR}/bpytop.conf'
THEME_DIR: str = f'{CONFIG_DIR}/themes'
USER_THEME_DIR: str = f'{CONFIG_DIR}/user_themes'

#* psutil.cpu_count() may return None, fall back to 1
CORES: int = psutil.cpu_count(logical=False) or 1
THREADS: int = psutil.cpu_count(logical=True) or 1
#? Builtin color theme: element name -> color string
#* Values are "#RRGGBB", the two digit greyscale short form "#XX", or "" for no color
DEFAULT_THEME: Dict[str, str] = {
    "main_bg": "",
    "main_fg": "#cc",
    "title": "#ee",
    "hi_fg": "#90",
    "selected_bg": "#7e2626",
    "selected_fg": "#ee",
    "inactive_fg": "#40",
    "proc_misc": "#0de756",
    "cpu_box": "#3d7b46",
    "mem_box": "#8a882e",
    "net_box": "#423ba5",
    "proc_box": "#923535",
    "div_line": "#30",
    "temp_start": "#4897d4",
    "temp_mid": "#5474e8",
    "temp_end": "#ff40b6",
    "cpu_start": "#50f095",
    "cpu_mid": "#f2e266",
    "cpu_end": "#fa1e1e",
    "free_start": "#223014",
    "free_mid": "#b5e685",
    "free_end": "#dcff85",
    "cached_start": "#0b1a29",
    "cached_mid": "#74e6fc",
    "cached_end": "#26c5ff",
    "available_start": "#292107",
    "available_mid": "#ffd77a",
    "available_end": "#ffb814",
    "used_start": "#3b1f1c",
    "used_mid": "#d9626d",
    "used_end": "#ff4769",
    "download_start": "#231a63",
    "download_mid": "#4f43a3",
    "download_end": "#b0a9de",
    "upload_start": "#510554",
    "upload_mid": "#7d4180",
    "upload_end": "#dcafde",
}
#? Units for floating_humanizer function
#* One tuple per base unit, ordered from smallest to largest suffix
UNITS: Dict[str, Tuple[str, ...]] = {
    "bit": ("bit", "Kib", "Mib", "Gib", "Tib", "Pib", "Eib", "Zib", "Yib", "Bib", "GEb"),
    "byte": ("Byte", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB", "BiB", "GEB"),
}
#? Setup error logger ---------------------------------------------------------------->
try:
    #* Logger starts at DEBUG, both logger and handler pass everything through
    errlog = logging.getLogger("ErrorLogger")
    errlog.setLevel(logging.DEBUG)
    #* Rotating log file in the config dir: 1 MiB per file, 4 backups kept
    eh = logging.handlers.RotatingFileHandler(f'{CONFIG_DIR}/error.log', maxBytes=1048576, backupCount=4)
    eh.setLevel(logging.DEBUG)
    eh.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s: %(message)s", datefmt="%d/%m/%y (%X)"))
    errlog.addHandler(eh)
except PermissionError:
    print(f'ERROR!\nNo permission to write to "{CONFIG_DIR}" directory!')
    #* sys.exit instead of quit(): quit only exists when the site module is loaded
    sys.exit(1)
#! Timers, remove ----------------------------------------------------------------------->
class Timer:
    '''Named stopwatch timers for debugging, results are written to the error log

    * .start(name) : Start (or restart) the timer called name
    * .pause(name) : Stop the running timer and bank its elapsed time
    * .stop(name) : Stop the running timer and log running + banked time
    '''
    timers: Dict[str, float] = {}  #* name -> start timestamp of the running timer
    paused: Dict[str, float] = {}  #* name -> seconds banked by earlier pause() calls

    @classmethod
    def start(cls, name):
        '''Start timing name, an already running timer of the same name is restarted'''
        cls.timers[name] = time()

    @classmethod
    def pause(cls, name):
        '''Bank the elapsed time of a running timer, does nothing if name is not running'''
        if name in cls.timers:
            #* Add to what is already banked so repeated start/pause cycles are not lost
            cls.paused[name] = cls.paused.get(name, 0.0) + time() - cls.timers.pop(name)

    @classmethod
    def stop(cls, name):
        '''Stop a running timer and log its total time, does nothing if name is not running'''
        if name in cls.timers:
            total: float = time() - cls.timers.pop(name)
            total += cls.paused.pop(name, 0.0)
            errlog.debug(f'{name} completed in {total:.6f} seconds')
def timerd(func):
    '''Decorator that logs the wall clock run time of every call to func at debug level

    The wrapper returns whatever func returns. functools.wraps keeps the name,
    docstring and signature metadata of the decorated function intact.
    '''
    from functools import wraps  #* Local import keeps this debug helper self contained

    @wraps(func)
    def timed(*args, **kw):
        ts = time()
        out = func(*args, **kw)
        te = time()
        errlog.debug(f'{func.__name__} completed in {te - ts:.6f} seconds')
        return out
    return timed
#! Timers, remove -----------------------------------------------------------------------<
#? Set up config class and load config ------------------------------------------------>
class Config:
    '''Holds all config variables and functions for loading from and saving to disk

    * keys : names of the options that are read from and written to the config file
    * conf_dict : current values, used to fill in the DEFAULT_CONF template on save
    * warnings : messages collected while loading, for the caller to log
    * changed / recreate : flags that make save_config() write the file
    '''
    keys: List[str] = ["color_theme", "update_ms", "proc_sorting", "proc_reversed", "proc_tree", "check_temp", "draw_clock",
                       "background_update", "custom_cpu_name", "proc_gradient", "proc_per_core", "disks_filter", "update_check", "log_level"]
    conf_dict: Dict[str, Union[str, int, bool]] = {}
    #* Defaults, the type of each default decides how the value from the file is converted
    color_theme: str = "Default"
    update_ms: int = 2500
    proc_sorting: str = "cpu lazy"
    proc_reversed: bool = False
    proc_tree: bool = False
    check_temp: bool = True
    draw_clock: str = "%X"
    background_update: bool = True
    custom_cpu_name: str = ""
    proc_gradient: bool = True
    proc_per_core: bool = False
    disks_filter: str = ""
    update_check: bool = True
    log_level: str = "WARNING"

    warnings: List[str] = []
    sorting_options: List[str] = ["pid", "program", "arguments", "threads", "user", "memory", "cpu lazy", "cpu responsive"]
    log_levels: List[str] = ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]

    changed: bool = False
    recreate: bool = False
    config_file: str = ""

    _initialized: bool = False

    def __init__(self, path: str):
        '''Load the config file at path, missing or invalid keys keep their defaults and flag the file for recreation'''
        self.config_file = path
        conf: Dict[str, Union[str, int, bool]] = self.load_config()
        if "version" not in conf:
            self.recreate = True
            self.warnings.append('Config file malformatted or missing, will be recreated on exit!')
        elif conf["version"] != VERSION:
            self.recreate = True
            self.warnings.append('Config file version and bpytop version missmatch, will be recreated on exit!')
        for key in self.keys:
            if key in conf and conf[key] != "_error_":
                setattr(self, key, conf[key])
            else:
                self.recreate = True
                self.conf_dict[key] = getattr(self, key)
        #* From here on every attribute assignment marks the config as changed
        self._initialized = True

    def __setattr__(self, name, value):
        '''Set attribute, mirror it into conf_dict and mark the config as changed once initialized'''
        if self._initialized:
            object.__setattr__(self, "changed", True)
        object.__setattr__(self, name, value)
        if name not in ["_initialized", "recreate", "changed"]:
            self.conf_dict[name] = value

    @staticmethod
    def _to_bool(value: str) -> bool:
        '''Convert a truth string to bool, same accepted words as distutils.util.strtobool, raises ValueError otherwise'''
        value = value.lower()
        if value in ("y", "yes", "t", "true", "on", "1"): return True
        if value in ("n", "no", "f", "false", "off", "0"): return False
        raise ValueError(f'invalid truth value {value!r}')

    def load_config(self) -> Dict[str, Union[str, int, bool]]:
        '''Load config from file, set correct types for values and return a dict

        Returns an empty dict if the file does not exist. Values that fail type
        conversion are left out, proc_sorting/log_level values outside the allowed
        sets are replaced with "_error_". A warning is appended to self.warnings for each.
        '''
        new_config: Dict[str, Union[str, int, bool]] = {}
        if not os.path.isfile(self.config_file): return new_config
        try:
            with open(self.config_file, "r") as f:
                for line in f:
                    line = line.strip()
                    if line.startswith("#? Config"):
                        new_config["version"] = line[line.find("v. ") + 3:]
                        continue
                    #* Split on the first "=" and match the whole key. str.lstrip(key + "=") strips a
                    #* set of characters, which mangled values like "true" for proc_tree/check_temp
                    key, sep, value = line.partition("=")
                    key = key.strip()
                    if not sep or key not in self.keys: continue
                    value = value.strip()
                    if value.startswith('"'):
                        value = value.strip('"')
                    #* Exact type test on purpose: bool is a subclass of int
                    kind = type(getattr(self, key))
                    if kind is int:
                        try:
                            new_config[key] = int(value)
                        except ValueError:
                            self.warnings.append(f'Config key "{key}" should be an integer!')
                    elif kind is bool:
                        try:
                            new_config[key] = self._to_bool(value)
                        except ValueError:
                            self.warnings.append(f'Config key "{key}" can only be True or False!')
                    elif kind is str:
                        new_config[key] = value
        except Exception as e:
            errlog.exception(str(e))
        if "proc_sorting" in new_config and new_config["proc_sorting"] not in self.sorting_options:
            new_config["proc_sorting"] = "_error_"
            self.warnings.append('Config key "proc_sorting" didn\'t get an acceptable value!')
        if "log_level" in new_config and new_config["log_level"] not in self.log_levels:
            new_config["log_level"] = "_error_"
            self.warnings.append('Config key "log_level" didn\'t get an acceptable value!')
        return new_config

    def save_config(self):
        '''Save current config to config file if difference in values or version, creates a new file if not found'''
        if not self.changed and not self.recreate: return
        try:
            #* Mode "w" both truncates an existing file and creates a missing one
            with open(self.config_file, "w") as f:
                f.write(DEFAULT_CONF.substitute(self.conf_dict))
        except Exception as e:
            errlog.exception(str(e))
#? Create the global config instance and apply the configured log level
try:
    CONFIG: Config = Config(CONFIG_FILE)
    errlog.setLevel(getattr(logging, CONFIG.log_level))
    errlog.info(f'New instance of bpytop version {VERSION} started with pid {os.getpid()}')
    errlog.debug(f'Loglevel set to {CONFIG.log_level}')
    if CONFIG.warnings:
        #* Flush warnings collected while loading, only reset when needed since any assignment marks the config as changed
        for warning in CONFIG.warnings:
            errlog.warning(warning)
        CONFIG.warnings = []
except Exception as e:
    errlog.exception(f'{e}')
    #* sys.exit instead of quit(): quit only exists when the site module is loaded
    sys.exit(1)
#? Classes --------------------------------------------------------------------------------------->
#? Holds the current terminal size, resize state flags, default colors and a few terminal control escape sequences
#* NOTE(review): indentation is missing in this copy of the file, the nesting described in the comments below is inferred from statement order, confirm before relying on it
class Term :
""" Terminal info and commands """
width : int = os . get_terminal_size ( ) . columns #* Current terminal width in columns
height : int = os . get_terminal_size ( ) . lines #* Current terminal height in lines
resized : bool = False #* Flag indicating if terminal was recently resized
_resizing : bool = False #* Set while refresh() is handling a size change, makes further refresh() calls return early
_w : int = 0 #* Most recently measured width, compared against width in refresh()
_h : int = 0 #* Most recently measured height, compared against height in refresh()
fg : str = " " #* Default foreground color
bg : str = " " #* Default background color
hide_cursor = " \033 [?25l " #* Hide terminal cursor
show_cursor = " \033 [?25h " #* Show terminal cursor
alt_screen = " \033 [?1049h " #* Switch to alternate screen
normal_screen = " \033 [?1049l " #* Switch to normal screen
clear = " \033 [2J \033 [0;0f " #* Clear screen and set cursor to position 0,0
#* refresh accepts and ignores extra positional arguments (*args)
#* It re-reads the terminal size and, while it differs from the stored width/height, stores the new size,
#* sets the resized flag and draws a "resizing" box showing the measured size
#* While the measured size is below 80 columns or 24 lines a "warning" box is drawn and the size is polled again after 0.3 s
#* Finishes by calling Box.calc_sizes() and Box.draw_bg(), drawn immediately (now=True) only when Init.running is false
@classmethod
def refresh ( cls , * args ) :
""" Update width, height and set resized flag if terminal has been resized """
if cls . _resizing == True : return
cls . _w , cls . _h = os . get_terminal_size ( )
while ( cls . _w , cls . _h ) != ( cls . width , cls . height ) :
cls . _resizing = True
cls . resized = True
cls . width , cls . height = cls . _w , cls . _h
Draw . now ( Term . clear )
Draw . now ( f ' { create_box ( cls . _w / / 2 - 25 , cls . _h / / 2 - 2 , 50 , 3 , " resizing " ) } { THEME . main_fg } { Fx . b } { Mv . r ( 12 ) } Width : { cls . _w } Height: { cls . _h } ' )
while cls . _w < 80 or cls . _h < 24 :
Draw . now ( Term . clear )
Draw . now ( f ' { create_box ( cls . _w / / 2 - 25 , cls . _h / / 2 - 2 , 50 , 4 , " warning " ) } { THEME . main_fg } { Fx . b } { Mv . r ( 12 ) } Width: { cls . _w } Height: { cls . _h } \
{ Mv . to ( cls . _h / / 2 , cls . _w / / 2 - 23 ) } Width and Height needs to be at least 80 x24 ! ' )
sleep ( 0.3 )
cls . _w , cls . _h = os . get_terminal_size ( )
sleep ( 0.3 )
cls . _w , cls . _h = os . get_terminal_size ( )
cls . _resizing = False
Box . calc_sizes ( )
2020-07-11 22:54:52 +00:00
Box . draw_bg ( now = True if not Init . running else False )
2020-07-09 00:56:03 +00:00
#* echo reads the termios attributes of stdin, sets or clears the ECHO bit in the local flags and writes them back immediately (TCSANOW)
@staticmethod
def echo ( on : bool ) :
""" Toggle input echo """
( iflag , oflag , cflag , lflag , ispeed , ospeed , cc ) = termios . tcgetattr ( sys . stdin . fileno ( ) )
if on :
lflag | = termios . ECHO # type: ignore
else :
lflag & = ~ termios . ECHO # type: ignore
new_attr = [ iflag , oflag , cflag , lflag , ispeed , ospeed , cc ]
termios . tcsetattr ( sys . stdin . fileno ( ) , termios . TCSANOW , new_attr )
class Fx:
    """Text effects

    Every effect is available under a long and a short name. The values are SGR
    escape sequences and must not contain whitespace or they stop being valid sequences.
    """
    start = "\033["                     #* Escape sequence start
    sep = ";"                           #* Escape sequence separator
    end = "m"                           #* Escape sequence end
    reset = rs = "\033[0m"              #* Reset foreground/background color and text effects
    bold = b = "\033[1m"                #* Bold on
    unbold = ub = "\033[22m"            #* Bold off
    dark = d = "\033[2m"                #* Dark on
    undark = ud = "\033[22m"            #* Dark off
    italic = i = "\033[3m"              #* Italic on
    unitalic = ui = "\033[23m"          #* Italic off
    underline = u = "\033[4m"           #* Underline on
    ununderline = uu = "\033[24m"       #* Underline off
    blink = bl = "\033[5m"              #* Blink on
    unblink = ubl = "\033[25m"          #* Blink off
    strike = s = "\033[9m"              #* Strike / crossed-out on
    unstrike = us = "\033[29m"          #* Strike / crossed-out off

    @staticmethod
    def trans(string: str):
        '''Replace white space with escape move right to not paint background in whitespace'''
        return string.replace(" ", "\033[1C")
class Raw(object):
    """Set raw input mode for device

    Context manager: on enter the terminal attributes of stream are saved and the
    terminal is switched to cbreak mode, on exit the saved attributes are restored.
    """
    def __init__(self, stream):
        self.stream = stream
        self.fd = self.stream.fileno()

    def __enter__(self):
        self.original_stty = termios.tcgetattr(self.fd)
        tty.setcbreak(self.fd)

    def __exit__(self, exc_type, exc_value, traceback):
        #* Parameters renamed so the builtin type() is not shadowed, __exit__ is always called positionally
        termios.tcsetattr(self.fd, termios.TCSANOW, self.original_stty)
class Nonblocking(object):
    """Set nonblocking mode for device

    Context manager: adds O_NONBLOCK to the file status flags of stream on enter
    and puts back the flags that were read on enter when leaving.
    """
    def __init__(self, stream):
        self.stream = stream
        self.fd = self.stream.fileno()

    def __enter__(self):
        current_flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
        self.orig_fl = current_flags
        fcntl.fcntl(self.fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)

    def __exit__(self, *args):
        saved_flags = self.orig_fl
        fcntl.fcntl(self.fd, fcntl.F_SETFL, saved_flags)
class Mv:
    """Class with collection of cursor movement functions: .t[o](line, column) | .r[ight](columns) | .l[eft](columns) | .u[p](lines) | .d[own](lines) | .save() | .restore()

    Every function only returns the escape sequence as a string, nothing is printed here.
    """
    @staticmethod
    def to(line: int, col: int) -> str:  #* Move cursor to line, column
        return f'\033[{line};{col}f'

    @staticmethod
    def right(x: int) -> str:  #* Move cursor right x columns
        return f'\033[{x}C'

    @staticmethod
    def left(x: int) -> str:  #* Move cursor left x columns
        return f'\033[{x}D'

    @staticmethod
    def up(x: int) -> str:  #* Move cursor up x lines
        return f'\033[{x}A'

    @staticmethod
    def down(x: int) -> str:  #* Move cursor down x lines
        return f'\033[{x}B'

    @staticmethod
    def save() -> str:  #* Save cursor position
        return "\033[s"

    @staticmethod
    def restore() -> str:  #* Restore saved cursor position
        return "\033[u"

    #* Short aliases
    t = to
    r = right
    l = left
    u = up
    d = down
class Key:
    """Handles the threaded input reader

    * .start() / .stop() : Start and stop the reader thread
    * .get() / .last() : Pop the oldest / newest key name from the queue, "" if empty
    * .has_key() / .clear() : Check for and discard queued keys
    * .input_wait(sec) : Wait up to sec seconds for a new key
    """
    list: List[str] = []        #* Queue of cleaned key names, oldest first (name kept for existing callers)
    new = threading.Event()     #* Set by the reader when a key has been queued
    idle = threading.Event()    #* Cleared while the reader is busy reading from stdin
    idle.set()
    stopping: bool = False
    started: bool = False
    reader: threading.Thread

    @classmethod
    def start(cls):
        '''Start the reader thread'''
        cls.stopping = False
        cls.reader = threading.Thread(target=cls._get_key)
        cls.reader.start()
        cls.started = True

    @classmethod
    def stop(cls):
        '''Ask the reader thread to stop and wait for it to finish'''
        if cls.started and cls.reader.is_alive():
            cls.stopping = True
            try:
                cls.reader.join()
            except RuntimeError:
                #* join() raises RuntimeError when called from the reader thread itself, stopping stays best effort
                pass

    @classmethod
    def last(cls) -> str:
        '''Remove and return the newest queued key, "" if the queue is empty'''
        return cls.list.pop() if cls.list else ""

    @classmethod
    def get(cls) -> str:
        '''Remove and return the oldest queued key, "" if the queue is empty'''
        return cls.list.pop(0) if cls.list else ""

    @classmethod
    def has_key(cls) -> bool:
        '''Returns True if any key is queued'''
        return bool(cls.list)

    @classmethod
    def clear(cls):
        '''Discard all queued keys'''
        cls.list = []

    @classmethod
    def input_wait(cls, sec: float = 0.0) -> bool:
        '''Returns True if key is detected else waits out timer and returns False'''
        #* Event.wait() returns the flag state, so no separate is_set() check is needed
        if cls.new.wait(sec if sec > 0 else 0.0):
            cls.new.clear()
            return True
        return False

    @classmethod
    def _get_key(cls):
        """Get a single key from stdin, convert to readable format and save to keys list. Meant to be run in it's own thread."""
        input_key: str = ""
        clean_key: str = ""
        #* Escape code (after the leading \033 is stripped) -> key name, tuple keys list alternative codes
        escape: Dict[Union[str, Tuple[str, str]], str] = {
            "\n": "enter",
            ("\x7f", "\x08"): "backspace",
            ("[A", "OA"): "up",
            ("[B", "OB"): "down",
            ("[D", "OD"): "left",
            ("[C", "OC"): "right",
            "[2~": "insert",
            "[3~": "delete",
            "[H": "home",
            "[F": "end",
            "[5~": "page_up",
            "[6~": "page_down",
            "[Z": "shift_tab",
            "OP": "f1",
            "OQ": "f2",
            "OR": "f3",
            "OS": "f4",
            "[15": "f5",
            "[17": "f6",
            "[18": "f7",
            "[19": "f8",
            "[20": "f9",
            "[21": "f10",
            "[23": "f11",
            "[24": "f12",
        }
        try:
            while not cls.stopping:
                with Raw(sys.stdin):
                    if not select([sys.stdin], [], [], 0.1)[0]:    #* Wait 100ms for input on stdin then restart loop to check for stop flag
                        continue
                    cls.idle.clear()                                #* Report IO block in progress to prevent Draw functions to get IO Block error
                    input_key += sys.stdin.read(1)
                    if input_key == "\033":                         #* If first character is a escape sequence read 5 more keys
                        Draw.idle.wait()                            #* Wait for Draw function to finish if busy
                        with Nonblocking(sys.stdin):                #* Set non blocking to prevent read stall if less than 5 characters
                            input_key += sys.stdin.read(5)
                    if input_key == "\033": clean_key = "escape"    #* Key is escape if only containing \033
                    elif input_key == "\\": clean_key = "\\"        #* Clean up "\" to not return escaped
                    else:
                        stripped_key = input_key.lstrip("\033")
                        for code, key_name in escape.items():       #* Go through dict of escape codes to get the cleaned key name
                            if stripped_key.startswith(code):       #* str.startswith accepts both a str and a tuple of str
                                clean_key = key_name
                                break
                        else:                                       #* If not found in escape dict and length of key is 1, assume regular character
                            if len(input_key) == 1:
                                clean_key = input_key
                    if testing: errlog.debug(f'Input key: {repr(input_key)} Clean key: {clean_key}')  #! Remove
                    if clean_key:
                        cls.list.append(clean_key)                  #* Store up to 10 keys in input queue for later processing
                        if len(cls.list) > 10: del cls.list[0]
                        clean_key = ""
                        cls.new.set()                               #* Set threading event to interrupt main thread sleep
                    input_key = ""
                    cls.idle.set()                                  #* Report IO blocking done
        except Exception as e:
            errlog.exception(f'Input reader failed with exception: {e}')
            cls.idle.set()
            cls.list.clear()
class Draw:
    '''Holds the draw buffer and manages IO blocking queue
    * .buffer([+]name[!], *args, append=False, now=False, z=100) : Add *args to buffer
    * - Adding "+" prefix to name sets append to True and appends to name's current string
    * - Adding "!" suffix to name sets now to True and print name's current string
    * .out(*names, clear=False) : Print named buffers (all if no names) in descending z order, clear=True removes what was printed
    * .now(*args) : Prints all arguments as a string
    * .clear(*names, last=False) : Clear named buffers, all if no argument, last=True also forgets the saved copies
    * .last_screen() : Returns all saved buffers as one string
    '''
    strings: Dict[str, str] = {}    #* name -> buffered string
    z_order: Dict[str, int] = {}    #* name -> z value, higher values are output first
    last: Dict[str, str] = {}       #* name -> string most recently output for that buffer
    idle = threading.Event()        #* Cleared while a print is in progress
    idle.set()

    @classmethod
    def now(cls, *args):
        '''Wait for input reader to be idle then print to screen'''
        Key.idle.wait()
        cls.idle.wait()
        cls.idle.clear()
        try:
            try:
                print(*args, sep="", end="", flush=True)
            except BlockingIOError:
                #* stdin/stdout share the tty, retry once when the input reader is done with its nonblocking read
                Key.idle.wait()
                print(*args, sep="", end="", flush=True)
        finally:
            #* Always release the flag, otherwise a failed print would block every later draw and the input reader
            cls.idle.set()

    @classmethod
    def buffer(cls, name: str, *args: str, append: bool = False, now: bool = False, z: int = 100):
        '''Store the joined args under name, replacing the current string unless append is set'''
        string: str = ""
        if name.startswith("+"):
            name = name.lstrip("+")
            append = True
        if name.endswith("!"):
            name = name.rstrip("!")
            now = True
        #* Keep a previously set z value unless a non default one is given
        if name not in cls.z_order or z != 100: cls.z_order[name] = z
        if args: string = "".join(args)
        if name not in cls.strings or not append: cls.strings[name] = ""
        cls.strings[name] += string
        if now: cls.now(string)

    @classmethod
    def out(cls, *names: str, clear=False):
        '''Print the named buffers, or all buffers if no names are given, highest z first'''
        if not cls.strings: return
        parts: List[str] = []
        for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True):
            #* A name can remain in z_order after its string was cleared, skip those
            if name not in cls.strings or (names and name not in names): continue
            parts.append(cls.strings[name])
            #* Save only this buffer's own string, not the output accumulated so far
            cls.last[name] = cls.strings[name]
            if names and clear:
                del cls.strings[name]
                del cls.z_order[name]
        if not names and clear: cls.strings = {}
        cls.now("".join(parts))

    @classmethod
    def last_screen(cls) -> str:
        '''Return the saved copies of all buffers joined in descending z order'''
        return "".join(cls.last[name] for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True) if name in cls.last)

    @classmethod
    def clear(cls, *names, last: bool = False):
        '''Remove named buffers, all buffers if no names are given, last=True also removes z order and saved copy'''
        if names:
            for name in names:
                cls.strings.pop(name, None)
                if last:
                    cls.z_order.pop(name, None)
                    cls.last.pop(name, None)
        else:
            cls.strings = {}
            if last: cls.last = {}
2020-07-09 00:56:03 +00:00
class Color:
    '''Holds representations for a 24-bit color value
    __init__(color, depth="fg", default=False)
    -- color accepts 6 digit hexadecimal: string "#RRGGBB", 2 digit hexadecimal: string "#FF" or decimal RGB "255 255 255" as a string.
    -- depth accepts "fg" or "bg"
    __call__(*args) converts arguments to a string and apply color
    __str__ returns escape sequence to set color
    __iter__ returns iteration over red, green and blue in integer values of 0-255.
    * Values:  .hexa: str  |  .dec: Tuple[int, int, int]  |  .red: int  |  .green: int  |  .blue: int  |  .depth: str  |  .escape: str
    * An empty or invalid color gives hexa "", dec (-1, -1, -1) and an empty escape ("\\033[49m" for an empty default background)
    '''
    hexa: str; dec: Tuple[int, int, int]; red: int; green: int; blue: int; depth: str; escape: str; default: bool

    def __init__(self, color: str, depth: str = "fg", default: bool = False):
        self.depth = depth
        self.default = default
        #* Start from the "no color" state so every documented attribute exists even if parsing fails
        self.hexa = ""
        self.dec = (-1, -1, -1)
        self.red = self.green = self.blue = -1
        self.escape = ""
        if not color:
            if depth == "bg" and default: self.escape = "\033[49m"
            return
        hexa: str = ""
        try:
            if color.startswith("#"):
                hexa = color
                if len(hexa) == 3:
                    #* "#XX" is greyscale shorthand for "#XXXXXX"
                    hexa += hexa[1:3] + hexa[1:3]
                    c = int(hexa[1:3], base=16)
                    dec = (c, c, c)
                elif len(hexa) == 7:
                    dec = (int(hexa[1:3], base=16), int(hexa[3:5], base=16), int(hexa[5:7], base=16))
                else:
                    raise ValueError(f'Incorrectly formatted hexadeciaml rgb string: {hexa}')
            else:
                c_t = tuple(map(int, color.split(" ")))
                if len(c_t) != 3:
                    raise ValueError('RGB dec should be "0-255 0-255 0-255"')
                dec = c_t  # type: ignore
            #* Check every channel on its own, a summed check lets values like "300 0 0" through
            if not all(0 <= c <= 255 for c in dec):
                raise ValueError(f'RGB values out of range: {color}')
        except Exception as e:
            errlog.exception(str(e))
            return
        self.dec = dec
        self.red, self.green, self.blue = dec
        #* Decimal input gets a generated "#rrggbb" string, hex input keeps the string it was given
        self.hexa = hexa or f'#{dec[0]:02x}{dec[1]:02x}{dec[2]:02x}'
        self.escape = f'\033[{38 if self.depth == "fg" else 48};2;{";".join(str(c) for c in self.dec)}m'

    def __str__(self) -> str:
        return self.escape

    def __repr__(self) -> str:
        return repr(self.escape)

    def __iter__(self) -> Iterable:
        for c in self.dec: yield c

    def __call__(self, *args: str) -> str:
        '''Return the joined args wrapped in this color followed by the terminal default for the same depth'''
        if len(args) == 0: return ""
        return f'{self.escape}{"".join(args)}{getattr(Term, self.depth)}'

    @staticmethod
    def escape_color(hexa: str = "", r: int = 0, g: int = 0, b: int = 0, depth: str = "fg") -> str:
        """Returns escape sequence to set color
        * accepts either 6 digit hexadecimal hexa="#RRGGBB", 2 digit hexadecimal: hexa="#FF"
        * or decimal RGB: r=0-255, g=0-255, b=0-255
        * depth="fg" or "bg"
        * returns "" if hexa is given but has the wrong length or is not valid hexadecimal
        """
        dint: int = 38 if depth == "fg" else 48
        color: str = ""
        if hexa:
            try:
                if len(hexa) == 3:
                    c = int(hexa[1:], base=16)
                    color = f'\033[{dint};2;{c};{c};{c}m'
                elif len(hexa) == 7:
                    color = f'\033[{dint};2;{int(hexa[1:3], base=16)};{int(hexa[3:5], base=16)};{int(hexa[5:7], base=16)}m'
            except ValueError as e:
                errlog.exception(f'{e}')
        else:
            color = f'\033[{dint};2;{r};{g};{b}m'
        return color

    @classmethod
    def fg(cls, *args) -> str:
        '''Foreground escape from either one hex string or three decimal r, g, b values'''
        if len(args) > 1: return cls.escape_color(r=args[0], g=args[1], b=args[2], depth="fg")
        else: return cls.escape_color(hexa=args[0], depth="fg")

    @classmethod
    def bg(cls, *args) -> str:
        '''Background escape from either one hex string or three decimal r, g, b values'''
        if len(args) > 1: return cls.escape_color(r=args[0], g=args[1], b=args[2], depth="bg")
        else: return cls.escape_color(hexa=args[0], depth="bg")
class Theme:
    '''Holds the active color theme
    * Theme(name) scans the theme directories and loads the theme called name
    * calling an existing instance with a theme name switches to that theme
    * every key of DEFAULT_THEME becomes a Color attribute on the instance
    * gradient[name] holds 101 foreground escape sequences, indexed 0-100
    '''
    themes: Dict[str, str] = {}
    cached: Dict[str, Dict[str, str]] = {"Default": DEFAULT_THEME}
    current: str = ""

    #* Placeholders, replaced by Color objects in _load_theme()
    main_bg = main_fg = title = hi_fg = selected_bg = selected_fg = inactive_fg = proc_misc = \
        cpu_box = mem_box = net_box = proc_box = div_line = \
        temp_start = temp_mid = temp_end = cpu_start = cpu_mid = cpu_end = \
        free_start = free_mid = free_end = cached_start = cached_mid = cached_end = \
        available_start = available_mid = available_end = used_start = used_mid = used_end = \
        download_start = download_mid = download_end = upload_start = upload_mid = upload_end = NotImplemented

    gradient: Dict[str, List[str]] = {
        "temp": [],
        "cpu": [],
        "free": [],
        "cached": [],
        "available": [],
        "used": [],
        "download": [],
        "upload": []
    }

    def __init__(self, theme: str):
        '''Scan for theme files, then load the theme called theme'''
        self.refresh()
        self._load_theme(theme)

    def __call__(self, theme: str):
        '''Switch to the theme called theme'''
        self._load_theme(theme)

    def _load_theme(self, theme: str):
        '''Resolve theme to a color dict, set all color attributes, rebuild the gradients and apply the main colors
        * an unknown name is logged, falls back to "Default" and is written back to CONFIG.color_theme
        '''
        tdict: Dict[str, str]
        if theme in self.cached:
            tdict = self.cached[theme]
        elif theme in self.themes:
            tdict = self._load_file(self.themes[theme])
            self.cached[theme] = tdict
        else:
            errlog.warning(f'No theme named "{theme}" found!')
            theme = "Default"
            CONFIG.color_theme = theme
            tdict = DEFAULT_THEME
        self.current = theme
        #if CONFIG.color_theme != theme: CONFIG.color_theme = theme

        #* Get key names from DEFAULT_THEME dict to not leave any color unset if missing from theme dict
        for item, value in DEFAULT_THEME.items():
            default = item in ["main_fg", "main_bg"]
            depth = "bg" if item in ["main_bg", "selected_bg"] else "fg"
            setattr(self, item, Color(tdict.get(item, value), depth=depth, default=default))

        #* Create color gradients from one, two or three colors, 101 values indexed 0-100
        rgb: Dict[str, Tuple[int, int, int]]
        colors: List[List[int]] = []
        for name in self.gradient:
            #* Always start from an empty list so a reload never appends to the previous theme's gradient
            self.gradient[name] = []
            rgb = {"start": getattr(self, f'{name}_start').dec, "mid": getattr(self, f'{name}_mid').dec, "end": getattr(self, f'{name}_end').dec}
            colors = [list(getattr(self, f'{name}_start'))]
            #* A negative red component is treated as "color not set"
            if rgb["end"][0] >= 0:
                r = 50 if rgb["mid"][0] >= 0 else 100
                for first, second in ["start", "mid" if r == 50 else "end"], ["mid", "end"]:
                    for i in range(r):
                        colors += [[rgb[first][n] + i * (rgb[second][n] - rgb[first][n]) // r for n in range(3)]]
                    if r == 100:
                        break
                self.gradient[name] += [Color.fg(*color) for color in colors]
            else:
                #* Single color: 101 entries, same length as the interpolated case, so index 100 is valid
                c = Color.fg(*rgb["start"])
                self.gradient[name] += [c] * 101

        #* Set terminal colors
        Term.fg, Term.bg = self.main_fg, self.main_bg
        Draw.now(self.main_fg, self.main_bg)

    def refresh(self):
        '''Sets themes dict with names and paths to all found themes
        * themes found in USER_THEME_DIR get "*" appended to their name
        '''
        self.themes = {"Default": "Default"}
        for d in (THEME_DIR, USER_THEME_DIR):
            #* One try per directory, so an unreadable directory does not hide themes in the other one
            try:
                for f in os.listdir(d):
                    if f.endswith(".theme"):
                        self.themes[f'{f[:-6] if d == THEME_DIR else f[:-6] + "*"}'] = f'{d}/{f}'
            except Exception as e:
                errlog.exception(str(e))

    @staticmethod
    def _load_file(path: str) -> Dict[str, str]:
        '''Load a bashtop formatted theme file and return a dict
        * only lines starting with "theme[" are read, key is the text up to "]", value is the text between the first pair of double quotes
        * read errors are logged and whatever was parsed so far is returned
        '''
        new_theme: Dict[str, str] = {}
        try:
            with open(path) as f:
                for line in f:
                    if not line.startswith("theme["): continue
                    key = line[6:line.find("]")]
                    s = line.find('"')
                    value = line[s + 1:line.find('"', s + 1)]
                    new_theme[key] = value
        except Exception as e:
            errlog.exception(str(e))
        return new_theme
class Banner:
    '''Holds the bpytop banner, .draw(line, [col=0], [center=False], [now=False])'''
    out: List[str] = []
    c_color: str = ""
    length: int = 0

    #* Build one pre-colored string per banner row when the class is created
    if not out:
        for num, (color, color2, line) in enumerate(BANNER_SRC):
            length = max(length, len(line))
            out_var = ""
            line_color = Color.fg(color)
            line_color2 = Color.fg(color2)
            line_dark = Color.fg(f'#{80 - num * 6}')
            for n, letter in enumerate(line):
                if letter == " ":
                    #* Spaces become a cursor move instead of a printed character
                    letter = f'{Mv.r(1)}'
                    c_color = ""
                elif letter == "█":
                    #* Block characters between columns 6 and 24 use the second color
                    if c_color != line_color:
                        c_color = line_color2 if 5 < n < 25 else line_color
                        out_var += c_color
                elif c_color != line_dark:
                    #* Everything else is drawn in a grey that changes with the row number
                    c_color = line_dark
                    out_var += line_dark
                out_var += letter
            out.append(out_var)

    @classmethod
    def draw(cls, line: int, col: int = 0, center: bool = False, now: bool = False):
        '''Position the banner with its first row at line, col
        * center=True overrides col to center the banner on Term.width
        * now=True passes the result to Draw.out() and returns None, otherwise the string is returned
        '''
        if center:
            col = Term.width // 2 - cls.length // 2
        rows = "".join(f'{Mv.to(line + n, col)}{o}' for n, o in enumerate(cls.out))
        rendered: str = f'{rows}{Term.fg}'
        if not now:
            return rendered
        Draw.out(rendered)
class Symbol:
    '''Characters used for drawing boxes, graphs and meters'''
    #* Box drawing
    h_line: str = "─"
    v_line: str = "│"
    left_up: str = "┌"
    right_up: str = "┐"
    left_down: str = "└"
    right_down: str = "┘"
    title_left: str = "┤"
    title_right: str = "├"
    div_up: str = "┬"
    div_down: str = "┴"

    #* Braille graph characters, key is <left level>.<right level> with levels 0-4
    graph_up: Dict[float, str] = {
        0.0: "⠀", 0.1: "⢀", 0.2: "⢠", 0.3: "⢰", 0.4: "⢸",
        1.0: "⡀", 1.1: "⣀", 1.2: "⣠", 1.3: "⣰", 1.4: "⣸",
        2.0: "⡄", 2.1: "⣄", 2.2: "⣤", 2.3: "⣴", 2.4: "⣼",
        3.0: "⡆", 3.1: "⣆", 3.2: "⣦", 3.3: "⣶", 3.4: "⣾",
        4.0: "⡇", 4.1: "⣇", 4.2: "⣧", 4.3: "⣷", 4.4: "⣿"
    }
    graph_down: Dict[float, str] = {
        0.0: "⠀", 0.1: "⠈", 0.2: "⠘", 0.3: "⠸", 0.4: "⢸",
        1.0: "⠁", 1.1: "⠉", 1.2: "⠙", 1.3: "⠹", 1.4: "⢹",
        2.0: "⠃", 2.1: "⠋", 2.2: "⠛", 2.3: "⠻", 2.4: "⢻",
        3.0: "⠇", 3.1: "⠏", 3.2: "⠟", 3.3: "⠿", 3.4: "⢿",
        4.0: "⡇", 4.1: "⡏", 4.2: "⡟", 4.3: "⡿", 4.4: "⣿"
    }

    #* Meter block and colored status marks, both marks end by switching to "#cc" grey
    meter: str = "■"
    ok: str = f'{Color.fg("#30ff50")}√{Color.fg("#cc")}'
    fail: str = f'{Color.fg("#ff3050")}!{Color.fg("#cc")}'
class Graph:
    '''Class for creating and adding to graphs
    * __str__ : returns graph as a string
    * add(value: int) : adds a value to graph and returns it as a string
    * two copies of the graph (keys True and False) are kept and alternated between on every add()
    '''
    out: str
    width: int
    height: int
    graphs: Dict[bool, List[str]]
    colors: List[str]
    invert: bool
    max_value: int
    current: bool
    last: int
    symbol: Dict[float, str]

    def __init__(self, width: int, height: int, color: Union[List[str], Color, None], data: List[int], invert: bool = False, max_value: int = 0):
        '''Create a graph of width x height characters from data
        * color : list of escape sequences indexable 0-100 (one is picked per row), a single Color, or None for no coloring
        * data : values in percent, or raw values if max_value is given, the list itself is never modified
        * invert : use the downward symbols and draw the rows in reverse order
        * max_value : if non zero, values are converted to percent of max_value, capped at 100
        '''
        self.graphs: Dict[bool, List[str]] = {False: [], True: []}
        self.current: bool = True
        self.colors: List[str] = []
        if isinstance(color, list):
            for i in range(1, height + 1): self.colors.insert(0, color[i * 100 // height]) #* Calculate colors of graph
            if invert: self.colors.reverse()
        elif isinstance(color, Color):
            self.colors = [f'{color}' for _ in range(height)]
        self.width = width
        self.height = height
        self.invert = invert

        #* Work on a private copy, the padding insert below must not end up in the caller's list
        data = list(data) if data else [0]
        if max_value:
            self.max_value = max_value
            data = [v * 100 // max_value if v < max_value else 100 for v in data] #* Convert values to percentage values of max_value with max_value as ceiling
        else:
            self.max_value = 0
        self.symbol = Symbol.graph_down if invert else Symbol.graph_up

        #* Every character holds two values
        value_width: int = ceil(len(data) / 2)
        filler: str = ""
        if value_width > width: #* If the size of given data set is bigger than width of graph, shrink data set
            data = data[-(width * 2):]
            value_width = ceil(len(data) / 2)
        elif value_width < width: #* If the size of given data set is smaller than width of graph, fill graph with whitespace
            filler = " " * (width - value_width)
        if len(data) % 2 == 1: data.insert(0, 0)
        for _ in range(height):
            for b in [True, False]:
                self.graphs[b].append(filler)
        self._create(data, new=True)

    def _create(self, data: List[int], new: bool = False):
        '''Append one character per value in data to every row and rebuild self.out
        * new=True : initial build, values are written alternately to the two graph copies
        * new=False : values are appended to the currently selected copy only
        '''
        h_high: int
        h_low: int
        value: Dict[str, int] = {"left": 0, "right": 0}
        val: int
        side: str

        #* Create the graph
        for h in range(self.height):
            #* Percent range covered by row h, row 0 is the top row
            h_high = round(100 * (self.height - h) / self.height) if self.height > 1 else 100
            h_low = round(100 * (self.height - (h + 1)) / self.height) if self.height > 1 else 0
            for v in range(len(data)):
                if new: self.current = bool(v % 2) #* Switch between True and False graphs
                if new and v == 0: self.last = 0
                #* Left half of the character shows the previous value, right half the current one
                for val, side in [self.last, "left"], [data[v], "right"]: # type: ignore
                    if val >= h_high:
                        value[side] = 4
                    elif val <= h_low:
                        value[side] = 0
                    else:
                        if self.height == 1: value[side] = round(val * 4 / 100 + 0.5)
                        else: value[side] = round((val - h_low) * 4 / (h_high - h_low) + 0.1)
                if new: self.last = data[v]
                self.graphs[self.current][h] += self.symbol[float(value["left"] + value["right"] / 10)]
        self.last = data[-1]

        #* Join the rows of the current graph, moving down one line and back to the left edge between rows
        self.out = ""
        for h in range(self.height):
            if h > 0: self.out += f'{Mv.d(1)}{Mv.l(self.width)}'
            self.out += f'{"" if not self.colors else self.colors[h]}{self.graphs[self.current][h if not self.invert else (self.height - 1) - h]}'
        self.out += f'{Term.fg}'

    def add(self, value: int):
        '''Switch graph copy, drop its oldest character, append value and return the graph as a string'''
        self.current = not self.current
        for n in range(self.height):
            self.graphs[self.current][n] = self.graphs[self.current][n][1:]
        if self.max_value: value = value * 100 // self.max_value if value < self.max_value else 100
        self._create([value])
        return self.out

    def __str__(self):
        return self.out

    def __repr__(self):
        return repr(self.out)
2020-07-09 00:56:03 +00:00
class Graphs:
    '''Holds all graphs and lists of graphs for dynamically created graphs'''
    #* Cpu box
    cpu: Graph
    cores: List[Graph] = []
    temps: List[Graph] = []

    #* Net box
    net: Graph

    #* Detailed process view
    detailed_cpu: Graph
    detailed_mem: Graph

    #* Per process graphs, keyed by pid
    pid_cpu: Dict[int, Graph] = {}
2020-07-09 00:56:03 +00:00
class Meter:
    '''Creates a percentage meter
    __init__(value, width, gradient_name) to create new meter
    __call__(value) to set value and return meter as a string
    __str__ returns last set meter as a string
    '''
    out: str
    color_gradient: List[str]
    color_inactive: Color
    width: int
    saved: Dict[int, str]

    def __init__(self, value: int, width: int, gradient_name: str):
        '''Create a meter of width characters, colored from THEME.gradient[gradient_name], showing value'''
        self.color_gradient = THEME.gradient[gradient_name]
        self.color_inactive = THEME.inactive_fg
        self.width = width
        self.out = self._create(value)
        #* Rendered meters are cached per value
        self.saved = {value: self.out}

    def __call__(self, value: int):
        '''Set the meter to value and return it as a string, reusing a cached rendering if there is one'''
        if value in self.saved:
            self.out = self.saved[value]
        else:
            self.out = self._create(value)
            self.saved[value] = self.out
        return self.out

    def __str__(self):
        return self.out

    def __repr__(self):
        return repr(self.out)

    def _create(self, value: int):
        '''Render the meter for value, which is clamped to 0-100'''
        if value > 100: value = 100
        elif value < 0: value = 0 #* A negative value is an empty meter, not a full one
        out: str = ""
        for i in range(1, self.width + 1):
            if value >= round(i * 100 / self.width):
                out += f'{self.color_gradient[round(i * 100 / self.width)]}{Symbol.meter}'
            else:
                #* Rest of the meter in the inactive color
                out += self.color_inactive(Symbol.meter * (self.width + 1 - i))
                break
        else:
            #* Meter is completely filled, only add the Term.fg suffix
            out += f'{Term.fg}'
        return out
class Meters:
    '''Holds the Meter objects used by the boxes'''
    #* Cpu
    cpu: Meter

    #* Memory
    mem_used: Meter
    mem_available: Meter
    mem_cached: Meter
    mem_free: Meter

    #* Swap
    swap_used: Meter
    swap_free: Meter

    #* Disks
    disks_used: Meter
    disks_free: Meter
class Box:
    '''Box class with all needed attributes for create_box() function'''
    name: str
    height_p: int
    width_p: int
    x: int
    y: int
    width: int
    height: int
    out: str
    bg: str
    _b_cpu_h: int
    _b_mem_h: int
    redraw_all: bool

    @classmethod
    def calc_sizes(cls):
        '''Calculate sizes of boxes, calls _calc_size() on every subclass in the order they were defined'''
        for sub in cls.__subclasses__():
            sub._calc_size() # type: ignore

    @classmethod
    def draw_bg(cls, now: bool = True):
        '''Draw all boxes outlines and titles
        * now=True : buffer is named "bg!" and Draw.clear("bg") is called afterwards
        * now=False : buffer is named "bg"
        '''
        outlines = "".join(sub._draw_bg() for sub in cls.__subclasses__()) # type: ignore
        buffer_name = "bg!" if now else "bg"
        Draw.buffer(buffer_name, outlines)
        if now:
            Draw.clear("bg")
2020-07-09 00:56:03 +00:00
class SubBox:
    '''Position, size and column count of the smaller box drawn inside a main box'''
    #* Position
    box_x: int = 0
    box_y: int = 0

    #* Size
    box_width: int = 0
    box_height: int = 0
    box_columns: int = 0
class CpuBox(Box, SubBox):
    '''Cpu box, spans the full terminal width, with an inner box for the per thread values'''
    name = "cpu"
    x = 1
    y = 1
    height_p = 32
    width_p = 100
    redraw: bool = True

    @classmethod
    def _calc_size(cls):
        '''Calculate size of the cpu box and size, position and column count of its inner box'''
        cls.width = round(Term.width * cls.width_p / 100)
        cls.height = round(Term.height * cls.height_p / 100)
        Box._b_cpu_h = cls.height

        #* Use more columns when the threads don't fit the available rows and the box is wide enough
        rows: int = cls.height - 6
        if THREADS > rows * 3 and cls.width > 200: cls.box_columns = 4
        elif THREADS > rows * 2 and cls.width > 150: cls.box_columns = 3
        elif THREADS > rows and cls.width > 100: cls.box_columns = 2
        else: cls.box_columns = 1
        cls.box_width = 24 * cls.box_columns
        cls.box_height = ceil(THREADS / cls.box_columns) + 4

        #* Every column gets 13 characters wider when temperatures are shown
        if CONFIG.check_temp: cls.box_width += 13 * cls.box_columns
        if cls.box_height > cls.height - 2: cls.box_height = cls.height - 2
        cls.box_x = (cls.width - 2) - cls.box_width
        cls.box_y = cls.y + ((cls.height - 2) // 2) - cls.box_height // 2 + 1
        cls.redraw_all = True

    @classmethod
    def _draw_bg(cls) -> str:
        '''Return outline of the cpu box with the "menu" title and the inner box titled with the cpu name'''
        #* Adjacent literals instead of backslash continuation inside the string, so that the
        #* indentation of this source file can never become part of the output
        return (
            f'{create_box(box=cls, line_color=THEME.cpu_box)}'
            f'{Mv.to(cls.y, cls.x + 10)}{THEME.cpu_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg("m")}{THEME.title("enu")}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}'
            f'{create_box(x=cls.box_x, y=cls.box_y, width=cls.box_width, height=cls.box_height, line_color=THEME.div_line, fill=False, title=CPU_NAME[:18 if CONFIG.check_temp else 9])}'
        )

    @classmethod
    def _draw_fg(cls, cpu):
        '''Write the collected cpu values to the "cpu" buffer as plain text
        * cpu : object providing cpu_usage, cpu_freq and load_avg, temperatures are read from CpuCollector
        '''
        Draw.buffer("cpu",
            f'{Mv.to(1, 1)}Cpu usage: {cpu.cpu_usage}\nCpu freq: {cpu.cpu_freq}\nLoad avg: {cpu.load_avg}'
            f'\nTemps: {CpuCollector.cpu_temp}\n')
2020-07-09 00:56:03 +00:00
class MemBox(Box):
    '''Memory box, placed below the cpu box and split by a vertical divider with a "disks" title on the right side'''
    name = "mem"
    height_p = 40
    width_p = 45
    x = 1
    y = 1
    divider: int = 0
    mem_width: int = 0
    disks_width: int = 0
    redraw: bool = True

    @classmethod
    def _calc_size(cls):
        '''Calculate size and position of the box and its divider, needs Box._b_cpu_h to be set'''
        cls.width = round(Term.width * cls.width_p / 100)
        cls.height = round(Term.height * cls.height_p / 100) + 1
        Box._b_mem_h = cls.height
        cls.y = Box._b_cpu_h + 1

        #* Split inner width in two halves, left half gets the extra column if the split is uneven
        cls.mem_width = cls.disks_width = round((cls.width - 1) / 2)
        if cls.mem_width + cls.disks_width < cls.width - 2: cls.mem_width += 1
        cls.divider = cls.x + cls.mem_width + 1
        cls.redraw_all = True

    @classmethod
    def _draw_bg(cls) -> str:
        '''Return outline of the mem box, the "disks" title and the divider line'''
        #* Adjacent literals instead of backslash continuation inside the string, so that the
        #* indentation of this source file can never become part of the output
        return (
            f'{create_box(box=cls, line_color=THEME.mem_box)}'
            f'{Mv.to(cls.y, cls.divider + 2)}{THEME.mem_box(Symbol.title_left)}{Fx.b}{THEME.title("disks")}{Fx.ub}{THEME.mem_box(Symbol.title_right)}'
            f'{Mv.to(cls.y, cls.divider)}{THEME.mem_box(Symbol.div_up)}'
            f'{Mv.to(cls.y + cls.height - 1, cls.divider)}{THEME.mem_box(Symbol.div_down)}{THEME.div_line}'
            f'{"".join(f"{Mv.to(cls.y + i, cls.divider)}{Symbol.v_line}" for i in range(1, cls.height - 1))}'
        )
class NetBox(Box, SubBox):
    '''Network box, takes the rows left below the cpu and mem boxes, with an inner box titled "Download" and "Upload"'''
    name = "net"
    height_p = 28
    width_p = 45
    x = 1
    y = 1
    redraw: bool = True

    @classmethod
    def _calc_size(cls):
        '''Calculate size and position of the box and its inner box, needs Box._b_cpu_h and Box._b_mem_h to be set'''
        cls.width = round(Term.width * cls.width_p / 100)
        cls.height = Term.height - Box._b_cpu_h - Box._b_mem_h
        cls.y = Term.height - cls.height + 1
        cls.box_width = 24
        cls.box_height = 9 if cls.height > 12 else cls.height - 2
        cls.box_x = cls.width - cls.box_width - 2
        cls.box_y = cls.y + ((cls.height - 2) // 2) - cls.box_height // 2 + 1
        cls.redraw_all = True

    @classmethod
    def _draw_bg(cls) -> str:
        '''Return outline of the net box and its inner box'''
        #* Adjacent literals instead of backslash continuation inside the string, so that the
        #* indentation of this source file can never become part of the output
        return (
            f'{create_box(box=cls, line_color=THEME.net_box)}'
            f'{create_box(x=cls.box_x, y=cls.box_y, width=cls.box_width, height=cls.box_height, line_color=THEME.div_line, fill=False, title="Download", title2="Upload")}'
        )
class ProcBox(Box):
    '''Process box, right aligned and placed below the cpu box'''
    name = "proc"
    height_p = 68
    width_p = 55
    x = 1
    y = 1

    #* Detailed view
    detailed: bool = False
    detailed_x: int = 0
    detailed_y: int = 0
    detailed_width: int = 0
    detailed_height: int = 8
    redraw: bool = True

    @classmethod
    def _calc_size(cls):
        '''Calculate size and position of the box and of the detailed view, needs Box._b_cpu_h to be set'''
        term_w, term_h = Term.width, Term.height
        cls.width = round(term_w * cls.width_p / 100)
        cls.height = round(term_h * cls.height_p / 100)
        cls.x = term_w - cls.width + 1
        cls.y = Box._b_cpu_h + 1

        #* Detailed view starts in the top left corner of the box and has the same width
        cls.detailed_x, cls.detailed_y = cls.x, cls.y
        cls.detailed_height = 8
        cls.detailed_width = cls.width
        cls.redraw_all = True

    @classmethod
    def _draw_bg(cls) -> str:
        '''Return outline of the proc box'''
        outline: str = create_box(box=cls, line_color=THEME.proc_box)
        return outline
class Collector:
    '''Data collector master class
    * .start(): Starts collector thread
    * .stop(): Stops collector thread
    * .collect(*collectors: Collector, draw_now: bool = True, interrupt: bool = False): queues up collectors to run
    * .collect_done is cleared by every collect() call and set again when that queue has been processed
    '''
    stopping: bool = False
    started: bool = False
    draw_now: bool = False
    thread: threading.Thread
    collect_run = threading.Event()
    collect_idle = threading.Event()
    collect_idle.set()
    collect_done = threading.Event()
    collect_queue: List = []
    collect_interrupt: bool = False
    draw_block: bool = False

    @classmethod
    def start(cls):
        '''Start the collector thread'''
        cls.stopping = False
        cls.thread = threading.Thread(target=cls._runner, args=())
        cls.thread.start()
        cls.started = True

    @classmethod
    def stop(cls):
        '''Tell the collector thread to stop and wait for it to exit'''
        if cls.started and cls.thread.is_alive():
            cls.stopping = True
            try:
                cls.thread.join()
            except RuntimeError:
                #* Raised when called from the collector thread itself, nothing to wait for then.
                #* Not a bare except: SystemExit and KeyboardInterrupt must not be swallowed here
                pass

    @classmethod
    def _runner(cls):
        '''This is meant to run in it's own thread, collecting and drawing when collect_run is set'''
        while not cls.stopping:
            #* Wake up at least every 0.1 seconds to check the stopping flag
            cls.collect_run.wait(0.1)
            if not cls.collect_run.is_set():
                continue
            cls.collect_run.clear()
            cls.collect_idle.clear()
            #* Queue is emptied from the end
            while cls.collect_queue:
                collector = cls.collect_queue.pop()
                collector._collect()
                collector._draw()
            if cls.draw_now and not cls.draw_block:
                Draw.out("cpu", "mem", "net", "proc")
            #* Done is set before idle, collect() waits for idle before clearing done,
            #* so a finished run can never flag the next run as done
            cls.collect_done.set()
            cls.collect_idle.set()

    @classmethod
    def collect(cls, *collectors: object, draw_now: bool = True, interrupt: bool = False):
        '''Setup collect queue for _runner'''
        #* Set interrupt flag if True to stop _runner prematurely
        cls.collect_interrupt = interrupt
        #* Wait for _runner to finish
        cls.collect_idle.wait()
        #* Reset interrupt flag
        cls.collect_interrupt = False
        #* Set draw_now flag if True to draw to screen instead of buffer
        cls.draw_now = draw_now
        #* Append any collector given as argument to _runner queue
        if collectors:
            cls.collect_queue = [*collectors]
        #* Add all collectors to _runner queue if no collectors in argument
        else:
            cls.collect_queue = list(cls.__subclasses__())
        #* Clear done flag so collect_done.wait() blocks until this queue is processed
        cls.collect_done.clear()
        #* Set run flag to start _runner
        cls.collect_run.set()
class CpuCollector(Collector):
    '''Collects cpu usage for cpu and cores, cpu frequency, load_avg
    _collect(): Collects data
    _draw(): calls CpuBox._draw_fg()
    * cpu_usage and cpu_temp hold one list per thread, index 0 is the cpu total
    '''
    cpu_usage: List[List[int]] = [[]]
    cpu_temp: List[List[int]] = [[]]
    cpu_temp_high: int = 0
    cpu_temp_crit: int = 0
    for _ in range(THREADS):
        cpu_usage.append([])
        cpu_temp.append([])
    cpu_freq: int = 0
    load_avg: List[float] = []

    @staticmethod
    def _get_sensors() -> str:
        '''Check if we can get cpu temps and return method of getting temps
        * returns "osx-cpu-temp", "psutil", "vcgencmd" or "" if no method works
        '''
        if SYSTEM == "MacOS":
            try:
                if which("osx-cpu-temp") and subprocess.check_output("osx-cpu-temp", text=True).rstrip().endswith("°C"):
                    return "osx-cpu-temp"
            except Exception:
                pass
        elif hasattr(psutil, "sensors_temperatures"):
            try:
                temps = psutil.sensors_temperatures()
                if temps:
                    for entries in temps.values():
                        for entry in entries:
                            if entry.label.startswith(("Package", "Core 0", "Tdie")):
                                return "psutil"
            except Exception:
                pass
        try:
            #* Command and argument as a list, without a shell a single string is taken as the program name
            if SYSTEM == "Linux" and which("vcgencmd") and subprocess.check_output(["vcgencmd", "measure_temp"], text=True).rstrip().endswith("'C"):
                return "vcgencmd"
        except Exception:
            pass
        return ""

    sensor_method: str = _get_sensors.__func__() # type: ignore
    got_sensors: bool = bool(sensor_method)

    @classmethod
    def _collect(cls):
        '''Collect cpu usage, frequency, load average and, if enabled and available, temperatures'''
        #* History is capped at two values per terminal column, for the total as well as for every thread
        max_len: int = Term.width * 2
        cls.cpu_usage[0].append(round(psutil.cpu_percent(percpu=False)))
        if len(cls.cpu_usage[0]) > max_len:
            del cls.cpu_usage[0][0]
        for n, thread in enumerate(psutil.cpu_percent(percpu=True), start=1):
            cls.cpu_usage[n].append(round(thread))
            if len(cls.cpu_usage[n]) > max_len:
                del cls.cpu_usage[n][0]

        #* psutil.cpu_freq() can return None when the frequency can't be determined, keep last value then
        freq = psutil.cpu_freq()
        if freq:
            cls.cpu_freq = round(freq.current)
        cls.load_avg = [round(lavg, 2) for lavg in os.getloadavg()]

        if CONFIG.check_temp and cls.got_sensors:
            cls._collect_temps()

    @classmethod
    def _set_limits(cls, entry):
        '''Store high and critical temperature from a sensor entry once, entries without limits are skipped'''
        if not cls.cpu_temp_high and entry.high is not None and entry.critical is not None:
            cls.cpu_temp_high, cls.cpu_temp_crit = round(entry.high), round(entry.critical)

    @classmethod
    def _collect_temps(cls):
        '''Collect temperatures with the method in sensor_method and append them to cpu_temp
        * index 0 gets the package temperature, index 1 and up one value per thread
        * on failure of an external command temperature collection is turned off and the cpu box is resized
        '''
        temp: int
        cores: List[int] = []
        cpu_type: str = ""
        if cls.sensor_method == "psutil":
            for entries in psutil.sensors_temperatures().values():
                for entry in entries:
                    if entry.label.startswith(("Package", "Tdie")):
                        cpu_type = "intel" if entry.label.startswith("Package") else "ryzen"
                        cls._set_limits(entry)
                        temp = round(entry.current)
                    elif entry.label.startswith(("Core", "Tccd")):
                        #* No package sensor seen yet, use first core as the cpu total
                        if not cpu_type:
                            cpu_type = "other"
                            cls._set_limits(entry)
                            temp = round(entry.current)
                        cores.append(round(entry.current))

            if len(cores) < THREADS:
                if cpu_type == "intel" or (cpu_type == "other" and len(cores) == THREADS // 2):
                    #* One sensor per core: thread n and thread n + THREADS // 2 get the same value
                    cls.cpu_temp[0].append(temp)
                    for n, t in enumerate(cores, start=1):
                        for index in (n, THREADS // 2 + n):
                            if index < len(cls.cpu_temp):
                                cls.cpu_temp[index].append(t)
                elif cpu_type == "ryzen" or cpu_type == "other":
                    #* Fewer sensors than threads: every sensor covers an equally sized block of threads
                    cls.cpu_temp[0].append(temp)
                    if len(cores) < 1: cores.append(temp)
                    per_sensor: int = THREADS // len(cores)
                    z = 1
                    for t in cores:
                        for i in range(per_sensor):
                            cls.cpu_temp[z + i].append(t)
                        z += per_sensor
            else:
                #* At least one sensor per thread, surplus sensors are ignored
                cores.insert(0, temp)
                for history, t in zip(cls.cpu_temp, cores):
                    history.append(t)
        else:
            try:
                if cls.sensor_method == "osx-cpu-temp":
                    temp = round(float(subprocess.check_output("osx-cpu-temp", text=True).rstrip().rstrip("°C")))
                elif cls.sensor_method == "vcgencmd":
                    #* Output looks like temp=47.2'C, keep what is between "=" and "'C"
                    raw = subprocess.check_output(["vcgencmd", "measure_temp"], text=True).strip()
                    temp = round(float(raw.rpartition("=")[2].rstrip("'C")))
            except Exception as e:
                errlog.exception(f'{e}')
                cls.got_sensors = False
                CONFIG.check_temp = False
                CpuBox._calc_size()
            else:
                for n in range(THREADS + 1):
                    cls.cpu_temp[n].append(temp)

        #* Keep the 5 latest values, lists that never got a value are left alone
        if len(cls.cpu_temp[0]) > 5:
            for history in cls.cpu_temp:
                if history:
                    del history[0]

    @classmethod
    def _draw(cls):
        CpuBox._draw_fg(cls)
#class ProcCollector(Collector): #! add interrupt on _collect and _draw
@timerd
def testing_collectors():
    '''Testing: draw box outlines, then run the cpu collector, wait for it to finish and sleep one second'''
    Box.draw_bg()
    passes: int = 1
    for _ in range(passes):
        Collector.collect(CpuCollector)
        Collector.collect_done.wait()
        sleep(1)
    #Draw.now(f'Cpu usage: {CpuCollector.cpu_usage}\nCpu freq: {CpuCollector.cpu_freq}\nLoad avg: {CpuCollector.load_avg}\n\
    # Temps: {CpuCollector.cpu_temp}\n')
2020-07-09 00:56:03 +00:00
2020-07-11 22:54:52 +00:00
class Menu:
    '''Holds the main menu and all submenus
    * uncolor(string: str) : removes all color and returns string with THEME.inactive_fg color
    '''
    #* Matches color escape sequences
    color_re = re.compile(r"\033\[\d+;\d?;?\d*;?\d*;?\d*m{1}")

    @classmethod
    def uncolor(cls, string: str) -> str:
        '''Strip every sequence matching color_re from string and wrap the result in THEME.inactive_fg and Term.fg'''
        plain: str = cls.color_re.sub("", string)
        return f'{THEME.inactive_fg}{plain}{Term.fg}'
2020-07-09 00:56:03 +00:00
#? Functions ------------------------------------------------------------------------------------->
def get_cpu_name() -> str:
    '''Fetch a suitable CPU identifier from the CPU model name string
    * Linux reads /proc/cpuinfo, MacOS and BSD ask sysctl, other systems give an empty string
    * "Xeon" models give the word after "CPU", or the word before it if "CPU" is followed by "@" or nothing
    * "Ryzen" models give "Ryzen" and the two following words
    * other models containing the word "CPU" give the word before it
    * anything else gives the complete model string without surrounding whitespace
    '''
    name: str = ""
    nlist: List = []
    command: str = ""
    cmd_out: str = ""
    rem_line: str = ""
    if SYSTEM == "Linux":
        command = "cat /proc/cpuinfo"
        rem_line = "model name"
    elif SYSTEM == "MacOS":
        command = "sysctl -n machdep.cpu.brand_string"
    elif SYSTEM == "BSD":
        command = "sysctl hw.model"
        rem_line = "hw.model"

    #* Only constant strings are passed to the shell, a failing command leaves the name empty
    if command:
        try:
            cmd_out = subprocess.check_output("LANG=C " + command, shell=True, universal_newlines=True)
        except Exception:
            pass

    if rem_line:
        #* Remove everything up to and including the colon, last matching line is used
        for line in cmd_out.split("\n"):
            if rem_line in line:
                name = re.sub(".*" + rem_line + ".*:", "", line, count=1).lstrip()
    else:
        name = cmd_out

    #* Command output can end with a newline
    name = name.strip()
    nlist = name.split(" ")

    #* Position of "CPU" as a word of its own, -1 if missing, so that no list lookup below can fail
    cpu_pos: int = nlist.index("CPU") if "CPU" in nlist else -1
    if "Xeon" in name and cpu_pos >= 0:
        following: str = nlist[cpu_pos + 1] if cpu_pos + 1 < len(nlist) else ""
        if following and following != "@":
            name = following
        elif cpu_pos > 0:
            name = nlist[cpu_pos - 1]
    elif "Ryzen" in nlist:
        ryzen_pos: int = nlist.index("Ryzen")
        name = " ".join(nlist[ryzen_pos:ryzen_pos + 3])
    elif cpu_pos > 0:
        name = nlist[cpu_pos - 1]
    return name
def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: str = "", title2: str = "", line_color: Color = None, fill: bool = True, box = None) -> str:
    '''Create a box from a box object or by given arguments
    * box : object with x, y, width, height and name attributes, overrides x, y, width, height and title if given
    * title is placed on the top line, title2 on the bottom line
    * line_color defaults to THEME.div_line
    * fill=True overwrites the inside of the box with spaces
    * returned string ends with the cursor placed at the first position inside the box
    '''
    out: str = f'{Term.fg}{Term.bg}'
    if not line_color: line_color = THEME.div_line

    #* Get values from box class if given
    if box:
        x = box.x
        y = box.y
        width = box.width
        height = box.height
        title = box.name
    vlines: Tuple[int, int] = (x, x + width - 1)
    hlines: Tuple[int, int] = (y, y + height - 1)

    #* Fill box if enabled
    if fill:
        for i in range(y + 1, y + height - 1):
            out += f'{Mv.to(i, x)}{" " * (width - 1)}'

    out += f'{line_color}'

    #* Draw all horizontal lines
    for hpos in hlines:
        out += f'{Mv.to(hpos, x)}{Symbol.h_line * (width - 1)}'

    #* Draw all vertical lines
    for vpos in vlines:
        for hpos in range(y, y + height - 1):
            out += f'{Mv.to(hpos, vpos)}{Symbol.v_line}'

    #* Draw corners
    #* Adjacent literals instead of backslash continuation inside the string, so that the
    #* indentation of this source file can never become part of the output
    out += (
        f'{Mv.to(y, x)}{Symbol.left_up}'
        f'{Mv.to(y, x + width - 1)}{Symbol.right_up}'
        f'{Mv.to(y + height - 1, x)}{Symbol.left_down}'
        f'{Mv.to(y + height - 1, x + width - 1)}{Symbol.right_down}'
    )

    #* Draw titles if enabled
    if title:
        out += f'{Mv.to(y, x + 2)}{Symbol.title_left}{THEME.title}{Fx.b}{title}{Fx.ub}{line_color}{Symbol.title_right}'
    if title2:
        out += f'{Mv.to(y + height - 1, x + 2)}{Symbol.title_left}{THEME.title}{Fx.b}{title2}{Fx.ub}{line_color}{Symbol.title_right}'

    return f'{out}{Term.fg}{Mv.to(y + 1, x + 1)}'
def now_sleeping(signum, frame):
    """Signal handler: stop input and collector threads, restore the terminal, then send SIGSTOP to this process"""
    #* Stop background threads before touching the terminal
    Key.stop()
    Collector.stop()

    #* Leave alternate screen, show cursor and turn echo back on
    Draw.now(Term.clear, Term.normal_screen, Term.show_cursor)
    Term.echo(True)

    #* Suspend this process
    own_pid = os.getpid()
    os.kill(own_pid, signal.SIGSTOP)
def now_awake(signum, frame):
    """Signal handler: set up the terminal again, then restart input and collector threads"""
    #* Alternate screen, hidden cursor, no echo
    Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor)
    Term.echo(False)

    #* Restart background threads
    Key.start()
    Collector.start()
def quit_sigint(signum, frame):
    """Signal handler: hand over to clean_quit() with default arguments, signum and frame are not used"""
    clean_quit()
# Shutdown routine: always ends by raising SystemExit(errcode).
# errcode: exit status, the config is only saved when it is 0 (falsy).
# errmsg: optional message printed with print() before exiting.
# NOTE(review): indentation is lost in this view, so it is unclear whether `if errmsg : print ( errmsg )`
# belongs to the `else` branch or runs for every exit code — confirm before restructuring.
def clean_quit ( errcode : int = 0 , errmsg : str = " " ) :
""" Stop background input read, save current config and reset terminal settings before quitting """
# Stop the input reader and the collector thread first.
Key . stop ( )
Collector . stop ( )
if not errcode : CONFIG . save_config ( )
2020-07-11 22:54:52 +00:00
# While the module-level `testing` flag is set only the cursor is restored and the screen is left as is.
if not testing : Draw . now ( Term . clear , Term . normal_screen , Term . show_cursor ) #! Enable
else : Draw . now ( Term . show_cursor ) #! Remove
2020-07-09 00:56:03 +00:00
Term . echo ( True )
# Log the runtime in whole seconds since SELF_START: info level for a clean exit, warning level otherwise.
if errcode == 0 :
errlog . info ( f ' Exiting. Runtime { timedelta ( seconds = round ( time ( ) - SELF_START , 0 ) ) } \n ' )
else :
errlog . warning ( f ' Exiting with errorcode ( { errcode } ). Runtime { timedelta ( seconds = round ( time ( ) - SELF_START , 0 ) ) } \n ' )
if errmsg : print ( errmsg )
raise SystemExit ( errcode )
def floating_humanizer(value: Union[float, int], bit: bool = False, per_second: bool = False, start: int = 0, short: bool = False) -> str:
	'''Scales up in steps of 1024 to highest possible unit and returns string with unit suffixed
	* value = amount to format, zero and negative values are shown as 0
	* bit=True multiplies value by 8 and uses the bit units, defaults to bytes
	* per_second=True appends "/s" (bit) or "ps" (byte) after the unit
	* start=int to set 1024 multiplier starting unit
	* short=True always returns 0 decimals and shortens unit to 1 character

	Scaled values below 10 get two decimals, values below 100 get one decimal and
	larger values (or anything still in the starting unit 0) get none.
	Raises IndexError if the value scales past the last unit in UNITS.
	'''
	unit: Tuple[str, ...] = UNITS["bit"] if bit else UNITS["byte"]
	selector: int = start if start else 0
	mult: int = 8 if bit else 1
	if value <= 0: value = 0

	#* Work in whole hundredths so two decimals survive the integer shifts below
	if isinstance(value, float): value = round(value * 100 * mult)
	elif value > 0: value *= 100 * mult

	#* 102400 hundredths equals 1024 whole units, so step up one unit per shift
	while value >= 102400:
		value >>= 10
		selector += 1

	whole, hundredths = divmod(value, 100)
	if selector > 0 and whole < 10:
		out: str = f'{whole}.{hundredths:02d}'
	elif selector > 0 and whole < 100:
		#* One decimal: keep the tenths digit, not the hundredths digit
		out = f'{whole}.{hundredths // 10}'
	else:
		out = f'{whole}'

	if short: out = out.split(".")[0]
	out += f'{"" if short else " "}{unit[selector][0] if short else unit[selector]}'
	if per_second: out += "/s" if bit else "ps"

	return out
#? Main function --------------------------------------------------------------------------------->
def main():
	"""Main function, currently only calls clean_quit() with its default arguments"""
	clean_quit()
#? Pre main -------------------------------------------------------------------------------------->
#* CPU name, looked up once at module load through get_cpu_name()
CPU_NAME: str = get_cpu_name()

#* Development switch, read by clean_quit() and the init sequence to select the testing code paths
testing = True #! Remove
2020-07-09 00:56:03 +00:00
#! For testing ------------------------------------------------------------------------------->
def waitone(t: float = 0.0):
	"""Wait on the Key.new event, then reset it and wipe the draw buffers and the screen.

	* t = timeout passed to Key.new.wait(), 0.0 or less waits without a timeout
	"""
	timeout_args = (t,) if t > 0.0 else ()
	Key.new.wait(*timeout_args)
	Key.new.clear()
	Draw.clear()
	Draw.now(Term.clear)
@timerd
def testing_gradients():
	"""Print every gradient of the active theme as one row of block characters (test helper)"""
	# for theme in THEME.themes.keys():
	# 	THEME(theme)
	# 	timer = time()
	for name, shades in THEME.gradient.items():
		Draw.now(f'{Term.fg}{name}:\n{"█".join(shades)}\n')
	Draw.now(f'{Term.fg}')
	# Draw.now(f'Theme creation of {CONFIG.color_theme} took {time() - timer:.5f} seconds\n\n')
	Draw.now("\n")
	# THEME("Default")
@timerd
def testing_humanizer():
	"""Print two tables of floating_humanizer() results for growing byte amounts (test helper)"""
	#* First table: amounts growing with both the row factor and the column number
	for factor in range(1, 101, 3):
		for column in range(1, 6):
			amount = factor * (1050 * column) * (column << 10)
			Draw.now(floating_humanizer(amount, bit=False, per_second=False, short=False), " ")
		Draw.now("\n")

	#* Second table: values around 1000 shifted up 10 bits per column
	for offset in range(1, 30):
		for column in range(1, 8):
			amount = (995 + offset) << (column * 10)
			Draw.now(floating_humanizer(amount, bit=False), " ")
		Draw.now("\n")
@timerd
def testing_colors():
	"""Buffer one line per DEFAULT_THEME key showing its name, hex and decimal value, then draw (test helper)"""
	for name in DEFAULT_THEME:
		color = getattr(THEME, name)
		Draw.buffer("+testing", Fx.b, color(f'{name:<20}'), Fx.ub, f'{"hex=" + color.hexa:<20} dec={color.dec}\n')
	Draw.out()
@timerd
def testing_boxes():
	"""Draw the box backgrounds and park the cursor three rows above the bottom (test helper)"""
	#Box.calc_sizes()
	Box.draw_bg()
	cursor_row = Term.height - 3
	Draw.now(Mv.to(cursor_row, 1))
	#Draw.now(create_box(20, 20, 15, 10, "Hej"))
	#waitone()
	#Draw.now(Term.normal_screen, Term.show_cursor)
@timerd
def testing_banner():
	"""Draw the centered banner starting at line 18, then move the cursor to line 35 (test helper)"""
	banner_out = Banner.draw(18, center=True)
	Draw.buffer("banner", banner_out)
	Draw.out()
	Draw.now(Mv.to(35, 1))
@timerd
def testing_meter():
	"""Draw a ruler line and a full width "cpu" meter for 0-100 in steps of 2 (test helper)

	Everything is appended to the "meters" buffer, including the elapsed time of the meter
	loop, so the Draw.clear("meters") at the top resets all output from a previous run.
	"""
	Draw.clear("meters")
	for _ in range(10):
		Draw.buffer("+meters", "1234567890")
	Draw.buffer("+meters", "\n")

	stamp = time()
	test_meter = Meter(0, Term.width, "cpu")
	for i in range(0, 101, 2):
		Draw.buffer("+meters", test_meter(i), "\n")

	#* Was appended to a separate "meter" buffer that the clear above never touched
	Draw.buffer("+meters", f'{time() - stamp}')
	Draw.out()
def testing_keyinput():
	"""Interactive key input test loop (test helper)

	Redraws a status line, waits on Key.input_wait(1) and then handles every key popped
	from Key.list while building up a command line:
	* "backspace" removes the last character, "escape" empties the line
	* "Q" quits through clean_quit(), "R" raises a test exception
	* any other single character is appended to the line
	* "enter" runs the collected line with exec() and clears the draw buffers

	Never returns normally, the loop only ends through clean_quit() or an exception.
	"""
	line: str = ""
	this_key: str = ""
	count: int = 0
	while True:
		count += 1
		Draw.now(f'{Mv.to(1,1)}{Fx.b}{THEME.temp_start("Count:")} {count} {THEME.cached_mid("Time:")} {strftime("%H:%M:%S", localtime())}',
			f'{Color.fg("#ff")} Width: {Term.width} Height: {Term.height} Resized: {Term.resized}')
		if Key.input_wait(1):
			while Key.list:
				#Key.new.clear()
				this_key = Key.list.pop()
				Draw.now(f'{Mv.to(2,1)}{Color.fg("#ff9050")}{Fx.b}Last key= {Term.fg}{Fx.ub}{repr(this_key):14}{" "}')
				if this_key == "backspace":
					line = line[:-1]
				elif this_key == "escape":
					line = ""
				elif this_key == "Q":
					clean_quit()
				elif this_key == "R":
					raise Exception("Test ERROR")
				elif len(this_key) == 1:
					line += this_key
				Draw.now(f'{Color.fg("#90ff50")}{Fx.b}Command= {Term.fg}{Fx.ub}{line}{Fx.bl}| {Fx.ubl}\033[0K\n')
				if this_key == "enter":
					#! exec() on typed input is only acceptable because this is a local debugging aid
					try:
						exec(line)
					except Exception as e:
						#* Log instead of silently dropping the error, SystemExit and KeyboardInterrupt now pass through
						errlog.exception(f'{e}')
					Draw.clear()
2020-07-10 00:50:24 +00:00
def testing_graphs():
	"""Animate two mirrored full width graphs with random values behind the banner (test helper)

	Builds one inverted and one normal Graph of half the terminal height from the same random
	data, draws them once and then feeds both a new random value 200 times with 0.05 seconds
	between frames. The cursor is moved four rows above the bottom afterwards.
	"""
	my_data = list(range(0, 101)) + list(range(100, -1, -1))
	#my_data100 = [100 for _ in range(200)]
	my_data100 = [randint(0, 100) for _ in range(Term.width * 2)]
	my_data2 = my_data[-90:]
	my_data3 = my_data[:86]

	#* Grayscale ramp, every shade from 0 to 50 added twice
	my_colors = [Color.fg(shade, shade, shade) for shade in range(51) for _ in range(2)]
	#my_colors.reverse()

	my_graph = Graph(Term.width, Term.height // 2, my_colors, my_data100, invert=True)
	my_graph2 = Graph(Term.width, Term.height // 2, my_colors, my_data100, invert=False)
	# my_graph3 = Graph(100 // 3 + 10, 1, THEME.proc_misc, my_data2)
	# my_graph4 = Graph(100 // 3 + 10, 1, THEME.proc_misc, my_data3)
	# my_graph5 = Graph(100, Term.height // 3, THEME.inactive_fg, my_data)

	#pause = re.compile(r"\033\[\d+;\d?;?\d*;?\d*;?\d*m{1}")
	#repl = "\033[0;37m"
	banner = Banner.draw(Term.height // 3 - 2, center=True)

	Draw.now(f'{Fx.ub}{Mv.to(0, 0)}{my_graph}\
	{Mv.to(Term.height // 2, 0)}{my_graph2}\
	{banner}')
	# {Mv.to(Term.height - (Term.height // 3), Term.width // 2 - 50)}{my_graph5}\
	# {Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 - 50)}{my_graph3}\
	# {Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 + 7)}{my_graph4}\

	#t = 1
	x = 0
	for _ in range(200):
		sleep(0.05)
		x = randint(0, 100)
		# x += 1 if t == 1 else -1
		# if x == 100: t = 0
		# if x == 0: t = 1
		Draw.now(f'{Fx.ub}{Mv.to(0, 0)}{my_graph.add(x)}\
		{Mv.to(Term.height // 2, 0)}{my_graph2.add(x)}\
		{banner}')
		# Draw.now(f'{Mv.to(Term.height - (Term.height // 3), Term.width // 2 - 50)}{my_graph5.add(x)}')
		# Draw.now(f'{Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 - 50)}{my_graph3.add(x)}')
		# Draw.now(f'{Mv.to(Term.height - (Term.height // 3) - 1, Term.width // 2 + 7)}{my_graph4.add(x)}')

	Draw.now(Mv.to(Term.height - 4, 0))
2020-07-09 00:56:03 +00:00
#! Remove ------------------------------------------------------------------------------------<
if __name__ == "__main__":

	#? Init -------------------------------------------------------------------------------------->

	Timer.start("Init")

	class Init:
		"""Helpers for the startup screen: per step status output and an animated graph background"""
		running: bool = True	#* Set to False by done()
		initbg_colors: List[str] = []	#* Filled by success(start=True) with a grayscale ramp
		initbg_data: List[int]	#* Random start data for the two background graphs
		initbg_up: Graph	#* Inverted graph drawn from the top of the screen
		initbg_down: Graph	#* Normal graph drawn from the middle of the screen

		@staticmethod
		def fail(err):
			"""Mark the current init step as failed, log `err` and quit with error code 1 after 2 seconds"""
			Draw.buffer("+init!", f'{Mv.restore()}{Symbol.fail}')
			errlog.exception(f'{err}')
			sleep(2)
			clean_quit(1, errmsg=f'Error during init! See {CONFIG_DIR}/error.log for more information.')

		@classmethod
		def success(cls, start: bool = False):
			"""Mark the current init step as done, or set up the init screen when start=True

			* start=True creates the buffers, colors, banner with percentage column and background
			  graphs, then returns without printing a status symbol
			* The background graphs are also recreated whenever Term.resized is set
			"""
			if start:
				Draw.buffer("initbg", z=10)
				Draw.buffer("init", z=1)
				#* Grayscale ramp, every shade from 0 to 50 added twice
				for i in range(51):
					for _ in range(2): cls.initbg_colors.append(Color.fg(i, i, i))
				Draw.buffer("banner", f'{Banner.draw(Term.height // 2 - 10, center=True)}{Color.fg("#50")}\n', z=2)
				#* Percentage column 10% to 100%, one row per init step
				for _i in range(10):
					perc = f'{str((_i + 1) * 10) + "%":>5}'
					Draw.buffer("+banner", f'{Mv.to(Term.height // 2 - 3 + _i, Term.width // 2 - 28)}{Fx.trans(perc)}{Symbol.v_line}')
				Draw.out("banner")
				Draw.buffer("+init!", f'{Color.fg("#cc")}{Fx.b}{Mv.to(Term.height // 2 - 3, Term.width // 2 - 21)}{Mv.save()}')

			if start or Term.resized:
				cls.initbg_data = [randint(0, 100) for _ in range(Term.width * 2)]
				cls.initbg_up = Graph(Term.width, Term.height // 2, cls.initbg_colors, cls.initbg_data, invert=True)
				cls.initbg_down = Graph(Term.width, Term.height // 2, cls.initbg_colors, cls.initbg_data, invert=False)

			if start: return

			#* The background animation is skipped while the testing switch is on
			if not testing:
				cls.draw_bg(10)
			Draw.buffer("+init!", f'{Mv.restore()}{Symbol.ok}\n{Mv.r(Term.width // 2 - 22)}{Mv.save()}')

		@classmethod
		def draw_bg(cls, times: int = 10):
			"""Animate the background graphs for `times` frames with 0.05 seconds between frames"""
			for _ in range(times):
				sleep(0.05)
				x = randint(0, 100)
				Draw.buffer("initbg", f'{Fx.ub}{Mv.to(0, 0)}{cls.initbg_up.add(x)}{Mv.to(Term.height // 2, 0)}{cls.initbg_down.add(x)}')
				Draw.out("initbg", "banner", "init")

		@classmethod
		def done(cls):
			"""Play 20 more background frames, clear the init buffers and free the init graph data"""
			cls.draw_bg(20)
			Draw.clear("initbg", "banner", "init")
			cls.running = False
			del cls.initbg_up, cls.initbg_down, cls.initbg_data, cls.initbg_colors

	#? Switch to alternate screen, clear screen, hide cursor and disable input echo
	if not testing: Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor) #! Enable
	else: Draw.now(Term.clear, Term.hide_cursor) #! Disable
	Term.echo(False)

	#? Draw banner and init status
	if not testing: Init.success(start=True)

	#? Load theme
	Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Loading theme and creating colors... ")}{Mv.save()}')
	try:
		THEME: Theme = Theme(CONFIG.color_theme)
	except Exception as e:
		Init.fail(e)
	else:
		Init.success()

	#? Setup boxes
	Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Doing some maths and drawing... ")}{Mv.save()}')
	try:
		Box.calc_sizes()
		Box.draw_bg(now=False)
	except Exception as e:
		Init.fail(e)
	else:
		Init.success()

	#? Setup signal handlers for SIGSTP, SIGCONT, SIGINT and SIGWINCH
	Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Setting up signal handlers... ")}{Mv.save()}')
	try:
		signal.signal(signal.SIGTSTP, now_sleeping) #* Ctrl-Z
		signal.signal(signal.SIGCONT, now_awake)	#* Resume
		signal.signal(signal.SIGINT, quit_sigint)	#* Ctrl-C
		signal.signal(signal.SIGWINCH, Term.refresh) #* Terminal resized
	except Exception as e:
		Init.fail(e)
	else:
		Init.success()

	#? Start a separate thread for reading keyboard input
	Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Starting input reader thread... ")}{Mv.save()}')
	try:
		Key.start()
	except Exception as e:
		Init.fail(e)
	else:
		Init.success()

	#? Start a separate thread for data collection and drawing
	Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Starting data collection and drawer thread... ")}{Mv.save()}')
	try:
		Collector.start()
	except Exception as e:
		Init.fail(e)
	else:
		Init.success()

	#? Collect data and draw to buffer
	Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Collecting data and drawing... ")}{Mv.save()}')
	try:
		#Collector.collect(draw_now=False)
		pass
	except Exception as e:
		Init.fail(e)
	else:
		Init.success()

	#* Placeholder steps that do no work, each one only prints a status line
	Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Collecting nuclear launch codes... ")}{Mv.save()}')
	Init.success()
	Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Launching missiles... ")}{Mv.save()}')
	Init.success()
	Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Alien invasion... ")}{Mv.save()}')
	Init.success()

	#? Draw to screen
	Draw.buffer("+init!", f'{Mv.restore()}{Fx.trans("Finishing up... ")}{Mv.save()}')
	try:
		#Collector.collect_done.wait()
		pass
	except Exception as e:
		Init.fail(e)
	else:
		Init.success()

	if not testing: Init.done() #! Remove if
	if not testing: Draw.out(clear=True) #! Remove if
	else: Draw.clear(); Draw.now(Term.clear) #! Remove
	Timer.stop("Init")

	#! For testing ------------------------------------------------------------------------------->
	if testing:
		try:
			#testing_graphs()
			testing_collectors()
			#testing_humanizer()
			# waitone(1)
			#testing_keyinput()
			#testing_banner()
			# waitone(1)
			#testing_colors()
			# waitone(1)
			#testing_gradients()
			# waitone(1)
			#testing_boxes()
			# waitone(1)
			#testing_meter()
			# Draw.idle.clear()
			#Draw.now(f'{Mv.to(Term.height - 5, 1)}Any key to exit!')
			#waitone()
			# Draw.idle.set()
			#sleep(2)
		except Exception as e:
			errlog.exception(f'{e}')
			clean_quit(1)

		#* Testing mode always exits here and never reaches the main loop below
		clean_quit()

	#! Remove ------------------------------------------------------------------------------------<

	#? Start main loop
	while not False:
		try:
			main()
		except Exception as e:
			errlog.exception(f'{e}')
			clean_quit(1)
	else:
		#? Quit cleanly even if false starts being true...
		clean_quit()