Skip to content

Instantly share code, notes, and snippets.

@RafhaanShah
Last active January 29, 2022 17:05
Show Gist options
  • Save RafhaanShah/60a7348c215254e12cc92ffd05a6f287 to your computer and use it in GitHub Desktop.
Save RafhaanShah/60a7348c215254e12cc92ffd05a6f287 to your computer and use it in GitHub Desktop.
Google Location History -> OwnTracks Importer
#!/usr/bin/env python3
# forked from https://github.com/owntracks/tools/blob/master/Google/import-location-history.py
# docs: https://owntracks.org/booklet/tech/json/
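# Before running, replace these placeholders in the script below:
#   - SERVER: hostname or IP address of the MQTT broker (default of --host)
#   - USER and DEVICE: your OwnTracks user and device in the default --topic
#   - history.json: name of the exported location history file
#   - ID: your two-letter tracker ID (the 'tid' payload field)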
import argparse
import json
import time
from paho.mqtt import client, publish
class ProtocolAction(argparse.Action):
    """argparse action that maps a protocol name to a paho-mqtt constant.

    The command-line value (for example ``MQTTv31``) is looked up as an
    attribute of the ``paho.mqtt.client`` module and the resulting object is
    stored on the namespace under this option's destination.
    """

    def __call__(self, parser, namespace, value, option_string=None):
        """Resolve *value* on the paho client module and store the result.

        Raises:
            argparse.ArgumentError: if the client module has no attribute
                named *value*; argparse turns this into a normal usage
                error instead of an AttributeError traceback.
        """
        try:
            protocol = getattr(client, value)
        except AttributeError:
            raise argparse.ArgumentError(
                self, 'unknown MQTT protocol {!r}'.format(value)
            ) from None
        setattr(namespace, self.dest, protocol)
# Command-line interface: MQTT connection settings plus the topic that the
# imported points are published to.
parser = argparse.ArgumentParser(
    description='Import Google Location History into OwnTracks',
)

# One (flags, keyword arguments) entry per option; they are registered in
# exactly this order so the generated --help output keeps its layout.
_OPTION_TABLE = (
    # REPLACE with actual hostname for mqtt
    (('-H', '--host'),
     dict(default='SERVER', help='MQTT host (localhost)')),
    (('-p', '--port'),
     dict(type=int, default=1883, help='MQTT port (1883)')),
    (('--protocol',),
     dict(action=ProtocolAction, default=client.MQTTv31,
          help='MQTT protocol (MQTTv31)')),
    (('--cacerts',),
     dict(help='Path to files containing trusted CA certificates')),
    (('--cert',),
     dict(help='Path to file containing TLS client certificate')),
    (('--key',),
     dict(help='Path to file containing TLS client private key')),
    (('--tls-version',),
     dict(help='TLS protocol version')),
    (('--ciphers',),
     dict(help='List of TLS ciphers')),
    (('-u', '--username'),
     dict(help='MQTT username')),
    (('-P', '--password'),
     dict(help='MQTT password')),
    (('-i', '--clientid'),
     dict(help='MQTT client-ID')),
    # REPLACE USER and DEVICE with correct user and device
    (('-t', '--topic'),
     dict(default='owntracks/USER/DEVICE', help='MQTT topic')),
)
for _flags, _settings in _OPTION_TABLE:
    parser.add_argument(*_flags, **_settings)

# The positional input-file argument is disabled; the file name is
# hard-coded at the point where the data is read.
#parser.add_argument('filename', help='Path to file containing JSON-formatted data from Google Location History exported by Google Takeout')
args = parser.parse_args()
print("Starting...")
print(time.ctime())

# REPLACE with your 2 letter tracker ID
TRACKER_ID = 'ID'
# Points whose reported accuracy is worse (larger) than this many meters
# are dropped.
MAX_ACCURACY_METERS = 100
# The export stores coordinates as integers scaled by 1e7 (the "E7" keys).
E7_SCALE = 10000000
# A record is only usable when all of these fields are present.
_REQUIRED_KEYS = ('timestampMs', 'latitudeE7', 'longitudeE7', 'accuracy')


def _location_payload(location):
    """Convert one exported record into an OwnTracks location payload.

    Returns None when any of timestamp, latitude, longitude or accuracy is
    missing from the record.

    NOTE: exports that carry an ISO-8601 'timestamp' field (like
    2017-01-01T13:19:48.250Z) instead of 'timestampMs' are not handled here;
    such records are treated as missing a timestamp and skipped.
    """
    if any(key not in location for key in _REQUIRED_KEYS):
        return None
    return {
        '_type': 'location',
        'tid': TRACKER_ID,
        # timestampMs is in milliseconds; the payload carries whole seconds.
        'tst': int(location['timestampMs']) // 1000,
        'lat': location['latitudeE7'] / E7_SCALE,
        'lon': location['longitudeE7'] / E7_SCALE,
        'acc': location['accuracy'],
    }


messages = []
count = 0
print("Reading data...")
# REPLACE with actual file name
# JSON text is UTF-8, so do not depend on the platform default encoding.
with open("history.json", encoding="utf-8") as lh:
    lh_data = json.load(lh)

# Values of the most recently accepted point, used to drop repeats.
last_time = 0
last_lat = 0.0
last_lon = 0.0

# A record is filtered out when:
# - any of timestamp, latitude, longitude, accuracy is missing
# - its timestamp is the same as the last accepted record
# - both latitude and longitude are the same as the last accepted record
# - its accuracy is worse than MAX_ACCURACY_METERS
# - both latitude and longitude are 0.0
for location in lh_data['locations']:
    payload = _location_payload(location)
    if payload is None:
        continue
    if payload['tst'] == last_time:
        continue
    if payload['acc'] > MAX_ACCURACY_METERS:
        continue
    if payload['lat'] == last_lat and payload['lon'] == last_lon:
        continue
    if payload['lat'] == 0.0 and payload['lon'] == 0.0:
        continue

    last_time = payload['tst']
    last_lat = payload['lat']
    last_lon = payload['lon']
    count = count + 1
    messages.append(
        {
            'topic': args.topic,
            'payload': json.dumps(payload),
            'qos': 2,
        }
    )

# The parsed export is no longer needed once the messages are built; drop
# the reference before publishing.
del lh_data
# Credentials are only passed on when a username was given on the command
# line; otherwise no auth dictionary is supplied at all.
auth = None
if args.username is not None:
    auth = {
        'username': args.username,
        'password': args.password,
    }

# TLS is only configured when a CA certificate path was given; the other
# TLS options are ignored without --cacerts.
# NOTE(review): --tls-version is forwarded as the raw command-line string;
# confirm that this is the form the MQTT client expects.
tls = None
if args.cacerts is not None:
    tls = {
        'ca_certs': args.cacerts,
        'certfile': args.cert,
        'keyfile': args.key,
        'tls_version': args.tls_version,
        'ciphers': args.ciphers,
    }
print(time.ctime())
if messages:
    print("Publishing...")
    # Send every collected message over a single broker connection.
    # protocol is forwarded so that the --protocol option (and its MQTTv31
    # default) actually takes effect instead of being silently ignored.
    publish.multiple(
        messages,
        hostname=args.host,
        port=args.port,
        client_id=args.clientid,
        auth=auth,
        tls=tls,
        protocol=args.protocol,
    )
else:
    # Every record was filtered out (or the export was empty): there is
    # nothing to send, so skip the broker connection entirely.
    print("No records to publish.")
print(time.ctime())
print("Done, records:")
print(count)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment