diff --git a/bpytop.py b/bpytop.py index 95774d8..e853a7a 100755 --- a/bpytop.py +++ b/bpytop.py @@ -25,6 +25,7 @@ from distutils.util import strtobool from string import Template from math import ceil from random import randint +from shutil import which from typing import List, Set, Dict, Tuple, Optional, Union, Any, Callable, ContextManager, Iterable errors: List[str] = [] @@ -630,10 +631,11 @@ class Draw: * .out(clear=False) : Print all strings in buffer, clear=True clear all buffers after * .now(*args) : Prints all arguments as a string * .clear(*names) : Clear named buffers, all if no argument + * .last_screen() : Prints all saved buffers ''' strings: Dict[str, str] = {} z_order: Dict[str, int] = {} - last_screen: str = "" + last: Dict[str, str] = {} idle = threading.Event() idle.set() @@ -667,37 +669,47 @@ class Draw: if now: cls.now(string) @classmethod - def out(cls, *names: str, clear = False, no_order: bool = False): + def out(cls, *names: str, clear = False): out: str = "" if not cls.strings: return - if no_order: - cls.last_screen = "".join(cls.strings.values()) - if clear: cls.strings = {} - cls.now(cls.last_screen) - elif names: + if names: for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True): if name in names: out += cls.strings[name] + cls.last[name] = out if clear: del cls.strings[name] del cls.z_order[name] cls.now(out) else: for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True): - if name in cls.strings: out += cls.strings[name] + if name in cls.strings: + out += cls.strings[name] + cls.last[name] = out if clear: cls.strings = {} - cls.last_screen = out - cls.now(cls.last_screen) + cls.now(out) @classmethod - def clear(cls, *names): + def last_screen(cls) -> str: + out: str = "" + for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True): + if name in cls.last: + out += cls.last[name] + return out + + + @classmethod + def clear(cls, *names, last: bool = False): if names: for name in names: if name in cls.strings: del cls.strings[name] - del cls.z_order[name] + if last: + del cls.z_order[name] + if name in cls.last: del cls.last[name] else: cls.strings = {} + if last: cls.last = {} class Color: '''Holds representations for a 24-bit color value @@ -1152,6 +1164,7 @@ class Box: def draw_bg(cls, now: bool = True): '''Draw all boxes outlines and titles''' Draw.buffer("bg!" if now else "bg", "".join(sub._draw_bg() for sub in cls.__subclasses__())) # type: ignore + if now: Draw.clear("bg") class SubBox: box_x: int = 0 @@ -1166,6 +1179,7 @@ class CpuBox(Box, SubBox): y = 1 height_p = 32 width_p = 100 + redraw: bool = True @classmethod def _calc_size(cls): @@ -1193,7 +1207,8 @@ class CpuBox(Box, SubBox): @classmethod def _draw_fg(cls, cpu): - Draw.buffer("cpu", f'Cpu usage: {cpu.cpu_usage}\nCpu freq: {cpu.cpu_freq}\nLoad avg: {cpu.load_avg}\n') + Draw.buffer("cpu", f'{Mv.to(1,1)}Cpu usage: {cpu.cpu_usage}\nCpu freq: {cpu.cpu_freq}\nLoad avg: {cpu.load_avg}\ + \nTemps: {CpuCollector.cpu_temp}\n') class MemBox(Box): name = "mem" @@ -1204,6 +1219,7 @@ class MemBox(Box): divider: int = 0 mem_width: int = 0 disks_width: int = 0 + redraw: bool = True @classmethod def _calc_size(cls): @@ -1230,6 +1246,7 @@ class NetBox(Box, SubBox): width_p = 45 x = 1 y = 1 + redraw: bool = True @classmethod def _calc_size(cls): @@ -1258,6 +1275,7 @@ class ProcBox(Box): detailed_y: int = 0 detailed_width: int = 0 detailed_height: int = 8 + redraw: bool = True @classmethod def _calc_size(cls): @@ -1290,6 +1308,7 @@ class Collector: collect_done = threading.Event() collect_queue: List = [] collect_interrupt: bool = False + draw_block: bool = False @classmethod def start(cls): @@ -1320,8 +1339,8 @@ class Collector: collector = cls.collect_queue.pop() collector._collect() collector._draw() - if cls.draw_now: - Draw.out() + if cls.draw_now and not cls.draw_block: + Draw.out("cpu", "mem", "net", "proc") cls.collect_idle.set() cls.collect_done.set() @@ -1353,29 +1372,121 @@ class CpuCollector(Collector): '''Collects cpu usage for cpu and cores, cpu frequency, load_avg _collect(): Collects data _draw(): calls CpuBox._draw_fg()''' - cpu_usage: List[List[float]] = [[]] - for i in range(THREADS): + cpu_usage: List[List[int]] = [[]] + cpu_temp: List[List[int]] = [[]] + cpu_temp_high: int = 0 + cpu_temp_crit: int = 0 + for _ in range(THREADS): cpu_usage.append([]) + cpu_temp.append([]) cpu_freq: int = 0 load_avg: List[float] = [] + @staticmethod + def _get_sensors() -> str: + '''Check if we can get cpu temps and return method of getting temps''' + if SYSTEM == "MacOS": + try: + if which("osx-cpu-temp") and subprocess.check_output("osx-cpu-temp", text=True).rstrip().endswith("°C"): + return "osx-cpu-temp" + except: pass + elif hasattr(psutil, "sensors_temperatures"): + try: + temps = psutil.sensors_temperatures() + if temps: + for _, entries in temps.items(): + for entry in entries: + if entry.label.startswith(("Package", "Core 0", "Tdie")): + return "psutil" + except: pass + try: + if SYSTEM == "Linux" and which("vcgencmd") and subprocess.check_output("vcgencmd measure_temp", text=True).rstrip().endswith("'C"): + return "vcgencmd" + except: pass + return "" + + sensor_method: str = _get_sensors.__func__() # type: ignore + got_sensors: bool = True if sensor_method else False + @classmethod def _collect(cls): cls.cpu_usage[0].append(round(psutil.cpu_percent(percpu=False))) for n, thread in enumerate(psutil.cpu_percent(percpu=True), start=1): cls.cpu_usage[n].append(round(thread)) - - if len(cls.cpu_usage[0]) > Term.width: - try: - for n in range(THREADS + 1): - del cls.cpu_usage[n][0] - except IndexError: - pass + if len(cls.cpu_usage[n]) > Term.width * 2: + del cls.cpu_usage[n][0] cls.cpu_freq = round(psutil.cpu_freq().current) cls.load_avg = [round(lavg, 2) for lavg in os.getloadavg()] + if CONFIG.check_temp and cls.got_sensors: + cls._collect_temps() + + @classmethod + def _collect_temps(cls): + temp: int + cores: List[int] = [] + cpu_type: str = "" + if cls.sensor_method == "psutil": + for _, entries in psutil.sensors_temperatures().items(): + for entry in entries: + if entry.label.startswith(("Package", "Tdie")): + cpu_type = "ryzen" if entry.label.startswith("Package") else "ryzen" + if not cls.cpu_temp_high: + cls.cpu_temp_high, cls.cpu_temp_crit = round(entry.high), round(entry.critical) + temp = round(entry.current) + elif entry.label.startswith(("Core", "Tccd")): + if not cpu_type: + cpu_type = "other" + if not cls.cpu_temp_high: + cls.cpu_temp_high, cls.cpu_temp_crit = round(entry.high), round(entry.critical) + temp = round(entry.current) + cores.append(round(entry.current)) + if len(cores) < THREADS: + if cpu_type == "intel" or (cpu_type == "other" and len(cores) == THREADS // 2): + cls.cpu_temp[0].append(temp) + for n, t in enumerate(cores, start=1): + cls.cpu_temp[n].append(t) + cls.cpu_temp[THREADS // 2 + n].append(t) + elif cpu_type == "ryzen" or cpu_type == "other": + cls.cpu_temp[0].append(temp) + if len(cores) < 1: cores.append(temp) + z = 1 + for t in cores: + for i in range(THREADS // len(cores)): + cls.cpu_temp[z + i].append(t) + z += i + else: + cores.insert(0, temp) + for n, t in enumerate(cores): + cls.cpu_temp[n].append(t) + + else: + try: + if cls.sensor_method == "osx-cpu-temp": + temp = round(float(subprocess.check_output("osx-cpu-temp", text=True).rstrip().rstrip("°C"))) + elif cls.sensor_method == "vcgencmd": + temp = round(float(subprocess.check_output("vcgencmd measure_temp", text=True).rstrip().rstrip("'C"))) + except Exception as e: + errlog.exception(f'{e}') + cls.got_sensors = False + CONFIG.check_temp = False + CpuBox._calc_size() + else: + for n in range(THREADS + 1): + cls.cpu_temp[n].append(temp) + + if len(cls.cpu_temp[0]) > 5: + for n in range(len(cls.cpu_temp)): + del cls.cpu_temp[n][0] + + + + + + + @classmethod def _draw(cls): CpuBox._draw_fg(cls) @@ -1384,11 +1495,15 @@ class CpuCollector(Collector): @timerd def testing_collectors(): - Collector.collect() - Collector.collect_done.wait() - #sleep(2) - #Collector.collect(CpuCollector) - #Draw.now(f'Cpu usage: {CpuCollector.cpu_usage}\nCpu freq: {CpuCollector.cpu_freq}\nLoad avg: {CpuCollector.load_avg}\n') + + Box.draw_bg() + + for _ in range(1): + Collector.collect(CpuCollector) + Collector.collect_done.wait() + sleep(1) + #Draw.now(f'Cpu usage: {CpuCollector.cpu_usage}\nCpu freq: {CpuCollector.cpu_freq}\nLoad avg: {CpuCollector.load_avg}\n\ + # Temps: {CpuCollector.cpu_temp}\n') class Menu: @@ -1421,7 +1536,10 @@ def get_cpu_name() -> str: command ="sysctl hw.model" rem_line = "hw.model" - cmd_out = subprocess.check_output("LANG=C " + command, shell=True, universal_newlines=True) + try: + cmd_out = subprocess.check_output("LANG=C " + command, shell=True, universal_newlines=True) + except: + pass if rem_line: for line in cmd_out.split("\n"): if rem_line in line: @@ -1564,7 +1682,7 @@ def main(): CPU_NAME: str = get_cpu_name() -testing = False #! Remove +testing = True #! Remove #! For testing -------------------------------------------------------------------------------> @@ -1883,11 +2001,11 @@ if __name__ == "__main__": if testing: try: #testing_graphs() - #testing_collectors() + testing_collectors() #testing_humanizer() # waitone(1) #testing_keyinput() - testing_banner() + #testing_banner() # waitone(1) #testing_colors() # waitone(1)