@deniszh
Last active February 16, 2023 03:39
Quick and dirty script to migrate Graphite from whisper to cyanite
#!/usr/bin/env python
import os
import mmap
import struct
import signal
import optparse
import cql
try:
    import whisper
except ImportError:
    raise SystemExit('[ERROR] Please make sure whisper is installed properly')
# Ignore SIGPIPE
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
option_parser = optparse.OptionParser(usage='''%prog path''')
(options, args) = option_parser.parse_args()
if len(args) != 1:
    option_parser.error("require one input file name")
else:
    path = args[0]
def mmap_file(filename):
    # Map the whole whisper file read-only; the fd can be closed once mapped.
    fd = os.open(filename, os.O_RDONLY)
    map = mmap.mmap(fd, os.fstat(fd).st_size, prot=mmap.PROT_READ)
    os.close(fd)
    return map
def read_header(map):
    try:
        (aggregationType, maxRetention, xFilesFactor, archiveCount) = struct.unpack(whisper.metadataFormat, map[:whisper.metadataSize])
    except struct.error:
        raise whisper.CorruptWhisperFile("Unable to unpack header")

    archives = []
    archiveOffset = whisper.metadataSize

    for i in xrange(archiveCount):
        try:
            (offset, secondsPerPoint, points) = struct.unpack(whisper.archiveInfoFormat, map[archiveOffset:archiveOffset + whisper.archiveInfoSize])
        except struct.error:
            raise whisper.CorruptWhisperFile("Unable to read archive %d metadata" % i)

        archiveInfo = {
            'offset': offset,
            'secondsPerPoint': secondsPerPoint,
            'points': points,
            'retention': secondsPerPoint * points,
            'size': points * whisper.pointSize,
        }
        archives.append(archiveInfo)
        archiveOffset += whisper.archiveInfoSize

    header = {
        'aggregationMethod': whisper.aggregationTypeToMethod.get(aggregationType, 'average'),
        'maxRetention': maxRetention,
        'xFilesFactor': xFilesFactor,
        'archives': archives,
    }
    return header
def dump_header(header):
    print 'Meta data:'
    print '  aggregation method: %s' % header['aggregationMethod']
    print '  max retention: %d' % header['maxRetention']
    print '  xFilesFactor: %g' % header['xFilesFactor']
    print

def dump_archive_headers(archives):
    for i, archive in enumerate(archives):
        print 'Archive %d info:' % i
        print '  offset: %d' % archive['offset']
        print '  seconds per point: %d' % archive['secondsPerPoint']
        print '  points: %d' % archive['points']
        print '  retention: %d' % archive['retention']
        print '  size: %d' % archive['size']
        print
def dump_archives(archives):
    # Derive the dotted metric name from the whisper file path.
    name = path.replace('/opt/graphite/storage/whisper/', '', 1)
    name = name.replace('.wsp', '', 1)
    name = name.replace('/', '.')

    con = cql.connect('127.0.0.1', 9160, 'metric', cql_version='3.0.0')
    print 'Connected to Cassandra!'
    cursor = con.cursor()

    for i, archive in enumerate(archives):
        print 'Archive %d data:' % i
        offset = archive['offset']
        # Constant per archive: cyanite keys each series by rollup
        # (seconds per point) and period (retention); the TTL matches
        # the retention so old points expire as they would in whisper.
        period = archive['retention']
        rollup = archive['secondsPerPoint']
        ttl = period
        for point in xrange(archive['points']):
            (timestamp, value) = struct.unpack(whisper.pointFormat, map[offset:offset + whisper.pointSize])
            # Inherited from whisper-dump; drop this print to speed up migration.
            print '%d: %d, %10.35g' % (point, timestamp, value)
            offset += whisper.pointSize
            # A parameterized variant, kept for reference:
            #CQLString = "UPDATE metric USING TTL ? SET data = data + ? WHERE tenant = '' AND rollup = ? AND period = ? AND path = ? AND time = ?;"
            #cursor.execute(CQLString, [ttl, value, rollup, period, name, timestamp])
            if timestamp > 0:  # slots that were never written have timestamp 0
                # Note: %f (not %d) for the value, so float samples are not truncated.
                CQLString = "UPDATE metric USING TTL %d SET data = data + [ %f ] WHERE tenant = '' AND rollup = %d AND period = %d AND path = '%s' AND time = %d;" % (ttl, value, rollup, period, name, timestamp)
                cursor.execute(CQLString)
        print
if not os.path.exists(path):
    raise SystemExit('[ERROR] File "%s" does not exist!' % path)
map = mmap_file(path)
header = read_header(map)
dump_header(header)
dump_archive_headers(header['archives'])
dump_archives(header['archives'])
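For reference, the UPDATE statement in dump_archives implies a cyanite metric table shaped roughly as below. This is an assumption reconstructed from the query itself (the WHERE columns and the list-append on data), not taken from cyanite's own schema file — check what your cyanite version actually creates:

-- Assumed schema, reconstructed from the UPDATE statement above.
CREATE TABLE metric (
    tenant text,
    period int,
    rollup int,
    path text,
    time bigint,
    data list<double>,
    PRIMARY KEY ((tenant, period, rollup, path), time)
);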
@punnie commented Aug 24, 2014

Sorry for digging up this old thing, but I've got to ask @deniszh: how long did this script take to run for your TB of whisper data? I've been running it for our measly 300GB of data for about 12 hours now, using 8 parallel jobs, and it seems like it will take literally forever.

I know I'm just being lazy now, but is there a more up-to-date version of this? ;)

@deniszh (author) commented Sep 5, 2014

Hi @punnie,
I never tried to migrate our main storage, because of space concerns. But assuming linear scalability: if migrating 1281 files took ~1281 seconds (a very loose approximation) with 16 threads, then migrating a full store of 160K metrics would take about 160,000 seconds, i.e. roughly 44 hours.
I don't think you'll be able to squeeze more speed out of the Python version, and my Clojure is weak — maybe @pir will be able to provide a Clojure version of this script with built-in concurrency. In the meantime, a sketch of a parallel driver for this script follows below.
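A minimal parallel-driver sketch in Python: it walks the whisper tree and runs the gist over a pool of worker processes. The script name whisper-to-cyanite.py, the 16-worker count, and the storage root are assumptions — adjust for your setup.

#!/usr/bin/env python
# Hypothetical driver: migrate a whole whisper tree in parallel by
# invoking the gist above (assumed saved as whisper-to-cyanite.py)
# once per file across a pool of worker processes.
import os
import subprocess
from multiprocessing import Pool

WHISPER_ROOT = '/opt/graphite/storage/whisper'  # assumption

def migrate(wsp_path):
    # One subprocess per file keeps the gist itself unchanged.
    return subprocess.call(['python', 'whisper-to-cyanite.py', wsp_path])

if __name__ == '__main__':
    files = []
    for dirpath, dirnames, filenames in os.walk(WHISPER_ROOT):
        files.extend(os.path.join(dirpath, f)
                     for f in filenames if f.endswith('.wsp'))
    Pool(16).map(migrate, files)  # 16 workers, as in the comment above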

@deniszh (author) commented Sep 5, 2014

@punnie
By the way, speed is not really required for the migration. You can split the metrics stream in two with a relay: one stream keeps going to your old installation, the other goes to cyanite. After that you can copy the .wsp files and migrate them at leisure — even if it takes a week, you end up with all the data in cyanite and can then remove the old installation. One possible relay configuration is sketched below.
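A sketch of that dual-write setup with stock carbon-relay in rules mode. Hostnames and ports here are assumptions, and note that carbon-relay forwards to its destinations over the pickle protocol — verify that your cyanite build (or an intermediate line-protocol relay) accepts what the relay sends.

# carbon.conf, [relay] section — every destination referenced in
# relay-rules.conf must also be listed in DESTINATIONS here.
[relay]
LINE_RECEIVER_PORT = 2003
RELAY_METHOD = rules
DESTINATIONS = old-graphite:2004, cyanite-host:2005

# relay-rules.conf — the default rule fans each metric out to both
# the old carbon-cache and cyanite's carbon listener.
[default]
default = true
destinations = old-graphite:2004, cyanite-host:2005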
