bmspy/bmspy/influxdb.py

169 lines
6.3 KiB
Python

import atexit, datetime, os, sys, time
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from bmspy import client
# Seconds to sleep between successive exports when running as a daemon.
DAEMON_UPDATE_PERIOD = 30
''' Close remote connections '''
def influx_shutdown(influxclient):
    """Close the InfluxDB client connection; a ``None`` client is ignored."""
    if influxclient is None:
        return
    influxclient.close()
def writeapi_shutdown(writeapi):
    """Close the InfluxDB write API handle; a ``None`` handle is ignored."""
    if writeapi is None:
        return
    writeapi.close()
def influxdb_export(bucket, url=None, org=None, token=None, socket_path=None, daemonize=True, debug=0):
    """Read BMS data via the bmspy client and write it to InfluxDB.

    Connection settings given explicitly take precedence; any of ``url``,
    ``org`` or ``token`` left unset (``None``/``False``/empty) falls back to
    the matching ``INFLUXDB_V2_URL`` / ``INFLUXDB_V2_ORG`` /
    ``INFLUXDB_V2_TOKEN`` environment variable. When none is given the
    client is configured entirely from the environment.

    Args:
        bucket: Name of the InfluxDB bucket to write to.
        url: InfluxDB server URL.
        org: InfluxDB organization.
        token: InfluxDB authentication token.
        socket_path: Socket path handed to ``client.read_data``.
        daemonize: When true, export a snapshot every
            ``DAEMON_UPDATE_PERIOD`` seconds forever; otherwise export a
            single snapshot and return.
        debug: Verbosity level forwarded to ``influxdb_write_snapshot``.
    """
    if url or org or token:
        # Mix explicit values with the environment so that, for example,
        # --url on the command line still picks up a token exported in the
        # environment instead of connecting with token=False.
        influxclient = InfluxDBClient(
            url=url or os.getenv('INFLUXDB_V2_URL'),
            token=token or os.getenv('INFLUXDB_V2_TOKEN'),
            org=org or os.getenv('INFLUXDB_V2_ORG'),
        )
    else:
        influxclient = InfluxDBClient.from_env_properties()

    # The atexit hook stays as a safety net; the finally clause below closes
    # the client on normal return and on any exception.
    atexit.register(influx_shutdown, influxclient)
    try:
        while True:
            # collect new data, write it, then (in daemon mode) delay and repeat
            data = client.read_data(socket_path, 'influxdb')
            influxdb_write_snapshot(influxclient, bucket, data, debug)
            if not daemonize:
                break
            time.sleep(DAEMON_UPDATE_PERIOD)
    finally:
        # Unregister first so the client is never closed a second time at exit.
        atexit.unregister(influx_shutdown)
        influxclient.close()
def influxdb_write_snapshot(influxclient, bucket, data, debug=0):
    """Build points from ``data`` and write them to ``bucket`` synchronously.

    Write failures are printed and otherwise ignored so that a daemon loop
    calling this function keeps running. The write API handle is always
    closed, even if building the snapshot raises.

    Args:
        influxclient: Client object providing ``write_api``.
        bucket: Name of the InfluxDB bucket to write to.
        data: Data dictionary passed to ``influxdb_create_snapshot``.
        debug: Verbosity; values above 1 print progress messages.
    """
    writeapi = influxclient.write_api(write_options=SYNCHRONOUS)
    atexit.register(writeapi_shutdown, writeapi)
    try:
        # Populate the data structure this period
        if debug > 1:
            print("influxdb: creating snapshot")
        points = influxdb_create_snapshot(data, debug)

        if debug > 1:
            print("influxdb: writing snapshot")
        try:
            writeapi.write(bucket=bucket, record=points)
        except Exception as e:
            # Best effort: report the failure and let the caller carry on.
            print(e)
    finally:
        # Unregister before closing so the handle is not closed again at exit.
        atexit.unregister(writeapi_shutdown)
        writeapi.close()
def influxdb_create_snapshot(data, debug=0):
    """Convert a bmspy data dictionary into a list of InfluxDB points.

    Every non-metadata entry of ``data`` is read as a dict that may carry
    ``help``, ``units`` and any of ``raw_value`` (a single value),
    ``raw_values`` (a dict of values, each tagged with the entry's
    ``label``) or ``info``. One point is produced per value, and all points
    share the same UTC timestamp.

    Args:
        data: Mapping of measurement name to its description dict.
        debug: Verbosity; values above 2 print every point as it is built.

    Returns:
        A list of ``Point`` objects; empty when nothing is exportable.
    """
    points = []
    now = datetime.datetime.now(datetime.timezone.utc).isoformat()

    for kind, contains in data.items():
        # Discard bmspy metadata but keep iterating: stopping here would
        # silently drop every measurement that follows a metadata key.
        # Entries that are not dicts cannot carry values, so skip them too.
        if kind in ('client', 'timestamp') or not isinstance(contains, dict):
            continue

        helpmsg = contains.get('help') or ''
        units = contains.get('units') or None

        # Simple values
        value = contains.get('raw_value')
        if value is not None:
            if debug > 2:
                print("value: {} : {}".format(kind, value))
            points.append(
                Point(kind)
                .tag("units", units)
                .tag("help", helpmsg)
                .field("value", value)
                .time(now)
            )

        # Doesn't have a value, but multiple values, each with a label:
        labelled_values = contains.get('raw_values')
        if isinstance(labelled_values, dict):
            # NOTE(review): assumes 'label' is present whenever 'raw_values'
            # is a dict, since it is used as the tag key -- confirm against
            # the data the daemon produces.
            label = contains.get('label')
            for idx, label_value in labelled_values.items():
                if debug > 2:
                    print("labels: {} [{}] : {}".format(kind, idx, label_value))
                points.append(
                    Point(kind)
                    .tag(label, idx)
                    .tag("units", units)
                    .tag("help", helpmsg)
                    .field("value", label_value)
                    .time(now)
                )

        # Information (like a manufacturing date or a serial number)
        info = contains.get('info')
        if info is not None:
            if debug > 2:
                print("info: {} : {}".format(kind, info))
            points.append(
                Point(kind)
                .tag("units", units)
                .tag("help", helpmsg)
                .field("value", info)
                .time(now)
            )

    return points
def main():
    """Command-line entry point for the bmspy InfluxDB exporter.

    Parses the command line, checks that the InfluxDB URL, organization and
    token are each available from either an option or the corresponding
    ``INFLUXDB_V2_*`` environment variable, registers with the bmspy daemon
    and then exports data in daemon mode.

    Exits with status 1 (after printing a message) when a required
    connection setting is missing.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='Query JBD BMS and report status',
        add_help=True,
    )
    parser.add_argument('--bucket', '-b', dest='influx_bucket', type=str, action='store',
                        default="ups", help='Set the bucket name when sending data to influxdb (defaults to "ups")')
    parser.add_argument('--url', '-u', dest='influx_url', type=str, action='store',
                        default=False, help='Set the URL when sending data to influxdb (overrides INFLUXDB environment variables)')
    parser.add_argument('--org', '-o', dest='influx_org', type=str, action='store',
                        default=False, help='Set the influx organization when sending data to influxdb (overrides INFLUXDB environment variables)')
    parser.add_argument('--token', '-t', dest='influx_token', type=str, action='store',
                        default=False, help='Set the influx token when sending data to influxdb (overrides INFLUXDB environment variables)')
    parser.add_argument('--socket', '-s', dest='socket', action='store',
                        default='/run/bmspy/bms', help='Socket to communicate with daemon')
    parser.add_argument('--verbose', '-v', action='count',
                        default=0, help='Print more verbose information (can be specified multiple times)')
    args = parser.parse_args()

    debug = args.verbose

    # Each setting must come from the command line or from the environment.
    # The option name reported is the one actually missing.
    required = (
        ('INFLUXDB_V2_URL', args.influx_url, '--url'),
        ('INFLUXDB_V2_ORG', args.influx_org, '--org'),
        ('INFLUXDB_V2_TOKEN', args.influx_token, '--token'),
    )
    for env_name, cli_value, option in required:
        if not os.getenv(env_name) and not cli_value:
            print("bmspy-influxdb: {}".format('Missing value for {}'.format(option)))
            sys.exit(1)

    print("Running BMS influxdb daemon on socket {}".format(args.socket))

    client.handle_registration(args.socket, 'influxdb', debug)
    # NOTE(review): the same call is scheduled again at exit, presumably to
    # undo the registration -- confirm against client.handle_registration.
    atexit.register(client.handle_registration, args.socket, 'influxdb', debug)

    influxdb_export(
        bucket=args.influx_bucket,
        url=args.influx_url,
        org=args.influx_org,
        token=args.influx_token,
        socket_path=args.socket,
        daemonize=True,
        debug=debug,
    )
# Run the exporter when this file is executed as a script.
if __name__ == "__main__":
    main()