Skip to content

Instantly share code, notes, and snippets.

@MMcM
Last active December 27, 2015 06:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MMcM/7281756 to your computer and use it in GitHub Desktop.
Using FDB SQL STORAGE_FORMAT to allow read access from Python
from google.protobuf import descriptor
from google.protobuf import descriptor_pb2
from google.protobuf import descriptor_pool
from google.protobuf import message
from google.protobuf import reflection
import fdb
from directory import directory
# https://github.com/FoundationDB/sql-layer/blob/master/src/main/protobuf/akiban_information_schema.proto
from akiban_information_schema_pb2 import AkibanInformationSchema
# https://github.com/FoundationDB/sql-layer/blob/master/src/main/protobuf/common_storage_formats.proto
from common_storage_formats_pb2 import protobuf_row
# https://github.com/FoundationDB/sql-layer/blob/master/src/main/protobuf/sql_custom_options.proto
import sql_custom_options_pb2
import struct
class RowParserFactory(object):
    """When protobuf is used as the row format, the FDB SQL Layer
    saves the generated descriptors in a special directory, keyed by
    the group name.

    This loads those descriptors, rendezvouses them with their
    dependencies, and returns functions that can be used to parse the
    value byte string into the row message.

    Unless no_group was chosen as a storage option, the row message
    will be the group message, which has one optional field for each
    table type in the group. Only one is populated at any time.
    """

    def __init__(self):
        # Pool into which all file descriptors are registered so that
        # cross-file dependencies can be resolved.
        self._pool = descriptor_pool.DescriptorPool()
        # Seed the pool with descriptor.proto and the SQL layer's custom
        # options, which the stored group descriptors depend on.
        for seed in (descriptor_pb2.DESCRIPTOR, sql_custom_options_pb2.DESCRIPTOR):
            file_proto = descriptor_pb2.FileDescriptorProto()
            seed.CopyToProto(file_proto)
            self._pool.Add(file_proto)
        self._pbdir = None  # lazily-opened (sql, schemaManager, protobuf) directory
        self._aises = {}    # cache: schema name -> parsed AkibanInformationSchema

    def class_for_message_type(self, descriptor):
        """Build a message class for *descriptor*.

        reflection.ParseMessage makes a new class every time and does
        not arrange for field message types to get associated with
        their classes, so we duplicate some of what it does inside.
        """
        # NOTE: the parameter shadows the imported `descriptor` module;
        # kept as-is so keyword callers are unaffected.
        class _MessageClass(message.Message):
            __metaclass__ = reflection.GeneratedProtocolMessageType
            DESCRIPTOR = descriptor
        return _MessageClass

    def class_for_row(self, file_desc):
        """Build classes for all the message types in the file and
        return the one marked as the group message or else the only
        one for the single table case.
        """
        group_class = None
        row_class = None
        for descriptor in file_desc.message_types_by_name.values():
            row_class = self.class_for_message_type(descriptor)
            table_options = descriptor.GetOptions().Extensions[sql_custom_options_pb2.TableOptions.fdbsql]
            if table_options is not None and table_options.is_group:
                group_class = row_class
        # Fall back to the last (only) table class when no group marker
        # was found (the no_group storage option).
        return group_class or row_class

    @fdb.transactional
    def file_desc_from_ais(self, tr, schema_name, table_name):
        """Return the FileDescriptorProto for *table_name*'s group.

        Loads (and caches) the serialized AkibanInformationSchema for
        *schema_name* from the SQL layer's protobuf directory.

        Raises Exception if the schema or table cannot be found, or if
        the group does not use the protobuf storage format.
        """
        ais = self._aises.get(schema_name)
        if ais is None:
            if self._pbdir is None:
                self._pbdir = directory.open(tr, (u'sql', u'schemaManager', u'protobuf'))
            pbais = tr[self._pbdir.pack((schema_name,))]
            if pbais is None:
                raise Exception("Schema not found")
            # Value is a 4-byte big-endian length prefix followed by the
            # serialized AkibanInformationSchema message.
            pblen = struct.unpack('>i', pbais[0:4])[0]
            assert len(pbais) == 4 + pblen
            ais = AkibanInformationSchema.FromString(pbais[4:])
            self._aises[schema_name] = ais
        for g in ais.schemas[0].groups:
            if g.rootTableName == table_name:
                if not g.storage.HasExtension(protobuf_row):
                    # (Typo "Protobuf Buffers" fixed.)
                    raise Exception("Group is not using Protocol Buffers")
                return g.storage.Extensions[protobuf_row].file_descriptor
        raise Exception("Table not found")

    @fdb.transactional
    def file_desc_for_group(self, tr, schema_name, table_name):
        """Register the group's file descriptor in the pool and return
        the fully-resolved FileDescriptor for it.
        """
        file_proto = self.file_desc_from_ais(tr, schema_name, table_name)
        self._pool.Add(file_proto)
        return self._pool.FindFileByName(file_proto.name)

    @fdb.transactional
    def parser_for_group(self, tr, schema_name, table_name):
        """Return a function that takes the byte string for a protobuf
        row and returns the row instance.
        """
        file_desc = self.file_desc_for_group(tr, schema_name, table_name)
        row_class = self.class_for_row(file_desc)
        def _parser(data):
            # Renamed from `str` to avoid shadowing the builtin.
            row = row_class()
            row.ParseFromString(data)
            return row
        return _parser
# Example: read rows of table test.t1, which was created with
# STORAGE_FORMAT tuple (see the companion SQL script), directly with
# the FDB tuple layer.  (Python 2 print statements.)
import fdb
from directory import directory

db = fdb.open()
# Directory holding t1's primary data.
tdir = directory.open(db, (u'sql',u'data',u'table',u'test',u't1'))
# Sub-directory for the secondary index t1_s.
idir = tdir.open(db, u't1_s')
# Key is tuple of hkey (1, id). Value is tuple of row (id, s).
for kv in db[tdir.range()]:
    print tdir.unpack(kv.key), fdb.tuple.unpack(kv.value)
# Range-scan the index between 'A' and 'E', then fetch each row.
for kv in db[slice(idir.pack((u'A',)),idir.pack((u'E',)))]:
    # Key is (s, id). Turn into hkey.
    print fdb.tuple.unpack(db[tdir.pack((1, idir.unpack(kv.key)[1]))])
-- Create a test table and index with the tuple storage format so the
-- keys and values can be decoded directly with the FDB tuple layer.
DROP TABLE IF EXISTS t1;
CREATE TABLE t1(id INT PRIMARY KEY NOT NULL, s VARCHAR(16)) STORAGE_FORMAT tuple;
INSERT INTO t1 VALUES(1,'Fred'),(2,'Wilma'),(3,'Barney'),(4,'Betty');
CREATE INDEX t1_s ON t1(s) STORAGE_FORMAT tuple;
-- Query equivalent to the Python index scan example.
SELECT * FROM t1 WHERE s BETWEEN 'A' AND 'E';
# Example: read rows of table test.t2, which was created with
# STORAGE_FORMAT protobuf (see the companion SQL script), using
# RowParserFactory to decode the row messages.
import fdb
from directory import directory
from fdb_sql_protobuf import RowParserFactory

db = fdb.open()
# Build a parser from the group descriptors saved by the SQL layer.
parser = RowParserFactory().parser_for_group(db, u'test', u't2')
tdir = directory.open(db, (u'sql',u'data',u'table',u'test',u't2'))
idir = tdir.open(db, u't2_s')

@fdb.transactional
def index_lookup(tr,lo,hi):
    # Initiate futures for main table from index all at once.
    vals = [tr[tdir.pack((1, idir.unpack(kv.key)[1]))]
            for kv in tr[idir.pack((lo,)):idir.pack((hi,))]]
    # Parse each protobuf row and pull column s from the t2 group field.
    return [parser(val).t2.s for val in vals]

print index_lookup(db, u'M',u'Z')
-- Create a test table stored as protobuf rows; the index still uses
-- the tuple format so its keys can be decoded with the tuple layer.
DROP TABLE IF EXISTS t2;
CREATE TABLE t2(id INT PRIMARY KEY NOT NULL, s VARCHAR(16)) STORAGE_FORMAT protobuf;
INSERT INTO t2 VALUES(1,'Fred'),(2,'Wilma'),(3,'Barney'),(4,'Betty');
CREATE INDEX t2_s ON t2(s) STORAGE_FORMAT tuple;
-- Query equivalent to the Python index_lookup example.
SELECT * FROM t2 WHERE s BETWEEN 'A' and 'E';
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment