"""Post-process HIOKI 3193 CSV data: remove rows affected by auto-range transitions.""" from __future__ import annotations import argparse import csv import sys # Values above this threshold are instrument error codes (blank, over-range, etc.) ERROR_THRESHOLD = 1e90 # Number of rows to discard AFTER the last error/range-change row SETTLE_ROWS = 3 def postprocess(input_path: str, output_path: str, settle: int) -> None: with open(input_path, newline="") as f: reader = csv.reader(f) header = next(reader) rows = list(reader) # Find range columns range_cols = [i for i, h in enumerate(header) if h.endswith("_range")] # Find measurement columns (everything between timestamp and first range col) first_range = range_cols[0] if range_cols else len(header) meas_cols = list(range(1, first_range)) total = len(rows) kept = [] discarded = 0 discard_remaining = 0 prev_ranges = None for row in rows: # Current ranges ranges = tuple(row[i] for i in range_cols) if range_cols else () # Check for error values in measurement columns has_error = False for ci in meas_cols: try: if abs(float(row[ci])) > ERROR_THRESHOLD: has_error = True break except (ValueError, IndexError): pass # Detect range change range_changed = prev_ranges is not None and ranges != prev_ranges if has_error or range_changed: # Reset settle counter on every bad row or range change discard_remaining = settle discarded += 1 prev_ranges = ranges continue if discard_remaining > 0: discard_remaining -= 1 discarded += 1 prev_ranges = ranges continue kept.append(row) prev_ranges = ranges # Write cleaned CSV with open(output_path, "w", newline="") as f: writer = csv.writer(f) writer.writerow(header) writer.writerows(kept) print(f"Input: {total} rows") print(f"Discarded: {discarded} rows (errors + {settle} settle rows after each transition)") print(f"Output: {len(kept)} rows -> {output_path}") def visualize(input_path: str) -> None: """Plot cleaned CSV data with subplots grouped by measurement type.""" import matplotlib.pyplot as plt from datetime import datetime with open(input_path, newline="") as f: reader = csv.DictReader(f) rows = list(reader) if not rows: print("No data to plot.") return # Parse timestamps as elapsed seconds from first reading times = [] t0 = None for row in rows: t = datetime.strptime(row["timestamp"], "%Y-%m-%d %H:%M:%S") if t0 is None: t0 = t times.append((t - t0).total_seconds()) # Find measurement columns (exclude timestamp and range columns) all_cols = list(rows[0].keys()) meas_cols = [c for c in all_cols if c != "timestamp" and not c.endswith("_range")] # Group by type voltage = [c for c in meas_cols if c.startswith("U")] current = [c for c in meas_cols if c.startswith("I")] power = [c for c in meas_cols if c.startswith("P")] eff = [c for c in meas_cols if c.startswith("EFF")] other = [c for c in meas_cols if c not in voltage + current + power + eff] groups: list[tuple[str, list[str], str]] = [] if voltage: groups.append(("Voltage", voltage, "V")) if current: groups.append(("Current", current, "A")) if power: groups.append(("Power", power, "W")) if eff: groups.append(("Efficiency", eff, "%")) if other: groups.append(("Other", other, "")) n_plots = len(groups) fig, axes = plt.subplots(n_plots, 1, figsize=(14, 3.5 * n_plots), squeeze=False) fig.suptitle(f"HIOKI 3193-10 — {input_path}", fontsize=14, fontweight="bold") axes = axes.flatten() for ax, (title, cols, unit) in zip(axes, groups): for col in cols: vals = [float(row[col]) for row in rows] ax.plot(times, vals, label=col, linewidth=1.2) ax.set_ylabel(f"{title} ({unit})" if unit else title) ax.set_xlabel("Time (s)") ax.set_title(title, fontsize=11) ax.grid(True, alpha=0.3) ax.legend(loc="upper left", fontsize=9) fig.tight_layout() plt.show() def main() -> None: parser = argparse.ArgumentParser( description="Clean and visualize HIOKI 3193 CSV data.", ) sub = parser.add_subparsers(dest="command") # clean p_clean = sub.add_parser("clean", help="Remove auto-range transition artifacts") p_clean.add_argument("input", help="Input CSV file from hioki monitor/live") p_clean.add_argument( "-o", "--output", help="Output CSV file (default: _clean.csv)", ) p_clean.add_argument( "-s", "--settle", type=int, default=SETTLE_ROWS, help=f"Number of extra rows to discard after last error/range-change (default: {SETTLE_ROWS})", ) # plot p_plot = sub.add_parser("plot", help="Visualize CSV data") p_plot.add_argument("input", help="CSV file to plot (use cleaned data for best results)") args = parser.parse_args() if args.command == "clean": output = args.output if not output: base = args.input.rsplit(".", 1) output = f"{base[0]}_clean.{base[1]}" if len(base) == 2 else f"{args.input}_clean" postprocess(args.input, output, args.settle) elif args.command == "plot": visualize(args.input) else: parser.print_help() if __name__ == "__main__": main()