Skip to content

Instantly share code, notes, and snippets.

@clusterfudge
Created February 15, 2018 17:28
Show Gist options
  • Save clusterfudge/3fe40f7f55e8a2a80c65dbd8e8d6764e to your computer and use it in GitHub Desktop.
Save clusterfudge/3fe40f7f55e8a2a80c65dbd8e8d6764e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# Author: Sean Fitzgerald
# Date: 2018/2/5
import logging
import os.path
import socket
import sys
import time

import google.cloud.happybase as hb
from google.cloud import bigtable as bt
# Configure the root logger at import time so DEBUG-and-above records are
# emitted (basicConfig adds a stderr handler only if none is installed yet).
logging.basicConfig(level=logging.DEBUG)
# Row key of the last Bigtable row relayed to carbon. relay_messages reads it
# at startup to resume the scan and rewrites it periodically while scanning.
# NOTE(review): /tmp may be wiped on reboot, which silently resets progress.
CHECKPOINT_FILE = "/tmp/bigtable_backfill.checkpoint"
def create_socket(host, port, timeout=None):
    """Open an IPv4 TCP connection to ``(host, port)`` and return the socket.

    :param host: IPv4 address (or resolvable hostname) to connect to.
    :param port: TCP port number.
    :param timeout: optional timeout in seconds applied before connecting;
        ``None`` (the default) leaves the socket's default timeout untouched.
    :returns: the connected ``socket.socket``; the caller owns it and is
        responsible for closing it.
    :raises socket.error: (``OSError`` on Python 3) if the connection cannot
        be established. The half-created socket is closed before re-raising
        so a failed attempt does not leak a file descriptor.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        if timeout is not None:
            s.settimeout(timeout)
        s.connect((host, port))
    except BaseException:
        # Release the descriptor on any failure, then propagate unchanged.
        s.close()
        raise
    return s
def relay_batch(metrics, sock):
    """Write every message in *metrics* to the connected socket *sock*.

    Messages may be ``bytes`` or text; text is encoded as UTF-8. The batch is
    joined into a single payload and written with ``sendall``: plain ``send``
    may accept only part of the buffer and return, silently dropping the rest.

    :param metrics: iterable of already-formatted carbon lines.
    :param sock: a connected stream socket.
    :raises socket.error: (``OSError`` on Python 3) if the send fails.
    """
    payload = b"".join(
        m if isinstance(m, bytes) else m.encode("utf-8") for m in metrics
    )
    if payload:
        sock.sendall(payload)
def _read_checkpoint(path):
    """Return the row key stored at *path*, or ``None`` if the file is absent."""
    if not os.path.exists(path):
        return None
    with open(path, 'r') as f:
        return f.read()


def _write_checkpoint(path, key):
    """Overwrite *path* with *key* so a later run can resume from it."""
    with open(path, 'w') as f:
        f.write(key)


def _row_to_carbon_messages(key, row):
    """Convert one Bigtable row into ``"name value timestamp\\n"`` lines.

    The metric name is the row key with its last ``:``-separated segment
    removed; each column's qualifier (the text after ``family:``) is used as
    the timestamp and the cell value as the metric value.
    """
    metric_name = key.rsplit(":", 1)[0]
    # Split only once so a qualifier that itself contains ':' stays intact.
    return ["%s %s %s\n" % (metric_name, str(val), str(col.split(":", 1)[1]))
            for col, val in row.items()]


def relay_messages(source_project, source_db, source_table_name,
                   dest_carbon_ip, carbon_port=2003, checkpoint_every=100,
                   retry_delay=1.0):
    """Relay every row of a Bigtable table to a carbon listener over TCP.

    The scan resumes from the key saved in ``CHECKPOINT_FILE`` when present.
    Every *checkpoint_every* rows the current key is saved and the scan is
    reopened from it. (The original forced this restart by raising; the
    motivation is not stated in the code. It is presumably to avoid
    long-lived scanner timeouts. TODO confirm.)
    Any error is logged and the scan is retried from the last relayed key
    after *retry_delay* seconds.

    :param source_project: project id passed to ``bt.Client``.
    :param source_db: Bigtable instance id.
    :param source_table_name: name of the table to scan.
    :param dest_carbon_ip: host of the carbon listener.
    :param carbon_port: carbon TCP port (previously hard-coded to 2003).
    :param checkpoint_every: rows relayed between checkpoint writes/restarts.
    :param retry_delay: seconds to wait before retrying after a failure.
    :raises ValueError: if *checkpoint_every* is less than 1.
    """
    if checkpoint_every < 1:
        raise ValueError("checkpoint_every must be >= 1")

    client = bt.Client(project=source_project, admin=True)
    instance = client.instance(source_db)
    connection = hb.Connection(instance=instance)
    table = connection.table(source_table_name)

    last_key = _read_checkpoint(CHECKPOINT_FILE)
    while True:
        # row_start is inclusive, so the checkpointed row is relayed again
        # after each restart; carbon receives identical datapoints twice.
        cursor = table.scan(row_start=last_key) if last_key else table.scan()
        sock = None
        try:
            key_count = 0
            for key, row in cursor:
                # Connect lazily and reuse one connection for the whole pass
                # instead of opening (and leaking) a socket per row.
                if sock is None:
                    sock = create_socket(dest_carbon_ip, carbon_port)
                relay_batch(_row_to_carbon_messages(key, row), sock)
                last_key = key
                key_count += 1
                if key_count % checkpoint_every == 0:
                    print("Refreshing checkpoint...")
                    _write_checkpoint(CHECKPOINT_FILE, last_key)
                    break  # reopen the scan from the checkpoint
            else:
                # The scan reached the end of the table: record the final
                # position and stop.
                if last_key:
                    _write_checkpoint(CHECKPOINT_FILE, last_key)
                return
        except Exception:
            logging.exception("Failed to write batch")
            # Back off so a persistent failure does not busy-loop.
            time.sleep(retry_delay)
        finally:
            if sock is not None:
                sock.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment