Skip to content

@erichiggins /appstats_serializer.py
Last active

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Protobuf to Python dictionary serializer for Google App Engine AppStats module.I wanted to be able to send my AppStats profile data off to a logging service (like Loggly) so that I could monitor it over time. This module serializes the data so that it can be converted to JSON and sent off to a log service by a cron job.
#!/usr/bin/env python
"""
AppStats serializer module which converts App Engine profile data from Protocol Buffers into JSON.
Usage:
import appstats_serializer
# List all AppStats summaries:
appstats_dicts = appstats_serializer.appstats_to_dict(summaries_only=True)
# Get specific AppStats data by UNIX Epoch timestamp:
timestamp = request.get('timestamp')
full_proto = appstats_serializer.load_full_proto_from_timestamp(timestamp)
appstats_dict = appstats_serializer.request_stats_to_dict(full_proto)
"""
__author__ = 'Eric Higgins'
__copyright__ = 'Copyright 2013, Eric Higgins'
__version__ = '0.0.5'
__email__ = 'erichiggins@gmail.com'
__status__ = 'Development'
from google.appengine.ext.appstats import loader
from google.appengine.ext.appstats import recording
__all__ = [
'appstats_to_dict',
'load_summary_protos',
'load_full_proto',
'load_full_proto_from_timestamp',
'request_stats_to_dict',
]
def FormatFixed32(value):
"""Local copy of proto.ProtocolMessage.FormatFixed32 function."""
if (value < 0):
value += (1L << 32)
return '0x%x' % value
def FormatFixed64(value):
"""Local copy of proto.ProtocolMessage.FormatFixed64 function."""
if (value < 0):
value += (1L << 64)
return '0x%x' % value
def iter_proto(pb, prop_name, dict_fn):
"""Iterate over a repeated proto property and convert all to dictionaries."""
return [dict_fn(getattr(pb, prop_name)(i)) for i in xrange(getattr(pb, prop_name + '_size')())]
def billed_ops_to_dict(pb):
"""appstats.datastore_pb.BilledOpsProto converter."""
return {
'op': pb.op(),
'num_ops': pb.num_ops(),
}
def stack_frame_to_dict(pb):
"""appstats.datastore_pb.StackFrameProto converter."""
return {
'class_or_file_name': pb.class_or_file_name(),
'line_number': pb.line_number(),
'function_name': pb.function_name(),
'variables': iter_proto(pb, 'variables', key_val_to_dict),
}
def individual_rpc_stats_to_dict(pb):
"""appstats.datastore_pb.IndividualRpcStatsProto converter."""
return {
'service_call_name': pb.service_call_name(),
'request_data_summary': pb.request_data_summary(),
'response_data_summary': pb.response_data_summary(),
'api_mcycles': pb.api_mcycles(),
'api_milliseconds': pb.api_milliseconds(),
'start_offset_milliseconds': pb.start_offset_milliseconds(),
'duration_milliseconds': pb.duration_milliseconds(),
'namespace': pb.namespace(),
'was_successful': bool(pb.was_successful()),
'call_stack': iter_proto(pb, 'call_stack', stack_frame_to_dict),
'datastore_details': datastore_call_details_to_dict(pb.datastore_details()),
'call_cost_microdollars': pb.call_cost_microdollars(),
'billed_ops': iter_proto(pb, 'billed_ops', billed_ops_to_dict),
}
def aggregate_rpc_stats_to_dict(pb):
"""appstats.datastore_pb.AggregateRpcStatsProto converter."""
return {
'service_call_name': pb.service_call_name(),
'total_amount_of_calls': pb.total_amount_of_calls(),
'total_cost_of_calls_microdollars': pb.total_cost_of_calls_microdollars(),
'total_billed_ops': iter_proto(pb, 'total_billed_ops', billed_ops_to_dict),
}
def key_val_to_dict(pb):
"""appstats.datastore_pb.KeyValProto converter."""
return {pb.key(): pb.value()}
def datastore_call_details_to_dict(pb):
"""appstats.datastore_pb.DatastoreCallDetailsProto converter."""
return {
'query_kind': pb.query_kind(),
'query_ancestor': reference_to_dict(pb.query_ancestor()),
'query_thiscursor': FormatFixed64(pb.query_thiscursor()),
'query_nextcursor': FormatFixed64(pb.query_nextcursor()),
}
def path_element_to_dict(pb):
"""datastore.entity_pb.Path_Element converter."""
return {
'type': pb.type(),
'id': pb.id(),
'name': pb.name(),
}
def reference_to_dict(pb):
"""datastore.entity_pb.Reference converter."""
return {
'app': pb.app(),
'name_space': pb.name_space(),
'path': iter_proto(pb.path(), 'element', path_element_to_dict),
}
def request_stats_to_dict(pb):
"""appstats.datastore_pb.RequestStatsProto converter."""
return {
'start_timestamp_milliseconds': pb.start_timestamp_milliseconds(),
'http_method': pb.http_method(),
'http_path': pb.http_path(),
'http_query': pb.http_query(),
'http_status': pb.http_status(),
'duration_milliseconds': pb.duration_milliseconds(),
'api_mcycles': pb.api_mcycles(),
'processor_mcycles': pb.processor_mcycles(),
'rpc_stats': iter_proto(pb, 'rpc_stats', aggregate_rpc_stats_to_dict),
'cgi_env': iter_proto(pb, 'cgi_env', key_val_to_dict),
'overhead_walltime_milliseconds': pb.overhead_walltime_milliseconds(),
'user_email': pb.user_email(),
'is_admin': bool(pb.is_admin()),
'individual_stats': iter_proto(pb, 'individual_stats', individual_rpc_stats_to_dict),
}
def appstats_to_dict(filter_timestamp=0, summaries_only=False):
"""Pull all AppStats records from memcache and convert to dictionaries."""
loader_fn = loader.FromMemcache
if summaries_only:
loader_fn = load_summary_protos
return [request_stats_to_dict(x._proto) for x in loader_fn(filter_timestamp)]
def load_summary_protos(filter_timestamp=0):
"""Fetch only AppStats summary protos and filter by optional timestamp."""
return [x for x in recording.load_summary_protos() if x.start_timestamp_milliseconds() > filter_timestamp]
def load_full_proto(summary):
"""Fetch a full detail AppStats from a summary proto."""
return load_full_proto_from_timestamp(summary.start_timestamp_milliseconds())
def load_full_proto_from_timestamp(timestamp):
"""Fetch a full detail AppStats from a summary proto timestamp."""
timestamp = int(timestamp) * 0.001
return recording.load_full_proto(timestamp)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.