Last active

Embed URL

HTTPS clone URL

SSH clone URL

You can clone with HTTPS or SSH.

Download Gist

Protobuf to Python dictionary serializer for Google App Engine AppStats module.I wanted to be able to send my AppStats profile data off to a logging service (like Loggly) so that I could monitor it over time. This module serializes the data so that it can be converted to JSON and sent off to a log service by a cron job.

View appstats_serializer.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
#!/usr/bin/env python
"""
AppStats serializer module which converts App Engine profile data from Protocol Buffers into JSON.
Usage:
import appstats_serializer
# List all AppStats summaries:
appstats_dicts = appstats_serializer.appstats_to_dict(summaries_only=True)
# Get specific AppStats data by UNIX Epoch timestamp:
timestamp = request.get('timestamp')
full_proto = appstats_serializer.load_full_proto_from_timestamp(timestamp)
appstats_dict = appstats_serializer.request_stats_to_dict(full_proto)
"""
 
__author__ = 'Eric Higgins'
__copyright__ = 'Copyright 2013, Eric Higgins'
__version__ = '0.0.5'
__email__ = 'erichiggins@gmail.com'
__status__ = 'Development'
 
 
from google.appengine.ext.appstats import loader
from google.appengine.ext.appstats import recording
 
 
__all__ = [
'appstats_to_dict',
'load_summary_protos',
'load_full_proto',
'load_full_proto_from_timestamp',
'request_stats_to_dict',
]
 
 
def FormatFixed32(value):
"""Local copy of proto.ProtocolMessage.FormatFixed32 function."""
if (value < 0):
value += (1L << 32)
return '0x%x' % value
 
 
def FormatFixed64(value):
"""Local copy of proto.ProtocolMessage.FormatFixed64 function."""
if (value < 0):
value += (1L << 64)
return '0x%x' % value
 
 
def iter_proto(pb, prop_name, dict_fn):
"""Iterate over a repeated proto property and convert all to dictionaries."""
return [dict_fn(getattr(pb, prop_name)(i)) for i in xrange(getattr(pb, prop_name + '_size')())]
 
 
def billed_ops_to_dict(pb):
"""appstats.datastore_pb.BilledOpsProto converter."""
return {
'op': pb.op(),
'num_ops': pb.num_ops(),
}
 
 
def stack_frame_to_dict(pb):
"""appstats.datastore_pb.StackFrameProto converter."""
return {
'class_or_file_name': pb.class_or_file_name(),
'line_number': pb.line_number(),
'function_name': pb.function_name(),
'variables': iter_proto(pb, 'variables', key_val_to_dict),
}
 
 
def individual_rpc_stats_to_dict(pb):
"""appstats.datastore_pb.IndividualRpcStatsProto converter."""
return {
'service_call_name': pb.service_call_name(),
'request_data_summary': pb.request_data_summary(),
'response_data_summary': pb.response_data_summary(),
'api_mcycles': pb.api_mcycles(),
'api_milliseconds': pb.api_milliseconds(),
'start_offset_milliseconds': pb.start_offset_milliseconds(),
'duration_milliseconds': pb.duration_milliseconds(),
'namespace': pb.namespace(),
'was_successful': bool(pb.was_successful()),
'call_stack': iter_proto(pb, 'call_stack', stack_frame_to_dict),
'datastore_details': datastore_call_details_to_dict(pb.datastore_details()),
'call_cost_microdollars': pb.call_cost_microdollars(),
'billed_ops': iter_proto(pb, 'billed_ops', billed_ops_to_dict),
}
 
 
def aggregate_rpc_stats_to_dict(pb):
"""appstats.datastore_pb.AggregateRpcStatsProto converter."""
return {
'service_call_name': pb.service_call_name(),
'total_amount_of_calls': pb.total_amount_of_calls(),
'total_cost_of_calls_microdollars': pb.total_cost_of_calls_microdollars(),
'total_billed_ops': iter_proto(pb, 'total_billed_ops', billed_ops_to_dict),
}
 
 
def key_val_to_dict(pb):
"""appstats.datastore_pb.KeyValProto converter."""
return {pb.key(): pb.value()}
 
 
def datastore_call_details_to_dict(pb):
"""appstats.datastore_pb.DatastoreCallDetailsProto converter."""
return {
'query_kind': pb.query_kind(),
'query_ancestor': reference_to_dict(pb.query_ancestor()),
'query_thiscursor': FormatFixed64(pb.query_thiscursor()),
'query_nextcursor': FormatFixed64(pb.query_nextcursor()),
}
 
 
def path_element_to_dict(pb):
"""datastore.entity_pb.Path_Element converter."""
return {
'type': pb.type(),
'id': pb.id(),
'name': pb.name(),
}
 
 
def reference_to_dict(pb):
"""datastore.entity_pb.Reference converter."""
return {
'app': pb.app(),
'name_space': pb.name_space(),
'path': iter_proto(pb.path(), 'element', path_element_to_dict),
}
 
 
def request_stats_to_dict(pb):
"""appstats.datastore_pb.RequestStatsProto converter."""
return {
'start_timestamp_milliseconds': pb.start_timestamp_milliseconds(),
'http_method': pb.http_method(),
'http_path': pb.http_path(),
'http_query': pb.http_query(),
'http_status': pb.http_status(),
'duration_milliseconds': pb.duration_milliseconds(),
'api_mcycles': pb.api_mcycles(),
'processor_mcycles': pb.processor_mcycles(),
'rpc_stats': iter_proto(pb, 'rpc_stats', aggregate_rpc_stats_to_dict),
'cgi_env': iter_proto(pb, 'cgi_env', key_val_to_dict),
'overhead_walltime_milliseconds': pb.overhead_walltime_milliseconds(),
'user_email': pb.user_email(),
'is_admin': bool(pb.is_admin()),
'individual_stats': iter_proto(pb, 'individual_stats', individual_rpc_stats_to_dict),
}
 
 
def appstats_to_dict(filter_timestamp=0, summaries_only=False):
"""Pull all AppStats records from memcache and convert to dictionaries."""
loader_fn = loader.FromMemcache
if summaries_only:
loader_fn = load_summary_protos
return [request_stats_to_dict(x._proto) for x in loader_fn(filter_timestamp)]
 
 
def load_summary_protos(filter_timestamp=0):
"""Fetch only AppStats summary protos and filter by optional timestamp."""
return [x for x in recording.load_summary_protos() if x.start_timestamp_milliseconds() > filter_timestamp]
 
 
def load_full_proto(summary):
"""Fetch a full detail AppStats from a summary proto."""
return load_full_proto_from_timestamp(summary.start_timestamp_milliseconds())
 
 
def load_full_proto_from_timestamp(timestamp):
"""Fetch a full detail AppStats from a summary proto timestamp."""
timestamp = int(timestamp) * 0.001
return recording.load_full_proto(timestamp)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.