"""Read-only telemetry measurement display.""" from textual.widgets import Static from ..protocol import TelemetryData class TelemetryPanel(Static): """Displays all telemetry values.""" DEFAULT_CSS = """ TelemetryPanel { width: 100%; height: auto; min-height: 20; padding: 1; } """ ALPHA = 0.05 # EMA filter coefficient DT_BREAKPOINTS = [0, 3000, 5000, 10000, 20000, 30000, 45000] DT_DEFAULTS = [25, 20, 20, 20, 15, 15] DT_PARAM_IDS = [0x60, 0x61, 0x62, 0x63, 0x64, 0x65] def __init__(self): super().__init__("Waiting for telemetry...") self._data: TelemetryData | None = None self._vin_f: float = 0.0 self._vout_f: float = 0.0 self._iin_f: float = 0.0 self._iout_f: float = 0.0 self._vfly_f: float = 0.0 self._seeded: bool = False self.filter_enabled: bool = True self._dt_values: list[int] = list(self.DT_DEFAULTS) def update_telemetry(self, t: TelemetryData): self._data = t # EMA filter on V/I if not self._seeded: self._vin_f = t.vin self._vout_f = t.vout self._iin_f = t.iin self._iout_f = t.iout self._vfly_f = t.vfly self._seeded = True else: a = self.ALPHA self._vin_f += a * (t.vin - self._vin_f) self._vout_f += a * (t.vout - self._vout_f) self._iin_f += a * (t.iin - self._iin_f) self._iout_f += a * (t.iout - self._iout_f) self._vfly_f += a * (t.vfly - self._vfly_f) # Compute power and efficiency from filtered or raw values if self.filter_enabled: v_in, v_out, i_in, i_out = self._vin_f, self._vout_f, self._iin_f, self._iout_f else: v_in, v_out, i_in, i_out = t.vin, t.vout, t.iin, t.iout p_in = v_in * (-i_in) / 1e6 p_out = v_out * i_out / 1e6 if p_in > 0.1: eta = p_out / p_in * 100.0 else: eta = 0.0 # Compute active deadtime from iout using same lookup as ISR active_dt = self._dt_values[0] for i in range(len(self.DT_BREAKPOINTS) - 1, -1, -1): if i_out >= self.DT_BREAKPOINTS[i]: active_dt = self._dt_values[min(i, len(self._dt_values) - 1)] break lines = [ "[bold]MEASUREMENTS[/bold]", "", f" vin : {self._vin_f if self.filter_enabled else t.vin:8.0f} mV", f" vout : {self._vout_f if self.filter_enabled else t.vout:8.0f} mV", f" iin : {self._iin_f if self.filter_enabled else t.iin:8.0f} mA", f" iout : {self._iout_f if self.filter_enabled else t.iout:8.0f} mA", f" P_IN : {p_in:8.2f} W", f" P_OUT : {p_out:8.2f} W", f" EFF : {eta:8.1f} %" if p_in > 0.1 else " EFF : --- %", f" vfly : {self._vfly_f if self.filter_enabled else t.vfly:8.0f} mV", f" etemp : {t.etemp:8.1f} C", f" deadtime : {active_dt:8d} ticks", f" : {active_dt / 1.36:8.1f} ns", "", f" last_tmp : {t.last_tmp:8d}", f" VREF : {t.VREF:8d}", f" vfly_corr : {t.vfly_correction:8d}", "", f" vfly_int : {t.vfly_integral:10.3f}", f" vfly_avg : {t.vfly_avg_debug:10.1f}", f" cc.out_f : {t.cc_output_f:10.1f}", f" mppt.iref : {t.mppt_iref:8.0f} mA", f" mppt.vin : {t.mppt_last_vin:8.0f}", f" mppt.iin : {t.mppt_last_iin:8.0f}", ] self.update("\n".join(lines)) def update_dt_param(self, param_id: int, value: float): if param_id in self.DT_PARAM_IDS: idx = self.DT_PARAM_IDS.index(param_id) self._dt_values[idx] = int(value)