Add a new DeviceState class to capture metadata from a reading

This commit is contained in:
2026-05-02 18:13:33 +02:00
parent d08e26b0db
commit deb6b2bdcc
+19 -14
View File
@@ -11,12 +11,21 @@ import atexit
import signal import signal
import json import json
import struct import struct
from dataclasses import asdict as dataclass_asdict import serial
from dataclasses import asdict as dataclass_asdict, dataclass
from typing import Any, NoReturn from typing import Any, NoReturn
from bmspy.utilities import debugger from bmspy.utilities import debugger
from bmspy.classes import UPS
from bmspy.jbd_bms import collect_data, initialise_serial from bmspy.jbd_bms import collect_data, initialise_serial
@dataclass
class DeviceState:
ser: serial.Serial
data: UPS | None = None
timestamp: float = 0.0
# Expected kernel log output when the USB-serial adapter is plugged in: # Expected kernel log output when the USB-serial adapter is plugged in:
# usb 1-1.4: new full-speed USB device number 4 using xhci_hcd # usb 1-1.4: new full-speed USB device number 4 using xhci_hcd
# usb 1-1.4: New USB device found, idVendor=0403, idProduct=6001, bcdDevice= 6.00 # usb 1-1.4: New USB device found, idVendor=0403, idProduct=6001, bcdDevice= 6.00
@@ -161,17 +170,13 @@ def main():
debug = args.verbose debug = args.verbose
device_list = args.devices or ["/dev/ttyUSB0"] device_list = args.devices or ["/dev/ttyUSB0"]
ups_devices = {} ups_devices: dict[str, DeviceState] = {}
for device_str in device_list: for device_str in device_list:
name, path = parse_device(device_str) name, path = parse_device(device_str)
if name in ups_devices: if name in ups_devices:
debugger("server: duplicate UPS name '{}', skipping {}".format(name, path)) debugger("server: duplicate UPS name '{}', skipping {}".format(name, path))
continue continue
ups_devices[name] = { ups_devices[name] = DeviceState(ser=initialise_serial(path, debug))
"ser": initialise_serial(path, debug),
"data": None,
"timestamp": 0,
}
if debug > 0: if debug > 0:
print("server: registered UPS '{}' on {}".format(name, path)) print("server: registered UPS '{}' on {}".format(name, path))
@@ -297,17 +302,17 @@ def main():
if debug > 0: if debug > 0:
debugger( debugger(
"reading data for '{}', timestamp={}, time={}".format( "reading data for '{}', timestamp={}, time={}".format(
name, device["timestamp"], time.time() name, device.timestamp, time.time()
) )
) )
# only get new data five seconds after the last read # only get new data five seconds after the last read
if device["timestamp"] <= time.time() - 5: if device.timestamp <= time.time() - 5:
device["data"] = None device.data = None
while not device["data"]: while not device.data:
device["data"] = collect_data(device["ser"], debug) device.data = collect_data(device.ser, debug)
time.sleep(1) time.sleep(1)
device["timestamp"] = time.time() device.timestamp = time.time()
result[name] = device["data"] result[name] = device.data
send_response(connection, result, client, debug) send_response(connection, result, client, debug)