mirror of https://github.com/aristocratos/bpytop
added floating_humanizer function and init progress display
parent
bd6c2ffed3
commit
aef318e37a
559
bpytop
559
bpytop
|
@ -46,10 +46,6 @@ if errors:
|
|||
print("\nInstall required modules!\n")
|
||||
quit(1)
|
||||
|
||||
from functools import partial
|
||||
|
||||
print: partial = partial(print, sep="", end="", flush=True) #* Setup print function to default to empty seperator and no new line
|
||||
|
||||
#? Constants ------------------------------------------------------------------------------------->
|
||||
|
||||
BANNER_SRC: Dict[str, str] = {
|
||||
|
@ -166,6 +162,10 @@ DEFAULT_THEME: Dict[str, str] = {
|
|||
"upload_end" : "#dcafde"
|
||||
}
|
||||
|
||||
UNITS: Dict[str, Tuple[str, ...]] = {
|
||||
"bit" : ("bit", "Kib", "Mib", "Gib", "Tib", "Pib", "Eib", "Zib", "Yib", "Bib", "GEb"),
|
||||
"byte" : ("Byte", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB", "BiB", "GEB")
|
||||
}
|
||||
|
||||
#? Setup error logger ---------------------------------------------------------------------------->
|
||||
|
||||
|
@ -262,22 +262,22 @@ class Mv:
|
|||
def to(line: int, col: int) -> str:
|
||||
return f'\033[{line};{col}f' #* Move cursor to line, column
|
||||
@staticmethod
|
||||
def right(x: int) -> str: #* Move cursor right x columns
|
||||
def right(x: int) -> str: #* Move cursor right x columns
|
||||
return f'\033[{x}C'
|
||||
@staticmethod
|
||||
def left(x: int) -> str: #* Move cursor left x columns
|
||||
def left(x: int) -> str: #* Move cursor left x columns
|
||||
return f'\033[{x}D'
|
||||
@staticmethod
|
||||
def up(x: int) -> str: #* Move cursor up x lines
|
||||
def up(x: int) -> str: #* Move cursor up x lines
|
||||
return f'\033[{x}A'
|
||||
@staticmethod
|
||||
def down(x: int) -> str: #* Move cursor down x lines
|
||||
def down(x: int) -> str: #* Move cursor down x lines
|
||||
return f'\033[{x}B'
|
||||
@staticmethod
|
||||
def save() -> str: #* Save cursor position
|
||||
def save() -> str: #* Save cursor position
|
||||
return "\033[s"
|
||||
@staticmethod
|
||||
def restore() -> str: #* Restore saved cursor postion
|
||||
def restore() -> str: #* Restore saved cursor postion
|
||||
return "\033[u"
|
||||
t = to
|
||||
r = right
|
||||
|
@ -309,6 +309,63 @@ class Key:
|
|||
except:
|
||||
pass
|
||||
|
||||
class Draw:
|
||||
'''Holds the draw buffer and manages IO blocking queue
|
||||
* .buffer([+]name[!], *args, append=False, now=False) : Add *args to buffer
|
||||
* - Adding "+" prefix to name sets append to True and appends to name's current string
|
||||
* - Adding "!" suffix to name sets now to True and print name's current string
|
||||
* .out(clear=False) : Print all strings in buffer, clear=True clear all buffers after
|
||||
* .now(*args) : Prints all arguments as a string
|
||||
* .clear(*names) : Clear named buffers, all if no argument
|
||||
'''
|
||||
strings: Dict[str, str] = {}
|
||||
last_screen: str = ""
|
||||
idle = threading.Event()
|
||||
idle.set()
|
||||
|
||||
@classmethod
|
||||
def now(cls, *args):
|
||||
'''Wait for input reader to be idle then print to screen, if excepted by IO block, set busy flag and retry'''
|
||||
Key.idle.wait()
|
||||
try:
|
||||
print(*args, sep="", end="", flush=True)
|
||||
except BlockingIOError:
|
||||
pass
|
||||
cls.idle.clear()
|
||||
Key.idle.wait()
|
||||
print(*args, sep="", end="", flush=True)
|
||||
cls.idle.set()
|
||||
|
||||
@classmethod
|
||||
def buffer(cls, name: str, *args, append: bool = False, now: bool = False):
|
||||
string: str = ""
|
||||
if name.startswith("+"):
|
||||
name = name.lstrip("+")
|
||||
append = True
|
||||
if name.endswith("!"):
|
||||
name = name.rstrip("!")
|
||||
now = True
|
||||
if name == "": name = "_null"
|
||||
if args: string = "".join(map(str, args))
|
||||
if name not in cls.strings or not append: cls.strings[name] = ""
|
||||
cls.strings[name] += string
|
||||
if now: cls.now(string)
|
||||
|
||||
@classmethod
|
||||
def out(cls, clear = False):
|
||||
cls.last_screen = "".join(cls.strings.values())
|
||||
if clear: cls.strings = {}
|
||||
cls.now(cls.last_screen)
|
||||
|
||||
@classmethod
|
||||
def clear(cls, *names):
|
||||
if names:
|
||||
for name in names:
|
||||
if name in cls.strings:
|
||||
del cls.strings["name"]
|
||||
else:
|
||||
cls.strings = {}
|
||||
|
||||
class Color:
|
||||
'''Holds representations for a 24-bit color
|
||||
__init__(color, depth="fg", default=False)
|
||||
|
@ -469,7 +526,7 @@ class Theme:
|
|||
self.gradient[name] += [c]
|
||||
#* Set terminal colors
|
||||
Term.fg, Term.bg = self.main_fg, self.main_bg
|
||||
print(self.main_fg, self.main_bg)
|
||||
Draw.now(self.main_fg, self.main_bg)
|
||||
|
||||
class Banner:
|
||||
'''Holds the bpytop banner, .draw(line=, [col=0], [center=False], [now=False])'''
|
||||
|
@ -500,62 +557,9 @@ class Banner:
|
|||
for n, o in enumerate(cls.out):
|
||||
out += f'{Mv.to(line + n, col)}{o}'
|
||||
out += f'{Term.fg}'
|
||||
if now: print(out)
|
||||
if now: Draw.out(out)
|
||||
else: return out
|
||||
|
||||
class Draw:
|
||||
'''Holds the draw buffer and manages IO blocking queue
|
||||
* .buffer([+]name[!], *args, append=False, now=False) : Add *args to buffer
|
||||
* - Adding "+" prefix to name sets append to True and appends to name's current string
|
||||
* - Adding "!" suffix to name sets now to True and print name's current string
|
||||
* .out(clear=False) : Print all strings in buffer, clear=True clear all buffers after
|
||||
* .now(*args) : Prints all arguments as a string
|
||||
'''
|
||||
strings: Dict[str, str] = {}
|
||||
last_screen: str = ""
|
||||
idle = threading.Event()
|
||||
idle.set()
|
||||
|
||||
@classmethod
|
||||
def now(cls, *args):
|
||||
'''Print to screen'''
|
||||
Key.idle.wait()
|
||||
try:
|
||||
print(*args)
|
||||
except BlockingIOError:
|
||||
#pass
|
||||
cls.idle.clear()
|
||||
Key.idle.wait()
|
||||
print(*args)
|
||||
cls.idle.set()
|
||||
print("Error!")
|
||||
|
||||
@classmethod
|
||||
def buffer(cls, name: str, *args, append: bool = False, now: bool = False):
|
||||
string: str = ""
|
||||
if name.startswith("+"):
|
||||
name = name.lstrip("+")
|
||||
append = True
|
||||
if name.endswith("!"):
|
||||
name = name.rstrip("!")
|
||||
now = True
|
||||
if name == "": name = "_null"
|
||||
if args: string = "".join(map(str, args))
|
||||
if name not in cls.strings or not append: cls.strings[name] = ""
|
||||
cls.strings[name] += string
|
||||
if now: cls.now(string)
|
||||
|
||||
@classmethod
|
||||
def out(cls, clear = False):
|
||||
cls.last_screen = "".join(cls.strings.values())
|
||||
if clear: cls.strings = {}
|
||||
cls.now(cls.last_screen)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class Symbol:
|
||||
h_line: str = "─"
|
||||
v_line: str = "│"
|
||||
|
@ -582,6 +586,8 @@ class Symbol:
|
|||
4.0 : "⡇", 4.1 : "⡏", 4.2 : "⡟", 4.3 : "⡿", 4.4 : "⣿"
|
||||
}
|
||||
meter: str = "■"
|
||||
ok: str = f'{Color.fg("#30ff50")}√{Color.fg("#cc")}'
|
||||
fail: str = f'{Color.fg("#ff3050")}!{Color.fg("#cc")}'
|
||||
|
||||
class Graphs:
|
||||
'''Holds all graph objects and dicts for dynamically created graphs'''
|
||||
|
@ -667,7 +673,62 @@ class Config:
|
|||
if name not in ["_Config__initialized", "recreate", "changed"]:
|
||||
self.conf_dict[name] = value
|
||||
|
||||
class Meter:
|
||||
'''Creates a percentage meter
|
||||
__init__(value, width, color_gradient) to create new meter
|
||||
__call__(value) to set value and return meter as a string
|
||||
__str__ returns last set meter as a string
|
||||
'''
|
||||
out: str = ""
|
||||
color_gradient: List[str]
|
||||
width: int
|
||||
saved: Dict[int, str] = {}
|
||||
|
||||
def __init__(self, value: int, width: int, color_gradient: List[str]):
|
||||
self.color_gradient = color_gradient
|
||||
self.width = width
|
||||
self.out = self._create(value, width, color_gradient)
|
||||
self.saved[value] = self.out
|
||||
|
||||
def __call__(self, value: int):
|
||||
if value in self.saved.keys():
|
||||
self.out = self.saved[value]
|
||||
else:
|
||||
self.out = self._create(value, self.width, self.color_gradient)
|
||||
self.saved[value] = self.out
|
||||
return self.out
|
||||
|
||||
def __str__(self):
|
||||
return self.out
|
||||
|
||||
def __repr__(self):
|
||||
return repr(self.out)
|
||||
|
||||
@staticmethod
|
||||
def _create(value: int, width: int, color_gradient: List[str], add: bool = False):
|
||||
if value > 100: value = 100
|
||||
elif value < 0: value = 100
|
||||
out: str = ""
|
||||
for i in range(1, width + 1):
|
||||
if value >= round(i * 100 / width):
|
||||
out += f'{color_gradient[round(i * 100 / width)]}{Symbol.meter}'
|
||||
else:
|
||||
out += theme.inactive_fg(Symbol.meter * (width + 1 - i))
|
||||
break
|
||||
else:
|
||||
out += f'{Term.fg}'
|
||||
return out
|
||||
|
||||
class Meters:
|
||||
cpu: Meter
|
||||
mem_used: Meter
|
||||
mem_available: Meter
|
||||
mem_cached: Meter
|
||||
mem_free: Meter
|
||||
swap_used: Meter
|
||||
swap_free: Meter
|
||||
disks_used: Meter
|
||||
disks_free: Meter
|
||||
|
||||
|
||||
#? Functions ------------------------------------------------------------------------------------->
|
||||
|
@ -785,16 +846,15 @@ def get_key():
|
|||
clean_key: str = ""
|
||||
try:
|
||||
while not Key.stopping:
|
||||
#Draw.idle.wait()
|
||||
with Raw(sys.stdin): #* Set raw mode
|
||||
#with Nonblocking(sys.stdin): #* Set nonblocking mode
|
||||
with Raw(sys.stdin):
|
||||
if not select([sys.stdin], [], [], 0.1)[0]:
|
||||
continue
|
||||
Key.idle.clear()
|
||||
try:
|
||||
input_key += sys.stdin.read(1)
|
||||
if input_key == "\033":
|
||||
with Nonblocking(sys.stdin): #* Set nonblocking mode
|
||||
Draw.idle.wait()
|
||||
with Nonblocking(sys.stdin):
|
||||
input_key += sys.stdin.read(3)
|
||||
except Exception as e:
|
||||
errlog.exception(f'{e}')
|
||||
|
@ -803,7 +863,7 @@ def get_key():
|
|||
elif input_key.startswith("\n"): clean_key = "enter"
|
||||
elif input_key.startswith("\x7f") or input_key.startswith("\x08"): clean_key = "backspace"
|
||||
|
||||
elif input_key.isalnum(): clean_key = input_key
|
||||
elif len(input_key) == 1: clean_key = input_key
|
||||
else: errlog.info(f'Pressed key: {repr(input_key)}')
|
||||
|
||||
if clean_key:
|
||||
|
@ -811,37 +871,14 @@ def get_key():
|
|||
clean_key = ""
|
||||
Key.new.set() #* Set threading event to interrupt main thread sleep
|
||||
input_key = ""
|
||||
Draw.idle.wait()
|
||||
with Nonblocking(sys.stdin):
|
||||
sys.stdin.read(10) #* Clear stdin
|
||||
sys.stdin.read(10) #* Clear stdin
|
||||
Key.idle.set()
|
||||
except Exception as e:
|
||||
errlog.exception(f'{e}')
|
||||
clean_quit(1)
|
||||
|
||||
|
||||
def now_sleeping(signum, frame):
|
||||
"""Reset terminal settings and stop background input read before putting to sleep"""
|
||||
Key.stop()
|
||||
print(Term.clear, Term.normal_screen, Term.show_cursor)
|
||||
os.kill(os.getpid(), signal.SIGSTOP)
|
||||
|
||||
def now_awake(signum, frame):
|
||||
"""Set terminal settings and restart background input read"""
|
||||
print(Term.alt_screen, Term.clear, Term.hide_cursor)
|
||||
Key.start()
|
||||
|
||||
def quit_sigint(signum, frame):
|
||||
"""SIGINT redirection to clean_quit()"""
|
||||
clean_quit()
|
||||
|
||||
def clean_quit(errcode: int = 0):
|
||||
"""Reset terminal settings, save settings to config and stop background input read before quitting"""
|
||||
Key.stop()
|
||||
save_config(CONFIG_FILE, config)
|
||||
#print(Term.clear, Term.normal_screen, Term.show_cursor)
|
||||
Term.echo(True)
|
||||
raise SystemExit(errcode)
|
||||
|
||||
def calc_sizes():
|
||||
'''Calculate sizes of boxes'''
|
||||
|
||||
|
@ -885,7 +922,7 @@ def calc_sizes():
|
|||
net.box_x = net.width - net.box_width - 2
|
||||
net.box_y = net.y + ((net.height - 2) // 2) - round(net.box_height / 2)
|
||||
|
||||
def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: str = "", line_color: Color = None, fill: bool = False, box_object: object = None):
|
||||
def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: str = "", title2: str = "", line_color: Color = None, fill: bool = True, box_object: object = None):
|
||||
'''Create a box from a box object or by given arguments'''
|
||||
out: str = f'{Term.fg}{Term.bg}'
|
||||
if not line_color: line_color = theme.div_line
|
||||
|
@ -902,19 +939,16 @@ def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: s
|
|||
|
||||
#* Fill box if enabled
|
||||
if fill:
|
||||
i: int
|
||||
for i in range(y + 1, y + height):
|
||||
out += f'{Mv.to(i, x)}{" " * (width - 1)}'
|
||||
|
||||
out += f'{line_color}'
|
||||
|
||||
#* Draw all horizontal lines
|
||||
hpos: int
|
||||
for hpos in hlines:
|
||||
out += f'{Mv.to(hpos, x)}{Symbol.h_line * width}'
|
||||
|
||||
#* Draw all vertical lines
|
||||
vpos: int
|
||||
for vpos in vlines:
|
||||
for hpos in range(y, y + height):
|
||||
out += f'{Mv.to(hpos, vpos)}{Symbol.v_line}'
|
||||
|
@ -925,22 +959,23 @@ def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: s
|
|||
{Mv.to(y + height, x)}{Symbol.left_down}\
|
||||
{Mv.to(y + height, x + width)}{Symbol.right_down}'
|
||||
|
||||
#* Draw title if enabled
|
||||
#* Draw titles if enabled
|
||||
if title:
|
||||
out += f'{Mv.to(y, x + 2)}{Symbol.title_left}{theme.title}{Fx.b}{title}{Fx.ub}{line_color}{Symbol.title_right}'
|
||||
if title2:
|
||||
out += f'{Mv.to(y + height, x + 2)}{Symbol.title_left}{theme.title}{Fx.b}{title2}{Fx.ub}{line_color}{Symbol.title_right}'
|
||||
|
||||
return out
|
||||
return f'{out}{Term.fg}'
|
||||
|
||||
def draw_bg(now: bool = True):
|
||||
'''Draw all boxes to buffer and print to screen if now=True'''
|
||||
|
||||
#* Draw cpu box and cpu sub box
|
||||
cpu_box = f'{create_box(box_object=cpu, line_color=theme.cpu_box, fill=True)}\
|
||||
cpu_box = f'{create_box(box_object=cpu, line_color=theme.cpu_box)}\
|
||||
{Mv.to(cpu.y, cpu.x + 10)}{theme.cpu_box(Symbol.title_left)}{Fx.b}{theme.hi_fg("m")}{theme.title("enu")}{Fx.ub}{theme.cpu_box(Symbol.title_right)}\
|
||||
{create_box(x=cpu.box_x, y=cpu.box_y, width=cpu.box_width, height=cpu.box_height, line_color=theme.div_line, title=CPU_NAME[:18 if config.check_temp else 9])}'
|
||||
{create_box(x=cpu.box_x, y=cpu.box_y, width=cpu.box_width, height=cpu.box_height, line_color=theme.div_line, fill=False, title=CPU_NAME[:18 if config.check_temp else 9])}'
|
||||
|
||||
#* Draw mem/disk box and divider
|
||||
mem_box = f'{create_box(box_object=mem, line_color=theme.mem_box, fill=True)}\
|
||||
mem_box = f'{create_box(box_object=mem, line_color=theme.mem_box)}\
|
||||
{Mv.to(mem.y, mem.divider + 2)}{theme.mem_box(Symbol.title_left)}{Fx.b}{theme.title("disks")}{Fx.ub}{theme.mem_box(Symbol.title_right)}\
|
||||
{Mv.to(mem.y, mem.divider)}{theme.mem_box(Symbol.div_up)}\
|
||||
{Mv.to(mem.y + mem.height, mem.divider)}{theme.mem_box(Symbol.div_down)}{theme.div_line}'
|
||||
|
@ -948,116 +983,91 @@ def draw_bg(now: bool = True):
|
|||
mem_box += f'{Mv.to(mem.y + i, mem.divider)}{Symbol.v_line}'
|
||||
|
||||
#* Draw net box and net sub box
|
||||
net_box = f'{create_box(box_object=net, line_color=theme.net_box, fill=True)}\
|
||||
{create_box(x=net.box_x, y=net.box_y, width=net.box_width, height=net.box_height, line_color=theme.div_line, title="Download")}\
|
||||
{Mv.to(net.box_y + net.box_height, net.box_x + 1)}{theme.div_line(Symbol.title_left)}{Fx.b}{theme.title("Upload")}{Fx.ub}{theme.div_line(Symbol.title_right)}'
|
||||
net_box = f'{create_box(box_object=net, line_color=theme.net_box)}\
|
||||
{create_box(x=net.box_x, y=net.box_y, width=net.box_width, height=net.box_height, line_color=theme.div_line, fill=False, title="Download", title2="Upload")}'
|
||||
|
||||
#* Draw proc box
|
||||
proc_box = create_box(box_object=proc, line_color=theme.proc_box, fill=True)
|
||||
proc_box = create_box(box_object=proc, line_color=theme.proc_box)
|
||||
|
||||
Draw.buffer("bg", cpu_box, mem_box, net_box, proc_box, Term.fg)
|
||||
Draw.buffer("bg!" if now else "bg", cpu_box, mem_box, net_box, proc_box)
|
||||
|
||||
#? Function dependent classes -------------------------------------------------------------------->
|
||||
def now_sleeping(signum, frame):
|
||||
"""Reset terminal settings and stop background input read before putting to sleep"""
|
||||
Key.stop()
|
||||
Draw.now(Term.clear, Term.normal_screen, Term.show_cursor)
|
||||
Term.echo(True)
|
||||
os.kill(os.getpid(), signal.SIGSTOP)
|
||||
|
||||
class Meter:
|
||||
'''Creates a percentage meter
|
||||
__init__(value, width, color_gradient) to create new meter
|
||||
__call__(value) to set value and return meter as a string
|
||||
__str__ returns last set meter as a string
|
||||
def now_awake(signum, frame):
|
||||
"""Set terminal settings and restart background input read"""
|
||||
Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor)
|
||||
Term.echo(False)
|
||||
Key.start()
|
||||
|
||||
def quit_sigint(signum, frame):
|
||||
"""SIGINT redirection to clean_quit()"""
|
||||
clean_quit()
|
||||
|
||||
def clean_quit(errcode: int = 0, errmsg: str = ""):
|
||||
"""Stop background input read, save current config and reset terminal settings before quitting"""
|
||||
Key.stop()
|
||||
if not errcode: save_config(CONFIG_FILE, config)
|
||||
if not testing: Draw.now(Term.clear, Term.normal_screen, Term.show_cursor) #! Remove if
|
||||
Term.echo(True)
|
||||
if errmsg: print(errmsg)
|
||||
raise SystemExit(errcode)
|
||||
|
||||
def floating_humanizer(value: Union[float, int], bit: bool = False, per_second: bool = False, start: int = 0, short: bool = False) -> str:
|
||||
'''Scales up in steps of 1024 to highest possible unit and returns string with unit suffixed
|
||||
* Defaults to bytes unless bit=True
|
||||
* start=int to set 1024 multiplier starting unit
|
||||
* short=True always returns 0 decimals and shortens unit to 1 character
|
||||
'''
|
||||
out: str = ""
|
||||
color_gradient: List[str]
|
||||
width: int
|
||||
saved: Dict[int, str] = {}
|
||||
unit: Tuple[str, ...] = UNITS["bit"] if bit else UNITS["byte"]
|
||||
selector: int = start if start else 0
|
||||
mult: int = 8 if bit else 1
|
||||
|
||||
def __init__(self, value: int, width: int, color_gradient: List[str]):
|
||||
self.color_gradient = color_gradient
|
||||
self.width = width
|
||||
self.out = self._create(value, width, color_gradient)
|
||||
self.saved[value] = self.out
|
||||
if isinstance(value, float): value = round(value)
|
||||
if value > 0: value = value * 100 * mult
|
||||
|
||||
def __call__(self, value: int):
|
||||
if value in self.saved.keys():
|
||||
self.out = self.saved[value]
|
||||
else:
|
||||
self.out = self._create(value, self.width, self.color_gradient)
|
||||
self.saved[value] = self.out
|
||||
return self.out
|
||||
while len(f'{value}') > 5:
|
||||
value >>= 10
|
||||
if value < 100: value = 100
|
||||
selector += 1
|
||||
|
||||
def __str__(self):
|
||||
return self.out
|
||||
|
||||
def __repr__(self):
|
||||
return repr(self.out)
|
||||
|
||||
@staticmethod
|
||||
def _create(value: int, width: int, color_gradient: List[str], add: bool = False):
|
||||
if value > 100: value = 100
|
||||
elif value < 0: value = 100
|
||||
out: str = ""
|
||||
for i in range(1, width + 1):
|
||||
if value >= round(i * 100 / width):
|
||||
out += f'{color_gradient[round(i * 100 / width)]}{Symbol.meter}'
|
||||
else:
|
||||
out += theme.inactive_fg(Symbol.meter * (width + 1 - i))
|
||||
break
|
||||
else:
|
||||
out += f'{Term.fg}'
|
||||
return out
|
||||
|
||||
|
||||
class Meters:
|
||||
cpu: Meter
|
||||
mem_used: Meter
|
||||
mem_available: Meter
|
||||
mem_cached: Meter
|
||||
mem_free: Meter
|
||||
swap_used: Meter
|
||||
swap_free: Meter
|
||||
disks_used: Meter
|
||||
disks_free: Meter
|
||||
if len(f'{value}') < 5 and len(f'{value}') >= 2 and selector > 0:
|
||||
decimals = 5 - len(f'{value}')
|
||||
out = f'{value}'[:-2] + "." + f'{value}'[-decimals:]
|
||||
elif len(f'{value}') >= 2:
|
||||
out = f'{value}'[:-2]
|
||||
|
||||
if short: out = out.split(".")[0]
|
||||
out += f'{"" if short else " "}{unit[selector][0] if short else unit[selector]}'
|
||||
if per_second: out += "/s" if bit else "ps"
|
||||
|
||||
return out
|
||||
|
||||
|
||||
#? Main function --------------------------------------------------------------------------------->
|
||||
|
||||
def main():
|
||||
pass
|
||||
clean_quit()
|
||||
|
||||
|
||||
#? Init ------------------------------------------------------------------------------------------>
|
||||
#? Pre main -------------------------------------------------------------------------------------->
|
||||
|
||||
print(Term.alt_screen, Term.clear, Term.hide_cursor)
|
||||
Term.echo(False)
|
||||
|
||||
signal.signal(signal.SIGINT, quit_sigint) #* Ctrl-C
|
||||
|
||||
Key.start()
|
||||
|
||||
CPU_NAME: str = get_cpu_name()
|
||||
|
||||
config: Config = Config(load_config(CONFIG_FILE))
|
||||
|
||||
config.proc_per_core = True
|
||||
|
||||
#config.color_theme = "solarized_dark"
|
||||
|
||||
theme: Theme = Theme(load_theme(config.color_theme))
|
||||
|
||||
cpu = Box("cpu", height_p=32, width_p=100)
|
||||
mem = Box("mem", height_p=40, width_p=45)
|
||||
net = Box("net", height_p=28, width_p=mem.width_p)
|
||||
proc = Box("proc", height_p=100 - cpu.height_p, width_p=100 - mem.width_p)
|
||||
|
||||
boxes: List[object] = [cpu, mem, net, proc]
|
||||
|
||||
blue = theme.temp_start
|
||||
lime = theme.cached_mid
|
||||
orange = theme.available_end
|
||||
green = theme.cpu_start
|
||||
dfg = theme.main_fg
|
||||
testing = True #! Remove
|
||||
|
||||
#! For testing ------------------------------------------------------------------------------->
|
||||
def testing_humanizer():
|
||||
for i in range(1, 101, 3):
|
||||
for n in range(5):
|
||||
Draw.now(floating_humanizer(i + 23 * n * (i * i + 14) << n * n, bit=False, per_second=False, short=False), " ")
|
||||
Draw.now("\n")
|
||||
|
||||
def testing_colors():
|
||||
for item, _ in DEFAULT_THEME.items():
|
||||
|
@ -1068,17 +1078,16 @@ def testing_colors():
|
|||
def testing_boxes():
|
||||
calc_sizes()
|
||||
draw_bg()
|
||||
Draw.out()
|
||||
print(Mv.to(35, 1))
|
||||
Draw.now(Mv.to(35, 1))
|
||||
|
||||
def testing_banner():
|
||||
Draw.buffer("banner", Banner.draw(18, center=True))
|
||||
Draw.out()
|
||||
|
||||
print(Mv.to(35, 1))
|
||||
Draw.now(Mv.to(35, 1))
|
||||
|
||||
def testing_meter():
|
||||
Draw.buffer("meters")
|
||||
Draw.clear("meters")
|
||||
for _ in range(10):
|
||||
Draw.buffer("+meters", "1234567890")
|
||||
Draw.buffer("+meters", "\n")
|
||||
|
@ -1096,71 +1105,151 @@ def testing_keyinput():
|
|||
count: int = 0
|
||||
while True:
|
||||
count += 1
|
||||
Draw.buffer("!", f'{Mv.to(1,1)}{Fx.b}{blue("Count:")} {count} {lime("Time:")} {time.strftime("%H:%M:%S", time.localtime())}',
|
||||
Draw.now(f'{Mv.to(1,1)}{Fx.b}{theme.temp_start("Count:")} {count} {theme.cached_mid("Time:")} {time.strftime("%H:%M:%S", time.localtime())}',
|
||||
f'{Color.fg("#ff")} Width: {Term.width} Height: {Term.height} Resized: {Term.resized}')
|
||||
while Key.list:
|
||||
Key.new.clear()
|
||||
this_key = Key.list.pop()
|
||||
Draw.buffer("!", f'{Mv.to(2,1)}{Color.fg("#ff9050")}{Fx.b}Last key= {Term.fg}{Fx.ub}{repr(this_key)}{" " * 40}')
|
||||
Draw.now(f'{Mv.to(2,1)}{Color.fg("#ff9050")}{Fx.b}Last key= {Term.fg}{Fx.ub}{repr(this_key):14}{" "}')
|
||||
if this_key == "backspace":
|
||||
line = line[:-1]
|
||||
elif this_key == "enter":
|
||||
line += "\n"
|
||||
else:
|
||||
line += this_key
|
||||
Draw.buffer("!", f'{Mv.to(3,1)}{Color.fg("#90ff50")}{Fx.b}Full line= {Term.fg}{Fx.ub}{line}{Fx.bl}| {Fx.ubl}')
|
||||
if this_key == "q":
|
||||
elif this_key == "escape":
|
||||
line = ""
|
||||
elif this_key == "Q":
|
||||
clean_quit()
|
||||
if this_key == "R":
|
||||
elif this_key == "R":
|
||||
raise Exception("Test ERROR")
|
||||
elif len(this_key) == 1:
|
||||
line += this_key
|
||||
Draw.now(f'{Color.fg("#90ff50")}{Fx.b}Command= {Term.fg}{Fx.ub}{line}{Fx.bl}| {Fx.ubl}\033[0K\n')
|
||||
if this_key == "enter":
|
||||
try:
|
||||
exec(line)
|
||||
except:
|
||||
pass
|
||||
Draw.clear()
|
||||
|
||||
if not Key.reader.is_alive():
|
||||
clean_quit(1)
|
||||
Key.new.wait(1.0)
|
||||
|
||||
try:
|
||||
#testing_keyinput()
|
||||
#testing_banner()
|
||||
#testing_colors()
|
||||
#testing_boxes()
|
||||
testing_meter()
|
||||
# Draw.idle.clear()
|
||||
# Key.idle.wait()
|
||||
# input(f'{Mv.to(Term.height - 5, 1)}Enter to exit')
|
||||
# Draw.idle.set()
|
||||
#time.sleep(2)
|
||||
except Exception as e:
|
||||
errlog.exception(f'{e}')
|
||||
clean_quit(1)
|
||||
|
||||
|
||||
clean_quit()
|
||||
#! Remove ------------------------------------------------------------------------------------<
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
#? Setup signal handlers for SIGSTP, SIGCONT, SIGINT and SIGWINCH
|
||||
signal.signal(signal.SIGTSTP, now_sleeping) #* Ctrl-Z
|
||||
signal.signal(signal.SIGCONT, now_awake) #* Resume
|
||||
signal.signal(signal.SIGINT, quit_sigint) #* Ctrl-C
|
||||
signal.signal(signal.SIGWINCH, Term.refresh) #* Terminal resized
|
||||
#? Init -------------------------------------------------------------------------------------->
|
||||
|
||||
#? Switch to alternate screen, clear screen and hide cursor
|
||||
print(Term.alt_screen, Term.clear, Term.hide_cursor)
|
||||
#? Temporary functions for init
|
||||
def _fail(err):
|
||||
Draw.now(f'{Symbol.fail}')
|
||||
errlog.exception(f'{err}')
|
||||
time.sleep(2)
|
||||
clean_quit(1, errmsg=f'Error during init! See {CONFIG_DIR}/error.log for more information.')
|
||||
def _success():
|
||||
time.sleep(0.1) #! Enable
|
||||
Draw.now(f'{Symbol.ok}\n{Mv.r(Term.width // 2 - 19)}')
|
||||
|
||||
#? Switch to alternate screen, clear screen, hide cursor and disable input echo
|
||||
Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor)
|
||||
Term.echo(False)
|
||||
|
||||
#? Draw banner and init status
|
||||
Draw.now(Banner.draw(Term.height // 2 - 10, center=True), "\n")
|
||||
Draw.now(Color.fg("#50"))
|
||||
for _i in range(10):
|
||||
Draw.now(f'{Mv.to(Term.height // 2 - 3 + _i, Term.width // 2 - 25)}{str((_i + 1) * 10) + "%":>5}{Symbol.v_line}')
|
||||
Draw.now(f'{Color.fg("#cc")}{Fx.b}{Mv.to(Term.height // 2 - 3, Term.width // 2 - 18)}')
|
||||
|
||||
#? Setup signal handlers for SIGSTP, SIGCONT, SIGINT and SIGWINCH
|
||||
Draw.now("Setting up signal handlers... ")
|
||||
try:
|
||||
signal.signal(signal.SIGTSTP, now_sleeping) #* Ctrl-Z
|
||||
signal.signal(signal.SIGCONT, now_awake) #* Resume
|
||||
signal.signal(signal.SIGINT, quit_sigint) #* Ctrl-C
|
||||
signal.signal(signal.SIGWINCH, Term.refresh) #* Terminal resized
|
||||
except Exception as e:
|
||||
_fail(e)
|
||||
else:
|
||||
_success()
|
||||
|
||||
|
||||
#? Load config
|
||||
Draw.now("Loading configuration... ")
|
||||
try:
|
||||
config: Config = Config(load_config(CONFIG_FILE))
|
||||
except Exception as e:
|
||||
_fail(e)
|
||||
else:
|
||||
_success()
|
||||
|
||||
#? Load theme
|
||||
Draw.now("Loading theme and creating colors... ")
|
||||
try:
|
||||
theme: Theme = Theme(load_theme(config.color_theme))
|
||||
except Exception as e:
|
||||
_fail(e)
|
||||
else:
|
||||
_success()
|
||||
|
||||
#? Setup boxes and calculate sizes
|
||||
Draw.now("Doing some maths and drawing... ")
|
||||
try:
|
||||
cpu = Box("cpu", height_p=32, width_p=100)
|
||||
mem = Box("mem", height_p=40, width_p=45)
|
||||
net = Box("net", height_p=28, width_p=mem.width_p)
|
||||
proc = Box("proc", height_p=100 - cpu.height_p, width_p=100 - mem.width_p)
|
||||
boxes: List[object] = [cpu, mem, net, proc]
|
||||
calc_sizes()
|
||||
draw_bg(now=False)
|
||||
except Exception as e:
|
||||
_fail(e)
|
||||
else:
|
||||
_success()
|
||||
|
||||
#? Start a separate thread for reading keyboard input
|
||||
Draw.now("Starting input reader thread... ")
|
||||
try:
|
||||
Key.start()
|
||||
except Exception as e:
|
||||
_fail(e)
|
||||
else:
|
||||
_success()
|
||||
|
||||
#! Change this block ->
|
||||
time.sleep(1)
|
||||
del _fail, _success
|
||||
Draw.now(Term.clear)
|
||||
#! <-
|
||||
|
||||
#! For testing ------------------------------------------------------------------------------->
|
||||
testing = True
|
||||
if testing:
|
||||
try:
|
||||
pass
|
||||
#testing_humanizer()
|
||||
#testing_keyinput()
|
||||
#testing_banner()
|
||||
#testing_colors()
|
||||
testing_boxes()
|
||||
#testing_meter()
|
||||
# Draw.idle.clear()
|
||||
# Key.idle.wait()
|
||||
# input(f'{Mv.to(Term.height - 5, 1)}Enter to exit')
|
||||
# Draw.idle.set()
|
||||
#time.sleep(2)
|
||||
except Exception as e:
|
||||
errlog.exception(f'{e}')
|
||||
clean_quit(1)
|
||||
|
||||
while True:
|
||||
clean_quit()
|
||||
#! Remove ------------------------------------------------------------------------------------<
|
||||
|
||||
#? Start main loop
|
||||
while not False:
|
||||
try:
|
||||
main()
|
||||
except Exception as e:
|
||||
errlog.exception(f'{e}')
|
||||
clean_quit(1)
|
||||
|
||||
|
||||
|
||||
|
||||
clean_quit()
|
||||
else:
|
||||
#? Quit cleanly even if false starts being true...
|
||||
clean_quit()
|
||||
|
|
Loading…
Reference in New Issue