diff --git a/bmspy/ups.py b/bmspy/ups.py index 7d59976..cea9ba0 100644 --- a/bmspy/ups.py +++ b/bmspy/ups.py @@ -9,6 +9,7 @@ import socket from collections import deque from bmspy import client +from bmspy.classes import UPS, BMSScalarField DAEMON_UPDATE_PERIOD = 30 @@ -18,6 +19,46 @@ warning_sent = False alert_sent = False +def _get_field_value(device: UPS, field_name: str) -> float | None: + """Return the raw_value of a named BMSScalarField from a UPS, or None if not found.""" + for name, field in device.items(): + if name == field_name and isinstance(field, BMSScalarField): + return field.raw_value + return None + + +def _resolve_ups_device(data: dict[str, UPS], requested: str | None) -> str | None: + """Return the device name to monitor, or None and print an error if the selection is invalid. + + Rules: + - No devices → error. + - requested is set and found → use it. + - requested is set but not found → error listing available devices. + - requested is None and exactly one device → use it. + - requested is None and multiple devices → error; user must specify one. + """ + if not data: + print("Error: no UPS devices found") + return None + if requested is not None: + if requested not in data: + print( + "Error: UPS '{}' not found. Available: {}".format( + requested, ", ".join(sorted(data)) + ) + ) + return None + return requested + if len(data) > 1: + print( + "Error: multiple UPS devices found, specify one with --device: {}".format( + ", ".join(sorted(data)) + ) + ) + return None + return next(iter(data)) + + def handle_shutdown(action: str = "cancel", delay: int = 0, debug: int = 0) -> None: """Schedule or cancel a system shutdown; only schedules once per incident.""" global scheduled_shutdown @@ -157,6 +198,14 @@ def main() -> None: default="/run/bmspy/bms", help="Socket to communicate with daemon", ) + parser.add_argument( + "--device", + dest="device", + action="store", + default=None, + metavar="NAME", + help="Name of the UPS device to monitor (required if multiple devices are available)", + ) parser.add_argument( "--verbose", "-v", @@ -173,10 +222,19 @@ def main() -> None: client.handle_registration(args.socket, "ups", debug) atexit.register(client.handle_registration, args.socket, "ups", debug) + initial_data = client.read_data(args.socket, "ups") + device_name = _resolve_ups_device(initial_data, args.device) + if device_name is None: + return + history = deque() while True: - data = client.read_data(args.socket, "ups") - history.append(data) + data = client.read_data(args.socket, "ups", device=device_name) + device_obj = data.get(device_name) + if device_obj is None: + time.sleep(DAEMON_UPDATE_PERIOD) + continue + history.append(device_obj) # Remove the oldest data from the history while len(history) > 10: @@ -192,19 +250,30 @@ def main() -> None: time.sleep(DAEMON_UPDATE_PERIOD) continue - current_amps = float(data["bms_current_amps"]["raw_value"]) - charge_ratio = float(data["bms_capacity_charge_ratio"]["raw_value"]) * 100 - comparison_1_current_amps = float(comparison_1["bms_current_amps"]["raw_value"]) + current_amps = float(_get_field_value(device_obj, "bms_current_amps") or 0.0) + charge_ratio = ( + float(_get_field_value(device_obj, "bms_capacity_charge_ratio") or 0.0) * 100 + ) + comparison_1_current_amps = float( + _get_field_value(comparison_1, "bms_current_amps") or 0.0 + ) comparison_1_charge_ratio = ( - float(comparison_1["bms_capacity_charge_ratio"]["raw_value"]) * 100 + float(_get_field_value(comparison_1, "bms_capacity_charge_ratio") or 0.0) + * 100 + ) + comparison_2_current_amps = float( + _get_field_value(comparison_2, "bms_current_amps") or 0.0 ) - comparison_2_current_amps = float(comparison_2["bms_current_amps"]["raw_value"]) comparison_2_charge_ratio = ( - float(comparison_2["bms_capacity_charge_ratio"]["raw_value"]) * 100 + float(_get_field_value(comparison_2, "bms_capacity_charge_ratio") or 0.0) + * 100 + ) + comparison_3_current_amps = float( + _get_field_value(comparison_3, "bms_current_amps") or 0.0 ) - comparison_3_current_amps = float(comparison_3["bms_current_amps"]["raw_value"]) comparison_3_charge_ratio = ( - float(comparison_3["bms_capacity_charge_ratio"]["raw_value"]) * 100 + float(_get_field_value(comparison_3, "bms_capacity_charge_ratio") or 0.0) + * 100 ) if debug > 1: diff --git a/tests/test_ups.py b/tests/test_ups.py index c7b0c8e..7cbfb25 100644 --- a/tests/test_ups.py +++ b/tests/test_ups.py @@ -4,7 +4,7 @@ from unittest.mock import patch, MagicMock import bmspy.ups as ups_mod from bmspy.classes import BMSScalarField, BMSMultiField, BMSInfoField, UPS -from bmspy.ups import _get_field_value, handle_shutdown, handle_email +from bmspy.ups import _get_field_value, _resolve_ups_device, handle_shutdown, handle_email # --------------------------------------------------------------------------- @@ -204,6 +204,44 @@ class TestHandleEmail: assert args[1] == "root@myhost" +# --------------------------------------------------------------------------- +# _resolve_ups_device +# --------------------------------------------------------------------------- + +def _simple_ups_data(*names: str) -> dict[str, UPS]: + return {name: UPS.from_dict({ + "bms_current_amps": {"help": "A", "raw_value": 0.0, "value": "0.00", "units": "A"}, + }) for name in names} + + +class TestResolveUpsDevice: + def test_single_device_no_request_returns_it(self): + data = _simple_ups_data("myups") + assert _resolve_ups_device(data, None) == "myups" + + def test_requested_device_found(self): + data = _simple_ups_data("ups1", "ups2") + assert _resolve_ups_device(data, "ups2") == "ups2" + + def test_requested_device_not_found_returns_none(self, capsys): + data = _simple_ups_data("ups1") + result = _resolve_ups_device(data, "missing") + assert result is None + assert "missing" in capsys.readouterr().out + + def test_multiple_devices_no_request_returns_none(self, capsys): + data = _simple_ups_data("ups1", "ups2") + result = _resolve_ups_device(data, None) + assert result is None + out = capsys.readouterr().out + assert "ups1" in out or "ups2" in out + + def test_empty_data_returns_none(self, capsys): + result = _resolve_ups_device({}, None) + assert result is None + assert "no UPS" in capsys.readouterr().out + + # --------------------------------------------------------------------------- # ups main() - comprehensive loop testing # --------------------------------------------------------------------------- @@ -229,27 +267,26 @@ def _make_ups(current_amps: float, charge_ratio: float) -> UPS: class TestUpsMain: """Test ups.main() loop behavior by running a limited number of iterations.""" - def _run_main_with_data(self, data_sequence, argv=None, extra_patches=None): - """Run main() with a sequence of UPS data, stopping when data runs out.""" - call_count = 0 + def _run_main_with_data(self, data_sequence, argv=None): + """Run main() with a sequence of UPS data, stopping when data runs out. + + The first call to read_data is the device-discovery call and returns + data_sequence[0]. Subsequent calls consume data_sequence in order. + """ + # discovery call returns data_sequence[0], then loop calls follow + discovery_done = [False] + loop_count = [0] def _read_data(*args, **kwargs): - nonlocal call_count - if call_count >= len(data_sequence): + if not discovery_done[0]: + discovery_done[0] = True + return data_sequence[0] + if loop_count[0] >= len(data_sequence): raise StopIteration("done") - result = data_sequence[call_count] - call_count += 1 + result = data_sequence[loop_count[0]] + loop_count[0] += 1 return result - patches = { - "bmspy.ups.client.read_data": _read_data, - "bmspy.ups.client.handle_registration": MagicMock(), - "bmspy.ups.time.sleep": MagicMock(), - "bmspy.ups.os.system": MagicMock(), - } - if extra_patches: - patches.update(extra_patches) - with patch("sys.argv", ["bmspy-ups"] + (argv or [])): with pytest.raises(StopIteration): with patch("bmspy.ups.client.read_data", side_effect=_read_data), \ @@ -257,7 +294,7 @@ class TestUpsMain: patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.os.system"): ups_mod.main() - return call_count + return loop_count[0] def _make_data(self, current_amps, charge_ratio): return {"testups": _make_ups(current_amps, charge_ratio)} @@ -269,7 +306,7 @@ class TestUpsMain: with patch("sys.argv", ["bmspy-ups"]): with pytest.raises(StopIteration): - with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ + with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ @@ -285,7 +322,7 @@ class TestUpsMain: data = [self._make_data(0.0, 0.95)] * 5 with patch("sys.argv", ["bmspy-ups", "-v", "-v"]): with pytest.raises(StopIteration): - with patch("bmspy.ups.client.read_data", side_effect=data + [StopIteration]),\ + with patch("bmspy.ups.client.read_data", side_effect=[data[0]] + data + [StopIteration]),\ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.os.system"): @@ -305,7 +342,7 @@ class TestUpsMain: mock_email = MagicMock() with patch("sys.argv", ["bmspy-ups", "--critical", "30"]): with pytest.raises(StopIteration): - with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ + with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown", mock_shutdown), \ @@ -325,7 +362,7 @@ class TestUpsMain: mock_email = MagicMock() with patch("sys.argv", ["bmspy-ups", "--warning", "75", "--critical", "30"]): with pytest.raises(StopIteration): - with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ + with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ @@ -352,7 +389,7 @@ class TestUpsMain: mock_email = MagicMock() with patch("sys.argv", ["bmspy-ups"]): with pytest.raises(StopIteration): - with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ + with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ @@ -381,7 +418,7 @@ class TestUpsMain: mock_shutdown = MagicMock() with patch("sys.argv", ["bmspy-ups"]): with pytest.raises(StopIteration): - with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ + with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown", mock_shutdown), \ @@ -394,7 +431,7 @@ class TestUpsMain: data_seq = [self._make_data(0.0, 0.95)] * 5 with patch("sys.argv", ["bmspy-ups", "-v", "-v"]): with pytest.raises(StopIteration): - with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ + with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ @@ -409,7 +446,7 @@ class TestUpsMain: data_seq = [self._make_data(0.0, 0.60)] * 5 with patch("sys.argv", ["bmspy-ups", "-v", "--warning", "75", "--critical", "30"]): with pytest.raises(StopIteration): - with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ + with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ @@ -430,7 +467,7 @@ class TestUpsMain: ] with patch("sys.argv", ["bmspy-ups", "-v"]): with pytest.raises(StopIteration): - with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ + with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ @@ -451,7 +488,7 @@ class TestUpsMain: ] with patch("sys.argv", ["bmspy-ups", "-v"]): with pytest.raises(StopIteration): - with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ + with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ @@ -465,7 +502,7 @@ class TestUpsMain: data_seq = [self._make_data(1.0, 0.80)] * 5 with patch("sys.argv", ["bmspy-ups", "-v", "-v"]): with pytest.raises(StopIteration): - with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ + with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ @@ -480,7 +517,7 @@ class TestUpsMain: data_seq = [self._make_data(0.0, 0.20)] * 5 with patch("sys.argv", ["bmspy-ups", "-v", "--critical", "30"]): with pytest.raises(StopIteration): - with patch("bmspy.ups.client.read_data", side_effect=data_seq + [StopIteration("done")]), \ + with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \ patch("bmspy.ups.client.handle_registration"), \ patch("bmspy.ups.time.sleep"), \ patch("bmspy.ups.handle_shutdown"), \ @@ -488,3 +525,68 @@ class TestUpsMain: ups_mod.main() captured = capsys.readouterr() assert "critical" in captured.out.lower() or "threshold" in captured.out.lower() + + +# --------------------------------------------------------------------------- +# main() — device selection errors and --device flag +# --------------------------------------------------------------------------- + +class TestUpsMainDeviceSelection: + def _make_data(self, name, current_amps=0.0, charge_ratio=0.8): + return {name: _make_ups(current_amps, charge_ratio)} + + def test_main_no_devices_returns_early(self, capsys): + """main() exits cleanly when no devices are found.""" + with patch("sys.argv", ["bmspy-ups"]), \ + patch("bmspy.ups.client.read_data", return_value={}), \ + patch("bmspy.ups.client.handle_registration"): + ups_mod.main() + assert "no UPS" in capsys.readouterr().out + + def test_main_multiple_devices_no_flag_returns_early(self, capsys): + """main() exits with an error when multiple devices exist and --device is not set.""" + two_devices = { + "ups1": _make_ups(0.0, 0.9), + "ups2": _make_ups(0.0, 0.8), + } + with patch("sys.argv", ["bmspy-ups"]), \ + patch("bmspy.ups.client.read_data", return_value=two_devices), \ + patch("bmspy.ups.client.handle_registration"): + ups_mod.main() + out = capsys.readouterr().out + assert "ups1" in out or "ups2" in out + + def test_main_ups_flag_selects_device(self): + """--device selects the correct device from multiple available ones.""" + data_seq = [self._make_data("ups2")] * 6 + with patch("sys.argv", ["bmspy-ups", "--device", "ups2"]): + with pytest.raises(StopIteration): + with patch("bmspy.ups.client.read_data", + side_effect=[{"ups1": _make_ups(0.0, 0.9), "ups2": _make_ups(0.0, 0.8)}] + + data_seq + [StopIteration("done")]), \ + patch("bmspy.ups.client.handle_registration"), \ + patch("bmspy.ups.time.sleep"), \ + patch("bmspy.ups.handle_shutdown"), \ + patch("bmspy.ups.handle_email"): + ups_mod.main() + + def test_main_ups_flag_unknown_device_returns_early(self, capsys): + """--device with an unknown device name exits with an error.""" + with patch("sys.argv", ["bmspy-ups", "--device", "ghost"]), \ + patch("bmspy.ups.client.read_data", return_value={"ups1": _make_ups(0.0, 0.9)}), \ + patch("bmspy.ups.client.handle_registration"): + ups_mod.main() + assert "ghost" in capsys.readouterr().out + + def test_main_device_disappears_in_loop_sleeps(self): + """When the named device is missing from a loop response, main() sleeps and retries.""" + initial = {"testups": _make_ups(0.0, 0.8)} + # Second call returns empty dict (device gone), third raises StopIteration + with patch("sys.argv", ["bmspy-ups"]): + with pytest.raises(StopIteration): + with patch("bmspy.ups.client.read_data", + side_effect=[initial, {}, StopIteration("done")]), \ + patch("bmspy.ups.client.handle_registration"), \ + patch("bmspy.ups.time.sleep") as mock_sleep: + ups_mod.main() + mock_sleep.assert_called()