Mark Cutajar (markcutajar)

@markcutajar
markcutajar / beam-spark-notes.md
Last active October 13, 2020 18:51
Running an Apache Beam pipeline in a Windows Local Spark Cluster


This file details my attempt to run an Apache Beam pipeline on a Windows Spark setup. Unlike many other techies out there, I am a firm Microsoft fan. I believe they build quality products, and over the last few years, since Satya Nadella became CEO, Microsoft has made massive strides across its products.

To that point, all my machines at home are Windows-based, including my processing machine, which I mostly use to run models or occasionally play a few games. A couple of weeks ago I decided to build a simple Apache Beam pipeline to process forex tick data into windowed candlestick form. If the data were smaller, this could easily be done in pandas; with a 7GB tick data file, however, doing it in pandas would mean splitting the file into day or week segments and processing them one by one. Obviously avoiding the simple way out, I decided to attempt to build a Beam pipeline, as it is promised to pr

markcutajar / fxbeam-v1-read-data.py
Last active March 15, 2021 20:46
FxBeam reading data from text
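```python
from apache_beam.io import ReadFromText

# Read every file matching the pattern; each line becomes one string element
rows = self.pipeline | 'Read data file' >> ReadFromText(
    self.input_file_pattern,
    compression_type=_compression
)
```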
markcutajar / fxbeam-v1-input-types.py
Last active March 15, 2021 20:45
FxBeam process different input types
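```python
import json

import apache_beam as beam

# Parse each raw line into a dict ("tick element") according to the input type
if self.input_file_type == 'csv':
    rows = rows | 'Convert CSV to tick elements' >> beam.ParDo(
        ParseDataRows(),
        headers=self.input_columns
    )
else:
    rows = rows | 'Convert JSON to tick elements' >> beam.Map(json.loads)
```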
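The `ParseDataRows` DoFn itself is not shown. Below is a minimal, Beam-free sketch of what such a step could do, assuming each CSV line holds one tick whose fields line up with `self.input_columns`. `parse_csv_row` is a hypothetical stand-in for `ParseDataRows.process`, and the values are deliberately left as strings.

```python
import csv
import json


def parse_csv_row(line, headers):
    """Hypothetical stand-in for ParseDataRows.process: map one CSV line
    onto a dict keyed by the column headers (values stay strings)."""
    # csv.reader handles quoted fields that contain commas
    values = next(csv.reader([line]))
    return dict(zip(headers, values))


def parse_json_row(line):
    """JSON lines need no custom parsing: json.loads already yields a dict."""
    return json.loads(line)
```

Inside a real `DoFn`, `process` should `yield` the dict (or return a list containing it), because Beam iterates over whatever `process` returns.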
markcutajar / fxbeam-v1-process-timestamps.py
Last active March 15, 2021 20:45
FxBeam processing timestamps
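```python
import apache_beam as beam

# Parse the datetime string (first input column) into a 'timestamp' field
rows = rows | 'Convert to timestamp field' >> beam.ParDo(
    ToTimestamp(),
    string_format=self.TICK_DATA_TIMESTAMP_FORMAT,
    datetime_key=self.input_columns[0],
    timestamp_key='timestamp'
)
# Use the 'timestamp' field to produce timestamped elements
rows = rows | 'Convert to datetime object' >> beam.ParDo(
    AddTimestamp(),
    timestamp_key='timestamp'
)
```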
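Neither `ToTimestamp` nor the value of `TICK_DATA_TIMESTAMP_FORMAT` is shown. The sketch below assumes the step parses the datetime string into Unix seconds, treating it as UTC, and gives a plain-Python equivalent. The function name and the example format `'%Y-%m-%d %H:%M:%S'` are hypothetical.

```python
from datetime import datetime, timezone


def to_timestamp(element, string_format, datetime_key, timestamp_key):
    """Hypothetical ToTimestamp equivalent: parse element[datetime_key]
    with string_format (treated as UTC) and store Unix seconds."""
    parsed = datetime.strptime(element[datetime_key], string_format)
    # Attach UTC explicitly so .timestamp() does not use the local time zone
    seconds = parsed.replace(tzinfo=timezone.utc).timestamp()
    return {**element, timestamp_key: seconds}
```

Pinning the time zone matters on a local Windows machine. A naive `datetime` would otherwise be interpreted in the machine's local zone.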
markcutajar / fxbeam-v1-add-timestamp.py
Last active March 15, 2021 20:44
FxBeam AddTimestamp function
```python
import apache_beam as beam


class AddTimestamp(beam.DoFn):
    """ParDo that takes a timestamp column and assigns it
    to the row to produce a timestamped value.
    """

    def process(self, element, timestamp_key, delete_key=False, **kwargs):
        """
        :param element: Element being processed
        :param timestamp_key: Field containing the timestamp
        :param delete_key: Whether to delete the original timestamp or not
        :return: Elements with a datetime field
        """
        ...  # remainder of the method is not shown in this preview
```
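The body of `AddTimestamp.process` is cut off here. In Beam, an event time is normally attached to an element by yielding `beam.window.TimestampedValue(element, timestamp)`. The Beam-free sketch below mirrors that with a hypothetical `TimestampedValue` namedtuple, which uses the same `value`/`timestamp` attribute names as Beam's class, and it honours `delete_key`. It is an assumption about the missing code, not the author's implementation.

```python
from collections import namedtuple

# Hypothetical stand-in for beam.window.TimestampedValue(value, timestamp)
TimestampedValue = namedtuple('TimestampedValue', ['value', 'timestamp'])


def add_timestamp(element, timestamp_key, delete_key=False):
    """Yield the element wrapped with its event time, optionally dropping
    the original timestamp field from the payload."""
    timestamp = element[timestamp_key]
    if delete_key:
        # Build a new dict rather than mutating the input element
        element = {k: v for k, v in element.items() if k != timestamp_key}
    yield TimestampedValue(element, timestamp)
```

Keeping the default `delete_key=False` matters for this pipeline, because the later grouping step still reads `element['timestamp']`.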
markcutajar / fxbeam-v1-creating-timestamp-group.py
Created March 15, 2021 20:47
FxBeam create timestamp group
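```python
...
# Integer-divide the timestamp by the window size so every tick in the
# same window shares one group key
yield {
    **element,
    time_group_key: element['timestamp'] // window_size
}
```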
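Floor division maps every timestamp in the half-open interval [k·w, (k+1)·w) to the same key k, where w is `window_size` in the same units as `timestamp`. The sketch below assumes Unix-second timestamps and a 60-second window (one-minute candles). `add_time_group_key` and `window_start` are hypothetical helpers wrapped around the `yield` above.

```python
def add_time_group_key(element, window_size, time_group_key='time_group_key'):
    """Tag a tick with the index of the fixed window it falls into."""
    yield {
        **element,
        time_group_key: element['timestamp'] // window_size,
    }


def window_start(key, window_size):
    """Recover the start of the window (same units as the timestamp)."""
    return key * window_size
```

Multiplying the key back by the window size gives each candle's opening time, which is handy when writing the resampled output.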
markcutajar / fxbeam-v1-mapping-before-combine.py
Last active March 15, 2021 20:48
FxBeam mapping before combine
```python
import apache_beam as beam


def map_elements(self, data):
    """Map function to create key:value pairs to run CombinePerKey function.
    :param data: PCollection being processed with time_group_key column and
        instrument_column if set to use.
    :return: PCollection with mapped data
    """
    action_name = 'Resampler - Map data'
    if self.instrument_column:
        # Key by (time window, instrument) so each instrument is resampled separately
        return data | action_name >> beam.Map(
            lambda x: ((x['time_group_key'], x[self.instrument_column]), x)
        )
    # ... (the branch without an instrument column is not shown in this preview)
```
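Here `beam.Map` turns each tick into a `(key, tick)` pair, and `CombinePerKey` then folds together all values that share a key. Locally, the same shape can be reproduced with a dictionary. This stdlib-only sketch (the hypothetical `map_element` and `group_by_key`) shows how keying on `(time_group_key, instrument)` keeps instruments apart within the same window. The no-instrument branch, keyed on the window alone, is an assumption, since that branch is not shown.

```python
from collections import defaultdict


def map_element(tick, instrument_column=None):
    """Build the (key, value) pair used before CombinePerKey."""
    if instrument_column:
        return (tick['time_group_key'], tick[instrument_column]), tick
    # Assumption: without an instrument column, key on the window only
    return tick['time_group_key'], tick


def group_by_key(pairs):
    """Local stand-in for Beam's shuffle: collect values per key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)
```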
markcutajar / beam-custom-combiner.py
Created March 15, 2021 20:49
Apache Beam custom combiner example
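```python
import apache_beam as beam


class CustomCombineFn(beam.CombineFn):
    def create_accumulator(self):
        return  # create accumulator here
    def add_input(self, current_accumulation, element):
        return  # perform accumulation here
    def merge_accumulators(self, accumulators):
        return  # merge accumulators running on different nodes
    def extract_output(self, accumulation):
        return  # produce the final result from the merged accumulator
```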
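The following example shows how the four methods cooperate. It is a Beam-free mean combiner, driven the way a runner might drive it. Each "worker" builds its own accumulator with `add_input`, the partial results are combined with `merge_accumulators`, and `extract_output` produces the final value. The `run_combine` driver and the shard split are hypothetical simplifications of what Beam does across workers.

```python
class MeanCombineFn:
    """Plain-Python mirror of a beam.CombineFn computing an average."""

    def create_accumulator(self):
        return 0.0, 0  # (running sum, count)

    def add_input(self, accumulator, element):
        total, count = accumulator
        return total + element, count + 1

    def merge_accumulators(self, accumulators):
        # Iterate once: partial sums and counts simply add up
        total, count = 0.0, 0
        for acc_total, acc_count in accumulators:
            total += acc_total
            count += acc_count
        return total, count

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float('nan')


def run_combine(fn, shards):
    """Accumulate each shard separately, then merge, like parallel workers."""
    partials = []
    for shard in shards:
        acc = fn.create_accumulator()
        for element in shard:
            acc = fn.add_input(acc, element)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))
```

The accumulator is a (sum, count) pair rather than a running mean because sums and counts merge exactly, whatever the order or sharding.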
markcutajar / fxbeam-v1-get-value-ohlcv.py
Created March 15, 2021 20:51
FxBeam get value to calculate OHLCV
```python
@staticmethod
def get_value(item):
    """Function to calculate the value of the element.
    This is done so we can easily change how the value is calculated,
    by changing it in this one place.
    :param item: Tick item
    :return: Value of tick item
    """
    return item['ask']
```
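One way to compute an OHLCV candle with `get_value` is a combiner whose accumulator remembers the earliest and latest ticks seen, plus the running high, low and tick count. This is a Beam-free sketch that uses the `CombineFn` method names. It assumes each tick carries a numeric `timestamp` and uses `ask` as the value, exactly as `get_value` does. It also counts ticks as volume, which is an assumption, since the volume definition is not shown.

```python
import math


class OHLCVCombineFn:
    """Beam-free sketch of an OHLCV combiner using CombineFn method names."""

    @staticmethod
    def get_value(item):
        return item['ask']  # same choice as the original get_value

    def create_accumulator(self):
        # Sentinels so any real tick replaces them
        return {'open_ts': math.inf, 'open': None, 'high': -math.inf,
                'low': math.inf, 'close_ts': -math.inf, 'close': None,
                'volume': 0}

    def add_input(self, acc, tick):
        ts, value = tick['timestamp'], self.get_value(tick)
        if ts < acc['open_ts']:  # earliest tick so far sets the open
            acc['open_ts'], acc['open'] = ts, value
        if ts >= acc['close_ts']:  # latest tick so far sets the close
            acc['close_ts'], acc['close'] = ts, value
        acc['high'] = max(acc['high'], value)
        acc['low'] = min(acc['low'], value)
        acc['volume'] += 1  # assumption: volume = number of ticks
        return acc

    def merge_accumulators(self, accumulators):
        merged = self.create_accumulator()
        for acc in accumulators:
            if acc['open_ts'] < merged['open_ts']:
                merged['open_ts'], merged['open'] = acc['open_ts'], acc['open']
            if acc['close_ts'] >= merged['close_ts']:
                merged['close_ts'], merged['close'] = acc['close_ts'], acc['close']
            merged['high'] = max(merged['high'], acc['high'])
            merged['low'] = min(merged['low'], acc['low'])
            merged['volume'] += acc['volume']
        return merged

    def extract_output(self, acc):
        return {key: acc[key] for key in ('open', 'high', 'low', 'close', 'volume')}
```

Open and close are chosen by timestamp, not arrival order, so the result does not depend on how ticks were split across workers. Switching to a mid price would only mean changing `get_value`, for example to average hypothetical `bid` and `ask` fields.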
markcutajar / beam-custom-unit-test.py
Created March 15, 2021 20:52
Apache Beam custom unit test
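```python
import unittest

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

class CustomTestCase(unittest.TestCase):
    def setUp(self):
        # Run tests locally on the DirectRunner and pickle the main session
        pipeline_options = PipelineOptions(['--runner=DirectRunner'])
        pipeline_options.view_as(SetupOptions).save_main_session = True
        self.pipeline = beam.Pipeline(options=pipeline_options)

class TestFunctionX(CustomTestCase):
    def test_run(self):
        with self.pipeline:
            ...  # test body not shown in this preview
```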
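Pure helper logic can also be tested without building a pipeline at all, which keeps those tests fast. The `CustomTestCase` setup is then only needed for tests that exercise transforms end to end, where Beam's `apache_beam.testing.util.assert_that` and `equal_to` are the usual tools. Below is a minimal sketch using the standard `unittest` module. It tests a hypothetical `time_group_key` helper that mirrors the floor division used earlier.

```python
import unittest


def time_group_key(timestamp, window_size):
    """Hypothetical helper: index of the fixed window containing timestamp."""
    return timestamp // window_size


class TestTimeGroupKey(unittest.TestCase):
    def test_same_window(self):
        # 120 and 179 both fall in the window [120, 180)
        self.assertEqual(time_group_key(120, 60), time_group_key(179, 60))

    def test_next_window(self):
        # 180 starts the following window
        self.assertEqual(time_group_key(180, 60), time_group_key(120, 60) + 1)
```

Run it with `python -m unittest`, alongside the pipeline-based tests.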