Created
June 11, 2019 21:52
-
-
Save rachelannelise/f241865cd3b04d94bdcdde4d166cfe7b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 | |
import datetime as dt | |
import unittest | |
from io import BytesIO | |
from mock import MagicMock, patch | |
from parselyutils.apikey_utils import ApikeyConfigFetcher | |
from parselyutils.event import Event, VisitorInfo, TimestampInfo | |
from casterisk.lib.deliverymetrics import DatadogMetric, DeliveryMetric | |
from casterisk.lib.data_pipeline.enrichments import enrich_event | |
from casterisk.bolts.data_pipeline.kinesis_firehose_writer import KinesisFirehoseWriter | |
from casterisk.settings import get_settings | |
from tests.casterisk.lib import SegmentationBase | |
# Shared account date used for registration, signup, and track-start below.
_ACCOUNT_DATE = dt.datetime(2011, 12, 5)

# Apikey configuration returned by the mocked ApikeyConfigFetcher in tests.
APIKEY_CONFIG = dict(
    features=["data_pipeline_firehose"],
    exists=True,
    tags_enabled=True,
    registered=True,
    registration_date=_ACCOUNT_DATE,
    internal_domains=["parsely.com", "blog.parsely.com"],
    es_settings={},
    key="demoaccount.parsely.com",
    is_installed=True,
    tier="2_analytics_team",
    timezone="America/New_York",
    ptrack_db="demoaccount_parsely_com",
    topics_enabled=True,
    signup_date=_ACCOUNT_DATE,
    ignore_domains=[],
    min_page_views=4,
    allowed_tlds=["parsely.com", "blog.parsely.com", "blog.parse.ly"],
    track_start_date=_ACCOUNT_DATE,
    api_auth_mode="secret",
    disable_tracking=False,
    publisher="parsely.com",
    secret_key="asdfqwer",
)


def _put_error(code):
    """Build a put_record_batch response whose single record failed with `code`."""
    return {"RequestResponses": [{"ErrorCode": code, "ErrorMessage": code}]}


# Canned Kinesis Firehose put_record_batch responses, one per delivery outcome.
PUT_RESPONSE_SUCCESS = {"RequestResponses": [{"RecordId": "123"}]}
PUT_RESPONSE_ERROR_RETRY = _put_error("RateLimitError")
PUT_RESPONSE_ERROR_FAIL = _put_error("SomeOtherError")
class TestKinesisFirehoseWriter(SegmentationBase):
    """Tests for KinesisFirehoseWriter.process_batch metric emission.

    Each test mocks put_records to drive one delivery outcome (success,
    fatal failure, retryable failure) and verifies the DeliveryMetric and
    DatadogMetric payloads that the bolt passes to emit().

    Fix: test_process_retry previously expected the tag "network:parsey_com"
    for the retry-enqueue Datadog metric; the correct tag (as asserted by the
    identical check in test_process_failure) is "network:parsely_com".
    """

    maxDiff = None

    # All expected delivery-metric keys share this prefix/suffix.  The date
    # bucket comes from the timestamp_override pinned in createInstance.
    _KEY_PREFIX = "dpl_firehose_writer|network"
    _KEY_SUFFIX = "parsely.com|2018-08-13"

    def setUp(self):
        self.bolt = self.createInstance()

    def get_event_dict(self):
        """Build one enriched pageview event dict.

        Runs a hand-built Event through enrich_event to mimic what comes
        into the writer bolts from event_enricher.
        """
        event = Event(
            apikey="demoaccount.parsely.com",
            url="https://blog.parse.ly/",
            referrer="",
            action="pageview",
            engaged_time_inc=None,
            visitor=VisitorInfo(
                network_id="",
                site_id="e71604df-a912-455d-aaf3-a9c72a6dd86c",
                ip="198.200.78.40",
            ),
            extra_data=None,
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
            display=None,
            timestamp_info=TimestampInfo(
                nginx_ms=1_429_707_722_000, pixel_ms=None, override_ms=None
            ),
            session=None,
            slot=None,
            metadata=None,
            campaign=None,
            flags=None,
            total_time=None,
            pageload_id=None,
        )
        return enrich_event(
            event,
            geo_db_file=self.geo_db_file,
            nielsen_data_file=self.nielsen_data_file,
        )

    def _run_batch(self):
        """Run process_batch over a single-event tuple; return the event dict."""
        tup = MagicMock(autospec=True)
        event_dict = self.get_event_dict()
        tup.values = [[event_dict]]
        self.bolt.process_batch("abc", [tup])
        return event_dict

    def _expected_delivery_metrics(self, outcome):
        """Expected first emit() payload (delivery metrics).

        `outcome` is the per-outcome count metric name, e.g. "successes",
        "failures:someothererror", or "failures:ratelimiterror".
        """
        counts = [
            DeliveryMetric(
                kind="count",
                data=(
                    (f"{self._KEY_PREFIX}|{name}|{self._KEY_SUFFIX}", "00:00"),
                    1,
                ),
            )
            for name in ("events", "pageviews", "in_late", outcome)
        ]
        uuids = [
            DeliveryMetric(
                kind="uuid",
                data=(
                    f"{self._KEY_PREFIX}|visitor_site_ids|{self._KEY_SUFFIX}",
                    ["e71604df-a912-455d-aaf3-a9c72a6dd86c"],
                ),
            ),
            DeliveryMetric(
                kind="uuid",
                data=(
                    f"{self._KEY_PREFIX}|event_ids|{self._KEY_SUFFIX}",
                    ["0xb03fb493c88d8044ee4dd912c224ebb9"],
                ),
            ),
        ]
        return counts + uuids

    def _assert_datadog_failure(self, emit_handler, errorcode):
        """Check the second emit call: one Datadog failure metric with `errorcode`."""
        assert [
            DatadogMetric(
                name="dpl_firehose_writer.event.failure",
                type="count",
                value=1,
                tags=[
                    "network:parsely_com",
                    "deployment:beta",
                    f"errorcode:{errorcode}",
                ],
            )
        ] == emit_handler.call_args_list[1][0][1][0]

    def _assert_retry_calls(self, emit_handler):
        """Check the third/fourth emit calls: retry-enqueue metrics.

        Retries are emitted separately: third call carries the delivery
        retry metric, fourth the Datadog retry metric.
        """
        assert [
            DeliveryMetric(
                kind="count",
                data=(
                    (
                        f"{self._KEY_PREFIX}|retry_enqueues|{self._KEY_SUFFIX}",
                        "00:00",
                    ),
                    1,
                ),
            )
        ] == emit_handler.call_args_list[2][0][1][0]
        assert [
            DatadogMetric(
                name="dpl_firehose_writer.event.retry_enqueue",
                type="count",
                value=1,
                tags=["network:parsely_com", "deployment:beta"],
            )
        ] == emit_handler.call_args_list[3][0][1][0]

    def _assert_event_not_modified(self, event_dict):
        """Fields should not be modified in place by the bolt."""
        assert "timestamp_info.nginx_ms" not in event_dict
        assert "visitor.site_id" not in event_dict

    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.KinesisFirehoseWriter.emit",
        autospec=True,
    )
    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.KinesisFirehoseWriter.put_records",
        return_value=PUT_RESPONSE_SUCCESS,
    )
    def test_process_success(self, put_handler, emit_handler):
        """Ensure successful processing emits metrics."""
        event_dict = self._run_batch()
        assert 1 == put_handler.call_count
        assert 2 == emit_handler.call_count
        # first call to emit delivery metrics
        assert (
            self._expected_delivery_metrics("successes")
            == emit_handler.call_args_list[0][0][1][0]
        )
        # second call to emit datadog metrics
        assert [
            DatadogMetric(
                name="dpl_firehose_writer.event.success",
                type="count",
                value=1,
                tags=["network:parsely_com", "deployment:beta"],
            )
        ] == emit_handler.call_args_list[1][0][1][0]
        self._assert_event_not_modified(event_dict)

    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.KinesisFirehoseWriter.emit",
        autospec=True,
    )
    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.KinesisFirehoseWriter.put_records",
        return_value=PUT_RESPONSE_ERROR_FAIL,
    )
    def test_process_failure(self, put_handler, emit_handler):
        """Ensure fatal error emits metrics."""
        event_dict = self._run_batch()
        assert 1 == put_handler.call_count
        assert 4 == emit_handler.call_count
        # first call to emit delivery metrics
        assert (
            self._expected_delivery_metrics("failures:someothererror")
            == emit_handler.call_args_list[0][0][1][0]
        )
        # second call to emit datadog metrics
        self._assert_datadog_failure(emit_handler, "someothererror")
        # third/fourth: retry metrics for one retry-enqueue event
        self._assert_retry_calls(emit_handler)
        self._assert_event_not_modified(event_dict)

    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.KinesisFirehoseWriter.emit",
        autospec=True,
    )
    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.KinesisFirehoseWriter.put_records",
        return_value=PUT_RESPONSE_ERROR_RETRY,
    )
    def test_process_retry(self, put_handler, emit_handler):
        """Ensure non-fatal error emits metrics."""
        event_dict = self._run_batch()
        assert 1 == put_handler.call_count
        assert 4 == emit_handler.call_count
        # first call to emit delivery metrics
        assert (
            self._expected_delivery_metrics("failures:ratelimiterror")
            == emit_handler.call_args_list[0][0][1][0]
        )
        # second call to emit datadog metrics
        self._assert_datadog_failure(emit_handler, "ratelimiterror")
        # third/fourth: retry metrics for one retry-enqueue event
        # (was asserting the misspelled tag "network:parsey_com" here)
        self._assert_retry_calls(emit_handler)
        self._assert_event_not_modified(event_dict)

    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.get_settings_from_stormconf",
        autospec=True,
    )
    @patch(
        "casterisk.bolts.data_pipeline.kinesis_firehose_writer.get_statsd",
        autospec=True,
    )
    def createInstance(self, statsd, settings):
        """Build a KinesisFirehoseWriter wired with mocks and a pinned clock."""
        settings.return_value = get_settings("data_pipeline", "beta")
        instance = KinesisFirehoseWriter(
            input_stream=BytesIO(), output_stream=BytesIO()
        )
        instance.initialize({}, MagicMock(autospec=True))
        instance.config_fetcher = MagicMock(spec=ApikeyConfigFetcher)
        instance.config_fetcher.get_apikey_config.return_value = APIKEY_CONFIG
        instance.STATSD_RATE = 1
        instance.RATE_LIMIT_ERROR = "RateLimitError"
        instance.RECOVERABLE_ERRORS = ["RateLimitError"]
        # the bolt must expose these attributes before we override them
        assert hasattr(instance, "timestamp_override")
        assert hasattr(instance, "sqs_retry_queue")
        # pin the clock so expected metric keys use a fixed date bucket
        instance.timestamp_override = dt.datetime(2018, 8, 13)
        instance.logger = MagicMock()
        return instance
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment