diff --git a/README.md b/README.md index 6b4f1cc..fc87404 100644 --- a/README.md +++ b/README.md @@ -13,4 +13,16 @@ Or, to install with influxdb and/or prometheus support: poetry install -E influxdb -E prometheus To run: + poetry run bmspyd & + +Or to run via systemd, copy bmspyd.service to /etc/systemd/system, adjust WorkingDirectory to point to the installation location, and enable and start the service: + cp bmspy-server.service /etc/systemd/system + $EDITOR /etc/systemd/system/bmspy-server.service + systemctl daemon-reload + systemctl enable bmspy-server + systemctl start bmspy-server + +To run a client to get the data, choose one of the following options: poetry run bmspy + poetry run bmspy-influxdb --url ... + diff --git a/bmspy-influxdb.service b/bmspy-influxdb.service new file mode 100644 index 0000000..56ddeaf --- /dev/null +++ b/bmspy-influxdb.service @@ -0,0 +1,13 @@ +[Unit] +Description=BMS tracking via InfluxDB +Requires=bmspy-server.service + +[Service] +Type=exec +WorkingDirectory=/usr/local/bmspy +ExecStart=poetry run bmspy-influxdb --url http://localhost/ --org 0987654321 --bucket bms --token XXXX +RestartSec=5 +Restart=on-failure + +[Install] +WantedBy=default.target diff --git a/bmspy-server.service b/bmspy-server.service new file mode 100644 index 0000000..159ebcc --- /dev/null +++ b/bmspy-server.service @@ -0,0 +1,14 @@ +[Unit] +Description=Make BMS data available for consumers + +[Service] +Type=exec +# The full path of the git repo on your system +WorkingDirectory=/usr/local/bmspy +ExecStart=poetry run bmspy-server +RestartSec=5 +Restart=on-failure + + +[Install] +WantedBy=default.target diff --git a/bmspy/__init__.py b/bmspy/__init__.py index e69de29..3916392 100644 --- a/bmspy/__init__.py +++ b/bmspy/__init__.py @@ -0,0 +1,96 @@ +# +# bmspy +# +import atexit +import argparse +import pprint +import time + + +def parse_args(): + parser = argparse.ArgumentParser( + description='Query JBD BMS and report status', + add_help=True, + ) + parser.add_argument('--device', '-d', dest='device', action='store', + default='/dev/ttyUSB0', help='USB device to read') + parser.add_argument('--socket', '-s', dest='socket', action='store', + default='/run/bmspy/bms', help='Socket to communicate with daemon') + parser.add_argument('--json', '-j', dest='report_json', action='store_true', + default=False, help='Report data as JSON') + parser.add_argument('--prometheus', '-p', dest='report_prometheus', action='store_true', + default=False, help='Daemonize and report data to Prometheus') + parser.add_argument('--file', '-f', dest='report_textfile', type=str, action='store', + default=False, help='Report data to Prometheus using textfile ') + parser.add_argument('--influxdb', '-i', dest='report_influxdb', action='store_true', + default=False, help='Daemonize and report data to InfluxDB using INFLUXDB_V2_URL, INFLUXDB_V2_ORG and INFLUXDB_V2_TOKEN environment variables') + parser.add_argument('--bucket', '-b', dest='influx_bucket', type=str, action='store', + default="ups", help='Set the bucket name when sending data to influxdb (defaults to "ups")') + parser.add_argument('--url', '-u', dest='influx_url', type=str, action='store', + default=False, help='Set the URL when sending data to influxdb (overrides INFLUXDB environment variables)') + parser.add_argument('--org', '-o', dest='influx_org', type=str, action='store', + default=False, help='Set the influx organization when sending data to influxdb (overrides INFLUXDB environment variables)') + parser.add_argument('--token', '-t', dest='influx_token', type=str, action='store', + default=False, help='Set the influx token when sending data to influxdb (overrides INFLUXDB environment variables)') + parser.add_argument('--print', dest='report_print', action='store_true', + default=True, help='Report data as text') + parser.add_argument('--verbose', '-v', action='count', + default=0, help='Print more verbose information (can be specified multiple times)') + args = parser.parse_args() + return args + + +def main(): + try: + args = parse_args() + + debug=args.verbose + + if args.report_influxdb: + num_args = 0 + for arg in [ args.influx_url, args.influx_org, args.influx_token ]: + if arg is not False: + num_args += 1 + if num_args != 0 and num_args != 3: + raise argparse.ArgumentTypeError('Missing value for --url, --org or --token') + + if args.report_prometheus: + from bmspy import prometheus + prometheus.prometheus_export(daemonize=True, debug=debug) + + if args.report_influxdb: + from bmspy import influxdb as bms_influx + + bms_influx.influxdb_export(bucket=args.influx_bucket, \ + url=args.influx_url, \ + org=args.influx_org, \ + token=args.influx_token, \ + debug=debug, \ + daemonize=True) + + elif args.report_textfile: + from bmspy import promethus + prometheus.prometheus_export(daemonize=False, filename=args.report_textfile, debug=debug) + + else: + from bmspy import client + client.handle_registration(args.socket, 'bmspy', debug) + atexit.register(client.handle_registration, args.socket, 'bmspy', debug) + + data = client.read_data(args.socket, 'bmspy') + + if args.report_json: + print(json.dumps(data)) + + elif args.report_print: + pp = pprint.PrettyPrinter(indent=4) + pp.pprint(data) + + except KeyboardInterrupt as e: + bms.cleanup() + print(e) + + + +if __name__ == '__main__': + main() diff --git a/bmspy/client.py b/bmspy/client.py new file mode 100644 index 0000000..e7ef98f --- /dev/null +++ b/bmspy/client.py @@ -0,0 +1,98 @@ +# +# Library with socket client for use by consumers +# +import atexit, os, sys +import pickle, struct +import socket + + +is_registered = False +def handle_registration(socket_path, client_name, debug=0): + global is_registered + data = dict() + + if is_registered: + message = {'command': 'DEREGISTER', 'client': client_name} + else: + # fork server if it's not already running + message = {'command': 'REGISTER', 'client': client_name} + + try: + data = socket_comms(socket_path, message, debug) + if data['status'] == 'REGISTERED': + is_registered = True + elif data['status'] == 'DEREGISTERED': + is_registered = False + else: + raise OSError("{} registration: invalid response: {}".format(client_name, data)) + + except Exception as e: + if is_registered: + print("{}: failed to register with daemon: {}".format(client_name, e)) + else: + print("{}: failed to deregister with daemon: {}".format(client_name, e)) + + return data + + +def socket_comms(socket_path, request_data, debug=0): + response_data = dict() + + # Create a UDS socket + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + + # Connect the socket to the port where the server is listening + if debug > 2: + print('socket client: connecting to {}'.format(socket_path)) + try: + sock.connect(socket_path) + except socket.error as msg: + if msg.errno == 2: + print("Failed to connect to bmspy daemon") + else: + print("socket client: {}".format(msg)) + + # Send request + if debug > 2: + print('socket client: sending {!r}'.format(request_data)) + request = pickle.dumps(request_data) + # add length to the start of the pickled bytes, so we know how much to read on the other end + length = struct.pack('!I', len(request)) + if debug > 3: + print("socket client: outgoing request length: {}, encoded as {}".format(len(request), length)) + request = length + request + if debug > 4: + print("socket client: outgoing request: {}".format(request)) + sock.sendall(request) + + # get length of expected pickled bytes + response = sock.recv(struct.calcsize('!I')) + length = struct.unpack('!I', response)[0] + if debug > 4: + print("socket client: incoming length: {}, encoded as {}".format(length, response)) + # read length bytes + response = sock.recv(length) + if debug > 3: + print("socket client: incoming response: {}".format(response)) + response_data = pickle.loads(response) + if debug > 2: + print('socket client: received {!r}'.format(response_data)) + + sock.close() + + return response_data + + +def read_data(socket_path, client_name, debug=0): + data = dict() + + data = socket_comms(socket_path, {'command': 'GET', 'client': client_name}, debug) + + if data is None: + raise + + return data + + +if __name__ == '__main__': + sys.exit(0) diff --git a/bmspy/influxdb.py b/bmspy/influxdb.py new file mode 100644 index 0000000..6cbbb7c --- /dev/null +++ b/bmspy/influxdb.py @@ -0,0 +1,168 @@ +import atexit, datetime, os, sys, time +from influxdb_client import InfluxDBClient, Point +from influxdb_client.client.write_api import SYNCHRONOUS +from bmspy import client + +DAEMON_UPDATE_PERIOD = 30 + +''' Close remote connections ''' +def influx_shutdown(influxclient): + if influxclient is not None: + influxclient.close() + +def writeapi_shutdown(writeapi): + if writeapi is not None: + writeapi.close() + + +def influxdb_export(bucket, url=None, org=None, token=None, socket_path=None, daemonize=True, debug=0): + data = dict() + + if url: + influxclient = InfluxDBClient(url=url, token=token, org=org) + else: + influxclient = InfluxDBClient.from_env_properties() + atexit.register(influx_shutdown, influxclient) + + if daemonize: + while True: + # collect new data, delay, and start again + data = client.read_data(socket_path, 'influxdb') + influxdb_write_snapshot(influxclient, bucket, data, debug) + time.sleep(DAEMON_UPDATE_PERIOD) + else: + data = client.read_data(socket_path, 'influxdb') + influxdb_write_snapshot(influxclient, bucket, data, debug) + + influxclient.close() + atexit.unregister(influx_shutdown) + + return + +def influxdb_write_snapshot(influxclient, bucket, data, debug=0): + writeapi = influxclient.write_api(write_options=SYNCHRONOUS) + atexit.register(writeapi_shutdown, writeapi) + + # Populate the data structure this period + if debug > 1: + print("influxdb: creating snapshot") + points = influxdb_create_snapshot(data, debug) + if debug > 1: + print("influxdb: writing snapshot") + try: + writeapi.write(bucket=bucket, record=points) + except Exception as e: + print(e) + + writeapi.close() + atexit.unregister(writeapi_shutdown) + + return + +def influxdb_create_snapshot(data, debug=0): + points = [] + now = datetime.datetime.now(datetime.timezone.utc).isoformat() + + for kind, contains in data.items(): + # discard bmspy metadata + if kind == 'client': + break + + helpmsg = '' + if contains.get('help'): + helpmsg = contains.get('help') + units = None + if contains.get('units'): + units = contains.get('units') + # Simple values + value = None + if contains.get('raw_value') is not None: + value = contains.get('raw_value') + if debug > 2: + print("value: {} : {}".format(kind, value)); + point = Point(kind) \ + .tag("units", units) \ + .tag("help", helpmsg) \ + .field("value", value) \ + .time(now) + points.append(point) + # Doesn't have a value, but multiple values, each with a label: + label = None + if contains.get('raw_values') is not None and isinstance(contains.get('raw_values'), dict): + label = contains.get('label') + for idx, label_value in contains.get('raw_values').items(): + if debug > 2: + print("labels: {} [{}] : {}".format(kind, idx, label_value)); + point = Point(kind) \ + .tag(label, idx) \ + .tag("units", units) \ + .tag("help", helpmsg) \ + .field("value", label_value) \ + .time(now) + points.append(point) + # Information (like a manufacturing date or a serial number) + if contains.get('info') is not None: + value = contains.get('info') + if debug > 2: + print("info: {} : {}".format(kind, value)); + point = Point(kind) \ + .tag("units", units) \ + .tag("help", helpmsg) \ + .field("value", value) \ + .time(now) + points.append(point) + else: + pass + + return points + + +def main(): + import argparse + + parser = argparse.ArgumentParser( + description='Query JBD BMS and report status', + add_help=True, + ) + parser.add_argument('--bucket', '-b', dest='influx_bucket', type=str, action='store', + default="ups", help='Set the bucket name when sending data to influxdb (defaults to "ups")') + parser.add_argument('--url', '-u', dest='influx_url', type=str, action='store', + default=False, help='Set the URL when sending data to influxdb (overrides INFLUXDB environment variables)') + parser.add_argument('--org', '-o', dest='influx_org', type=str, action='store', + default=False, help='Set the influx organization when sending data to influxdb (overrides INFLUXDB environment variables)') + parser.add_argument('--token', '-t', dest='influx_token', type=str, action='store', + default=False, help='Set the influx token when sending data to influxdb (overrides INFLUXDB environment variables)') + parser.add_argument('--socket', '-s', dest='socket', action='store', + default='/run/bmspy/bms', help='Socket to communicate with daemon') + parser.add_argument('--verbose', '-v', action='count', + default=0, help='Print more verbose information (can be specified multiple times)') + args = parser.parse_args() + + debug=args.verbose + + try: + if not os.getenv('INFLUXDB_V2_URL') and not args.influx_url: + raise argparse.ArgumentTypeError('Missing value for --url') + if not os.getenv('INFLUXDB_V2_ORG') and not args.influx_org: + raise argparse.ArgumentTypeError('Missing value for --url') + if not os.getenv('INFLUXDB_V2_TOKEN') and not args.influx_token: + raise argparse.ArgumentTypeError('Missing value for --token') + except Exception as e: + print("bmspy-influxdb: {}".format(e)) + sys.exit(1) + + print("Running BMS influxdb daemon on socket {}".format(args.socket)) + + client.handle_registration(args.socket, 'influxdb', debug) + atexit.register(client.handle_registration, args.socket, 'influxdb', debug) + + influxdb_export(bucket=args.influx_bucket, \ + url=args.influx_url, \ + org=args.influx_org, \ + token=args.influx_token, \ + socket_path=args.socket, \ + daemonize=True, \ + debug=debug) + +if __name__ == "__main__": + main() diff --git a/bmspy/prometheus.py b/bmspy/prometheus.py new file mode 100644 index 0000000..598c034 --- /dev/null +++ b/bmspy/prometheus.py @@ -0,0 +1,90 @@ +import prometheus_client + +def prometheus_export(daemonize=True, filename=None): + global debug + if not can_export_prometheus: + raise ModuleNotFoundError("Unable to export to Prometheus. Is prometheus-client installed?") + + data = dict() + # Initialize data structure, to fill in help values + while bool(data) is False: + data = collect_data() + time.sleep(1) + + registry = prometheus_client.CollectorRegistry(auto_describe=True) + # Set up the metric data structure for Prometheus + metric = prometheus_create_metric(registry, data) + # Populate the metric data structure this period + prometheus_populate_metric(metric, data) + + if daemonize: + prometheus_client.start_http_server(9999, registry=registry) + + while True: + # Delay, collect new data, and start again + time.sleep(DAEMON_UPDATE_PERIOD) + # Reset data, so it is re-populated correctly + data = dict() + while bool(data) is False: + data = collect_data() + time.sleep(1) + prometheus_populate_metric(metric, data) + prometheus_client.generate_latest(registry) + else: + if filename is None: + print("Invalid filename supplied"); + return False + prometheus_client.write_to_textfile(filename, registry=registry) + return True + +def prometheus_create_metric(registry, data): + metric = dict() + for name, contains in data.items(): + helpmsg = '' + if contains.get('help') is not None: + helpmsg = contains.get('help') + if contains.get('units'): + helpmsg += ' (' + contains.get('units') + ')' + if contains.get('value') is not None: + metric[name] = prometheus_client.Gauge(name, helpmsg, registry=registry) + # Has multiple values, each a different label + elif contains.get('values') is not None: + if contains.get('label') is None: + print("ERROR: no label for {0} specified".format(name)) + label = contains.get('label') + metric[name] = prometheus_client.Gauge(name, helpmsg, [label], registry=registry) + elif contains.get('info') is not None: + metric[name] = prometheus_client.Info(name, helpmsg, registry=registry) + else: + pass + return metric + +def prometheus_populate_metric(metric, data): + for name, contains in data.items(): + if contains.get('value') is not None: + value = contains.get('value') + metric[name].set(value) + # doesn't have a value, but has [1-4]: + if contains.get('values') is not None and isinstance(contains.get('values'), dict): + for idx, label_value in contains.get('values').items(): + metric[name].labels(idx).set(label_value) + if contains.get('info'): + value = contains.get('info') + metric[name].info({name: value}) + else: + pass + + +# TODO fork bms daemon if need be? + +def main(): + print("TODO. At present, run from bmspy directly.") + +# influxdb_export(bucket=args.influx_bucket, \ +# url=args.influx_url, \ +# org=args.influx_org, \ +# token=args.influx_token, \ +# daemonize=True) + +if __name__ == "__main__": + main() diff --git a/bmspy/bms.py b/bmspy/server.py similarity index 56% rename from bmspy/bms.py rename to bmspy/server.py index 27ad150..d697246 100755 --- a/bmspy/bms.py +++ b/bmspy/server.py @@ -3,38 +3,16 @@ # Communicate with a JBD/SZLLT BMS and return basic information # in order to shutdown equipment when voltage levels drop or remaining # capacity gets low -# TODO: scripts to shutdown NAS when voltage goes below 13.xV or -# percent_capacity goes below 25% # -import argparse -import atexit +import os, sys, stat, time import json +import atexit, signal +import serial, serial.rs485 +import pickle, struct import pprint -import serial -import serial.rs485 -import signal -import sys -import time -try: - import prometheus_client - can_export_prometheus = True -except: - can_export_prometheus = False -try: - from influxdb_client import InfluxDBClient, Point - from influxdb_client.client.write_api import SYNCHRONOUS - import datetime - can_export_influxdb = True - influxclient = None - writeapi = None -except: - can_export_influxdb = False - influxclient = None -DAEMON_UPDATE_PERIOD = 30 -SERIALPORT = "/dev/ttyUSB0" -#SERIALPORT = "/dev/rfcomm1" -BAUDRATE = 9600 +connected_clients = list() +current_data = dict() # usb 1-1.4: new full-speed USB device number 4 using xhci_hcd # usb 1-1.4: New USB device found, idVendor=0403, idProduct=6001, bcdDevice= 6.00 @@ -51,37 +29,38 @@ BAUDRATE = 9600 # usb 1-1.4: FTDI USB Serial Device converter now attached to ttyUSB0 # usb 1-1.4: usbfs: interface 0 claimed by ftdi_sio while 'python3' sets config #1 -# TODO: ensure ttyUSB0 points to idVendor=0403, idProduct=6001 -# with serial.tools.list_ports.ListPortInfo -# python3 -m serial.tools.list_ports USB -ser = serial.Serial(SERIALPORT, BAUDRATE) -ser.parity = serial.PARITY_NONE # set parity check: no parity -ser.bytesize = serial.EIGHTBITS # number of bits per bytes -ser.stopbits = serial.STOPBITS_ONE # number of stop bits -ser.timeout = 1 # timeout block read -ser.writeTimeout = 1 # timeout for write +# Catch systemd signals +def signalHandler(): + raise SystemExit('terminating') + +''' Clean up socket ''' +def socket_cleanup(socket_path, debug=0): + os.unlink(socket_path) ''' Clean up serial port ''' -def cleanup(): - global debug - global ser +def serial_cleanup(ser, debug=0): if debug > 2: - print("Cleaning up...") + print("serial: cleaning up...") if ser.is_open: ser.reset_input_buffer() # flush input buffer, discarding all its contents ser.reset_output_buffer() # flush output buffer, aborting current output ser.close() -''' Close remote connections ''' -def shutdown(): - global debug - global influxclient - if influxclient is not None: - if writeapi is not None: - writeapi.close() - influxclient.close() +def initialise_serial(device, debug=0): + # TODO: ensure ttyUSB0 points to idVendor=0403, idProduct=6001 + # with serial.tools.list_ports.ListPortInfo + # python3 -m serial.tools.list_ports USB + ser = serial.Serial(device, baudrate=9600) + ser.parity = serial.PARITY_NONE # set parity check: no parity + ser.bytesize = serial.EIGHTBITS # number of bits per bytes + ser.stopbits = serial.STOPBITS_ONE # number of stop bits + ser.timeout = 1 # timeout block read + ser.writeTimeout = 1 # timeout for write + + atexit.register(serial_cleanup, ser, debug) + return ser def calculate_checksum(msg): checksum = '' @@ -120,18 +99,16 @@ def bytes_to_date(high, low): return "{:04d}-{:02d}-{:02d}".format(year, mon, day) # takes a serial object and a message, returns a response -def requestMessage(reqmsg): - global ser - global debug +def requestMessage(ser, reqmsg, debug=0): if debug > 2: - print('Starting Up Serial Monitor') + print('serial: starting up monitor') if ser.is_open: ser.close() try: ser.open() except Exception as e: - print("error open serial port: {0}".format(str(e))) + print("serial: error open port: {0}".format(str(e))) return False if ser.is_open: @@ -142,41 +119,40 @@ def requestMessage(reqmsg): ser.reset_input_buffer() # flush input buffer, discarding all its contents ser.reset_output_buffer() # flush output buffer, aborting current output if debug > 0: - print("Write data: {0}".format("".join('0x{:02x} '.format(x) for x in reqmsg))) + print("serial: write data: {0}".format("".join('0x{:02x} '.format(x) for x in reqmsg))) w = ser.write(reqmsg) if debug > 2: - print("Bytes written: {0}".format(w)) + print("serial: bytes written: {0}".format(w)) #time.sleep(1) if w != len(reqmsg): - print("ERROR: {0} bytes written, {1} expected.".format(w, len(reqmsg))) + print("serial ERROR: {0} bytes written, {1} expected.".format(w, len(reqmsg))) return False wait_time = 0 while ser.in_waiting == 0: # Return an empty string if we end up waiting too long if wait_time > 2: - cleanup() + serial_cleanup(ser, debug) return '' if debug > 2: - print("Waiting for data...") + print("serial: waiting for data...") time.sleep(0.5) wait_time += 1 if debug > 1: - print("Awaiting reading: {0}".format(ser.in_waiting)) + print("serial: waiting reading: {0}".format(ser.in_waiting)) response = ser.read_until(b'\x77') # Return an empty string if the read timed out or returned nothing if len(response) == 0: return '' if debug > 0: - print("Read data: {0}".format(response.hex())) - cleanup() + print("serial: read data: {0}".format(response.hex())) + serial_cleanup(ser, debug) return response except Exception as e: - print("error communicating...: {0}".format(str(e))) + print("serial: error communicating: {0}".format(str(e))) else: - print("cannot open serial port") + print("serial: cannot open port") -def parse_03_response(response): - global debug +def parse_03_response(response, debug=0): data = dict() # Response is 34 bytes: # 00 begin: \xDD @@ -189,16 +165,16 @@ def parse_03_response(response): # length+5 checksum # length+6 end \x77 if bytes([response[0]]) != b'\xdd': - print("ERROR: first byte not found: {0}".format(response[0])) + print("parse_03_response ERROR: first byte not found: {0}".format(response[0])) return False if bytes([response[2]]) == b'\x80': - print("ERROR: error byte returned from BMS: {0}".format(response[2])) + print("parse_03_response ERROR: error byte returned from BMS: {0}".format(response[2])) return False data_len = response[3] if debug > 2: - print("Data length (trimming 4 bytes): {0}".format(data_len)) + print("parse_03_response: data length (trimming 4 bytes): {0}".format(data_len)) # The checksum is two bytes, offset by data_len + 4 # Five bytes at the front of data @@ -207,11 +183,11 @@ def parse_03_response(response): first = data_len + 4 second = data_len + 5 if second > len(response): - print("ERROR: primary response checksum not found") + print("parse_03_response ERROR: primary response checksum not found") return False; checksum = bytes([response[first], response[second]]) if not verify_checksum(response[3:first], checksum): - print("ERROR: failed to validate received checksum") + print("parse_03_response ERROR: failed to validate received checksum") return False if data_len > 0: @@ -222,7 +198,7 @@ def parse_03_response(response): data['bms_voltage_total_volts']['value'] = "{:.2f}".format(vtot) data['bms_voltage_total_volts']['units'] = "V" if debug > 1: - print("Total voltage: {:.2f}V".format(vtot)) + print(" Total voltage: {:.2f}V".format(vtot)) current = bytes_to_digits(response[6], response[7]) current = convert_to_signed(current) * 0.01 @@ -232,7 +208,7 @@ def parse_03_response(response): data['bms_current_amps']['value'] = "{:.2f}".format(current) data['bms_current_amps']['units'] = "A" if debug > 1: - print("Current: {:.2f}A".format(current)) + print(" Current: {:.2f}A".format(current)) res_cap = bytes_to_digits(response[8], response[9]) * 0.01 nom_cap = bytes_to_digits(response[10], response[11]) * 0.01 @@ -247,8 +223,8 @@ def parse_03_response(response): data['bms_capacity_nominal_ah']['value'] = "{:.2f}".format(nom_cap) data['bms_capacity_nominal_ah']['units'] = "Ah" if debug > 1: - print("Remaining capacity: {:.2f}Ah".format(res_cap)) - print("Nominal capacity: {:.2f}Ah".format(nom_cap)) + print(" Remaining capacity: {:.2f}Ah".format(res_cap)) + print(" Nominal capacity: {:.2f}Ah".format(nom_cap)) cycle_times = bytes_to_digits(response[12], response[13]) data['bms_charge_cycles'] = dict() @@ -256,14 +232,14 @@ def parse_03_response(response): data['bms_charge_cycles']['raw_value'] = cycle_times data['bms_charge_cycles']['value'] = "{0}".format(cycle_times) if debug > 1: - print("Cycle times: {0}".format(cycle_times)) + print(" Cycle times: {0}".format(cycle_times)) man_date = bytes_to_date(response[14], response[15]) data['bms_manufacture_date'] = dict() data['bms_manufacture_date']['help'] = "Date of Manufacture" data['bms_manufacture_date']['info'] = "{0}".format(man_date) if debug > 1: - print("Manufacturing date: {0}".format(man_date)) + print(" Manufacturing date: {0}".format(man_date)) cells = response[25] # 4S data['bms_cell_number'] = dict() @@ -271,7 +247,7 @@ def parse_03_response(response): data['bms_cell_number']['raw_value'] = cells data['bms_cell_number']['value'] = "{0}".format(cells) if debug > 1: - print("Cells: {0}S".format(cells)) + print(" Cells: {0}S".format(cells)) balance_state_high = bytes_to_digits(response[16], response[17]) # 1S to 16S balance_state_low = bytes_to_digits(response[18], response[19]) # 17S to 32S @@ -314,7 +290,7 @@ def parse_03_response(response): data['bms_cells_balancing']['raw_values'][cell+1] = bool((state >> g) & b) data['bms_cells_balancing']['values'][cell+1] = "{0}".format(int(bool((state >> g) & b))) if debug > 1: - print("Balancing cell {0}: {1}".format(cell, bool((state >> g & b)))) + print(" Balancing cell {0}: {1}".format(cell, bool((state >> g & b)))) protection_state = bytes_to_digits(response[20], response[21]) sop = protection_state & 1 @@ -383,20 +359,20 @@ def parse_03_response(response): data['bms_protection_slmos_bool']['raw_value'] = bool(slm) data['bms_protection_slmos_bool']['value'] = "{0}".format(int(bool(slm))) if debug > 2: - print("Protection state: {0}".format(protection_state)) - print("Single overvoltage protection: {0}".format(bool(sop))) - print("Single undervoltage protection: {0}".format(bool(sup))) - print("Whole group overvoltage protection: {0}".format(bool(gop))) - print("Whole group undervoltage protection: {0}".format(bool(gup))) - print("Charging over-temperature protection: {0}".format(bool(cotp))) - print("Charging under-temperature protection: {0}".format(bool(cutp))) - print("Discharging over-temperature protection: {0}".format(bool(dotp))) - print("Discharging under-protection: {0}".format(bool(dutp))) - print("Charging over-current protection: {0}".format(bool(cocp))) - print("Discharging over-current protection: {0}".format(bool(docp))) - print("Short-circuit protection: {0}".format(bool(scp))) - print("Front detection IC error: {0}".format(bool(fdic))) - print("Software lock MOS: {0}".format(bool(slm))) + print(" Protection state: {0}".format(protection_state)) + print(" Single overvoltage protection: {0}".format(bool(sop))) + print(" Single undervoltage protection: {0}".format(bool(sup))) + print(" Whole group overvoltage protection: {0}".format(bool(gop))) + print(" Whole group undervoltage protection: {0}".format(bool(gup))) + print(" Charging over-temperature protection: {0}".format(bool(cotp))) + print(" Charging under-temperature protection: {0}".format(bool(cutp))) + print(" Discharging over-temperature protection: {0}".format(bool(dotp))) + print(" Discharging under-protection: {0}".format(bool(dutp))) + print(" Charging over-current protection: {0}".format(bool(cocp))) + print(" Discharging over-current protection: {0}".format(bool(docp))) + print(" Short-circuit protection: {0}".format(bool(scp))) + print(" Front detection IC error: {0}".format(bool(fdic))) + print(" Software lock MOS: {0}".format(bool(slm))) software_version = bytes([response[22]]) @@ -408,7 +384,7 @@ def parse_03_response(response): data['bms_capacity_charge_ratio']['value'] = "{0}".format(rsoc) data['bms_capacity_charge_ratio']['units'] = "\u2030" if debug > 1: - print("Capacity remaining: {0}%".format(rsoc * 100)) + print(" Capacity remaining: {0}%".format(rsoc * 100)) # bit0 = charging; bit1 = discharging; 0 = MOS closing; 1 = MOS opening control_status = response[24] @@ -422,13 +398,13 @@ def parse_03_response(response): data['bms_charge_is_discharging']['value'] = "{0}".format(int(bool(control_status & 1))) if debug > 1: if (control_status & 1): - print("MOSFET charging: yes") + print(" MOSFET charging: yes") else: - print("MOSFET charging: no") + print(" MOSFET charging: no") if ((control_status >> 1) & 1): - print("MOSFET discharging: yes") + print(" MOSFET discharging: yes") else: - print("MOSFET discharging: no") + print(" MOSFET discharging: no") ntc_num = response[26] # number of temperature sensors ntc_content = bytearray() # 2 * ntc_num in size @@ -449,13 +425,13 @@ def parse_03_response(response): data['bms_temperature_celcius']['raw_values'][i+1] = temp data['bms_temperature_celcius']['values'][i+1] = "{:.2f}".format(temp) if debug > 1: - print("Number of temperature sensors: {0}".format(ntc_num)) + print(" Number of temperature sensors: {0}".format(ntc_num)) for i, temp in enumerate(temperatures): - print(u"Temperature sensor {:d}: {:.2f}\u00B0C".format(i+1, temp)) + print(u" Temperature sensor {:d}: {:.2f}\u00B0C".format(i+1, temp)) return data -def parse_04_response(response): +def parse_04_response(response, debug=0): data = dict() # Response is 7 + cells * 2 bytes: # 00 begin: \xDD @@ -468,16 +444,16 @@ def parse_04_response(response): # length+5 checksum # length+6 end \x77 if bytes([response[0]]) != b'\xdd': - print("ERROR: first byte not found: {0}".format(response[0])) + print("parse_04_response ERROR: first byte not found: {0}".format(response[0])) return False if bytes([response[2]]) == b'\x80': - print("ERROR: error byte returned from BMS: {0}".format(response[2])) + print("parse_04_response ERROR: error byte returned from BMS: {0}".format(response[2])) return False data_len = response[3] if debug > 2: - print("Data length (trimming 4 bytes): {0}".format(data_len)) + print(" Data length (trimming 4 bytes): {0}".format(data_len)) # The checksum is two bytes, offset by data_len + 4 # Five bytes at the front of data @@ -486,11 +462,11 @@ def parse_04_response(response): first = data_len + 4 second = data_len + 5 if second > len(response): - print("ERROR: cell voltage checksum not found") + print("parse_04_response ERROR: cell voltage checksum not found") return False checksum = bytes([response[first], response[second]]) if not verify_checksum(response[3:first], checksum): - print("ERROR: failed to validate received checksum") + print("parse_04_response ERROR: failed to validate received checksum") return False if data_len > 0: @@ -507,11 +483,10 @@ def parse_04_response(response): data['bms_voltage_cells_volts']['raw_values'][cell+1] = cellv data['bms_voltage_cells_volts']['values'][cell+1] = "{:.3f}".format(cellv) if debug > 1: - print("Cell {:.0f}: {:.3f}V".format(cell+1, cellv)) + print(" Cell {:.0f}: {:.3f}V".format(cell+1, cellv)) return data -def collect_data(): - global debug +def collect_data(ser, debug=0): # Request is 7 bytes: # \xDD for start # \xA5 for read, \x5A for write @@ -519,30 +494,30 @@ def collect_data(): # \x77 ends data = dict() reqmsg = bytearray([ 0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77 ]) - response_03 = requestMessage(reqmsg) + response_03 = requestMessage(ser, reqmsg, debug) if len(response_03) == 0: if debug > 0: - print("Error retrieving BMS info. Trying again...") + print("collect_data: Error retrieving BMS info. Trying again...") return False response_03 = bytearray(response_03) reqmsg = bytearray([ 0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77 ]) - response_04 = requestMessage(reqmsg) + response_04 = requestMessage(ser, reqmsg, debug) if len(response_04) == 0: if debug > 0: - print("Error retrieving BMS info. Trying again...") + print("collect_data: Error retrieving BMS info. Trying again...") return False response_04 = bytearray(response_04) - parsed_03 = parse_03_response(response_03) + parsed_03 = parse_03_response(response_03, debug) if parsed_03 is not False: data.update(parsed_03) else: return False - parsed_04 = parse_04_response(response_04) + parsed_04 = parse_04_response(response_04, debug) if parsed_04 is not False: data.update(parsed_04) else: @@ -551,252 +526,188 @@ def collect_data(): return data -def prometheus_export(daemonize=True, filename=None): - global debug - if not can_export_prometheus: - raise ModuleNotFoundError("Unable to export to Prometheus. Is prometheus-client installed?") +def read_request(connection, debug=0): + # get length of expected pickle bytes + request = connection.recv(struct.calcsize('!I')) + length = struct.unpack('!I', request)[0] + if debug > 4: + print("socket: incoming length: {}, encoded as {}".format(length, request)) - data = dict() - # Initialize data structure, to fill in help values - while bool(data) is False: - data = collect_data() - time.sleep(1) + # read length bytes + request = connection.recv(length) + if debug > 3: + print("socket: incoming request: {}".format(request)) + request_data = pickle.loads(request) + if debug > 2: + print('socket: received {!r}'.format(request_data)) - registry = prometheus_client.CollectorRegistry(auto_describe=True) - # Set up the metric data structure for Prometheus - metric = prometheus_create_metric(registry, data) - # Populate the metric data structure this period - prometheus_populate_metric(metric, data) - - if daemonize: - prometheus_client.start_http_server(9999, registry=registry) - - while True: - # Delay, collect new data, and start again - time.sleep(DAEMON_UPDATE_PERIOD) - # Reset data, so it is re-populated correctly - data = dict() - while bool(data) is False: - data = collect_data() - time.sleep(1) - prometheus_populate_metric(metric, data) - prometheus_client.generate_latest(registry) - else: - if filename is None: - print("Invalid filename supplied"); - return False - prometheus_client.write_to_textfile(filename, registry=registry) - return True - -def prometheus_create_metric(registry, data): - metric = dict() - for name, contains in data.items(): - helpmsg = '' - if contains.get('help') is not None: - helpmsg = contains.get('help') - if contains.get('units'): - helpmsg += ' (' + contains.get('units') + ')' - if contains.get('value') is not None: - metric[name] = prometheus_client.Gauge(name, helpmsg, registry=registry) - # Has multiple values, each a different label - elif contains.get('values') is not None: - if contains.get('label') is None: - print("ERROR: no label for {0} specified".format(name)) - label = contains.get('label') - metric[name] = prometheus_client.Gauge(name, helpmsg, [label], registry=registry) - elif contains.get('info') is not None: - metric[name] = prometheus_client.Info(name, helpmsg, registry=registry) - else: - pass - return metric - -def prometheus_populate_metric(metric, data): - for name, contains in data.items(): - if contains.get('value') is not None: - value = contains.get('value') - metric[name].set(value) - # doesn't have a value, but has [1-4]: - if contains.get('values') is not None and isinstance(contains.get('values'), dict): - for idx, label_value in contains.get('values').items(): - metric[name].labels(idx).set(label_value) - if contains.get('info'): - value = contains.get('info') - metric[name].info({name: value}) - else: - pass + return request_data -def influxdb_export(bucket, url=None, org=None, token=None, daemonize=True): - global debug - global influxclient +def send_response(connection, response_data, debug=0): + try: + client = response_data['client'] + except: + client = "unknown client" - if not can_export_influxdb: - raise ModuleNotFoundError("Unable to export to InfluxDB. Is influxdb-client installed?") - - data = dict() - # Initialize data structure, to fill in help values - while bool(data) is False: - data = collect_data() - time.sleep(1) - - if url: - influxclient = InfluxDBClient(url=url, token=token, org=org) - else: - influxclient = InfluxDBClient.from_env_properties() - influxdb_write_snapshot(bucket, data) - - if daemonize: - while True: - # Delay, collect new data, and start again - time.sleep(DAEMON_UPDATE_PERIOD) - # Reset data, so it is re-populated correctly - data = dict() - while bool(data) is False: - data = collect_data() - time.sleep(1) - influxdb_write_snapshot(bucket, data) - influxclient.close() - return - -def influxdb_write_snapshot(bucket, data): - global debug - global influxclient - global writeapi - writeapi = influxclient.write_api(write_options=SYNCHRONOUS) - # Populate the data structure this period - points = influxdb_create_snapshot(data) - writeapi.write(bucket=bucket, record=points) - writeapi.close() - return - -def influxdb_create_snapshot(data): - global debug - points = [] - helpmsg = '' - units = '' - now = datetime.datetime.now(datetime.timezone.utc).isoformat() - for kind, contains in data.items(): - helpmsg = None - if contains.get('help'): - helpmsg = contains.get('help') - units = None - if contains.get('units'): - units = contains.get('units') - # Simple values - value = None - if contains.get('raw_value') is not None: - value = contains.get('raw_value') - if debug > 2: - print("value: {} : {}".format(kind, value)); - point = Point(kind) \ - .tag("units", units) \ - .tag("help", helpmsg) \ - .field("value", value) \ - .time(now) - points.append(point) - # Doesn't have a value, but multiple values, each with a label: - label = None - if contains.get('raw_values') is not None and isinstance(contains.get('raw_values'), dict): - label = contains.get('label') - for idx, label_value in contains.get('raw_values').items(): - if debug > 2: - print("labels: {} [{}] : {}".format(kind, idx, label_value)); - point = Point(kind) \ - .tag(label, idx) \ - .tag("units", units) \ - .tag("help", helpmsg) \ - .field("value", label_value) \ - .time(now) - points.append(point) - # Information (like a manufacturing date or a serial number) - if contains.get('info') is not None: - value = contains.get('info') - if debug > 2: - print("info: {} : {}".format(kind, value)); - point = Point(kind) \ - .tag("units", units) \ - .tag("help", helpmsg) \ - .field("value", value) \ - .time(now) - points.append(point) - else: - pass - return points + if debug > 2: + print('socket: sending {!r}'.format(response_data)) + response = pickle.dumps(response_data) + # add length to the start of the pickled bytes, so we know how much to read on the other end + length = struct.pack('!I', len(response)) + if debug > 4: + print('socket: sending {} data of length: {}'.format(client, length)) + response = length + response + if debug > 3: + print("socket: outgoing response: {}".format(response)) + return connection.sendall(response) -def parse_args(): +def main(): + import argparse + import socket, socketserver + import pwd, grp + + signal.signal(signal.SIGTERM, signalHandler) + + global current_data + timestamp = 0 + parser = argparse.ArgumentParser( description='Query JBD BMS and report status', add_help=True, ) - parser.add_argument('--json', '-j', dest='report_json', action='store_true', - default=False, help='Report data as JSON') - parser.add_argument('--prometheus', '-p', dest='report_prometheus', action='store_true', - default=False, help='Daemonize and report data to Prometheus') - parser.add_argument('--file', '-f', dest='report_textfile', type=str, action='store', - default=False, help='Report data to Prometheus using textfile ') - parser.add_argument('--influxdb', '-i', dest='report_influxdb', action='store_true', - default=False, help='Daemonize and report data to InfluxDB using INFLUXDB_V2_URL, INFLUXDB_V2_ORG and INFLUXDB_V2_TOKEN environment variables') - parser.add_argument('--bucket', '-b', dest='influx_bucket', type=str, action='store', - default="ups", help='Set the bucket name when sending data to influxdb (defaults to "ups")') - parser.add_argument('--url', '-u', dest='influx_url', type=str, action='store', - default=False, help='Set the URL when sending data to influxdb (overrides INFLUXDB environment variables)') - parser.add_argument('--org', '-o', dest='influx_org', type=str, action='store', - default=False, help='Set the influx organization when sending data to influxdb (overrides INFLUXDB environment variables)') - parser.add_argument('--token', '-t', dest='influx_token', type=str, action='store', - default=False, help='Set the influx token when sending data to influxdb (overrides INFLUXDB environment variables)') - parser.add_argument('--print', dest='report_print', action='store_true', - default=True, help='Report data as text') + parser.add_argument('--device', '-d', dest='device', action='store', + default='/dev/ttyUSB0', help='USB device to read') + parser.add_argument('--socket', '-s', dest='socket', action='store', + default='/run/bmspy/bms', help='Socket to communicate with daemon') + parser.add_argument('--user', '-u', dest='uid_name', action='store', + default='nobody', help='Run daemon as user') + parser.add_argument('--group', '-g', dest='gid_name', action='store', + default='dialout', help='Run daemon as group') parser.add_argument('--verbose', '-v', action='count', default=0, help='Print more verbose information (can be specified multiple times)') args = parser.parse_args() - return args + debug=args.verbose -def main(): - global debug - try: - args = parse_args() + if debug > 0: + print("Running BMS query daemon on socket {}".format(args.socket)) - debug=args.verbose + ser = initialise_serial(args.device) - if args.report_influxdb: - num_args = 0 - for arg in [ args.influx_url, args.influx_org, args.influx_token ]: - if arg is not False: - num_args += 1 - if num_args != 0 and num_args != 3: - raise argparse.ArgumentTypeError('Missing value for --url, --org or --token') + # Create any necessary directories for the socket + socket_dir = os.path.dirname(args.socket) + socket_dir_created = False + if not os.path.isdir(socket_dir): + os.makedirs(socket_dir, exist_ok=True) + socket_dir_created = True - if args.report_prometheus: - prometheus_export(daemonize=True) - if args.report_influxdb: - influxdb_export(bucket=args.influx_bucket, \ - url=args.influx_url, \ - org=args.influx_org, \ - token=args.influx_token, \ - daemonize=True) - elif args.report_textfile: - prometheus_export(daemonize=False, filename=args.report_textfile) - else: - data = dict() - while bool(data) is False: - data = collect_data() - time.sleep(1) + starting_uid = os.getuid() + starting_gid = os.getgid() + if starting_uid == 0: + running_uid = pwd.getpwnam(args.uid_name)[2] + running_gid = grp.getgrnam(args.gid_name)[2] - if args.report_json: - print(json.dumps(data)) + # If we've created a new directory for the socket, ensure that + # the highest-level directory has the correct permissions + if socket_dir_created: + os.chown(socket_dir, running_uid, running_gid) + os.chmod(socket_dir, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) - elif args.report_print: - pp = pprint.PrettyPrinter(indent=4) - pp.pprint(data) - except KeyboardInterrupt: - cleanup() + new_umask = 0o003 + old_umask = os.umask(new_umask) + if debug > 1: + print('socket: old umask: %s, new umask: %s' % \ + (oct(old_umask), oct(new_umask))) + # Try setting the new uid/gid + try: + os.setgid(running_gid) + except OSError as e: + print('could not set effective group id: {}'.format(e)) + try: + os.setuid(running_uid) + except OSError as e: + print('could not set effective user id: {}'.format(e)) + final_uid = os.getuid() + final_gid = os.getgid() -if __name__ == '__main__': - debug = 0 - atexit.register(cleanup) - atexit.register(shutdown) + if debug > 0: + print('socket: running as {}:{}'.format(pwd.getpwuid(final_uid)[0], grp.getgrgid(final_gid)[0])) + + # Make sure the socket does not exist + if os.path.exists(args.socket): + raise OSError("socket {} already exists; exiting...".format(args.socket)) + + # Create socket + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + + # Bind the socket to the port + if debug > 2: + print('starting up on {}'.format(args.socket)) + sock.bind(args.socket) + atexit.register(socket_cleanup, args.socket, debug) + + # Listen for incoming connections + sock.listen(1) + + while True: + connection = None + client_address = None + + try: + # Wait for a connection + if debug > 2: + print('socket: waiting for a connection') + connection, client_address = sock.accept() + + request_data = dict() + request_data = read_request(connection, debug) + client = request_data['client'] or 'unknown' + + match request_data['command']: + case 'REGISTER': + connected_clients.append(client) + + send_response(connection, {'status': 'REGISTERED', 'client': client}, debug) + + case 'DEREGISTER': + try: + connected_clients.remove(client) + except: + pass + + send_response(connection, {'status': 'DEREGISTERED', 'client': client}, debug) + + case 'GET': + print("reading data, current timestamp is {}, time is {}".format(timestamp, time.time())) + # only get new data five seconds after the last read + if timestamp <= time.time() - 5: + current_data = None + # Retry every second until we get a result + while bool(current_data) is False: + current_data = collect_data(ser, debug) + time.sleep(1) + timestamp = time.time() + current_data['client'] = client + + send_response(connection, current_data, debug) + + case _: + print('socket: invalid request from {}'.format(request_data['client'])) + break + + except KeyboardInterrupt: + if connection: + connection.close() + sys.exit(1) + + finally: + # Clean up the connection + if connection: + connection.close() + +if __name__ == "__main__": main() diff --git a/pyproject.toml b/pyproject.toml index ef3df01..366eb6b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "bmspy" -version = "1.0" +version = "2.0" description = "bmspy is a tool to get information from a xiaoxiang-type BMS system" authors = ["Timothy Allen "] license = "CC BY-NC-SA 4.0" @@ -21,4 +21,8 @@ requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" [tool.poetry.scripts] -bmspy = "bmspy.bms:main" +bmspy = "bmspy:main" +bmspy-server = "bmspy.server:main" +bmspy-influxdb = "bmspy.influxdb:main" +bmspy-prometheus = "bmspy.prometheus:main" +#bmspy-usbd = "bmspy.usbhid:main"