Refactor into a client/server model, to enable multiple clients to access the BMS data

This commit is contained in:
Timothy Allen 2024-04-29 18:41:41 +02:00
parent 81fa555402
commit dab791fb79
9 changed files with 747 additions and 341 deletions

View File

@ -13,4 +13,16 @@ Or, to install with influxdb and/or prometheus support:
poetry install -E influxdb -E prometheus
To run:
poetry run bmspyd &
Or to run via systemd, copy bmspyd.service to /etc/systemd/system, adjust WorkingDirectory to point to the installation location, and enable and start the service:
cp bmspy-server.service /etc/systemd/system
$EDITOR /etc/systemd/system/bmspy-server.service
systemctl daemon-reload
systemctl enable bmspy-server
systemctl start bmspy-server
To run a client to get the data, choose one of the following options:
poetry run bmspy
poetry run bmspy-influxdb --url ...

13
bmspy-influxdb.service Normal file
View File

@ -0,0 +1,13 @@
[Unit]
Description=BMS tracking via InfluxDB
Requires=bmspy-server.service
[Service]
Type=exec
WorkingDirectory=/usr/local/bmspy
ExecStart=poetry run bmspy-influxdb --url http://localhost/ --org 0987654321 --bucket bms --token XXXX
RestartSec=5
Restart=on-failure
[Install]
WantedBy=default.target

14
bmspy-server.service Normal file
View File

@ -0,0 +1,14 @@
[Unit]
Description=Make BMS data available for consumers
[Service]
Type=exec
# The full path of the git repo on your system
WorkingDirectory=/usr/local/bmspy
ExecStart=poetry run bmspy-server
RestartSec=5
Restart=on-failure
[Install]
WantedBy=default.target

View File

@ -0,0 +1,96 @@
#
# bmspy
#
import argparse
import atexit
import json
import pprint
import time
def parse_args():
    """Build the bmspy command-line parser and parse sys.argv.

    Returns the argparse Namespace with reporting mode, device/socket
    paths, and the InfluxDB connection settings.
    """
    parser = argparse.ArgumentParser(
        description='Query JBD BMS and report status',
        add_help=True,
    )
    # (flags, keyword arguments) for every supported option, in the
    # order they should appear in --help.
    option_specs = [
        (('--device', '-d'),
         dict(dest='device', action='store',
              default='/dev/ttyUSB0', help='USB device to read')),
        (('--socket', '-s'),
         dict(dest='socket', action='store',
              default='/run/bmspy/bms', help='Socket to communicate with daemon')),
        (('--json', '-j'),
         dict(dest='report_json', action='store_true',
              default=False, help='Report data as JSON')),
        (('--prometheus', '-p'),
         dict(dest='report_prometheus', action='store_true',
              default=False, help='Daemonize and report data to Prometheus')),
        (('--file', '-f'),
         dict(dest='report_textfile', type=str, action='store',
              default=False, help='Report data to Prometheus using textfile <file>')),
        (('--influxdb', '-i'),
         dict(dest='report_influxdb', action='store_true',
              default=False, help='Daemonize and report data to InfluxDB using INFLUXDB_V2_URL, INFLUXDB_V2_ORG and INFLUXDB_V2_TOKEN environment variables')),
        (('--bucket', '-b'),
         dict(dest='influx_bucket', type=str, action='store',
              default="ups", help='Set the bucket name when sending data to influxdb (defaults to "ups")')),
        (('--url', '-u'),
         dict(dest='influx_url', type=str, action='store',
              default=False, help='Set the URL when sending data to influxdb (overrides INFLUXDB environment variables)')),
        (('--org', '-o'),
         dict(dest='influx_org', type=str, action='store',
              default=False, help='Set the influx organization when sending data to influxdb (overrides INFLUXDB environment variables)')),
        (('--token', '-t'),
         dict(dest='influx_token', type=str, action='store',
              default=False, help='Set the influx token when sending data to influxdb (overrides INFLUXDB environment variables)')),
        (('--print',),
         dict(dest='report_print', action='store_true',
              default=True, help='Report data as text')),
        (('--verbose', '-v'),
         dict(action='count',
              default=0, help='Print more verbose information (can be specified multiple times)')),
    ]
    for flags, kwargs in option_specs:
        parser.add_argument(*flags, **kwargs)
    return parser.parse_args()
def main():
    """Entry point: parse CLI options and dispatch to the chosen reporter.

    With no reporter flags, acts as a one-shot client: registers with the
    bmspy server socket, fetches a snapshot, and prints it.
    """
    try:
        args = parse_args()
        debug = args.verbose
        if args.report_influxdb:
            # --url/--org/--token must be given together, or all three
            # must come from the INFLUXDB_V2_* environment variables.
            num_args = sum(
                1 for arg in (args.influx_url, args.influx_org, args.influx_token)
                if arg is not False
            )
            if num_args not in (0, 3):
                raise argparse.ArgumentTypeError('Missing value for --url, --org or --token')
        if args.report_prometheus:
            from bmspy import prometheus
            # NOTE(review): prometheus_export() as currently written does not
            # accept a debug kwarg -- confirm once that module is ported.
            prometheus.prometheus_export(daemonize=True, debug=debug)
        if args.report_influxdb:
            from bmspy import influxdb as bms_influx
            bms_influx.influxdb_export(bucket=args.influx_bucket,
                                       url=args.influx_url,
                                       org=args.influx_org,
                                       token=args.influx_token,
                                       debug=debug,
                                       daemonize=True)
        elif args.report_textfile:
            # BUG FIX: the module name was misspelled ('promethus') and did
            # not match the 'prometheus' name used on the next line.
            from bmspy import prometheus
            prometheus.prometheus_export(daemonize=False, filename=args.report_textfile, debug=debug)
        else:
            from bmspy import client
            # Register now; the same call toggles registration, so running
            # it again at exit deregisters us cleanly.
            client.handle_registration(args.socket, 'bmspy', debug)
            atexit.register(client.handle_registration, args.socket, 'bmspy', debug)
            data = client.read_data(args.socket, 'bmspy')
            if args.report_json:
                print(json.dumps(data))
            elif args.report_print:
                pprint.PrettyPrinter(indent=4).pprint(data)
    except KeyboardInterrupt as e:
        # BUG FIX: this previously called bms.cleanup() on an undefined
        # name 'bms'; serial cleanup belongs to the server process.
        print(e)

98
bmspy/client.py Normal file
View File

@ -0,0 +1,98 @@
#
# Library with socket client for use by consumers
#
import atexit, os, sys
import pickle, struct
import socket
# Tracks whether this process currently holds a registration with the
# bmspy server; toggled by handle_registration().
is_registered = False
def handle_registration(socket_path, client_name, debug=0):
    """Register with (or deregister from) the bmspy server.

    Sends REGISTER when this process is not yet registered, DEREGISTER
    when it is, and updates the module-level ``is_registered`` flag from
    the server's reply.  Returns the server's response dict (empty on
    failure).
    """
    global is_registered
    data = dict()
    if is_registered:
        message = {'command': 'DEREGISTER', 'client': client_name}
    else:
        # TODO(review): fork the server here if it is not already running
        message = {'command': 'REGISTER', 'client': client_name}
    try:
        data = socket_comms(socket_path, message, debug)
        if data['status'] == 'REGISTERED':
            is_registered = True
        elif data['status'] == 'DEREGISTERED':
            is_registered = False
        else:
            raise OSError("{} registration: invalid response: {}".format(client_name, data))
    except Exception as e:
        # BUG FIX: the two messages were swapped.  When is_registered is
        # still True here, the attempted command was DEREGISTER, so that
        # is the operation that failed (and vice versa).
        if is_registered:
            print("{}: failed to deregister with daemon: {}".format(client_name, e))
        else:
            print("{}: failed to register with daemon: {}".format(client_name, e))
    return data
def _recv_exact(sock, length):
    """Read exactly *length* bytes from *sock*.

    recv() may legally return fewer bytes than requested, so loop until
    the full payload has arrived; raises OSError on early EOF.
    """
    chunks = []
    remaining = length
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise OSError("socket client: connection closed after {} of {} bytes"
                          .format(length - remaining, length))
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def socket_comms(socket_path, request_data, debug=0):
    """Exchange one request/response with the bmspy server.

    Pickles *request_data*, sends it over the UNIX stream socket at
    *socket_path* with a 4-byte length prefix, and returns the unpickled
    response dict.  NOTE: pickle is only safe here because both ends of
    the socket are local, trusted bmspy processes.
    """
    # Create a UDS socket
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    if debug > 2:
        print('socket client: connecting to {}'.format(socket_path))
    try:
        sock.connect(socket_path)
    except socket.error as msg:
        # BUG FIX: previously this only printed and then fell through to
        # sendall() on an unconnected socket; close and re-raise instead
        # (callers already handle exceptions from this function).
        sock.close()
        if msg.errno == 2:
            print("Failed to connect to bmspy daemon")
        else:
            print("socket client: {}".format(msg))
        raise
    try:
        # Send request
        if debug > 2:
            print('socket client: sending {!r}'.format(request_data))
        request = pickle.dumps(request_data)
        # add length to the start of the pickled bytes, so we know how
        # much to read on the other end
        length = struct.pack('!I', len(request))
        if debug > 3:
            print("socket client: outgoing request length: {}, encoded as {}".format(len(request), length))
        if debug > 4:
            print("socket client: outgoing request: {}".format(length + request))
        sock.sendall(length + request)
        # get length of expected pickled bytes
        header = _recv_exact(sock, struct.calcsize('!I'))
        length = struct.unpack('!I', header)[0]
        if debug > 4:
            print("socket client: incoming length: {}, encoded as {}".format(length, header))
        # read exactly `length` bytes (recv may return short reads)
        response = _recv_exact(sock, length)
        if debug > 3:
            print("socket client: incoming response: {}".format(response))
        response_data = pickle.loads(response)
        if debug > 2:
            print('socket client: received {!r}'.format(response_data))
    finally:
        # Always release the socket, even if the exchange failed.
        sock.close()
    return response_data
def read_data(socket_path, client_name, debug=0):
    """Fetch the current BMS data snapshot from the server.

    Sends a GET command on behalf of *client_name* and returns the
    resulting data dict; raises OSError if the server returned nothing.
    """
    data = socket_comms(socket_path, {'command': 'GET', 'client': client_name}, debug)
    if data is None:
        # BUG FIX: a bare ``raise`` with no active exception is itself a
        # RuntimeError; raise a meaningful error instead.
        raise OSError("{}: no data received from bmspy daemon".format(client_name))
    return data
if __name__ == '__main__':
sys.exit(0)

168
bmspy/influxdb.py Normal file
View File

@ -0,0 +1,168 @@
import atexit, datetime, os, sys, time
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from bmspy import client
DAEMON_UPDATE_PERIOD = 30
def influx_shutdown(influxclient):
    """Close the remote InfluxDB client connection, if one was opened.

    Registered with atexit by influxdb_export(); the stray module-level
    string that used to sit above this function was a no-op statement
    and has been folded into real docstrings.
    """
    if influxclient is not None:
        influxclient.close()


def writeapi_shutdown(writeapi):
    """Close the InfluxDB write-API handle, if one was opened."""
    if writeapi is not None:
        writeapi.close()
def influxdb_export(bucket, url=None, org=None, token=None, socket_path=None, daemonize=True, debug=0):
    """Stream BMS snapshots from the bmspy server into InfluxDB.

    When *daemonize* is true, loops forever writing a snapshot every
    DAEMON_UPDATE_PERIOD seconds; otherwise writes one snapshot and
    closes the connection.
    """
    if url:
        influxclient = InfluxDBClient(url=url, token=token, org=org)
    else:
        # Fall back to the INFLUXDB_V2_* environment variables.
        influxclient = InfluxDBClient.from_env_properties()
    atexit.register(influx_shutdown, influxclient)
    while daemonize:
        # collect new data, delay, and start again
        snapshot = client.read_data(socket_path, 'influxdb')
        influxdb_write_snapshot(influxclient, bucket, snapshot, debug)
        time.sleep(DAEMON_UPDATE_PERIOD)
    # One-shot mode: single snapshot, then clean up eagerly.
    snapshot = client.read_data(socket_path, 'influxdb')
    influxdb_write_snapshot(influxclient, bucket, snapshot, debug)
    influxclient.close()
    atexit.unregister(influx_shutdown)
    return
def influxdb_write_snapshot(influxclient, bucket, data, debug=0):
    """Convert one data snapshot to points and write them to *bucket*.

    Failures to write are reported but not raised, so a transient
    InfluxDB outage does not kill the export loop.
    """
    api = influxclient.write_api(write_options=SYNCHRONOUS)
    atexit.register(writeapi_shutdown, api)
    if debug > 1:
        print("influxdb: creating snapshot")
    records = influxdb_create_snapshot(data, debug)
    if debug > 1:
        print("influxdb: writing snapshot")
    try:
        api.write(bucket=bucket, record=records)
    except Exception as e:
        # Best-effort: report and carry on.
        print(e)
    api.close()
    atexit.unregister(writeapi_shutdown)
    return
def influxdb_create_snapshot(data, debug=0):
    """Convert a BMS data dict into a list of InfluxDB Points.

    Each entry of *data* maps a metric name to a dict that may carry a
    simple 'raw_value', a labelled 'raw_values' dict, and/or an 'info'
    string, plus optional 'help' and 'units' metadata.  The bmspy
    'client' metadata entry is skipped.
    """
    points = []
    # One timestamp for the whole snapshot.
    now = datetime.datetime.now(datetime.timezone.utc).isoformat()
    for kind, contains in data.items():
        # BUG FIX: this used 'break', which aborted the whole snapshot as
        # soon as the metadata entry was encountered; 'continue' merely
        # skips that one entry.
        if kind == 'client':
            continue
        helpmsg = contains.get('help') or ''
        units = contains.get('units') or None
        # Simple values
        value = contains.get('raw_value')
        if value is not None:
            if debug > 2:
                print("value: {} : {}".format(kind, value))
            points.append(Point(kind)
                          .tag("units", units)
                          .tag("help", helpmsg)
                          .field("value", value)
                          .time(now))
        # Doesn't have a value, but multiple values, each with a label:
        raw_values = contains.get('raw_values')
        if isinstance(raw_values, dict):
            label = contains.get('label')
            for idx, label_value in raw_values.items():
                if debug > 2:
                    print("labels: {} [{}] : {}".format(kind, idx, label_value))
                points.append(Point(kind)
                              .tag(label, idx)
                              .tag("units", units)
                              .tag("help", helpmsg)
                              .field("value", label_value)
                              .time(now))
        # Information (like a manufacturing date or a serial number)
        info = contains.get('info')
        if info is not None:
            if debug > 2:
                print("info: {} : {}".format(kind, info))
            points.append(Point(kind)
                          .tag("units", units)
                          .tag("help", helpmsg)
                          .field("value", info)
                          .time(now))
    return points
def main():
    """Console entry point for bmspy-influxdb.

    Parses CLI options, validates the InfluxDB connection settings
    (CLI flags or INFLUXDB_V2_* environment variables), registers with
    the bmspy server, and runs the export daemon.
    """
    import argparse
    parser = argparse.ArgumentParser(
        description='Query JBD BMS and report status',
        add_help=True,
    )
    parser.add_argument('--bucket', '-b', dest='influx_bucket', type=str, action='store',
                        default="ups", help='Set the bucket name when sending data to influxdb (defaults to "ups")')
    parser.add_argument('--url', '-u', dest='influx_url', type=str, action='store',
                        default=False, help='Set the URL when sending data to influxdb (overrides INFLUXDB environment variables)')
    parser.add_argument('--org', '-o', dest='influx_org', type=str, action='store',
                        default=False, help='Set the influx organization when sending data to influxdb (overrides INFLUXDB environment variables)')
    parser.add_argument('--token', '-t', dest='influx_token', type=str, action='store',
                        default=False, help='Set the influx token when sending data to influxdb (overrides INFLUXDB environment variables)')
    parser.add_argument('--socket', '-s', dest='socket', action='store',
                        default='/run/bmspy/bms', help='Socket to communicate with daemon')
    parser.add_argument('--verbose', '-v', action='count',
                        default=0, help='Print more verbose information (can be specified multiple times)')
    args = parser.parse_args()
    debug = args.verbose
    try:
        # Each setting may come from either the CLI or the environment.
        if not os.getenv('INFLUXDB_V2_URL') and not args.influx_url:
            raise argparse.ArgumentTypeError('Missing value for --url')
        if not os.getenv('INFLUXDB_V2_ORG') and not args.influx_org:
            # BUG FIX: this message previously said '--url'.
            raise argparse.ArgumentTypeError('Missing value for --org')
        if not os.getenv('INFLUXDB_V2_TOKEN') and not args.influx_token:
            raise argparse.ArgumentTypeError('Missing value for --token')
    except Exception as e:
        print("bmspy-influxdb: {}".format(e))
        sys.exit(1)
    print("Running BMS influxdb daemon on socket {}".format(args.socket))
    # Register with the server now, and deregister automatically on exit
    # (handle_registration toggles).
    client.handle_registration(args.socket, 'influxdb', debug)
    atexit.register(client.handle_registration, args.socket, 'influxdb', debug)
    influxdb_export(bucket=args.influx_bucket,
                    url=args.influx_url,
                    org=args.influx_org,
                    token=args.influx_token,
                    socket_path=args.socket,
                    daemonize=True,
                    debug=debug)

90
bmspy/prometheus.py Normal file
View File

@ -0,0 +1,90 @@
import prometheus_client
def prometheus_export(daemonize=True, filename=None):
    """Serve (or write once) BMS data as Prometheus metrics.

    When *daemonize* is true, starts an HTTP exporter on port 9999 and
    refreshes the metrics forever; otherwise writes one snapshot to
    *filename* in Prometheus textfile format and returns True (False on
    a missing filename).

    NOTE(review): this module still references names that are not
    defined anywhere in this file ('can_export_prometheus',
    'collect_data', 'time', 'DAEMON_UPDATE_PERIOD') -- leftovers from
    the pre-split monolithic script.  As written, calling this function
    raises NameError; it presumably needs to be ported to read from the
    bmspy server socket like influxdb.py -- TODO confirm.
    """
    # NOTE(review): 'debug' is declared global but never defined at module
    # level nor used below -- dead leftover from the old script.
    global debug
    if not can_export_prometheus:
        raise ModuleNotFoundError("Unable to export to Prometheus. Is prometheus-client installed?")
    data = dict()
    # Initialize data structure, to fill in help values
    while bool(data) is False:
        data = collect_data()
        time.sleep(1)
    registry = prometheus_client.CollectorRegistry(auto_describe=True)
    # Set up the metric data structure for Prometheus
    metric = prometheus_create_metric(registry, data)
    # Populate the metric data structure this period
    prometheus_populate_metric(metric, data)
    if daemonize:
        prometheus_client.start_http_server(9999, registry=registry)
        while True:
            # Delay, collect new data, and start again
            time.sleep(DAEMON_UPDATE_PERIOD)
            # Reset data, so it is re-populated correctly
            data = dict()
            while bool(data) is False:
                data = collect_data()
                time.sleep(1)
            prometheus_populate_metric(metric, data)
            prometheus_client.generate_latest(registry)
    else:
        if filename is None:
            print("Invalid filename supplied");
            return False
        prometheus_client.write_to_textfile(filename, registry=registry)
    return True
def prometheus_create_metric(registry, data):
    """Create one Prometheus metric object per entry in *data*.

    Entries with a 'value' become plain Gauges, entries with 'values'
    become labelled Gauges, and entries with 'info' become Info metrics;
    'help' and 'units' feed the metric's help text.
    """
    metric = dict()
    for name, contains in data.items():
        helpmsg = contains.get('help')
        if helpmsg is None:
            helpmsg = ''
        units = contains.get('units')
        if units:
            helpmsg += ' (' + units + ')'
        if contains.get('value') is not None:
            metric[name] = prometheus_client.Gauge(name, helpmsg, registry=registry)
        elif contains.get('values') is not None:
            # Has multiple values, each a different label
            label = contains.get('label')
            if label is None:
                print("ERROR: no label for {0} specified".format(name))
            metric[name] = prometheus_client.Gauge(name, helpmsg, [label], registry=registry)
        elif contains.get('info') is not None:
            metric[name] = prometheus_client.Info(name, helpmsg, registry=registry)
    return metric
def prometheus_populate_metric(metric, data):
    """Copy the current readings from *data* into the *metric* objects.

    Mirrors prometheus_create_metric: plain 'value' entries go to
    Gauge.set(), 'values' dicts go to per-label gauges, and 'info'
    entries go to Info.info().
    """
    for name, contains in data.items():
        simple = contains.get('value')
        if simple is not None:
            metric[name].set(simple)
        # doesn't have a value, but has [1-4]:
        labelled = contains.get('values')
        if isinstance(labelled, dict):
            for idx, label_value in labelled.items():
                metric[name].labels(idx).set(label_value)
        info = contains.get('info')
        if info:
            metric[name].info({name: info})
# TODO fork bms daemon if need be?
def main():
print("TODO. At present, run from bmspy directly.")
# influxdb_export(bucket=args.influx_bucket, \
# url=args.influx_url, \
# org=args.influx_org, \
# token=args.influx_token, \
# daemonize=True)
if __name__ == "__main__":
main()

View File

@ -3,38 +3,16 @@
# Communicate with a JBD/SZLLT BMS and return basic information # Communicate with a JBD/SZLLT BMS and return basic information
# in order to shutdown equipment when voltage levels drop or remaining # in order to shutdown equipment when voltage levels drop or remaining
# capacity gets low # capacity gets low
# TODO: scripts to shutdown NAS when voltage goes below 13.xV or
# percent_capacity goes below 25%
# #
import argparse import os, sys, stat, time
import atexit
import json import json
import atexit, signal
import serial, serial.rs485
import pickle, struct
import pprint import pprint
import serial
import serial.rs485
import signal
import sys
import time
try:
import prometheus_client
can_export_prometheus = True
except:
can_export_prometheus = False
try:
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime
can_export_influxdb = True
influxclient = None
writeapi = None
except:
can_export_influxdb = False
influxclient = None
DAEMON_UPDATE_PERIOD = 30 connected_clients = list()
SERIALPORT = "/dev/ttyUSB0" current_data = dict()
#SERIALPORT = "/dev/rfcomm1"
BAUDRATE = 9600
# usb 1-1.4: new full-speed USB device number 4 using xhci_hcd # usb 1-1.4: new full-speed USB device number 4 using xhci_hcd
# usb 1-1.4: New USB device found, idVendor=0403, idProduct=6001, bcdDevice= 6.00 # usb 1-1.4: New USB device found, idVendor=0403, idProduct=6001, bcdDevice= 6.00
@ -51,37 +29,38 @@ BAUDRATE = 9600
# usb 1-1.4: FTDI USB Serial Device converter now attached to ttyUSB0 # usb 1-1.4: FTDI USB Serial Device converter now attached to ttyUSB0
# usb 1-1.4: usbfs: interface 0 claimed by ftdi_sio while 'python3' sets config #1 # usb 1-1.4: usbfs: interface 0 claimed by ftdi_sio while 'python3' sets config #1
# TODO: ensure ttyUSB0 points to idVendor=0403, idProduct=6001
# with serial.tools.list_ports.ListPortInfo
# python3 -m serial.tools.list_ports USB
ser = serial.Serial(SERIALPORT, BAUDRATE)
ser.parity = serial.PARITY_NONE # set parity check: no parity
ser.bytesize = serial.EIGHTBITS # number of bits per bytes
ser.stopbits = serial.STOPBITS_ONE # number of stop bits
ser.timeout = 1 # timeout block read # Catch systemd signals
ser.writeTimeout = 1 # timeout for write def signalHandler():
raise SystemExit('terminating')
''' Clean up socket '''
def socket_cleanup(socket_path, debug=0):
os.unlink(socket_path)
''' Clean up serial port ''' ''' Clean up serial port '''
def cleanup(): def serial_cleanup(ser, debug=0):
global debug
global ser
if debug > 2: if debug > 2:
print("Cleaning up...") print("serial: cleaning up...")
if ser.is_open: if ser.is_open:
ser.reset_input_buffer() # flush input buffer, discarding all its contents ser.reset_input_buffer() # flush input buffer, discarding all its contents
ser.reset_output_buffer() # flush output buffer, aborting current output ser.reset_output_buffer() # flush output buffer, aborting current output
ser.close() ser.close()
''' Close remote connections ''' def initialise_serial(device, debug=0):
def shutdown(): # TODO: ensure ttyUSB0 points to idVendor=0403, idProduct=6001
global debug # with serial.tools.list_ports.ListPortInfo
global influxclient # python3 -m serial.tools.list_ports USB
if influxclient is not None: ser = serial.Serial(device, baudrate=9600)
if writeapi is not None: ser.parity = serial.PARITY_NONE # set parity check: no parity
writeapi.close() ser.bytesize = serial.EIGHTBITS # number of bits per bytes
influxclient.close() ser.stopbits = serial.STOPBITS_ONE # number of stop bits
ser.timeout = 1 # timeout block read
ser.writeTimeout = 1 # timeout for write
atexit.register(serial_cleanup, ser, debug)
return ser
def calculate_checksum(msg): def calculate_checksum(msg):
checksum = '' checksum = ''
@ -120,18 +99,16 @@ def bytes_to_date(high, low):
return "{:04d}-{:02d}-{:02d}".format(year, mon, day) return "{:04d}-{:02d}-{:02d}".format(year, mon, day)
# takes a serial object and a message, returns a response # takes a serial object and a message, returns a response
def requestMessage(reqmsg): def requestMessage(ser, reqmsg, debug=0):
global ser
global debug
if debug > 2: if debug > 2:
print('Starting Up Serial Monitor') print('serial: starting up monitor')
if ser.is_open: if ser.is_open:
ser.close() ser.close()
try: try:
ser.open() ser.open()
except Exception as e: except Exception as e:
print("error open serial port: {0}".format(str(e))) print("serial: error open port: {0}".format(str(e)))
return False return False
if ser.is_open: if ser.is_open:
@ -142,41 +119,40 @@ def requestMessage(reqmsg):
ser.reset_input_buffer() # flush input buffer, discarding all its contents ser.reset_input_buffer() # flush input buffer, discarding all its contents
ser.reset_output_buffer() # flush output buffer, aborting current output ser.reset_output_buffer() # flush output buffer, aborting current output
if debug > 0: if debug > 0:
print("Write data: {0}".format("".join('0x{:02x} '.format(x) for x in reqmsg))) print("serial: write data: {0}".format("".join('0x{:02x} '.format(x) for x in reqmsg)))
w = ser.write(reqmsg) w = ser.write(reqmsg)
if debug > 2: if debug > 2:
print("Bytes written: {0}".format(w)) print("serial: bytes written: {0}".format(w))
#time.sleep(1) #time.sleep(1)
if w != len(reqmsg): if w != len(reqmsg):
print("ERROR: {0} bytes written, {1} expected.".format(w, len(reqmsg))) print("serial ERROR: {0} bytes written, {1} expected.".format(w, len(reqmsg)))
return False return False
wait_time = 0 wait_time = 0
while ser.in_waiting == 0: while ser.in_waiting == 0:
# Return an empty string if we end up waiting too long # Return an empty string if we end up waiting too long
if wait_time > 2: if wait_time > 2:
cleanup() serial_cleanup(ser, debug)
return '' return ''
if debug > 2: if debug > 2:
print("Waiting for data...") print("serial: waiting for data...")
time.sleep(0.5) time.sleep(0.5)
wait_time += 1 wait_time += 1
if debug > 1: if debug > 1:
print("Awaiting reading: {0}".format(ser.in_waiting)) print("serial: waiting reading: {0}".format(ser.in_waiting))
response = ser.read_until(b'\x77') response = ser.read_until(b'\x77')
# Return an empty string if the read timed out or returned nothing # Return an empty string if the read timed out or returned nothing
if len(response) == 0: if len(response) == 0:
return '' return ''
if debug > 0: if debug > 0:
print("Read data: {0}".format(response.hex())) print("serial: read data: {0}".format(response.hex()))
cleanup() serial_cleanup(ser, debug)
return response return response
except Exception as e: except Exception as e:
print("error communicating...: {0}".format(str(e))) print("serial: error communicating: {0}".format(str(e)))
else: else:
print("cannot open serial port") print("serial: cannot open port")
def parse_03_response(response): def parse_03_response(response, debug=0):
global debug
data = dict() data = dict()
# Response is 34 bytes: # Response is 34 bytes:
# 00 begin: \xDD # 00 begin: \xDD
@ -189,16 +165,16 @@ def parse_03_response(response):
# length+5 checksum # length+5 checksum
# length+6 end \x77 # length+6 end \x77
if bytes([response[0]]) != b'\xdd': if bytes([response[0]]) != b'\xdd':
print("ERROR: first byte not found: {0}".format(response[0])) print("parse_03_response ERROR: first byte not found: {0}".format(response[0]))
return False return False
if bytes([response[2]]) == b'\x80': if bytes([response[2]]) == b'\x80':
print("ERROR: error byte returned from BMS: {0}".format(response[2])) print("parse_03_response ERROR: error byte returned from BMS: {0}".format(response[2]))
return False return False
data_len = response[3] data_len = response[3]
if debug > 2: if debug > 2:
print("Data length (trimming 4 bytes): {0}".format(data_len)) print("parse_03_response: data length (trimming 4 bytes): {0}".format(data_len))
# The checksum is two bytes, offset by data_len + 4 # The checksum is two bytes, offset by data_len + 4
# Five bytes at the front of data # Five bytes at the front of data
@ -207,11 +183,11 @@ def parse_03_response(response):
first = data_len + 4 first = data_len + 4
second = data_len + 5 second = data_len + 5
if second > len(response): if second > len(response):
print("ERROR: primary response checksum not found") print("parse_03_response ERROR: primary response checksum not found")
return False; return False;
checksum = bytes([response[first], response[second]]) checksum = bytes([response[first], response[second]])
if not verify_checksum(response[3:first], checksum): if not verify_checksum(response[3:first], checksum):
print("ERROR: failed to validate received checksum") print("parse_03_response ERROR: failed to validate received checksum")
return False return False
if data_len > 0: if data_len > 0:
@ -455,7 +431,7 @@ def parse_03_response(response):
return data return data
def parse_04_response(response): def parse_04_response(response, debug=0):
data = dict() data = dict()
# Response is 7 + cells * 2 bytes: # Response is 7 + cells * 2 bytes:
# 00 begin: \xDD # 00 begin: \xDD
@ -468,11 +444,11 @@ def parse_04_response(response):
# length+5 checksum # length+5 checksum
# length+6 end \x77 # length+6 end \x77
if bytes([response[0]]) != b'\xdd': if bytes([response[0]]) != b'\xdd':
print("ERROR: first byte not found: {0}".format(response[0])) print("parse_04_response ERROR: first byte not found: {0}".format(response[0]))
return False return False
if bytes([response[2]]) == b'\x80': if bytes([response[2]]) == b'\x80':
print("ERROR: error byte returned from BMS: {0}".format(response[2])) print("parse_04_response ERROR: error byte returned from BMS: {0}".format(response[2]))
return False return False
data_len = response[3] data_len = response[3]
@ -486,11 +462,11 @@ def parse_04_response(response):
first = data_len + 4 first = data_len + 4
second = data_len + 5 second = data_len + 5
if second > len(response): if second > len(response):
print("ERROR: cell voltage checksum not found") print("parse_04_response ERROR: cell voltage checksum not found")
return False return False
checksum = bytes([response[first], response[second]]) checksum = bytes([response[first], response[second]])
if not verify_checksum(response[3:first], checksum): if not verify_checksum(response[3:first], checksum):
print("ERROR: failed to validate received checksum") print("parse_04_response ERROR: failed to validate received checksum")
return False return False
if data_len > 0: if data_len > 0:
@ -510,8 +486,7 @@ def parse_04_response(response):
print(" Cell {:.0f}: {:.3f}V".format(cell+1, cellv)) print(" Cell {:.0f}: {:.3f}V".format(cell+1, cellv))
return data return data
def collect_data(): def collect_data(ser, debug=0):
global debug
# Request is 7 bytes: # Request is 7 bytes:
# \xDD for start # \xDD for start
# \xA5 for read, \x5A for write # \xA5 for read, \x5A for write
@ -519,30 +494,30 @@ def collect_data():
# \x77 ends # \x77 ends
data = dict() data = dict()
reqmsg = bytearray([ 0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77 ]) reqmsg = bytearray([ 0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77 ])
response_03 = requestMessage(reqmsg) response_03 = requestMessage(ser, reqmsg, debug)
if len(response_03) == 0: if len(response_03) == 0:
if debug > 0: if debug > 0:
print("Error retrieving BMS info. Trying again...") print("collect_data: Error retrieving BMS info. Trying again...")
return False return False
response_03 = bytearray(response_03) response_03 = bytearray(response_03)
reqmsg = bytearray([ 0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77 ]) reqmsg = bytearray([ 0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77 ])
response_04 = requestMessage(reqmsg) response_04 = requestMessage(ser, reqmsg, debug)
if len(response_04) == 0: if len(response_04) == 0:
if debug > 0: if debug > 0:
print("Error retrieving BMS info. Trying again...") print("collect_data: Error retrieving BMS info. Trying again...")
return False return False
response_04 = bytearray(response_04) response_04 = bytearray(response_04)
parsed_03 = parse_03_response(response_03) parsed_03 = parse_03_response(response_03, debug)
if parsed_03 is not False: if parsed_03 is not False:
data.update(parsed_03) data.update(parsed_03)
else: else:
return False return False
parsed_04 = parse_04_response(response_04) parsed_04 = parse_04_response(response_04, debug)
if parsed_04 is not False: if parsed_04 is not False:
data.update(parsed_04) data.update(parsed_04)
else: else:
@ -551,252 +526,188 @@ def collect_data():
return data return data
def prometheus_export(daemonize=True, filename=None): def read_request(connection, debug=0):
global debug # get length of expected pickle bytes
if not can_export_prometheus: request = connection.recv(struct.calcsize('!I'))
raise ModuleNotFoundError("Unable to export to Prometheus. Is prometheus-client installed?") length = struct.unpack('!I', request)[0]
if debug > 4:
print("socket: incoming length: {}, encoded as {}".format(length, request))
data = dict() # read length bytes
# Initialize data structure, to fill in help values request = connection.recv(length)
while bool(data) is False: if debug > 3:
data = collect_data() print("socket: incoming request: {}".format(request))
time.sleep(1) request_data = pickle.loads(request)
registry = prometheus_client.CollectorRegistry(auto_describe=True)
# Set up the metric data structure for Prometheus
metric = prometheus_create_metric(registry, data)
# Populate the metric data structure this period
prometheus_populate_metric(metric, data)
if daemonize:
prometheus_client.start_http_server(9999, registry=registry)
while True:
# Delay, collect new data, and start again
time.sleep(DAEMON_UPDATE_PERIOD)
# Reset data, so it is re-populated correctly
data = dict()
while bool(data) is False:
data = collect_data()
time.sleep(1)
prometheus_populate_metric(metric, data)
prometheus_client.generate_latest(registry)
else:
if filename is None:
print("Invalid filename supplied");
return False
prometheus_client.write_to_textfile(filename, registry=registry)
return True
def prometheus_create_metric(registry, data):
metric = dict()
for name, contains in data.items():
helpmsg = ''
if contains.get('help') is not None:
helpmsg = contains.get('help')
if contains.get('units'):
helpmsg += ' (' + contains.get('units') + ')'
if contains.get('value') is not None:
metric[name] = prometheus_client.Gauge(name, helpmsg, registry=registry)
# Has multiple values, each a different label
elif contains.get('values') is not None:
if contains.get('label') is None:
print("ERROR: no label for {0} specified".format(name))
label = contains.get('label')
metric[name] = prometheus_client.Gauge(name, helpmsg, [label], registry=registry)
elif contains.get('info') is not None:
metric[name] = prometheus_client.Info(name, helpmsg, registry=registry)
else:
pass
return metric
def prometheus_populate_metric(metric, data):
for name, contains in data.items():
if contains.get('value') is not None:
value = contains.get('value')
metric[name].set(value)
# doesn't have a value, but has [1-4]:
if contains.get('values') is not None and isinstance(contains.get('values'), dict):
for idx, label_value in contains.get('values').items():
metric[name].labels(idx).set(label_value)
if contains.get('info'):
value = contains.get('info')
metric[name].info({name: value})
else:
pass
def influxdb_export(bucket, url=None, org=None, token=None, daemonize=True):
global debug
global influxclient
if not can_export_influxdb:
raise ModuleNotFoundError("Unable to export to InfluxDB. Is influxdb-client installed?")
data = dict()
# Initialize data structure, to fill in help values
while bool(data) is False:
data = collect_data()
time.sleep(1)
if url:
influxclient = InfluxDBClient(url=url, token=token, org=org)
else:
influxclient = InfluxDBClient.from_env_properties()
influxdb_write_snapshot(bucket, data)
if daemonize:
while True:
# Delay, collect new data, and start again
time.sleep(DAEMON_UPDATE_PERIOD)
# Reset data, so it is re-populated correctly
data = dict()
while bool(data) is False:
data = collect_data()
time.sleep(1)
influxdb_write_snapshot(bucket, data)
influxclient.close()
return
def influxdb_write_snapshot(bucket, data):
global debug
global influxclient
global writeapi
writeapi = influxclient.write_api(write_options=SYNCHRONOUS)
# Populate the data structure this period
points = influxdb_create_snapshot(data)
writeapi.write(bucket=bucket, record=points)
writeapi.close()
return
def influxdb_create_snapshot(data):
    """Convert a BMS data dictionary into a list of InfluxDB Points.

    For every entry, emits a point for a simple 'raw_value', one point
    per labelled entry in a 'raw_values' dict, and a point for an 'info'
    string.  'units' and 'help' are attached as tags when present.

    NOTE(review): lines in the raw_value branch were garbled in the
    original by an unrelated diff column; reconstructed here to mirror
    the raw_values and info branches below.
    """
    global debug
    points = []
    helpmsg = ''
    units = ''
    # One timestamp for the whole snapshot, always timezone-aware UTC
    now = datetime.datetime.now(datetime.timezone.utc).isoformat()
    for kind, contains in data.items():
        helpmsg = None
        if contains.get('help'):
            helpmsg = contains.get('help')
        units = None
        if contains.get('units'):
            units = contains.get('units')

        # Simple values
        value = None
        if contains.get('raw_value') is not None:
            value = contains.get('raw_value')
            if debug > 2:
                print("value: {} : {}".format(kind, value))
            point = Point(kind) \
                .tag("units", units) \
                .tag("help", helpmsg) \
                .field("value", value) \
                .time(now)
            points.append(point)

        # Doesn't have a value, but multiple values, each with a label:
        label = None
        if contains.get('raw_values') is not None and isinstance(contains.get('raw_values'), dict):
            label = contains.get('label')
            for idx, label_value in contains.get('raw_values').items():
                if debug > 2:
                    print("labels: {} [{}] : {}".format(kind, idx, label_value))
                point = Point(kind) \
                    .tag(label, idx) \
                    .tag("units", units) \
                    .tag("help", helpmsg) \
                    .field("value", label_value) \
                    .time(now)
                points.append(point)

        # Information (like a manufacturing date or a serial number)
        if contains.get('info') is not None:
            value = contains.get('info')
            if debug > 2:
                print("info: {} : {}".format(kind, value))
            point = Point(kind) \
                .tag("units", units) \
                .tag("help", helpmsg) \
                .field("value", value) \
                .time(now)
            points.append(point)
    return points
def send_response(connection, response_data, debug=0):
    """Pickle *response_data* and send it over *connection*.

    The pickled payload is prefixed with its length as a 4-byte
    network-order unsigned int, so the receiver knows how much to read.
    Returns the result of connection.sendall().

    FIXES: original header line was fused with a removed parse_args()
    definition; bare ``except:`` narrowed; the length debug message
    printed the packed bytes rather than the integer length.
    """
    try:
        client = response_data['client']
    except (KeyError, TypeError):
        client = "unknown client"
    if debug > 2:
        print('socket: sending {!r}'.format(response_data))
    response = pickle.dumps(response_data)
    # add length to the start of the pickled bytes, so we know how much
    # to read on the other end
    length = struct.pack('!I', len(response))
    if debug > 4:
        print('socket: sending {} data of length: {}'.format(client, len(response)))
    response = length + response
    if debug > 3:
        print("socket: outgoing response: {}".format(response))
    return connection.sendall(response)
def main():
import argparse
import socket, socketserver
import pwd, grp
signal.signal(signal.SIGTERM, signalHandler)
global current_data
timestamp = 0
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='Query JBD BMS and report status', description='Query JBD BMS and report status',
add_help=True, add_help=True,
) )
parser.add_argument('--json', '-j', dest='report_json', action='store_true', parser.add_argument('--device', '-d', dest='device', action='store',
default=False, help='Report data as JSON') default='/dev/ttyUSB0', help='USB device to read')
parser.add_argument('--prometheus', '-p', dest='report_prometheus', action='store_true', parser.add_argument('--socket', '-s', dest='socket', action='store',
default=False, help='Daemonize and report data to Prometheus') default='/run/bmspy/bms', help='Socket to communicate with daemon')
parser.add_argument('--file', '-f', dest='report_textfile', type=str, action='store', parser.add_argument('--user', '-u', dest='uid_name', action='store',
default=False, help='Report data to Prometheus using textfile <file>') default='nobody', help='Run daemon as user')
parser.add_argument('--influxdb', '-i', dest='report_influxdb', action='store_true', parser.add_argument('--group', '-g', dest='gid_name', action='store',
default=False, help='Daemonize and report data to InfluxDB using INFLUXDB_V2_URL, INFLUXDB_V2_ORG and INFLUXDB_V2_TOKEN environment variables') default='dialout', help='Run daemon as group')
parser.add_argument('--bucket', '-b', dest='influx_bucket', type=str, action='store',
default="ups", help='Set the bucket name when sending data to influxdb (defaults to "ups")')
parser.add_argument('--url', '-u', dest='influx_url', type=str, action='store',
default=False, help='Set the URL when sending data to influxdb (overrides INFLUXDB environment variables)')
parser.add_argument('--org', '-o', dest='influx_org', type=str, action='store',
default=False, help='Set the influx organization when sending data to influxdb (overrides INFLUXDB environment variables)')
parser.add_argument('--token', '-t', dest='influx_token', type=str, action='store',
default=False, help='Set the influx token when sending data to influxdb (overrides INFLUXDB environment variables)')
parser.add_argument('--print', dest='report_print', action='store_true',
default=True, help='Report data as text')
parser.add_argument('--verbose', '-v', action='count', parser.add_argument('--verbose', '-v', action='count',
default=0, help='Print more verbose information (can be specified multiple times)') default=0, help='Print more verbose information (can be specified multiple times)')
args = parser.parse_args() args = parser.parse_args()
return args
def main():
global debug
try:
args = parse_args()
debug=args.verbose debug=args.verbose
if args.report_influxdb: if debug > 0:
num_args = 0 print("Running BMS query daemon on socket {}".format(args.socket))
for arg in [ args.influx_url, args.influx_org, args.influx_token ]:
if arg is not False:
num_args += 1
if num_args != 0 and num_args != 3:
raise argparse.ArgumentTypeError('Missing value for --url, --org or --token')
if args.report_prometheus: ser = initialise_serial(args.device)
prometheus_export(daemonize=True)
if args.report_influxdb: # Create any necessary directories for the socket
influxdb_export(bucket=args.influx_bucket, \ socket_dir = os.path.dirname(args.socket)
url=args.influx_url, \ socket_dir_created = False
org=args.influx_org, \ if not os.path.isdir(socket_dir):
token=args.influx_token, \ os.makedirs(socket_dir, exist_ok=True)
daemonize=True) socket_dir_created = True
elif args.report_textfile:
prometheus_export(daemonize=False, filename=args.report_textfile) starting_uid = os.getuid()
else: starting_gid = os.getgid()
data = dict() if starting_uid == 0:
while bool(data) is False: running_uid = pwd.getpwnam(args.uid_name)[2]
data = collect_data() running_gid = grp.getgrnam(args.gid_name)[2]
# If we've created a new directory for the socket, ensure that
# the highest-level directory has the correct permissions
if socket_dir_created:
os.chown(socket_dir, running_uid, running_gid)
os.chmod(socket_dir, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
new_umask = 0o003
old_umask = os.umask(new_umask)
if debug > 1:
print('socket: old umask: %s, new umask: %s' % \
(oct(old_umask), oct(new_umask)))
# Try setting the new uid/gid
try:
os.setgid(running_gid)
except OSError as e:
print('could not set effective group id: {}'.format(e))
try:
os.setuid(running_uid)
except OSError as e:
print('could not set effective user id: {}'.format(e))
final_uid = os.getuid()
final_gid = os.getgid()
if debug > 0:
print('socket: running as {}:{}'.format(pwd.getpwuid(final_uid)[0], grp.getgrgid(final_gid)[0]))
# Make sure the socket does not exist
if os.path.exists(args.socket):
raise OSError("socket {} already exists; exiting...".format(args.socket))
# Create socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Bind the socket to the port
if debug > 2:
print('starting up on {}'.format(args.socket))
sock.bind(args.socket)
atexit.register(socket_cleanup, args.socket, debug)
# Listen for incoming connections
sock.listen(1)
while True:
connection = None
client_address = None
try:
# Wait for a connection
if debug > 2:
print('socket: waiting for a connection')
connection, client_address = sock.accept()
request_data = dict()
request_data = read_request(connection, debug)
client = request_data['client'] or 'unknown'
match request_data['command']:
case 'REGISTER':
connected_clients.append(client)
send_response(connection, {'status': 'REGISTERED', 'client': client}, debug)
case 'DEREGISTER':
try:
connected_clients.remove(client)
except:
pass
send_response(connection, {'status': 'DEREGISTERED', 'client': client}, debug)
case 'GET':
print("reading data, current timestamp is {}, time is {}".format(timestamp, time.time()))
# only get new data five seconds after the last read
if timestamp <= time.time() - 5:
current_data = None
# Retry every second until we get a result
while bool(current_data) is False:
current_data = collect_data(ser, debug)
time.sleep(1) time.sleep(1)
timestamp = time.time()
current_data['client'] = client
if args.report_json: send_response(connection, current_data, debug)
print(json.dumps(data))
case _:
print('socket: invalid request from {}'.format(request_data['client']))
break
elif args.report_print:
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(data)
except KeyboardInterrupt: except KeyboardInterrupt:
cleanup() if connection:
connection.close()
sys.exit(1)
finally:
# Clean up the connection
if connection:
connection.close()
# NOTE(review): the original guard was fused with the removed client-side
# entry point (debug/atexit cleanup registrations belonged to the old,
# single-process script).  The server registers its own cleanup via
# atexit inside main(), so only the guard remains here.
if __name__ == '__main__':
    main()

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "bmspy"
version = "2.0"
description = "bmspy is a tool to get information from a xiaoxiang-type BMS system"
authors = ["Timothy Allen <tim@treehouse.org.za>"]
license = "CC BY-NC-SA 4.0"

requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.poetry.scripts]
bmspy = "bmspy:main"
bmspy-server = "bmspy.server:main"
bmspy-influxdb = "bmspy.influxdb:main"
bmspy-prometheus = "bmspy.prometheus:main"
#bmspy-usbd = "bmspy.usbhid:main"