habitat database rebuild notes

Instructions

from __future__ import print_function
import sys
import json

def load(name):
    # Read a CouchDB _all_docs-style dump and yield its documents,
    # dropping _rev so they can be re-uploaded to a fresh database.
    with open(name) as f:
        blob = json.load(f)
    for row in blob["rows"]:
        doc = row["doc"]
        del doc["_rev"]
        yield doc

flights = list(load("flights"))
payload_configurations = list(load("payload-configuration"))
print("Flights", len(flights), file=sys.stderr)
print("Payload configuration", len(payload_configurations), file=sys.stderr)

# Keep only approved flights from 2018 onwards.
flights = [x for x in flights if x["approved"] and x["start"] >= "2018"]
print("Recent approved flights", len(flights), file=sys.stderr)

# Collect the payload_configuration ids referenced by those flights.
config_ids = set()
config_ids.add("8c36ab528b16cb3adf00b7e07a228854")  # radiosonde override doc
for fl in flights:
    for pc in fl["payloads"]:
        config_ids.add(pc)
print("Referenced", len(config_ids), file=sys.stderr)

# ...plus any payload configuration created recently.
for pc in payload_configurations:
    if pc["time_created"] >= "2020-06":
        config_ids.add(pc["_id"])
print("+recent", len(config_ids), file=sys.stderr)

payload_configurations = [x for x in payload_configurations if x["_id"] in config_ids]
assert len(config_ids) == len(payload_configurations)

bulk_upload = {"docs": (payload_configurations + flights)}
print("Printing bulk-upload json blob to stdout", file=sys.stderr)
json.dump(bulk_upload, sys.stdout)
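
The script above writes a _bulk_docs-style blob ({"docs": [...]}) to stdout. A minimal sketch of pushing that blob into the rebuilt database, assuming Python 3, that the script's stdout was redirected to bulk-upload.json, and that the target is a database named "habitat" on a local CouchDB (all assumptions, adjust as needed):

import json
import sys
from urllib.request import urlopen, Request

# POST the whole blob to CouchDB's _bulk_docs endpoint in one request.
with open("bulk-upload.json", "rb") as f:
    body = f.read()
req = Request(
    "http://localhost:5984/habitat/_bulk_docs",   # assumed target database
    data=body,
    headers={"Content-Type": "application/json"})
print(json.loads(urlopen(req).read()), file=sys.stderr)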
{
"_id": "_design/ept",
"lists": {
"json": "habitat_export_payload_telemetry.lists.json_list|4",
"csv": "habitat_export_payload_telemetry.lists.csv_list|6",
"kml": "habitat_export_payload_telemetry.lists.kml_list|6"
},
"language": "cnp_habitat"
}
{
"_id": "_design/flight",
"validate_doc_update": "habitat.views.flight.validate|1",
"language": "cnp_habitat",
"views": {
"end_start_including_payloads": {
"map": "habitat.views.flight.end_start_including_payloads_map|2"
},
"unapproved_name_including_payloads": {
"map": "habitat.views.flight.unapproved_name_including_payloads_map|1"
},
"launch_time_including_payloads": {
"map": "habitat.views.flight.launch_time_including_payloads_map|2"
},
"all_name_time_created": {
"map": "habitat.views.flight.all_name_map|1"
}
}
}
{
"_id": "_design/habitat",
"validate_doc_update": "habitat.views.habitat.validate|1",
"language": "cnp_habitat"
}
{
"_id": "_design/listener_information",
"validate_doc_update": "habitat.views.listener_information.validate|1",
"language": "cnp_habitat",
"views": {
"callsign_time_created": {
"map": "habitat.views.listener_information.callsign_time_created_map|1"
},
"time_created_callsign": {
"map": "habitat.views.listener_information.time_created_callsign_map|1"
}
}
}
{
"_id": "_design/listener_telemetry",
"validate_doc_update": "habitat.views.listener_telemetry.validate|1",
"language": "cnp_habitat",
"views": {
"callsign_time_created": {
"map": "habitat.views.listener_telemetry.callsign_time_created_map|1"
},
"time_created_callsign": {
"map": "habitat.views.listener_telemetry.time_created_callsign_map|1"
}
}
}
{
"_id": "_design/parser",
"filters": {
"unparsed": "habitat.views.parser.unparsed_filter|1"
},
"language": "cnp_habitat"
}
{
"_id": "_design/payload_configuration",
"validate_doc_update": "habitat.views.payload_configuration.validate|1",
"language": "cnp_habitat",
"views": {
"callsign_time_created_index": {
"map": "habitat.views.payload_configuration.callsign_time_created_index_map|1"
},
"name_time_created": {
"map": "habitat.views.payload_configuration.name_time_created_map|1"
}
}
}
{
"_id": "_design/payload_telemetry",
"language": "cnp_habitat",
"views": {
"payload_time": {
"map": "habitat.views.payload_telemetry.payload_time_map|1"
},
"flight_payload_time": {
"map": "habitat.views.payload_telemetry.flight_payload_time_map|1"
},
"time": {
"map": "habitat.views.payload_telemetry.time_map|1"
}
},
"updates": {
"add_listener": "habitat.views.payload_telemetry.add_listener_update|1",
"http_post": "habitat.views.payload_telemetry.http_post_update|3"
},
"validate_doc_update": "habitat.views.payload_telemetry.validate|2"
}
{
"_id": "_design/payload_telemetry_stats",
"language": "cnp_habitat",
"views": {
"time_uploaded_day": {
"map": "habitat_payload_telemetry_stats.time_uploaded_day_map|2",
"reduce": "_sum"
},
"launch_time": {
"map": "habitat_payload_telemetry_stats.launch_time_map|1"
},
"flight_receiver": {
"map": "habitat_payload_telemetry_stats.flight_receiver_map|1",
"reduce": "_sum"
},
"receiver": {
"map": "habitat_payload_telemetry_stats.receiver_map|1",
"reduce": "_sum"
}
}
}
{
"_id": "_design/spacenearus",
"filters": {
"spacenear": "habitat.views.spacenear.spacenear_filter|1"
},
"language": "cnp_habitat"
}
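
The design documents above also need to be installed in the rebuilt database. A minimal sketch, assuming Python 3, that each design document has been saved as its own JSON file under design-docs/, and that the target database is "habitat" on a local CouchDB (all assumptions):

import glob
import json
import sys
from urllib.request import urlopen, Request

for path in sorted(glob.glob("design-docs/*.json")):
    with open(path) as f:
        doc = json.load(f)
    # PUT each design document at /db/_design/name (the _id already has that form).
    req = Request(
        "http://localhost:5984/habitat/" + doc["_id"],
        data=json.dumps(doc).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT")
    print(path, json.loads(urlopen(req).read()), file=sys.stderr)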
from __future__ import print_function
import sys
import time
import gzip
import json

try:
    # Python 3
    from urllib.request import urlopen
    from urllib.parse import urlencode
except ImportError:
    # Python 2
    from urllib import urlopen, urlencode

def one_request(first_doc, limit, database, expect_total_rows=None, expect_response_num_rows=None, retry_delay=1):
    # Fetch one page of _all_docs (with docs included), starting at first_doc
    # if given, retrying forever on any failure.
    args = {"limit": str(limit), "include_docs": "true"}
    if first_doc is not None:
        args["startkey"] = json.dumps(first_doc["_id"])
    args = urlencode(args)
    url = "http://localhost:5984/" + database + "/_all_docs?" + args
    while True:
        try:
            url_file = urlopen(url)
            try:
                response = json.load(url_file)
            finally:
                url_file.close()

            total_rows = response["total_rows"]
            if expect_total_rows is not None and total_rows != expect_total_rows:
                raise Exception("Total rows mismatch", url, expect_total_rows, total_rows)

            response_rows = response["rows"]
            response_num_rows = len(response_rows)
            if expect_response_num_rows is not None and response_num_rows != expect_response_num_rows:
                raise Exception("Unexpected response size", url, expect_response_num_rows, response_num_rows)

            for row in response_rows:
                if "doc" not in row or "_id" not in row["doc"] or row["id"] != row["doc"]["_id"]:
                    raise Exception("row sanity check failed", row)

            # startkey is inclusive, so the first row should be the doc we
            # already have from the previous batch.
            if first_doc is not None and (response_rows == [] or response_rows[0]["doc"] != first_doc):
                raise Exception("first_doc did not match", url, first_doc, response_rows[0])

            return {"total_rows": total_rows, "docs": [x["doc"] for x in response_rows]}
        except Exception as exn:
            print("Request failed, retrying in {}s".format(retry_delay), exn, file=sys.stderr)
            time.sleep(retry_delay)

def get_all(batch_size, database, batch_delay):
    # Yield every document in the database, paging through _all_docs in
    # batches of batch_size and pausing batch_delay seconds between requests.
    first_response = one_request(first_doc=None, limit=1, database=database)
    database_rows = first_response["total_rows"]
    last_doc = first_response["docs"][0]
    yield last_doc
    retrieved_rows = 1

    while retrieved_rows < database_rows:
        remaining_rows = database_rows - retrieved_rows
        # Each batch re-fetches the last doc of the previous batch, hence the +1.
        expect_rows = min(batch_size, remaining_rows + 1)
        batch_start = time.time()
        response = one_request(
            first_doc=last_doc,
            limit=batch_size,
            database=database,
            expect_total_rows=database_rows,
            expect_response_num_rows=expect_rows,
            retry_delay=batch_delay)
        batch_stop = time.time()

        new_docs = response["docs"][1:]
        for doc in new_docs:
            yield doc
        assert len(new_docs) == expect_rows - 1
        last_doc = new_docs[-1]
        retrieved_rows += len(new_docs)

        progress = float(retrieved_rows) / database_rows * 100
        batch_time = batch_stop - batch_start
        print("{:02.2f}%".format(progress), retrieved_rows, database_rows, "{:.1f}s".format(batch_time))
        time.sleep(batch_delay)

with gzip.open("all-docs.json.gz", "wt") as output:
    for doc in get_all(batch_size=1001, database="habitat_old", batch_delay=1):
        output.write(json.dumps(doc) + "\n")
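
The resulting dump can then be replayed into a new database. A minimal sketch, assuming Python 3, a target database named "habitat_new" on a local CouchDB, batches of 1000, and dropping _rev so the new database assigns fresh revisions (all assumptions):

import gzip
import json
from urllib.request import urlopen, Request

def upload(batch):
    # POST one batch of documents to the target database via _bulk_docs.
    req = Request(
        "http://localhost:5984/habitat_new/_bulk_docs",   # assumed target database
        data=json.dumps({"docs": batch}).encode("utf-8"),
        headers={"Content-Type": "application/json"})
    urlopen(req).read()

batch = []
with gzip.open("all-docs.json.gz", "rt") as dump:
    for line in dump:
        doc = json.loads(line)
        doc.pop("_rev", None)   # let the new database assign fresh revisions
        batch.append(doc)
        if len(batch) >= 1000:
            upload(batch)
            batch = []
if batch:
    upload(batch)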