169 lines
6.3 KiB
Python
169 lines
6.3 KiB
Python
import atexit, datetime, os, sys, time
|
|
from influxdb_client import InfluxDBClient, Point
|
|
from influxdb_client.client.write_api import SYNCHRONOUS
|
|
from bmspy import client
|
|
|
|
# Seconds to sleep between successive exports when running as a daemon
# (passed to time.sleep() in the export loop).
DAEMON_UPDATE_PERIOD = 30
|
|
|
|
''' Close remote connections '''
|
|
def influx_shutdown(influxclient):
    """Close the given InfluxDB client; do nothing when it is None."""
    if influxclient is None:
        return
    influxclient.close()
|
|
|
|
def writeapi_shutdown(writeapi):
    """Close the given write API handle; do nothing when it is None."""
    if writeapi is None:
        return
    writeapi.close()
|
|
|
|
|
|
def influxdb_export(bucket, url=None, org=None, token=None, socket_path=None, daemonize=True, debug=0):
    """Read BMS data from the bmspy daemon and write it to InfluxDB.

    Args:
        bucket: InfluxDB bucket the points are written to.
        url: InfluxDB server URL.  When falsy, the client is configured
            entirely from the environment via
            ``InfluxDBClient.from_env_properties()``.
        org: InfluxDB organization.  Only used when ``url`` is given; when
            falsy, ``INFLUXDB_V2_ORG`` from the environment is used instead.
        token: InfluxDB token.  Only used when ``url`` is given; when falsy,
            ``INFLUXDB_V2_TOKEN`` from the environment is used instead.
        socket_path: Socket path handed to ``client.read_data()``.
        daemonize: When true, export a snapshot, sleep
            ``DAEMON_UPDATE_PERIOD`` seconds and repeat forever.  When false,
            export a single snapshot and return.
        debug: Verbosity level forwarded to the snapshot writer.

    Returns:
        None.
    """
    if url:
        # An explicit URL may be combined with credentials that only exist in
        # the environment, so fall back to those rather than handing a falsy
        # org/token to the client.
        influxclient = InfluxDBClient(
            url=url,
            token=token or os.getenv('INFLUXDB_V2_TOKEN'),
            org=org or os.getenv('INFLUXDB_V2_ORG'),
        )
    else:
        influxclient = InfluxDBClient.from_env_properties()
    # Safety net in case the interpreter exits without unwinding this frame.
    atexit.register(influx_shutdown, influxclient)

    try:
        while True:
            # collect new data, delay, and start again
            data = client.read_data(socket_path, 'influxdb')
            influxdb_write_snapshot(influxclient, bucket, data, debug)
            if not daemonize:
                break
            time.sleep(DAEMON_UPDATE_PERIOD)
    finally:
        # Close the connection on every exit path, including exceptions, and
        # drop the atexit hook so the client is not closed a second time.
        influxclient.close()
        atexit.unregister(influx_shutdown)
|
|
|
|
def influxdb_write_snapshot(influxclient, bucket, data, debug=0):
    """Build points from ``data`` and write them to ``bucket``.

    A synchronous write API is opened on ``influxclient`` for this call and
    closed again before returning.  An exception raised by the write itself
    is printed rather than propagated.

    Args:
        influxclient: InfluxDB client used to obtain the write API.
        bucket: Destination bucket name.
        data: Snapshot mapping passed to ``influxdb_create_snapshot()``.
        debug: Verbosity level; progress messages are printed above 1.

    Returns:
        None.
    """
    writer = influxclient.write_api(write_options=SYNCHRONOUS)
    # Make sure the writer is closed even if the process exits mid-write.
    atexit.register(writeapi_shutdown, writer)

    chatty = debug > 1

    # Populate the data structure this period
    if chatty:
        print("influxdb: creating snapshot")
    snapshot = influxdb_create_snapshot(data, debug)

    if chatty:
        print("influxdb: writing snapshot")
    try:
        writer.write(bucket=bucket, record=snapshot)
    except Exception as err:
        print(err)

    writer.close()
    atexit.unregister(writeapi_shutdown)
|
|
|
|
def influxdb_create_snapshot(data, debug=0):
    """Convert one bmspy data snapshot into a list of InfluxDB points.

    ``data`` maps a measurement name to an entry that is queried with
    ``.get()`` for the keys ``help``, ``units``, ``raw_value``,
    ``raw_values`` (together with ``label``) and ``info``.  The ``client``
    and ``timestamp`` entries are bmspy metadata and are skipped.

    Every point in the returned list carries the same UTC timestamp, taken
    once when this function is called.

    Args:
        data: Mapping of measurement name to its description entry.
        debug: Verbosity level; each exported value is printed above 2.

    Returns:
        A list of ``Point`` objects, possibly empty.
    """
    points = []
    now = datetime.datetime.now(datetime.timezone.utc).isoformat()

    for kind, contains in data.items():
        # Discard bmspy metadata.  This has to be ``continue``: a ``break``
        # would silently drop every measurement that follows the metadata
        # keys in the mapping.
        if kind in ('client', 'timestamp'):
            continue

        helpmsg = contains.get('help') or ''
        units = contains.get('units') or None

        # Simple values
        raw_value = contains.get('raw_value')
        if raw_value is not None:
            if debug > 2:
                print("value: {} : {}".format(kind, raw_value))
            points.append(
                Point(kind)
                .tag("units", units)
                .tag("help", helpmsg)
                .field("value", raw_value)
                .time(now)
            )

        # Doesn't have a value, but multiple values, each with a label:
        raw_values = contains.get('raw_values')
        if raw_values is not None and isinstance(raw_values, dict):
            # NOTE(review): if the entry has no 'label' the tag key below is
            # None -- confirm the daemon always supplies one.
            label = contains.get('label')
            for idx, label_value in raw_values.items():
                if debug > 2:
                    print("labels: {} [{}] : {}".format(kind, idx, label_value))
                points.append(
                    Point(kind)
                    .tag(label, idx)
                    .tag("units", units)
                    .tag("help", helpmsg)
                    .field("value", label_value)
                    .time(now)
                )

        # Information (like a manufacturing date or a serial number)
        info = contains.get('info')
        if info is not None:
            if debug > 2:
                print("info: {} : {}".format(kind, info))
            points.append(
                Point(kind)
                .tag("units", units)
                .tag("help", helpmsg)
                .field("value", info)
                .time(now)
            )

    return points
|
|
|
|
|
|
def main():
    """Command-line entry point for the bmspy InfluxDB exporter.

    Parses the command line, checks that the InfluxDB URL, organization and
    token are each available either as an option or through the matching
    ``INFLUXDB_V2_*`` environment variable, registers with the bmspy daemon
    and then runs the export loop.

    Exits with status 1 after printing a message when one of the required
    InfluxDB settings is missing.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='Query JBD BMS and report status',
        add_help=True,
    )
    parser.add_argument('--bucket', '-b', dest='influx_bucket', type=str, action='store',
                        default="ups", help='Set the bucket name when sending data to influxdb (defaults to "ups")')
    parser.add_argument('--url', '-u', dest='influx_url', type=str, action='store',
                        default=False, help='Set the URL when sending data to influxdb (overrides INFLUXDB environment variables)')
    parser.add_argument('--org', '-o', dest='influx_org', type=str, action='store',
                        default=False, help='Set the influx organization when sending data to influxdb (overrides INFLUXDB environment variables)')
    parser.add_argument('--token', '-t', dest='influx_token', type=str, action='store',
                        default=False, help='Set the influx token when sending data to influxdb (overrides INFLUXDB environment variables)')
    parser.add_argument('--socket', '-s', dest='socket', action='store',
                        default='/run/bmspy/bms', help='Socket to communicate with daemon')
    parser.add_argument('--verbose', '-v', action='count',
                        default=0, help='Print more verbose information (can be specified multiple times)')
    args = parser.parse_args()

    debug = args.verbose

    # Each setting must come from either the environment or the command
    # line.  The option name in the message now matches the setting that is
    # actually missing (a missing organization used to be reported as --url).
    required = (
        ('INFLUXDB_V2_URL', args.influx_url, '--url'),
        ('INFLUXDB_V2_ORG', args.influx_org, '--org'),
        ('INFLUXDB_V2_TOKEN', args.influx_token, '--token'),
    )
    for env_name, cli_value, option in required:
        if not os.getenv(env_name) and not cli_value:
            print("bmspy-influxdb: {}".format('Missing value for {}'.format(option)))
            sys.exit(1)

    print("Running BMS influxdb daemon on socket {}".format(args.socket))

    client.handle_registration(args.socket, 'influxdb', debug)
    # The same call is scheduled again for interpreter exit.
    # NOTE(review): presumably this undoes the registration -- confirm
    # against bmspy.client.handle_registration.
    atexit.register(client.handle_registration, args.socket, 'influxdb', debug)

    influxdb_export(
        bucket=args.influx_bucket,
        url=args.influx_url,
        org=args.influx_org,
        token=args.influx_token,
        socket_path=args.socket,
        daemonize=True,
        debug=debug,
    )
|
|
|
|
# Run the exporter only when this file is executed as a script.
if __name__ == "__main__":
    main()
|