Files
bmspy/bmspy/server.py
T
tim f2ffc4568a Add multi-device support
server.py
  - --device is now repeatable (-d ups1:/dev/ttyUSB0 -d ups2:/dev/ttyUSB1). Bare paths (/dev/ttyUSB0) auto-name from the last path component (ttyUSB0).
  - Maintains {name: {ser, data, timestamp}} per UPS — each device has independent data freshness.
  - GET response is now {ups_name: JBDUPS}. Accepts optional ups key in the request to return only one.

client.py
  - read_data() gains ups=None parameter — pass a name to filter server-side, or omit for all.
  - Always returns {ups_name: JBDUPS}.

influxdb.py
  - influxdb_create_snapshot() iterates {name: JBDUPS} and tags every InfluxDB point with ups=name.
  - influxdb_export() / bmspy-influxdb gain --ups to export only a specific UPS.

__init__.py
  - bmspy CLI gains --ups to display only a named UPS.
  - Displays each UPS under a === name === header.
2026-05-02 17:40:31 +02:00

331 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Daemon: listens on a Unix socket and serves JBD BMS data to clients
#
import os
import sys
import stat
import time
import atexit
import signal
import json
import struct
from dataclasses import asdict as dataclass_asdict
from bmspy.jbd_ups import collect_data, initialise_serial
# Expected kernel log output when the USB-serial adapter is plugged in:
# usb 1-1.4: new full-speed USB device number 4 using xhci_hcd
# usb 1-1.4: New USB device found, idVendor=0403, idProduct=6001, bcdDevice= 6.00
# usb 1-1.4: New USB device strings: Mfr=1, Product=2, SerialNumber=3
# usb 1-1.4: Product: FT232R USB UART
# usb 1-1.4: Manufacturer: FTDI
# usb 1-1.4: SerialNumber: AQ00QFHZ
# usbcore: registered new interface driver usbserial_generic
# usbserial: USB Serial support registered for generic
# usbcore: registered new interface driver ftdi_sio
# usbserial: USB Serial support registered for FTDI USB Serial Device
# ftdi_sio 1-1.4:1.0: FTDI USB Serial Device converter detected
# usb 1-1.4: Detected FT232RL
# usb 1-1.4: FTDI USB Serial Device converter now attached to ttyUSB0
connected_clients = list()
def signalHandler():
raise SystemExit("terminating")
def socket_cleanup(socket_path, debug=0):
os.unlink(socket_path)
def read_request(connection, debug=0):
# get length of expected json string
request = bytes()
try:
request = connection.recv(struct.calcsize("!I"))
except Exception as e:
raise OSError("unable to read request length from socket: {}".format(e))
try:
length = struct.unpack("!I", request)[0]
except Exception as e:
raise Exception("unable to determine request length: {}".format(e))
if debug > 4:
print("socket: incoming length: {}, encoded as {}".format(length, request))
# read length bytes
try:
request = connection.recv(length)
except Exception as e:
raise OSError("unable to read socket: {}".format(e))
if debug > 3:
print("socket: incoming request: {}".format(request))
try:
request_data = json.loads(request)
except Exception as e:
raise Exception("unable to read incoming request: {}".format(e))
if debug > 2:
print("socket: received {!r}".format(request_data))
return request_data
def send_response(connection, response_data, client, debug=0):
if debug > 2:
print("socket: sending {!r}".format(response_data))
try:
response = json.dumps(response_data).encode()
response = json.dumps(
response_data,
default=lambda o: {k: dataclass_asdict(v) for k, v in o.items()}
if hasattr(o, "items") and not isinstance(o, dict)
else str(o),
).encode()
# add length to the start of the json string, so we know how much to read on the other end
length = struct.pack("!I", len(response))
if debug > 4:
print("socket: sending {} data of length: {}".format(client, length))
response = length + response
if debug > 3:
print("socket: outgoing response: {}".format(response))
return connection.sendall(response)
except Exception as e:
raise OSError("unable to encode response: {}".format(e))
def parse_device(device_str):
"""Parse 'name:/dev/path' or '/dev/path' into (name, path)."""
if not device_str.startswith("/") and ":" in device_str:
name, path = device_str.split(":", 1)
return name, path
name = device_str.split("/")[-1]
return name, device_str
def main():
import argparse
import socket
import pwd
import grp
signal.signal(signal.SIGTERM, signalHandler)
parser = argparse.ArgumentParser(
description="Query JBD BMS and report status",
add_help=True,
)
parser.add_argument(
"--device",
"-d",
dest="devices",
action="append",
default=None,
metavar="[NAME:]/dev/PATH",
help="USB device to read (may be specified multiple times; optionally prefixed with name:)",
)
parser.add_argument(
"--socket",
"-s",
dest="socket",
action="store",
default="/run/bmspy/bms",
help="Socket to communicate with daemon",
)
parser.add_argument(
"--user",
"-u",
dest="uid_name",
action="store",
default="nobody",
help="Run daemon as user",
)
parser.add_argument(
"--group",
"-g",
dest="gid_name",
action="store",
default="dialout",
help="Run daemon as group",
)
parser.add_argument(
"--verbose",
"-v",
action="count",
default=0,
help="Print more verbose information (can be specified multiple times)",
)
args = parser.parse_args()
debug = args.verbose
device_list = args.devices or ["/dev/ttyUSB0"]
ups_devices = {}
for device_str in device_list:
name, path = parse_device(device_str)
if name in ups_devices:
print("server: duplicate UPS name '{}', skipping {}".format(name, path))
continue
ups_devices[name] = {
"ser": initialise_serial(path, debug),
"data": None,
"timestamp": 0,
}
if debug > 0:
print("server: registered UPS '{}' on {}".format(name, path))
if debug > 0:
print("Running BMS query daemon on socket {}".format(args.socket))
socket_dir = os.path.dirname(args.socket)
socket_dir_created = False
if not os.path.isdir(socket_dir):
os.makedirs(socket_dir, exist_ok=True)
socket_dir_created = True
starting_uid = os.getuid()
starting_gid = os.getgid()
if starting_uid == 0:
running_uid = pwd.getpwnam(args.uid_name)[2]
running_gid = grp.getgrnam(args.gid_name)[2]
if socket_dir_created:
os.chown(socket_dir, running_uid, running_gid)
os.chmod(
socket_dir,
stat.S_IRUSR
| stat.S_IWUSR
| stat.S_IXUSR
| stat.S_IRGRP
| stat.S_IWGRP
| stat.S_IXGRP
| stat.S_IXGRP
| stat.S_IROTH
| stat.S_IXOTH,
)
new_umask = 0o003
old_umask = os.umask(new_umask)
if debug > 1:
print(
"socket: old umask: %s, new umask: %s"
% (oct(old_umask), oct(new_umask))
)
try:
os.setgid(running_gid)
except OSError as e:
print("could not set effective group id: {}".format(e))
try:
os.setuid(running_uid)
except OSError as e:
print("could not set effective user id: {}".format(e))
final_uid = os.getuid()
final_gid = os.getgid()
if debug > 0:
print(
"socket: running as {}:{}".format(
pwd.getpwuid(final_uid)[0], grp.getgrgid(final_gid)[0]
)
)
if os.path.exists(args.socket):
raise OSError("socket {} already exists; exiting...".format(args.socket))
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
if debug > 2:
print("starting up on {}".format(args.socket))
sock.bind(args.socket)
atexit.register(socket_cleanup, args.socket, debug)
sock.listen(1)
while True:
connection = None
client_address = None
try:
if debug > 2:
print("socket: waiting for a connection")
connection, client_address = sock.accept()
request_data = dict()
try:
request_data = read_request(connection, debug)
except Exception as e:
print("socket ERROR: {}".format(e))
continue
client = request_data["client"] or "unknown"
match request_data["command"]:
case "REGISTER":
connected_clients.append(client)
send_response(
connection,
{"status": "REGISTERED", "client": client},
client,
debug,
)
case "DEREGISTER":
try:
connected_clients.remove(client)
except Exception:
pass
send_response(
connection,
{"status": "DEREGISTERED", "client": client},
client,
debug,
)
case "GET":
ups_filter = request_data.get("ups")
targets = (
{ups_filter: ups_devices[ups_filter]}
if ups_filter and ups_filter in ups_devices
else ups_devices
)
result = {}
for name, device in targets.items():
if debug > 0:
print(
"reading data for '{}', timestamp={}, time={}".format(
name, device["timestamp"], time.time()
)
)
# only get new data five seconds after the last read
if device["timestamp"] <= time.time() - 5:
device["data"] = None
while not device["data"]:
device["data"] = collect_data(device["ser"], debug)
time.sleep(1)
device["timestamp"] = time.time()
result[name] = device["data"]
send_response(connection, result, client, debug)
case _:
print(
"socket: invalid request from {}".format(request_data["client"])
)
break
except KeyboardInterrupt:
if connection:
connection.close()
sys.exit(1)
finally:
if connection:
connection.close()
if __name__ == "__main__":
main()