# -*- coding: utf-8 -*-
import datetime as dt
import unittest
from io import BytesIO
from mock import MagicMock, patch
from parselyutils.apikey_utils import ApikeyConfigFetcher
from parselyutils.event import Event, VisitorInfo, TimestampInfo
from casterisk.lib.deliverymetrics import DatadogMetric, DeliveryMetric
from casterisk.lib.data_pipeline.enrichments import enrich_event
from casterisk.bolts.data_pipeline.kinesis_firehose_writer import KinesisFirehoseWriter
from casterisk.settings import get_settings
from tests.casterisk.lib import SegmentationBase
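
# Canned apikey configuration, shaped like what
# ApikeyConfigFetcher.get_apikey_config returns for the demo account.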
APIKEY_CONFIG = {
    "features": ["data_pipeline_firehose"],
    "exists": True,
    "tags_enabled": True,
    "registered": True,
    "registration_date": dt.datetime(2011, 12, 5),
    "internal_domains": ["parsely.com", "blog.parsely.com"],
    "es_settings": {},
    "key": "demoaccount.parsely.com",
    "is_installed": True,
    "tier": "2_analytics_team",
    "timezone": "America/New_York",
    "ptrack_db": "demoaccount_parsely_com",
    "topics_enabled": True,
    "signup_date": dt.datetime(2011, 12, 5),
    "ignore_domains": [],
    "min_page_views": 4,
    "allowed_tlds": ["parsely.com", "blog.parsely.com", "blog.parse.ly"],
    "track_start_date": dt.datetime(2011, 12, 5),
    "api_auth_mode": "secret",
    "disable_tracking": False,
    "publisher": "parsely.com",
    "secret_key": "asdfqwer",
}
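
# Canned responses shaped like what the bolt's put_records call returns
# (one entry per record, mirroring Firehose's PutRecordBatch API): a success
# carries a RecordId; a failure carries an ErrorCode and ErrorMessage.
# RateLimitError is the code createInstance below marks as recoverable.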
PUT_RESPONSE_SUCCESS = {"RequestResponses": [{"RecordId": "123"}]}
PUT_RESPONSE_ERROR_RETRY = {
    "RequestResponses": [
        {"ErrorCode": "RateLimitError", "ErrorMessage": "RateLimitError"}
    ]
}
PUT_RESPONSE_ERROR_FAIL = {
    "RequestResponses": [
        {"ErrorCode": "SomeOtherError", "ErrorMessage": "SomeOtherError"}
    ]
}


class TestKinesisFirehoseWriter(SegmentationBase):
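    # show the full diff when the long metric-list assertions fail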
    maxDiff = None

    def setUp(self):
        self.bolt = self.createInstance()

    def get_event_dict(self):
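        # A single pageview for the demo account. Its nginx_ms value decodes
        # to 2015-04-22 UTC, well before the 2018-08-13 timestamp_override set
        # in createInstance, which appears to be why the in_late metric fires
        # in every test below.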
        event = Event(
            apikey="demoaccount.parsely.com",
            url="https://blog.parse.ly/",
            referrer="",
            action="pageview",
            engaged_time_inc=None,
            visitor=VisitorInfo(
                network_id="",
                site_id="e71604df-a912-455d-aaf3-a9c72a6dd86c",
                ip="198.200.78.40",
            ),
            extra_data=None,
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
            display=None,
            timestamp_info=TimestampInfo(
                nginx_ms=1_429_707_722_000, pixel_ms=None, override_ms=None
            ),
            session=None,
            slot=None,
            metadata=None,
            campaign=None,
            flags=None,
            total_time=None,
            pageload_id=None,
        )
        # mimic what comes into the writer bolts from event_enricher
        enriched = enrich_event(
            event,
            geo_db_file=self.geo_db_file,
            nielsen_data_file=self.nielsen_data_file,
        )
        return enriched
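
    # Each test patches emit with autospec=True (so every recorded call
    # carries the bolt instance as its first positional argument) and stubs
    # put_records to return one of the canned responses above.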
    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.KinesisFirehoseWriter.emit",
        autospec=True,
    )
    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.KinesisFirehoseWriter.put_records",
        return_value=PUT_RESPONSE_SUCCESS,
    )
    def test_process_success(self, put_handler, emit_handler):
        """Ensure successful processing emits metrics."""
        tup = MagicMock(autospec=True)
        event_dict = self.get_event_dict()
        tup.values = [[event_dict]]
        self.bolt.process_batch("abc", [tup])
        assert 1 == put_handler.call_count
        assert 2 == emit_handler.call_count
        # first call to emit delivery metrics
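        # call_args_list[0] is the first emit call, [0] its positional args,
        # [1] the first real argument after the autospec-injected self (the
        # emitted tuple values), and the final [0] that tuple's first value.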
        actual_metrics = emit_handler.call_args_list[0][0][1][0]
        assert [
            DeliveryMetric(
                kind="count",
                data=(
                    (
                        "dpl_firehose_writer|network|events|parsely.com|2018-08-13",
                        "00:00",
                    ),
                    1,
                ),
            ),
            DeliveryMetric(
                kind="count",
                data=(
                    (
                        "dpl_firehose_writer|network|pageviews|parsely.com|2018-08-13",
                        "00:00",
                    ),
                    1,
                ),
            ),
            DeliveryMetric(
                kind="count",
                data=(
                    (
                        "dpl_firehose_writer|network|in_late|parsely.com|2018-08-13",
                        "00:00",
                    ),
                    1,
                ),
            ),
            DeliveryMetric(
                kind="count",
                data=(
                    (
                        "dpl_firehose_writer|network|successes|parsely.com|2018-08-13",
                        "00:00",
                    ),
                    1,
                ),
            ),
            DeliveryMetric(
                kind="uuid",
                data=(
                    "dpl_firehose_writer|network|visitor_site_ids|parsely.com|2018-08-13",
                    ["e71604df-a912-455d-aaf3-a9c72a6dd86c"],
                ),
            ),
            DeliveryMetric(
                kind="uuid",
                data=(
                    "dpl_firehose_writer|network|event_ids|parsely.com|2018-08-13",
                    ["0xb03fb493c88d8044ee4dd912c224ebb9"],
                ),
            ),
        ] == actual_metrics
        # second call to emit datadog metrics
        assert [
            DatadogMetric(
                name="dpl_firehose_writer.event.success",
                type="count",
                value=1,
                tags=["network:parsely_com", "deployment:beta"],
            )
        ] == emit_handler.call_args_list[1][0][1][0]
        # fields should not be modified by the bolt
        assert "timestamp_info.nginx_ms" not in event_dict
        assert "visitor.site_id" not in event_dict

    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.KinesisFirehoseWriter.emit",
        autospec=True,
    )
    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.KinesisFirehoseWriter.put_records",
        return_value=PUT_RESPONSE_ERROR_FAIL,
    )
    def test_process_failure(self, put_handler, emit_handler):
        """Ensure fatal error emits metrics."""
        tup = MagicMock(autospec=True)
        event_dict = self.get_event_dict()
        tup.values = [[event_dict]]
        self.bolt.process_batch("abc", [tup])
        assert 1 == put_handler.call_count
        assert 4 == emit_handler.call_count
        # first call to emit delivery metrics
        actual_metrics = emit_handler.call_args_list[0][0][1][0]
        assert [
            DeliveryMetric(
                kind="count",
                data=(
                    (
                        "dpl_firehose_writer|network|events|parsely.com|2018-08-13",
                        "00:00",
                    ),
                    1,
                ),
            ),
            DeliveryMetric(
                kind="count",
                data=(
                    (
                        "dpl_firehose_writer|network|pageviews|parsely.com|2018-08-13",
                        "00:00",
                    ),
                    1,
                ),
            ),
            DeliveryMetric(
                kind="count",
                data=(
                    (
                        "dpl_firehose_writer|network|in_late|parsely.com|2018-08-13",
                        "00:00",
                    ),
                    1,
                ),
            ),
            DeliveryMetric(
                kind="count",
                data=(
                    (
                        "dpl_firehose_writer|network|failures:someothererror|parsely.com|2018-08-13",
                        "00:00",
                    ),
                    1,
                ),
            ),
            DeliveryMetric(
                kind="uuid",
                data=(
                    "dpl_firehose_writer|network|visitor_site_ids|parsely.com|2018-08-13",
                    ["e71604df-a912-455d-aaf3-a9c72a6dd86c"],
                ),
            ),
            DeliveryMetric(
                kind="uuid",
                data=(
                    "dpl_firehose_writer|network|event_ids|parsely.com|2018-08-13",
                    ["0xb03fb493c88d8044ee4dd912c224ebb9"],
                ),
            ),
        ] == actual_metrics
        # second call to emit datadog metrics
        assert [
            DatadogMetric(
                name="dpl_firehose_writer.event.failure",
                type="count",
                value=1,
                tags=[
                    "network:parsely_com",
                    "deployment:beta",
                    "errorcode:someothererror",
                ],
            )
        ] == emit_handler.call_args_list[1][0][1][0]
        # retries are emitted separately
        # third: delivery retry metrics for one retry enqueue event
        assert [
            DeliveryMetric(
                kind="count",
                data=(
                    (
                        "dpl_firehose_writer|network|retry_enqueues|parsely.com|2018-08-13",
                        "00:00",
                    ),
                    1,
                ),
            )
        ] == emit_handler.call_args_list[2][0][1][0]
        # fourth: datadog retry metrics for one retry enqueue event
        assert [
            DatadogMetric(
                name="dpl_firehose_writer.event.retry_enqueue",
                type="count",
                value=1,
                tags=["network:parsely_com", "deployment:beta"],
            )
        ] == emit_handler.call_args_list[3][0][1][0]
        # fields should not be modified by the bolt
        assert "timestamp_info.nginx_ms" not in event_dict
        assert "visitor.site_id" not in event_dict

    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.KinesisFirehoseWriter.emit",
        autospec=True,
    )
    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.KinesisFirehoseWriter.put_records",
        return_value=PUT_RESPONSE_ERROR_RETRY,
    )
    def test_process_retry(self, put_handler, emit_handler):
        """Ensure non-fatal error emits metrics."""
        tup = MagicMock(autospec=True)
        event_dict = self.get_event_dict()
        tup.values = [[event_dict]]
        self.bolt.process_batch("abc", [tup])
        assert 1 == put_handler.call_count
        assert 4 == emit_handler.call_count
        # first call to emit delivery metrics
        actual_metrics = emit_handler.call_args_list[0][0][1][0]
        assert [
            DeliveryMetric(
                kind="count",
                data=(
                    (
                        "dpl_firehose_writer|network|events|parsely.com|2018-08-13",
                        "00:00",
                    ),
                    1,
                ),
            ),
            DeliveryMetric(
                kind="count",
                data=(
                    (
                        "dpl_firehose_writer|network|pageviews|parsely.com|2018-08-13",
                        "00:00",
                    ),
                    1,
                ),
            ),
            DeliveryMetric(
                kind="count",
                data=(
                    (
                        "dpl_firehose_writer|network|in_late|parsely.com|2018-08-13",
                        "00:00",
                    ),
                    1,
                ),
            ),
            DeliveryMetric(
                kind="count",
                data=(
                    (
                        "dpl_firehose_writer|network|failures:ratelimiterror|parsely.com|2018-08-13",
                        "00:00",
                    ),
                    1,
                ),
            ),
            DeliveryMetric(
                kind="uuid",
                data=(
                    "dpl_firehose_writer|network|visitor_site_ids|parsely.com|2018-08-13",
                    ["e71604df-a912-455d-aaf3-a9c72a6dd86c"],
                ),
            ),
            DeliveryMetric(
                kind="uuid",
                data=(
                    "dpl_firehose_writer|network|event_ids|parsely.com|2018-08-13",
                    ["0xb03fb493c88d8044ee4dd912c224ebb9"],
                ),
            ),
        ] == actual_metrics
        # second call to emit datadog metrics
        assert [
            DatadogMetric(
                name="dpl_firehose_writer.event.failure",
                type="count",
                value=1,
                tags=[
                    "network:parsely_com",
                    "deployment:beta",
                    "errorcode:ratelimiterror",
                ],
            )
        ] == emit_handler.call_args_list[1][0][1][0]
        # retries are emitted separately
        # third: delivery retry metrics for one retry enqueue event
        assert [
            DeliveryMetric(
                kind="count",
                data=(
                    (
                        "dpl_firehose_writer|network|retry_enqueues|parsely.com|2018-08-13",
                        "00:00",
                    ),
                    1,
                ),
            )
        ] == emit_handler.call_args_list[2][0][1][0]
        # fourth: datadog retry metrics for one retry enqueue event
        assert [
            DatadogMetric(
                name="dpl_firehose_writer.event.retry_enqueue",
                type="count",
                value=1,
tags=["network:parsey_com", "deployment:beta"],
            )
        ] == emit_handler.call_args_list[3][0][1][0]
        # fields should not be modified by the bolt
        assert "timestamp_info.nginx_ms" not in event_dict
        assert "visitor.site_id" not in event_dict
    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.get_settings_from_stormconf",
        autospec=True,
    )
    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.get_statsd",
        autospec=True,
    )
    def createInstance(self, statsd, settings):
        settings.return_value = get_settings("data_pipeline", "beta")
        instance = KinesisFirehoseWriter(
            input_stream=BytesIO(), output_stream=BytesIO()
        )
        instance.initialize({}, MagicMock(autospec=True))
        instance.config_fetcher = MagicMock(spec=ApikeyConfigFetcher)
        instance.config_fetcher.get_apikey_config.return_value = APIKEY_CONFIG
        instance.STATSD_RATE = 1
        instance.RATE_LIMIT_ERROR = "RateLimitError"
        instance.RECOVERABLE_ERRORS = ["RateLimitError"]
        assert hasattr(instance, "timestamp_override")
        assert hasattr(instance, "sqs_retry_queue")
        instance.timestamp_override = dt.datetime(2018, 8, 13)
        instance.logger = MagicMock()
        return instance