Skip to content

Instantly share code, notes, and snippets.

@moradology
Last active February 28, 2024 19:09
Show Gist options
  • Save moradology/89970afb1ce03580a3334c717b21ec09 to your computer and use it in GitHub Desktop.
Save moradology/89970afb1ce03580a3334c717b21ec09 to your computer and use it in GitHub Desktop.
estimate pi in beam
import apache_beam as beam
import random
def inside(p):
    """Draw a random point in the unit square and test it against the unit circle.

    Args:
        p: Ignored. It exists only so the function can serve as a
            one-argument predicate (it is passed to ``beam.Filter`` in this
            file and called with ``None`` by ``estimate_pi``).

    Returns:
        bool: True when the freshly drawn point ``(x, y)``, with both
        coordinates taken from ``random.random()``, satisfies
        ``x*x + y*y < 1``.
    """
    # Two independent draws per call; the argument plays no part in the result.
    x, y = random.random(), random.random()
    return x * x + y * y < 1
def estimate_pi(total_points):
    """Estimate pi by sampling ``total_points`` random points.

    Calls ``inside(None)`` once per sample, counts how many calls return a
    truthy value, and scales the hit ratio by 4.

    Args:
        total_points: Number of samples to draw; must be a positive integer.

    Returns:
        float: ``4 * hits / total_points``.

    Raises:
        ValueError: If ``total_points`` is zero or negative, since the ratio
            is undefined (zero) or meaningless (negative) in those cases.
    """
    if total_points <= 0:
        raise ValueError(
            f"total_points must be a positive integer, got {total_points!r}"
        )
    inside_points = sum(1 for _ in range(total_points) if inside(None))
    return 4 * inside_points / total_points
# Number of candidate points pushed through the pipeline; make this as big as
# you like for a tighter estimate.
TOTAL_POINTS = 100000

with beam.Pipeline() as pipeline:
    (
        pipeline
        # The element values are placeholders: `inside` ignores its argument
        # and draws a fresh random point for every element it is asked about.
        | 'Create points' >> beam.Create([1] * TOTAL_POINTS)
        | 'Filter inside points' >> beam.Filter(inside)
        | 'Count inside points' >> beam.combiners.Count.Globally()
        # The counted value is the number of hits, so turn it into the
        # estimate directly. Passing it to a function that treats it as a
        # sample size would throw the pipeline's work away and re-run the
        # simulation in a single process.
        | 'Estimate Pi' >> beam.Map(
            lambda inside_points, total: 4 * inside_points / total,
            TOTAL_POINTS,
        )
        | 'Print result' >> beam.Map(print)
    )
import apache_beam as beam
import random
class GenerateRandomPoint(beam.DoFn):
    """DoFn that expands one integer element into that many random trials.

    Despite the name, the outputs are not the points themselves: each output
    is a boolean saying whether a freshly drawn point fell inside the unit
    circle.
    """

    def process(self, element):
        """Run ``element`` independent trials and yield each outcome.

        Args:
            element: Number of random points to draw for this input element;
                it is passed straight to ``range``.

        Yields:
            bool: True when the drawn point ``(x, y)``, with both coordinates
            from ``random.random()``, satisfies ``x*x + y*y < 1``.
        """
        for _ in range(element):
            x, y = random.random(), random.random()
            yield x * x + y * y < 1
def estimate_pi(total_points, inside_points):
    """Turn a hit count into an estimate of pi.

    Args:
        total_points: Total number of sampled points; must be positive.
        inside_points: Number of those points that fell inside the circle.

    Returns:
        float: ``4 * inside_points / total_points``.

    Raises:
        ValueError: If ``total_points`` is zero or negative, since the ratio
            is undefined (zero) or meaningless (negative) in those cases.
    """
    if total_points <= 0:
        raise ValueError(
            f"total_points must be a positive number, got {total_points!r}"
        )
    return 4 * inside_points / total_points
# Work is split into NUM_BATCHES elements, each an instruction to generate
# POINTS_PER_BATCH points. The total is derived from the two so the estimate
# step can never drift out of sync with the amount of work actually created.
POINTS_PER_BATCH = 10000
NUM_BATCHES = 10
TOTAL_POINTS = POINTS_PER_BATCH * NUM_BATCHES

with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Create seed for workers' >> beam.Create([POINTS_PER_BATCH] * NUM_BATCHES)
        | 'Generate points on workers' >> beam.ParDo(GenerateRandomPoint())
        # GenerateRandomPoint emits one boolean per trial; keep only the hits
        # (points inside the circle).
        | 'Filter inside points' >> beam.Filter(lambda x: x)
        | 'Count inside points' >> beam.combiners.Count.Globally()
        | 'Estimate Pi' >> beam.Map(
            lambda inside_points, total: estimate_pi(total, inside_points),
            TOTAL_POINTS,
        )
        | 'Print result' >> beam.Map(print)
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment