Skip to content

Instantly share code, notes, and snippets.

@miike
Created December 12, 2016 22:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save miike/6507b4bb88bb88877843d312c229063b to your computer and use it in GitHub Desktop.
Save miike/6507b4bb88bb88877843d312c229063b to your computer and use it in GitHub Desktop.
Decoding Snowplow real-time bad rows
# accompanying tutorial:
import base64
import thriftpy
from thriftpy.protocol import TCyBinaryProtocolFactory
from thriftpy.utils import deserialize, serialize
sample_payload = "CwFAAAAAAi9pCwBkAAAACTEyNy4wLjAuMQoAyAAAAVjbnjdoC3ppAAAAQWlnbHU6Y29tLnNub3dwbG93YW5hbHl0aWNzLnNub3dwbG93L0NvbGxlY3RvclBheWxvYWQvdGhyaWZ0LzEtMC0wCwFKAAABaHN0bT0xNDgxMTUzMzI5MDAwJmU9cHYmdXJsPWh0dHAlM0ElMkYlMkZzbm93Zmxha2UtYW5hbHl0aWNzLmNvbSZ0dj1qcy0yLjYuMCZ0bmE9anMtMy42LjAmYWlkPXNub3dmbGFrZSZwPXdlYiZ0ej1BdXN0cmFsaWElMkZTeWRuZXkmbGFuZz1lbi1BVSZjcz1VVEYtOCZyZXM9MzYweDY0MCZjZD0zMiZjb29raWU9MSZlaWQ9YzI1OWMyNWUtZjk0Yi00ZDJjLWExMWMtMGQyNzhjMmU2ZDFhJmR0bT0xNDc5OTI3ODU3MjAxJnZwPTB4LTU2JmRzPTIwMHgyNjI5NSZ2aWQ9NCZzaWQ9N2ZiOTdmQzYtNmUwZi00MDIyLWFkYmQtMDE3NDMxNTIwZGRiJmR1aWQ9NGQxMGQzZDAtYzJiNC00NzNlLWE0ODMtODEyNzk5ZTgyNGQxJmZwPTEyOTExMjMzMgsBLAAAAG1Nb3ppbGxhLzUuMCAoV2luZG93cyBOVCAxMC4wOyBXT1c2NCkgQXBwbGVXZWJLaXQvNTM3LjM2IChLSFRNTCwgbGlrZSBHZWNrbykgQ2hyb21lLzU0LjAuMjg0MC43MSBTYWZhcmkvNTM3LjM2CwGQAAAAIWNvbGxlY3Rvci5zbm93Zmxha2UtYW5hbHl0aWNzLmNvbQsA0gAAAAVVVEYtOAsBNgAAAB9odHRwczovL3Nub3dmbGFrZS1hbmFseXRpY3MuY29tCwGaAAAAJDRkMTBkM2QwLWMyYjQtNDczZS1hNDgzLTgxMjc5OWU4MjRkMQsA3AAAABFzc2MtMC43LjAta2luZXNpcw8BXgsAAAALAAAAJ0hvc3Q6IGNvbGxlY3Rvci5zbm93Zmxha2UtYW5hbHl0aWNzLmNvbQAAAB1BY2NlcHQ6IGltYWdlL3dlYnAsICovKjtxPTAuOAAAACRBY2NlcHQtRW5jb2Rpbmc6IGd6aXAsIGRlZmxhdGUsIHNkY2gAAAA3QWNjZXB0LUxhbmd1YWdlOiBlbi1BVSwgZW47cT0wLjgsIGVuLVVTO3E9MC42LCBlbjtxPTAuNAAAABRDb29raWU6IHNwPWFiY2QtMTIzNAAAACdSZWZlcmVyOiBodHRwOi8vc25vd2ZsYWtlLWFuYWx5dGljcy5jb20AAAB6VXNlci1BZ2VudDogIE1vemlsbGEvNS4wIChXaW5kb3dzIE5UIDEwLjA7IFdPVzY0KSBBcHBsZVdlYktpdC81MzcuMzYgKEtIVE1MLCBsaWtlIEdlY2tvKSBDaHJvbWUvNTQuMC4yODQwLjcxIFNhZmFyaS81MzcuMzYAAAAaWC1Gb3J3YXJkZWQtRm9yOiAxMjcuMC4wLjEAAAAVWC1Gb3J3YXJkZWQtUG9ydDogNDQzAAAAGFgtRm9yd2FyZGVkLVByb3RvOiBodHRwcwAAABZDb25uZWN0aW9uOiBrZWVwLWFsaXZlAA=="
decoded_payload = base64.b64decode(sample_payload)
print(decoded_payload)
collector = thriftpy.load("collector.thrift")
collector_payload = collector.CollectorPayload()
raw_payload = deserialize(collector_payload, decoded_payload, TCyBinaryProtocolFactory())
print(raw_payload)
querystring = raw_payload.querystring
network_userid = raw_payload.networkUserId
print(querystring)
from urllib import parse
params = parse.parse_qs(querystring)
print(params)
app_id = params.get('aid')[0]
print(app_id)
@tyomo4ka
Copy link

Thanks for the article. Made it working for myself with Python 2.

It probably worth mentioning that you have to deal with JSON object first when you pick data from kinesis stream. Collector's payload will be under line key in thrift format encoded with base64. Took me some time to figure this out. The full kineses record will also contains some other useful information like errors field.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment