Add unit tests

This commit is contained in:
2026-05-02 18:40:27 +02:00
parent 4e2a967dcf
commit 897ae5dfb7
16 changed files with 3871 additions and 184 deletions
View File
+78
View File
@@ -0,0 +1,78 @@
import pytest
from bmspy.classes import BMSScalarField, BMSMultiField, BMSInfoField
from bmspy.jbd_bms import JBDBMS
# ---------------------------------------------------------------------------
# Raw JBD BMS wire-format fixtures
# ---------------------------------------------------------------------------
# 03 response: 31 bytes, data_len=25, 4 cells, 1 NTC sensor
# Encodes: 52.00V, 0.00A, 100.00Ah remaining, 100.00Ah nominal,
# 10 cycles, 2023-01-15, no protection faults, rsoc=95,
# charging+discharging MOSFET on, 4 cells, 25.0°C
VALID_03_RESPONSE = bytearray([
0xDD, 0xA5, 0x00, 0x19, # start, r/w, status OK, data_len=25
0x14, 0x50, # total voltage: 5200 * 0.01 = 52.00 V
0x00, 0x00, # current: 0
0x27, 0x10, # remaining cap: 10000 * 0.01 = 100.00 Ah
0x27, 0x10, # nominal cap: 10000 * 0.01 = 100.00 Ah
0x00, 0x0A, # charge cycles: 10
0x2E, 0x2F, # manufacture date: 0x2E2F → 2023-01-15
0x00, 0x00, # balance state high (no balancing)
0x00, 0x00, # balance state low (no balancing)
0x00, 0x00, # protection state (no faults)
0x00, # software version
0x5F, # rsoc: 95 * 0.01 = 0.95
0x03, # control status: charging(bit0) + discharging(bit1)
0x04, # cell count: 4
0x01, # NTC sensor count: 1
0x0B, 0xA5, # NTC 1: (0x0BA5=2981 - 2731) * 0.1 = 25.0 °C
0xFD, 0x97, # checksum
])
# 04 response: 14 bytes, 4 cells
# Cell voltages: 3.600 V, 3.601 V, 3.599 V, 3.598 V
VALID_04_RESPONSE = bytearray([
0xDD, 0xA5, 0x00, 0x08, # start, r/w, status OK, data_len=8
0x0E, 0x10, # cell 1: 3600 * 0.001 = 3.600 V
0x0E, 0x11, # cell 2: 3601 * 0.001 = 3.601 V
0x0E, 0x0F, # cell 3: 3599 * 0.001 = 3.599 V
0x0E, 0x0E, # cell 4: 3598 * 0.001 = 3.598 V
0xFF, 0x82, # checksum
])
@pytest.fixture
def valid_03_response() -> bytearray:
return bytearray(VALID_03_RESPONSE)
@pytest.fixture
def valid_04_response() -> bytearray:
return bytearray(VALID_04_RESPONSE)
@pytest.fixture
def populated_jbdbms() -> JBDBMS:
bms = JBDBMS()
bms.bms_voltage_total_volts = BMSScalarField(
help="Total Voltage", raw_value=52.0, value="52.00", units="V"
)
bms.bms_current_amps = BMSScalarField(
help="Current", raw_value=0.0, value="0.00", units="A"
)
bms.bms_capacity_charge_ratio = BMSScalarField(
help="Percent Charge", raw_value=0.95, value="0.95", units=""
)
bms.bms_manufacture_date = BMSInfoField(
help="Date of Manufacture", info="2023-01-15"
)
bms.bms_temperature_celcius = BMSMultiField(
help="Temperature",
label="sensor",
raw_values={1: 25.0},
values={1: "25.00"},
units="°C",
)
return bms
+162
View File
@@ -0,0 +1,162 @@
import pytest
from bmspy.classes import (
BMSScalarField,
BMSMultiField,
BMSInfoField,
UPS,
_field_from_dict,
_UPSSnapshot,
)
# ---------------------------------------------------------------------------
# BMSScalarField
# ---------------------------------------------------------------------------
class TestBMSScalarField:
def test_get_existing_attribute(self):
f = BMSScalarField(help="Voltage", raw_value=52.0, value="52.00", units="V")
assert f.get("help") == "Voltage"
assert f.get("raw_value") == 52.0
assert f.get("units") == "V"
def test_get_missing_attribute_returns_default(self):
f = BMSScalarField(help="Voltage", raw_value=52.0, value="52.00")
assert f.get("nonexistent") is None
assert f.get("nonexistent", 42) == 42
def test_units_defaults_to_none(self):
f = BMSScalarField(help="Cycles", raw_value=10, value="10")
assert f.units is None
assert f.get("units") is None
def test_bool_raw_value(self):
f = BMSScalarField(help="Flag", raw_value=True, value="1")
assert f.raw_value is True
# ---------------------------------------------------------------------------
# BMSMultiField
# ---------------------------------------------------------------------------
class TestBMSMultiField:
def test_get_existing_attribute(self):
f = BMSMultiField(
help="Cell Voltages",
label="cell",
raw_values={1: 3.6, 2: 3.61},
values={1: "3.600", 2: "3.610"},
units="V",
)
assert f.get("label") == "cell"
assert f.get("raw_values") == {1: 3.6, 2: 3.61}
assert f.get("units") == "V"
def test_get_missing_returns_default(self):
f = BMSMultiField(help="h", label="l", raw_values={}, values={})
assert f.get("missing") is None
assert f.get("missing", "fallback") == "fallback"
# ---------------------------------------------------------------------------
# BMSInfoField
# ---------------------------------------------------------------------------
class TestBMSInfoField:
def test_get_existing_attribute(self):
f = BMSInfoField(help="Date of Manufacture", info="2023-01-15")
assert f.get("help") == "Date of Manufacture"
assert f.get("info") == "2023-01-15"
def test_get_missing_returns_none(self):
f = BMSInfoField(help="h", info="i")
assert f.get("units") is None
# ---------------------------------------------------------------------------
# _field_from_dict
# ---------------------------------------------------------------------------
class TestFieldFromDict:
def test_scalar_field(self):
d = {"help": "Voltage", "raw_value": 52.0, "value": "52.00", "units": "V"}
f = _field_from_dict(d)
assert isinstance(f, BMSScalarField)
assert f.raw_value == 52.0
assert f.units == "V"
def test_multi_field(self):
d = {
"help": "Cell Voltages",
"label": "cell",
"raw_values": {1: 3.6},
"values": {1: "3.600"},
"units": "V",
}
f = _field_from_dict(d)
assert isinstance(f, BMSMultiField)
assert f.label == "cell"
assert f.raw_values == {1: 3.6}
def test_info_field(self):
d = {"help": "Date of Manufacture", "info": "2023-01-15"}
f = _field_from_dict(d)
assert isinstance(f, BMSInfoField)
assert f.info == "2023-01-15"
# ---------------------------------------------------------------------------
# UPS / _UPSSnapshot
# ---------------------------------------------------------------------------
class TestUPS:
def _snapshot(self):
return UPS.from_dict({
"bms_voltage": {"help": "Voltage", "raw_value": 52.0, "value": "52.00", "units": "V"},
"bms_date": {"help": "Date", "info": "2023-01-15"},
"bms_temps": {
"help": "Temperature",
"label": "sensor",
"raw_values": {1: 25.0},
"values": {1: "25.00"},
"units": "°C",
},
})
def test_from_dict_returns_ups_instance(self):
assert isinstance(self._snapshot(), UPS)
def test_from_dict_scalar_field(self):
ups = self._snapshot()
items = dict(ups.items())
assert isinstance(items["bms_voltage"], BMSScalarField)
assert items["bms_voltage"].raw_value == 52.0
def test_from_dict_info_field(self):
ups = self._snapshot()
items = dict(ups.items())
assert isinstance(items["bms_date"], BMSInfoField)
assert items["bms_date"].info == "2023-01-15"
def test_from_dict_multi_field(self):
ups = self._snapshot()
items = dict(ups.items())
assert isinstance(items["bms_temps"], BMSMultiField)
assert items["bms_temps"].raw_values == {1: 25.0}
def test_bool_true_when_populated(self):
assert bool(self._snapshot()) is True
def test_bool_false_when_empty(self):
empty = UPS.from_dict({})
assert bool(empty) is False
def test_items_yields_all_fields(self):
ups = self._snapshot()
keys = [k for k, _ in ups.items()]
assert set(keys) == {"bms_voltage", "bms_date", "bms_temps"}
def test_items_empty_snapshot(self):
ups = UPS.from_dict({})
assert list(ups.items()) == []
+368
View File
@@ -0,0 +1,368 @@
from unittest.mock import patch, MagicMock
import pytest
from bmspy.classes import BMSInfoField, BMSMultiField, BMSScalarField, UPS
from bmspy.client import read_data
MOCK_RESPONSE = {
"myups": {
"bms_voltage_total_volts": {
"help": "Total Voltage",
"raw_value": 52.0,
"value": "52.00",
"units": "V",
},
"bms_manufacture_date": {
"help": "Date of Manufacture",
"info": "2023-01-15",
},
"bms_temperature_celcius": {
"help": "Temperature",
"label": "sensor",
"raw_values": {1: 25.0},
"values": {1: "25.00"},
"units": "°C",
},
},
"officeups": {
"bms_voltage_total_volts": {
"help": "Total Voltage",
"raw_value": 48.5,
"value": "48.50",
"units": "V",
},
},
}
class TestReadData:
def _call(self, response=None):
with patch("bmspy.client.socket_comms", return_value=response or MOCK_RESPONSE):
return read_data("/fake/socket", "test")
def test_returns_dict_of_ups(self):
result = self._call()
assert isinstance(result, dict)
for v in result.values():
assert isinstance(v, UPS)
def test_all_devices_present(self):
result = self._call()
assert set(result.keys()) == {"myups", "officeups"}
def test_scalar_field_deserialized(self):
result = self._call()
items = dict(result["myups"].items())
f = items["bms_voltage_total_volts"]
assert isinstance(f, BMSScalarField)
assert f.raw_value == 52.0
assert f.units == "V"
def test_info_field_deserialized(self):
result = self._call()
items = dict(result["myups"].items())
f = items["bms_manufacture_date"]
assert isinstance(f, BMSInfoField)
assert f.info == "2023-01-15"
def test_multi_field_deserialized(self):
result = self._call()
items = dict(result["myups"].items())
f = items["bms_temperature_celcius"]
assert isinstance(f, BMSMultiField)
assert f.raw_values == {1: 25.0}
assert f.label == "sensor"
def test_ups_is_truthy_when_populated(self):
result = self._call()
assert bool(result["myups"]) is True
def test_empty_device_response(self):
result = self._call({"emptyups": {}})
assert isinstance(result["emptyups"], UPS)
assert bool(result["emptyups"]) is False
def test_ups_filter_forwarded(self):
with patch("bmspy.client.socket_comms") as mock_comms:
mock_comms.return_value = {"myups": MOCK_RESPONSE["myups"]}
read_data("/fake/socket", "test", ups="myups")
call_args = mock_comms.call_args[0][1]
assert call_args.get("ups") == "myups"
def test_no_ups_filter_not_in_request(self):
with patch("bmspy.client.socket_comms") as mock_comms:
mock_comms.return_value = {}
read_data("/fake/socket", "test")
call_args = mock_comms.call_args[0][1]
assert "ups" not in call_args
# ---------------------------------------------------------------------------
# handle_registration
# ---------------------------------------------------------------------------
import bmspy.client as _client_mod
@pytest.fixture(autouse=True)
def _reset_is_registered():
_client_mod.is_registered = False
yield
_client_mod.is_registered = False
class TestHandleRegistration:
def test_register_sets_flag(self):
with patch("bmspy.client.socket_comms", return_value={"status": "REGISTERED", "client": "test"}):
from bmspy.client import handle_registration
handle_registration("/fake/socket", "test")
assert _client_mod.is_registered is True
def test_register_returns_response(self):
response = {"status": "REGISTERED", "client": "test"}
with patch("bmspy.client.socket_comms", return_value=response):
from bmspy.client import handle_registration
result = handle_registration("/fake/socket", "test")
assert result == response
def test_register_sends_register_command(self):
with patch("bmspy.client.socket_comms") as mock_comms:
mock_comms.return_value = {"status": "REGISTERED", "client": "test"}
from bmspy.client import handle_registration
handle_registration("/fake/socket", "test")
sent = mock_comms.call_args[0][1]
assert sent["command"] == "REGISTER"
assert sent["client"] == "test"
def test_deregister_clears_flag(self):
_client_mod.is_registered = True
with patch("bmspy.client.socket_comms", return_value={"status": "DEREGISTERED", "client": "test"}):
from bmspy.client import handle_registration
handle_registration("/fake/socket", "test")
assert _client_mod.is_registered is False
def test_deregister_sends_deregister_command(self):
_client_mod.is_registered = True
with patch("bmspy.client.socket_comms") as mock_comms:
mock_comms.return_value = {"status": "DEREGISTERED", "client": "test"}
from bmspy.client import handle_registration
handle_registration("/fake/socket", "test")
sent = mock_comms.call_args[0][1]
assert sent["command"] == "DEREGISTER"
def test_socket_error_does_not_raise(self):
with patch("bmspy.client.socket_comms", side_effect=Exception("connection refused")):
from bmspy.client import handle_registration
result = handle_registration("/fake/socket", "test")
assert result == {}
def test_invalid_status_does_not_raise(self):
with patch("bmspy.client.socket_comms", return_value={"status": "UNKNOWN"}):
from bmspy.client import handle_registration
handle_registration("/fake/socket", "test")
def test_debug_3_on_deregister_failure(self, capsys):
"""debug=3 path in exception handler when is_registered=True."""
_client_mod.is_registered = True
with patch("bmspy.client.socket_comms", side_effect=Exception("fail")):
from bmspy.client import handle_registration
handle_registration("/fake/socket", "test", debug=3)
captured = capsys.readouterr()
assert "deregister" in captured.out.lower() or "fail" in captured.out
# ---------------------------------------------------------------------------
# socket_comms — real Unix socket
# ---------------------------------------------------------------------------
import json
import socket
import struct
import threading
class TestSocketComms:
"""Test socket_comms with a real Unix socket server running in a thread."""
def _run_server(self, sock_path: str, response: dict, ready_event: threading.Event):
"""Minimal server: accept one connection, read framed request, send framed response."""
srv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
srv.bind(sock_path)
srv.listen(1)
ready_event.set()
conn, _ = srv.accept()
try:
# read length
raw_len = conn.recv(4)
if len(raw_len) < 4:
return # Client disconnected early
length = struct.unpack("!I", raw_len)[0]
data = conn.recv(length)
# send response
payload = json.dumps(response).encode()
conn.sendall(struct.pack("!I", len(payload)) + payload)
finally:
conn.close()
srv.close()
import os
try:
os.unlink(sock_path)
except FileNotFoundError:
pass
def test_returns_response_dict(self, tmp_path):
from bmspy.client import socket_comms
sock_path = str(tmp_path / "test.sock")
response = {"status": "REGISTERED", "client": "test"}
ready = threading.Event()
t = threading.Thread(
target=self._run_server, args=(sock_path, response, ready), daemon=True
)
t.start()
ready.wait(timeout=2)
result = socket_comms(sock_path, {"command": "REGISTER", "client": "test"})
t.join(timeout=2)
assert result == response
def test_debug_3_does_not_raise(self, tmp_path, capsys):
from bmspy.client import socket_comms
sock_path = str(tmp_path / "test_dbg.sock")
response = {"status": "OK"}
ready = threading.Event()
t = threading.Thread(
target=self._run_server, args=(sock_path, response, ready), daemon=True
)
t.start()
ready.wait(timeout=2)
result = socket_comms(sock_path, {"command": "GET", "client": "test"}, debug=3)
t.join(timeout=2)
assert result == response
def test_debug_4_does_not_raise(self, tmp_path, capsys):
from bmspy.client import socket_comms
sock_path = str(tmp_path / "test_dbg4.sock")
response = {"status": "OK"}
ready = threading.Event()
t = threading.Thread(
target=self._run_server, args=(sock_path, response, ready), daemon=True
)
t.start()
ready.wait(timeout=2)
result = socket_comms(sock_path, {"command": "GET", "client": "test"}, debug=4)
t.join(timeout=2)
assert result == response
def test_debug_5_does_not_raise(self, tmp_path, capsys):
from bmspy.client import socket_comms
sock_path = str(tmp_path / "test_dbg5.sock")
response = {"status": "OK"}
ready = threading.Event()
t = threading.Thread(
target=self._run_server, args=(sock_path, response, ready), daemon=True
)
t.start()
ready.wait(timeout=2)
result = socket_comms(sock_path, {"command": "GET", "client": "test"}, debug=5)
t.join(timeout=2)
assert result == response
def test_connection_refused_does_not_raise(self, tmp_path, capsys):
"""socket_comms gracefully handles ENOENT (no server)."""
from bmspy.client import socket_comms
import sys
sock_path = str(tmp_path / "nonexistent.sock")
# socket_comms calls sys.exit(1) on encode errors, but connection failures
# just print and continue (then fail on recv). We expect SystemExit.
with pytest.raises((SystemExit, OSError, Exception)):
socket_comms(sock_path, {"command": "GET", "client": "test"})
def _run_bad_response_server(self, sock_path: str, bad_payload: bytes, ready_event: threading.Event):
"""Server that sends a bad (non-JSON) response."""
srv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
srv.bind(sock_path)
srv.listen(1)
ready_event.set()
conn, _ = srv.accept()
try:
# read request
raw_len = conn.recv(4)
length = struct.unpack("!I", raw_len)[0]
conn.recv(length)
# send bad response
conn.sendall(struct.pack("!I", len(bad_payload)) + bad_payload)
finally:
conn.close()
srv.close()
import os
try:
os.unlink(sock_path)
except FileNotFoundError:
pass
def test_invalid_json_response_exits(self, tmp_path, capsys):
"""socket_comms calls sys.exit(1) on invalid JSON response."""
from bmspy.client import socket_comms
sock_path = str(tmp_path / "bad_json.sock")
bad_payload = b"not json!!!"
ready = threading.Event()
t = threading.Thread(
target=self._run_bad_response_server,
args=(sock_path, bad_payload, ready),
daemon=True,
)
t.start()
ready.wait(timeout=2)
with pytest.raises(SystemExit):
socket_comms(sock_path, {"command": "GET", "client": "test"})
t.join(timeout=2)
def test_json_encode_failure_exits(self, tmp_path, capsys):
"""socket_comms calls sys.exit(1) when json.dumps raises."""
from bmspy.client import socket_comms
sock_path = str(tmp_path / "encode_err.sock")
# Create a minimal server so the socket connect succeeds
ready = threading.Event()
response = {"status": "OK"}
t = threading.Thread(
target=self._run_server, args=(sock_path, response, ready), daemon=True
)
t.start()
ready.wait(timeout=2)
with patch("bmspy.client.json.dumps", side_effect=TypeError("not serializable")):
with pytest.raises(SystemExit):
socket_comms(sock_path, {"command": "GET", "client": "test"})
t.join(timeout=2)
def test_non_enoent_socket_error_logs_message(self, tmp_path, capsys):
"""socket_comms logs a different message for non-ENOENT socket errors."""
from bmspy.client import socket_comms
import socket as _socket
import errno
sock_path = str(tmp_path / "test_err.sock")
# Make connect raise a socket.error with errno != 2
err = _socket.error("connection refused")
err.errno = errno.ECONNREFUSED # not 2 (ENOENT)
with patch("bmspy.client.socket.socket") as mock_sock_cls:
mock_sock = MagicMock()
mock_sock_cls.return_value = mock_sock
mock_sock.connect.side_effect = err
# after connect fails, sendall will also fail, triggering sys.exit
with pytest.raises((SystemExit, Exception)):
socket_comms(sock_path, {"command": "GET", "client": "test"})
captured = capsys.readouterr()
assert "socket client" in captured.out or "connection refused" in captured.out.lower()
# ---------------------------------------------------------------------------
# read_data — socket_comms returns None path
# ---------------------------------------------------------------------------
class TestReadDataNone:
def test_raises_runtime_error_when_none(self):
with patch("bmspy.client.socket_comms", return_value=None):
with pytest.raises(RuntimeError, match="No data received"):
read_data("/fake/socket", "test")
+373
View File
@@ -0,0 +1,373 @@
import pytest
pytest.importorskip("influxdb_client_3", reason="influxdb3-python not installed")
import pytest
from unittest.mock import patch, MagicMock
from bmspy.classes import UPS
from bmspy.influxdb import influxdb_create_snapshot
def _ups(*field_dicts: dict) -> dict[str, UPS]:
"""Helper: build a {ups_name: UPS} map from a list of field dicts."""
fields = {}
for d in field_dicts:
fields[d.pop("_name")] = d
return {"testups": UPS.from_dict(fields)}
class TestInfluxdbCreateSnapshot:
def test_scalar_field_produces_one_point(self):
ups_data = {"myups": UPS.from_dict({
"bms_voltage": {"help": "Voltage", "raw_value": 52.0, "value": "52.00", "units": "V"},
})}
points = influxdb_create_snapshot(ups_data)
assert len(points) == 1
def test_multi_field_produces_one_point_per_index(self):
ups_data = {"myups": UPS.from_dict({
"bms_cells": {
"help": "Cell Voltages",
"label": "cell",
"raw_values": {1: 3.6, 2: 3.61, 3: 3.59},
"values": {1: "3.600", 2: "3.610", 3: "3.590"},
"units": "V",
},
})}
points = influxdb_create_snapshot(ups_data)
assert len(points) == 3
def test_info_field_produces_one_point(self):
ups_data = {"myups": UPS.from_dict({
"bms_date": {"help": "Manufacture Date", "info": "2023-01-15"},
})}
points = influxdb_create_snapshot(ups_data)
assert len(points) == 1
def test_mixed_fields_sum_correctly(self):
ups_data = {"myups": UPS.from_dict({
"bms_voltage": {"help": "Voltage", "raw_value": 52.0, "value": "52.00", "units": "V"},
"bms_date": {"help": "Date", "info": "2023-01-15"},
"bms_cells": {
"help": "Cells",
"label": "cell",
"raw_values": {1: 3.6, 2: 3.61},
"values": {1: "3.600", 2: "3.610"},
"units": "V",
},
})}
# 1 scalar + 1 info + 2 multi = 4 points
points = influxdb_create_snapshot(ups_data)
assert len(points) == 4
def test_multiple_ups_devices(self):
field = {"help": "Voltage", "raw_value": 52.0, "value": "52.00", "units": "V"}
ups_data = {
"ups1": UPS.from_dict({"bms_voltage": dict(field)}),
"ups2": UPS.from_dict({"bms_voltage": dict(field)}),
}
points = influxdb_create_snapshot(ups_data)
assert len(points) == 2
def test_empty_ups_produces_no_points(self):
ups_data = {"myups": UPS.from_dict({})}
points = influxdb_create_snapshot(ups_data)
assert points == []
def test_point_measurement_name(self):
ups_data = {"myups": UPS.from_dict({
"bms_voltage_total_volts": {
"help": "Voltage", "raw_value": 52.0, "value": "52.00", "units": "V"
},
})}
points = influxdb_create_snapshot(ups_data)
assert len(points) == 1
# Point measurement name should match the field key
assert points[0]._name == "bms_voltage_total_volts"
def test_debug_mode_does_not_raise(self):
ups_data = {"myups": UPS.from_dict({
"bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
})}
# debug=3 triggers the debugger() calls — should not raise
influxdb_create_snapshot(ups_data, debug=3)
def test_scalar_point_has_ups_tag(self):
ups_data = {"myups": UPS.from_dict({
"bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
})}
points = influxdb_create_snapshot(ups_data)
assert points[0]._tags.get("ups") == "myups"
def test_multi_field_points_have_label_tag(self):
ups_data = {"myups": UPS.from_dict({
"bms_cells": {
"help": "Cells",
"label": "cell",
"raw_values": {1: 3.6, 2: 3.61},
"values": {1: "3.600", 2: "3.610"},
"units": "V",
},
})}
points = influxdb_create_snapshot(ups_data)
assert all("cell" in p._tags for p in points)
def test_info_field_point_value(self):
ups_data = {"myups": UPS.from_dict({
"bms_date": {"help": "Manufacture Date", "info": "2023-01-15"},
})}
points = influxdb_create_snapshot(ups_data)
assert points[0]._fields.get("value") == "2023-01-15"
# ---------------------------------------------------------------------------
# influxdb_write_snapshot
# ---------------------------------------------------------------------------
class TestInfluxdbWriteSnapshot:
def test_write_called_once(self):
from unittest.mock import MagicMock
from bmspy.influxdb import influxdb_write_snapshot
mock_client = MagicMock()
ups_data = {"myups": UPS.from_dict({
"bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
})}
influxdb_write_snapshot(mock_client, "test_bucket", ups_data)
mock_client.write.assert_called_once()
def test_write_uses_correct_database(self):
from unittest.mock import MagicMock
from bmspy.influxdb import influxdb_write_snapshot
mock_client = MagicMock()
ups_data = {"myups": UPS.from_dict({
"bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
})}
influxdb_write_snapshot(mock_client, "mybucket", ups_data)
call_kwargs = mock_client.write.call_args[1]
assert call_kwargs.get("database") == "mybucket"
def test_write_exception_does_not_raise(self):
from unittest.mock import MagicMock
from bmspy.influxdb import influxdb_write_snapshot
mock_client = MagicMock()
mock_client.write.side_effect = Exception("connection failed")
ups_data = {"myups": UPS.from_dict({
"bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
})}
influxdb_write_snapshot(mock_client, "mybucket", ups_data)
def test_empty_ups_writes_no_points(self):
from unittest.mock import MagicMock
from bmspy.influxdb import influxdb_write_snapshot
mock_client = MagicMock()
influxdb_write_snapshot(mock_client, "mybucket", {"myups": UPS.from_dict({})})
call_kwargs = mock_client.write.call_args[1]
assert call_kwargs.get("record") == []
# ---------------------------------------------------------------------------
# influxdb_export (non-daemonized)
# ---------------------------------------------------------------------------
class TestInfluxdbExport:
def test_single_write_when_not_daemonized(self):
from unittest.mock import MagicMock, patch
from bmspy.influxdb import influxdb_export
mock_instance = MagicMock()
ups_data = {"myups": UPS.from_dict({
"bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
})}
with patch("bmspy.influxdb.InfluxDBClient3", return_value=mock_instance), \
patch("bmspy.influxdb.client.read_data", return_value=ups_data):
influxdb_export(
bucket="test",
url="http://localhost",
org="org",
token="token",
daemonize=False,
)
mock_instance.write.assert_called_once()
def test_client_closed_after_non_daemonized_run(self):
from unittest.mock import MagicMock, patch
from bmspy.influxdb import influxdb_export
mock_instance = MagicMock()
ups_data = {"myups": UPS.from_dict({})}
with patch("bmspy.influxdb.InfluxDBClient3", return_value=mock_instance), \
patch("bmspy.influxdb.client.read_data", return_value=ups_data):
influxdb_export(
bucket="test",
url="http://localhost",
org="org",
token="token",
daemonize=False,
)
mock_instance.close.assert_called()
def test_env_vars_used_when_no_url(self, monkeypatch):
from unittest.mock import MagicMock, patch
from bmspy.influxdb import influxdb_export
monkeypatch.setenv("INFLUXDB_V2_URL", "http://envhost")
monkeypatch.setenv("INFLUXDB_V2_ORG", "envorg")
monkeypatch.setenv("INFLUXDB_V2_TOKEN", "envtoken")
mock_instance = MagicMock()
ups_data = {"myups": UPS.from_dict({})}
with patch("bmspy.influxdb.InfluxDBClient3", return_value=mock_instance) as mock_cls, \
patch("bmspy.influxdb.client.read_data", return_value=ups_data):
influxdb_export(bucket="test", daemonize=False)
# Should have been called with env URL
call_kwargs = mock_cls.call_args[1]
assert call_kwargs.get("host") == "http://envhost"
def test_daemonize_true_loops_until_exception(self):
from unittest.mock import MagicMock, patch
from bmspy.influxdb import influxdb_export
mock_instance = MagicMock()
ups_data = {"myups": UPS.from_dict({})}
call_count = 0
def _read_data(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count >= 2:
raise StopIteration("stop")
return ups_data
with patch("bmspy.influxdb.InfluxDBClient3", return_value=mock_instance), \
patch("bmspy.influxdb.client.read_data", side_effect=_read_data), \
patch("bmspy.influxdb.time.sleep"):
with pytest.raises(StopIteration):
influxdb_export(
bucket="test",
url="http://localhost",
org="org",
token="token",
daemonize=True,
)
assert call_count >= 2
# ---------------------------------------------------------------------------
# influx_shutdown
# ---------------------------------------------------------------------------
class TestInfluxShutdown:
def test_none_is_no_op(self):
from bmspy.influxdb import influx_shutdown
# Should not raise
influx_shutdown(None)
def test_calls_close_on_client(self):
from unittest.mock import MagicMock
from bmspy.influxdb import influx_shutdown
mock_client = MagicMock()
influx_shutdown(mock_client)
mock_client.close.assert_called_once()
# ---------------------------------------------------------------------------
# influxdb_write_snapshot debug coverage
# ---------------------------------------------------------------------------
class TestInfluxdbWriteSnapshotDebug:
def test_debug_2_logs_messages(self, capsys):
from unittest.mock import MagicMock
from bmspy.influxdb import influxdb_write_snapshot
mock_client = MagicMock()
ups_data = {"myups": UPS.from_dict({
"bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
})}
influxdb_write_snapshot(mock_client, "bucket", ups_data, debug=2)
captured = capsys.readouterr()
assert "snapshot" in captured.out.lower()
# ---------------------------------------------------------------------------
# influxdb_create_snapshot additional debug paths
# ---------------------------------------------------------------------------
class TestInfluxdbCreateSnapshotDebug:
def test_debug_3_scalar_logs(self, capsys):
ups_data = {"myups": UPS.from_dict({
"bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
})}
influxdb_create_snapshot(ups_data, debug=3)
captured = capsys.readouterr()
assert "value" in captured.out.lower()
def test_debug_3_multi_logs(self, capsys):
ups_data = {"myups": UPS.from_dict({
"bms_cells": {
"help": "Cells",
"label": "cell",
"raw_values": {1: 3.6},
"values": {1: "3.600"},
"units": "V",
},
})}
influxdb_create_snapshot(ups_data, debug=3)
captured = capsys.readouterr()
assert "labels" in captured.out.lower()
def test_debug_3_info_logs(self, capsys):
ups_data = {"myups": UPS.from_dict({
"bms_date": {"help": "Date", "info": "2023-01-15"},
})}
influxdb_create_snapshot(ups_data, debug=3)
captured = capsys.readouterr()
assert "info" in captured.out.lower()
# ---------------------------------------------------------------------------
# influxdb main()
# ---------------------------------------------------------------------------
class TestInfluxdbMain:
def test_main_missing_url_exits(self, monkeypatch):
from bmspy.influxdb import main
monkeypatch.delenv("INFLUXDB_V2_URL", raising=False)
monkeypatch.delenv("INFLUXDB_V2_ORG", raising=False)
monkeypatch.delenv("INFLUXDB_V2_TOKEN", raising=False)
with patch("sys.argv", ["bmspy-influxdb"]):
with pytest.raises(SystemExit):
main()
def test_main_missing_org_exits(self, monkeypatch):
from bmspy.influxdb import main
monkeypatch.setenv("INFLUXDB_V2_URL", "http://host")
monkeypatch.delenv("INFLUXDB_V2_ORG", raising=False)
monkeypatch.delenv("INFLUXDB_V2_TOKEN", raising=False)
with patch("sys.argv", ["bmspy-influxdb"]):
with pytest.raises(SystemExit):
main()
def test_main_missing_token_exits(self, monkeypatch):
from bmspy.influxdb import main
monkeypatch.setenv("INFLUXDB_V2_URL", "http://host")
monkeypatch.setenv("INFLUXDB_V2_ORG", "myorg")
monkeypatch.delenv("INFLUXDB_V2_TOKEN", raising=False)
with patch("sys.argv", ["bmspy-influxdb"]):
with pytest.raises(SystemExit):
main()
def test_main_calls_influxdb_export(self, monkeypatch):
from unittest.mock import MagicMock, patch
from bmspy.influxdb import main
mock_export = MagicMock()
with patch("sys.argv", ["bmspy-influxdb", "--url", "http://host", "--org", "org", "--token", "tok"]), \
patch("bmspy.influxdb.client.handle_registration"), \
patch("bmspy.influxdb.influxdb_export", mock_export):
main()
mock_export.assert_called_once()
def test_main_with_env_vars_calls_export(self, monkeypatch):
from unittest.mock import MagicMock, patch
from bmspy.influxdb import main
monkeypatch.setenv("INFLUXDB_V2_URL", "http://envhost")
monkeypatch.setenv("INFLUXDB_V2_ORG", "envorg")
monkeypatch.setenv("INFLUXDB_V2_TOKEN", "envtoken")
mock_export = MagicMock()
with patch("sys.argv", ["bmspy-influxdb"]), \
patch("bmspy.influxdb.client.handle_registration"), \
patch("bmspy.influxdb.influxdb_export", mock_export):
main()
mock_export.assert_called_once()
+177
View File
@@ -0,0 +1,177 @@
import argparse
import sys
from unittest.mock import patch, MagicMock
import pytest
from bmspy import parse_args, main
from bmspy.classes import UPS
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_ups_data():
return {"testups": UPS.from_dict({
"bms_voltage": {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"},
"bms_date": {"help": "Date", "info": "2023-01-15"},
})}
class TestParseArgs:
def _parse(self, args: list[str]):
with patch("sys.argv", ["bmspy"] + args):
return parse_args()
def test_socket_default(self):
assert self._parse([]).socket == "/run/bmspy/bms"
def test_socket_long(self):
assert self._parse(["--socket", "/tmp/test.sock"]).socket == "/tmp/test.sock"
def test_socket_short(self):
assert self._parse(["-s", "/tmp/test.sock"]).socket == "/tmp/test.sock"
def test_ups_default_is_none(self):
assert self._parse([]).ups is None
def test_ups_filter(self):
assert self._parse(["--ups", "myups"]).ups == "myups"
def test_json_default_false(self):
assert self._parse([]).report_json is False
def test_json_long(self):
assert self._parse(["--json"]).report_json is True
def test_json_short(self):
assert self._parse(["-j"]).report_json is True
def test_print_default_true(self):
assert self._parse([]).report_print is True
def test_prometheus_default_false(self):
assert self._parse([]).report_prometheus is False
def test_prometheus_flag(self):
assert self._parse(["--prometheus"]).report_prometheus is True
def test_influxdb_default_false(self):
assert self._parse([]).report_influxdb is False
def test_influxdb_long(self):
assert self._parse(["--influxdb"]).report_influxdb is True
def test_influxdb_short(self):
assert self._parse(["-i"]).report_influxdb is True
def test_bucket_default(self):
assert self._parse([]).influx_bucket == "ups"
def test_bucket_long(self):
assert self._parse(["--bucket", "mybucket"]).influx_bucket == "mybucket"
def test_bucket_short(self):
assert self._parse(["-b", "mybucket"]).influx_bucket == "mybucket"
def test_url_default_false(self):
assert self._parse([]).influx_url is False
def test_url_long(self):
assert self._parse(["--url", "http://influx.example.com"]).influx_url == "http://influx.example.com"
def test_org_long(self):
assert self._parse(["--org", "myorg"]).influx_org == "myorg"
def test_token_long(self):
assert self._parse(["--token", "mytoken"]).influx_token == "mytoken"
def test_verbose_default_zero(self):
assert self._parse([]).verbose == 0
def test_verbose_once(self):
assert self._parse(["-v"]).verbose == 1
def test_verbose_multiple(self):
assert self._parse(["-v", "-v", "-v"]).verbose == 3
def test_verbose_long(self):
assert self._parse(["--verbose"]).verbose == 1
# ---------------------------------------------------------------------------
# main()
# ---------------------------------------------------------------------------
class TestMain:
def _run_main(self, args: list[str]):
with patch("sys.argv", ["bmspy"] + args):
main()
def test_print_mode_default(self, capsys):
ups_data = _make_ups_data()
with patch("sys.argv", ["bmspy"]), \
patch("bmspy.client.handle_registration"), \
patch("bmspy.client.read_data", return_value=ups_data):
main()
captured = capsys.readouterr()
assert "testups" in captured.out
def test_json_mode(self, capsys):
ups_data = _make_ups_data()
with patch("sys.argv", ["bmspy", "--json"]), \
patch("bmspy.client.handle_registration"), \
patch("bmspy.client.read_data", return_value=ups_data):
main()
captured = capsys.readouterr()
# JSON output should contain field names
assert "bms_voltage" in captured.out
def test_keyboard_interrupt_is_caught(self, capsys):
with patch("sys.argv", ["bmspy"]), \
patch("bmspy.client.handle_registration"), \
patch("bmspy.client.read_data", side_effect=KeyboardInterrupt("stopped")):
main() # should not raise
def test_prometheus_flag_calls_export(self):
import bmspy.prometheus as prom_mod
mock_export = MagicMock()
with patch("sys.argv", ["bmspy", "--prometheus"]), \
patch.object(prom_mod, "prometheus_export", mock_export):
main()
mock_export.assert_called_once()
def test_textfile_flag_calls_prometheus_export(self, tmp_path):
import bmspy.prometheus as prom_mod
filename = str(tmp_path / "metrics.prom")
mock_export = MagicMock()
with patch("sys.argv", ["bmspy", "--file", filename]), \
patch.object(prom_mod, "prometheus_export", mock_export):
main()
mock_export.assert_called_once()
call_kwargs = mock_export.call_args[1]
assert call_kwargs.get("daemonize") is False
assert call_kwargs.get("filename") == filename
def test_influxdb_flag_calls_export(self):
pytest.importorskip("influxdb_client_3", reason="influxdb3-python not installed")
import bmspy.influxdb as influx_mod
mock_export = MagicMock()
with patch("sys.argv", ["bmspy", "--influxdb", "--url", "http://influx", "--org", "org", "--token", "tok"]), \
patch.object(influx_mod, "influxdb_export", mock_export):
main()
mock_export.assert_called_once()
def test_influxdb_partial_args_exits(self, capsys):
"""Providing only one of --url/--org/--token raises ArgumentTypeError (caught by KeyboardInterrupt handler)."""
with patch("sys.argv", ["bmspy", "--influxdb", "--url", "http://influx"]):
with pytest.raises(argparse.ArgumentTypeError):
main()
def test_influxdb_partial_raises_argument_error(self):
"""Two of three influx args causes ArgumentTypeError."""
import argparse as _ap
with patch("sys.argv", ["bmspy", "--influxdb", "--url", "http://influx", "--org", "org"]):
with pytest.raises(_ap.ArgumentTypeError):
main()
+704
View File
@@ -0,0 +1,704 @@
import pytest
from unittest.mock import MagicMock, patch
from bmspy.jbd_bms import (
JBDBMS,
bytes_to_digits,
bytes_to_date,
convert_to_signed,
verify_checksum,
parse_03_response,
parse_04_response,
requestMessage,
serial_cleanup,
collect_data,
)
from bmspy.classes import BMSScalarField, BMSMultiField, BMSInfoField, UPS
# ---------------------------------------------------------------------------
# bytes_to_digits
# ---------------------------------------------------------------------------
class TestBytesToDigits:
def test_zero(self):
assert bytes_to_digits(0x00, 0x00) == 0
def test_low_byte_only(self):
assert bytes_to_digits(0x00, 0x0A) == 10
def test_high_byte_only(self):
assert bytes_to_digits(0x01, 0x00) == 256
def test_combined(self):
assert bytes_to_digits(0x14, 0x50) == 5200
def test_max(self):
assert bytes_to_digits(0xFF, 0xFF) == 65535
# ---------------------------------------------------------------------------
# bytes_to_date
# ---------------------------------------------------------------------------
class TestBytesToDate:
def test_known_date(self):
# 0x2E2F = 11823; day=15, mon=1, year=2023
assert bytes_to_date(0x2E, 0x2F) == "2023-01-15"
def test_zero_encodes_epoch(self):
# day=0, mon=0, year=2000
assert bytes_to_date(0x00, 0x00) == "2000-00-00"
def test_day_field(self):
# Only day bits set: 0x001F → day=31, mon=0, year=2000
assert bytes_to_date(0x00, 0x1F) == "2000-00-31"
def test_month_field(self):
# month=12: bits [8:5] = 0b1100 = 12 → raw = 12 << 5 = 384 = 0x0180
assert bytes_to_date(0x01, 0x80) == "2000-12-00"
def test_year_field(self):
# year offset = 24 → value = 24 << 9 = 12288 = 0x3000
assert bytes_to_date(0x30, 0x00) == "2024-00-00"
# ---------------------------------------------------------------------------
# convert_to_signed
# ---------------------------------------------------------------------------
class TestConvertToSigned:
def test_zero(self):
assert convert_to_signed(0) == 0
def test_small_positive(self):
assert convert_to_signed(100) == 100
def test_below_threshold(self):
assert convert_to_signed(1023) == 1023
def test_at_threshold_maps_to_zero(self):
# 1024 → (1024-512) % 1024 - 512 = 0
assert convert_to_signed(1024) == 0
def test_just_above_threshold_maps_to_positive(self):
assert convert_to_signed(1025) == 1
def test_maps_to_negative(self):
# 2047 → (2047-512)%1024 - 512 = 1535%1024 - 512 = 511-512 = -1
assert convert_to_signed(2047) == -1
def test_maps_to_most_negative(self):
# 1536 → (1536-512)%1024 - 512 = 1024%1024 - 512 = -512
assert convert_to_signed(1536) == -512
# ---------------------------------------------------------------------------
# verify_checksum
# ---------------------------------------------------------------------------
class TestVerifyChecksum:
def _make_checksum(self, data: bytes) -> bytes:
s = sum(data)
s = (s ^ 0xFFFF) + 1
return bytes([s >> 8, s & 0xFF])
def test_correct_checksum(self):
data = bytes([0x1B, 0x14, 0x50, 0x00])
chk = self._make_checksum(data)
assert verify_checksum(data, chk) is True
def test_single_byte(self):
data = bytes([0x42])
chk = self._make_checksum(data)
assert verify_checksum(data, chk) is True
def test_wrong_checksum(self):
data = bytes([0x10, 0x20])
assert verify_checksum(data, bytes([0x00, 0x00])) is False
def test_off_by_one(self):
data = bytes([0x10, 0x20])
chk = self._make_checksum(data)
bad = bytes([chk[0], chk[1] ^ 0x01])
assert verify_checksum(data, bad) is False
def test_empty_data(self):
# sum=0 → s = (0^0xFFFF)+1 = 65536, which can never equal a 2-byte chk
assert verify_checksum(bytes(), bytes([0xFF, 0xFF])) is False
# ---------------------------------------------------------------------------
# JBDBMS
# ---------------------------------------------------------------------------
class TestJBDBMS:
def test_empty_is_falsy(self):
assert not JBDBMS()
def test_populated_is_truthy(self, populated_jbdbms):
assert bool(populated_jbdbms)
def test_items_skips_none_fields(self):
bms = JBDBMS()
bms.bms_voltage_total_volts = BMSScalarField(
help="Total Voltage", raw_value=52.0, value="52.00", units="V"
)
keys = [k for k, _ in bms.items()]
assert keys == ["bms_voltage_total_volts"]
def test_items_yields_all_populated_fields(self, populated_jbdbms):
keys = {k for k, _ in populated_jbdbms.items()}
assert "bms_voltage_total_volts" in keys
assert "bms_current_amps" in keys
assert "bms_manufacture_date" in keys
assert "bms_temperature_celcius" in keys
def test_items_yields_correct_types(self, populated_jbdbms):
d = dict(populated_jbdbms.items())
assert isinstance(d["bms_voltage_total_volts"], BMSScalarField)
assert isinstance(d["bms_manufacture_date"], BMSInfoField)
assert isinstance(d["bms_temperature_celcius"], BMSMultiField)
def test_is_ups_subclass(self):
assert isinstance(JBDBMS(), UPS)
# ---------------------------------------------------------------------------
# parse_03_response
# ---------------------------------------------------------------------------
class TestParse03Response:
def test_valid_response_returns_jbdbms(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert isinstance(result, JBDBMS)
def test_voltage(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_voltage_total_volts.raw_value == pytest.approx(52.00)
assert result.bms_voltage_total_volts.units == "V"
def test_current(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_current_amps.raw_value == pytest.approx(0.0)
def test_remaining_capacity(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_capacity_remaining_ah.raw_value == pytest.approx(100.00)
def test_nominal_capacity(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_capacity_nominal_ah.raw_value == pytest.approx(100.00)
def test_charge_cycles(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_charge_cycles.raw_value == 10
def test_manufacture_date(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_manufacture_date.info == "2023-01-15"
def test_rsoc(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_capacity_charge_ratio.raw_value == pytest.approx(0.95)
def test_cell_count(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_cell_number.raw_value == 4
def test_temperature(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_temperature_celcius.raw_values[1] == pytest.approx(25.0)
assert result.bms_temperature_celcius.units == "°C"
def test_mosfet_charging(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_charge_is_charging.raw_value is True
def test_mosfet_discharging(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_charge_is_discharging.raw_value is True
def test_no_protection_faults(self, valid_03_response):
result = parse_03_response(valid_03_response)
assert result.bms_protection_sop_bool.raw_value is False
assert result.bms_protection_cocp_bool.raw_value is False
def test_wrong_start_byte_returns_false(self, valid_03_response):
valid_03_response[0] = 0xAA
assert parse_03_response(valid_03_response) is False
def test_error_status_byte_returns_false(self, valid_03_response):
valid_03_response[2] = 0x80
assert parse_03_response(valid_03_response) is False
def test_bad_checksum_returns_false(self, valid_03_response):
valid_03_response[-1] ^= 0xFF # corrupt last checksum byte
assert parse_03_response(valid_03_response) is False
def test_truncated_response_returns_false(self, valid_03_response):
assert parse_03_response(valid_03_response[:10]) is False
def test_zero_data_len_returns_false(self, valid_03_response):
valid_03_response[3] = 0x00
assert parse_03_response(valid_03_response) is False
# ---------------------------------------------------------------------------
# parse_04_response
# ---------------------------------------------------------------------------
class TestParse04Response:
def test_valid_response_returns_multi_field(self, valid_04_response):
result = parse_04_response(valid_04_response)
assert isinstance(result, BMSMultiField)
def test_cell_count(self, valid_04_response):
result = parse_04_response(valid_04_response)
assert len(result.raw_values) == 4
def test_cell_voltages(self, valid_04_response):
result = parse_04_response(valid_04_response)
assert result.raw_values[1] == pytest.approx(3.600)
assert result.raw_values[2] == pytest.approx(3.601)
assert result.raw_values[3] == pytest.approx(3.599)
assert result.raw_values[4] == pytest.approx(3.598)
def test_cell_voltage_units(self, valid_04_response):
result = parse_04_response(valid_04_response)
assert result.units == "V"
assert result.label == "cell"
def test_wrong_start_byte_returns_false(self, valid_04_response):
valid_04_response[0] = 0xAA
assert parse_04_response(valid_04_response) is False
def test_error_status_byte_returns_false(self, valid_04_response):
valid_04_response[2] = 0x80
assert parse_04_response(valid_04_response) is False
def test_bad_checksum_returns_false(self, valid_04_response):
valid_04_response[-1] ^= 0xFF
assert parse_04_response(valid_04_response) is False
def test_truncated_response_returns_false(self, valid_04_response):
assert parse_04_response(valid_04_response[:5]) is False
def test_zero_data_len_returns_false(self, valid_04_response):
valid_04_response[3] = 0x00
assert parse_04_response(valid_04_response) is False
# ---------------------------------------------------------------------------
# parse_03_response — protection bits and other field variations
# ---------------------------------------------------------------------------
def _recompute_checksum(response: bytearray) -> None:
"""Recompute and update the JBD frame checksum in-place."""
data_len = response[3]
first = data_len + 4
s = sum(response[3:first])
s = (s ^ 0xFFFF) + 1
response[first] = (s >> 8) & 0xFF
response[first + 1] = s & 0xFF
class TestParse03ProtectionBits:
def test_sop_bit_set(self, valid_03_response):
valid_03_response[20] = 0x00
valid_03_response[21] = 0x01 # bit 0 = SOP
_recompute_checksum(valid_03_response)
result = parse_03_response(valid_03_response)
assert result.bms_protection_sop_bool.raw_value is True
assert result.bms_protection_sup_bool.raw_value is False
def test_cocp_bit_set(self, valid_03_response):
valid_03_response[20] = 0x01 # bit 8 = COCP (high byte bit 0)
valid_03_response[21] = 0x00
_recompute_checksum(valid_03_response)
result = parse_03_response(valid_03_response)
assert result.bms_protection_cocp_bool.raw_value is True
def test_all_protections_clear(self, valid_03_response):
result = parse_03_response(valid_03_response)
for attr in [
"bms_protection_sop_bool", "bms_protection_sup_bool",
"bms_protection_wgop_bool", "bms_protection_wgup_bool",
"bms_protection_cotp_bool", "bms_protection_cutp_bool",
"bms_protection_dotp_bool", "bms_protection_dutp_bool",
"bms_protection_cocp_bool", "bms_protection_docp_bool",
"bms_protection_scp_bool", "bms_protection_fdic_bool",
"bms_protection_slmos_bool",
]:
assert getattr(result, attr).raw_value is False, f"{attr} should be False"
def test_negative_current(self, valid_03_response):
# 1536 = 0x0600; convert_to_signed(1536) = -512; -512 * 0.01 = -5.12 A
valid_03_response[6] = 0x06
valid_03_response[7] = 0x00
_recompute_checksum(valid_03_response)
result = parse_03_response(valid_03_response)
assert result.bms_current_amps.raw_value == pytest.approx(-5.12)
def test_cell_1_balancing(self, valid_03_response):
# balance_state_low = bytes_to_digits(response[18], response[19])
# bit 0 of balance_state_low → cell 1 balancing
valid_03_response[18] = 0x00
valid_03_response[19] = 0x01
_recompute_checksum(valid_03_response)
result = parse_03_response(valid_03_response)
assert result.bms_cells_balancing.raw_values[1] is True
assert result.bms_cells_balancing.raw_values[2] is False
def test_mosfet_only_charging(self, valid_03_response):
# control_status = 0x01 → charging only
valid_03_response[24] = 0x01
_recompute_checksum(valid_03_response)
result = parse_03_response(valid_03_response)
assert result.bms_charge_is_charging.raw_value is True
assert result.bms_charge_is_discharging.raw_value is False
def test_mosfet_only_discharging(self, valid_03_response):
# control_status = 0x02 → discharging only
valid_03_response[24] = 0x02
_recompute_checksum(valid_03_response)
result = parse_03_response(valid_03_response)
assert result.bms_charge_is_charging.raw_value is False
assert result.bms_charge_is_discharging.raw_value is True
def test_two_temperature_sensors(self):
from tests.conftest import VALID_03_RESPONSE
# Build a modified 03 response with 2 NTC sensors
# data_len changes from 25 to 27 (add 2 bytes for NTC 2)
response = bytearray(VALID_03_RESPONSE[:29]) # bytes 0-28
response[3] = 0x1B # data_len = 27
response[26] = 0x02 # NTC count = 2
response += bytearray([0x0B, 0x6B]) # NTC 2: (2923-2731)*0.1 = 19.2°C
response += bytearray([0x00, 0x00]) # placeholder checksum
_recompute_checksum(response)
result = parse_03_response(response)
assert isinstance(result, JBDBMS)
assert len(result.bms_temperature_celcius.raw_values) == 2
assert result.bms_temperature_celcius.raw_values[1] == pytest.approx(25.0)
assert result.bms_temperature_celcius.raw_values[2] == pytest.approx(19.2)
# ---------------------------------------------------------------------------
# serial_cleanup
# ---------------------------------------------------------------------------
class TestSerialCleanup:
def test_closes_open_port(self):
ser = MagicMock()
ser.is_open = True
serial_cleanup(ser)
ser.reset_input_buffer.assert_called()
ser.reset_output_buffer.assert_called()
ser.close.assert_called_once()
def test_does_not_close_if_not_open(self):
ser = MagicMock()
ser.is_open = False
serial_cleanup(ser)
ser.close.assert_not_called()
def test_debug_3_logs_message(self, capsys):
ser = MagicMock()
ser.is_open = True
serial_cleanup(ser, debug=3)
captured = capsys.readouterr()
assert "cleaning up" in captured.out
# ---------------------------------------------------------------------------
# requestMessage
# ---------------------------------------------------------------------------
class TestRequestMessage:
def _make_serial(self, response_bytes=b"\x77"):
ser = MagicMock()
ser.is_open = True
ser.in_waiting = 1
ser.write.return_value = 7
ser.read_until.return_value = response_bytes
return ser
def test_returns_response_bytes(self):
payload = b"\xDD\xA5\x00\x04\x01\x02\x03\x04\x77"
ser = self._make_serial(payload)
reqmsg = bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77])
result = requestMessage(ser, reqmsg)
assert result == payload
def test_open_failure_returns_false(self):
ser = MagicMock()
ser.is_open = True
ser.open.side_effect = Exception("port not found")
result = requestMessage(ser, bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]))
assert result is False
def test_short_write_returns_false(self):
ser = self._make_serial()
ser.write.return_value = 3 # fewer bytes than message length
result = requestMessage(ser, bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]))
assert result is False
def test_empty_response_returns_empty_string(self):
ser = self._make_serial(b"")
result = requestMessage(ser, bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]))
assert result == ""
def test_exception_during_read_logs_error(self, capsys):
"""When read_until raises, requestMessage logs the exception."""
ser = MagicMock()
ser.is_open = True
ser.in_waiting = 1
ser.write.return_value = 7
ser.read_until.side_effect = Exception("serial port error")
result = requestMessage(ser, bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]))
assert result is None
captured = capsys.readouterr()
assert "error communicating" in captured.out.lower()
def test_debug_3_logs_startup(self, capsys):
payload = b"\xDD\xA5\x00\x04\x01\x02\x03\x04\x77"
ser = self._make_serial(payload)
requestMessage(ser, bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]), debug=3)
captured = capsys.readouterr()
assert "starting up monitor" in captured.out
def test_wait_timeout_returns_empty_string(self):
"""When in_waiting stays 0 long enough, returns empty string."""
ser = MagicMock()
ser.is_open = True
ser.in_waiting = 0
ser.write.return_value = 7
call_count = 0
def _in_waiting_prop():
nonlocal call_count
call_count += 1
return 0
# Simulate in_waiting always 0 → timeout after wait_time > 2
ser_mock = MagicMock()
ser_mock.is_open = True
ser_mock.write.return_value = 7
# Make in_waiting always return 0 (property mock)
type(ser_mock).in_waiting = property(lambda self: 0)
with patch("bmspy.jbd_bms.time.sleep"):
result = requestMessage(
ser_mock,
bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]),
debug=3,
)
assert result == ""
def test_cannot_open_port_returns_none(self):
"""When ser.is_open is False after open() call, returns None."""
ser = MagicMock()
ser.is_open = False
# open() doesn't raise but port remains closed
ser.open.return_value = None
result = requestMessage(ser, bytearray([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]))
assert result is None
# ---------------------------------------------------------------------------
# collect_data
# ---------------------------------------------------------------------------
class TestCollectData:
def test_successful_collect_returns_jbdbms(self, valid_03_response, valid_04_response):
responses = [bytes(valid_03_response), bytes(valid_04_response)]
idx = 0
def _req(ser, msg, debug=0):
nonlocal idx
r = responses[idx]; idx += 1; return r
with patch("bmspy.jbd_bms.requestMessage", side_effect=_req):
result = collect_data(MagicMock())
assert isinstance(result, JBDBMS)
assert result.bms_voltage_cells_volts is not None
def test_empty_03_response_returns_false(self):
with patch("bmspy.jbd_bms.requestMessage", return_value=b""):
result = collect_data(MagicMock())
assert result is False
def test_empty_04_response_returns_false(self, valid_03_response):
responses = [bytes(valid_03_response), b""]
idx = 0
def _req(ser, msg, debug=0):
nonlocal idx
r = responses[idx]; idx += 1; return r
with patch("bmspy.jbd_bms.requestMessage", side_effect=_req):
result = collect_data(MagicMock())
assert result is False
def test_bad_03_checksum_returns_false(self, valid_03_response, valid_04_response):
valid_03_response[-1] ^= 0xFF # corrupt checksum
responses = [bytes(valid_03_response), bytes(valid_04_response)]
idx = 0
def _req(ser, msg, debug=0):
nonlocal idx
r = responses[idx]; idx += 1; return r
with patch("bmspy.jbd_bms.requestMessage", side_effect=_req):
result = collect_data(MagicMock())
assert result is False
def test_bad_04_checksum_returns_false(self, valid_03_response, valid_04_response):
valid_04_response[-1] ^= 0xFF # corrupt checksum
responses = [bytes(valid_03_response), bytes(valid_04_response)]
idx = 0
def _req(ser, msg, debug=0):
nonlocal idx
r = responses[idx]; idx += 1; return r
with patch("bmspy.jbd_bms.requestMessage", side_effect=_req):
result = collect_data(MagicMock())
assert result is False
def test_collect_data_debug_1(self, valid_03_response, valid_04_response, capsys):
responses = [bytes(valid_03_response), bytes(valid_04_response)]
idx = 0
def _req(ser, msg, debug=0):
nonlocal idx
r = responses[idx]; idx += 1; return r
with patch("bmspy.jbd_bms.requestMessage", side_effect=_req):
result = collect_data(MagicMock(), debug=1)
assert isinstance(result, JBDBMS)
# ---------------------------------------------------------------------------
# parse_03 and parse_04 debug coverage
# ---------------------------------------------------------------------------
class TestParse03Debug:
def test_debug_2_logs_voltage(self, valid_03_response, capsys):
parse_03_response(valid_03_response, debug=2)
captured = capsys.readouterr()
assert "voltage" in captured.out.lower() or "52" in captured.out
def test_debug_3_logs_data_length(self, valid_03_response, capsys):
parse_03_response(valid_03_response, debug=3)
captured = capsys.readouterr()
assert "data length" in captured.out.lower() or "25" in captured.out
def test_debug_3_logs_protection_state(self, valid_03_response, capsys):
parse_03_response(valid_03_response, debug=3)
captured = capsys.readouterr()
assert "protection state" in captured.out.lower() or "sop" in captured.out.lower()
class TestParse04Debug:
def test_debug_2_logs_cell_voltage(self, valid_04_response, capsys):
parse_04_response(valid_04_response, debug=2)
captured = capsys.readouterr()
assert "cell" in captured.out.lower() or "3.6" in captured.out
def test_debug_3_logs_data_length(self, valid_04_response, capsys):
parse_04_response(valid_04_response, debug=3)
captured = capsys.readouterr()
assert "data length" in captured.out.lower() or "8" in captured.out
# ---------------------------------------------------------------------------
# parse_03_response — cells >= 16 (branch coverage)
# ---------------------------------------------------------------------------
class TestCalculateChecksum:
def test_returns_empty_string(self):
from bmspy.jbd_bms import calculate_checksum
result = calculate_checksum(b"\x01\x02\x03")
assert result == ""
class TestParse03DataLenZero:
def test_data_len_zero_with_valid_checksum_returns_false(self):
"""Build a response where data_len=0 and checksum is valid."""
response = bytearray([
0xDD, 0xA5, 0x00, 0x00, # data_len = 0
0x00, 0x00, # checksum positions (first=4, second=5)
0x77, # end
])
_recompute_checksum(response)
result = parse_03_response(response)
assert result is False
class TestParse04DataLenZero:
def test_data_len_zero_with_valid_checksum_returns_false(self):
"""Build a parse_04 response where data_len=0 and checksum is valid."""
response = bytearray([
0xDD, 0xA5, 0x00, 0x00, # data_len = 0
0x00, 0x00, # checksum positions
0x77, # end
])
_recompute_checksum(response)
result = parse_04_response(response)
assert result is False
class TestParse03HighCellCount:
def test_17_cells_uses_high_balance_state(self):
"""Build a 17-cell response to cover the cell >= 16 branch."""
from tests.conftest import VALID_03_RESPONSE
# We need data_len = 25 + (17-4)*2 = 25 for 4 NTCs... actually we just
# need 17 cells. data_len stays 25 but we set cell count to 17.
response = bytearray(VALID_03_RESPONSE)
# Set cell count to 17
response[25] = 17
# Recompute checksum
_recompute_checksum(response)
result = parse_03_response(response)
assert isinstance(result, JBDBMS)
assert result.bms_cell_number.raw_value == 17
# Cell 17 should be in bms_cells_balancing
assert 17 in result.bms_cells_balancing.raw_values
# ---------------------------------------------------------------------------
# collect_data debug paths
# ---------------------------------------------------------------------------
class TestCollectDataDebug:
def test_debug_1_empty_03_logs(self, capsys):
with patch("bmspy.jbd_bms.requestMessage", return_value=b""):
collect_data(MagicMock(), debug=1)
captured = capsys.readouterr()
assert "error" in captured.out.lower()
def test_debug_1_empty_04_logs(self, valid_03_response, capsys):
responses = [bytes(valid_03_response), b""]
idx = 0
def _req(ser, msg, debug=0):
nonlocal idx
r = responses[idx]; idx += 1; return r
with patch("bmspy.jbd_bms.requestMessage", side_effect=_req):
collect_data(MagicMock(), debug=1)
captured = capsys.readouterr()
assert "error" in captured.out.lower()
# ---------------------------------------------------------------------------
# initialise_serial — covered with mocking
# ---------------------------------------------------------------------------
class TestInitialiseSerial:
def test_returns_serial_object(self):
from bmspy.jbd_bms import initialise_serial
with patch("bmspy.jbd_bms.serial.Serial") as mock_serial_cls:
mock_ser = MagicMock()
mock_serial_cls.return_value = mock_ser
result = initialise_serial("/dev/ttyUSB0", debug=0)
assert result is mock_ser
def test_sets_serial_params(self):
from bmspy.jbd_bms import initialise_serial
import serial as _serial
with patch("bmspy.jbd_bms.serial.Serial") as mock_serial_cls:
mock_ser = MagicMock()
mock_serial_cls.return_value = mock_ser
initialise_serial("/dev/ttyUSB0")
# Verify parity was set
assert mock_ser.parity == _serial.PARITY_NONE
+277
View File
@@ -0,0 +1,277 @@
import pytest
pytest.importorskip("prometheus_client", reason="prometheus-client not installed")
import os
import tempfile
from unittest.mock import patch, MagicMock
from prometheus_client import CollectorRegistry
from bmspy.classes import UPS, BMSScalarField, BMSMultiField, BMSInfoField
from bmspy.prometheus import (
prometheus_create_metric,
prometheus_populate_metric,
prometheus_export,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _scalar_ups(**fields):
"""Build a UPS with one scalar field per kwarg (name -> raw_value)."""
field_dicts = {
name: {"help": name, "raw_value": val, "value": str(val), "units": "V"}
for name, val in fields.items()
}
return UPS.from_dict(field_dicts)
def _make_ups_data():
"""Build a dict[str, UPS] with scalar, multi, and info fields."""
return {
"testups": UPS.from_dict({
"bms_voltage": {
"help": "Total Voltage",
"raw_value": 52.0,
"value": "52.00",
"units": "V",
},
"bms_cells": {
"help": "Cell Voltages",
"label": "cell",
"raw_values": {1: 3.6, 2: 3.61},
"values": {1: "3.600", 2: "3.610"},
"units": "V",
},
"bms_date": {
"help": "Manufacture Date",
"info": "2023-01-15",
},
})
}
# ---------------------------------------------------------------------------
# prometheus_create_metric
# ---------------------------------------------------------------------------
class TestPrometheusCreateMetric:
def test_scalar_creates_gauge(self):
registry = CollectorRegistry(auto_describe=True)
ups_data = _scalar_ups(bms_voltage=52.0)
metric = prometheus_create_metric(registry, {"myups": ups_data})
from prometheus_client import Gauge
assert isinstance(metric["bms_voltage"], Gauge)
def test_multi_creates_gauge_with_label(self):
registry = CollectorRegistry(auto_describe=True)
ups_data = {"myups": UPS.from_dict({
"bms_cells": {
"help": "Cells",
"label": "cell",
"raw_values": {1: 3.6},
"values": {1: "3.600"},
"units": "V",
},
})}
metric = prometheus_create_metric(registry, ups_data)
from prometheus_client import Gauge
assert isinstance(metric["bms_cells"], Gauge)
def test_info_creates_info_metric(self):
registry = CollectorRegistry(auto_describe=True)
ups_data = {"myups": UPS.from_dict({
"bms_date": {"help": "Date", "info": "2023-01-15"},
})}
metric = prometheus_create_metric(registry, ups_data)
from prometheus_client import Info
assert isinstance(metric["bms_date"], Info)
def test_skips_duplicate_across_ups_devices(self):
registry = CollectorRegistry(auto_describe=True)
field = {"help": "V", "raw_value": 52.0, "value": "52.00", "units": "V"}
ups_data = {
"ups1": UPS.from_dict({"bms_voltage": dict(field)}),
"ups2": UPS.from_dict({"bms_voltage": dict(field)}),
}
metric = prometheus_create_metric(registry, ups_data)
# Should only have one entry, not raise on duplicate registration
assert "bms_voltage" in metric
def test_all_field_types_in_one_call(self):
registry = CollectorRegistry(auto_describe=True)
metric = prometheus_create_metric(registry, _make_ups_data())
assert "bms_voltage" in metric
assert "bms_cells" in metric
assert "bms_date" in metric
def test_empty_ups_data_returns_empty_dict(self):
registry = CollectorRegistry(auto_describe=True)
metric = prometheus_create_metric(registry, {"myups": UPS.from_dict({})})
assert metric == {}
# ---------------------------------------------------------------------------
# prometheus_populate_metric
# ---------------------------------------------------------------------------
class TestPrometheusPopulateMetric:
def test_scalar_value_set(self):
registry = CollectorRegistry(auto_describe=True)
ups_data = {"testups": _scalar_ups(bms_voltage=52.0)}
metric = prometheus_create_metric(registry, ups_data)
prometheus_populate_metric(metric, ups_data)
# Verify via registry output
from prometheus_client import generate_latest
output = generate_latest(registry).decode()
assert "52.0" in output
def test_multi_values_set(self):
registry = CollectorRegistry(auto_describe=True)
ups_data = {"testups": UPS.from_dict({
"bms_cells": {
"help": "Cells",
"label": "cell",
"raw_values": {1: 3.6, 2: 3.61},
"values": {1: "3.600", 2: "3.610"},
"units": "V",
},
})}
metric = prometheus_create_metric(registry, ups_data)
prometheus_populate_metric(metric, ups_data)
from prometheus_client import generate_latest
output = generate_latest(registry).decode()
assert "3.6" in output
def test_info_value_set(self):
registry = CollectorRegistry(auto_describe=True)
ups_data = {"testups": UPS.from_dict({
"bms_date": {"help": "Date", "info": "2023-01-15"},
})}
metric = prometheus_create_metric(registry, ups_data)
prometheus_populate_metric(metric, ups_data)
from prometheus_client import generate_latest
output = generate_latest(registry).decode()
assert "2023-01-15" in output
def test_missing_metric_key_is_skipped(self):
"""populate should not crash if a field name is not in metric dict."""
registry = CollectorRegistry(auto_describe=True)
ups_data = {"testups": _scalar_ups(bms_voltage=52.0)}
# Create metrics for a different field name
other_metric = {}
prometheus_populate_metric(other_metric, ups_data) # should not raise
# ---------------------------------------------------------------------------
# prometheus_export daemonize=True (start_http_server path)
# ---------------------------------------------------------------------------
class TestPrometheusExportDaemonize:
def test_daemonize_calls_start_http_server(self):
"""When daemonize=True, start_http_server is called and loop runs once then exits."""
ups_data = {"testups": _scalar_ups(bms_voltage=52.0)}
call_count = 0
def _read_data(*args, **kwargs):
nonlocal call_count
call_count += 1
return ups_data
# Make time.sleep raise StopIteration after first call to exit the daemon loop
with patch("bmspy.prometheus.client.read_data", side_effect=_read_data), \
patch("bmspy.prometheus.prometheus_client.start_http_server") as mock_start, \
patch("bmspy.prometheus.time.sleep", side_effect=StopIteration("stop")):
with pytest.raises(StopIteration):
prometheus_export(daemonize=True)
mock_start.assert_called_once()
# ---------------------------------------------------------------------------
# prometheus_export
# ---------------------------------------------------------------------------
class TestPrometheusExport:
def test_export_to_textfile(self, tmp_path):
filename = str(tmp_path / "metrics.prom")
ups_data = {"testups": _scalar_ups(bms_voltage=52.0)}
with patch("bmspy.prometheus.client.read_data", return_value=ups_data):
result = prometheus_export(daemonize=False, filename=filename)
assert result is True
assert os.path.exists(filename)
def test_export_no_filename_returns_false(self):
ups_data = {"testups": _scalar_ups(bms_voltage=52.0)}
with patch("bmspy.prometheus.client.read_data", return_value=ups_data):
result = prometheus_export(daemonize=False, filename=None)
assert result is False
def test_export_waits_for_data(self, tmp_path):
"""When read_data returns empty first, then real data, export should work."""
filename = str(tmp_path / "metrics.prom")
ups_data = {"testups": _scalar_ups(bms_voltage=52.0)}
call_count = 0
def _read_data(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count < 2:
return {}
return ups_data
with patch("bmspy.prometheus.client.read_data", side_effect=_read_data), \
patch("bmspy.prometheus.time.sleep"):
result = prometheus_export(daemonize=False, filename=filename)
assert result is True
assert call_count == 2
def test_export_with_all_field_types(self, tmp_path):
filename = str(tmp_path / "metrics.prom")
ups_data = _make_ups_data()
with patch("bmspy.prometheus.client.read_data", return_value=ups_data):
result = prometheus_export(daemonize=False, filename=filename)
assert result is True
content = open(filename).read()
assert "bms_voltage" in content
assert "bms_cells" in content
assert "bms_date" in content
# ---------------------------------------------------------------------------
# prometheus main()
# ---------------------------------------------------------------------------
class TestPrometheusMain:
def _ups_dict(self):
return {"testups": _scalar_ups(bms_voltage=52.0)}
def test_main_with_file_arg(self, tmp_path):
from bmspy.prometheus import main
filename = str(tmp_path / "metrics.prom")
with patch("sys.argv", ["bmspy-prometheus", "--file", filename]), \
patch("bmspy.prometheus.client.read_data", return_value=self._ups_dict()):
main()
assert os.path.exists(filename)
def test_main_with_socket_arg(self, tmp_path):
from bmspy.prometheus import main
filename = str(tmp_path / "metrics.prom")
with patch("sys.argv", ["bmspy-prometheus", "--file", filename, "--socket", "/tmp/test.sock"]), \
patch("bmspy.prometheus.client.read_data", return_value=self._ups_dict()):
main()
def test_main_with_ups_arg(self, tmp_path):
from bmspy.prometheus import main
filename = str(tmp_path / "metrics.prom")
with patch("sys.argv", ["bmspy-prometheus", "--file", filename, "--ups", "myups"]), \
patch("bmspy.prometheus.client.read_data", return_value=self._ups_dict()):
main()
def test_main_with_verbose(self, tmp_path):
from bmspy.prometheus import main
filename = str(tmp_path / "metrics.prom")
with patch("sys.argv", ["bmspy-prometheus", "--file", filename, "-v"]), \
patch("bmspy.prometheus.client.read_data", return_value=self._ups_dict()):
main()
+957
View File
@@ -0,0 +1,957 @@
import json
import socket as socket_module
import struct
import threading
import pytest
import serial
from bmspy.classes import BMSScalarField, BMSInfoField, UPS
from bmspy.jbd_bms import JBDBMS
from bmspy.server import DeviceState, parse_device, read_request, send_response
# ---------------------------------------------------------------------------
# parse_device
# ---------------------------------------------------------------------------
class TestParseDevice:
def test_plain_path(self):
assert parse_device("/dev/ttyUSB0") == ("ttyUSB0", "/dev/ttyUSB0")
def test_named_path(self):
assert parse_device("myups:/dev/ttyUSB1") == ("myups", "/dev/ttyUSB1")
def test_nested_path(self):
assert parse_device("/dev/serial/by-id/usb-FTDI") == (
"usb-FTDI",
"/dev/serial/by-id/usb-FTDI",
)
def test_name_without_slash(self):
# No "/" prefix, no ":" → treated as a plain path; last segment is name
assert parse_device("ttyUSB0") == ("ttyUSB0", "ttyUSB0")
def test_name_colon_path_no_leading_slash(self):
assert parse_device("office:/dev/ttyUSB2") == ("office", "/dev/ttyUSB2")
# ---------------------------------------------------------------------------
# DeviceState
# ---------------------------------------------------------------------------
class TestDeviceState:
def test_defaults(self):
ser = serial.Serial()
ds = DeviceState(ser=ser)
assert ds.data is None
assert ds.timestamp == 0.0
assert ds.ser is ser
def test_fields_are_mutable(self):
ser = serial.Serial()
ds = DeviceState(ser=ser)
ds.timestamp = 123.4
ds.data = UPS.from_dict({})
assert ds.timestamp == 123.4
assert isinstance(ds.data, UPS)
# ---------------------------------------------------------------------------
# read_request / send_response round-trip
# ---------------------------------------------------------------------------
def _send_framed(sock: socket_module.socket, data: dict) -> None:
payload = json.dumps(data).encode()
sock.sendall(struct.pack("!I", len(payload)) + payload)
def _recv_framed(sock: socket_module.socket) -> dict:
length = struct.unpack("!I", sock.recv(4))[0]
return json.loads(sock.recv(length))
class TestReadRequest:
def test_round_trip(self):
srv, cli = socket_module.socketpair()
try:
_send_framed(cli, {"command": "GET", "client": "test"})
result = read_request(srv)
assert result == {"command": "GET", "client": "test"}
finally:
srv.close()
cli.close()
def test_with_ups_filter(self):
srv, cli = socket_module.socketpair()
try:
_send_framed(cli, {"command": "GET", "client": "test", "ups": "myups"})
result = read_request(srv)
assert result["ups"] == "myups"
finally:
srv.close()
cli.close()
class TestSendResponse:
def test_plain_dict(self):
srv, cli = socket_module.socketpair()
try:
send_response(srv, {"status": "REGISTERED", "client": "test"}, "test")
result = _recv_framed(cli)
assert result == {"status": "REGISTERED", "client": "test"}
finally:
srv.close()
cli.close()
def test_ups_object_serialized_via_items(self):
"""UPS objects must be serialized using items(), not dataclass_asdict."""
bms = JBDBMS()
bms.bms_voltage_total_volts = BMSScalarField(
help="Total Voltage", raw_value=52.0, value="52.00", units="V"
)
bms.bms_manufacture_date = BMSInfoField(
help="Date of Manufacture", info="2023-01-15"
)
srv, cli = socket_module.socketpair()
try:
send_response(srv, {"myups": bms}, "test")
result = _recv_framed(cli)
finally:
srv.close()
cli.close()
assert "myups" in result
assert "bms_voltage_total_volts" in result["myups"]
assert result["myups"]["bms_voltage_total_volts"]["raw_value"] == 52.0
# None fields must not appear
assert "bms_current_amps" not in result["myups"]
# client field must not appear
assert "client" not in result["myups"]
def test_empty_ups_serializes_to_empty_dict(self):
srv, cli = socket_module.socketpair()
try:
send_response(srv, {"myups": JBDBMS()}, "test")
result = _recv_framed(cli)
finally:
srv.close()
cli.close()
assert result["myups"] == {}
def test_plain_dict_passthrough(self):
srv, cli = socket_module.socketpair()
try:
send_response(srv, {"key": "value", "number": 42}, "test")
result = _recv_framed(cli)
finally:
srv.close()
cli.close()
assert result == {"key": "value", "number": 42}
def test_closed_socket_raises_os_error(self):
srv, cli = socket_module.socketpair()
srv.close()
cli.close()
with pytest.raises(OSError):
send_response(srv, {"status": "OK"}, "test")
# ---------------------------------------------------------------------------
# signalHandler
# ---------------------------------------------------------------------------
class TestSignalHandler:
def test_raises_system_exit(self):
from bmspy.server import signalHandler
with pytest.raises(SystemExit):
signalHandler()
def test_exit_message_contains_terminating(self):
from bmspy.server import signalHandler
with pytest.raises(SystemExit, match="terminating"):
signalHandler()
# ---------------------------------------------------------------------------
# socket_cleanup
# ---------------------------------------------------------------------------
class TestSocketCleanup:
def test_removes_socket_file(self, tmp_path):
from bmspy.server import socket_cleanup
sock_file = tmp_path / "test.sock"
sock_file.touch()
assert sock_file.exists()
socket_cleanup(str(sock_file))
assert not sock_file.exists()
def test_raises_when_file_missing(self, tmp_path):
from bmspy.server import socket_cleanup
with pytest.raises(FileNotFoundError):
socket_cleanup(str(tmp_path / "nonexistent.sock"))
# ---------------------------------------------------------------------------
# read_request — error paths
# ---------------------------------------------------------------------------
class TestReadRequestErrors:
def test_invalid_json_raises_exception(self):
srv, cli = socket_module.socketpair()
try:
invalid_payload = b"not valid json !!!"
cli.sendall(struct.pack("!I", len(invalid_payload)) + invalid_payload)
with pytest.raises(Exception, match="unable to read incoming request"):
read_request(srv)
finally:
srv.close()
cli.close()
def test_truncated_length_bytes_raises(self):
srv, cli = socket_module.socketpair()
try:
cli.sendall(b"\x00\x00") # only 2 of 4 length bytes, then close
cli.close()
with pytest.raises(Exception):
read_request(srv)
finally:
srv.close()
def test_recv_raises_os_error(self):
"""When recv raises on first read, read_request should raise OSError."""
from unittest.mock import MagicMock
mock_conn = MagicMock()
mock_conn.recv.side_effect = OSError("connection reset")
with pytest.raises(OSError, match="unable to read request length"):
read_request(mock_conn)
def test_recv_body_raises_os_error(self):
"""When recv raises on second read (body), should raise OSError."""
from unittest.mock import MagicMock
import struct
mock_conn = MagicMock()
length_bytes = struct.pack("!I", 10)
# First recv returns valid length bytes, second recv raises
mock_conn.recv.side_effect = [length_bytes, OSError("body read error")]
with pytest.raises(OSError, match="unable to read socket"):
read_request(mock_conn)
def test_debug_5_logs_length(self, capsys):
srv, cli = socket_module.socketpair()
try:
_send_framed(cli, {"command": "GET", "client": "test"})
read_request(srv, debug=5)
finally:
srv.close()
cli.close()
# debug > 4 logs incoming length
captured = capsys.readouterr()
assert "incoming length" in captured.out
def test_debug_4_logs_request_bytes(self, capsys):
srv, cli = socket_module.socketpair()
try:
_send_framed(cli, {"command": "GET", "client": "test"})
read_request(srv, debug=4)
finally:
srv.close()
cli.close()
captured = capsys.readouterr()
assert "incoming request" in captured.out
def test_debug_3_logs_received(self, capsys):
srv, cli = socket_module.socketpair()
try:
_send_framed(cli, {"command": "GET", "client": "test"})
read_request(srv, debug=3)
finally:
srv.close()
cli.close()
captured = capsys.readouterr()
assert "received" in captured.out
class TestServerMain:
"""Test the server main() function by running it in a thread and sending real socket commands."""
def _make_server_thread(self, sock_path: str, ready_event: threading.Event,
stop_event: threading.Event, **kwargs):
"""Run server main() in a thread with mocked serial and collect_data."""
import socket as _socket
from unittest.mock import MagicMock, patch
from bmspy.server import main as server_main
from bmspy.classes import BMSScalarField
from bmspy.jbd_bms import JBDBMS
# Build a fake JBDBMS result
fake_bms = JBDBMS()
fake_bms.bms_voltage_total_volts = BMSScalarField(
help="Voltage", raw_value=52.0, value="52.00", units="V"
)
def _do_main():
import sys
import time as _t
argv = ["bmspy-server", "--socket", sock_path, "--device", "/dev/ttyUSB0"]
if "debug" in kwargs:
argv += ["-v"] * kwargs["debug"]
original_listen = socket_module.socket.listen
def _patched_listen(self, backlog=1):
result = original_listen(self, backlog)
ready_event.set()
return result
with patch("sys.argv", argv), \
patch("bmspy.server.signal.signal"), \
patch("bmspy.server.initialise_serial", return_value=MagicMock()), \
patch("bmspy.server.collect_data", return_value=fake_bms), \
patch("bmspy.server.time.sleep"), \
patch("bmspy.server.os.path.isdir", return_value=True), \
patch("bmspy.server.os.path.exists", return_value=False), \
patch.object(socket_module.socket, "listen", _patched_listen):
try:
server_main()
except (SystemExit, KeyboardInterrupt, OSError):
pass
t = threading.Thread(target=_do_main, daemon=True)
return t
def _send_command(self, sock_path: str, cmd: dict) -> dict:
"""Connect to server socket and send a command."""
import socket as _socket
import struct
import json
sock = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
# Wait for server to be ready
for _ in range(20):
try:
sock.connect(sock_path)
break
except (OSError, ConnectionRefusedError):
import time
time.sleep(0.1)
payload = json.dumps(cmd).encode()
sock.sendall(struct.pack("!I", len(payload)) + payload)
# Read response
raw_len = sock.recv(4)
if not raw_len:
return {}
length = struct.unpack("!I", raw_len)[0]
resp_data = sock.recv(length)
sock.close()
return json.loads(resp_data)
def test_register_command(self, tmp_path):
"""Test REGISTER command via server main()."""
sock_path = str(tmp_path / "server.sock")
ready = threading.Event()
stop = threading.Event()
t = self._make_server_thread(sock_path, ready, stop)
t.start()
ready.wait(timeout=2)
import time as _time
_time.sleep(0.2) # Let server get to sock.accept()
try:
response = self._send_command(sock_path, {"command": "REGISTER", "client": "test"})
assert response.get("status") == "REGISTERED"
finally:
import os
# Trigger server shutdown by connecting and sending KeyboardInterrupt-triggering command
try:
import socket as _s, struct, json
s = _s.socket(_s.AF_UNIX, _s.SOCK_STREAM)
s.connect(sock_path)
# Send DEREGISTER
payload = json.dumps({"command": "DEREGISTER", "client": "test"}).encode()
s.sendall(struct.pack("!I", len(payload)) + payload)
s.recv(4)
s.close()
except Exception:
pass
t.join(timeout=0.5)
def test_get_command(self, tmp_path):
"""Test GET command via server main()."""
sock_path = str(tmp_path / "server_get.sock")
ready = threading.Event()
stop = threading.Event()
t = self._make_server_thread(sock_path, ready, stop)
t.start()
ready.wait(timeout=2)
import time as _time
_time.sleep(0.2)
try:
response = self._send_command(sock_path, {"command": "GET", "client": "test"})
assert "ttyUSB0" in response or len(response) >= 0
finally:
t.join(timeout=0.5)
def test_deregister_command(self, tmp_path):
"""Test DEREGISTER command via server main()."""
sock_path = str(tmp_path / "server_dereg.sock")
ready = threading.Event()
stop = threading.Event()
t = self._make_server_thread(sock_path, ready, stop)
t.start()
ready.wait(timeout=2)
import time as _t
_t.sleep(0.2)
try:
# First register
r1 = self._send_command(sock_path, {"command": "REGISTER", "client": "test"})
assert r1.get("status") == "REGISTERED"
# Then deregister
r2 = self._send_command(sock_path, {"command": "DEREGISTER", "client": "test"})
assert r2.get("status") == "DEREGISTERED"
finally:
t.join(timeout=0.5)
def test_get_with_ups_filter(self, tmp_path):
"""Test GET command with ups filter."""
sock_path = str(tmp_path / "server_getups.sock")
ready = threading.Event()
stop = threading.Event()
t = self._make_server_thread(sock_path, ready, stop)
t.start()
ready.wait(timeout=2)
import time as _t
_t.sleep(0.2)
try:
response = self._send_command(sock_path, {"command": "GET", "client": "test", "ups": "ttyUSB0"})
# Should get ttyUSB0 data or empty (ups filter)
assert isinstance(response, dict)
finally:
t.join(timeout=0.5)
def test_debug_mode_verbose(self, tmp_path, capsys):
"""Test server main() with debug=1 logs messages."""
sock_path = str(tmp_path / "server_dbg.sock")
ready = threading.Event()
stop = threading.Event()
t = self._make_server_thread(sock_path, ready, stop, debug=1)
t.start()
ready.wait(timeout=2)
import time as _t
_t.sleep(0.2)
try:
self._send_command(sock_path, {"command": "GET", "client": "test"})
except Exception:
pass
finally:
t.join(timeout=0.5)
def test_duplicate_device_name_skipped(self, tmp_path, capsys):
"""Test that duplicate UPS names are skipped."""
sock_path = str(tmp_path / "server_dup.sock")
ready = threading.Event()
from unittest.mock import MagicMock, patch
from bmspy.server import main as server_main
from bmspy.classes import BMSScalarField
from bmspy.jbd_bms import JBDBMS
fake_bms = JBDBMS()
fake_bms.bms_voltage_total_volts = BMSScalarField(
help="Voltage", raw_value=52.0, value="52.00", units="V"
)
def _do_main_dup():
argv = ["bmspy-server", "--socket", sock_path,
"--device", "myups:/dev/ttyUSB0",
"--device", "myups:/dev/ttyUSB1"] # duplicate name
original_listen = socket_module.socket.listen
def _patched_listen(self, backlog=1):
result = original_listen(self, backlog)
ready.set()
return result
with patch("sys.argv", argv), \
patch("bmspy.server.signal.signal"), \
patch("bmspy.server.initialise_serial", return_value=MagicMock()), \
patch("bmspy.server.collect_data", return_value=fake_bms), \
patch("bmspy.server.time.sleep"), \
patch("bmspy.server.os.path.isdir", return_value=True), \
patch("bmspy.server.os.path.exists", return_value=False), \
patch.object(socket_module.socket, "listen", _patched_listen):
try:
server_main()
except (SystemExit, KeyboardInterrupt, OSError):
pass
t = threading.Thread(target=_do_main_dup, daemon=True)
t.start()
ready.wait(timeout=2)
import time as _t
_t.sleep(0.2)
# Server should have started with one device
response = self._send_command(sock_path, {"command": "GET", "client": "test"})
assert "myups" in response
t.join(timeout=0.5)
def test_socket_dir_created_if_missing(self, tmp_path):
"""Test that socket dir is created when it doesn't exist."""
sock_path = str(tmp_path / "server_mkdir.sock")
ready = threading.Event()
from unittest.mock import MagicMock, patch, call
from bmspy.server import main as server_main
from bmspy.jbd_bms import JBDBMS
fake_bms = JBDBMS()
makedirs_called = []
def _do_main_mkdir():
argv = ["bmspy-server", "--socket", sock_path, "--device", "/dev/ttyUSB0"]
original_listen = socket_module.socket.listen
def _patched_listen(self, backlog=1):
result = original_listen(self, backlog)
ready.set()
return result
def _patched_makedirs(path, exist_ok=False):
makedirs_called.append(path)
# Don't call actual makedirs to avoid recursion - socket dir is already tmp_path
with patch("sys.argv", argv), \
patch("bmspy.server.signal.signal"), \
patch("bmspy.server.initialise_serial", return_value=MagicMock()), \
patch("bmspy.server.collect_data", return_value=fake_bms), \
patch("bmspy.server.time.sleep"), \
patch("bmspy.server.os.path.exists", return_value=False), \
patch("bmspy.server.os.path.isdir", return_value=False), \
patch("bmspy.server.os.makedirs", side_effect=_patched_makedirs), \
patch.object(socket_module.socket, "listen", _patched_listen):
try:
server_main()
except (SystemExit, KeyboardInterrupt, OSError):
pass
t = threading.Thread(target=_do_main_mkdir, daemon=True)
t.start()
ready.wait(timeout=5)
import time as _t
_t.sleep(0.2)
try:
self._send_command(sock_path, {"command": "REGISTER", "client": "test"})
except Exception:
pass
t.join(timeout=0.5)
assert len(makedirs_called) > 0
def test_debug_3_logs_startup(self, tmp_path, capsys):
"""Test debug=3 logs 'starting up' message."""
sock_path = str(tmp_path / "server_dbg3.sock")
ready = threading.Event()
stop = threading.Event()
t = self._make_server_thread(sock_path, ready, stop, debug=3)
t.start()
ready.wait(timeout=2)
import time as _t
_t.sleep(0.2)
try:
self._send_command(sock_path, {"command": "REGISTER", "client": "test"})
except Exception:
pass
finally:
t.join(timeout=0.5)
captured = capsys.readouterr()
# debug>2 triggers "starting up" and "waiting for connection"
assert "starting up" in captured.out.lower() or "waiting" in captured.out.lower()
def test_socket_already_exists_raises(self, tmp_path):
"""Test that server raises OSError if socket already exists."""
sock_path = str(tmp_path / "existing.sock")
from unittest.mock import MagicMock, patch
from bmspy.server import main as server_main
with patch("sys.argv", ["bmspy-server", "--socket", sock_path, "--device", "/dev/ttyUSB0"]), \
patch("bmspy.server.signal.signal"), \
patch("bmspy.server.initialise_serial", return_value=MagicMock()), \
patch("bmspy.server.os.path.isdir", return_value=True), \
patch("bmspy.server.os.path.exists", return_value=True):
with pytest.raises(OSError, match="already exists"):
server_main()
def test_deregister_nonexistent_client_no_error(self, tmp_path):
"""Test DEREGISTER for a client that was never registered (KeyError suppressed)."""
sock_path = str(tmp_path / "server_noerr.sock")
ready = threading.Event()
stop = threading.Event()
t = self._make_server_thread(sock_path, ready, stop)
t.start()
ready.wait(timeout=2)
import time as _t
_t.sleep(0.2)
try:
# Deregister without first registering
response = self._send_command(sock_path, {"command": "DEREGISTER", "client": "ghost"})
assert response.get("status") == "DEREGISTERED"
finally:
t.join(timeout=0.5)
def test_keyboard_interrupt_closes_connection(self, tmp_path):
"""Test KeyboardInterrupt handler closes connection when connection is active."""
sock_path = str(tmp_path / "server_kbi.sock")
ready = threading.Event()
from unittest.mock import MagicMock, patch
from bmspy.server import main as server_main, read_request as original_read_request
from bmspy.jbd_bms import JBDBMS
fake_bms = JBDBMS()
call_count = [0]
def _do_main_kbi():
argv = ["bmspy-server", "--socket", sock_path, "--device", "/dev/ttyUSB0"]
original_listen = socket_module.socket.listen
def _patched_listen(self, backlog=1):
result = original_listen(self, backlog)
ready.set()
return result
def _patched_read_request(conn, debug=0):
call_count[0] += 1
if call_count[0] >= 2:
raise KeyboardInterrupt("test interrupt")
return original_read_request(conn, debug)
with patch("sys.argv", argv), \
patch("bmspy.server.signal.signal"), \
patch("bmspy.server.initialise_serial", return_value=MagicMock()), \
patch("bmspy.server.collect_data", return_value=fake_bms), \
patch("bmspy.server.time.sleep"), \
patch("bmspy.server.os.path.isdir", return_value=True), \
patch("bmspy.server.os.path.exists", return_value=False), \
patch("bmspy.server.read_request", side_effect=_patched_read_request), \
patch.object(socket_module.socket, "listen", _patched_listen):
try:
server_main()
except SystemExit:
pass
t = threading.Thread(target=_do_main_kbi, daemon=True)
t.start()
ready.wait(timeout=2)
import time as _t
_t.sleep(0.2)
# Send first request to register call_count[0] = 1
try:
self._send_command(sock_path, {"command": "REGISTER", "client": "test"})
except Exception:
pass
_t.sleep(0.1)
# Send second request to trigger KeyboardInterrupt with active connection
try:
self._send_command(sock_path, {"command": "REGISTER", "client": "test2"})
except Exception:
pass
t.join(timeout=2)
def test_socket_read_error_logs_and_continues(self, tmp_path, capsys):
"""Test that read_request errors are caught and logged."""
sock_path = str(tmp_path / "server_err.sock")
ready = threading.Event()
stop = threading.Event()
t = self._make_server_thread(sock_path, ready, stop)
t.start()
ready.wait(timeout=2)
import time as _t
_t.sleep(0.2)
try:
# Connect but send garbage that will cause read_request to fail
import socket as _s
sock = _s.socket(_s.AF_UNIX, _s.SOCK_STREAM)
for _ in range(20):
try:
sock.connect(sock_path)
break
except (OSError, ConnectionRefusedError):
_t.sleep(0.05)
# Send only 2 bytes (incomplete length header)
sock.sendall(b"\x00\x00")
sock.close()
_t.sleep(0.1) # Give server time to process
finally:
t.join(timeout=0.5)
def test_root_user_socket_dir_created(self, tmp_path, capsys):
"""Test root user path when socket dir doesn't exist (chown/chmod triggered)."""
sock_path = str(tmp_path / "server_root_mkdir.sock")
ready = threading.Event()
from unittest.mock import MagicMock, patch
from bmspy.server import main as server_main
from bmspy.jbd_bms import JBDBMS
fake_bms = JBDBMS()
def _do_main():
argv = ["bmspy-server", "--socket", sock_path, "--device", "/dev/ttyUSB0", "-v", "-v"]
original_listen = socket_module.socket.listen
def _patched_listen(self, backlog=1):
result = original_listen(self, backlog)
ready.set()
return result
mock_pwd_module = MagicMock()
mock_pwd_module.getpwnam.return_value = [None, None, 65534]
mock_pwd_module.getpwuid.return_value = ["nobody"]
mock_grp_module = MagicMock()
mock_grp_module.getgrnam.return_value = [None, None, 65534]
mock_grp_module.getgrgid.return_value = ["dialout"]
import sys as _sys
with patch("sys.argv", argv), \
patch("bmspy.server.signal.signal"), \
patch("bmspy.server.initialise_serial", return_value=MagicMock()), \
patch("bmspy.server.collect_data", return_value=fake_bms), \
patch("bmspy.server.time.sleep"), \
patch("bmspy.server.os.path.isdir", return_value=False), \
patch("bmspy.server.os.makedirs"), \
patch("bmspy.server.os.chown"), \
patch("bmspy.server.os.chmod"), \
patch("bmspy.server.os.path.exists", return_value=False), \
patch("bmspy.server.os.getuid", return_value=0), \
patch("bmspy.server.os.getgid", return_value=0), \
patch("bmspy.server.os.setuid"), \
patch("bmspy.server.os.setgid"), \
patch("bmspy.server.os.umask", return_value=0o022), \
patch.dict(_sys.modules, {"pwd": mock_pwd_module, "grp": mock_grp_module}), \
patch.object(socket_module.socket, "listen", _patched_listen):
try:
server_main()
except (SystemExit, KeyboardInterrupt, OSError):
pass
t = threading.Thread(target=_do_main, daemon=True)
t.start()
ready.wait(timeout=2)
import time as _t
_t.sleep(0.2)
try:
self._send_command(sock_path, {"command": "REGISTER", "client": "test"})
except Exception:
pass
t.join(timeout=0.5)
def test_root_user_setgid_error(self, tmp_path, capsys):
"""Test root user path when setgid raises OSError."""
sock_path = str(tmp_path / "server_setgid_err.sock")
ready = threading.Event()
from unittest.mock import MagicMock, patch
from bmspy.server import main as server_main
from bmspy.jbd_bms import JBDBMS
fake_bms = JGDBMS() if False else JBDBMS()
def _do_main():
argv = ["bmspy-server", "--socket", sock_path, "--device", "/dev/ttyUSB0"]
original_listen = socket_module.socket.listen
def _patched_listen(self, backlog=1):
result = original_listen(self, backlog)
ready.set()
return result
mock_pwd_module = MagicMock()
mock_pwd_module.getpwnam.return_value = [None, None, 65534]
mock_pwd_module.getpwuid.return_value = ["nobody"]
mock_grp_module = MagicMock()
mock_grp_module.getgrnam.return_value = [None, None, 65534]
mock_grp_module.getgrgid.return_value = ["dialout"]
import sys as _sys
with patch("sys.argv", argv), \
patch("bmspy.server.signal.signal"), \
patch("bmspy.server.initialise_serial", return_value=MagicMock()), \
patch("bmspy.server.collect_data", return_value=JBDBMS()), \
patch("bmspy.server.time.sleep"), \
patch("bmspy.server.os.path.isdir", return_value=True), \
patch("bmspy.server.os.path.exists", return_value=False), \
patch("bmspy.server.os.getuid", return_value=0), \
patch("bmspy.server.os.getgid", return_value=0), \
patch("bmspy.server.os.setgid", side_effect=OSError("cannot set gid")), \
patch("bmspy.server.os.setuid", side_effect=OSError("cannot set uid")), \
patch("bmspy.server.os.umask", return_value=0o022), \
patch.dict(_sys.modules, {"pwd": mock_pwd_module, "grp": mock_grp_module}), \
patch.object(socket_module.socket, "listen", _patched_listen):
try:
server_main()
except (SystemExit, KeyboardInterrupt, OSError):
pass
t = threading.Thread(target=_do_main, daemon=True)
t.start()
ready.wait(timeout=2)
import time as _t
_t.sleep(0.2)
try:
self._send_command(sock_path, {"command": "REGISTER", "client": "test"})
except Exception:
pass
t.join(timeout=0.5)
captured = capsys.readouterr()
# Should log errors about setgid/setuid
assert "gid" in captured.out.lower() or "uid" in captured.out.lower()
def test_root_user_uid_gid_handling(self, tmp_path, capsys):
"""Test the uid==0 path for privilege dropping."""
sock_path = str(tmp_path / "server_root.sock")
ready = threading.Event()
from unittest.mock import MagicMock, patch
from bmspy.server import main as server_main
from bmspy.jbd_bms import JBDBMS
fake_bms = JBDBMS()
def _do_main_root():
argv = ["bmspy-server", "--socket", sock_path, "--device", "/dev/ttyUSB0"]
original_listen = socket_module.socket.listen
def _patched_listen(self, backlog=1):
result = original_listen(self, backlog)
ready.set()
return result
mock_pwd_module = MagicMock()
mock_pwd_module.getpwnam.return_value = [None, None, 65534] # nobody uid
mock_pwd_module.getpwuid.return_value = ["nobody"]
mock_grp_module = MagicMock()
mock_grp_module.getgrnam.return_value = [None, None, 65534] # dialout gid
mock_grp_module.getgrgid.return_value = ["dialout"]
import sys as _sys
with patch("sys.argv", argv), \
patch("bmspy.server.signal.signal"), \
patch("bmspy.server.initialise_serial", return_value=MagicMock()), \
patch("bmspy.server.collect_data", return_value=fake_bms), \
patch("bmspy.server.time.sleep"), \
patch("bmspy.server.os.path.isdir", return_value=True), \
patch("bmspy.server.os.path.exists", return_value=False), \
patch("bmspy.server.os.getuid", return_value=0), \
patch("bmspy.server.os.getgid", return_value=0), \
patch("bmspy.server.os.setuid"), \
patch("bmspy.server.os.setgid"), \
patch("bmspy.server.os.umask", return_value=0o022), \
patch.dict(_sys.modules, {"pwd": mock_pwd_module, "grp": mock_grp_module}), \
patch.object(socket_module.socket, "listen", _patched_listen):
try:
server_main()
except (SystemExit, KeyboardInterrupt, OSError):
pass
t = threading.Thread(target=_do_main_root, daemon=True)
t.start()
ready.wait(timeout=2)
import time as _t
_t.sleep(0.2)
try:
self._send_command(sock_path, {"command": "REGISTER", "client": "test"})
except Exception:
pass
t.join(timeout=0.5)
def test_invalid_command_breaks_loop(self, tmp_path, capsys):
"""Test that an invalid command logs an error."""
sock_path = str(tmp_path / "server_invalid.sock")
ready = threading.Event()
stop = threading.Event()
t = self._make_server_thread(sock_path, ready, stop)
t.start()
ready.wait(timeout=2)
import time as _time
_time.sleep(0.2)
try:
# Send invalid command - note: server breaks on invalid, so this may fail
import socket as _s, struct, json
sock = _s.socket(_s.AF_UNIX, _s.SOCK_STREAM)
for _ in range(20):
try:
sock.connect(sock_path)
break
except (OSError, ConnectionRefusedError):
_time.sleep(0.05)
payload = json.dumps({"command": "INVALID", "client": "test"}).encode()
sock.sendall(struct.pack("!I", len(payload)) + payload)
sock.close()
except Exception:
pass
finally:
t.join(timeout=0.5)
class TestSendResponseDebug:
def test_debug_3_logs_sending(self, capsys):
srv, cli = socket_module.socketpair()
try:
send_response(srv, {"status": "OK"}, "test", debug=3)
_recv_framed(cli)
finally:
srv.close()
cli.close()
captured = capsys.readouterr()
assert "sending" in captured.out
def test_debug_5_logs_length(self, capsys):
srv, cli = socket_module.socketpair()
try:
send_response(srv, {"status": "OK"}, "test", debug=5)
_recv_framed(cli)
finally:
srv.close()
cli.close()
captured = capsys.readouterr()
assert "length" in captured.out
def test_debug_4_logs_response(self, capsys):
srv, cli = socket_module.socketpair()
try:
send_response(srv, {"status": "OK"}, "test", debug=4)
_recv_framed(cli)
finally:
srv.close()
cli.close()
captured = capsys.readouterr()
assert "outgoing response" in captured.out
+490
View File
@@ -0,0 +1,490 @@
import pytest
import smtplib
from unittest.mock import patch, MagicMock
import bmspy.ups as ups_mod
from bmspy.classes import BMSScalarField, BMSMultiField, BMSInfoField, UPS
from bmspy.ups import _get_field_value, handle_shutdown, handle_email
# ---------------------------------------------------------------------------
# Fixtures: reset module-level globals between tests
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _reset_globals():
ups_mod.scheduled_shutdown = False
ups_mod.critical_sent = False
ups_mod.warning_sent = False
ups_mod.alert_sent = False
yield
ups_mod.scheduled_shutdown = False
ups_mod.critical_sent = False
ups_mod.warning_sent = False
ups_mod.alert_sent = False
# ---------------------------------------------------------------------------
# _get_field_value
# ---------------------------------------------------------------------------
class TestGetFieldValue:
def _ups(self):
return UPS.from_dict({
"bms_current_amps": {
"help": "Current",
"raw_value": -2.5,
"value": "-2.50",
"units": "A",
},
"bms_capacity_charge_ratio": {
"help": "Percent Charge",
"raw_value": 0.75,
"value": "0.75",
"units": "%",
},
"bms_date": {
"help": "Date",
"info": "2023-01-15",
},
"bms_cells": {
"help": "Cells",
"label": "cell",
"raw_values": {1: 3.6},
"values": {1: "3.600"},
"units": "V",
},
})
def test_found_scalar_field(self):
ups = self._ups()
assert _get_field_value(ups, "bms_current_amps") == pytest.approx(-2.5)
def test_found_charge_ratio(self):
ups = self._ups()
assert _get_field_value(ups, "bms_capacity_charge_ratio") == pytest.approx(0.75)
def test_field_not_found_returns_none(self):
ups = self._ups()
assert _get_field_value(ups, "nonexistent_field") is None
def test_info_field_returns_none(self):
# BMSInfoField is not a scalar, should return None
ups = self._ups()
assert _get_field_value(ups, "bms_date") is None
def test_multi_field_returns_none(self):
# BMSMultiField is not a scalar, should return None
ups = self._ups()
assert _get_field_value(ups, "bms_cells") is None
# ---------------------------------------------------------------------------
# handle_shutdown
# ---------------------------------------------------------------------------
class TestHandleShutdown:
def test_shutdown_calls_os_system(self):
with patch("bmspy.ups.os.system") as mock_sys:
handle_shutdown(action="shutdown", delay=5)
mock_sys.assert_called_once_with("/sbin/shutdown 5")
def test_shutdown_sets_scheduled_flag(self):
with patch("bmspy.ups.os.system"):
handle_shutdown(action="shutdown", delay=5)
assert ups_mod.scheduled_shutdown is not False
def test_shutdown_does_not_reschedule_if_already_scheduled(self):
ups_mod.scheduled_shutdown = 9999999999.0
with patch("bmspy.ups.os.system") as mock_sys:
handle_shutdown(action="shutdown", delay=5)
mock_sys.assert_not_called()
def test_cancel_calls_os_system(self):
with patch("bmspy.ups.os.system") as mock_sys:
handle_shutdown(action="cancel")
mock_sys.assert_called_once_with("/sbin/shutdown -c")
def test_cancel_does_not_set_flag(self):
with patch("bmspy.ups.os.system"):
handle_shutdown(action="cancel")
assert ups_mod.scheduled_shutdown is False
# ---------------------------------------------------------------------------
# handle_email
# ---------------------------------------------------------------------------
class TestHandleEmail:
def _mock_smtp(self):
mock_server = MagicMock()
mock_smtp_class = MagicMock()
mock_smtp_class.return_value.__enter__ = MagicMock(return_value=mock_server)
mock_smtp_class.return_value.__exit__ = MagicMock(return_value=False)
return mock_smtp_class, mock_server
def test_basic_send(self):
mock_smtp_class, mock_server = self._mock_smtp()
with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class):
handle_email(
text="test message",
level="Alert",
recipient="user@example.com",
)
mock_server.sendmail.assert_called_once()
def test_ssl_port_triggers_starttls(self):
mock_smtp_class, mock_server = self._mock_smtp()
with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class), \
patch("bmspy.ups.ssl.create_default_context") as mock_ssl:
handle_email(
text="test",
level="Alert",
recipient="user@example.com",
port=465,
)
mock_server.starttls.assert_called_once()
def test_port_587_also_triggers_starttls(self):
mock_smtp_class, mock_server = self._mock_smtp()
with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class), \
patch("bmspy.ups.ssl.create_default_context"):
handle_email(
text="test",
level="Alert",
recipient="user@example.com",
port=587,
)
mock_server.starttls.assert_called_once()
def test_no_ssl_no_starttls(self):
mock_smtp_class, mock_server = self._mock_smtp()
with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class):
handle_email(
text="test",
level="Alert",
recipient="user@example.com",
port=25,
)
mock_server.starttls.assert_not_called()
def test_with_credentials_calls_login(self):
mock_smtp_class, mock_server = self._mock_smtp()
with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class):
handle_email(
text="test",
level="Alert",
recipient="user@example.com",
mailuser="user",
mailpass="pass",
)
mock_server.login.assert_called_once_with("user", "pass")
def test_without_credentials_no_login(self):
mock_smtp_class, mock_server = self._mock_smtp()
with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class):
handle_email(
text="test",
level="Alert",
recipient="user@example.com",
)
mock_server.login.assert_not_called()
def test_recipient_without_at_gets_hostname_appended(self):
mock_smtp_class, mock_server = self._mock_smtp()
with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class), \
patch("bmspy.ups.socket.gethostname", return_value="myhost"):
handle_email(
text="test",
level="Alert",
recipient="root",
)
# sendmail recipient should be root@myhost
args = mock_server.sendmail.call_args[0]
assert args[1] == "root@myhost"
# ---------------------------------------------------------------------------
# ups main() - comprehensive loop testing
# ---------------------------------------------------------------------------
def _make_ups(current_amps: float, charge_ratio: float) -> UPS:
"""Build a UPS with bms_current_amps and bms_capacity_charge_ratio."""
return UPS.from_dict({
"bms_current_amps": {
"help": "Current",
"raw_value": current_amps,
"value": str(current_amps),
"units": "A",
},
"bms_capacity_charge_ratio": {
"help": "Charge Ratio",
"raw_value": charge_ratio,
"value": str(charge_ratio),
"units": "%",
},
})
class TestUpsMain:
"""Test ups.main() loop behavior by running a limited number of iterations."""
def _run_main_with_data(self, data_sequence, argv=None, extra_patches=None):
"""Run main() with a sequence of UPS data, stopping when data runs out."""
call_count = 0
def _read_data(*args, **kwargs):
nonlocal call_count
if call_count >= len(data_sequence):
raise StopIteration("done")
result = data_sequence[call_count]
call_count += 1
return result
patches = {
"bmspy.ups.client.read_data": _read_data,
"bmspy.ups.client.handle_registration": MagicMock(),
"bmspy.ups.time.sleep": MagicMock(),
"bmspy.ups.os.system": MagicMock(),
}
if extra_patches:
patches.update(extra_patches)
with patch("sys.argv", ["bmspy-ups"] + (argv or [])):
with pytest.raises(StopIteration):
with patch("bmspy.ups.client.read_data", side_effect=_read_data), \
patch("bmspy.ups.client.handle_registration"), \
patch("bmspy.ups.time.sleep"), \
patch("bmspy.ups.os.system"):
ups_mod.main()
return call_count
def _make_data(self, current_amps, charge_ratio):
return {"testups": _make_ups(current_amps, charge_ratio)}
def test_main_history_pruned_at_10(self):
"""History is pruned to max 10 items."""
# Send 15 readings to ensure pruning happens
data_seq = [self._make_data(0.0, 0.80)] * 15
with patch("sys.argv", ["bmspy-ups"]):
with pytest.raises(StopIteration):
with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \
patch("bmspy.ups.client.handle_registration"), \
patch("bmspy.ups.time.sleep"), \
patch("bmspy.ups.handle_shutdown"), \
patch("bmspy.ups.handle_email"):
ups_mod.main()
def test_main_runs_with_not_enough_history(self):
"""With < 4 readings, should print 'not enough readings' and continue."""
data = [self._make_data(0.0, 0.95)] * 5
self._run_main_with_data(data)
def test_main_debug_2_prints_not_enough_readings(self, capsys):
data = [self._make_data(0.0, 0.95)] * 5
with patch("sys.argv", ["bmspy-ups", "-v", "-v"]):
with pytest.raises(StopIteration):
with patch("bmspy.ups.client.read_data", side_effect=data + [StopIteration]),\
patch("bmspy.ups.client.handle_registration"), \
patch("bmspy.ups.time.sleep"), \
patch("bmspy.ups.os.system"):
ups_mod.main()
def test_main_below_critical_threshold_triggers_shutdown(self):
"""When charge drops below critical threshold, shutdown should be triggered."""
# Need 5+ readings (>3 in history) to trigger comparisons
ups_mod.scheduled_shutdown = False
ups_mod.critical_sent = False
data_seq = []
# Populate history with 5 readings all at 20% (below 30% critical)
for _ in range(5):
data_seq.append(self._make_data(0.0, 0.20))
mock_shutdown = MagicMock()
mock_email = MagicMock()
with patch("sys.argv", ["bmspy-ups", "--critical", "30"]):
with pytest.raises(StopIteration):
with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \
patch("bmspy.ups.client.handle_registration"), \
patch("bmspy.ups.time.sleep"), \
patch("bmspy.ups.handle_shutdown", mock_shutdown), \
patch("bmspy.ups.handle_email", mock_email):
ups_mod.main()
mock_shutdown.assert_called()
def test_main_below_warning_threshold_sends_email(self):
"""When charge drops below warning threshold, email alert should be sent."""
ups_mod.scheduled_shutdown = False
ups_mod.warning_sent = False
data_seq = []
# 5 readings at 60% (below 75% warning, above 30% critical)
for _ in range(5):
data_seq.append(self._make_data(0.0, 0.60))
mock_email = MagicMock()
with patch("sys.argv", ["bmspy-ups", "--warning", "75", "--critical", "30"]):
with pytest.raises(StopIteration):
with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \
patch("bmspy.ups.client.handle_registration"), \
patch("bmspy.ups.time.sleep"), \
patch("bmspy.ups.handle_shutdown"), \
patch("bmspy.ups.handle_email", mock_email):
ups_mod.main()
mock_email.assert_called()
def test_main_discharge_alert(self):
"""When current goes negative, discharge alert should be sent.
Discharge alert requires: current < 0 AND h1 < 0 AND h2 >= 0.
After 5 reads with seq [+1, -1, +1, +1, -1]:
h=[+1,-1,+1,+1,-1], current=-1, h1=-1, h2=+1 → triggers alert.
"""
ups_mod.alert_sent = False
data_seq = [
self._make_data(1.0, 0.90), # r1: +1
self._make_data(-1.0, 0.89), # r2: -1 (will be h1)
self._make_data(1.0, 0.90), # r3: +1 (will be h2)
self._make_data(1.0, 0.90), # r4: +1
self._make_data(-1.0, 0.88), # r5: -1 (current) → alert!
]
mock_email = MagicMock()
with patch("sys.argv", ["bmspy-ups"]):
with pytest.raises(StopIteration):
with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \
patch("bmspy.ups.client.handle_registration"), \
patch("bmspy.ups.time.sleep"), \
patch("bmspy.ups.handle_shutdown"), \
patch("bmspy.ups.handle_email", mock_email):
ups_mod.main()
mock_email.assert_called()
def test_main_power_regained_alert(self):
"""When current goes from negative to positive, recovery alert sent.
Recovery requires: current >= 0 AND h1 >= 0 AND h2 < 0.
After 5 reads with seq [-1, +1, -1, +1, +1]:
h=[-1,+1,-1,+1,+1], current=+1, h1=+1, h2=-1 → recovery!
"""
ups_mod.alert_sent = True
ups_mod.scheduled_shutdown = False
data_seq = [
self._make_data(-1.0, 0.80), # r1: -1
self._make_data(1.0, 0.81), # r2: +1 (will be h1)
self._make_data(-1.0, 0.79), # r3: -1 (will be h2)
self._make_data(1.0, 0.81), # r4: +1
self._make_data(1.0, 0.82), # r5: +1 (current) → recovery!
]
mock_email = MagicMock()
mock_shutdown = MagicMock()
with patch("sys.argv", ["bmspy-ups"]):
with pytest.raises(StopIteration):
with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \
patch("bmspy.ups.client.handle_registration"), \
patch("bmspy.ups.time.sleep"), \
patch("bmspy.ups.handle_shutdown", mock_shutdown), \
patch("bmspy.ups.handle_email", mock_email):
ups_mod.main()
mock_email.assert_called()
def test_main_not_enough_history_debug_2(self, capsys):
"""With debug=2 and < 4 readings, prints 'not enough readings'."""
data_seq = [self._make_data(0.0, 0.95)] * 5
with patch("sys.argv", ["bmspy-ups", "-v", "-v"]):
with pytest.raises(StopIteration):
with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \
patch("bmspy.ups.client.handle_registration"), \
patch("bmspy.ups.time.sleep"), \
patch("bmspy.ups.handle_shutdown"), \
patch("bmspy.ups.handle_email"):
ups_mod.main()
captured = capsys.readouterr()
assert "not enough readings" in captured.out.lower()
def test_main_below_warning_debug_1(self, capsys):
"""With debug=1, warning message is printed."""
ups_mod.warning_sent = False
data_seq = [self._make_data(0.0, 0.60)] * 5
with patch("sys.argv", ["bmspy-ups", "-v", "--warning", "75", "--critical", "30"]):
with pytest.raises(StopIteration):
with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \
patch("bmspy.ups.client.handle_registration"), \
patch("bmspy.ups.time.sleep"), \
patch("bmspy.ups.handle_shutdown"), \
patch("bmspy.ups.handle_email"):
ups_mod.main()
captured = capsys.readouterr()
assert "warning" in captured.out.lower()
def test_main_discharge_alert_debug_1(self, capsys):
"""With debug=1, discharge alert message is printed."""
ups_mod.alert_sent = False
data_seq = [
self._make_data(1.0, 0.90),
self._make_data(-1.0, 0.89),
self._make_data(1.0, 0.90),
self._make_data(1.0, 0.90),
self._make_data(-1.0, 0.88),
]
with patch("sys.argv", ["bmspy-ups", "-v"]):
with pytest.raises(StopIteration):
with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \
patch("bmspy.ups.client.handle_registration"), \
patch("bmspy.ups.time.sleep"), \
patch("bmspy.ups.handle_shutdown"), \
patch("bmspy.ups.handle_email"):
ups_mod.main()
captured = capsys.readouterr()
assert "discharging" in captured.out.lower()
def test_main_power_regained_debug_1(self, capsys):
"""With debug=1, power regained message is printed."""
ups_mod.alert_sent = True
data_seq = [
self._make_data(-1.0, 0.80),
self._make_data(1.0, 0.81),
self._make_data(-1.0, 0.79),
self._make_data(1.0, 0.81),
self._make_data(1.0, 0.82),
]
with patch("sys.argv", ["bmspy-ups", "-v"]):
with pytest.raises(StopIteration):
with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \
patch("bmspy.ups.client.handle_registration"), \
patch("bmspy.ups.time.sleep"), \
patch("bmspy.ups.handle_shutdown"), \
patch("bmspy.ups.handle_email"):
ups_mod.main()
captured = capsys.readouterr()
assert "power regained" in captured.out.lower()
def test_main_debug_2_prints_current_and_capacity(self, capsys):
"""With debug=2, current and capacity are printed."""
data_seq = [self._make_data(1.0, 0.80)] * 5
with patch("sys.argv", ["bmspy-ups", "-v", "-v"]):
with pytest.raises(StopIteration):
with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \
patch("bmspy.ups.client.handle_registration"), \
patch("bmspy.ups.time.sleep"), \
patch("bmspy.ups.handle_shutdown"), \
patch("bmspy.ups.handle_email"):
ups_mod.main()
captured = capsys.readouterr()
assert "current" in captured.out.lower() or "capacity" in captured.out.lower()
def test_main_debug_1_prints_thresholds(self, capsys):
"""With debug=1 and below threshold, threshold messages are printed."""
ups_mod.critical_sent = False
data_seq = [self._make_data(0.0, 0.20)] * 5
with patch("sys.argv", ["bmspy-ups", "-v", "--critical", "30"]):
with pytest.raises(StopIteration):
with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \
patch("bmspy.ups.client.handle_registration"), \
patch("bmspy.ups.time.sleep"), \
patch("bmspy.ups.handle_shutdown"), \
patch("bmspy.ups.handle_email"):
ups_mod.main()
captured = capsys.readouterr()
assert "critical" in captured.out.lower() or "threshold" in captured.out.lower()
+26
View File
@@ -0,0 +1,26 @@
from bmspy.utilities import debugger
class TestDebugger:
def test_prints_message(self, capsys):
debugger("hello world")
captured = capsys.readouterr()
assert "hello world" in captured.out
def test_includes_timestamp(self, capsys):
debugger("test")
captured = capsys.readouterr()
# timestamp is prepended
assert "test" in captured.out
assert len(captured.out.strip()) > len("test")
def test_pretty_true(self, capsys):
debugger({"key": "value"}, pretty=True)
captured = capsys.readouterr()
# pretty-printed output includes the data key
assert "key" in captured.out
def test_pretty_false_no_pprint(self, capsys):
debugger("simple message", pretty=False)
captured = capsys.readouterr()
assert "simple message" in captured.out