593 lines
25 KiB
Python
593 lines
25 KiB
Python
import pytest
|
|
import smtplib
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
import bmspy.ups as ups_mod
|
|
from bmspy.classes import BMSScalarField, BMSMultiField, BMSInfoField, UPS
|
|
from bmspy.ups import _get_field_value, _resolve_ups_device, handle_shutdown, handle_email
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures: reset module-level globals between tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _reset_globals():
|
|
ups_mod.scheduled_shutdown = False
|
|
ups_mod.critical_sent = False
|
|
ups_mod.warning_sent = False
|
|
ups_mod.alert_sent = False
|
|
yield
|
|
ups_mod.scheduled_shutdown = False
|
|
ups_mod.critical_sent = False
|
|
ups_mod.warning_sent = False
|
|
ups_mod.alert_sent = False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _get_field_value
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetFieldValue:
|
|
def _ups(self):
|
|
return UPS.from_dict({
|
|
"bms_current_amps": {
|
|
"help": "Current",
|
|
"raw_value": -2.5,
|
|
"value": "-2.50",
|
|
"units": "A",
|
|
},
|
|
"bms_capacity_charge_ratio": {
|
|
"help": "Percent Charge",
|
|
"raw_value": 0.75,
|
|
"value": "0.75",
|
|
"units": "%",
|
|
},
|
|
"bms_date": {
|
|
"help": "Date",
|
|
"info": "2023-01-15",
|
|
},
|
|
"bms_cells": {
|
|
"help": "Cells",
|
|
"label": "cell",
|
|
"raw_values": {1: 3.6},
|
|
"values": {1: "3.600"},
|
|
"units": "V",
|
|
},
|
|
})
|
|
|
|
def test_found_scalar_field(self):
|
|
ups = self._ups()
|
|
assert _get_field_value(ups, "bms_current_amps") == pytest.approx(-2.5)
|
|
|
|
def test_found_charge_ratio(self):
|
|
ups = self._ups()
|
|
assert _get_field_value(ups, "bms_capacity_charge_ratio") == pytest.approx(0.75)
|
|
|
|
def test_field_not_found_returns_none(self):
|
|
ups = self._ups()
|
|
assert _get_field_value(ups, "nonexistent_field") is None
|
|
|
|
def test_info_field_returns_none(self):
|
|
# BMSInfoField is not a scalar, should return None
|
|
ups = self._ups()
|
|
assert _get_field_value(ups, "bms_date") is None
|
|
|
|
def test_multi_field_returns_none(self):
|
|
# BMSMultiField is not a scalar, should return None
|
|
ups = self._ups()
|
|
assert _get_field_value(ups, "bms_cells") is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# handle_shutdown
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHandleShutdown:
|
|
def test_shutdown_calls_os_system(self):
|
|
with patch("bmspy.ups.os.system") as mock_sys:
|
|
handle_shutdown(action="shutdown", delay=5)
|
|
mock_sys.assert_called_once_with("/sbin/shutdown 5")
|
|
|
|
def test_shutdown_sets_scheduled_flag(self):
|
|
with patch("bmspy.ups.os.system"):
|
|
handle_shutdown(action="shutdown", delay=5)
|
|
assert ups_mod.scheduled_shutdown is not False
|
|
|
|
def test_shutdown_does_not_reschedule_if_already_scheduled(self):
|
|
ups_mod.scheduled_shutdown = 9999999999.0
|
|
with patch("bmspy.ups.os.system") as mock_sys:
|
|
handle_shutdown(action="shutdown", delay=5)
|
|
mock_sys.assert_not_called()
|
|
|
|
def test_cancel_calls_os_system(self):
|
|
with patch("bmspy.ups.os.system") as mock_sys:
|
|
handle_shutdown(action="cancel")
|
|
mock_sys.assert_called_once_with("/sbin/shutdown -c")
|
|
|
|
def test_cancel_does_not_set_flag(self):
|
|
with patch("bmspy.ups.os.system"):
|
|
handle_shutdown(action="cancel")
|
|
assert ups_mod.scheduled_shutdown is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# handle_email
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHandleEmail:
|
|
def _mock_smtp(self):
|
|
mock_server = MagicMock()
|
|
mock_smtp_class = MagicMock()
|
|
mock_smtp_class.return_value.__enter__ = MagicMock(return_value=mock_server)
|
|
mock_smtp_class.return_value.__exit__ = MagicMock(return_value=False)
|
|
return mock_smtp_class, mock_server
|
|
|
|
def test_basic_send(self):
|
|
mock_smtp_class, mock_server = self._mock_smtp()
|
|
with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class):
|
|
handle_email(
|
|
text="test message",
|
|
level="Alert",
|
|
recipient="user@example.com",
|
|
)
|
|
mock_server.sendmail.assert_called_once()
|
|
|
|
def test_ssl_port_triggers_starttls(self):
|
|
mock_smtp_class, mock_server = self._mock_smtp()
|
|
with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class), \
|
|
patch("bmspy.ups.ssl.create_default_context") as mock_ssl:
|
|
handle_email(
|
|
text="test",
|
|
level="Alert",
|
|
recipient="user@example.com",
|
|
port=465,
|
|
)
|
|
mock_server.starttls.assert_called_once()
|
|
|
|
def test_port_587_also_triggers_starttls(self):
|
|
mock_smtp_class, mock_server = self._mock_smtp()
|
|
with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class), \
|
|
patch("bmspy.ups.ssl.create_default_context"):
|
|
handle_email(
|
|
text="test",
|
|
level="Alert",
|
|
recipient="user@example.com",
|
|
port=587,
|
|
)
|
|
mock_server.starttls.assert_called_once()
|
|
|
|
def test_no_ssl_no_starttls(self):
|
|
mock_smtp_class, mock_server = self._mock_smtp()
|
|
with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class):
|
|
handle_email(
|
|
text="test",
|
|
level="Alert",
|
|
recipient="user@example.com",
|
|
port=25,
|
|
)
|
|
mock_server.starttls.assert_not_called()
|
|
|
|
def test_with_credentials_calls_login(self):
|
|
mock_smtp_class, mock_server = self._mock_smtp()
|
|
with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class):
|
|
handle_email(
|
|
text="test",
|
|
level="Alert",
|
|
recipient="user@example.com",
|
|
mailuser="user",
|
|
mailpass="pass",
|
|
)
|
|
mock_server.login.assert_called_once_with("user", "pass")
|
|
|
|
def test_without_credentials_no_login(self):
|
|
mock_smtp_class, mock_server = self._mock_smtp()
|
|
with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class):
|
|
handle_email(
|
|
text="test",
|
|
level="Alert",
|
|
recipient="user@example.com",
|
|
)
|
|
mock_server.login.assert_not_called()
|
|
|
|
def test_recipient_without_at_gets_hostname_appended(self):
|
|
mock_smtp_class, mock_server = self._mock_smtp()
|
|
with patch("bmspy.ups.smtplib.SMTP", mock_smtp_class), \
|
|
patch("bmspy.ups.socket.gethostname", return_value="myhost"):
|
|
handle_email(
|
|
text="test",
|
|
level="Alert",
|
|
recipient="root",
|
|
)
|
|
# sendmail recipient should be root@myhost
|
|
args = mock_server.sendmail.call_args[0]
|
|
assert args[1] == "root@myhost"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _resolve_ups_device
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _simple_ups_data(*names: str) -> dict[str, UPS]:
|
|
return {name: UPS.from_dict({
|
|
"bms_current_amps": {"help": "A", "raw_value": 0.0, "value": "0.00", "units": "A"},
|
|
}) for name in names}
|
|
|
|
|
|
class TestResolveUpsDevice:
|
|
def test_single_device_no_request_returns_it(self):
|
|
data = _simple_ups_data("myups")
|
|
assert _resolve_ups_device(data, None) == "myups"
|
|
|
|
def test_requested_device_found(self):
|
|
data = _simple_ups_data("ups1", "ups2")
|
|
assert _resolve_ups_device(data, "ups2") == "ups2"
|
|
|
|
def test_requested_device_not_found_returns_none(self, capsys):
|
|
data = _simple_ups_data("ups1")
|
|
result = _resolve_ups_device(data, "missing")
|
|
assert result is None
|
|
assert "missing" in capsys.readouterr().out
|
|
|
|
def test_multiple_devices_no_request_returns_none(self, capsys):
|
|
data = _simple_ups_data("ups1", "ups2")
|
|
result = _resolve_ups_device(data, None)
|
|
assert result is None
|
|
out = capsys.readouterr().out
|
|
assert "ups1" in out or "ups2" in out
|
|
|
|
def test_empty_data_returns_none(self, capsys):
|
|
result = _resolve_ups_device({}, None)
|
|
assert result is None
|
|
assert "no UPS" in capsys.readouterr().out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ups main() - comprehensive loop testing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_ups(current_amps: float, charge_ratio: float) -> UPS:
|
|
"""Build a UPS with bms_current_amps and bms_capacity_charge_ratio."""
|
|
return UPS.from_dict({
|
|
"bms_current_amps": {
|
|
"help": "Current",
|
|
"raw_value": current_amps,
|
|
"value": str(current_amps),
|
|
"units": "A",
|
|
},
|
|
"bms_capacity_charge_ratio": {
|
|
"help": "Charge Ratio",
|
|
"raw_value": charge_ratio,
|
|
"value": str(charge_ratio),
|
|
"units": "%",
|
|
},
|
|
})
|
|
|
|
|
|
class TestUpsMain:
|
|
"""Test ups.main() loop behavior by running a limited number of iterations."""
|
|
|
|
def _run_main_with_data(self, data_sequence, argv=None):
|
|
"""Run main() with a sequence of UPS data, stopping when data runs out.
|
|
|
|
The first call to read_data is the device-discovery call and returns
|
|
data_sequence[0]. Subsequent calls consume data_sequence in order.
|
|
"""
|
|
# discovery call returns data_sequence[0], then loop calls follow
|
|
discovery_done = [False]
|
|
loop_count = [0]
|
|
|
|
def _read_data(*args, **kwargs):
|
|
if not discovery_done[0]:
|
|
discovery_done[0] = True
|
|
return data_sequence[0]
|
|
if loop_count[0] >= len(data_sequence):
|
|
raise StopIteration("done")
|
|
result = data_sequence[loop_count[0]]
|
|
loop_count[0] += 1
|
|
return result
|
|
|
|
with patch("sys.argv", ["bmspy-ups"] + (argv or [])):
|
|
with pytest.raises(StopIteration):
|
|
with patch("bmspy.ups.client.read_data", side_effect=_read_data), \
|
|
patch("bmspy.ups.client.handle_registration"), \
|
|
patch("bmspy.ups.time.sleep"), \
|
|
patch("bmspy.ups.os.system"):
|
|
ups_mod.main()
|
|
return loop_count[0]
|
|
|
|
def _make_data(self, current_amps, charge_ratio):
|
|
return {"testups": _make_ups(current_amps, charge_ratio)}
|
|
|
|
def test_main_history_pruned_at_10(self):
|
|
"""History is pruned to max 10 items."""
|
|
# Send 15 readings to ensure pruning happens
|
|
data_seq = [self._make_data(0.0, 0.80)] * 15
|
|
|
|
with patch("sys.argv", ["bmspy-ups"]):
|
|
with pytest.raises(StopIteration):
|
|
with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \
|
|
patch("bmspy.ups.client.handle_registration"), \
|
|
patch("bmspy.ups.time.sleep"), \
|
|
patch("bmspy.ups.handle_shutdown"), \
|
|
patch("bmspy.ups.handle_email"):
|
|
ups_mod.main()
|
|
|
|
def test_main_runs_with_not_enough_history(self):
|
|
"""With < 4 readings, should print 'not enough readings' and continue."""
|
|
data = [self._make_data(0.0, 0.95)] * 5
|
|
self._run_main_with_data(data)
|
|
|
|
def test_main_debug_2_prints_not_enough_readings(self, capsys):
|
|
data = [self._make_data(0.0, 0.95)] * 5
|
|
with patch("sys.argv", ["bmspy-ups", "-v", "-v"]):
|
|
with pytest.raises(StopIteration):
|
|
with patch("bmspy.ups.client.read_data", side_effect=[data[0]] + data + [StopIteration]),\
|
|
patch("bmspy.ups.client.handle_registration"), \
|
|
patch("bmspy.ups.time.sleep"), \
|
|
patch("bmspy.ups.os.system"):
|
|
ups_mod.main()
|
|
|
|
def test_main_below_critical_threshold_triggers_shutdown(self):
|
|
"""When charge drops below critical threshold, shutdown should be triggered."""
|
|
# Need 5+ readings (>3 in history) to trigger comparisons
|
|
ups_mod.scheduled_shutdown = False
|
|
ups_mod.critical_sent = False
|
|
data_seq = []
|
|
# Populate history with 5 readings all at 20% (below 30% critical)
|
|
for _ in range(5):
|
|
data_seq.append(self._make_data(0.0, 0.20))
|
|
|
|
mock_shutdown = MagicMock()
|
|
mock_email = MagicMock()
|
|
with patch("sys.argv", ["bmspy-ups", "--critical", "30"]):
|
|
with pytest.raises(StopIteration):
|
|
with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \
|
|
patch("bmspy.ups.client.handle_registration"), \
|
|
patch("bmspy.ups.time.sleep"), \
|
|
patch("bmspy.ups.handle_shutdown", mock_shutdown), \
|
|
patch("bmspy.ups.handle_email", mock_email):
|
|
ups_mod.main()
|
|
mock_shutdown.assert_called()
|
|
|
|
def test_main_below_warning_threshold_sends_email(self):
|
|
"""When charge drops below warning threshold, email alert should be sent."""
|
|
ups_mod.scheduled_shutdown = False
|
|
ups_mod.warning_sent = False
|
|
data_seq = []
|
|
# 5 readings at 60% (below 75% warning, above 30% critical)
|
|
for _ in range(5):
|
|
data_seq.append(self._make_data(0.0, 0.60))
|
|
|
|
mock_email = MagicMock()
|
|
with patch("sys.argv", ["bmspy-ups", "--warning", "75", "--critical", "30"]):
|
|
with pytest.raises(StopIteration):
|
|
with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \
|
|
patch("bmspy.ups.client.handle_registration"), \
|
|
patch("bmspy.ups.time.sleep"), \
|
|
patch("bmspy.ups.handle_shutdown"), \
|
|
patch("bmspy.ups.handle_email", mock_email):
|
|
ups_mod.main()
|
|
mock_email.assert_called()
|
|
|
|
def test_main_discharge_alert(self):
|
|
"""When current goes negative, discharge alert should be sent.
|
|
|
|
Discharge alert requires: current < 0 AND h1 < 0 AND h2 >= 0.
|
|
After 5 reads with seq [+1, -1, +1, +1, -1]:
|
|
h=[+1,-1,+1,+1,-1], current=-1, h1=-1, h2=+1 → triggers alert.
|
|
"""
|
|
ups_mod.alert_sent = False
|
|
data_seq = [
|
|
self._make_data(1.0, 0.90), # r1: +1
|
|
self._make_data(-1.0, 0.89), # r2: -1 (will be h1)
|
|
self._make_data(1.0, 0.90), # r3: +1 (will be h2)
|
|
self._make_data(1.0, 0.90), # r4: +1
|
|
self._make_data(-1.0, 0.88), # r5: -1 (current) → alert!
|
|
]
|
|
|
|
mock_email = MagicMock()
|
|
with patch("sys.argv", ["bmspy-ups"]):
|
|
with pytest.raises(StopIteration):
|
|
with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \
|
|
patch("bmspy.ups.client.handle_registration"), \
|
|
patch("bmspy.ups.time.sleep"), \
|
|
patch("bmspy.ups.handle_shutdown"), \
|
|
patch("bmspy.ups.handle_email", mock_email):
|
|
ups_mod.main()
|
|
mock_email.assert_called()
|
|
|
|
def test_main_power_regained_alert(self):
|
|
"""When current goes from negative to positive, recovery alert sent.
|
|
|
|
Recovery requires: current >= 0 AND h1 >= 0 AND h2 < 0.
|
|
After 5 reads with seq [-1, +1, -1, +1, +1]:
|
|
h=[-1,+1,-1,+1,+1], current=+1, h1=+1, h2=-1 → recovery!
|
|
"""
|
|
ups_mod.alert_sent = True
|
|
ups_mod.scheduled_shutdown = False
|
|
data_seq = [
|
|
self._make_data(-1.0, 0.80), # r1: -1
|
|
self._make_data(1.0, 0.81), # r2: +1 (will be h1)
|
|
self._make_data(-1.0, 0.79), # r3: -1 (will be h2)
|
|
self._make_data(1.0, 0.81), # r4: +1
|
|
self._make_data(1.0, 0.82), # r5: +1 (current) → recovery!
|
|
]
|
|
|
|
mock_email = MagicMock()
|
|
mock_shutdown = MagicMock()
|
|
with patch("sys.argv", ["bmspy-ups"]):
|
|
with pytest.raises(StopIteration):
|
|
with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \
|
|
patch("bmspy.ups.client.handle_registration"), \
|
|
patch("bmspy.ups.time.sleep"), \
|
|
patch("bmspy.ups.handle_shutdown", mock_shutdown), \
|
|
patch("bmspy.ups.handle_email", mock_email):
|
|
ups_mod.main()
|
|
mock_email.assert_called()
|
|
|
|
def test_main_not_enough_history_debug_2(self, capsys):
|
|
"""With debug=2 and < 4 readings, prints 'not enough readings'."""
|
|
data_seq = [self._make_data(0.0, 0.95)] * 5
|
|
with patch("sys.argv", ["bmspy-ups", "-v", "-v"]):
|
|
with pytest.raises(StopIteration):
|
|
with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \
|
|
patch("bmspy.ups.client.handle_registration"), \
|
|
patch("bmspy.ups.time.sleep"), \
|
|
patch("bmspy.ups.handle_shutdown"), \
|
|
patch("bmspy.ups.handle_email"):
|
|
ups_mod.main()
|
|
captured = capsys.readouterr()
|
|
assert "not enough readings" in captured.out.lower()
|
|
|
|
def test_main_below_warning_debug_1(self, capsys):
|
|
"""With debug=1, warning message is printed."""
|
|
ups_mod.warning_sent = False
|
|
data_seq = [self._make_data(0.0, 0.60)] * 5
|
|
with patch("sys.argv", ["bmspy-ups", "-v", "--warning", "75", "--critical", "30"]):
|
|
with pytest.raises(StopIteration):
|
|
with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \
|
|
patch("bmspy.ups.client.handle_registration"), \
|
|
patch("bmspy.ups.time.sleep"), \
|
|
patch("bmspy.ups.handle_shutdown"), \
|
|
patch("bmspy.ups.handle_email"):
|
|
ups_mod.main()
|
|
captured = capsys.readouterr()
|
|
assert "warning" in captured.out.lower()
|
|
|
|
def test_main_discharge_alert_debug_1(self, capsys):
|
|
"""With debug=1, discharge alert message is printed."""
|
|
ups_mod.alert_sent = False
|
|
data_seq = [
|
|
self._make_data(1.0, 0.90),
|
|
self._make_data(-1.0, 0.89),
|
|
self._make_data(1.0, 0.90),
|
|
self._make_data(1.0, 0.90),
|
|
self._make_data(-1.0, 0.88),
|
|
]
|
|
with patch("sys.argv", ["bmspy-ups", "-v"]):
|
|
with pytest.raises(StopIteration):
|
|
with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \
|
|
patch("bmspy.ups.client.handle_registration"), \
|
|
patch("bmspy.ups.time.sleep"), \
|
|
patch("bmspy.ups.handle_shutdown"), \
|
|
patch("bmspy.ups.handle_email"):
|
|
ups_mod.main()
|
|
captured = capsys.readouterr()
|
|
assert "discharging" in captured.out.lower()
|
|
|
|
def test_main_power_regained_debug_1(self, capsys):
|
|
"""With debug=1, power regained message is printed."""
|
|
ups_mod.alert_sent = True
|
|
data_seq = [
|
|
self._make_data(-1.0, 0.80),
|
|
self._make_data(1.0, 0.81),
|
|
self._make_data(-1.0, 0.79),
|
|
self._make_data(1.0, 0.81),
|
|
self._make_data(1.0, 0.82),
|
|
]
|
|
with patch("sys.argv", ["bmspy-ups", "-v"]):
|
|
with pytest.raises(StopIteration):
|
|
with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \
|
|
patch("bmspy.ups.client.handle_registration"), \
|
|
patch("bmspy.ups.time.sleep"), \
|
|
patch("bmspy.ups.handle_shutdown"), \
|
|
patch("bmspy.ups.handle_email"):
|
|
ups_mod.main()
|
|
captured = capsys.readouterr()
|
|
assert "power regained" in captured.out.lower()
|
|
|
|
def test_main_debug_2_prints_current_and_capacity(self, capsys):
|
|
"""With debug=2, current and capacity are printed."""
|
|
data_seq = [self._make_data(1.0, 0.80)] * 5
|
|
with patch("sys.argv", ["bmspy-ups", "-v", "-v"]):
|
|
with pytest.raises(StopIteration):
|
|
with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \
|
|
patch("bmspy.ups.client.handle_registration"), \
|
|
patch("bmspy.ups.time.sleep"), \
|
|
patch("bmspy.ups.handle_shutdown"), \
|
|
patch("bmspy.ups.handle_email"):
|
|
ups_mod.main()
|
|
captured = capsys.readouterr()
|
|
assert "current" in captured.out.lower() or "capacity" in captured.out.lower()
|
|
|
|
def test_main_debug_1_prints_thresholds(self, capsys):
|
|
"""With debug=1 and below threshold, threshold messages are printed."""
|
|
ups_mod.critical_sent = False
|
|
data_seq = [self._make_data(0.0, 0.20)] * 5
|
|
with patch("sys.argv", ["bmspy-ups", "-v", "--critical", "30"]):
|
|
with pytest.raises(StopIteration):
|
|
with patch("bmspy.ups.client.read_data", side_effect=[data_seq[0]] + data_seq + [StopIteration("done")]), \
|
|
patch("bmspy.ups.client.handle_registration"), \
|
|
patch("bmspy.ups.time.sleep"), \
|
|
patch("bmspy.ups.handle_shutdown"), \
|
|
patch("bmspy.ups.handle_email"):
|
|
ups_mod.main()
|
|
captured = capsys.readouterr()
|
|
assert "critical" in captured.out.lower() or "threshold" in captured.out.lower()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# main() — device selection errors and --device flag
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestUpsMainDeviceSelection:
|
|
def _make_data(self, name, current_amps=0.0, charge_ratio=0.8):
|
|
return {name: _make_ups(current_amps, charge_ratio)}
|
|
|
|
def test_main_no_devices_returns_early(self, capsys):
|
|
"""main() exits cleanly when no devices are found."""
|
|
with patch("sys.argv", ["bmspy-ups"]), \
|
|
patch("bmspy.ups.client.read_data", return_value={}), \
|
|
patch("bmspy.ups.client.handle_registration"):
|
|
ups_mod.main()
|
|
assert "no UPS" in capsys.readouterr().out
|
|
|
|
def test_main_multiple_devices_no_flag_returns_early(self, capsys):
|
|
"""main() exits with an error when multiple devices exist and --device is not set."""
|
|
two_devices = {
|
|
"ups1": _make_ups(0.0, 0.9),
|
|
"ups2": _make_ups(0.0, 0.8),
|
|
}
|
|
with patch("sys.argv", ["bmspy-ups"]), \
|
|
patch("bmspy.ups.client.read_data", return_value=two_devices), \
|
|
patch("bmspy.ups.client.handle_registration"):
|
|
ups_mod.main()
|
|
out = capsys.readouterr().out
|
|
assert "ups1" in out or "ups2" in out
|
|
|
|
def test_main_ups_flag_selects_device(self):
|
|
"""--device selects the correct device from multiple available ones."""
|
|
data_seq = [self._make_data("ups2")] * 6
|
|
with patch("sys.argv", ["bmspy-ups", "--device", "ups2"]):
|
|
with pytest.raises(StopIteration):
|
|
with patch("bmspy.ups.client.read_data",
|
|
side_effect=[{"ups1": _make_ups(0.0, 0.9), "ups2": _make_ups(0.0, 0.8)}]
|
|
+ data_seq + [StopIteration("done")]), \
|
|
patch("bmspy.ups.client.handle_registration"), \
|
|
patch("bmspy.ups.time.sleep"), \
|
|
patch("bmspy.ups.handle_shutdown"), \
|
|
patch("bmspy.ups.handle_email"):
|
|
ups_mod.main()
|
|
|
|
def test_main_ups_flag_unknown_device_returns_early(self, capsys):
|
|
"""--device with an unknown device name exits with an error."""
|
|
with patch("sys.argv", ["bmspy-ups", "--device", "ghost"]), \
|
|
patch("bmspy.ups.client.read_data", return_value={"ups1": _make_ups(0.0, 0.9)}), \
|
|
patch("bmspy.ups.client.handle_registration"):
|
|
ups_mod.main()
|
|
assert "ghost" in capsys.readouterr().out
|
|
|
|
def test_main_device_disappears_in_loop_sleeps(self):
|
|
"""When the named device is missing from a loop response, main() sleeps and retries."""
|
|
initial = {"testups": _make_ups(0.0, 0.8)}
|
|
# Second call returns empty dict (device gone), third raises StopIteration
|
|
with patch("sys.argv", ["bmspy-ups"]):
|
|
with pytest.raises(StopIteration):
|
|
with patch("bmspy.ups.client.read_data",
|
|
side_effect=[initial, {}, StopIteration("done")]), \
|
|
patch("bmspy.ups.client.handle_registration"), \
|
|
patch("bmspy.ups.time.sleep") as mock_sleep:
|
|
ups_mod.main()
|
|
mock_sleep.assert_called()
|