From f2ffc4568aa7eb38dd92b485a72a99d13c968e71 Mon Sep 17 00:00:00 2001 From: Timothy Allen Date: Sat, 2 May 2026 09:41:07 +0200 Subject: [PATCH] Add multi-device support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit server.py - --device is now repeatable (-d ups1:/dev/ttyUSB0 -d ups2:/dev/ttyUSB1). Bare paths (/dev/ttyUSB0) auto-name from the last path component (ttyUSB0). - Maintains {name: {ser, data, timestamp}} per UPS — each device has independent data freshness. - GET response is now {ups_name: JBDUPS}. Accepts optional ups key in the request to return only one. client.py - read_data() gains ups=None parameter — pass a name to filter server-side, or omit for all. - Always returns {ups_name: JBDUPS}. influxdb.py - influxdb_create_snapshot() iterates {name: JBDUPS} and tags every InfluxDB point with ups=name. - influxdb_export() / bmspy-influxdb gain --ups to export only a specific UPS. __init__.py - bmspy CLI gains --ups to display only a named UPS. - Displays each UPS under a === name === header. --- README.md | 41 ++++++++++++++++---- bmspy/__init__.py | 39 +++++++++++-------- bmspy/client.py | 64 ++++++++++++++++++------------ bmspy/influxdb.py | 84 ++++++++++++++++++++++------------------ bmspy/server.py | 99 +++++++++++++++++++++++++++++------------------ 5 files changed, 203 insertions(+), 124 deletions(-) diff --git a/README.md b/README.md index a2baadd..d7ebdf0 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,9 @@ bmspy is a tool to get information from a [xiaoxiang-type](https://www.lithiumbatterypcb.com/product/4s-or-3s-12v-li-ion-or-lifepo4-battery-smart-bms-with-bluetooth-function-uart-and-rs485-communication-with-60a-to-120a-constant-current/?attribute_specification-selection=4S+Lifepo4+120A+with+UART+and+RS485) BMS system, using some sort of serial connection. -It can display the information as text, in JSON, or export the data continuously to a Prometheus exporter. +It can display the information as text, in JSON, or export the data continuously to InfluxDB or a Prometheus exporter. + +Multiple BMS/UPS devices can be connected at once. Each is identified by a name, and data from all of them (or just one) can be pushed to InfluxDB or Prometheus in the same connection, with each measurement tagged with the UPS name. To install: git clone https://git.treehouse.org.za/tim/bmspy @@ -12,17 +14,42 @@ To install: Or, to install with influxdb and/or prometheus support: poetry install -E influxdb -E prometheus -To run: - poetry run bmspyd & +## Running the server + +The server daemon reads from one or more serial devices and makes the data available over a Unix socket. + +Single device (defaults to `/dev/ttyUSB0`): + + poetry run bmspy-server + +Multiple devices, with optional names (default name is derived from the device path, e.g. `ttyUSB0`): + + poetry run bmspy-server -d network:/dev/ttyUSB0 -d nas:/dev/ttyUSB1 + +To run via systemd, copy `bmspy-server.service` to `/etc/systemd/system`, adjust `WorkingDirectory` and the `ExecStart` line as needed, then enable and start it: -Or to run via systemd, copy bmspyd.service to /etc/systemd/system, adjust WorkingDirectory to point to the installation location, and enable and start the service: cp bmspy-server.service /etc/systemd/system $EDITOR /etc/systemd/system/bmspy-server.service systemctl daemon-reload systemctl enable bmspy-server systemctl start bmspy-server -To run a client to get the data, choose one of the following options: - poetry run bmspy - poetry run bmspy-influxdb --url ... +## Running a client +To print a summary of all connected UPSes: + + poetry run bmspy + +To show only a specific UPS: + + poetry run bmspy --ups network + +To push data for all UPSes to InfluxDB (each measurement is tagged `ups=`): + + poetry run bmspy-influxdb --url https://influx.example.com --org myorg --token mytoken + +To push data for a single UPS only: + + poetry run bmspy-influxdb --ups network --url ... + +InfluxDB connection details can also be supplied via environment variables (`INFLUXDB_V2_URL`, `INFLUXDB_V2_ORG`, `INFLUXDB_V2_TOKEN`) instead of command-line flags. diff --git a/bmspy/__init__.py b/bmspy/__init__.py index 3916392..8f1154d 100644 --- a/bmspy/__init__.py +++ b/bmspy/__init__.py @@ -12,10 +12,10 @@ def parse_args(): description='Query JBD BMS and report status', add_help=True, ) - parser.add_argument('--device', '-d', dest='device', action='store', - default='/dev/ttyUSB0', help='USB device to read') parser.add_argument('--socket', '-s', dest='socket', action='store', default='/run/bmspy/bms', help='Socket to communicate with daemon') + parser.add_argument('--ups', dest='ups', action='store', default=None, + help='Only show data for this UPS name (default: all)') parser.add_argument('--json', '-j', dest='report_json', action='store_true', default=False, help='Report data as JSON') parser.add_argument('--prometheus', '-p', dest='report_prometheus', action='store_true', @@ -44,11 +44,11 @@ def main(): try: args = parse_args() - debug=args.verbose + debug = args.verbose if args.report_influxdb: num_args = 0 - for arg in [ args.influx_url, args.influx_org, args.influx_token ]: + for arg in [args.influx_url, args.influx_org, args.influx_token]: if arg is not False: num_args += 1 if num_args != 0 and num_args != 3: @@ -60,16 +60,19 @@ def main(): if args.report_influxdb: from bmspy import influxdb as bms_influx - - bms_influx.influxdb_export(bucket=args.influx_bucket, \ - url=args.influx_url, \ - org=args.influx_org, \ - token=args.influx_token, \ - debug=debug, \ - daemonize=True) + + bms_influx.influxdb_export( + bucket=args.influx_bucket, + url=args.influx_url, + org=args.influx_org, + token=args.influx_token, + ups=args.ups, + debug=debug, + daemonize=True, + ) elif args.report_textfile: - from bmspy import promethus + from bmspy import prometheus prometheus.prometheus_export(daemonize=False, filename=args.report_textfile, debug=debug) else: @@ -77,20 +80,22 @@ def main(): client.handle_registration(args.socket, 'bmspy', debug) atexit.register(client.handle_registration, args.socket, 'bmspy', debug) - data = client.read_data(args.socket, 'bmspy') + # {ups_name: JBDUPS} + data = client.read_data(args.socket, 'bmspy', ups=args.ups, debug=debug) if args.report_json: - print(json.dumps(data)) + import json + print(json.dumps({name: dict(ups.items()) for name, ups in data.items()}, default=str)) elif args.report_print: pp = pprint.PrettyPrinter(indent=4) - pp.pprint(data) + for ups_name, ups_data in data.items(): + print("=== {} ===".format(ups_name)) + pp.pprint(ups_data) except KeyboardInterrupt as e: - bms.cleanup() print(e) - if __name__ == '__main__': main() diff --git a/bmspy/client.py b/bmspy/client.py index 5e41cef..0366679 100644 --- a/bmspy/client.py +++ b/bmspy/client.py @@ -1,30 +1,35 @@ # # Library with socket client for use by consumers # -import atexit, os, sys -import struct, json +import sys +import struct +import json import socket is_registered = False + + def handle_registration(socket_path, client_name, debug=0): global is_registered data = dict() if is_registered: - message = {'command': 'DEREGISTER', 'client': client_name} + message = {"command": "DEREGISTER", "client": client_name} else: # fork server if it's not already running - message = {'command': 'REGISTER', 'client': client_name} + message = {"command": "REGISTER", "client": client_name} try: data = socket_comms(socket_path, message, debug) - if data['status'] == 'REGISTERED': + if data["status"] == "REGISTERED": is_registered = True - elif data['status'] == 'DEREGISTERED': + elif data["status"] == "DEREGISTERED": is_registered = False else: - raise OSError("{} registration: invalid response: {}".format(client_name, data)) + raise OSError( + "{} registration: invalid response: {}".format(client_name, data) + ) except Exception as e: if is_registered: @@ -37,13 +42,13 @@ def handle_registration(socket_path, client_name, debug=0): def socket_comms(socket_path, request_data, debug=0): response_data = dict() - + # Create a UDS socket sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) # Connect the socket to the port where the server is listening if debug > 2: - print('socket client: connecting to {}'.format(socket_path)) + print("socket client: connecting to {}".format(socket_path)) try: sock.connect(socket_path) except socket.error as msg: @@ -54,54 +59,65 @@ def socket_comms(socket_path, request_data, debug=0): # Send request if debug > 2: - print('socket client: sending {!r}'.format(request_data)) + print("socket client: sending {!r}".format(request_data)) request = bytes() try: request = json.dumps(request_data).encode() # add length to the start of the json string, so we know how much to read on the other end - length = struct.pack('!I', len(request)) + length = struct.pack("!I", len(request)) if debug > 3: - print("socket client: outgoing request length: {}, encoded as {}".format(len(request), length)) + print( + "socket client: outgoing request length: {}, encoded as {}".format( + len(request), length + ) + ) request = length + request if debug > 4: print("socket client: outgoing request: {}".format(request)) - except: + except Exception: print("socket client ERROR: unable to encode request") sys.exit(1) sock.sendall(request) - + # get length of expected json string - response = sock.recv(struct.calcsize('!I')) + response = sock.recv(struct.calcsize("!I")) try: - length = struct.unpack('!I', response)[0] + length = struct.unpack("!I", response)[0] if debug > 4: - print("socket client: incoming length: {}, encoded as {}".format(length, response)) + print( + "socket client: incoming length: {}, encoded as {}".format( + length, response + ) + ) # read length bytes response = sock.recv(length) if debug > 3: print("socket client: incoming response: {}".format(response)) response_data = json.loads(response) - except: + except Exception: print("socket client ERROR: unable to decode response") sys.exit(1) if debug > 2: - print('socket client: received {!r}'.format(response_data)) + print("socket client: received {!r}".format(response_data)) sock.close() return response_data -def read_data(socket_path, client_name, debug=0): - data = dict() +def read_data(socket_path, client_name, ups=None, debug=0): + """Return {ups_name: JBDUPS} for all UPSes, or just the named one.""" + request = {"command": "GET", "client": client_name} + if ups is not None: + request["ups"] = ups - data = socket_comms(socket_path, {'command': 'GET', 'client': client_name}, debug) + data = socket_comms(socket_path, request, debug) if data is None: - raise + raise RuntimeError("No data received from daemon") return data -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(0) diff --git a/bmspy/influxdb.py b/bmspy/influxdb.py index c782261..90fbb3b 100644 --- a/bmspy/influxdb.py +++ b/bmspy/influxdb.py @@ -10,7 +10,7 @@ def influx_shutdown(influxclient): influxclient.close() -def influxdb_export(bucket, url=None, org=None, token=None, socket_path=None, daemonize=True, debug=0): +def influxdb_export(bucket, url=None, org=None, token=None, socket_path=None, ups=None, daemonize=True, debug=0): if not url: url = os.environ["INFLUXDB_V2_URL"] org = os.environ.get("INFLUXDB_V2_ORG") @@ -21,21 +21,21 @@ def influxdb_export(bucket, url=None, org=None, token=None, socket_path=None, da if daemonize: while True: - data = client.read_data(socket_path, 'influxdb') + data = client.read_data(socket_path, 'influxdb', ups=ups) influxdb_write_snapshot(influxclient, bucket, data, debug) time.sleep(DAEMON_UPDATE_PERIOD) else: - data = client.read_data(socket_path, 'influxdb') + data = client.read_data(socket_path, 'influxdb', ups=ups) influxdb_write_snapshot(influxclient, bucket, data, debug) influxclient.close() atexit.unregister(influx_shutdown) -def influxdb_write_snapshot(influxclient, bucket, data, debug=0): +def influxdb_write_snapshot(influxclient, bucket, ups_data, debug=0): if debug > 1: print("influxdb: creating snapshot") - points = influxdb_create_snapshot(data, debug) + points = influxdb_create_snapshot(ups_data, debug) if debug > 1: print("influxdb: writing snapshot") try: @@ -44,51 +44,56 @@ def influxdb_write_snapshot(influxclient, bucket, data, debug=0): print(e) -def influxdb_create_snapshot(data, debug=0): +def influxdb_create_snapshot(ups_data, debug=0): + """Build InfluxDB points from {ups_name: JBDUPS}, tagging each point with the UPS name.""" points = [] now = datetime.datetime.now(datetime.timezone.utc) - for kind, contains in data.items(): - helpmsg = contains.get('help') or '' - units = contains.get('units') + for ups_name, data in ups_data.items(): + for kind, contains in data.items(): + helpmsg = contains.get('help') or '' + units = contains.get('units') - if contains.get('raw_value') is not None: - value = contains.get('raw_value') - if debug > 2: - print("value: {} : {}".format(kind, value)) - points.append( - Point(kind) - .tag("units", units) - .tag("help", helpmsg) - .field("value", value) - .time(now) - ) - - if contains.get('raw_values') is not None and isinstance(contains.get('raw_values'), dict): - label = contains.get('label') - for idx, label_value in contains.get('raw_values').items(): + if contains.get('raw_value') is not None: + value = contains.get('raw_value') if debug > 2: - print("labels: {} [{}] : {}".format(kind, idx, label_value)) + print("value: {} [{}] : {}".format(kind, ups_name, value)) points.append( Point(kind) - .tag(label, idx) + .tag("ups", ups_name) .tag("units", units) .tag("help", helpmsg) - .field("value", label_value) + .field("value", value) .time(now) ) - if contains.get('info') is not None: - value = contains.get('info') - if debug > 2: - print("info: {} : {}".format(kind, value)) - points.append( - Point(kind) - .tag("units", units) - .tag("help", helpmsg) - .field("value", value) - .time(now) - ) + if contains.get('raw_values') is not None and isinstance(contains.get('raw_values'), dict): + label = contains.get('label') + for idx, label_value in contains.get('raw_values').items(): + if debug > 2: + print("labels: {} [{}][{}] : {}".format(kind, ups_name, idx, label_value)) + points.append( + Point(kind) + .tag("ups", ups_name) + .tag(label, idx) + .tag("units", units) + .tag("help", helpmsg) + .field("value", label_value) + .time(now) + ) + + if contains.get('info') is not None: + value = contains.get('info') + if debug > 2: + print("info: {} [{}] : {}".format(kind, ups_name, value)) + points.append( + Point(kind) + .tag("ups", ups_name) + .tag("units", units) + .tag("help", helpmsg) + .field("value", value) + .time(now) + ) return points @@ -110,6 +115,8 @@ def main(): default=False, help='Set the influx token when sending data to influxdb (overrides INFLUXDB environment variables)') parser.add_argument('--socket', '-s', dest='socket', action='store', default='/run/bmspy/bms', help='Socket to communicate with daemon') + parser.add_argument('--ups', dest='ups', action='store', default=None, + help='Only export data for this UPS name (default: all)') parser.add_argument('--verbose', '-v', action='count', default=0, help='Print more verbose information (can be specified multiple times)') args = parser.parse_args() @@ -138,6 +145,7 @@ def main(): org=args.influx_org or None, token=args.influx_token or None, socket_path=args.socket, + ups=args.ups, daemonize=True, debug=debug, ) diff --git a/bmspy/server.py b/bmspy/server.py index 2382c12..823f3d8 100755 --- a/bmspy/server.py +++ b/bmspy/server.py @@ -30,7 +30,6 @@ from bmspy.jbd_ups import collect_data, initialise_serial # usb 1-1.4: FTDI USB Serial Device converter now attached to ttyUSB0 connected_clients = list() -current_data = None def signalHandler(): @@ -72,12 +71,7 @@ def read_request(connection, debug=0): return request_data -def send_response(connection, response_data, debug=0): - try: - client = response_data.client - except AttributeError: - client = response_data.get("client", "unknown client") - +def send_response(connection, response_data, client, debug=0): if debug > 2: print("socket: sending {!r}".format(response_data)) try: @@ -100,6 +94,15 @@ def send_response(connection, response_data, debug=0): raise OSError("unable to encode response: {}".format(e)) +def parse_device(device_str): + """Parse 'name:/dev/path' or '/dev/path' into (name, path).""" + if not device_str.startswith("/") and ":" in device_str: + name, path = device_str.split(":", 1) + return name, path + name = device_str.split("/")[-1] + return name, device_str + + def main(): import argparse import socket @@ -108,9 +111,6 @@ def main(): signal.signal(signal.SIGTERM, signalHandler) - global current_data - timestamp = 0 - parser = argparse.ArgumentParser( description="Query JBD BMS and report status", add_help=True, @@ -118,10 +118,11 @@ def main(): parser.add_argument( "--device", "-d", - dest="device", - action="store", - default="/dev/ttyUSB0", - help="USB device to read", + dest="devices", + action="append", + default=None, + metavar="[NAME:]/dev/PATH", + help="USB device to read (may be specified multiple times; optionally prefixed with name:)", ) parser.add_argument( "--socket", @@ -158,11 +159,24 @@ def main(): debug = args.verbose + device_list = args.devices or ["/dev/ttyUSB0"] + ups_devices = {} + for device_str in device_list: + name, path = parse_device(device_str) + if name in ups_devices: + print("server: duplicate UPS name '{}', skipping {}".format(name, path)) + continue + ups_devices[name] = { + "ser": initialise_serial(path, debug), + "data": None, + "timestamp": 0, + } + if debug > 0: + print("server: registered UPS '{}' on {}".format(name, path)) + if debug > 0: print("Running BMS query daemon on socket {}".format(args.socket)) - ser = initialise_serial(args.device) - socket_dir = os.path.dirname(args.socket) socket_dir_created = False if not os.path.isdir(socket_dir): @@ -251,7 +265,10 @@ def main(): case "REGISTER": connected_clients.append(client) send_response( - connection, {"status": "REGISTERED", "client": client}, debug + connection, + {"status": "REGISTERED", "client": client}, + client, + debug, ) case "DEREGISTER": @@ -260,32 +277,38 @@ def main(): except Exception: pass send_response( - connection, {"status": "DEREGISTERED", "client": client}, debug - ) - - send_response( - connection, {"status": "DEREGISTERED", "client": client}, debug + connection, + {"status": "DEREGISTERED", "client": client}, + client, + debug, ) case "GET": - timestamp = 0 - if bool(current_data) is True: - timestamp = current_data.get("timestamp", 0) - print( - "reading data, current timestamp is {}, time is {}".format( - timestamp, time.time() - ) + ups_filter = request_data.get("ups") + targets = ( + {ups_filter: ups_devices[ups_filter]} + if ups_filter and ups_filter in ups_devices + else ups_devices ) - # only get new data five seconds after the last read - if timestamp <= time.time() - 5: - current_data = None - while bool(current_data) is False: - current_data = collect_data(ser, debug) - time.sleep(1) - current_data["timestamp"] = time.time() - current_data["client"] = client - send_response(connection, current_data, debug) + result = {} + for name, device in targets.items(): + if debug > 0: + print( + "reading data for '{}', timestamp={}, time={}".format( + name, device["timestamp"], time.time() + ) + ) + # only get new data five seconds after the last read + if device["timestamp"] <= time.time() - 5: + device["data"] = None + while not device["data"]: + device["data"] = collect_data(device["ser"], debug) + time.sleep(1) + device["timestamp"] = time.time() + result[name] = device["data"] + + send_response(connection, result, client, debug) case _: print(