Added meter class to create percentage meters and fixed input io blocking errors when printing

pull/27/head
aristocratos 2020-07-03 04:01:31 +02:00
parent bea8bf084a
commit bd6c2ffed3
1 changed files with 321 additions and 188 deletions

509
bpytop
View File

@ -289,15 +289,20 @@ class Key:
""".list = Input queue | .new = Input threading event | .reader = Input reader thread | .start() = Start input reader | .stop() = Stop input reader"""
list: List[str] = []
new = threading.Event()
idle = threading.Event()
idle.set()
stopping: bool = False
started: bool = False
reader: threading.Thread
@classmethod
def start(cls):
cls.stopping = False
cls.reader = threading.Thread(target=get_key)
cls.reader.start()
cls.started = True
@classmethod
def stop(cls):
if cls.reader.is_alive():
if cls.started and cls.reader.is_alive():
cls.stopping = True
try:
cls.reader.join()
@ -305,11 +310,14 @@ class Key:
pass
class Color:
'''self.__init__ accepts 6 digit hexadecimal: string "#RRGGBB", 2 digit hexadecimal: string "#FF" and decimal RGB "0-255 0-255 0-255" as a string.\n
self.__init__ also accepts depth="fg" or "bg" | default=bool\n
self.__call__(*args) converts arguments to a string and apply color\n
self.__str__ returns escape sequence to set color. __iter__ returns iteration over red, green and blue in integer values of 0-255.\n
Values: .hex: str | .dec: Tuple[int] | .red: int | .green: int | .blue: int | .depth: str | .escape: str\n
'''Holds representations for a 24-bit color
__init__(color, depth="fg", default=False)
-- color accepts 6 digit hexadecimal: string "#RRGGBB", 2 digit hexadecimal: string "#FF" or decimal RGB "255 255 255" as a string.
-- depth accepts "fg" or "bg"
__call__(*args) converts arguments to a string and apply color
__str__ returns escape sequence to set color
__iter__ returns iteration over red, green and blue in integer values of 0-255.
* Values: .hex: str | .dec: Tuple[int, int, int] | .red: int | .green: int | .blue: int | .depth: str | .escape: str
'''
hex: str; dec: Tuple[int, int, int]; red: int; green: int; blue: int; depth: str; escape: str; default: bool
@ -318,11 +326,10 @@ class Color:
self.default = default
try:
if not color:
if depth != "bg" and not default: raise ValueError("No RGB values given")
self.dec = (0, 0, 0)
self.dec = (-1, -1, -1)
self.hex = ""
self.red = self.green = self.blue = 0
self.escape = "\033[49m"
self.red = self.green = self.blue = -1
self.escape = "\033[49m" if depth == "bg" and default else ""
return
elif color.startswith("#"):
@ -370,44 +377,184 @@ class Color:
if len(args) == 0: return ""
return f'{self.escape}{"".join(map(str, args))}{getattr(Term, self.depth)}'
@staticmethod
def fg(h_r: Union[str, int], g: int = 0, b: int = 0) -> str:
"""Returns escape sequence to set foreground color, accepts either 6 digit hexadecimal: "#RRGGBB", 2 digit hexadecimal: "#FF" or decimal RGB: 0-255, 0-255, 0-255"""
color: str = ""
if isinstance(h_r, int): #* Check for decimal RGB
color = f'\033[38;2;{h_r};{g};{b}m'
else: #* Check for 2 or 6 digit hexadecimal RGB
if h_r[0] == "#": h_r = h_r[1:]
try:
if len(h_r) == 2:
c = int(h_r, base=16)
color = f'\033[38;2;{c};{c};{c}m'
elif len(h_r) == 6:
color = f'\033[38;2;{int(h_r[0:2], base=16)};{int(h_r[2:4], base=16)};{int(h_r[4:6], base=16)}m'
except ValueError:
pass
return color
@staticmethod
def bg(h_r: Union[str, int], g: int = 0, b: int = 0) -> str:
"""Returns escape sequence to set background color, accepts either 6 digit hexadecimal: "#RRGGBB", 2 digit hexadecimal: "#FF" or decimal RGB: 0-255, 0-255, 0-255"""
color: str = ""
if isinstance(h_r, int): #* Check for decimal RGB
color = f'\033[48;2;{h_r};{g};{b}m'
else: #* Check for 2 or 6 digit hexadecimal RGB
if h_r[0] == "#": h_r = h_r[1:]
try:
if len(h_r) == 2:
c = int(h_r, base=16)
color = f'\033[48;2;{c};{c};{c}m'
elif len(h_r) == 6:
color = f'\033[48;2;{int(h_r[0:2], base=16)};{int(h_r[2:4], base=16)};{int(h_r[4:6], base=16)}m'
except ValueError:
pass
return color
class Theme:
'''__init__ accepts a dict containing { "color_element" : "color" } , errors defaults to default theme color'''
'''__init__ accepts a dict containing { "color_element" : "color" }'''
main_bg = main_fg = title = hi_fg = selected_bg = selected_fg = inactive_fg = proc_misc = cpu_box = mem_box = net_box = proc_box = div_line = temp_start = temp_mid = temp_end = cpu_start = cpu_mid = cpu_end = free_start = free_mid = free_end = cached_start = cached_mid = cached_end = available_start = available_mid = available_end = used_start = used_mid = used_end = download_start = download_mid = download_end = upload_start = upload_mid = upload_end = NotImplemented
gradient: Dict[str, List[str]] = {
"temp" : [],
"cpu" : [],
"free" : [],
"cached" : [],
"available" : [],
"used" : [],
"download" : [],
"upload" : []
}
def __init__(self, tdict: Dict[str, str]):
for item, value in tdict.items():
if hasattr(self, item):
default = False if item not in ["main_fg", "main_bg"] else True
depth = "fg" if item not in ["main_bg", "selected_bg"] else "bg"
#* Get key names from DEFAULT_THEME dict to not leave any color unset if missing from theme dict
for item, value in DEFAULT_THEME.items():
default = False if item not in ["main_fg", "main_bg"] else True
depth = "fg" if item not in ["main_bg", "selected_bg"] else "bg"
if item in tdict.keys():
setattr(self, item, Color(tdict[item], depth=depth, default=default))
else:
setattr(self, item, Color(value, depth=depth, default=default))
if getattr(self, item).escape == "":
setattr(self, item, Color(DEFAULT_THEME[item], depth=depth, default=default))
#* Create color gradients from one, two or three colors, 101 values
rgb: Dict[str, Tuple[int, int, int]]
colors: List[Tuple[int, ...]]
rgb_check: List[int]
for name in self.gradient.keys():
rgb = { "start" : getattr(self, f'{name}_start').dec, "mid" : getattr(self, f'{name}_mid').dec, "end" : getattr(self, f'{name}_end').dec }
colors = [ getattr(self, f'{name}_start') ]
if rgb["end"][0] >= 0:
r = 50 if rgb["mid"][0] >= 0 else 100
for first, second in ["start", "mid" if r == 50 else "end"], ["mid", "end"]:
for i in range(r):
rgb_check = []
for n in range(3):
rgb_check += [rgb[first][n] + i * (rgb[second][n] - rgb[first][n]) // r]
for n, ch in enumerate(rgb_check):
if ch > 255: rgb_check[n] = 255
elif ch < 0: rgb_check[n] = 0
colors += [tuple(rgb_check)]
if r == 100:
break
for color in colors:
self.gradient[name] += [Color.fg(*color)] # pylint: disable=no-value-for-parameter
else:
c = Color.fg(*rgb["start"])
for _ in range(100):
self.gradient[name] += [c]
#* Set terminal colors
Term.fg, Term.bg = self.main_fg, self.main_bg
print(self.main_fg, self.main_bg) #* Set terminal colors
print(self.main_fg, self.main_bg)
class Banner:
'''Holds the bpytop banner, .draw(line=, [col=0], [center=False], [now=False])'''
out: List[str] = []
c_color: str = ""
length: int = 0
if not out:
for num, (color, line) in enumerate(BANNER_SRC.items()):
if len(line) > length: length = len(line)
out += [""]
line_color = Color.fg(color)
line_dark = Color.fg(f'#{80 - num * 6}')
for letter in line:
if letter == "█" and c_color != line_color:
c_color = line_color
out[num] += line_color
elif letter == " ":
letter = f'{Mv.r(1)}'
elif letter != "█" and c_color != line_dark:
c_color = line_dark
out[num] += line_dark
out[num] += letter
@classmethod
def draw(cls, line: int, col: int = 0, center: bool = False, now: bool = False):
out: str = ""
if center: col = Term.width // 2 - cls.length // 2
for n, o in enumerate(cls.out):
out += f'{Mv.to(line + n, col)}{o}'
out += f'{Term.fg}'
if now: print(out)
else: return out
class Draw:
'''Holds the draw buffer\n
Add to buffer: .buffer(name, *args, append=False, now=False)\n
Print buffer: .out(clear=False)\n
'''Holds the draw buffer and manages IO blocking queue
* .buffer([+]name[!], *args, append=False, now=False) : Add *args to buffer
* - Adding "+" prefix to name sets append to True and appends to name's current string
* - Adding "!" suffix to name sets now to True and print name's current string
* .out(clear=False) : Print all strings in buffer, clear=True clear all buffers after
* .now(*args) : Prints all arguments as a string
'''
strings: Dict[str, str] = {}
last_screen: str = ""
idle = threading.Event()
idle.set()
@classmethod
def now(cls, *args):
'''Print to screen'''
Key.idle.wait()
try:
print(*args)
except BlockingIOError:
#pass
cls.idle.clear()
Key.idle.wait()
print(*args)
cls.idle.set()
print("Error!")
@classmethod
def buffer(cls, name: str, *args, append: bool = False, now: bool = False):
string: str = ""
if name.startswith("+"):
name = name.lstrip("+")
append = True
if name.endswith("!"):
name = name.rstrip("!")
now = True
if name == "": name = "_null"
if args: string = "".join(map(str, args))
if name not in cls.strings or not append: cls.strings[name] = ""
cls.strings[name] += string
if now: print(string)
if now: cls.now(string)
@classmethod
def out(cls, clear = False):
cls.last_screen = "".join(cls.strings.values())
if clear: cls.strings = {}
print(cls.last_screen)
cls.now(cls.last_screen)
class Symbol:
h_line: str = "─"
@ -434,6 +581,7 @@ class Symbol:
3.0 : "⠇", 3.1 : "⠏", 3.2 : "⠟", 3.3 : "⠿", 3.4 : "⢿",
4.0 : "⡇", 4.1 : "⡏", 4.2 : "⡟", 4.3 : "⡿", 4.4 : "⣿"
}
meter: str = "■"
class Graphs:
'''Holds all graph objects and dicts for dynamically created graphs'''
@ -445,17 +593,7 @@ class Graphs:
detailed_mem: object = None
pid_cpu: Dict[int, object] = {}
class Meters:
'''Holds created meters to reuse instead of recreating meters of past values'''
cpu: Dict[int, str] = {}
mem_used: Dict[int, str] = {}
mem_available: Dict[int, str] = {}
mem_cached: Dict[int, str] = {}
mem_free: Dict[int, str] = {}
swap_used: Dict[int, str] = {}
swap_free: Dict[int, str] = {}
disks_used: Dict[int, str] = {}
disks_free: Dict[int, str] = {}
class Box:
'''Box object with all needed attributes for create_box() function'''
@ -641,69 +779,45 @@ def save_config(path: str, conf: Config):
except Exception as e:
errlog.exception(str(e))
def fg(h_r: Union[str, int], g: int = 0, b: int = 0) -> str:
"""Returns escape sequence to set foreground color, accepts either 6 digit hexadecimal: "#RRGGBB", 2 digit hexadecimal: "#FF" or decimal RGB: 0-255, 0-255, 0-255"""
color: str = ""
if isinstance(h_r, int): #* Check for decimal RGB
color = f'\033[38;2;{h_r};{g};{b}m'
else: #* Check for 2 or 6 digit hexadecimal RGB
if h_r[0] == "#": h_r = h_r[1:]
try:
if len(h_r) == 2:
c = int(h_r, base=16)
color = f'\033[38;2;{c};{c};{c}m'
elif len(h_r) == 6:
color = f'\033[38;2;{int(h_r[0:2], base=16)};{int(h_r[2:4], base=16)};{int(h_r[4:6], base=16)}m'
except ValueError:
pass
return color
def bg(h_r: Union[str, int], g: int = 0, b: int = 0) -> str:
"""Returns escape sequence to set background color, accepts either 6 digit hexadecimal: "#RRGGBB", 2 digit hexadecimal: "#FF" or decimal RGB: 0-255, 0-255, 0-255"""
color: str = ""
if isinstance(h_r, int): #* Check for decimal RGB
color = f'\033[48;2;{h_r};{g};{b}m'
else: #* Check for 2 or 6 digit hexadecimal RGB
if h_r[0] == "#": h_r = h_r[1:]
try:
if len(h_r) == 2:
c = int(h_r, base=16)
color = f'\033[48;2;{c};{c};{c}m'
elif len(h_r) == 6:
color = f'\033[48;2;{int(h_r[0:2], base=16)};{int(h_r[2:4], base=16)};{int(h_r[4:6], base=16)}m'
except ValueError:
pass
return color
def get_key():
"""Get a single key from stdin, convert to readable format and save to keys list"""
input_key: str = ""
clean_key: str = ""
with Raw(sys.stdin): #* Set raw mode
with Nonblocking(sys.stdin): #* Set nonblocking mode
while not Key.stopping:
if not select([sys.stdin], [], [], 0.1)[0]: #* Wait 100ms for input then restart loop to check for stop signal
try:
while not Key.stopping:
#Draw.idle.wait()
with Raw(sys.stdin): #* Set raw mode
#with Nonblocking(sys.stdin): #* Set nonblocking mode
if not select([sys.stdin], [], [], 0.1)[0]:
continue
Key.idle.clear()
try:
input_key = sys.stdin.read(1) #* Read 1 character from stdin
if input_key == "\033": #* Read 3 additional characters if first is escape character
input_key += sys.stdin.read(3)
except:
pass
if input_key == "\033": clean_key = "escape"
elif input_key == "\n": clean_key = "enter"
elif input_key == "\x7f" or input_key == "\x08": clean_key = "backspace"
input_key += sys.stdin.read(1)
if input_key == "\033":
with Nonblocking(sys.stdin): #* Set nonblocking mode
input_key += sys.stdin.read(3)
except Exception as e:
errlog.exception(f'{e}')
elif input_key.isprintable(): clean_key = input_key #* Return character if input key is printable
if input_key == "\033": clean_key = "escape"
elif input_key.startswith("\n"): clean_key = "enter"
elif input_key.startswith("\x7f") or input_key.startswith("\x08"): clean_key = "backspace"
elif input_key.isalnum(): clean_key = input_key
else: errlog.info(f'Pressed key: {repr(input_key)}')
if clean_key:
Key.list.append(clean_key) #* Store keys in input queue for later processing
clean_key = ""
Key.new.set() #* Set threading event to interrupt main thread sleep
input_key = ""
sys.stdin.read(100) #* Clear stdin
with Nonblocking(sys.stdin):
sys.stdin.read(10) #* Clear stdin
Key.idle.set()
except Exception as e:
errlog.exception(f'{e}')
clean_quit(1)
def now_sleeping(signum, frame):
"""Reset terminal settings and stop background input read before putting to sleep"""
@ -724,7 +838,8 @@ def clean_quit(errcode: int = 0):
"""Reset terminal settings, save settings to config and stop background input read before quitting"""
Key.stop()
save_config(CONFIG_FILE, config)
print(Term.clear, Term.normal_screen, Term.show_cursor)
#print(Term.clear, Term.normal_screen, Term.show_cursor)
Term.echo(True)
raise SystemExit(errcode)
def calc_sizes():
@ -840,74 +955,85 @@ def draw_bg(now: bool = True):
#* Draw proc box
proc_box = create_box(box_object=proc, line_color=theme.proc_box, fill=True)
Draw.buffer("bg", cpu_box, mem_box, net_box, proc_box)
Draw.buffer("bg", cpu_box, mem_box, net_box, proc_box, Term.fg)
#? Function dependent classes -------------------------------------------------------------------->
class Banner:
out: List[str] = []
c_color: str = ""
length: int = 0
if not out:
for num, (color, line) in enumerate(BANNER_SRC.items()):
if len(line) > length: length = len(line)
out += [""]
line_color = fg(color)
line_dark = fg(f'#{80 - num * 6}')
for letter in line:
if letter == "█" and c_color != line_color:
c_color = line_color
out[num] += line_color
elif letter == " ":
letter = f'{Mv.r(1)}'
elif letter != "█" and c_color != line_dark:
c_color = line_dark
out[num] += line_dark
out[num] += letter
class Meter:
'''Creates a percentage meter
__init__(value, width, color_gradient) to create new meter
__call__(value) to set value and return meter as a string
__str__ returns last set meter as a string
'''
out: str = ""
color_gradient: List[str]
width: int
saved: Dict[int, str] = {}
@classmethod
def draw(cls, line: int, col: int = 0, center: bool = False, now: bool = False):
def __init__(self, value: int, width: int, color_gradient: List[str]):
self.color_gradient = color_gradient
self.width = width
self.out = self._create(value, width, color_gradient)
self.saved[value] = self.out
def __call__(self, value: int):
if value in self.saved.keys():
self.out = self.saved[value]
else:
self.out = self._create(value, self.width, self.color_gradient)
self.saved[value] = self.out
return self.out
def __str__(self):
return self.out
def __repr__(self):
return repr(self.out)
@staticmethod
def _create(value: int, width: int, color_gradient: List[str], add: bool = False):
if value > 100: value = 100
elif value < 0: value = 100
out: str = ""
if center: col = Term.width // 2 - cls.length // 2
for n, o in enumerate(cls.out):
out += f'{Mv.to(line + n, col)}{o}'
out += f'{Term.fg}'
if now: print(out)
else: return out
for i in range(1, width + 1):
if value >= round(i * 100 / width):
out += f'{color_gradient[round(i * 100 / width)]}{Symbol.meter}'
else:
out += theme.inactive_fg(Symbol.meter * (width + 1 - i))
break
else:
out += f'{Term.fg}'
return out
class Meters:
cpu: Meter
mem_used: Meter
mem_available: Meter
mem_cached: Meter
mem_free: Meter
swap_used: Meter
swap_free: Meter
disks_used: Meter
disks_free: Meter
#? Main function --------------------------------------------------------------------------------->
def main():
line: str = ""
this_key: str = ""
count: int = 0
while True:
count += 1
print(f'{Mv.to(1,1)}{Fx.b}{blue("Count:")} {count} {lime("Time:")} {time.strftime("%H:%M:%S", time.localtime())}')
print(f'{fg("#ff")} Width: {Term.width} Height: {Term.height} Resized: {Term.resized}')
while Key.list:
Key.new.clear()
this_key = Key.list.pop()
print(f'{Mv.to(2,1)}{fg("#ff9050")}{Fx.b}Last key= {Term.fg}{Fx.ub}{repr(this_key)}{" " * 40}')
if this_key == "backspace":
line = line[:-1]
elif this_key == "enter":
line += "\n"
else:
line += this_key
print(f'{Mv.to(3,1)}{fg("#90ff50")}{Fx.b}Full line= {Term.fg}{Fx.ub}{line}{Fx.bl}| {Fx.ubl}')
if this_key == "q":
clean_quit()
if this_key == "R":
raise Exception("Test ERROR")
if not Key.reader.is_alive():
clean_quit(1)
Key.new.wait(1.0)
pass
#? Init ------------------------------------------------------------------------------------------>
#Key.start()
print(Term.alt_screen, Term.clear, Term.hide_cursor)
Term.echo(False)
signal.signal(signal.SIGINT, quit_sigint) #* Ctrl-C
Key.start()
CPU_NAME: str = get_cpu_name()
@ -915,6 +1041,8 @@ config: Config = Config(load_config(CONFIG_FILE))
config.proc_per_core = True
#config.color_theme = "solarized_dark"
theme: Theme = Theme(load_theme(config.color_theme))
cpu = Box("cpu", height_p=32, width_p=100)
@ -930,77 +1058,82 @@ orange = theme.available_end
green = theme.cpu_start
dfg = theme.main_fg
def testing_colors():
for item, _ in DEFAULT_THEME.items():
Draw.buffer("testing", Fx.b, getattr(theme, item)(f'{item:<20}'), Fx.ub, f'{"hex=" + getattr(theme, item).hex:<20} dec={getattr(theme, item).dec}\n')
Draw.buffer("+testing", Fx.b, getattr(theme, item)(f'{item:<20}'), Fx.ub, f'{"hex=" + getattr(theme, item).hex:<20} dec={getattr(theme, item).dec}\n')
Draw.out()
print()
print(theme.temp_start, "Hej!\n")
print(Term.fg, "\nHEJ\n")
print(repr(Term.fg), repr(Term.bg))
quit()
def testing_boxes():
calc_sizes()
draw_bg()
Draw.out()
print(Mv.to(35, 1))
def testing_banner():
print(Term.normal_screen, Term.alt_screen)
#Key.start()
calc_sizes()
draw_bg()
Draw.buffer("banner", Banner.draw(18, 45))
Draw.buffer("banner", Banner.draw(18, center=True))
Draw.out()
print(Mv.to(35, 1))
def testing_meter():
Draw.buffer("meters")
for _ in range(10):
Draw.buffer("+meters", "1234567890")
Draw.buffer("+meters", "\n")
korv = Meter(0, Term.width, theme.gradient["cpu"])
quit()
for i in range(0,101, 2):
Draw.buffer("+meters", korv(i), "\n")
Draw.out()
# quit()
# global theme
# path = "/home/gnm/.config/bashtop/themes/"
# for file in os.listdir(path):
# if file.endswith(".theme"):
# theme = Theme(load_theme(path + file))
# draw_bg()
# Draw.out()
# time.sleep(1)
#draw_bg()
#Draw.buffer("banner", Banner.draw(5, center=True))
#Draw.out()
# print(f'\n{Fx.b}Terminal Height={Term.height} Width={Term.width}')
# total_h = total_w = 0
# for box in boxes:
# print(f'\n{getattr(box, "name")} Height={getattr(box, "height")} Width={getattr(box, "width")}')
# total_h += getattr(box, "height")
# total_w += getattr(box, "width")
# print(f'\nTotal Height={cpu.height + net.height + mem.height} Width={net.width + proc.width}')
#Key.stop()
quit()
#testing_colors()
#error_log("/home/gnm/bashtop/misc/error.log")
def testing_keyinput():
line: str = ""
this_key: str = ""
count: int = 0
while True:
count += 1
Draw.buffer("!", f'{Mv.to(1,1)}{Fx.b}{blue("Count:")} {count} {lime("Time:")} {time.strftime("%H:%M:%S", time.localtime())}',
f'{Color.fg("#ff")} Width: {Term.width} Height: {Term.height} Resized: {Term.resized}')
while Key.list:
Key.new.clear()
this_key = Key.list.pop()
Draw.buffer("!", f'{Mv.to(2,1)}{Color.fg("#ff9050")}{Fx.b}Last key= {Term.fg}{Fx.ub}{repr(this_key)}{" " * 40}')
if this_key == "backspace":
line = line[:-1]
elif this_key == "enter":
line += "\n"
else:
line += this_key
Draw.buffer("!", f'{Mv.to(3,1)}{Color.fg("#90ff50")}{Fx.b}Full line= {Term.fg}{Fx.ub}{line}{Fx.bl}| {Fx.ubl}')
if this_key == "q":
clean_quit()
if this_key == "R":
raise Exception("Test ERROR")
if not Key.reader.is_alive():
clean_quit(1)
Key.new.wait(1.0)
try:
testing_banner()
#testing_keyinput()
#testing_banner()
#testing_colors()
#testing_boxes()
testing_meter()
# Draw.idle.clear()
# Key.idle.wait()
# input(f'{Mv.to(Term.height - 5, 1)}Enter to exit')
# Draw.idle.set()
#time.sleep(2)
except Exception as e:
errlog.exception(f'{e}')
clean_quit(1)
quit()
clean_quit()
if __name__ == "__main__":