From 93a4c412c53a5647a158019f02b0de8e01588034 Mon Sep 17 00:00:00 2001 From: tim Date: Thu, 10 May 2018 22:50:14 +0200 Subject: [PATCH] Upload CSV files generated by glucometerutils to NightScout --- freestyle_to_nightscout.py | 382 +++++++++++++++++++++++++++++++++++++ 1 file changed, 382 insertions(+) create mode 100644 freestyle_to_nightscout.py diff --git a/freestyle_to_nightscout.py b/freestyle_to_nightscout.py new file mode 100644 index 0000000..f64560d --- /dev/null +++ b/freestyle_to_nightscout.py @@ -0,0 +1,382 @@ +#!/usr/bin/python3 + +import argparse +import csv +import datetime as dt +import dateutil.parser as dp +import dateutil.tz as tz +import hashlib +import json +import os +import re +import sys +import urllib.request +import uuid +import pprint + +''' Constants for units ''' +UNIT_MGDL = 'mg/dL' +UNIT_MMOLL = 'mmol/L' +VALID_UNITS = [UNIT_MGDL, UNIT_MMOLL] + +def main(): + if sys.version_info < (3, 2): + raise Exception( + 'Unsupported Python version, please use at least Python 3.2') + + pp = pprint.PrettyPrinter(depth=6) + + args = parse_arguments() + + secret_bytes = str.encode(args.secret) + secret_hash = hashlib.sha1(secret_bytes).hexdigest() + headers = { + 'API-SECRET': secret_hash, + 'accept': 'application/json', + 'Content-Type': 'application/json', + } + print(secret_hash) + + + for page in ( ): + #for page in ( '/status.json', '/entries.json', '/treatments.json' ): + #for page in ( '/entries.json?count=500', '/treatments.json?count=100' ): + url = args.server + page + #pp.pprint(url) + #pp.pprint(headers) + try: + req = urllib.request.Request(url, headers = headers) + with urllib.request.urlopen(req) as response: + status = response.read().decode('utf-8') + status_json = json.loads(status) + if re.match(page, '^/entries') is not None: + status_json.sort(key=lambda e: e['date']) + pp.pprint(status_json) + except urllib.error.HTTPError as err: + print('{}: URL {}'.format(err, url)) + + #sys.exit(1) + + with open(args.input_file, 'r', newline='') as f: + rows = from_csv(f) + + entries = list() + treatments = list() + for row in rows: + ''' FreeStyle Libre comments: + * measure_method: "CGM" (comment: Scan or Sensor) + "blood sample" (comment: Blood) + * comment: "Blood" = finger prick (with optional comments) + "Scan" = manual scan (with optional comments) + "Sensor" = automatic (never any comments) + * Note that we ignore ketone strips + + * Nightscout Entry type: + - sgv (Serum Glucose Values): "Sensor", no comment; "Scan", no comment + - mbg (Meter Blood Glucose): "Blood", no comment + - cal (Calibration): Ignore, never done on Freestyle + * Treatment glucoseType: + - Finger: "blood sample" with comment + - Sensor: "CGM" with comment (in effect, "Scan") + ''' + meal = None # This value always seems to be empty + (timestamp, datestring, created_at) = parse_timestamp(row, args.tz) + value = parse_value(row, args.units) + measure_method = parse_method(row) + comment = parse_comments(row) + + #''' Meal seems to be empty ''' + #meal = row.get('meal') + #if meal: + # print(meal) + + ''' Create Treatment or Entry ''' + if comment.get('type') == 'entry': + entry = { + 'device': 'FreeStyle Libre', + 'dateString': datestring, + 'date': timestamp, + 'noise': 1, + 'rssi': 100, + 'sysTime': datestring, + } + if measure_method != 'CGM': + entry['type'] = 'mbg' + entry['mbg'] = value + else: + entry['type'] = 'sgv' + entry['sgv'] = value + # We don't get raw data from the reader, so don't try and guess raw data; just ignore it + #entry['filtered'] = int(float(value) * 1000 / 8.5) # 'value' is already smoothed by the Libre Reader + #entry['unfiltered'] = int(float(value) * 1000 / 8.5) # 'value' is already smoothed by the Libre Reader + if comment.get('direction') is not None: + entry['direction'] = comment.get('direction') + entries.append(entry) + + elif comment.get('type') == 'treatment': + for t in ( 'long-acting', 'rapid-acting', 'food', 'sport' ): + if comment.get(t) is not None: + treatment = { + 'enteredBy': 'FreeStyle Libre', + 'eventType': '', + 'timestamp': timestamp, + 'created_at': created_at, + 'sysTime': datestring, + } + treatment['glucose'] = value + treatment['units'] = UNIT_MGDL + + if measure_method != 'CGM': + treatment['glucoseType'] = 'Finger' + else: + treatment['glucoseType'] = 'Sensor' + if t == 'food' and comment.get(t) is not None: + treatment['eventType'] = 'Carb Correction' + treatment['carbs'] = comment.get('food') + if t == 'sports' and comment.get(t) is not None: + treatment['eventType'] = 'Exercise' + if comment.get('sport') > 1: + treatment['duration'] = comment.get('sport') + else: + ''' Assume we're exercising for half an hour, since this value wasn't recorded ''' + treatment['duration'] = 30 + if t == 'long-acting' and comment.get(t) is not None: + treatment['eventType'] = 'Temp Basal' + treatment['absolute'] = comment.get('long-acting') + ''' Lantus/Levemir last 18-26 hours ''' + treatment['duration'] = 20 + if t == 'rapid-acting' and comment.get(t) is not None: + ''' xDrip leaves this blank; options are Meal Bolus, Snack Bolus, Correction Bolus, Combo Bolus ''' + treatment['eventType'] = '' + treatment['insulin'] = comment.get('rapid-acting') + treatments.append(treatment) + + ''' Ensure any subsequent entry is the next entry by date ''' + entries.sort(key=lambda d: d.get('date'), reverse=False) + for i, d in enumerate(entries): + if i < len(entries)-1: + (entries[i], entries[i+1]) = calculate_entry_delta(entries[i], entries[i+1]) + + pp.pprint(entries) + sys.exit(1) + + ''' Upload data (either treatment or entry) to Nightscout ''' + actions = { + #'/treatments': treatments, + #'/entries': entries, + } + for page, form in actions.items(): + ''' Upload data in groups of 100 entries, to avoid timeouts from large datasets ''' + for chunk in chunks(form, 100): + print(json.dumps(chunk)) + try: + url = args.server + page + print(url) + req = urllib.request.Request(url, headers = headers, data = bytes(json.dumps(chunk), encoding="utf-8")) + with urllib.request.urlopen(req) as response: + status = response.read().decode('utf-8') + status_json = json.loads(status) + pp.pprint(status_json) + print('Completed') + except urllib.error.HTTPError as err: + print('{}: URL {}'.format(err, url)) + return + + +def chunks(l, n): + """Yield successive n-sized chunks from l.""" + for i in range(0, len(l), n): + yield l[i:i + n] + +def calculate_entry_delta(first, last): + ''' Calculate delta and direction using relative values ''' + ''' Only judge directions and deltas for values that are within 30 minutes ''' + interval = 30 * 60 * 1000 + if first.get('sgv') is not None and last.get('sgv') is not None: + if last.get('date') - first.get('date') < interval and last.get('date') - first.get('date') > 1: + slope = ( last.get('sgv') - first.get('sgv') ) / ( last.get('date') - first.get('date') ) * 60000 + delta = last.get('sgv') - first.get('sgv') + #last['slope'] = '{:05.3f}'.format(slope) + last['delta'] = '{:05.3f}'.format(delta) + ''' direction: DoubleUp, SingleUp, FortyFiveUp, Flat, + FortyFiveDown, SingleDown, DoubleDown, + NOT_COMPUTABLE, RATE_OUT_OF_RANGE ''' + if last.get('direction') is None: + if slope <= -3.5: + last['direction'] = 'DoubleDown' + elif slope <= -2: + last['direction'] = 'SingleDown' + elif slope <= -1: + last['direction'] = 'FortyFiveDown' + elif slope <= 1: + last['direction'] = 'Flat' + elif slope <= 2: + last['direction'] = 'FortyFiveUp' + elif slope <= 3.5: + last['direction'] = 'SingleUp' + elif slope <= 40: + last['direction'] = 'DoubleUp' + return (first, last) + +def parse_timestamp(data, tz): + ''' Clean up timestamp ''' + timestamp = data.get('timestamp') + try: + date = dp.parse(timestamp).astimezone() + if date.tzinfo is None: + if tz is None: + tz = 'UTC' + date = dp.parse('{} {}'.format(timestamp, tz)).astimezone() + except ValueError: + raise ValueError('Invalid date or timezone: %s (should be of format %s)' % (data.get('timestamp'), '%Y-%m-%d %H:%M:%S')) + ''' Pad microseconds to 6 digits, then truncate to 3 significant digits''' + millisecond = '{:.03}'.format( '{:06d}'.format(date.microsecond)) + ''' the date element seems to need milliseconds (not microseconds) ''' + timestamp = int(str(date.strftime('%s')) + millisecond) + datestring = date.isoformat() + ''' the created_at element seems to need to end in Z (and thus be UTC) ''' + utcdatestring = date.astimezone(dt.timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') + return (timestamp, datestring, utcdatestring) + +def parse_value(data, units): + ''' Clean up value, convert to mg/dL, and return as float ''' + try: + value = float(data.get('value')) + ''' Convert the value from mmol/L to mg/dL ''' + value = convert_glucose_unit(value, units, UNIT_MGDL) + value = float(value) + except ValueError as err: + raise ValueError('Invalid glucose value: '.format(err)) + return value + +def parse_method(data): + ''' measure_method: ( 'blood sample', 'CGM' ) ''' + measure_method = data.get('measure_method') + return measure_method + +def parse_comments(data): + ''' comment: '(Sensor)|(Blood)|(Scan); Rapid-acting insulin (X.X); Long-acting insulin (X.X); Food (X g); Sport ' ''' + comment = { 'type': 'entry' } + rscantype = re.compile('\((Sensor|Scan|Blood|Ketone)\)', flags=re.IGNORECASE) + rrelevant = re.compile('(Sport|Food|Rapid-acting insulin|Long-acting insulin|Direction)(?: \((.*?)(?: g)?\))', flags=re.IGNORECASE) + for part in data.get('comment').split('; '): + + scantype = rscantype.search(part) + if scantype is not None: + if scantype.group(1) == 'Blood': + comment['reader'] = 'Blood' + elif scantype.group(1) == 'Sensor': + comment['reader'] = 'Sensor' + else: + comment['reader'] = 'Scan' + + relevant = rrelevant.search(part) + if relevant is not None: + comment_type = relevant.group(1) + comment_value = relevant.group(2) + if comment_type == 'Long-acting insulin': + comment['type'] = 'treatment' + comment['long-acting'] = comment_value + if comment_type == 'Rapid-acting insulin': + comment['type'] = 'treatment' + comment['rapid-acting'] = comment_value + if comment_type == 'Food': + comment['type'] = 'treatment' + comment['food'] = comment_value + if comment_type == 'Sport': + comment['type'] = 'treatment' + comment['sport'] = 1 + # Note that Direction may require hacking the CSV generator + ''' direction: DoubleUp, SingleUp, FortyFiveUp, Flat, + FortyFiveDown, SingleDown, DoubleDown, + NOT_COMPUTABLE, RATE_OUT_OF_RANGE ''' + if comment_type == 'Direction': + if comment_value == 'up-fast': + comment['direction'] = 'SingleUp' + elif comment_value == 'up': + comment['direction'] = 'FortyFiveUp' + elif comment_value == 'steady': + comment['direction'] = 'Flat' + elif comment_value == 'down': + comment['direction'] = 'FortyFiveDown' + elif comment_value == 'down-fast': + comment['direction'] = 'SingleDown' + return comment + +def verify_units(units = None): + ''' Standardise units for output and for the A1c calculations ''' + if re.search('mg', units, flags=re.IGNORECASE) is not None: + units = UNIT_MGDL + elif re.search('mm', units, flags=re.IGNORECASE) is not None: + units = UNIT_MMOLL + else: + units = UNIT_MMOLL + return units + +def convert_glucose_unit(value, from_unit, to_unit=None): + """Convert the given value of glucose level between units. + + Args: + value: The value of glucose in the current unit + from_unit: The unit value is currently expressed in + to_unit: The unit to conver the value to: the other if empty. + + Returns: + The converted representation of the blood glucose level. + + Raises: + ValueError if either unit is unvalid + """ + if from_unit not in VALID_UNITS: + raise ValueError("Invalid unit: {}".format(from_unit)) + + if to_unit not in VALID_UNITS: + raise ValueError("Invalid unit: {}".format(from_unit)) + + if from_unit == to_unit: + return value + + if to_unit is UNIT_MGDL: + return round(value * 18.0182, 0) + else: + return round(value / 18.0182, 2) + +def from_csv(csv_file, newline=''): + '''Returns the reading as a formatted comma-separated value string.''' + data = csv.reader(csv_file, delimiter=',', quotechar='"') + fields = [ 'timestamp', 'value', 'meal', 'measure_method', 'comment' ] + rows = [] + for row in data: + item = dict(zip(fields, row)) + rows.append(item) + return rows + +def parse_arguments(): + parser = argparse.ArgumentParser(description='Upload data from a Freestyle Libre CSV file into a Nightscout instance') + + parser.add_argument( + '--input', '-i', action='store', required=True, type=str, dest='input_file', + help='Select the CSV file exported by glucometerutils.') + parser.add_argument( + '--units', action='store', required=False, type=str, + default='mmol/L', choices=(UNIT_MGDL, UNIT_MMOLL), + help=('The measurement units used in the Freestyle Libre reader (mmol/L or mg/dL).')) + parser.add_argument( + '--timezone', '-tz', action='store', required=False, type=str, dest='tz', + default='UTC', + help=('The time zone for which the sensor is set to record dates.')) + parser.add_argument( + '--server', '-s', action='store', required=True, type=str, dest='server', + help='Nightscout server, of the form https:///api/v1/') + parser.add_argument( + '--secret', '-a', action='store', required=True, type=str, dest='secret', + help='API_SECRET for Nightscout server') + + args = parser.parse_args() + args.units = verify_units(args.units) + return args + +if __name__ == "__main__": + main() + +# vim: set expandtab shiftwidth=2 softtabstop=2 tw=0 :