Skip to content

Instantly share code, notes, and snippets.

@fayak
Created January 19, 2020 16:23
Show Gist options
  • Save fayak/cf25e3705e23a16b48d0ef302b3e209a to your computer and use it in GitHub Desktop.
Save fayak/cf25e3705e23a16b48d0ef302b3e209a to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
#config.py
import json
from elasticsearch import Elasticsearch
# Shared Elasticsearch client used by the import script: a single local node
# reached over plain HTTP on port 9200.
es = Elasticsearch(
    hosts=["127.0.0.1"],
    scheme="http",
    port=9200,
    # No authentication by default; use http_auth=('user', 'secret') to
    # enable HTTP basic auth.
    http_auth=None,
)
# Prefix shared by every index name this tool builds.
INDEX_PREFIX = "robot"

# Field type used for every coordinate column below.
_DOUBLE = "double"

# Index creation body: a single-shard, zero-replica index whose documents
# carry an epoch-millisecond "date" plus six double-valued coordinates.
MAPPING = {
    "settings": {
        "index": {
            "number_of_shards": 1,
            "number_of_replicas": 0,
        },
    },
    "mappings": {
        "properties": {
            "date": {"type": "date", "format": "epoch_millis"},
            "x_p": {"type": _DOUBLE},
            "z_p": {"type": _DOUBLE},
            "y_p": {"type": _DOUBLE},
            "x": {"type": _DOUBLE},
            "y": {"type": _DOUBLE},
            "z": {"type": _DOUBLE},
        },
    },
}
#autrechose.py
#!/usr/bin/env python3
from datetime import datetime
import json
import hashlib
import sys
from config import es
from config import INDEX_PREFIX
from config import MAPPING
# In-memory buffer of parsed messages, keyed by the string form of each
# message's computed "date". Filled by process_json_file() and read by
# bulk_send_messages().
MESSAGES = {}


def process_json_file(path, base_epoch_ms=1578682930342):
    """Parse one message file and add its messages to ``MESSAGES``.

    The file holds one message object per line. Each line is first parsed
    as JSON; if that fails, single quotes are swapped for double quotes and
    the line is parsed again, which accepts Python-repr style lines such as
    ``{'timestamp': 5}``. Blank lines are skipped.

    For every message, ``msg["date"]`` is set to
    ``base_epoch_ms + int(msg["timestamp"])`` and the message is stored in
    ``MESSAGES`` under ``str(msg["date"])``. Two messages that end up with
    the same date overwrite each other (last one wins).

    Args:
        path: Path of the text file to read.
        base_epoch_ms: Offset added to each message's ``timestamp``. The
            default is the value that used to be hard-coded here.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a line cannot be parsed even after quote
            replacement, or if ``timestamp`` is not convertible to ``int``.
        KeyError: If a message has no ``timestamp`` field.
    """
    with open(path, "r", encoding="utf-8") as f:
        # Stream the file instead of loading every line up front.
        for line in f:
            if not line.strip():
                # An empty line is not a message; json.loads would reject it.
                continue
            try:
                # Try the line untouched first so that valid JSON containing
                # apostrophes inside strings is not corrupted.
                msg = json.loads(line)
            except ValueError:
                # Legacy input format: dict repr with single-quoted strings.
                msg = json.loads(line.replace("'", '"'))
            print(msg)
            msg["date"] = base_epoch_ms + int(msg["timestamp"])
            MESSAGES[str(msg["date"])] = msg
def create_index():
    """Drop and recreate the ``<INDEX_PREFIX>-conv`` index from ``MAPPING``.

    Both steps remain best-effort: a failure never aborts the script.
    Unlike a bare ``except``, only ``Exception`` subclasses are caught, so
    ``KeyboardInterrupt`` and ``SystemExit`` still propagate, and failures
    are reported on stderr instead of being silently discarded.
    """
    index_name = INDEX_PREFIX + "-conv"

    try:
        es.indices.delete(index=index_name)
    except Exception as exc:
        # A 404 only means there was no previous index to delete, which is
        # the normal case on a first run. Checked via the status_code
        # attribute that the client's HTTP errors are expected to carry.
        if getattr(exc, "status_code", None) != 404:
            print(
                "warning: could not delete index %s: %r" % (index_name, exc),
                file=sys.stderr,
            )

    try:
        es.indices.create(index=index_name, body=MAPPING)
    except Exception as exc:
        # Keep going as before, but make the failure visible to the user.
        print(
            "warning: could not create index %s: %r" % (index_name, exc),
            file=sys.stderr,
        )
def bulk_send_messages(batch_size=1000):
    """Index every buffered message into ``<INDEX_PREFIX>-conv`` in batches.

    Each entry of ``MESSAGES`` becomes one bulk "index" action whose
    document ``_id`` is the entry's key. Actions are sent to ``es.bulk`` as
    newline-delimited JSON, at most ``batch_size`` documents per request.
    If a bulk response reports item-level errors, a summary is printed to
    stderr; sending continues with the next batch.

    Args:
        batch_size: Maximum number of documents per bulk request. The
            default matches the previously hard-coded value.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    index_name = INDEX_PREFIX + "-conv"

    def send(lines):
        """Send one batch of NDJSON lines; do nothing for an empty batch."""
        if not lines:
            return
        response = es.bulk(body="".join(lines))
        # The bulk API can succeed as a request while individual items fail;
        # the response's "errors" flag signals that case.
        if response.get("errors"):
            failed = [
                item
                for item in response.get("items", [])
                if "error" in item.get("index", {})
            ]
            print(
                "warning: %d document(s) failed to index into %s; first: %r"
                % (len(failed), index_name, failed[0] if failed else None),
                file=sys.stderr,
            )

    pending = []
    for count, (key, val) in enumerate(MESSAGES.items(), start=1):
        # Serialise the action line with json.dumps so the id is always
        # escaped correctly instead of being spliced into a string.
        action = {"index": {"_index": index_name, "_id": key}}
        pending.append(json.dumps(action) + "\n")
        pending.append(json.dumps(val) + "\n")
        if count % batch_size == 0:
            send(pending)
            pending = []
    # Flush whatever is left over after the last full batch.
    send(pending)
# Script driver. Parse every input file given on the command line BEFORE
# touching Elasticsearch: create_index() deletes the existing index, so
# running it first would wipe the data even when an input file later turns
# out to be missing or malformed.
for path in sys.argv[1:]:
    process_json_file(path)
create_index()
bulk_send_messages()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment