bmspy/bmspy/server.py

738 lines
30 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Communicate with a JBD/SZLLT BMS and return basic information
# in order to shutdown equipment when voltage levels drop or remaining
# capacity gets low
#
import os, sys, stat, time
import json
import atexit, signal
import serial, serial.rs485
import struct, json
import pprint
connected_clients = list()
current_data = dict()
# usb 1-1.4: new full-speed USB device number 4 using xhci_hcd
# usb 1-1.4: New USB device found, idVendor=0403, idProduct=6001, bcdDevice= 6.00
# usb 1-1.4: New USB device strings: Mfr=1, Product=2, SerialNumber=3
# usb 1-1.4: Product: FT232R USB UART
# usb 1-1.4: Manufacturer: FTDI
# usb 1-1.4: SerialNumber: AQ00QFHZ
# usbcore: registered new interface driver usbserial_generic
# usbserial: USB Serial support registered for generic
# usbcore: registered new interface driver ftdi_sio
# usbserial: USB Serial support registered for FTDI USB Serial Device
# ftdi_sio 1-1.4:1.0: FTDI USB Serial Device converter detected
# usb 1-1.4: Detected FT232RL
# usb 1-1.4: FTDI USB Serial Device converter now attached to ttyUSB0
# usb 1-1.4: usbfs: interface 0 claimed by ftdi_sio while 'python3' sets config #1
# Catch systemd signals
def signalHandler():
raise SystemExit('terminating')
''' Clean up socket '''
def socket_cleanup(socket_path, debug=0):
os.unlink(socket_path)
''' Clean up serial port '''
def serial_cleanup(ser, debug=0):
if debug > 2:
print("serial: cleaning up...")
if ser.is_open:
ser.reset_input_buffer() # flush input buffer, discarding all its contents
ser.reset_output_buffer() # flush output buffer, aborting current output
ser.close()
def initialise_serial(device, debug=0):
# TODO: ensure ttyUSB0 points to idVendor=0403, idProduct=6001
# with serial.tools.list_ports.ListPortInfo
# python3 -m serial.tools.list_ports USB
ser = serial.Serial(device, baudrate=9600)
ser.parity = serial.PARITY_NONE # set parity check: no parity
ser.bytesize = serial.EIGHTBITS # number of bits per bytes
ser.stopbits = serial.STOPBITS_ONE # number of stop bits
ser.timeout = 1 # timeout block read
ser.writeTimeout = 1 # timeout for write
atexit.register(serial_cleanup, ser, debug)
return ser
def calculate_checksum(msg):
checksum = ''
return checksum
def verify_checksum(data, checksum):
# (data + length + command code) checksum, then complement, then add 1, high bit first, low bit last
# data should have start/rw stripped
s = 0
for i in data:
s += i
s = (s ^ 0xFFFF) + 1
chk = bytes_to_digits(checksum[0], checksum[1])
return s == chk
def convert_to_signed(x):
# For values below 1024, these seem to be actual results
# For values above 1024, these seem to be encoded to account for high and negative floats
max_uint = 1024
if x >= max_uint:
return (x - 2**9) % 2**10 - 2**9
else:
return x
def bytes_to_digits(high, low):
result = high
result <<= 8
result = result | low
return result
def bytes_to_date(high, low):
result= bytes_to_digits(high, low)
day = result & 0x1f
mon = (result >> 5) & 0x0f
year = 2000 + (result >> 9)
return "{:04d}-{:02d}-{:02d}".format(year, mon, day)
# takes a serial object and a message, returns a response
def requestMessage(ser, reqmsg, debug=0):
if debug > 2:
print('serial: starting up monitor')
if ser.is_open:
ser.close()
try:
ser.open()
except Exception as e:
print("serial: error open port: {0}".format(str(e)))
return False
if ser.is_open:
try:
# Resetting once alone doesn't seem to do the trick when we discarded data
# on a previous run
for i in range(2):
ser.reset_input_buffer() # flush input buffer, discarding all its contents
ser.reset_output_buffer() # flush output buffer, aborting current output
if debug > 0:
print("serial: write data: {0}".format("".join('0x{:02x} '.format(x) for x in reqmsg)))
w = ser.write(reqmsg)
if debug > 2:
print("serial: bytes written: {0}".format(w))
#time.sleep(1)
if w != len(reqmsg):
print("serial ERROR: {0} bytes written, {1} expected.".format(w, len(reqmsg)))
return False
wait_time = 0
while ser.in_waiting == 0:
# Return an empty string if we end up waiting too long
if wait_time > 2:
serial_cleanup(ser, debug)
return ''
if debug > 2:
print("serial: waiting for data...")
time.sleep(0.5)
wait_time += 1
if debug > 1:
print("serial: waiting reading: {0}".format(ser.in_waiting))
response = ser.read_until(b'\x77')
# Return an empty string if the read timed out or returned nothing
if len(response) == 0:
return ''
if debug > 0:
print("serial: read data: {0}".format(response.hex()))
serial_cleanup(ser, debug)
return response
except Exception as e:
print("serial: error communicating: {0}".format(str(e)))
else:
print("serial: cannot open port")
def parse_03_response(response, debug=0):
data = dict()
# Response is 34 bytes:
# 00 begin: \xDD
# 01 r/w: \xA5
# 02 status: \x00 = correct; \x80 = incorrect
# 03 length (usually 27)
# 04 data (size of length)
# ...
# length+4 checksum
# length+5 checksum
# length+6 end \x77
if bytes([response[0]]) != b'\xdd':
print("parse_03_response ERROR: first byte not found: {0}".format(response[0]))
return False
if bytes([response[2]]) == b'\x80':
print("parse_03_response ERROR: error byte returned from BMS: {0}".format(response[2]))
return False
data_len = response[3]
if debug > 2:
print("parse_03_response: data length (trimming 4 bytes): {0}".format(data_len))
# The checksum is two bytes, offset by data_len + 4
# Five bytes at the front of data
# begin; rw; status, command; length
# The checksum should check command, length, and data: [3] to [3+data_len+1]
first = data_len + 4
second = data_len + 5
if second > len(response):
print("parse_03_response ERROR: primary response checksum not found")
return False;
checksum = bytes([response[first], response[second]])
if not verify_checksum(response[3:first], checksum):
print("parse_03_response ERROR: failed to validate received checksum")
return False
if data_len > 0:
vtot = bytes_to_digits(response[4], response[5]) * 0.01
data['bms_voltage_total_volts'] = dict()
data['bms_voltage_total_volts']['help'] = "Total Voltage"
data['bms_voltage_total_volts']['raw_value'] = vtot
data['bms_voltage_total_volts']['value'] = "{:.2f}".format(vtot)
data['bms_voltage_total_volts']['units'] = "V"
if debug > 1:
print(" Total voltage: {:.2f}V".format(vtot))
current = bytes_to_digits(response[6], response[7])
current = convert_to_signed(current) * 0.01
data['bms_current_amps'] = dict()
data['bms_current_amps']['help'] = "Current"
data['bms_current_amps']['raw_value'] = current
data['bms_current_amps']['value'] = "{:.2f}".format(current)
data['bms_current_amps']['units'] = "A"
if debug > 1:
print(" Current: {:.2f}A".format(current))
res_cap = bytes_to_digits(response[8], response[9]) * 0.01
nom_cap = bytes_to_digits(response[10], response[11]) * 0.01
data['bms_capacity_remaining_ah'] = dict()
data['bms_capacity_remaining_ah']['help'] = "Remaining Capacity"
data['bms_capacity_remaining_ah']['raw_value'] = res_cap
data['bms_capacity_remaining_ah']['value'] = "{:.2f}".format(res_cap)
data['bms_capacity_remaining_ah']['units'] = "Ah"
data['bms_capacity_nominal_ah'] = dict()
data['bms_capacity_nominal_ah']['help'] = "Nominal Capacity"
data['bms_capacity_nominal_ah']['raw_value'] = nom_cap
data['bms_capacity_nominal_ah']['value'] = "{:.2f}".format(nom_cap)
data['bms_capacity_nominal_ah']['units'] = "Ah"
if debug > 1:
print(" Remaining capacity: {:.2f}Ah".format(res_cap))
print(" Nominal capacity: {:.2f}Ah".format(nom_cap))
cycle_times = bytes_to_digits(response[12], response[13])
data['bms_charge_cycles'] = dict()
data['bms_charge_cycles']['help'] = "Charge Cycles"
data['bms_charge_cycles']['raw_value'] = cycle_times
data['bms_charge_cycles']['value'] = "{0}".format(cycle_times)
if debug > 1:
print(" Cycle times: {0}".format(cycle_times))
man_date = bytes_to_date(response[14], response[15])
data['bms_manufacture_date'] = dict()
data['bms_manufacture_date']['help'] = "Date of Manufacture"
data['bms_manufacture_date']['info'] = "{0}".format(man_date)
if debug > 1:
print(" Manufacturing date: {0}".format(man_date))
cells = response[25] # 4S
data['bms_cell_number'] = dict()
data['bms_cell_number']['help'] = "Cells"
data['bms_cell_number']['raw_value'] = cells
data['bms_cell_number']['value'] = "{0}".format(cells)
if debug > 1:
print(" Cells: {0}S".format(cells))
balance_state_high = bytes_to_digits(response[16], response[17]) # 1S to 16S
balance_state_low = bytes_to_digits(response[18], response[19]) # 17S to 32S
# 1 bit per 4S (2 bytes = 16S); in 4S, we should expect:
# 0x0 (no cells balancing) 0
# 0x1 (cell 1 balancing) 1
# 0x2 (cell 2 balancing) 2
# 0x3 (cells 1 + 2 balancing) 3
# 0x4 (cell 3 balancing) 4
# 0x5 (cells 1 + 3 balancing) 5
# 0x6 (cells 2 + 3 balancing) 6
# 0x7 (cells 1 + 2 + 3 balancing) 7
# 0x8 (cell 4 balancing) 8
# 0x9 (cells 1 + 4 balancing) 9
# 0xA (cells 2 + 4 balancing) 10
# 0xB (cells 1 + 2 + 4 balancing) 11
# 0xC (cells 3 + 4 balancing) 12
# 0xD (cells 1 + 3 + 4 balancing) 13
# 0xE (cells 2 + 3 + 4 balancing) 14
# 0xF (cells 1 + 2 + 3 + 4 balancing) 15
#data["Balancing"] = dict()
data['bms_cells_balancing'] = dict()
data['bms_cells_balancing']['help'] = "Cells balancing"
data['bms_cells_balancing']['label'] = 'cell'
data['bms_cells_balancing']['raw_values'] = dict()
data['bms_cells_balancing']['values'] = dict()
for cell in range(cells):
# Cells from 1 to 16 are recorded in balance_state_low,
# and from 17 to 32 in balance_state_high; hilo_cell records the offset
# relative to the state group
if cell >= 16:
state = balance_state_high
hilo_cell = cell - 16
else:
state = balance_state_low
hilo_cell = cell
# Cells are recorded as groups of 4 bits (0x0-0xF) per 4 cells
g = int(hilo_cell / 4)
b = 2**(hilo_cell - (g * 4 ))
data['bms_cells_balancing']['raw_values'][cell+1] = bool((state >> g) & b)
data['bms_cells_balancing']['values'][cell+1] = "{0}".format(int(bool((state >> g) & b)))
if debug > 1:
print(" Balancing cell {0}: {1}".format(cell, bool((state >> g & b))))
protection_state = bytes_to_digits(response[20], response[21])
sop = protection_state & 1
sup = protection_state & 2
gop = protection_state & 4
gup = protection_state & 8
cotp = protection_state & 16
cutp = protection_state & 32
dotp = protection_state & 64
dutp = protection_state & 128
cocp = protection_state & 256
docp = protection_state & 512
scp = protection_state & 1024
fdic = protection_state & 2048
slm = protection_state & 4096
data['bms_protection_sop_bool'] = dict()
data['bms_protection_sop_bool']['help'] = "Single overvoltage protection"
data['bms_protection_sop_bool']['raw_value'] = bool(sop)
data['bms_protection_sop_bool']['value'] = "{0}".format(int(bool(sop)))
data['bms_protection_sup_bool'] = dict()
data['bms_protection_sup_bool']['help'] = "Single undervoltage protection"
data['bms_protection_sup_bool']['raw_value'] = bool(sup)
data['bms_protection_sup_bool']['value'] = "{0}".format(int(bool(sup)))
data['bms_protection_wgop_bool'] = dict()
data['bms_protection_wgop_bool']['help'] = "Whole group overvoltage protection"
data['bms_protection_wgop_bool']['raw_value'] = bool(gop)
data['bms_protection_wgop_bool']['value'] = "{0}".format(int(bool(gop)))
data['bms_protection_wgup_bool'] = dict()
data['bms_protection_wgup_bool']['help'] = "Whole group undervoltage protection"
data['bms_protection_wgup_bool']['raw_value'] = bool(gup)
data['bms_protection_wgup_bool']['value'] = "{0}".format(int(bool(gup)))
data['bms_protection_cotp_bool'] = dict()
data['bms_protection_cotp_bool']['help'] = "Charging over-temperature protection"
data['bms_protection_cotp_bool']['raw_value'] = bool(cotp)
data['bms_protection_cotp_bool']['value'] = "{0}".format(int(bool(cotp)))
data['bms_protection_cutp_bool'] = dict()
data['bms_protection_cutp_bool']['help'] = "Charging under-temperature protection"
data['bms_protection_cutp_bool']['raw_value'] = bool(cutp)
data['bms_protection_cutp_bool']['value'] = "{0}".format(int(bool(cutp)))
data['bms_protection_dotp_bool'] = dict()
data['bms_protection_dotp_bool']['help'] = "Discharging over-temperature protection"
data['bms_protection_dotp_bool']['raw_value'] = bool(dotp)
data['bms_protection_dotp_bool']['value'] = "{0}".format(int(bool(dotp)))
data['bms_protection_dutp_bool'] = dict()
data['bms_protection_dutp_bool']['help'] = "Discharging under-protection"
data['bms_protection_dutp_bool']['raw_value'] = bool(dutp)
data['bms_protection_dutp_bool']['value'] = "{0}".format(int(bool(dutp)))
data['bms_protection_cocp_bool'] = dict()
data['bms_protection_cocp_bool']['help'] = "Charging over-current protection"
data['bms_protection_cocp_bool']['raw_value'] = bool(cocp)
data['bms_protection_cocp_bool']['value'] = "{0}".format(int(bool(cocp)))
data['bms_protection_docp_bool'] = dict()
data['bms_protection_docp_bool']['help'] = "Discharging over-current protection"
data['bms_protection_docp_bool']['raw_value'] = bool(docp)
data['bms_protection_docp_bool']['value'] = "{0}".format(int(bool(docp)))
data['bms_protection_scp_bool'] = dict()
data['bms_protection_scp_bool']['help'] = "Short-circuit protection"
data['bms_protection_scp_bool']['raw_value'] = bool(scp)
data['bms_protection_scp_bool']['value'] = "{0}".format(int(bool(scp)))
data['bms_protection_fdic_bool'] = dict()
data['bms_protection_fdic_bool']['help'] = "Front detection IC error"
data['bms_protection_fdic_bool']['raw_value'] = bool(fdic)
data['bms_protection_fdic_bool']['value'] = "{0}".format(int(bool(fdic)))
data['bms_protection_slmos_bool'] = dict()
data['bms_protection_slmos_bool']['help'] = "Software lock MOS"
data['bms_protection_slmos_bool']['raw_value'] = bool(slm)
data['bms_protection_slmos_bool']['value'] = "{0}".format(int(bool(slm)))
if debug > 2:
print(" Protection state: {0}".format(protection_state))
print(" Single overvoltage protection: {0}".format(bool(sop)))
print(" Single undervoltage protection: {0}".format(bool(sup)))
print(" Whole group overvoltage protection: {0}".format(bool(gop)))
print(" Whole group undervoltage protection: {0}".format(bool(gup)))
print(" Charging over-temperature protection: {0}".format(bool(cotp)))
print(" Charging under-temperature protection: {0}".format(bool(cutp)))
print(" Discharging over-temperature protection: {0}".format(bool(dotp)))
print(" Discharging under-protection: {0}".format(bool(dutp)))
print(" Charging over-current protection: {0}".format(bool(cocp)))
print(" Discharging over-current protection: {0}".format(bool(docp)))
print(" Short-circuit protection: {0}".format(bool(scp)))
print(" Front detection IC error: {0}".format(bool(fdic)))
print(" Software lock MOS: {0}".format(bool(slm)))
software_version = bytes([response[22]])
# percent of capacity remaining, converted to a per mille ratio between 0 and 1
rsoc = response[23] * 0.01
data['bms_capacity_charge_ratio'] = dict()
data['bms_capacity_charge_ratio']['help'] = "Percent Charge"
data['bms_capacity_charge_ratio']['raw_value'] = rsoc
data['bms_capacity_charge_ratio']['value'] = "{0}".format(rsoc)
data['bms_capacity_charge_ratio']['units'] = "\u2030"
if debug > 1:
print(" Capacity remaining: {0}%".format(rsoc * 100))
# bit0 = charging; bit1 = discharging; 0 = MOS closing; 1 = MOS opening
control_status = response[24]
data['bms_charge_is_charging'] = dict()
data['bms_charge_is_charging']['help'] = "MOSFET charging"
data['bms_charge_is_charging']['raw_value'] = bool(control_status & 1)
data['bms_charge_is_charging']['value'] = "{0}".format(int(bool(control_status & 1)))
data['bms_charge_is_discharging'] = dict()
data['bms_charge_is_discharging']['help'] = "MOSFET discharging"
data['bms_charge_is_discharging']['raw_value'] = bool(control_status & 1)
data['bms_charge_is_discharging']['value'] = "{0}".format(int(bool(control_status & 1)))
if debug > 1:
if (control_status & 1):
print(" MOSFET charging: yes")
else:
print(" MOSFET charging: no")
if ((control_status >> 1) & 1):
print(" MOSFET discharging: yes")
else:
print(" MOSFET discharging: no")
ntc_num = response[26] # number of temperature sensors
ntc_content = bytearray() # 2 * ntc_num in size
temperatures = list()
for i in range(ntc_num):
temperatures.append((bytes_to_digits(response[27+(2*i)], response[28+(2*i)]) - 2731) * 0.1)
data['bms_temperature_sensor_num'] = dict()
data['bms_temperature_sensor_num']['help'] = "Temperature Sensors"
data['bms_temperature_sensor_num']['raw_value'] = ntc_num
data['bms_temperature_sensor_num']['value'] = "{0}".format(ntc_num)
data['bms_temperature_celcius'] = dict()
data['bms_temperature_celcius']['help'] = "Temperature"
data['bms_temperature_celcius']['units'] = "\u00B0C"
data['bms_temperature_celcius']['label'] = 'sensor'
data['bms_temperature_celcius']['raw_values'] = dict()
data['bms_temperature_celcius']['values'] = dict()
for i, temp in enumerate(temperatures):
data['bms_temperature_celcius']['raw_values'][i+1] = temp
data['bms_temperature_celcius']['values'][i+1] = "{:.2f}".format(temp)
if debug > 1:
print(" Number of temperature sensors: {0}".format(ntc_num))
for i, temp in enumerate(temperatures):
print(u" Temperature sensor {:d}: {:.2f}\u00B0C".format(i+1, temp))
return data
def parse_04_response(response, debug=0):
data = dict()
# Response is 7 + cells * 2 bytes:
# 00 begin: \xDD
# 01 r/w: \xA5
# 02 status: \x00 = correct; \x80 = incorrect
# 03 length (usually 8)
# 04 data (size of length)
# ...
# length+4 checksum
# length+5 checksum
# length+6 end \x77
if bytes([response[0]]) != b'\xdd':
print("parse_04_response ERROR: first byte not found: {0}".format(response[0]))
return False
if bytes([response[2]]) == b'\x80':
print("parse_04_response ERROR: error byte returned from BMS: {0}".format(response[2]))
return False
data_len = response[3]
if debug > 2:
print(" Data length (trimming 4 bytes): {0}".format(data_len))
# The checksum is two bytes, offset by data_len + 4
# Five bytes at the front of data
# begin; rw; status, command; length
# The checksum should check command, length, and data: [3] to [3+data_len+1]
first = data_len + 4
second = data_len + 5
if second > len(response):
print("parse_04_response ERROR: cell voltage checksum not found")
return False
checksum = bytes([response[first], response[second]])
if not verify_checksum(response[3:first], checksum):
print("parse_04_response ERROR: failed to validate received checksum")
return False
if data_len > 0:
data['bms_voltage_cells_volts'] = dict()
data['bms_voltage_cells_volts']['help'] = "Cell Voltages"
data['bms_voltage_cells_volts']['units'] = "V"
data['bms_voltage_cells_volts']['label'] = "cell"
data['bms_voltage_cells_volts']['raw_values'] = dict()
data['bms_voltage_cells_volts']['values'] = dict()
for cell in range(int(data_len / 2)):
first = (cell * 2) + 4
second = (cell * 2) + 5
cellv = bytes_to_digits(response[first], response[second]) * 0.001
data['bms_voltage_cells_volts']['raw_values'][cell+1] = cellv
data['bms_voltage_cells_volts']['values'][cell+1] = "{:.3f}".format(cellv)
if debug > 1:
print(" Cell {:.0f}: {:.3f}V".format(cell+1, cellv))
return data
def collect_data(ser, debug=0):
# Request is 7 bytes:
# \xDD for start
# \xA5 for read, \x5A for write
# \x03 for regular info; \x04 for individual voltages
# \x77 ends
data = dict()
reqmsg = bytearray([ 0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77 ])
response_03 = requestMessage(ser, reqmsg, debug)
if len(response_03) == 0:
if debug > 0:
print("collect_data: Error retrieving BMS info. Trying again...")
return False
response_03 = bytearray(response_03)
reqmsg = bytearray([ 0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77 ])
response_04 = requestMessage(ser, reqmsg, debug)
if len(response_04) == 0:
if debug > 0:
print("collect_data: Error retrieving BMS info. Trying again...")
return False
response_04 = bytearray(response_04)
parsed_03 = parse_03_response(response_03, debug)
if parsed_03 is not False:
data.update(parsed_03)
else:
return False
parsed_04 = parse_04_response(response_04, debug)
if parsed_04 is not False:
data.update(parsed_04)
else:
return False
return data
def read_request(connection, debug=0):
# get length of expected json string
request = bytes()
try:
request = connection.recv(struct.calcsize('!I'))
except Exception as e:
raise OSError("unable to read request length from socket: {}".format(e))
try:
length = struct.unpack('!I', request)[0]
except Exception as e:
raise Exception("unable to determine request length: {}".format(e))
if debug > 4:
print("socket: incoming length: {}, encoded as {}".format(length, request))
# read length bytes
try:
request = connection.recv(length)
except Exception as e:
raise OSError("unable to read socket: {}".format(e))
if debug > 3:
print("socket: incoming request: {}".format(request))
try:
request_data = json.loads(request)
except Exception as e:
raise Exception("unable to read incoming request: {}".format(e))
if debug > 2:
print('socket: received {!r}'.format(request_data))
return request_data
def send_response(connection, response_data, debug=0):
try:
client = response_data['client']
except:
client = "unknown client"
if debug > 2:
print('socket: sending {!r}'.format(response_data))
try:
response = json.dumps(response_data).encode()
# add length to the start of the json string, so we know how much to read on the other end
length = struct.pack('!I', len(response))
if debug > 4:
print('socket: sending {} data of length: {}'.format(client, length))
response = length + response
if debug > 3:
print("socket: outgoing response: {}".format(response))
return connection.sendall(response)
except Exception as e:
raise OSError("unable to encode response: {}".format(e))
def main():
import argparse
import socket, socketserver
import pwd, grp
signal.signal(signal.SIGTERM, signalHandler)
global current_data
timestamp = 0
parser = argparse.ArgumentParser(
description='Query JBD BMS and report status',
add_help=True,
)
parser.add_argument('--device', '-d', dest='device', action='store',
default='/dev/ttyUSB0', help='USB device to read')
parser.add_argument('--socket', '-s', dest='socket', action='store',
default='/run/bmspy/bms', help='Socket to communicate with daemon')
parser.add_argument('--user', '-u', dest='uid_name', action='store',
default='nobody', help='Run daemon as user')
parser.add_argument('--group', '-g', dest='gid_name', action='store',
default='dialout', help='Run daemon as group')
parser.add_argument('--verbose', '-v', action='count',
default=0, help='Print more verbose information (can be specified multiple times)')
args = parser.parse_args()
debug=args.verbose
if debug > 0:
print("Running BMS query daemon on socket {}".format(args.socket))
ser = initialise_serial(args.device)
# Create any necessary directories for the socket
socket_dir = os.path.dirname(args.socket)
socket_dir_created = False
if not os.path.isdir(socket_dir):
os.makedirs(socket_dir, exist_ok=True)
socket_dir_created = True
starting_uid = os.getuid()
starting_gid = os.getgid()
if starting_uid == 0:
running_uid = pwd.getpwnam(args.uid_name)[2]
running_gid = grp.getgrnam(args.gid_name)[2]
# If we've created a new directory for the socket, ensure that
# the highest-level directory has the correct permissions
if socket_dir_created:
os.chown(socket_dir, running_uid, running_gid)
os.chmod(socket_dir, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
new_umask = 0o003
old_umask = os.umask(new_umask)
if debug > 1:
print('socket: old umask: %s, new umask: %s' % \
(oct(old_umask), oct(new_umask)))
# Try setting the new uid/gid
try:
os.setgid(running_gid)
except OSError as e:
print('could not set effective group id: {}'.format(e))
try:
os.setuid(running_uid)
except OSError as e:
print('could not set effective user id: {}'.format(e))
final_uid = os.getuid()
final_gid = os.getgid()
if debug > 0:
print('socket: running as {}:{}'.format(pwd.getpwuid(final_uid)[0], grp.getgrgid(final_gid)[0]))
# Make sure the socket does not exist
if os.path.exists(args.socket):
raise OSError("socket {} already exists; exiting...".format(args.socket))
# Create socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Bind the socket to the port
if debug > 2:
print('starting up on {}'.format(args.socket))
sock.bind(args.socket)
atexit.register(socket_cleanup, args.socket, debug)
# Listen for incoming connections
sock.listen(1)
while True:
connection = None
client_address = None
try:
# Wait for a connection
if debug > 2:
print('socket: waiting for a connection')
connection, client_address = sock.accept()
request_data = dict()
try:
request_data = read_request(connection, debug)
except Exception as e:
print("socket ERROR: {}".format(e))
continue
client = request_data['client'] or 'unknown'
match request_data['command']:
case 'REGISTER':
connected_clients.append(client)
send_response(connection, {'status': 'REGISTERED', 'client': client}, debug)
case 'DEREGISTER':
try:
connected_clients.remove(client)
except:
pass
send_response(connection, {'status': 'DEREGISTERED', 'client': client}, debug)
case 'GET':
timestamp = 0
if bool(current_data) is True:
timestamp = current_data.get('timestamp', 0)
print("reading data, current timestamp is {}, time is {}".format(timestamp, time.time()))
# only get new data five seconds after the last read
if timestamp <= time.time() - 5:
current_data = None
# Retry every second until we get a result
while bool(current_data) is False:
current_data = collect_data(ser, debug)
time.sleep(1)
current_data['timestamp'] = time.time()
current_data['client'] = client
send_response(connection, current_data, debug)
case _:
print('socket: invalid request from {}'.format(request_data['client']))
break
except KeyboardInterrupt:
if connection:
connection.close()
sys.exit(1)
finally:
# Clean up the connection
if connection:
connection.close()
if __name__ == "__main__":
main()