public
Last active

Protobuf to Python dictionary serializer for Google App Engine AppStats module. I wanted to be able to send my AppStats profile data off to a logging service (like Loggly) so that I could monitor it over time. This module serializes the data so that it can be converted to JSON and sent off to a log service by a cron job.

  • Download Gist
appstats_serializer.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
#!/usr/bin/env python
"""
AppStats serializer module.
"""
 
__author__ = 'Eric Higgins'
__copyright__ = 'Copyright 2013, Eric Higgins'
__version__ = '0.0.5'
__email__ = 'erichiggins@gmail.com'
__status__ = 'Development'
 
 
from google.appengine.ext.appstats import loader
from google.appengine.ext.appstats import recording
 
 
__all__ = [
'appstats_to_dict',
'load_summary_protos',
'load_full_proto',
'load_full_proto_from_timestamp',
'request_stats_to_dict',
]
 
 
def FormatFixed32(value):
"""Local copy of proto.ProtocolMessage.FormatFixed32 function."""
if (value < 0):
value += (1L << 32)
return '0x%x' % value
 
 
def FormatFixed64(value):
"""Local copy of proto.ProtocolMessage.FormatFixed64 function."""
if (value < 0):
value += (1L << 64)
return '0x%x' % value
 
 
def iter_proto(pb, prop_name, dict_fn):
"""Iterate over a repeated proto property and convert all to dictionaries."""
return [dict_fn(getattr(pb, prop_name)(i)) for i in xrange(getattr(pb, prop_name + '_size')())]
 
 
def billed_ops_to_dict(pb):
"""appstats.datastore_pb.BilledOpsProto converter."""
return {
'op': pb.op(),
'num_ops': pb.num_ops(),
}
 
 
def stack_frame_to_dict(pb):
"""appstats.datastore_pb.StackFrameProto converter."""
return {
'class_or_file_name': pb.class_or_file_name(),
'line_number': pb.line_number(),
'function_name': pb.function_name(),
'variables': iter_proto(pb, 'variables', key_val_to_dict),
}
 
 
def individual_rpc_stats_to_dict(pb):
"""appstats.datastore_pb.IndividualRpcStatsProto converter."""
return {
'service_call_name': pb.service_call_name(),
'request_data_summary': pb.request_data_summary(),
'response_data_summary': pb.response_data_summary(),
'api_mcycles': pb.api_mcycles(),
'api_milliseconds': pb.api_milliseconds(),
'start_offset_milliseconds': pb.start_offset_milliseconds(),
'duration_milliseconds': pb.duration_milliseconds(),
'namespace': pb.namespace(),
'was_successful': bool(pb.was_successful()),
'call_stack': iter_proto(pb, 'call_stack', stack_frame_to_dict),
'datastore_details': datastore_call_details_to_dict(pb.datastore_details()),
'call_cost_microdollars': pb.call_cost_microdollars(),
'billed_ops': iter_proto(pb, 'billed_ops', billed_ops_to_dict),
}
 
 
def aggregate_rpc_stats_to_dict(pb):
"""appstats.datastore_pb.AggregateRpcStatsProto converter."""
return {
'service_call_name': pb.service_call_name(),
'total_amount_of_calls': pb.total_amount_of_calls(),
'total_cost_of_calls_microdollars': pb.total_cost_of_calls_microdollars(),
'total_billed_ops': iter_proto(pb, 'total_billed_ops', billed_ops_to_dict),
}
 
 
def key_val_to_dict(pb):
"""appstats.datastore_pb.KeyValProto converter."""
return {pb.key(): pb.value()}
 
 
def datastore_call_details_to_dict(pb):
"""appstats.datastore_pb.DatastoreCallDetailsProto converter."""
return {
'query_kind': pb.query_kind(),
'query_ancestor': reference_to_dict(pb.query_ancestor()),
'query_thiscursor': FormatFixed64(pb.query_thiscursor()),
'query_nextcursor': FormatFixed64(pb.query_nextcursor()),
}
 
 
def path_element_to_dict(pb):
"""datastore.entity_pb.Path_Element converter."""
return {
'type': pb.type(),
'id': pb.id(),
'name': pb.name(),
}
 
 
def reference_to_dict(pb):
"""datastore.entity_pb.Reference converter."""
return {
'app': pb.app(),
'name_space': pb.name_space(),
'path': iter_proto(pb.path(), 'element', path_element_to_dict),
}
 
 
def request_stats_to_dict(pb):
"""appstats.datastore_pb.RequestStatsProto converter."""
return {
'start_timestamp_milliseconds': pb.start_timestamp_milliseconds(),
'http_method': pb.http_method(),
'http_path': pb.http_path(),
'http_query': pb.http_query(),
'http_status': pb.http_status(),
'duration_milliseconds': pb.duration_milliseconds(),
'api_mcycles': pb.api_mcycles(),
'processor_mcycles': pb.processor_mcycles(),
'rpc_stats': iter_proto(pb, 'rpc_stats', aggregate_rpc_stats_to_dict),
'cgi_env': iter_proto(pb, 'cgi_env', key_val_to_dict),
'overhead_walltime_milliseconds': pb.overhead_walltime_milliseconds(),
'user_email': pb.user_email(),
'is_admin': bool(pb.is_admin()),
'individual_stats': iter_proto(pb, 'individual_stats', individual_rpc_stats_to_dict),
}
 
 
def appstats_to_dict(filter_timestamp=0, summaries_only=False):
"""Pull all AppStats records from memcache and convert to dictionaries."""
loader_fn = loader.FromMemcache
if summaries_only:
loader_fn = load_summary_protos
return [request_stats_to_dict(x._proto) for x in loader_fn(filter_timestamp)]
 
 
def load_summary_protos(filter_timestamp=0):
"""Fetch only AppStats summary protos and filter by optional timestamp."""
return [x for x in recording.load_summary_protos() if x.start_timestamp_milliseconds() > filter_timestamp]
 
 
def load_full_proto(summary):
"""Fetch a full detail AppStats from a summary proto."""
return load_full_proto_from_timestamp(summary.start_timestamp_milliseconds())
 
 
def load_full_proto_from_timestamp(timestamp):
"""Fetch a full detail AppStats from a summary proto timestamp."""
timestamp = int(timestamp) * 0.001
return recording.load_full_proto(timestamp)

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.