Add bms.py.
This commit is contained in:
		| @@ -1,3 +1,5 @@ | ||||
| # bmspy | ||||
|  | ||||
| bmspy can be used to get information from a xiaoxiang-type BMS system. | ||||
| bmspy is a tool to get information from a xiaoxiang-type BMS system, using some sort of serial connection. | ||||
|  | ||||
| It can display the information as text, in JSON, or export the data continuously to a Prometheus exporter. | ||||
|   | ||||
							
								
								
									
										642
									
								
								bms.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										642
									
								
								bms.py
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,642 @@ | ||||
| #!/usr/bin/env python3 | ||||
| # | ||||
| # Communicate with a JBD/SZLLT BMS and return basic information | ||||
| # in order to shutdown equipment when voltage levels drop or remaining | ||||
| # capacity gets low | ||||
| # TODO: get individual cell voltage | ||||
| # TODO: scripts to shutdown NAS when voltage goes below 13.xV or | ||||
| #       percent_capacity goes below 25% | ||||
| # | ||||
| import argparse | ||||
| import json | ||||
| import pprint | ||||
| import serial | ||||
| import serial.rs485 | ||||
| import signal | ||||
| import sys | ||||
| import time | ||||
| try: | ||||
|     import prometheus_client | ||||
|     can_export_prometheus = True | ||||
| except: | ||||
|     can_export_prometheus = False | ||||
|  | ||||
|  | ||||
| PROMETHEUS_UPDATE_PERIOD = 30 | ||||
| SERIALPORT = "/dev/ttyUSB0" | ||||
| #SERIALPORT = "/dev/rfcomm1" | ||||
| BAUDRATE = 9600 | ||||
|  | ||||
| # usb 1-1.4: new full-speed USB device number 4 using xhci_hcd | ||||
| # usb 1-1.4: New USB device found, idVendor=0403, idProduct=6001, bcdDevice= 6.00 | ||||
| # usb 1-1.4: New USB device strings: Mfr=1, Product=2, SerialNumber=3 | ||||
| # usb 1-1.4: Product: FT232R USB UART | ||||
| # usb 1-1.4: Manufacturer: FTDI | ||||
| # usb 1-1.4: SerialNumber: AQ00QFHZ | ||||
| # usbcore: registered new interface driver usbserial_generic | ||||
| # usbserial: USB Serial support registered for generic | ||||
| # usbcore: registered new interface driver ftdi_sio | ||||
| # usbserial: USB Serial support registered for FTDI USB Serial Device | ||||
| # ftdi_sio 1-1.4:1.0: FTDI USB Serial Device converter detected | ||||
| # usb 1-1.4: Detected FT232RL | ||||
| # usb 1-1.4: FTDI USB Serial Device converter now attached to ttyUSB0 | ||||
| # usb 1-1.4: usbfs: interface 0 claimed by ftdi_sio while 'python3' sets config #1 | ||||
|  | ||||
| # TODO: ensure ttyUSB0 points to  idVendor=0403, idProduct=6001 | ||||
| # with serial.tools.list_ports.ListPortInfo | ||||
| # python3 -m serial.tools.list_ports USB | ||||
| ser = serial.Serial(SERIALPORT, BAUDRATE) | ||||
| ser.parity = serial.PARITY_NONE     # set parity check: no parity | ||||
| ser.bytesize = serial.EIGHTBITS     # number of bits per bytes | ||||
| ser.stopbits = serial.STOPBITS_ONE  # number of stop bits | ||||
|  | ||||
| ser.timeout = 1         # timeout block read | ||||
| ser.writeTimeout = 1    # timeout for write | ||||
|  | ||||
| def cleanup(ser, debug): | ||||
|     if debug > 2: | ||||
|         print("Cleaning up...") | ||||
|     if ser.is_open: | ||||
|         ser.reset_input_buffer()  # flush input buffer, discarding all its contents | ||||
|         ser.reset_output_buffer() # flush output buffer, aborting current output | ||||
|         ser.close() | ||||
|  | ||||
| def calculate_checksum(msg): | ||||
|     checksum = '' | ||||
|     return checksum | ||||
|  | ||||
| def verify_checksum(data, checksum): | ||||
|     # (data + length + command code) checksum, then complement, then add 1, high bit first, low bit last | ||||
|     # data should have start/rw stripped | ||||
|     s = 0 | ||||
|     for i in data: | ||||
|         s += i | ||||
|     s = (s ^ 0xFFFF) + 1 | ||||
|     chk = bytes_to_digits(checksum[0], checksum[1]) | ||||
|     return s == chk | ||||
|  | ||||
| def convert_to_signed(x): | ||||
|     # For values below 1024, these seem to be actual results | ||||
|     # For values above 1024, these seem to be encoded to account for high and negative floats | ||||
|     max_uint = 1024 | ||||
|     if x >= max_uint: | ||||
|         return (x - 2**9) % 2**10 - 2**9 | ||||
|     else: | ||||
|         return x | ||||
|  | ||||
| def bytes_to_digits(high, low): | ||||
|     result = high | ||||
|     result <<= 8 | ||||
|     result = result | low | ||||
|     return result | ||||
|  | ||||
| def bytes_to_date(high, low): | ||||
|     result= bytes_to_digits(high, low) | ||||
|     day  = result & 0x1f | ||||
|     mon  = (result >> 5) & 0x0f | ||||
|     year = 2000 + (result >> 9) | ||||
|     return "{:04d}-{:02d}-{:02d}".format(year, mon, day) | ||||
|  | ||||
| # takes a serial object and a message, returns a response | ||||
| def requestMessage(ser, reqmsg, debug): | ||||
|     if debug > 2: | ||||
|         print('Starting Up Serial Monitor') | ||||
|     if ser.is_open: | ||||
|         ser.close() | ||||
|  | ||||
|     try: | ||||
|         ser.open() | ||||
|     except Exception as e: | ||||
|         print("error open serial port: {0}".format(str(e))) | ||||
|         return False | ||||
|  | ||||
|     if ser.is_open: | ||||
|         try: | ||||
|             # Resetting once alone doesn't seem to do the trick when  we discarded data | ||||
|             # on a previous run | ||||
|             for i in range(2): | ||||
|                 ser.reset_input_buffer()  # flush input buffer, discarding all its contents | ||||
|                 ser.reset_output_buffer() # flush output buffer, aborting current output | ||||
|             if debug > 0: | ||||
|                 print("Write data: {0}".format("".join('0x{:02x} '.format(x) for x in reqmsg))) | ||||
|             w = ser.write(reqmsg) | ||||
|             if debug > 2: | ||||
|                 print("Bytes written: {0}".format(w)) | ||||
|             #time.sleep(1) | ||||
|             if w != len(reqmsg): | ||||
|                 print("ERROR: {0} bytes written, {1} expected.".format(w, len(reqmsg))) | ||||
|                 return False | ||||
|             wait_time = 0 | ||||
|             while ser.in_waiting == 0: | ||||
|                 # Return an empty string if we end up waiting too long | ||||
|                 if wait_time > 2: | ||||
|                     cleanup(ser, debug) | ||||
|                     return '' | ||||
|                 if debug > 2: | ||||
|                     print("Waiting for data...") | ||||
|                 time.sleep(0.5) | ||||
|                 wait_time += 1 | ||||
|             if debug > 1: | ||||
|                 print("Awaiting reading: {0}".format(ser.in_waiting)) | ||||
|             response = ser.read_until(b'\x77') | ||||
|             # Return an empty string if the read timed out or returned nothing | ||||
|             if len(response) == 0: | ||||
|                 return '' | ||||
|             if debug > 0: | ||||
|                 print("Read data: {0}".format(response.hex())) | ||||
|             cleanup(ser, debug) | ||||
|             return response | ||||
|         except Exception as e: | ||||
|             print("error communicating...: {0}".function(str(e))) | ||||
|     else: | ||||
|         print("cannot open serial port") | ||||
|  | ||||
| def parse_03_response(response): | ||||
|     data = dict() | ||||
|     # Response is 34 bytes: | ||||
|     # 00        begin:  \xDD | ||||
|     # 01        r/w:    \xA5 | ||||
|     # 02        status: \x00 = correct; \x80 = incorrect | ||||
|     # 03        length (usually 27) | ||||
|     # 04        data (size of length) | ||||
|     # ... | ||||
|     # length+4  checksum | ||||
|     # length+5  checksum | ||||
|     # length+6  end      \x77 | ||||
|     if bytes([response[0]]) != b'\xdd': | ||||
|         print("ERROR: first byte not found: {0}".format(response[0])) | ||||
|         return False | ||||
|  | ||||
|     if bytes([response[2]]) == b'\x80': | ||||
|         print("ERROR: error byte returned from BMS: {0}".format(response[2])) | ||||
|         return False | ||||
|  | ||||
|     data_len = response[3] | ||||
|     if debug > 2: | ||||
|         print("Data length (trimming 4 bytes): {0}".format(data_len)) | ||||
|  | ||||
|     # The checksum is two bytes, offset by data_len + 4 | ||||
|     # Five bytes at the front of data | ||||
|     # begin; rw; status, command; length | ||||
|     # The checksum should check command, length, and data: [3] to [3+data_len+1] | ||||
|     first  = data_len + 4 | ||||
|     second = data_len + 5 | ||||
|     if second > len(response): | ||||
|         print("ERROR: primary response checksum not found") | ||||
|         return False; | ||||
|     checksum = bytes([response[first], response[second]]) | ||||
|     if not verify_checksum(response[3:first], checksum): | ||||
|         print("ERROR: failed to validate received checksum") | ||||
|         return False | ||||
|  | ||||
|     if data_len > 0: | ||||
|         vtot = bytes_to_digits(response[4], response[5]) * 0.01 | ||||
|         data['bms_voltage_total_volts'] = dict() | ||||
|         data['bms_voltage_total_volts']['help']  = "Total Voltage" | ||||
|         data['bms_voltage_total_volts']['value'] = "{:.2f}".format(vtot) | ||||
|         data['bms_voltage_total_volts']['units'] = "V" | ||||
|         #data["Total Voltage"] = "{:.2f}V".format(vtot) | ||||
|         if debug > 1: | ||||
|             print("Total voltage: {:.2f}V".format(vtot)) | ||||
|  | ||||
|         current = bytes_to_digits(response[6], response[7]) | ||||
|         current = convert_to_signed(current) * 0.01 | ||||
|         data["bms_current_amps"] = dict() | ||||
|         data["bms_current_amps"]['help']  = "Current" | ||||
|         data["bms_current_amps"]['value'] = "{:.2f}".format(current) | ||||
|         data["bms_current_amps"]['units'] = "A" | ||||
|         #data["Current"] = "{:.2f}A".format(current) | ||||
|         if debug > 1: | ||||
|             print("Current: {:.2f}A".format(current)) | ||||
|  | ||||
|         res_cap = bytes_to_digits(response[8], response[9]) * 0.01 | ||||
|         nom_cap = bytes_to_digits(response[10], response[11]) * 0.01 | ||||
|         data['bms_capacity_remaining_ah'] = dict() | ||||
|         data['bms_capacity_remaining_ah']['help']  = "Remaining Capacity" | ||||
|         data['bms_capacity_remaining_ah']['value'] = "{:.2f}".format(res_cap) | ||||
|         data['bms_capacity_remaining_ah']['units'] = "Ah" | ||||
|         data['bms_capacity_nominal_ah'] = dict() | ||||
|         data['bms_capacity_nominal_ah']['help']  = "Nominal Capacity" | ||||
|         data['bms_capacity_nominal_ah']['value'] = "{:.2f}".format(nom_cap) | ||||
|         data['bms_capacity_nominal_ah']['units'] = "Ah" | ||||
|         #data["Remaining Capacity"] = "{:.2f}Ah".format(res_cap) | ||||
|         #data["Nominal Capacity"] = "{:.2f}Ah".format(nom_cap) | ||||
|         if debug > 1: | ||||
|             print("Remaining capacity: {:.2f}Ah".format(res_cap)) | ||||
|             print("Nominal capacity: {:.2f}Ah".format(nom_cap)) | ||||
|  | ||||
|         cycle_times = bytes_to_digits(response[12], response[13]) | ||||
|         data['bms_charge_cycles'] = dict() | ||||
|         data['bms_charge_cycles']['help']  = "Charge Cycles" | ||||
|         data['bms_charge_cycles']['value'] = "{0}".format(cycle_times) | ||||
|         #data["Charge Cycles"] = "{0}".format(cycle_times) | ||||
|         if debug > 1: | ||||
|             print("Cycle times: {0}".format(cycle_times)) | ||||
|  | ||||
|         man_date = bytes_to_date(response[14], response[15]) | ||||
|         data['bms_manufacture_date'] = dict() | ||||
|         data['bms_manufacture_date']['help'] = "Date of Manufacture" | ||||
|         data['bms_manufacture_date']['info'] = "{0}".format(man_date) | ||||
|         #data["Date of Manufacture"] = "{0}".format(man_date) | ||||
|         if debug > 1: | ||||
|             print("Manufacturing date: {0}".format(man_date)) | ||||
|  | ||||
|         cells = response[25] # 4S | ||||
|         data['bms_cell_number'] = dict() | ||||
|         data['bms_cell_number']['help']  = "Cells" | ||||
|         data['bms_cell_number']['value'] = "{0}".format(cells) | ||||
|         #data["Cells"] = "{0}".format(cells) | ||||
|         if debug > 1: | ||||
|             print("Cells: {0}S".format(cells)) | ||||
|  | ||||
|         balance_state_high = bytes_to_digits(response[16], response[17]) # 1S  to 16S | ||||
|         balance_state_low  = bytes_to_digits(response[18], response[19]) # 17S to 32S | ||||
|         # 1 bit per 4S (2 bytes = 16S); in 4S, we should expect: | ||||
|         # 0x0 (no cells balancing)               0 | ||||
|         # 0x1 (cell 1 balancing)                 1 | ||||
|         # 0x2 (cell 2 balancing)                 2 | ||||
|         # 0x3 (cells 1 + 2 balancing)            3 | ||||
|         # 0x4 (cell 3 balancing)                 4 | ||||
|         # 0x5 (cells 1 + 3 balancing)            5 | ||||
|         # 0x6 (cells 2 + 3 balancing)            6 | ||||
|         # 0x7 (cells 1 + 2 + 3 balancing)        7 | ||||
|         # 0x8 (cell 4 balancing)                 8 | ||||
|         # 0x9 (cells 1 + 4 balancing)            9 | ||||
|         # 0xA (cells 2 + 4 balancing)           10 | ||||
|         # 0xB (cells 1 + 2 + 4 balancing)       11 | ||||
|         # 0xC (cells 3 + 4 balancing)           12 | ||||
|         # 0xD (cells 1 + 3 + 4 balancing)       13 | ||||
|         # 0xE (cells 2 + 3 + 4 balancing)       14 | ||||
|         # 0xF (cells 1 + 2 + 3 + 4 balancing)   15 | ||||
|         #data["Balancing"] = dict() | ||||
|         data['bms_cells_balancing'] = dict() | ||||
|         data['bms_cells_balancing']['help']   = "Cells balancing" | ||||
|         data['bms_cells_balancing']['label']  = 'cell' | ||||
|         data['bms_cells_balancing']['values'] = dict() | ||||
|         for cell in range(cells): | ||||
|             # Cells from 1 to 16 are recorded in balance_state_low, | ||||
|             # and from 17 to 32 in balance_state_high; hilo_cell records the offset | ||||
|             # relative to the state group | ||||
|             if cell >= 16: | ||||
|                 state = balance_state_high | ||||
|                 hilo_cell = cell - 16 | ||||
|             else: | ||||
|                 state = balance_state_low | ||||
|                 hilo_cell = cell | ||||
|             # Cells are recorded as groups of 4 bits (0x0-0xF) per 4 cells | ||||
|             g = int(hilo_cell / 4)  | ||||
|             b = 2**(hilo_cell - (g * 4 )) | ||||
|             data['bms_cells_balancing']['values'][cell+1] = "{0}".format(int(bool((state >> g) & b))) | ||||
|             #data["Balancing"][cell+1] = "{0}".format(bool((state >> g) & b)) | ||||
|             if debug > 1: | ||||
|                 print("Balancing cell {0}: {1}".format(cell, bool((state >> g & b)))) | ||||
|                              | ||||
|         protection_state = bytes_to_digits(response[20], response[21]) | ||||
|         sop  = protection_state & 1 | ||||
|         sup  = protection_state & 2 | ||||
|         gop  = protection_state & 4 | ||||
|         gup  = protection_state & 8 | ||||
|         cotp = protection_state & 16 | ||||
|         cutp = protection_state & 32 | ||||
|         dotp = protection_state & 64 | ||||
|         dutp = protection_state & 128 | ||||
|         cocp = protection_state & 256 | ||||
|         docp = protection_state & 512 | ||||
|         scp  = protection_state & 1024 | ||||
|         fdic = protection_state & 2048 | ||||
|         slm  = protection_state & 4096 | ||||
|         data['bms_protection_sop_bool'] = dict() | ||||
|         data['bms_protection_sop_bool']['help'] = "Single overvoltage protection" | ||||
|         data['bms_protection_sop_bool']['value'] ="{0}".format(int(bool(sop))) | ||||
|         data['bms_protection_sup_bool'] = dict() | ||||
|         data['bms_protection_sup_bool']['help']  = "Single undervoltage protection" | ||||
|         data['bms_protection_sup_bool']['value'] ="{0}".format(int(bool(sup))) | ||||
|         data['bms_protection_wgop_bool'] = dict() | ||||
|         data['bms_protection_wgop_bool']['help']  = "Whole group overvoltage protection" | ||||
|         data['bms_protection_wgop_bool']['value'] ="{0}".format(int(bool(gop))) | ||||
|         data['bms_protection_wgup_bool'] = dict() | ||||
|         data['bms_protection_wgup_bool']['help']  = "Whole group undervoltage protection" | ||||
|         data['bms_protection_wgup_bool']['value'] ="{0}".format(int(bool(gup))) | ||||
|         data['bms_protection_cotp_bool'] = dict() | ||||
|         data['bms_protection_cotp_bool']['help']  = "Charging over-temperature protection" | ||||
|         data['bms_protection_cotp_bool']['value'] ="{0}".format(int(bool(cotp))) | ||||
|         data['bms_protection_cutp_bool'] = dict() | ||||
|         data['bms_protection_cutp_bool']['help']  = "Charging under-temperature protection" | ||||
|         data['bms_protection_cutp_bool']['value'] ="{0}".format(int(bool(cutp))) | ||||
|         data['bms_protection_dotp_bool'] = dict() | ||||
|         data['bms_protection_dotp_bool']['help']  = "Discharging over-temperature protection" | ||||
|         data['bms_protection_dotp_bool']['value'] ="{0}".format(int(bool(dotp))) | ||||
|         data['bms_protection_dutp_bool'] = dict() | ||||
|         data['bms_protection_dutp_bool']['help']  = "Discharging under-protection" | ||||
|         data['bms_protection_dutp_bool']['value'] ="{0}".format(int(bool(dutp))) | ||||
|         data['bms_protection_cocp_bool'] = dict() | ||||
|         data['bms_protection_cocp_bool']['help']  = "Charging over-current protection" | ||||
|         data['bms_protection_cocp_bool']['value'] ="{0}".format(int(bool(cocp))) | ||||
|         data['bms_protection_docp_bool'] = dict() | ||||
|         data['bms_protection_docp_bool']['help']  = "Discharging over-current protection" | ||||
|         data['bms_protection_docp_bool']['value'] ="{0}".format(int(bool(docp))) | ||||
|         data['bms_protection_scp_bool'] = dict() | ||||
|         data['bms_protection_scp_bool']['help']  = "Short-circuit protection" | ||||
|         data['bms_protection_scp_bool']['value'] ="{0}".format(int(bool(scp))) | ||||
|         data['bms_protection_fdic_bool'] = dict() | ||||
|         data['bms_protection_fdic_bool']['help']  = "Front detection IC error" | ||||
|         data['bms_protection_fdic_bool']['value'] ="{0}".format(int(bool(fdic))) | ||||
|         data['bms_protection_slmos_bool'] = dict() | ||||
|         data['bms_protection_slmos_bool']['help']  = "Software lock MOS" | ||||
|         data['bms_protection_slmos_bool']['value'] ="{0}".format(int(bool(slm))) | ||||
|         #data["Single overvoltage protection"] = "{0}".format(bool(sop)) | ||||
|         #data["Single undervoltage protection"] = "{0}".format(bool(sup)) | ||||
|         #data["Whole group overvoltage protection"] = "{0}".format(bool(gop)) | ||||
|         #data["Whole group undervoltage protection"] = "{0}".format(bool(gup)) | ||||
|         #data["Charging over-temperature protection"] = "{0}".format(bool(cotp)) | ||||
|         #data["Charging under-temperature protection"] = "{0}".format(bool(cutp)) | ||||
|         #data["Discharging over-temperature protection"] = "{0}".format(bool(dotp)) | ||||
|         #data["Discharging under-protection"] = "{0}".format(bool(dutp)) | ||||
|         #data["Charging over-current protection"] = "{0}".format(bool(cocp)) | ||||
|         #data["Discharging over-current protection"] = "{0}".format(bool(docp)) | ||||
|         #data["Short-circuit protection"] = "{0}".format(bool(scp)) | ||||
|         #data["Front detection IC error"] = "{0}".format(bool(fdic)) | ||||
|         #data["Software lock MOS"] = "{0}".format(bool(slm)) | ||||
|         if debug > 2: | ||||
|             print("Protection state: {0}".format(protection_state)) | ||||
|             print("Single overvoltage protection: {0}".format(bool(sop))) | ||||
|             print("Single undervoltage protection: {0}".format(bool(sup))) | ||||
|             print("Whole group overvoltage protection: {0}".format(bool(gop))) | ||||
|             print("Whole group undervoltage protection: {0}".format(bool(gup))) | ||||
|             print("Charging over-temperature protection: {0}".format(bool(cotp))) | ||||
|             print("Charging under-temperature protection: {0}".format(bool(cutp))) | ||||
|             print("Discharging over-temperature protection: {0}".format(bool(dotp))) | ||||
|             print("Discharging under-protection: {0}".format(bool(dutp))) | ||||
|             print("Charging over-current protection: {0}".format(bool(cocp))) | ||||
|             print("Discharging over-current protection: {0}".format(bool(docp))) | ||||
|             print("Short-circuit protection: {0}".format(bool(scp))) | ||||
|             print("Front detection IC error: {0}".format(bool(fdic))) | ||||
|             print("Software lock MOS: {0}".format(bool(slm))) | ||||
|  | ||||
|         software_version = bytes([response[22]]) | ||||
|  | ||||
|         # percent of capacity remaining, converted to a per mille ratio between 0 and 1 | ||||
|         rsoc = response[23] * 0.01 | ||||
|         data['bms_capacity_charge_ratio'] = dict() | ||||
|         data['bms_capacity_charge_ratio']['help']  = "Percent Charge" | ||||
|         data['bms_capacity_charge_ratio']['value'] = "{0}".format(rsoc) | ||||
|         data['bms_capacity_charge_ratio']['units'] = "\u2030" | ||||
|         #data["Percent Charge"] = "{0}".format(rsoc) | ||||
|         if debug > 1: | ||||
|             print("Capacity remaining: {0}%".format(rsoc * 100)) | ||||
|  | ||||
|         # bit0 = charging; bit1 = discharging; 0 = MOS closing; 1 = MOS opening | ||||
|         control_status = response[24] | ||||
|         data['bms_charge_is_charging'] = dict() | ||||
|         data['bms_charge_is_charging']['help']  = "MOSFET charging" | ||||
|         data['bms_charge_is_charging']['value'] = int(bool(control_status & 1)) | ||||
|         data['bms_charge_is_discharging'] = dict() | ||||
|         data['bms_charge_is_discharging']['help']  = "MOSFET discharging" | ||||
|         data['bms_charge_is_discharging']['value'] = int(bool(control_status & 1)) | ||||
|         #data["Charging"] = bool(control_status & 1) | ||||
|         #data["Discharging"] = bool((control_status >> 1) & 1) | ||||
|         if debug > 1: | ||||
|             if (control_status & 1): | ||||
|                 print("MOSFET charging: yes") | ||||
|             else: | ||||
|                 print("MOSFET charging: no") | ||||
|             if ((control_status >> 1) & 1): | ||||
|                 print("MOSFET discharging: yes") | ||||
|             else: | ||||
|                 print("MOSFET discharging: no") | ||||
|  | ||||
|         ntc_num      = response[26] # number of temperature sensors | ||||
|         ntc_content  = bytearray() # 2 * ntc_num in size | ||||
|         temperatures = list() | ||||
|         for i in range(ntc_num): | ||||
|             temperatures.append((bytes_to_digits(response[27+(2*i)], response[28+(2*i)]) - 2731) * 0.1) | ||||
|         data['bms_temperature_sensor_num'] = dict() | ||||
|         data['bms_temperature_sensor_num']['help']  = "Temperature Sensors" | ||||
|         data['bms_temperature_sensor_num']['value'] =  ntc_num | ||||
|         #data["Temperature Sensors"] = ntc_num | ||||
|         #data["Temperature"] = dict() | ||||
|         data['bms_temperature_celcius'] = dict() | ||||
|         data['bms_temperature_celcius']['help']   = "Temperature" | ||||
|         data['bms_temperature_celcius']['units']  = "\u00B0C" | ||||
|         data['bms_temperature_celcius']['label']  = 'sensor' | ||||
|         data['bms_temperature_celcius']['values'] = dict() | ||||
|         for i, temp in enumerate(temperatures): | ||||
|             data['bms_temperature_celcius']['values'][i+1] = "{:.2f}".format(temp) | ||||
|             #data["Temperature"][i+1] = "{:.2f}\u00B0C".format(temp) | ||||
|         if debug > 1: | ||||
|             print("Number of temperature sensors: {0}".format(ntc_num)) | ||||
|             for i, temp in enumerate(temperatures): | ||||
|                 print(u"Temperature sensor {:d}: {:.2f}\u00B0C".format(i+1, temp)) | ||||
|  | ||||
|         return data | ||||
|  | ||||
| def parse_04_response(response): | ||||
|     data = dict() | ||||
|     # Response is 7 + cells * 2 bytes: | ||||
|     # 00        begin:  \xDD | ||||
|     # 01        r/w:    \xA5 | ||||
|     # 02        status: \x00 = correct; \x80 = incorrect | ||||
|     # 03        length (usually 8) | ||||
|     # 04        data (size of length) | ||||
|     # ... | ||||
|     # length+4  checksum | ||||
|     # length+5  checksum | ||||
|     # length+6  end      \x77 | ||||
|     if bytes([response[0]]) != b'\xdd': | ||||
|         print("ERROR: first byte not found: {0}".format(response[0])) | ||||
|         return False | ||||
|  | ||||
|     if bytes([response[2]]) == b'\x80': | ||||
|         print("ERROR: error byte returned from BMS: {0}".format(response[2])) | ||||
|         return False | ||||
|  | ||||
|     data_len = response[3] | ||||
|     if debug > 2: | ||||
|         print("Data length (trimming 4 bytes): {0}".format(data_len)) | ||||
|  | ||||
|     # The checksum is two bytes, offset by data_len + 4 | ||||
|     # Five bytes at the front of data | ||||
|     # begin; rw; status, command; length | ||||
|     # The checksum should check command, length, and data: [3] to [3+data_len+1] | ||||
|     first  = data_len + 4 | ||||
|     second = data_len + 5 | ||||
|     if second > len(response): | ||||
|         print("ERROR: cell voltage checksum not found") | ||||
|         return False | ||||
|     checksum = bytes([response[first], response[second]]) | ||||
|     if not verify_checksum(response[3:first], checksum): | ||||
|         print("ERROR: failed to validate received checksum") | ||||
|         return False | ||||
|  | ||||
|     if data_len > 0: | ||||
|         #data["Cell Voltages"] = dict() | ||||
|         data['bms_voltage_cells_volts'] = dict() | ||||
|         data['bms_voltage_cells_volts']['help']   = "Cell Voltages" | ||||
|         data['bms_voltage_cells_volts']['units']  = "V" | ||||
|         data['bms_voltage_cells_volts']['label']  = "cell" | ||||
|         data['bms_voltage_cells_volts']['values'] = dict() | ||||
|         for cell in range(int(data_len / 2)): | ||||
|             first  = (cell * 2) + 4 | ||||
|             second = (cell * 2) + 5 | ||||
|             cellv = bytes_to_digits(response[first], response[second]) * 0.001 | ||||
|             data['bms_voltage_cells_volts']['values'][cell+1]  = "{:.3f}".format(cellv) | ||||
|             #data["Cell Voltages"][i+1] = "{:.3f}V".format(cellv) | ||||
|             if debug > 1: | ||||
|                 print("Cell {:.0f}: {:.3f}V".format(cell+1, cellv)) | ||||
|     return data | ||||
|  | ||||
| def collect_data(ser, debug): | ||||
|     # Request is 7 bytes: | ||||
|     # \xDD for start | ||||
|     # \xA5 for read, \x5A for write | ||||
|     # \x03 for regular info; \x04 for individual voltages | ||||
|     # \x77 ends | ||||
|     data = dict() | ||||
|     reqmsg = bytearray([ 0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77 ]) | ||||
|     response_03 = requestMessage(ser, reqmsg, debug) | ||||
|  | ||||
|     if len(response_03) == 0: | ||||
|         if debug > 0: | ||||
|             print("Error retrieving BMS info. Trying again...") | ||||
|         return False | ||||
|     response_03 = bytearray(response_03) | ||||
|  | ||||
|     reqmsg = bytearray([ 0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77 ]) | ||||
|     response_04 = requestMessage(ser, reqmsg, debug) | ||||
|  | ||||
|     if len(response_04) == 0: | ||||
|         if debug > 0: | ||||
|             print("Error retrieving BMS info. Trying again...") | ||||
|         return False | ||||
|     response_04 = bytearray(response_04) | ||||
|  | ||||
|     parsed_03 = parse_03_response(response_03) | ||||
|     if parsed_03 is not False: | ||||
|         data.update(parsed_03) | ||||
|     else: | ||||
|         return False | ||||
|  | ||||
|     parsed_04 = parse_04_response(response_04) | ||||
|     if parsed_04 is not False: | ||||
|         data.update(parsed_04) | ||||
|     else: | ||||
|         return False | ||||
|  | ||||
|     return data | ||||
|  | ||||
|  | ||||
| def main(ser, debug): | ||||
|     data = dict() | ||||
|     while bool(data) is False: | ||||
|         data = collect_data(ser, debug) | ||||
|         time.sleep(1) | ||||
|  | ||||
|     if args.report_json: | ||||
|         print(json.dumps(data)) | ||||
|  | ||||
|     elif args.report_print: | ||||
|         pp = pprint.PrettyPrinter(indent=4) | ||||
|         pp.pprint(data) | ||||
|  | ||||
|  | ||||
| def prometheus_export(ser, debug, daemonize=True, filename=False): | ||||
|     if not can_export_prometheus: | ||||
|         return | ||||
|  | ||||
|     data = dict() | ||||
|     # Initialize data structure, to fill in help values | ||||
|     while bool(data) is False: | ||||
|         data = collect_data(ser, debug) | ||||
|         time.sleep(1) | ||||
|  | ||||
|     registry = prometheus_client.CollectorRegistry(auto_describe=True) | ||||
|     # Set up the metric data structure for Prometheus | ||||
|     metric = prometheus_create_metric(registry, data) | ||||
|     # Populate the metric data structure this period | ||||
|     prometheus_populate_metric(metric, data) | ||||
|  | ||||
|     if (daemonize): | ||||
|         prometheus_client.start_http_server(9999, registry=registry) | ||||
|  | ||||
|         while True: | ||||
|             # Delay, collect new data, and start again | ||||
|             time.sleep(PROMETHEUS_UPDATE_PERIOD) | ||||
|             # Reset data, so it is re-populated correctly | ||||
|             data = dict() | ||||
|             while bool(data) is False: | ||||
|                 data = collect_data(ser, debug) | ||||
|             time.sleep(1) | ||||
|             prometheus_populate_metric(metric, data) | ||||
|             prometheus_client.generate_latest(registry) | ||||
|     else: | ||||
|         if not filename: | ||||
|             print("Invalid filename supplied"); | ||||
|             return False      | ||||
|         prometheus_client.write_to_textfile(filename, registry=registry) | ||||
|         return True | ||||
|    | ||||
| def prometheus_create_metric(registry, data): | ||||
|     metric = dict() | ||||
|     for name, contains in data.items(): | ||||
|         helpmsg = '' | ||||
|         if contains.get('help'): | ||||
|             helpmsg = contains.get('help') | ||||
|             if contains.get('units'): | ||||
|                 helpmsg += ' (' + contains.get('units') + ')' | ||||
|         if contains.get('value'): | ||||
|             metric[name] = prometheus_client.Gauge(name, helpmsg, registry=registry) | ||||
|         # Has multiple values, each a different label | ||||
|         elif contains.get('values'): | ||||
|             if contains.get('label') is None: | ||||
|                 print("ERROR: no label for {0} specified".format(name)) | ||||
|             label = contains.get('label') | ||||
|             metric[name] = prometheus_client.Gauge(name, helpmsg, [label], registry=registry) | ||||
|         elif contains.get('info'): | ||||
|             metric[name] = prometheus_client.Info(name, helpmsg, registry=registry) | ||||
|         else: | ||||
|             pass | ||||
|     return metric | ||||
|  | ||||
| def prometheus_populate_metric(metric, data): | ||||
|     for name, contains in data.items(): | ||||
|         if contains.get('value'): | ||||
|             value = contains.get('value') | ||||
|             metric[name].set(value) | ||||
|         # doesn't have a value, but has [1-4]: | ||||
|         if contains.get('values') and isinstance(contains.get('values'), dict): | ||||
|             for idx, label_value in contains.get('values').items(): | ||||
|                 metric[name].labels(idx).set(label_value) | ||||
|         if contains.get('info'): | ||||
|             value = contains.get('info') | ||||
|             metric[name].info({name: value}) | ||||
|         else: | ||||
|             pass | ||||
|  | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     try: | ||||
|         parser = argparse.ArgumentParser( | ||||
|                     description='Query JBD BMS and report status',  | ||||
|                     add_help=True, | ||||
|         ) | ||||
|         parser.add_argument('--json', '-j', dest='report_json', action='store_true',  | ||||
|                     default=False, help='Report data as JSON') | ||||
|         parser.add_argument('--prometheus', '-p', dest='report_prometheus', action='store_true', | ||||
|                     default=False, help='Daemonize and report data to Prometheus') | ||||
|         parser.add_argument('--textfile', '-t', dest='report_textfile', type=str, action='store', | ||||
|                     default=False, help='Report data to Prometheus using textfile <file>') | ||||
|         parser.add_argument('--print', dest='report_print', action='store_true', | ||||
|                     default=True, help='Report data as text') | ||||
|         parser.add_argument('--verbose', '-v', action='count', | ||||
|                     default=0, help='Print more verbose information (can be specified multiple times)') | ||||
|         args = parser.parse_args() | ||||
|         debug=args.verbose | ||||
|  | ||||
|         if args.report_prometheus: | ||||
|             prometheus_export(ser, debug, daemonize=True) | ||||
|         elif args.report_textfile: | ||||
|             prometheus_export(ser, debug, daemonize=False, filename=args.report_textfile) | ||||
|         else: | ||||
|             main(ser, debug) | ||||
|     except KeyboardInterrupt: | ||||
|         cleanup(ser, debug) | ||||
		Reference in New Issue
	
	Block a user