From aef318e37a4c2d383d1442d988d2cc9404a0a8bf Mon Sep 17 00:00:00 2001 From: aristocratos Date: Sat, 4 Jul 2020 01:04:09 +0200 Subject: [PATCH] added floating_humanizer function and init progress display --- bpytop | 559 +++++++++++++++++++++++++++++++++------------------------ 1 file changed, 324 insertions(+), 235 deletions(-) diff --git a/bpytop b/bpytop index b959b12..a6f3c10 100755 --- a/bpytop +++ b/bpytop @@ -46,10 +46,6 @@ if errors: print("\nInstall required modules!\n") quit(1) -from functools import partial - -print: partial = partial(print, sep="", end="", flush=True) #* Setup print function to default to empty seperator and no new line - #? Constants -------------------------------------------------------------------------------------> BANNER_SRC: Dict[str, str] = { @@ -166,6 +162,10 @@ DEFAULT_THEME: Dict[str, str] = { "upload_end" : "#dcafde" } +UNITS: Dict[str, Tuple[str, ...]] = { + "bit" : ("bit", "Kib", "Mib", "Gib", "Tib", "Pib", "Eib", "Zib", "Yib", "Bib", "GEb"), + "byte" : ("Byte", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB", "BiB", "GEB") +} #? Setup error logger ----------------------------------------------------------------------------> @@ -262,22 +262,22 @@ class Mv: def to(line: int, col: int) -> str: return f'\033[{line};{col}f' #* Move cursor to line, column @staticmethod - def right(x: int) -> str: #* Move cursor right x columns + def right(x: int) -> str: #* Move cursor right x columns return f'\033[{x}C' @staticmethod - def left(x: int) -> str: #* Move cursor left x columns + def left(x: int) -> str: #* Move cursor left x columns return f'\033[{x}D' @staticmethod - def up(x: int) -> str: #* Move cursor up x lines + def up(x: int) -> str: #* Move cursor up x lines return f'\033[{x}A' @staticmethod - def down(x: int) -> str: #* Move cursor down x lines + def down(x: int) -> str: #* Move cursor down x lines return f'\033[{x}B' @staticmethod - def save() -> str: #* Save cursor position + def save() -> str: #* Save cursor position return "\033[s" @staticmethod - def restore() -> str: #* Restore saved cursor postion + def restore() -> str: #* Restore saved cursor postion return "\033[u" t = to r = right @@ -309,6 +309,63 @@ class Key: except: pass +class Draw: + '''Holds the draw buffer and manages IO blocking queue + * .buffer([+]name[!], *args, append=False, now=False) : Add *args to buffer + * - Adding "+" prefix to name sets append to True and appends to name's current string + * - Adding "!" suffix to name sets now to True and print name's current string + * .out(clear=False) : Print all strings in buffer, clear=True clear all buffers after + * .now(*args) : Prints all arguments as a string + * .clear(*names) : Clear named buffers, all if no argument + ''' + strings: Dict[str, str] = {} + last_screen: str = "" + idle = threading.Event() + idle.set() + + @classmethod + def now(cls, *args): + '''Wait for input reader to be idle then print to screen, if excepted by IO block, set busy flag and retry''' + Key.idle.wait() + try: + print(*args, sep="", end="", flush=True) + except BlockingIOError: + pass + cls.idle.clear() + Key.idle.wait() + print(*args, sep="", end="", flush=True) + cls.idle.set() + + @classmethod + def buffer(cls, name: str, *args, append: bool = False, now: bool = False): + string: str = "" + if name.startswith("+"): + name = name.lstrip("+") + append = True + if name.endswith("!"): + name = name.rstrip("!") + now = True + if name == "": name = "_null" + if args: string = "".join(map(str, args)) + if name not in cls.strings or not append: cls.strings[name] = "" + cls.strings[name] += string + if now: cls.now(string) + + @classmethod + def out(cls, clear = False): + cls.last_screen = "".join(cls.strings.values()) + if clear: cls.strings = {} + cls.now(cls.last_screen) + + @classmethod + def clear(cls, *names): + if names: + for name in names: + if name in cls.strings: + del cls.strings["name"] + else: + cls.strings = {} + class Color: '''Holds representations for a 24-bit color __init__(color, depth="fg", default=False) @@ -469,7 +526,7 @@ class Theme: self.gradient[name] += [c] #* Set terminal colors Term.fg, Term.bg = self.main_fg, self.main_bg - print(self.main_fg, self.main_bg) + Draw.now(self.main_fg, self.main_bg) class Banner: '''Holds the bpytop banner, .draw(line=, [col=0], [center=False], [now=False])''' @@ -500,62 +557,9 @@ class Banner: for n, o in enumerate(cls.out): out += f'{Mv.to(line + n, col)}{o}' out += f'{Term.fg}' - if now: print(out) + if now: Draw.out(out) else: return out -class Draw: - '''Holds the draw buffer and manages IO blocking queue - * .buffer([+]name[!], *args, append=False, now=False) : Add *args to buffer - * - Adding "+" prefix to name sets append to True and appends to name's current string - * - Adding "!" suffix to name sets now to True and print name's current string - * .out(clear=False) : Print all strings in buffer, clear=True clear all buffers after - * .now(*args) : Prints all arguments as a string - ''' - strings: Dict[str, str] = {} - last_screen: str = "" - idle = threading.Event() - idle.set() - - @classmethod - def now(cls, *args): - '''Print to screen''' - Key.idle.wait() - try: - print(*args) - except BlockingIOError: - #pass - cls.idle.clear() - Key.idle.wait() - print(*args) - cls.idle.set() - print("Error!") - - @classmethod - def buffer(cls, name: str, *args, append: bool = False, now: bool = False): - string: str = "" - if name.startswith("+"): - name = name.lstrip("+") - append = True - if name.endswith("!"): - name = name.rstrip("!") - now = True - if name == "": name = "_null" - if args: string = "".join(map(str, args)) - if name not in cls.strings or not append: cls.strings[name] = "" - cls.strings[name] += string - if now: cls.now(string) - - @classmethod - def out(cls, clear = False): - cls.last_screen = "".join(cls.strings.values()) - if clear: cls.strings = {} - cls.now(cls.last_screen) - - - - - - class Symbol: h_line: str = "─" v_line: str = "│" @@ -582,6 +586,8 @@ class Symbol: 4.0 : "⡇", 4.1 : "⡏", 4.2 : "⡟", 4.3 : "⡿", 4.4 : "⣿" } meter: str = "■" + ok: str = f'{Color.fg("#30ff50")}√{Color.fg("#cc")}' + fail: str = f'{Color.fg("#ff3050")}!{Color.fg("#cc")}' class Graphs: '''Holds all graph objects and dicts for dynamically created graphs''' @@ -667,7 +673,62 @@ class Config: if name not in ["_Config__initialized", "recreate", "changed"]: self.conf_dict[name] = value +class Meter: + '''Creates a percentage meter + __init__(value, width, color_gradient) to create new meter + __call__(value) to set value and return meter as a string + __str__ returns last set meter as a string + ''' + out: str = "" + color_gradient: List[str] + width: int + saved: Dict[int, str] = {} + def __init__(self, value: int, width: int, color_gradient: List[str]): + self.color_gradient = color_gradient + self.width = width + self.out = self._create(value, width, color_gradient) + self.saved[value] = self.out + + def __call__(self, value: int): + if value in self.saved.keys(): + self.out = self.saved[value] + else: + self.out = self._create(value, self.width, self.color_gradient) + self.saved[value] = self.out + return self.out + + def __str__(self): + return self.out + + def __repr__(self): + return repr(self.out) + + @staticmethod + def _create(value: int, width: int, color_gradient: List[str], add: bool = False): + if value > 100: value = 100 + elif value < 0: value = 100 + out: str = "" + for i in range(1, width + 1): + if value >= round(i * 100 / width): + out += f'{color_gradient[round(i * 100 / width)]}{Symbol.meter}' + else: + out += theme.inactive_fg(Symbol.meter * (width + 1 - i)) + break + else: + out += f'{Term.fg}' + return out + +class Meters: + cpu: Meter + mem_used: Meter + mem_available: Meter + mem_cached: Meter + mem_free: Meter + swap_used: Meter + swap_free: Meter + disks_used: Meter + disks_free: Meter #? Functions -------------------------------------------------------------------------------------> @@ -785,16 +846,15 @@ def get_key(): clean_key: str = "" try: while not Key.stopping: - #Draw.idle.wait() - with Raw(sys.stdin): #* Set raw mode - #with Nonblocking(sys.stdin): #* Set nonblocking mode + with Raw(sys.stdin): if not select([sys.stdin], [], [], 0.1)[0]: continue Key.idle.clear() try: input_key += sys.stdin.read(1) if input_key == "\033": - with Nonblocking(sys.stdin): #* Set nonblocking mode + Draw.idle.wait() + with Nonblocking(sys.stdin): input_key += sys.stdin.read(3) except Exception as e: errlog.exception(f'{e}') @@ -803,7 +863,7 @@ def get_key(): elif input_key.startswith("\n"): clean_key = "enter" elif input_key.startswith("\x7f") or input_key.startswith("\x08"): clean_key = "backspace" - elif input_key.isalnum(): clean_key = input_key + elif len(input_key) == 1: clean_key = input_key else: errlog.info(f'Pressed key: {repr(input_key)}') if clean_key: @@ -811,37 +871,14 @@ def get_key(): clean_key = "" Key.new.set() #* Set threading event to interrupt main thread sleep input_key = "" + Draw.idle.wait() with Nonblocking(sys.stdin): - sys.stdin.read(10) #* Clear stdin + sys.stdin.read(10) #* Clear stdin Key.idle.set() except Exception as e: errlog.exception(f'{e}') clean_quit(1) - -def now_sleeping(signum, frame): - """Reset terminal settings and stop background input read before putting to sleep""" - Key.stop() - print(Term.clear, Term.normal_screen, Term.show_cursor) - os.kill(os.getpid(), signal.SIGSTOP) - -def now_awake(signum, frame): - """Set terminal settings and restart background input read""" - print(Term.alt_screen, Term.clear, Term.hide_cursor) - Key.start() - -def quit_sigint(signum, frame): - """SIGINT redirection to clean_quit()""" - clean_quit() - -def clean_quit(errcode: int = 0): - """Reset terminal settings, save settings to config and stop background input read before quitting""" - Key.stop() - save_config(CONFIG_FILE, config) - #print(Term.clear, Term.normal_screen, Term.show_cursor) - Term.echo(True) - raise SystemExit(errcode) - def calc_sizes(): '''Calculate sizes of boxes''' @@ -885,7 +922,7 @@ def calc_sizes(): net.box_x = net.width - net.box_width - 2 net.box_y = net.y + ((net.height - 2) // 2) - round(net.box_height / 2) -def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: str = "", line_color: Color = None, fill: bool = False, box_object: object = None): +def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: str = "", title2: str = "", line_color: Color = None, fill: bool = True, box_object: object = None): '''Create a box from a box object or by given arguments''' out: str = f'{Term.fg}{Term.bg}' if not line_color: line_color = theme.div_line @@ -902,19 +939,16 @@ def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: s #* Fill box if enabled if fill: - i: int for i in range(y + 1, y + height): out += f'{Mv.to(i, x)}{" " * (width - 1)}' out += f'{line_color}' #* Draw all horizontal lines - hpos: int for hpos in hlines: out += f'{Mv.to(hpos, x)}{Symbol.h_line * width}' #* Draw all vertical lines - vpos: int for vpos in vlines: for hpos in range(y, y + height): out += f'{Mv.to(hpos, vpos)}{Symbol.v_line}' @@ -925,22 +959,23 @@ def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: s {Mv.to(y + height, x)}{Symbol.left_down}\ {Mv.to(y + height, x + width)}{Symbol.right_down}' - #* Draw title if enabled + #* Draw titles if enabled if title: out += f'{Mv.to(y, x + 2)}{Symbol.title_left}{theme.title}{Fx.b}{title}{Fx.ub}{line_color}{Symbol.title_right}' + if title2: + out += f'{Mv.to(y + height, x + 2)}{Symbol.title_left}{theme.title}{Fx.b}{title2}{Fx.ub}{line_color}{Symbol.title_right}' - return out + return f'{out}{Term.fg}' def draw_bg(now: bool = True): '''Draw all boxes to buffer and print to screen if now=True''' - #* Draw cpu box and cpu sub box - cpu_box = f'{create_box(box_object=cpu, line_color=theme.cpu_box, fill=True)}\ + cpu_box = f'{create_box(box_object=cpu, line_color=theme.cpu_box)}\ {Mv.to(cpu.y, cpu.x + 10)}{theme.cpu_box(Symbol.title_left)}{Fx.b}{theme.hi_fg("m")}{theme.title("enu")}{Fx.ub}{theme.cpu_box(Symbol.title_right)}\ - {create_box(x=cpu.box_x, y=cpu.box_y, width=cpu.box_width, height=cpu.box_height, line_color=theme.div_line, title=CPU_NAME[:18 if config.check_temp else 9])}' + {create_box(x=cpu.box_x, y=cpu.box_y, width=cpu.box_width, height=cpu.box_height, line_color=theme.div_line, fill=False, title=CPU_NAME[:18 if config.check_temp else 9])}' #* Draw mem/disk box and divider - mem_box = f'{create_box(box_object=mem, line_color=theme.mem_box, fill=True)}\ + mem_box = f'{create_box(box_object=mem, line_color=theme.mem_box)}\ {Mv.to(mem.y, mem.divider + 2)}{theme.mem_box(Symbol.title_left)}{Fx.b}{theme.title("disks")}{Fx.ub}{theme.mem_box(Symbol.title_right)}\ {Mv.to(mem.y, mem.divider)}{theme.mem_box(Symbol.div_up)}\ {Mv.to(mem.y + mem.height, mem.divider)}{theme.mem_box(Symbol.div_down)}{theme.div_line}' @@ -948,116 +983,91 @@ def draw_bg(now: bool = True): mem_box += f'{Mv.to(mem.y + i, mem.divider)}{Symbol.v_line}' #* Draw net box and net sub box - net_box = f'{create_box(box_object=net, line_color=theme.net_box, fill=True)}\ - {create_box(x=net.box_x, y=net.box_y, width=net.box_width, height=net.box_height, line_color=theme.div_line, title="Download")}\ - {Mv.to(net.box_y + net.box_height, net.box_x + 1)}{theme.div_line(Symbol.title_left)}{Fx.b}{theme.title("Upload")}{Fx.ub}{theme.div_line(Symbol.title_right)}' + net_box = f'{create_box(box_object=net, line_color=theme.net_box)}\ + {create_box(x=net.box_x, y=net.box_y, width=net.box_width, height=net.box_height, line_color=theme.div_line, fill=False, title="Download", title2="Upload")}' #* Draw proc box - proc_box = create_box(box_object=proc, line_color=theme.proc_box, fill=True) + proc_box = create_box(box_object=proc, line_color=theme.proc_box) - Draw.buffer("bg", cpu_box, mem_box, net_box, proc_box, Term.fg) + Draw.buffer("bg!" if now else "bg", cpu_box, mem_box, net_box, proc_box) -#? Function dependent classes --------------------------------------------------------------------> +def now_sleeping(signum, frame): + """Reset terminal settings and stop background input read before putting to sleep""" + Key.stop() + Draw.now(Term.clear, Term.normal_screen, Term.show_cursor) + Term.echo(True) + os.kill(os.getpid(), signal.SIGSTOP) -class Meter: - '''Creates a percentage meter - __init__(value, width, color_gradient) to create new meter - __call__(value) to set value and return meter as a string - __str__ returns last set meter as a string +def now_awake(signum, frame): + """Set terminal settings and restart background input read""" + Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor) + Term.echo(False) + Key.start() + +def quit_sigint(signum, frame): + """SIGINT redirection to clean_quit()""" + clean_quit() + +def clean_quit(errcode: int = 0, errmsg: str = ""): + """Stop background input read, save current config and reset terminal settings before quitting""" + Key.stop() + if not errcode: save_config(CONFIG_FILE, config) + if not testing: Draw.now(Term.clear, Term.normal_screen, Term.show_cursor) #! Remove if + Term.echo(True) + if errmsg: print(errmsg) + raise SystemExit(errcode) + +def floating_humanizer(value: Union[float, int], bit: bool = False, per_second: bool = False, start: int = 0, short: bool = False) -> str: + '''Scales up in steps of 1024 to highest possible unit and returns string with unit suffixed + * Defaults to bytes unless bit=True + * start=int to set 1024 multiplier starting unit + * short=True always returns 0 decimals and shortens unit to 1 character ''' out: str = "" - color_gradient: List[str] - width: int - saved: Dict[int, str] = {} + unit: Tuple[str, ...] = UNITS["bit"] if bit else UNITS["byte"] + selector: int = start if start else 0 + mult: int = 8 if bit else 1 - def __init__(self, value: int, width: int, color_gradient: List[str]): - self.color_gradient = color_gradient - self.width = width - self.out = self._create(value, width, color_gradient) - self.saved[value] = self.out + if isinstance(value, float): value = round(value) + if value > 0: value = value * 100 * mult - def __call__(self, value: int): - if value in self.saved.keys(): - self.out = self.saved[value] - else: - self.out = self._create(value, self.width, self.color_gradient) - self.saved[value] = self.out - return self.out + while len(f'{value}') > 5: + value >>= 10 + if value < 100: value = 100 + selector += 1 - def __str__(self): - return self.out - - def __repr__(self): - return repr(self.out) - - @staticmethod - def _create(value: int, width: int, color_gradient: List[str], add: bool = False): - if value > 100: value = 100 - elif value < 0: value = 100 - out: str = "" - for i in range(1, width + 1): - if value >= round(i * 100 / width): - out += f'{color_gradient[round(i * 100 / width)]}{Symbol.meter}' - else: - out += theme.inactive_fg(Symbol.meter * (width + 1 - i)) - break - else: - out += f'{Term.fg}' - return out - - -class Meters: - cpu: Meter - mem_used: Meter - mem_available: Meter - mem_cached: Meter - mem_free: Meter - swap_used: Meter - swap_free: Meter - disks_used: Meter - disks_free: Meter + if len(f'{value}') < 5 and len(f'{value}') >= 2 and selector > 0: + decimals = 5 - len(f'{value}') + out = f'{value}'[:-2] + "." + f'{value}'[-decimals:] + elif len(f'{value}') >= 2: + out = f'{value}'[:-2] + if short: out = out.split(".")[0] + out += f'{"" if short else " "}{unit[selector][0] if short else unit[selector]}' + if per_second: out += "/s" if bit else "ps" + return out #? Main function ---------------------------------------------------------------------------------> def main(): - pass + clean_quit() -#? Init ------------------------------------------------------------------------------------------> +#? Pre main --------------------------------------------------------------------------------------> -print(Term.alt_screen, Term.clear, Term.hide_cursor) -Term.echo(False) - -signal.signal(signal.SIGINT, quit_sigint) #* Ctrl-C - -Key.start() CPU_NAME: str = get_cpu_name() -config: Config = Config(load_config(CONFIG_FILE)) - -config.proc_per_core = True - -#config.color_theme = "solarized_dark" - -theme: Theme = Theme(load_theme(config.color_theme)) - -cpu = Box("cpu", height_p=32, width_p=100) -mem = Box("mem", height_p=40, width_p=45) -net = Box("net", height_p=28, width_p=mem.width_p) -proc = Box("proc", height_p=100 - cpu.height_p, width_p=100 - mem.width_p) - -boxes: List[object] = [cpu, mem, net, proc] - -blue = theme.temp_start -lime = theme.cached_mid -orange = theme.available_end -green = theme.cpu_start -dfg = theme.main_fg +testing = True #! Remove +#! For testing -------------------------------------------------------------------------------> +def testing_humanizer(): + for i in range(1, 101, 3): + for n in range(5): + Draw.now(floating_humanizer(i + 23 * n * (i * i + 14) << n * n, bit=False, per_second=False, short=False), " ") + Draw.now("\n") def testing_colors(): for item, _ in DEFAULT_THEME.items(): @@ -1068,17 +1078,16 @@ def testing_colors(): def testing_boxes(): calc_sizes() draw_bg() - Draw.out() - print(Mv.to(35, 1)) + Draw.now(Mv.to(35, 1)) def testing_banner(): Draw.buffer("banner", Banner.draw(18, center=True)) Draw.out() - print(Mv.to(35, 1)) + Draw.now(Mv.to(35, 1)) def testing_meter(): - Draw.buffer("meters") + Draw.clear("meters") for _ in range(10): Draw.buffer("+meters", "1234567890") Draw.buffer("+meters", "\n") @@ -1096,71 +1105,151 @@ def testing_keyinput(): count: int = 0 while True: count += 1 - Draw.buffer("!", f'{Mv.to(1,1)}{Fx.b}{blue("Count:")} {count} {lime("Time:")} {time.strftime("%H:%M:%S", time.localtime())}', + Draw.now(f'{Mv.to(1,1)}{Fx.b}{theme.temp_start("Count:")} {count} {theme.cached_mid("Time:")} {time.strftime("%H:%M:%S", time.localtime())}', f'{Color.fg("#ff")} Width: {Term.width} Height: {Term.height} Resized: {Term.resized}') while Key.list: Key.new.clear() this_key = Key.list.pop() - Draw.buffer("!", f'{Mv.to(2,1)}{Color.fg("#ff9050")}{Fx.b}Last key= {Term.fg}{Fx.ub}{repr(this_key)}{" " * 40}') + Draw.now(f'{Mv.to(2,1)}{Color.fg("#ff9050")}{Fx.b}Last key= {Term.fg}{Fx.ub}{repr(this_key):14}{" "}') if this_key == "backspace": line = line[:-1] - elif this_key == "enter": - line += "\n" - else: - line += this_key - Draw.buffer("!", f'{Mv.to(3,1)}{Color.fg("#90ff50")}{Fx.b}Full line= {Term.fg}{Fx.ub}{line}{Fx.bl}| {Fx.ubl}') - if this_key == "q": + elif this_key == "escape": + line = "" + elif this_key == "Q": clean_quit() - if this_key == "R": + elif this_key == "R": raise Exception("Test ERROR") + elif len(this_key) == 1: + line += this_key + Draw.now(f'{Color.fg("#90ff50")}{Fx.b}Command= {Term.fg}{Fx.ub}{line}{Fx.bl}| {Fx.ubl}\033[0K\n') + if this_key == "enter": + try: + exec(line) + except: + pass + Draw.clear() + if not Key.reader.is_alive(): clean_quit(1) Key.new.wait(1.0) - -try: - #testing_keyinput() - #testing_banner() - #testing_colors() - #testing_boxes() - testing_meter() - # Draw.idle.clear() - # Key.idle.wait() - # input(f'{Mv.to(Term.height - 5, 1)}Enter to exit') - # Draw.idle.set() - #time.sleep(2) -except Exception as e: - errlog.exception(f'{e}') - clean_quit(1) - - -clean_quit() +#! Remove ------------------------------------------------------------------------------------< if __name__ == "__main__": - #? Setup signal handlers for SIGSTP, SIGCONT, SIGINT and SIGWINCH - signal.signal(signal.SIGTSTP, now_sleeping) #* Ctrl-Z - signal.signal(signal.SIGCONT, now_awake) #* Resume - signal.signal(signal.SIGINT, quit_sigint) #* Ctrl-C - signal.signal(signal.SIGWINCH, Term.refresh) #* Terminal resized + #? Init --------------------------------------------------------------------------------------> - #? Switch to alternate screen, clear screen and hide cursor - print(Term.alt_screen, Term.clear, Term.hide_cursor) + #? Temporary functions for init + def _fail(err): + Draw.now(f'{Symbol.fail}') + errlog.exception(f'{err}') + time.sleep(2) + clean_quit(1, errmsg=f'Error during init! See {CONFIG_DIR}/error.log for more information.') + def _success(): + time.sleep(0.1) #! Enable + Draw.now(f'{Symbol.ok}\n{Mv.r(Term.width // 2 - 19)}') + + #? Switch to alternate screen, clear screen, hide cursor and disable input echo + Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor) + Term.echo(False) + + #? Draw banner and init status + Draw.now(Banner.draw(Term.height // 2 - 10, center=True), "\n") + Draw.now(Color.fg("#50")) + for _i in range(10): + Draw.now(f'{Mv.to(Term.height // 2 - 3 + _i, Term.width // 2 - 25)}{str((_i + 1) * 10) + "%":>5}{Symbol.v_line}') + Draw.now(f'{Color.fg("#cc")}{Fx.b}{Mv.to(Term.height // 2 - 3, Term.width // 2 - 18)}') + + #? Setup signal handlers for SIGSTP, SIGCONT, SIGINT and SIGWINCH + Draw.now("Setting up signal handlers... ") + try: + signal.signal(signal.SIGTSTP, now_sleeping) #* Ctrl-Z + signal.signal(signal.SIGCONT, now_awake) #* Resume + signal.signal(signal.SIGINT, quit_sigint) #* Ctrl-C + signal.signal(signal.SIGWINCH, Term.refresh) #* Terminal resized + except Exception as e: + _fail(e) + else: + _success() + + + #? Load config + Draw.now("Loading configuration... ") + try: + config: Config = Config(load_config(CONFIG_FILE)) + except Exception as e: + _fail(e) + else: + _success() + + #? Load theme + Draw.now("Loading theme and creating colors... ") + try: + theme: Theme = Theme(load_theme(config.color_theme)) + except Exception as e: + _fail(e) + else: + _success() + + #? Setup boxes and calculate sizes + Draw.now("Doing some maths and drawing... ") + try: + cpu = Box("cpu", height_p=32, width_p=100) + mem = Box("mem", height_p=40, width_p=45) + net = Box("net", height_p=28, width_p=mem.width_p) + proc = Box("proc", height_p=100 - cpu.height_p, width_p=100 - mem.width_p) + boxes: List[object] = [cpu, mem, net, proc] + calc_sizes() + draw_bg(now=False) + except Exception as e: + _fail(e) + else: + _success() #? Start a separate thread for reading keyboard input + Draw.now("Starting input reader thread... ") try: Key.start() except Exception as e: + _fail(e) + else: + _success() + + #! Change this block -> + time.sleep(1) + del _fail, _success + Draw.now(Term.clear) + #! <- + + #! For testing -------------------------------------------------------------------------------> + testing = True + if testing: + try: + pass + #testing_humanizer() + #testing_keyinput() + #testing_banner() + #testing_colors() + testing_boxes() + #testing_meter() + # Draw.idle.clear() + # Key.idle.wait() + # input(f'{Mv.to(Term.height - 5, 1)}Enter to exit') + # Draw.idle.set() + #time.sleep(2) + except Exception as e: errlog.exception(f'{e}') clean_quit(1) - while True: + clean_quit() + #! Remove ------------------------------------------------------------------------------------< + + #? Start main loop + while not False: try: main() except Exception as e: errlog.exception(f'{e}') clean_quit(1) - - - - - clean_quit() + else: + #? Quit cleanly even if false starts being true... + clean_quit()