Skip to content

Instantly share code, notes, and snippets.

@pabloem
Last active November 2, 2022 21:25
Show Gist options
  • Save pabloem/62596c602943e7783228cd1a875956e8 to your computer and use it in GitHub Desktop.
Save pabloem/62596c602943e7783228cd1a875956e8 to your computer and use it in GitHub Desktop.
Calculating Pi with a Monte Carlo algorithm and using Beam Batched DoFns

Calculating Pi using Beam and Batched DoFns

This is a set of two sample Beam pipelines that calculate Pi with a Monte Carlo algorithm: they generate random points in an n-dimensional cube and check whether each point is located within a circle in that space.

These examples are meant to showcase the use of Beam's Batched DoFns developed by @TheNeuralBit.

From testing on my laptop, batched DoFns have slightly better performance; however, I expect the performance gains to be larger with larger data sizes.

To try out the example:

# Create your own virtual environment for Python

# Install dependencies
pip install -r requirements.txt

# Run simulations
python simple_numeric.py
apache-beam[gcp]>=2.42.0
numpy
import apache_beam as beam
import random
import numpy as np
import typing
class GenerateSamples(beam.DoFn):
    """Turn each input element into one pseudo-random point.

    Every coordinate of the point is drawn from ``[0, maximum_radius)``, so
    the points fall in a single orthant of the space.

    Args:
        maximum_radius: Upper bound (exclusive) for every coordinate.
        dimensions: Number of coordinates generated per point.
    """

    def __init__(self, maximum_radius, dimensions):
        self.radius = maximum_radius
        self.dimensions = dimensions

    def process(self, elm):
        """Yield a single point, as a tuple of floats, seeded by ``elm``."""
        # A private generator seeded with the element produces the same values
        # as reseeding the module-level one, without clobbering the
        # process-wide random state shared with other code.
        rng = random.Random(elm)
        # Materialize the coordinates: a lazy generator expression can only be
        # consumed once and cannot be pickled, so it is not a safe element to
        # hand to downstream transforms.
        yield tuple(rng.random() * self.radius for _ in range(self.dimensions))
def individual_circle_montecarlo(samples, radius=1, dimensions=2):
    """Estimate pi with a Beam pipeline that handles one point per element.

    One random point is generated per sample inside the cube
    ``[0, radius)^dimensions``; the pipeline counts how many of them lie
    within distance ``radius`` of the origin and prints the resulting
    estimate of pi.

    Args:
        samples: Number of random points to generate (must be >= 1).
        radius: Radius of the ball and side length of the sampling cube.
        dimensions: Number of coordinates per point (must be >= 1).

    Raises:
        ValueError: If ``samples`` or ``dimensions`` is smaller than 1.
    """
    import math

    # Fail early with a clear message instead of a ZeroDivisionError raised
    # from inside the pipeline.
    if samples < 1:
        raise ValueError("samples must be a positive integer")
    if dimensions < 1:
        raise ValueError("dimensions must be a positive integer")

    radius_squared = radius * radius

    # The points fill one orthant of the ball's bounding cube, so the expected
    # hit ratio is  pi^(d/2) / (Gamma(d/2 + 1) * 2^d)  and does NOT depend on
    # the radius. Solving for pi:
    #     pi = (ratio * 2^d * Gamma(d/2 + 1)) ^ (2/d)
    # For d == 2 this reduces to the familiar 4 * ratio.
    scale = 2 ** dimensions * math.gamma(dimensions / 2 + 1)
    exponent = 2 / dimensions

    with beam.Pipeline() as p:
        result = (
            p
            | beam.Create(list(range(samples)))
            # Generate random points in an n-dimensional space.
            | beam.ParDo(GenerateSamples(radius, dimensions))
            # Verify if the points are within the circle.
            | beam.Map(
                lambda x: 1 if sum(dim * dim for dim in x) <= radius_squared else 0
            )
            | beam.CombineGlobally(sum)
        )
        result | "final_ratio" >> beam.Map(
            lambda tot: print(
                "pablito", tot, samples, (scale * tot / samples) ** exponent
            )
        )
class BatchedGenerateSamples(beam.DoFn):
    """Batched DoFn that draws one random point per incoming seed.

    For a batch of seeds, a single array of uniform random coordinates scaled
    by ``radius`` is generated and the *squared* coordinates are emitted as
    one batch with a trailing axis of length ``dimensions``.

    Args:
        dimensions: Number of coordinates generated per point.
        radius: Scale factor applied to every coordinate.
    """

    def __init__(self, dimensions, radius):
        self.radius = radius
        self.dimensions = dimensions

    def process_batch(self, seeds: np.ndarray) -> typing.Iterator[np.ndarray]:
        """Yield the squared coordinates for a whole batch of points."""
        # Only the shape of ``seeds`` is used; the values come from NumPy's
        # global random state, not from the individual seeds.
        shape = (*seeds.shape, self.dimensions)
        coordinates = self.radius * np.random.rand(*shape)
        # Squaring here leaves only a per-row sum for the consumer to do.
        yield np.square(coordinates)

    def infer_output_type(self, input_element_type):
        """Report the element type of this DoFn's output as ``np.int64``."""
        # NOTE(review): the batches yielded above hold floats with an extra
        # ``dimensions`` axis, yet the declared element type is np.int64 --
        # presumably this only exists to satisfy Beam's batch type inference;
        # confirm before relying on it.
        return np.int64
class BatchedSumAndCheck(beam.DoFn):
    """Verify if the points are within the circle.

    This is a DoFn that consumes batches, but yields individual elements:
    each incoming batch is reduced to a single integer, the number of points
    of that batch that lie within ``radius`` of the origin.

    Args:
        radius: Radius of the circle the points are tested against.
    """

    def __init__(self, radius):
        self.radius = radius

    @beam.DoFn.yields_elements
    def process_batch(self, radiuses: np.ndarray) -> typing.Iterator[int]:
        """Yield how many rows of ``radiuses`` fall inside the circle.

        ``radiuses`` is summed along axis 1, so each row is expected to hold
        the squared coordinates of one point.
        """
        squared_distances = radiuses.sum(axis=1)
        # ``<=`` keeps points lying exactly on the boundary inside, the same
        # rule the per-element pipeline in this file applies.
        inside = squared_distances <= self.radius * self.radius
        # Count in NumPy rather than with the builtin sum(), which would loop
        # over the array in Python, and hand back a plain int as annotated.
        yield int(np.count_nonzero(inside))
def batched_circle_montecarlo(samples: int, radius=1, dimensions=2):
    """Estimate pi with a Beam pipeline built from batched DoFns.

    One random point is generated per sample, a batch at a time; the pipeline
    counts how many points lie within distance ``radius`` of the origin and
    prints the resulting estimate of pi.

    Args:
        samples: Number of random points to generate (must be >= 1).
        radius: Radius of the ball and scale of the sampled coordinates.
        dimensions: Number of coordinates per point (must be >= 1).

    Raises:
        ValueError: If ``samples`` or ``dimensions`` is smaller than 1.
    """
    import math

    # Fail early with a clear message instead of a ZeroDivisionError raised
    # from inside the pipeline.
    if samples < 1:
        raise ValueError("samples must be a positive integer")
    if dimensions < 1:
        raise ValueError("dimensions must be a positive integer")

    # The points fill one orthant of the ball's bounding cube, so the expected
    # hit ratio is  pi^(d/2) / (Gamma(d/2 + 1) * 2^d)  and does NOT depend on
    # the radius. Solving for pi:
    #     pi = (ratio * 2^d * Gamma(d/2 + 1)) ^ (2/d)
    # For d == 2 this reduces to the familiar 4 * ratio.
    scale = 2 ** dimensions * math.gamma(dimensions / 2 + 1)
    exponent = 2 / dimensions

    with beam.Pipeline() as p:
        result = (
            p
            | beam.Create(list(range(samples))).with_output_types(np.int64)
            # Forward the caller's settings; they used to be hard-coded to
            # two dimensions and a radius of one.
            | beam.ParDo(
                BatchedGenerateSamples(dimensions=dimensions, radius=radius)
            )
            | beam.ParDo(BatchedSumAndCheck(radius))
            | beam.CombineGlobally(sum)
        )
        result | "final_ratio" >> beam.Map(
            lambda tot: print(
                "pablito", tot, samples, (scale * tot / samples) ** exponent
            )
        )
if __name__ == "__main__":
    import time

    samples = 800000

    # perf_counter() is monotonic and high-resolution, so the measured
    # intervals cannot be skewed by system clock adjustments the way
    # time.time() differences can.
    started = time.perf_counter()
    individual_circle_montecarlo(samples)
    print("took ", time.perf_counter() - started, "seconds individually")

    started = time.perf_counter()
    batched_circle_montecarlo(samples)
    print("took ", time.perf_counter() - started, "seconds batchedly")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment