Upload CSV files generated by glucometerutils to NightScout

This commit is contained in:
Timothy Allen 2018-05-10 22:50:14 +02:00
parent 5ec4b75d18
commit 93a4c412c5

382
freestyle_to_nightscout.py Normal file
View File

@ -0,0 +1,382 @@
#!/usr/bin/python3
import argparse
import csv
import datetime as dt
import dateutil.parser as dp
import dateutil.tz as tz
import hashlib
import json
import os
import re
import sys
import urllib.request
import uuid
import pprint
''' Constants for units '''
UNIT_MGDL = 'mg/dL'
UNIT_MMOLL = 'mmol/L'
VALID_UNITS = [UNIT_MGDL, UNIT_MMOLL]
def main():
if sys.version_info < (3, 2):
raise Exception(
'Unsupported Python version, please use at least Python 3.2')
pp = pprint.PrettyPrinter(depth=6)
args = parse_arguments()
secret_bytes = str.encode(args.secret)
secret_hash = hashlib.sha1(secret_bytes).hexdigest()
headers = {
'API-SECRET': secret_hash,
'accept': 'application/json',
'Content-Type': 'application/json',
}
print(secret_hash)
for page in ( ):
#for page in ( '/status.json', '/entries.json', '/treatments.json' ):
#for page in ( '/entries.json?count=500', '/treatments.json?count=100' ):
url = args.server + page
#pp.pprint(url)
#pp.pprint(headers)
try:
req = urllib.request.Request(url, headers = headers)
with urllib.request.urlopen(req) as response:
status = response.read().decode('utf-8')
status_json = json.loads(status)
if re.match(page, '^/entries') is not None:
status_json.sort(key=lambda e: e['date'])
pp.pprint(status_json)
except urllib.error.HTTPError as err:
print('{}: URL {}'.format(err, url))
#sys.exit(1)
with open(args.input_file, 'r', newline='') as f:
rows = from_csv(f)
entries = list()
treatments = list()
for row in rows:
''' FreeStyle Libre comments:
* measure_method: "CGM" (comment: Scan or Sensor)
"blood sample" (comment: Blood)
* comment: "Blood" = finger prick (with optional comments)
"Scan" = manual scan (with optional comments)
"Sensor" = automatic (never any comments)
* Note that we ignore ketone strips
* Nightscout Entry type:
- sgv (Serum Glucose Values): "Sensor", no comment; "Scan", no comment
- mbg (Meter Blood Glucose): "Blood", no comment
- cal (Calibration): Ignore, never done on Freestyle
* Treatment glucoseType:
- Finger: "blood sample" with comment
- Sensor: "CGM" with comment (in effect, "Scan")
'''
meal = None # This value always seems to be empty
(timestamp, datestring, created_at) = parse_timestamp(row, args.tz)
value = parse_value(row, args.units)
measure_method = parse_method(row)
comment = parse_comments(row)
#''' Meal seems to be empty '''
#meal = row.get('meal')
#if meal:
# print(meal)
''' Create Treatment or Entry '''
if comment.get('type') == 'entry':
entry = {
'device': 'FreeStyle Libre',
'dateString': datestring,
'date': timestamp,
'noise': 1,
'rssi': 100,
'sysTime': datestring,
}
if measure_method != 'CGM':
entry['type'] = 'mbg'
entry['mbg'] = value
else:
entry['type'] = 'sgv'
entry['sgv'] = value
# We don't get raw data from the reader, so don't try and guess raw data; just ignore it
#entry['filtered'] = int(float(value) * 1000 / 8.5) # 'value' is already smoothed by the Libre Reader
#entry['unfiltered'] = int(float(value) * 1000 / 8.5) # 'value' is already smoothed by the Libre Reader
if comment.get('direction') is not None:
entry['direction'] = comment.get('direction')
entries.append(entry)
elif comment.get('type') == 'treatment':
for t in ( 'long-acting', 'rapid-acting', 'food', 'sport' ):
if comment.get(t) is not None:
treatment = {
'enteredBy': 'FreeStyle Libre',
'eventType': '<none>',
'timestamp': timestamp,
'created_at': created_at,
'sysTime': datestring,
}
treatment['glucose'] = value
treatment['units'] = UNIT_MGDL
if measure_method != 'CGM':
treatment['glucoseType'] = 'Finger'
else:
treatment['glucoseType'] = 'Sensor'
if t == 'food' and comment.get(t) is not None:
treatment['eventType'] = 'Carb Correction'
treatment['carbs'] = comment.get('food')
if t == 'sports' and comment.get(t) is not None:
treatment['eventType'] = 'Exercise'
if comment.get('sport') > 1:
treatment['duration'] = comment.get('sport')
else:
''' Assume we're exercising for half an hour, since this value wasn't recorded '''
treatment['duration'] = 30
if t == 'long-acting' and comment.get(t) is not None:
treatment['eventType'] = 'Temp Basal'
treatment['absolute'] = comment.get('long-acting')
''' Lantus/Levemir last 18-26 hours '''
treatment['duration'] = 20
if t == 'rapid-acting' and comment.get(t) is not None:
''' xDrip leaves this blank; options are Meal Bolus, Snack Bolus, Correction Bolus, Combo Bolus '''
treatment['eventType'] = '<none>'
treatment['insulin'] = comment.get('rapid-acting')
treatments.append(treatment)
''' Ensure any subsequent entry is the next entry by date '''
entries.sort(key=lambda d: d.get('date'), reverse=False)
for i, d in enumerate(entries):
if i < len(entries)-1:
(entries[i], entries[i+1]) = calculate_entry_delta(entries[i], entries[i+1])
pp.pprint(entries)
sys.exit(1)
''' Upload data (either treatment or entry) to Nightscout '''
actions = {
#'/treatments': treatments,
#'/entries': entries,
}
for page, form in actions.items():
''' Upload data in groups of 100 entries, to avoid timeouts from large datasets '''
for chunk in chunks(form, 100):
print(json.dumps(chunk))
try:
url = args.server + page
print(url)
req = urllib.request.Request(url, headers = headers, data = bytes(json.dumps(chunk), encoding="utf-8"))
with urllib.request.urlopen(req) as response:
status = response.read().decode('utf-8')
status_json = json.loads(status)
pp.pprint(status_json)
print('Completed')
except urllib.error.HTTPError as err:
print('{}: URL {}'.format(err, url))
return
def chunks(l, n):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(l), n):
yield l[i:i + n]
def calculate_entry_delta(first, last):
''' Calculate delta and direction using relative values '''
''' Only judge directions and deltas for values that are within 30 minutes '''
interval = 30 * 60 * 1000
if first.get('sgv') is not None and last.get('sgv') is not None:
if last.get('date') - first.get('date') < interval and last.get('date') - first.get('date') > 1:
slope = ( last.get('sgv') - first.get('sgv') ) / ( last.get('date') - first.get('date') ) * 60000
delta = last.get('sgv') - first.get('sgv')
#last['slope'] = '{:05.3f}'.format(slope)
last['delta'] = '{:05.3f}'.format(delta)
''' direction: DoubleUp, SingleUp, FortyFiveUp, Flat,
FortyFiveDown, SingleDown, DoubleDown,
NOT_COMPUTABLE, RATE_OUT_OF_RANGE '''
if last.get('direction') is None:
if slope <= -3.5:
last['direction'] = 'DoubleDown'
elif slope <= -2:
last['direction'] = 'SingleDown'
elif slope <= -1:
last['direction'] = 'FortyFiveDown'
elif slope <= 1:
last['direction'] = 'Flat'
elif slope <= 2:
last['direction'] = 'FortyFiveUp'
elif slope <= 3.5:
last['direction'] = 'SingleUp'
elif slope <= 40:
last['direction'] = 'DoubleUp'
return (first, last)
def parse_timestamp(data, tz):
''' Clean up timestamp '''
timestamp = data.get('timestamp')
try:
date = dp.parse(timestamp).astimezone()
if date.tzinfo is None:
if tz is None:
tz = 'UTC'
date = dp.parse('{} {}'.format(timestamp, tz)).astimezone()
except ValueError:
raise ValueError('Invalid date or timezone: %s (should be of format %s)' % (data.get('timestamp'), '%Y-%m-%d %H:%M:%S'))
''' Pad microseconds to 6 digits, then truncate to 3 significant digits'''
millisecond = '{:.03}'.format( '{:06d}'.format(date.microsecond))
''' the date element seems to need milliseconds (not microseconds) '''
timestamp = int(str(date.strftime('%s')) + millisecond)
datestring = date.isoformat()
''' the created_at element seems to need to end in Z (and thus be UTC) '''
utcdatestring = date.astimezone(dt.timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
return (timestamp, datestring, utcdatestring)
def parse_value(data, units):
''' Clean up value, convert to mg/dL, and return as float '''
try:
value = float(data.get('value'))
''' Convert the value from mmol/L to mg/dL '''
value = convert_glucose_unit(value, units, UNIT_MGDL)
value = float(value)
except ValueError as err:
raise ValueError('Invalid glucose value: '.format(err))
return value
def parse_method(data):
''' measure_method: ( 'blood sample', 'CGM' ) '''
measure_method = data.get('measure_method')
return measure_method
def parse_comments(data):
''' comment: '(Sensor)|(Blood)|(Scan); Rapid-acting insulin (X.X); Long-acting insulin (X.X); Food (X g); Sport ' '''
comment = { 'type': 'entry' }
rscantype = re.compile('\((Sensor|Scan|Blood|Ketone)\)', flags=re.IGNORECASE)
rrelevant = re.compile('(Sport|Food|Rapid-acting insulin|Long-acting insulin|Direction)(?: \((.*?)(?: g)?\))', flags=re.IGNORECASE)
for part in data.get('comment').split('; '):
scantype = rscantype.search(part)
if scantype is not None:
if scantype.group(1) == 'Blood':
comment['reader'] = 'Blood'
elif scantype.group(1) == 'Sensor':
comment['reader'] = 'Sensor'
else:
comment['reader'] = 'Scan'
relevant = rrelevant.search(part)
if relevant is not None:
comment_type = relevant.group(1)
comment_value = relevant.group(2)
if comment_type == 'Long-acting insulin':
comment['type'] = 'treatment'
comment['long-acting'] = comment_value
if comment_type == 'Rapid-acting insulin':
comment['type'] = 'treatment'
comment['rapid-acting'] = comment_value
if comment_type == 'Food':
comment['type'] = 'treatment'
comment['food'] = comment_value
if comment_type == 'Sport':
comment['type'] = 'treatment'
comment['sport'] = 1
# Note that Direction may require hacking the CSV generator
''' direction: DoubleUp, SingleUp, FortyFiveUp, Flat,
FortyFiveDown, SingleDown, DoubleDown,
NOT_COMPUTABLE, RATE_OUT_OF_RANGE '''
if comment_type == 'Direction':
if comment_value == 'up-fast':
comment['direction'] = 'SingleUp'
elif comment_value == 'up':
comment['direction'] = 'FortyFiveUp'
elif comment_value == 'steady':
comment['direction'] = 'Flat'
elif comment_value == 'down':
comment['direction'] = 'FortyFiveDown'
elif comment_value == 'down-fast':
comment['direction'] = 'SingleDown'
return comment
def verify_units(units = None):
''' Standardise units for output and for the A1c calculations '''
if re.search('mg', units, flags=re.IGNORECASE) is not None:
units = UNIT_MGDL
elif re.search('mm', units, flags=re.IGNORECASE) is not None:
units = UNIT_MMOLL
else:
units = UNIT_MMOLL
return units
def convert_glucose_unit(value, from_unit, to_unit=None):
"""Convert the given value of glucose level between units.
Args:
value: The value of glucose in the current unit
from_unit: The unit value is currently expressed in
to_unit: The unit to conver the value to: the other if empty.
Returns:
The converted representation of the blood glucose level.
Raises:
ValueError if either unit is unvalid
"""
if from_unit not in VALID_UNITS:
raise ValueError("Invalid unit: {}".format(from_unit))
if to_unit not in VALID_UNITS:
raise ValueError("Invalid unit: {}".format(from_unit))
if from_unit == to_unit:
return value
if to_unit is UNIT_MGDL:
return round(value * 18.0182, 0)
else:
return round(value / 18.0182, 2)
def from_csv(csv_file, newline=''):
'''Returns the reading as a formatted comma-separated value string.'''
data = csv.reader(csv_file, delimiter=',', quotechar='"')
fields = [ 'timestamp', 'value', 'meal', 'measure_method', 'comment' ]
rows = []
for row in data:
item = dict(zip(fields, row))
rows.append(item)
return rows
def parse_arguments():
parser = argparse.ArgumentParser(description='Upload data from a Freestyle Libre CSV file into a Nightscout instance')
parser.add_argument(
'--input', '-i', action='store', required=True, type=str, dest='input_file',
help='Select the CSV file exported by glucometerutils.')
parser.add_argument(
'--units', action='store', required=False, type=str,
default='mmol/L', choices=(UNIT_MGDL, UNIT_MMOLL),
help=('The measurement units used in the Freestyle Libre reader (mmol/L or mg/dL).'))
parser.add_argument(
'--timezone', '-tz', action='store', required=False, type=str, dest='tz',
default='UTC',
help=('The time zone for which the sensor is set to record dates.'))
parser.add_argument(
'--server', '-s', action='store', required=True, type=str, dest='server',
help='Nightscout server, of the form https://<server address>/api/v1/')
parser.add_argument(
'--secret', '-a', action='store', required=True, type=str, dest='secret',
help='API_SECRET for Nightscout server')
args = parser.parse_args()
args.units = verify_units(args.units)
return args
if __name__ == "__main__":
main()
# vim: set expandtab shiftwidth=2 softtabstop=2 tw=0 :