Skip to content

Instantly share code, notes, and snippets.

@AndreiD
Created January 31, 2020 15:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AndreiD/233d988852a58c9736854ee0f9d9cb4a to your computer and use it in GitHub Desktop.
Save AndreiD/233d988852a58c9736854ee0f9d9cb4a to your computer and use it in GitHub Desktop.
google_pub_sub_python
import json
import logging
import os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from google.cloud import pubsub_v1
# Point Google client libraries at the service-account key file by setting
# GOOGLE_APPLICATION_CREDENTIALS. This assignment overwrites any value already
# present in the environment. The path is relative, so it is presumably
# resolved against the current working directory - confirm when deploying.
path_service_account = "read_stream_key.json"
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_service_account
# Fully qualified Pub/Sub subscription path
# (projects/<project>/subscriptions/<subscription>) that the pipeline reads.
subscriptionID = 'projects/beamplay/subscriptions/highTechHospitalSubX0001'
# NOTE(review): this client is constructed at import time, after the
# credentials variable above has been set. It is not referenced anywhere else
# in the visible code - confirm whether it is still needed.
subscriber = pubsub_v1.SubscriberClient()
class FormatStreamData(beam.DoFn):
    """Decode a JSON heart-rate message, print a summary, and re-emit it.

    Each payload is expected to be a JSON object shaped like
    ``{"patient": {"last_name": ..., "first_name": ...}, "bpm": ...}``.
    """

    def process(self, element):
        """Parse one message and yield the decoded object.

        Args:
            element: The JSON-encoded payload; anything ``json.loads``
                accepts (``str``, ``bytes`` or ``bytearray``).

        Yields:
            The decoded message, unchanged, after its summary line has been
            printed. Nothing is yielded for a message that cannot be parsed
            or that lacks the expected fields.
        """
        # Keep the try body to the statements that can fail on bad input:
        # invalid JSON / undecodable bytes (ValueError), missing fields
        # (KeyError) or a payload that is not a JSON object (TypeError).
        try:
            data = json.loads(element)
            patient = data["patient"]
            summary = "{} {} has {} beats per minute".format(
                patient["last_name"], patient["first_name"], data["bpm"])
        except (ValueError, KeyError, TypeError) as exc:
            # One bad message must not raise out of the DoFn into the runner;
            # report it and drop the element instead.
            logging.warning("Skipping malformed message: %s", exc)
            return

        print(summary)
        yield data
def run():
    """Build and run the streaming pipeline on the DirectRunner.

    The pipeline reads messages from the ``subscriptionID`` Pub/Sub
    subscription and passes each one through ``FormatStreamData``.
    """
    pipeline_options = PipelineOptions()

    # Views share the underlying option values, so one StandardOptions view
    # is enough to set both flags.
    standard_options = pipeline_options.view_as(StandardOptions)
    standard_options.streaming = True
    standard_options.runner = 'DirectRunner'

    # The pipeline is executed when the ``with`` block exits.
    with beam.Pipeline(options=pipeline_options) as pipeline:
        messages = pipeline | 'Read from pub sub' >> beam.io.ReadFromPubSub(
            subscription=subscriptionID)
        _ = messages | 'Log results' >> beam.ParDo(FormatStreamData())
if __name__ == '__main__':
    # Raise the root logger to INFO before the pipeline starts.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment