Added draw function for CpuBox class

pull/27/head
aristocratos 2020-07-16 02:51:55 +02:00
parent 681342b745
commit 7c53ab982e
1 changed files with 212 additions and 70 deletions

282
bpytop.py
View File

@ -20,6 +20,7 @@
import os, sys, threading, signal, re, subprocess, logging, logging.handlers import os, sys, threading, signal, re, subprocess, logging, logging.handlers
from time import time, sleep, strftime, localtime from time import time, sleep, strftime, localtime
from datetime import timedelta from datetime import timedelta
from _thread import interrupt_main
from select import select from select import select
from distutils.util import strtobool from distutils.util import strtobool
from string import Template from string import Template
@ -162,6 +163,8 @@ USER_THEME_DIR: str = f'{CONFIG_DIR}/user_themes'
CORES: int = psutil.cpu_count(logical=False) or 1 CORES: int = psutil.cpu_count(logical=False) or 1
THREADS: int = psutil.cpu_count(logical=True) or 1 THREADS: int = psutil.cpu_count(logical=True) or 1
THREAD_ERROR: int = 0
DEFAULT_THEME: Dict[str, str] = { DEFAULT_THEME: Dict[str, str] = {
"main_bg" : "", "main_bg" : "",
"main_fg" : "#cc", "main_fg" : "#cc",
@ -428,7 +431,9 @@ class Term:
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, new_attr) termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, new_attr)
class Fx: class Fx:
"""Text effects""" """Text effects
* trans(string: str): Replace whitespace with escape move right to not paint background in whitespace.
* uncolor(string: str) : Removes all color and returns string with THEME.inactive_fg color."""
start = "\033[" #* Escape sequence start start = "\033[" #* Escape sequence start
sep = ";" #* Escape sequence separator sep = ";" #* Escape sequence separator
end = "m" #* Escape sequence end end = "m" #* Escape sequence end
@ -446,11 +451,16 @@ class Fx:
strike = s = "\033[9m" #* Strike / crossed-out on strike = s = "\033[9m" #* Strike / crossed-out on
unstrike = us = "\033[29m" #* Strike / crossed-out off unstrike = us = "\033[29m" #* Strike / crossed-out off
color_re = re.compile(r"\033\[\d+;\d?;?\d*;?\d*;?\d*m")
@staticmethod @staticmethod
def trans(string: str): def trans(string: str):
'''Replace white space with escape move right to not paint background in whitespace'''
return string.replace(" ", "\033[1C") return string.replace(" ", "\033[1C")
@classmethod
def uncolor(cls, string: str) -> str:
return f'{THEME.inactive_fg}{cls.color_re.sub("", string)}{Term.fg}'
class Raw(object): class Raw(object):
"""Set raw input mode for device""" """Set raw input mode for device"""
def __init__(self, stream): def __init__(self, stream):
@ -619,9 +629,10 @@ class Key:
cls.idle.set() #* Report IO blocking done cls.idle.set() #* Report IO blocking done
except Exception as e: except Exception as e:
errlog.exception(f'Input reader failed with exception: {e}') errlog.exception(f'Input thread failed with exception: {e}')
cls.idle.set() cls.idle.set()
cls.list.clear() cls.list.clear()
clean_quit(1, thread=True)
class Draw: class Draw:
'''Holds the draw buffer and manages IO blocking queue '''Holds the draw buffer and manages IO blocking queue
@ -654,7 +665,7 @@ class Draw:
cls.idle.set() cls.idle.set()
@classmethod @classmethod
def buffer(cls, name: str, *args: str, append: bool = False, now: bool = False, z: int = 100): def buffer(cls, name: str, *args: str, append: bool = False, now: bool = False, z: int = 100, save: bool = False, uncolor: bool = False):
string: str = "" string: str = ""
if name.startswith("+"): if name.startswith("+"):
name = name.lstrip("+") name = name.lstrip("+")
@ -664,8 +675,13 @@ class Draw:
now = True now = True
if not name in cls.z_order or z != 100: cls.z_order[name] = z if not name in cls.z_order or z != 100: cls.z_order[name] = z
if args: string = "".join(args) if args: string = "".join(args)
if uncolor: string = Fx.uncolor(string)
if name not in cls.strings or not append: cls.strings[name] = "" if name not in cls.strings or not append: cls.strings[name] = ""
cls.strings[name] += string if save:
if name not in cls.last or not append: cls.last[name] = ""
cls.last[name] = string
else:
cls.strings[name] += string
if now: cls.now(string) if now: cls.now(string)
@classmethod @classmethod
@ -690,7 +706,7 @@ class Draw:
cls.now(out) cls.now(out)
@classmethod @classmethod
def last_screen(cls) -> str: def last_buffer(cls) -> str:
out: str = "" out: str = ""
for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True): for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True):
if name in cls.last: if name in cls.last:
@ -965,6 +981,9 @@ class Symbol:
3.0 : "", 3.1 : "", 3.2 : "", 3.3 : "", 3.4 : "", 3.0 : "", 3.1 : "", 3.2 : "", 3.3 : "", 3.4 : "",
4.0 : "", 4.1 : "", 4.2 : "", 4.3 : "", 4.4 : "" 4.0 : "", 4.1 : "", 4.2 : "", 4.3 : "", 4.4 : ""
} }
graph_up_small = graph_up.copy()
graph_up_small[0.0] = "\033[1C"
graph_down: Dict[float, str] = { graph_down: Dict[float, str] = {
0.0 : "", 0.1 : "", 0.2 : "", 0.3 : "", 0.4 : "", 0.0 : "", 0.1 : "", 0.2 : "", 0.3 : "", 0.4 : "",
1.0 : "", 1.1 : "", 1.2 : "", 1.3 : "", 1.4 : "", 1.0 : "", 1.1 : "", 1.2 : "", 1.3 : "", 1.4 : "",
@ -972,6 +991,8 @@ class Symbol:
3.0 : "", 3.1 : "", 3.2 : "", 3.3 : "", 3.4 : "", 3.0 : "", 3.1 : "", 3.2 : "", 3.3 : "", 3.4 : "",
4.0 : "", 4.1 : "", 4.2 : "", 4.3 : "", 4.4 : "" 4.0 : "", 4.1 : "", 4.2 : "", 4.3 : "", 4.4 : ""
} }
graph_down_small = graph_down.copy()
graph_down_small[0.0] = "\033[1C"
meter: str = "" meter: str = ""
ok: str = f'{Color.fg("#30ff50")}{Color.fg("#cc")}' ok: str = f'{Color.fg("#30ff50")}{Color.fg("#cc")}'
fail: str = f'{Color.fg("#ff3050")}!{Color.fg("#cc")}' fail: str = f'{Color.fg("#ff3050")}!{Color.fg("#cc")}'
@ -988,11 +1009,12 @@ class Graph:
colors: List[str] colors: List[str]
invert: bool invert: bool
max_value: int max_value: int
offset: int
current: bool current: bool
last: int last: int
symbol: Dict[float, str] symbol: Dict[float, str]
def __init__(self, width: int, height: int, color: Union[List[str], Color, None], data: List[int], invert: bool = False, max_value: int = 0): def __init__(self, width: int, height: int, color: Union[List[str], Color, None], data: List[int], invert: bool = False, max_value: int = 0, offset: int = 0):
self.graphs: Dict[bool, List[str]] = {False : [], True : []} self.graphs: Dict[bool, List[str]] = {False : [], True : []}
self.current: bool = True self.current: bool = True
self.colors: List[str] = [] self.colors: List[str] = []
@ -1004,20 +1026,24 @@ class Graph:
self.width = width self.width = width
self.height = height self.height = height
self.invert = invert self.invert = invert
self.offset = offset
if not data: data = [0] if not data: data = [0]
if max_value: if max_value:
self.max_value = max_value self.max_value = max_value
data = [ v * 100 // max_value if v < max_value else 100 for v in data ] #* Convert values to percentage values of max_value with max_value as ceiling data = [ (v + offset) * 100 // (max_value + offset) if v < max_value else 100 for v in data ] #* Convert values to percentage values of max_value with max_value as ceiling
else: else:
self.max_value = 0 self.max_value = 0
self.symbol = Symbol.graph_down if invert else Symbol.graph_up if self.height == 1:
self.symbol = Symbol.graph_down_small if invert else Symbol.graph_up_small
else:
self.symbol = Symbol.graph_down if invert else Symbol.graph_up
value_width: int = ceil(len(data) / 2) value_width: int = ceil(len(data) / 2)
filler: str = "" filler: str = ""
if value_width > width: #* If the size of given data set is bigger then width of graph, shrink data set if value_width > width: #* If the size of given data set is bigger then width of graph, shrink data set
data = data[-(width*2):] data = data[-(width*2):]
value_width = ceil(len(data) / 2) value_width = ceil(len(data) / 2)
elif value_width < width: #* If the size of given data set is smaller then width of graph, fill graph with whitespace elif value_width < width: #* If the size of given data set is smaller then width of graph, fill graph with whitespace
filler = " " * (width - value_width) filler = self.symbol[0.0] * (width - value_width)
if len(data) % 2 == 1: data.insert(0, 0) if len(data) % 2 == 1: data.insert(0, 0)
for _ in range(height): for _ in range(height):
for b in [True, False]: for b in [True, False]:
@ -1054,13 +1080,19 @@ class Graph:
for h in range(self.height): for h in range(self.height):
if h > 0: self.out += f'{Mv.d(1)}{Mv.l(self.width)}' if h > 0: self.out += f'{Mv.d(1)}{Mv.l(self.width)}'
self.out += f'{"" if not self.colors else self.colors[h]}{self.graphs[self.current][h if not self.invert else (self.height - 1) - h]}' self.out += f'{"" if not self.colors else self.colors[h]}{self.graphs[self.current][h if not self.invert else (self.height - 1) - h]}'
self.out += f'{Term.fg}' if self.colors: self.out += f'{Term.fg}'
def add(self, value: int): def add(self, value: int):
self.current = not self.current self.current = not self.current
for n in range(self.height): if self.height == 1:
self.graphs[self.current][n] = self.graphs[self.current][n][1:] if self.graphs[self.current][0].startswith(self.symbol[0.0]):
if self.max_value: value = value * 100 // self.max_value if value < self.max_value else 100 self.graphs[self.current][0] = self.graphs[self.current][0].replace(self.symbol[0.0], "", 1)
else:
self.graphs[self.current][0] = self.graphs[self.current][0][1:]
else:
for n in range(self.height):
self.graphs[self.current][n] = self.graphs[self.current][n][1:]
if self.max_value: value = (value + self.offset) * 100 // (self.max_value + self.offset) if value < self.max_value else 100
self._create([value]) self._create([value])
return self.out return self.out
@ -1073,9 +1105,9 @@ class Graph:
class Graphs: class Graphs:
'''Holds all graphs and lists of graphs for dynamically created graphs''' '''Holds all graphs and lists of graphs for dynamically created graphs'''
cpu: Graph cpu: Dict[str, Graph] = {}
cores: List[Graph] = [] cores: List[Graph] = [NotImplemented] * THREADS
temps: List[Graph] = [] temps: List[Graph] = [NotImplemented] * (THREADS + 1)
net: Graph net: Graph
detailed_cpu: Graph detailed_cpu: Graph
detailed_mem: Graph detailed_mem: Graph
@ -1153,18 +1185,20 @@ class Box:
_b_cpu_h: int _b_cpu_h: int
_b_mem_h: int _b_mem_h: int
redraw_all: bool redraw_all: bool
buffers: List[str] = []
@classmethod @classmethod
def calc_sizes(cls): def calc_sizes(cls):
'''Calculate sizes of boxes''' '''Calculate sizes of boxes'''
for sub in cls.__subclasses__(): for sub in cls.__subclasses__():
sub._calc_size() # type: ignore sub._calc_size() # type: ignore
sub.resized = True # type: ignore
@classmethod @classmethod
def draw_bg(cls, now: bool = True): def draw_bg(cls, now: bool = True):
'''Draw all boxes outlines and titles''' '''Draw all boxes outlines and titles'''
Draw.buffer("bg!" if now else "bg", "".join(sub._draw_bg() for sub in cls.__subclasses__())) # type: ignore Draw.buffer("bg!" if now and not Menu.active else "bg", "".join(sub._draw_bg() for sub in cls.__subclasses__()), z=1000, save=True if Menu.active else False) # type: ignore
if now: Draw.clear("bg") if now and not Menu.active: Draw.clear("bg")
class SubBox: class SubBox:
box_x: int = 0 box_x: int = 0
@ -1172,6 +1206,7 @@ class SubBox:
box_width: int = 0 box_width: int = 0
box_height: int = 0 box_height: int = 0
box_columns: int = 0 box_columns: int = 0
column_size: int = 0
class CpuBox(Box, SubBox): class CpuBox(Box, SubBox):
name = "cpu" name = "cpu"
@ -1179,36 +1214,118 @@ class CpuBox(Box, SubBox):
y = 1 y = 1
height_p = 32 height_p = 32
width_p = 100 width_p = 100
resized: bool = True
redraw: bool = True redraw: bool = True
buffer: str = "cpu"
Box.buffers.append(buffer)
@classmethod @classmethod
def _calc_size(cls): def _calc_size(cls):
cls.width = round(Term.width * cls.width_p / 100) cls.width = round(Term.width * cls.width_p / 100)
cls.height = round(Term.height * cls.height_p / 100) cls.height = round(Term.height * cls.height_p / 100)
Box._b_cpu_h = cls.height Box._b_cpu_h = cls.height
#THREADS = 25 #THREADS = 64
if THREADS > (cls.height - 6) * 3 and cls.width > 200: cls.box_width = 24 * 4; cls.box_height = ceil(THREADS / 4) + 4; cls.box_columns = 4 cls.box_columns = ceil((THREADS + 1) / (cls.height - 5))
elif THREADS > (cls.height - 6) * 2 and cls.width > 150: cls.box_width = 24 * 3; cls.box_height = ceil(THREADS / 3) + 4; cls.box_columns = 3 if cls.box_columns * (24 + 13 if CONFIG.check_temp else 24) < cls.width - (cls.width // 4): cls.column_size = 2
elif THREADS > cls.height - 6 and cls.width > 100: cls.box_width = 24 * 2; cls.box_height = ceil(THREADS / 2) + 4; cls.box_columns = 2 elif cls.box_columns * (19 + 6 if CONFIG.check_temp else 19) < cls.width - (cls.width // 4): cls.column_size = 1
else: cls.box_width = 24; cls.box_height = THREADS + 4; cls.box_columns = 1 else: cls.box_columns = (cls.width - cls.width // 4) // (11 + 6 if CONFIG.check_temp else 11)
if cls.column_size == 2: cls.box_width = (24 + 13 if CONFIG.check_temp else 24) * cls.box_columns
elif cls.column_size == 1: cls.box_width = (19 + 6 if CONFIG.check_temp else 19) * cls.box_columns
else: cls.box_width = (11 + 6 if CONFIG.check_temp else 11) * cls.box_columns
cls.box_height = ceil(THREADS / cls.box_columns) + 4
if CONFIG.check_temp: cls.box_width += 13 * cls.box_columns
if cls.box_height > cls.height - 2: cls.box_height = cls.height - 2 if cls.box_height > cls.height - 2: cls.box_height = cls.height - 2
cls.box_x = (cls.width - 2) - cls.box_width cls.box_x = (cls.width - 1) - cls.box_width
cls.box_y = cls.y + ((cls.height - 2) // 2) - cls.box_height // 2 + 1 cls.box_y = cls.y + ceil((cls.height - 2) / 2) - ceil(cls.box_height / 2) + 1
cls.redraw_all = True
@classmethod @classmethod
def _draw_bg(cls) -> str: def _draw_bg(cls) -> str:
return f'{create_box(box=cls, line_color=THEME.cpu_box)}\ return (f'{create_box(box=cls, line_color=THEME.cpu_box)}'
{Mv.to(cls.y, cls.x + 10)}{THEME.cpu_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg("m")}{THEME.title("enu")}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}\ f'{Mv.to(cls.y, cls.x + 10)}{THEME.cpu_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg("m")}{THEME.title("enu")}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}'
{create_box(x=cls.box_x, y=cls.box_y, width=cls.box_width, height=cls.box_height, line_color=THEME.div_line, fill=False, title=CPU_NAME[:18 if CONFIG.check_temp else 9])}' f'{create_box(x=cls.box_x, y=cls.box_y, width=cls.box_width, height=cls.box_height, line_color=THEME.div_line, fill=False, title=CPU_NAME[:18 if CONFIG.check_temp else 9])}')
@classmethod @classmethod
def _draw_fg(cls, cpu): def _draw_fg(cls, cpu):
Draw.buffer("cpu", f'{Mv.to(1,1)}Cpu usage: {cpu.cpu_usage}\nCpu freq: {cpu.cpu_freq}\nLoad avg: {cpu.load_avg}\ # Draw.buffer(cls.buffer, f'{Mv.to(1,1)}Cpu usage: {cpu.cpu_usage}\nCpu freq: {cpu.cpu_freq}\nLoad avg: {cpu.load_avg}\
\nTemps: {CpuCollector.cpu_temp}\n') # \nTemps: {cpu.cpu_temp}\n', save=True if Menu.active else False)
cpu = CpuCollector #! remove
out: str = ""
misc: str = ""
lavg: str = ""
x, y, w, h = cls.x + 1, cls.y + 1, cls.width - 2, cls.height - 2
bx, by, bw, bh = cls.box_x + 1, cls.box_y + 1, cls.box_width - 2, cls.box_height - 2
hh: int = ceil(h / 2)
if cls.resized:
Graphs.cpu["up"] = Graph(w - bw - 3, hh, THEME.gradient["cpu"], cpu.cpu_usage[0])
Graphs.cpu["down"] = Graph(w - bw - 3, h - hh, THEME.gradient["cpu"], cpu.cpu_usage[0], invert=True)
Meters.cpu = Meter(cpu.cpu_usage[0][-1], 10 if cls.box_columns == 1 else (bw - 16 - 13 if CONFIG.check_temp else bw - 16), "cpu")
if cls.column_size > 0:
for n in range(THREADS):
Graphs.cores[n] = Graph(5 * cls.column_size, 1, None, cpu.cpu_usage[n + 1])
if CONFIG.check_temp:
Graphs.temps[0] = Graph(5, 1, None, cpu.cpu_temp[0], max_value=cpu.cpu_temp_crit, offset=-23)
if cls.column_size > 1:
for n in range(1, THREADS + 1):
Graphs.temps[n] = Graph(5, 1, None, cpu.cpu_temp[n], max_value=cpu.cpu_temp_crit, offset=-23)
if cls.resized or cls.redraw:
misc += (f'{Mv.to(y - 1, x + w - len(str(CONFIG.update_ms)) - 7)}{THEME.cpu_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg("+")} '
f'{THEME.title(str(CONFIG.update_ms))} {THEME.hi_fg("+")}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}')
cx = cy = cc = 0
ccw = (bw + 2) // cls.box_columns
freq: str = f'{cpu.cpu_freq} Mhz' if cpu.cpu_freq < 1000 else f'{float(cpu.cpu_freq / 1000):.1f} GHz'
out += (f'{Mv.to(by - 1, bx + bw - 9)}{THEME.div_line(Symbol.title_left)}{Fx.b}{THEME.title(freq)}{Fx.ub}{THEME.div_line(Symbol.title_right)}'
f'{Mv.to(y, x)}{Graphs.cpu["up"].add(cpu.cpu_usage[0][-1])}{Mv.to(y + hh, x)}{Graphs.cpu["down"].add(cpu.cpu_usage[0][-1])}'
f'{THEME.main_fg}{Mv.to(by + cy, bx + cx)}{"CPU " if cls.box_columns == 1 else "CPU Total "}{Meters.cpu(cpu.cpu_usage[0][-1])}'
f'{THEME.gradient["cpu"][cpu.cpu_usage[0][-1]]}{cpu.cpu_usage[0][-1]:>4}{THEME.main_fg}%')
if CONFIG.check_temp:
out += (f'{THEME.inactive_fg} ⡀⡀⡀⡀⡀{Mv.l(5)}{THEME.gradient["temp"][cpu.cpu_temp[0][-1]]}{Graphs.temps[0].add(cpu.cpu_temp[0][-1])}'
f'{cpu.cpu_temp[0][-1]:>4}{THEME.main_fg}°C')
cy += 1
for n in range(1, THREADS + 1):
out += f'{THEME.main_fg}{Mv.to(by + cy, bx + cx)}{"Core" + str(n):<{7 if cls.column_size > 0 else 5}}'
if cls.column_size > 0:
out += f'{THEME.inactive_fg}{"" * (5 * cls.column_size)}{Mv.l(5 * cls.column_size)}{THEME.gradient["cpu"][cpu.cpu_usage[n][-1]]}{Graphs.cores[n-1].add(cpu.cpu_usage[n][-1])}'
else:
out += f'{THEME.gradient["cpu"][cpu.cpu_usage[n][-1]]}'
out += f'{cpu.cpu_usage[n][-1]:>4}{THEME.main_fg}%'
if CONFIG.check_temp:
if cls.column_size > 1:
out += f'{THEME.inactive_fg} ⡀⡀⡀⡀⡀{Mv.l(5)}{THEME.gradient["temp"][cpu.cpu_temp[n][-1]]}{Graphs.temps[n].add(cpu.cpu_temp[n][-1])}'
else:
out += f'{THEME.gradient["temp"][cpu.cpu_temp[n][-1]]}'
out += f'{cpu.cpu_temp[n][-1]:>4}{THEME.main_fg}°C'
cy += 1
if cy == bh:
cc += 1; cy = 1; cx = ccw * cc
if cc == cls.box_columns: break
if cy < bh - 1: cy = bh - 1
if cls.column_size == 2 and CONFIG.check_temp:
lavg = f'Load Average: {" ".join(str(l) for l in cpu.load_avg):^18.18}'
elif cls.column_size == 2 or (cls.column_size == 1 and CONFIG.check_temp):
lavg = f'L-AVG: {" ".join(str(l) for l in cpu.load_avg):^14.14}'
else:
lavg = f'{" ".join(str(round(l, 1)) for l in cpu.load_avg):^11.11}'
out += f'{Mv.to(by + cy, bx + cx)}{THEME.main_fg}{lavg}'
out += f'{Mv.to(y + h - 1, x + 1)}{THEME.inactive_fg}up {cpu.uptime}'
out += Mv.to(Term.height - 5, 1) #! Remove
if misc: Draw.buffer("cpu_misc", misc, save=True)
Draw.buffer(cls.buffer, f'{out}{misc}{Term.fg}', save=Menu.active)
cls.redraw = cls.resized = False
class MemBox(Box): class MemBox(Box):
name = "mem" name = "mem"
@ -1220,6 +1337,8 @@ class MemBox(Box):
mem_width: int = 0 mem_width: int = 0
disks_width: int = 0 disks_width: int = 0
redraw: bool = True redraw: bool = True
buffer: str = "mem"
Box.buffers.append(buffer)
@classmethod @classmethod
def _calc_size(cls): def _calc_size(cls):
@ -1234,11 +1353,11 @@ class MemBox(Box):
@classmethod @classmethod
def _draw_bg(cls) -> str: def _draw_bg(cls) -> str:
return f'{create_box(box=cls, line_color=THEME.mem_box)}\ return (f'{create_box(box=cls, line_color=THEME.mem_box)}'
{Mv.to(cls.y, cls.divider + 2)}{THEME.mem_box(Symbol.title_left)}{Fx.b}{THEME.title("disks")}{Fx.ub}{THEME.mem_box(Symbol.title_right)}\ f'{Mv.to(cls.y, cls.divider + 2)}{THEME.mem_box(Symbol.title_left)}{Fx.b}{THEME.title("disks")}{Fx.ub}{THEME.mem_box(Symbol.title_right)}'
{Mv.to(cls.y, cls.divider)}{THEME.mem_box(Symbol.div_up)}\ f'{Mv.to(cls.y, cls.divider)}{THEME.mem_box(Symbol.div_up)}'
{Mv.to(cls.y + cls.height - 1, cls.divider)}{THEME.mem_box(Symbol.div_down)}{THEME.div_line}\ f'{Mv.to(cls.y + cls.height - 1, cls.divider)}{THEME.mem_box(Symbol.div_down)}{THEME.div_line}'
{"".join(f"{Mv.to(cls.y + i, cls.divider)}{Symbol.v_line}" for i in range(1, cls.height - 1))}' f'{"".join(f"{Mv.to(cls.y + i, cls.divider)}{Symbol.v_line}" for i in range(1, cls.height - 1))}')
class NetBox(Box, SubBox): class NetBox(Box, SubBox):
name = "net" name = "net"
@ -1247,6 +1366,8 @@ class NetBox(Box, SubBox):
x = 1 x = 1
y = 1 y = 1
redraw: bool = True redraw: bool = True
buffer: str = "net"
Box.buffers.append(buffer)
@classmethod @classmethod
def _calc_size(cls): def _calc_size(cls):
@ -1276,6 +1397,8 @@ class ProcBox(Box):
detailed_width: int = 0 detailed_width: int = 0
detailed_height: int = 8 detailed_height: int = 8
redraw: bool = True redraw: bool = True
buffer: str = "proc"
Box.buffers.append(buffer)
@classmethod @classmethod
def _calc_size(cls): def _calc_size(cls):
@ -1308,7 +1431,6 @@ class Collector:
collect_done = threading.Event() collect_done = threading.Event()
collect_queue: List = [] collect_queue: List = []
collect_interrupt: bool = False collect_interrupt: bool = False
draw_block: bool = False
@classmethod @classmethod
def start(cls): def start(cls):
@ -1329,20 +1451,28 @@ class Collector:
@classmethod @classmethod
def _runner(cls): def _runner(cls):
'''This is meant to run in it's own thread, collecting and drawing when collect_run is set''' '''This is meant to run in it's own thread, collecting and drawing when collect_run is set'''
while not cls.stopping: draw_buffers: List[str] = []
cls.collect_run.wait(0.1) try:
if not cls.collect_run.is_set(): while not cls.stopping:
continue cls.collect_run.wait(0.1)
cls.collect_run.clear() if not cls.collect_run.is_set():
cls.collect_idle.clear() continue
while cls.collect_queue: cls.collect_run.clear()
collector = cls.collect_queue.pop() cls.collect_idle.clear()
collector._collect() while cls.collect_queue:
collector._draw() collector = cls.collect_queue.pop()
if cls.draw_now and not cls.draw_block: collector._collect()
Draw.out("cpu", "mem", "net", "proc") collector._draw()
cls.collect_idle.set() draw_buffers.append(collector.buffer)
if cls.draw_now and not Menu.active:
Draw.out(*draw_buffers)
draw_buffers = []
cls.collect_idle.set()
cls.collect_done.set()
except Exception as e:
errlog.exception(f'Data collection thread failed with exception: {e}')
cls.collect_done.set() cls.collect_done.set()
clean_quit(1, thread=True)
@classmethod @classmethod
def collect(cls, *collectors: object, draw_now: bool = True, interrupt: bool = False): def collect(cls, *collectors: object, draw_now: bool = True, interrupt: bool = False):
@ -1372,15 +1502,17 @@ class CpuCollector(Collector):
'''Collects cpu usage for cpu and cores, cpu frequency, load_avg '''Collects cpu usage for cpu and cores, cpu frequency, load_avg
_collect(): Collects data _collect(): Collects data
_draw(): calls CpuBox._draw_fg()''' _draw(): calls CpuBox._draw_fg()'''
cpu_usage: List[List[int]] = [[]] cpu_usage: List[List[int]] = []
cpu_temp: List[List[int]] = [[]] cpu_temp: List[List[int]] = []
cpu_temp_high: int = 0 cpu_temp_high: int = 0
cpu_temp_crit: int = 0 cpu_temp_crit: int = 0
for _ in range(THREADS): for _ in range(THREADS + 1):
cpu_usage.append([]) cpu_usage.append([])
cpu_temp.append([]) cpu_temp.append([])
cpu_freq: int = 0 cpu_freq: int = 0
load_avg: List[float] = [] load_avg: List[float] = []
uptime: str = ""
buffer: str = CpuBox.buffer
@staticmethod @staticmethod
def _get_sensors() -> str: def _get_sensors() -> str:
@ -1419,6 +1551,7 @@ class CpuCollector(Collector):
cls.cpu_freq = round(psutil.cpu_freq().current) cls.cpu_freq = round(psutil.cpu_freq().current)
cls.load_avg = [round(lavg, 2) for lavg in os.getloadavg()] cls.load_avg = [round(lavg, 2) for lavg in os.getloadavg()]
cls.uptime = str(timedelta(seconds=round(time()-psutil.boot_time(),0)))[:-3]
if CONFIG.check_temp and cls.got_sensors: if CONFIG.check_temp and cls.got_sensors:
cls._collect_temps() cls._collect_temps()
@ -1484,9 +1617,6 @@ class CpuCollector(Collector):
@classmethod @classmethod
def _draw(cls): def _draw(cls):
CpuBox._draw_fg(cls) CpuBox._draw_fg(cls)
@ -1496,9 +1626,11 @@ class CpuCollector(Collector):
@timerd @timerd
def testing_collectors(): def testing_collectors():
# CONFIG.check_temp = True
# Box.calc_sizes()
Box.draw_bg() Box.draw_bg()
for _ in range(1): for _ in range(60):
Collector.collect(CpuCollector) Collector.collect(CpuCollector)
Collector.collect_done.wait() Collector.collect_done.wait()
sleep(1) sleep(1)
@ -1507,14 +1639,12 @@ def testing_collectors():
class Menu: class Menu:
'''Holds the main menu and all submenus '''Holds the main menu and all submenus'''
* uncolor(string: str) : removes all color and returns string with THEME.inactive_fg color active: bool = False
'''
color_re = re.compile(r"\033\[\d+;\d?;?\d*;?\d*;?\d*m{1}") #Draw.buffer("menubg", Draw.last_buffer, z=1000, uncolor=True)
#Draw.clear("menubg", last=True)
@classmethod
def uncolor(cls, string: str) -> str:
return f'{THEME.inactive_fg}{cls.color_re.sub("", string)}{Term.fg}'
@ -1620,8 +1750,14 @@ def quit_sigint(signum, frame):
"""SIGINT redirection to clean_quit()""" """SIGINT redirection to clean_quit()"""
clean_quit() clean_quit()
def clean_quit(errcode: int = 0, errmsg: str = ""): def clean_quit(errcode: int = 0, errmsg: str = "", thread: bool = False):
"""Stop background input read, save current config and reset terminal settings before quitting""" """Stop background input read, save current config and reset terminal settings before quitting"""
global THREAD_ERROR
if thread:
THREAD_ERROR = errcode
interrupt_main()
return
if THREAD_ERROR: errcode = THREAD_ERROR
Key.stop() Key.stop()
Collector.stop() Collector.stop()
if not errcode: CONFIG.save_config() if not errcode: CONFIG.save_config()
@ -1633,6 +1769,7 @@ def clean_quit(errcode: int = 0, errmsg: str = ""):
else: else:
errlog.warning(f'Exiting with errorcode ({errcode}). Runtime {timedelta(seconds=round(time() - SELF_START, 0))} \n') errlog.warning(f'Exiting with errorcode ({errcode}). Runtime {timedelta(seconds=round(time() - SELF_START, 0))} \n')
if errmsg: print(errmsg) if errmsg: print(errmsg)
raise SystemExit(errcode) raise SystemExit(errcode)
def floating_humanizer(value: Union[float, int], bit: bool = False, per_second: bool = False, start: int = 0, short: bool = False) -> str: def floating_humanizer(value: Union[float, int], bit: bool = False, per_second: bool = False, start: int = 0, short: bool = False) -> str:
@ -1794,7 +1931,11 @@ def testing_graphs():
my_data = [x for x in range(0, 101)] my_data = [x for x in range(0, 101)]
my_data += [x for x in range(100, -1, -1)] my_data += [x for x in range(100, -1, -1)]
#my_data100 = [100 for _ in range(200)]
my_graph = Graph(100, 1, THEME.main_fg, my_data)
Draw.now(f'{Fx.ub}{Mv.to(0, 0)}{my_graph}')
return
my_data100 = [randint(0, 100) for _ in range(Term.width * 2)] my_data100 = [randint(0, 100) for _ in range(Term.width * 2)]
@ -1809,6 +1950,7 @@ def testing_graphs():
my_graph = Graph(Term.width, Term.height // 2, my_colors, my_data100, invert=True) my_graph = Graph(Term.width, Term.height // 2, my_colors, my_data100, invert=True)
my_graph2 = Graph(Term.width, Term.height // 2, my_colors, my_data100, invert=False) my_graph2 = Graph(Term.width, Term.height // 2, my_colors, my_data100, invert=False)
# my_graph3 = Graph(100 // 3 + 10, 1, THEME.proc_misc, my_data2) # my_graph3 = Graph(100 // 3 + 10, 1, THEME.proc_misc, my_data2)
# my_graph4 = Graph(100 // 3 + 10, 1, THEME.proc_misc, my_data3) # my_graph4 = Graph(100 // 3 + 10, 1, THEME.proc_misc, my_data3)
# my_graph5 = Graph(100, Term.height // 3, THEME.inactive_fg, my_data) # my_graph5 = Graph(100, Term.height // 3, THEME.inactive_fg, my_data)
@ -1993,7 +2135,7 @@ if __name__ == "__main__":
if not testing: Init.done() #! Remove if if not testing: Init.done() #! Remove if
if not testing: Draw.out(clear=True) #! Remove if if not testing: Draw.out(clear=True) #! Remove if
else: Draw.clear(); Draw.now(Term.clear) #! Remove else: Draw.clear(); Draw.now(Term.clear); Init.running = False #! Remove
Timer.stop("Init") Timer.stop("Init")