import unittest
import shutil
import os
import json
from datetime import datetime

from pyspark.sql import SparkSession


class TestIntegration(unittest.TestCase):

    INPUT_EVENTS = "/tmp/input_events"
    INPUT_ADS = "/tmp/input_ads"
    OUTPUT_JOINED = "/tmp/output_joined"
    OUTPUT_INVALID = "/tmp/output_invalid"

    def test_enrichment(self):
        # given
        self.add_event(
            ts=datetime(2020, 3, 31, 13, 15),
            user_id='USER1',
            ad_id='AD1')
        self.add_ad(
            id='AD1',
            name='Sample ad'
        )

        ### TODO

    def add_event(self, ts, user_id, ad_id):
        # Write a single event as Parquet into a dt=<date> partition directory
        self.spark.createDataFrame(
            [(ts, user_id, ad_id)],
            ['ts', 'user_id', 'ad_id']) \
            .write.parquet(f'{self.INPUT_EVENTS}/dt={ts.date()}', mode='append')

    def add_ad(self, id, name):
        # Append a single ad definition as one JSON line
        with open(f'{self.INPUT_ADS}/sample.json', 'a+') as f:
            json.dump({'id': id, 'name': name}, f)
            f.write('\n')

    def setUp(self):
        # Start every test from empty input and output directories
        for path in [self.INPUT_EVENTS, self.INPUT_ADS,
                     self.OUTPUT_JOINED, self.OUTPUT_INVALID]:
            shutil.rmtree(path, ignore_errors=True)
            os.makedirs(path)

    @classmethod
    def setUpClass(cls):
        # One Spark session is shared by all tests in the class
        cls.spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()
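

# A minimal sketch (not part of the original gist) of how a job under test
# could read the fixtures that add_event and add_ad produce. The paths and
# column names come from the constants above; the join itself is an assumption,
# since the actual enrichment job is not included in this gist.
def read_and_join_fixtures(spark):
    # Events were written as Parquet under dt=<date> partition directories,
    # so Spark recovers `dt` as a partition column when reading the root path.
    events = spark.read.parquet(TestIntegration.INPUT_EVENTS)
    # Ads were appended as newline-delimited JSON records.
    ads = spark.read.json(TestIntegration.INPUT_ADS)
    # Enrich each event with its ad metadata; unmatched events keep null ad fields.
    return events.join(ads, events.ad_id == ads.id, 'left')


# With the pyspark package installed, the suite runs like any other unittest
# module, e.g. `python -m unittest test_integration` if this file is saved
# as test_integration.py.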