Add docstrings

This commit is contained in:
2026-05-02 23:12:02 +02:00
parent 86f2c548b3
commit 4e2a967dcf
7 changed files with 21 additions and 0 deletions
+1
View File
@@ -63,6 +63,7 @@ class UPS(ABC):
...
def __bool__(self) -> bool:
"""Return True if the UPS has at least one populated field."""
return next(iter(self.items()), None) is not None
@classmethod
+2
View File
@@ -15,6 +15,7 @@ is_registered = False
def handle_registration(socket_path: str, client_name: str, debug: int = 0) -> dict[str, Any]:
"""Register or deregister a named client with the bmspy daemon over the socket."""
global is_registered
data: dict[str, Any] = dict()
@@ -45,6 +46,7 @@ def handle_registration(socket_path: str, client_name: str, debug: int = 0) -> d
def socket_comms(socket_path: str, request_data: dict[str, Any], debug: int = 0) -> dict[str, Any]:
"""Send a JSON request to the daemon socket and return the decoded JSON response."""
response_data: dict[str, Any] = dict()
# Create a UDS socket
+3
View File
@@ -12,6 +12,7 @@ DAEMON_UPDATE_PERIOD = 30
def influx_shutdown(influxclient: InfluxDBClient3 | None) -> None:
"""Close the InfluxDB client connection if it is not None."""
if influxclient is not None:
influxclient.close()
@@ -26,6 +27,7 @@ def influxdb_export(
daemonize: bool = True,
debug: int = 0,
) -> None:
"""Export BMS data to InfluxDB, either once or as a daemon loop."""
if not url:
url = os.environ["INFLUXDB_V2_URL"]
org = os.environ.get("INFLUXDB_V2_ORG")
@@ -48,6 +50,7 @@ def influxdb_export(
def influxdb_write_snapshot(influxclient: InfluxDBClient3, bucket: str, ups_data: dict[str, UPS], debug: int = 0) -> None:
"""Create InfluxDB points from ups_data and write them to the database."""
if debug > 1:
debugger("influxdb: creating snapshot")
points = influxdb_create_snapshot(ups_data, debug)
+9
View File
@@ -52,6 +52,7 @@ class JBDBMS(UPS):
def serial_cleanup(ser: serial.Serial, debug: int = 0) -> None:
"""Flush and close the serial port if it is open."""
if debug > 2:
debugger("serial: cleaning up...")
if ser.is_open:
@@ -79,6 +80,7 @@ def calculate_checksum(msg: bytes | bytearray) -> str:
def verify_checksum(data: bytes | bytearray, checksum: bytes) -> bool:
"""Verify a JBD BMS packet checksum (sum complement + 1, big-endian)."""
# (data + length + command code) checksum, then complement, then add 1, high bit first, low bit last
# data should have start/rw stripped
s = 0
@@ -90,6 +92,7 @@ def verify_checksum(data: bytes | bytearray, checksum: bytes) -> bool:
def convert_to_signed(x: int) -> int:
"""Convert a JBD current value (possibly encoded) to a signed integer."""
# For values below 1024, these seem to be actual results
# For values above 1024, these seem to be encoded to account for high and negative floats
max_uint = 1024
@@ -100,6 +103,7 @@ def convert_to_signed(x: int) -> int:
def bytes_to_digits(high: int, low: int) -> int:
"""Combine two bytes (high, low) into a single 16-bit unsigned integer."""
result = high
result <<= 8
result = result | low
@@ -107,6 +111,7 @@ def bytes_to_digits(high: int, low: int) -> int:
def bytes_to_date(high: int, low: int) -> str:
"""Decode a JBD-encoded manufacture date from two bytes into 'YYYY-MM-DD' format."""
result = bytes_to_digits(high, low)
day = result & 0x1F
mon = (result >> 5) & 0x0F
@@ -115,6 +120,7 @@ def bytes_to_date(high: int, low: int) -> str:
def requestMessage(ser: serial.Serial, reqmsg: bytearray, debug: int = 0) -> bytes | str | bool:
"""Send a request message to the BMS over serial and return the raw response bytes."""
if debug > 2:
debugger("serial: starting up monitor")
if ser.is_open:
@@ -174,6 +180,7 @@ def requestMessage(ser: serial.Serial, reqmsg: bytearray, debug: int = 0) -> byt
def parse_03_response(response: bytearray, debug: int = 0) -> JBDBMS | bool:
"""Parse a JBD BMS 0x03 (basic info) response packet into a JBDBMS dataclass."""
# Response is 34 bytes:
# 00 begin: \xDD
# 01 r/w: \xA5
@@ -433,6 +440,7 @@ def parse_03_response(response: bytearray, debug: int = 0) -> JBDBMS | bool:
def parse_04_response(response: bytearray, debug: int = 0) -> BMSMultiField | bool:
"""Parse a JBD BMS 0x04 (cell voltages) response packet into a BMSMultiField."""
# Response is 7 + cells * 2 bytes:
# 00 begin: \xDD
# 01 r/w: \xA5
@@ -498,6 +506,7 @@ def parse_04_response(response: bytearray, debug: int = 0) -> BMSMultiField | bo
def collect_data(ser: serial.Serial, debug: int = 0) -> JBDBMS | bool:
"""Request both 0x03 and 0x04 packets from the BMS and return a populated JBDBMS."""
# Request is 7 bytes:
# \xDD for start
# \xA5 for read, \x5A for write
+3
View File
@@ -49,10 +49,12 @@ def signalHandler() -> NoReturn:
def socket_cleanup(socket_path: str, debug: int = 0) -> None:
"""Remove the Unix socket file from the filesystem."""
os.unlink(socket_path)
def read_request(connection: socket.socket, debug: int = 0) -> dict[str, Any]:
"""Read a length-prefixed JSON request from a connected socket."""
# get length of expected json string
request = bytes()
try:
@@ -84,6 +86,7 @@ def read_request(connection: socket.socket, debug: int = 0) -> dict[str, Any]:
def send_response(connection: socket.socket, response_data: Any, client: str, debug: int = 0) -> None:
"""Serialise response_data as length-prefixed JSON and send it over the socket."""
if debug > 2:
debugger("socket: sending {!r}".format(response_data))
try:
+2
View File
@@ -19,6 +19,7 @@ alert_sent = False
def handle_shutdown(action: str = "cancel", delay: int = 0, debug: int = 0) -> None:
"""Schedule or cancel a system shutdown; only schedules once per incident."""
global scheduled_shutdown
if action == "shutdown":
@@ -42,6 +43,7 @@ def handle_email(
mailpass: str | None = None,
debug: int = 0,
) -> None:
"""Send an alert email using SMTP, with optional STARTTLS and authentication."""
isSSL = False
hostname = socket.gethostname()
+1
View File
@@ -8,6 +8,7 @@ from typing import Any
def debugger(data: Any, pretty: bool = False) -> None:
"""Print a timestamped debug message; use PrettyPrinter when pretty=True."""
if pretty:
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(