Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
google_pub_sub_python
import json
import logging
import os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from google.cloud import pubsub_v1
# Service-account key used to authenticate against Pub/Sub. The path is
# relative, so it is resolved against the current working directory.
path_service_account = "read_stream_key.json"
# Only fall back to the bundled key file when the caller has not already
# configured credentials; overwriting unconditionally would silently replace
# a key chosen by the operator/environment.
os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", path_service_account)
# Fully-qualified Pub/Sub subscription the pipeline reads from.
subscriptionID = 'projects/beamplay/subscriptions/highTechHospitalSubX0001'
# NOTE(review): this client is not referenced anywhere else in this script
# (the Beam pipeline reads via beam.io.ReadFromPubSub); kept so any external
# importer of this module still finds the name.
subscriber = pubsub_v1.SubscriberClient()
class FormatStreamData(beam.DoFn):
    """Decode each Pub/Sub payload as JSON, print a heart-rate line, emit it.

    The expected message shape (as read by the code below) is a JSON object
    with ``patient.last_name``, ``patient.first_name`` and ``bpm`` fields.
    """

    def process(self, element):
        """Parse one message and yield the decoded JSON value.

        Args:
            element: JSON-encoded message payload (``bytes`` or ``str``).

        Yields:
            The decoded JSON value. Payloads that are not valid JSON are
            logged at WARNING level and dropped instead of failing the bundle.
        """
        try:
            data = json.loads(element)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both subclass
            # ValueError; one poison message must not crash a streaming job.
            logging.warning("Dropping message that is not valid JSON: %r", element)
            return

        try:
            patient = data["patient"]
            line = "{} {} has {} beats per minute".format(
                patient["last_name"], patient["first_name"], data["bpm"])
        except (KeyError, TypeError):
            # Missing keys (KeyError) or a non-object value (TypeError): skip
            # only the human-readable line, still pass the record downstream.
            logging.warning("Message lacks patient name or bpm fields: %r", data)
        else:
            print(line)

        yield data
def run(argv=None, subscription=None):
    """Build and run the streaming pipeline: read Pub/Sub, print each record.

    Args:
        argv: Command-line flags for ``PipelineOptions``. ``None`` (the
            default) lets Beam parse ``sys.argv``, as the original did.
        subscription: Fully-qualified Pub/Sub subscription path. ``None``
            falls back to the module-level ``subscriptionID``.
    """
    options = PipelineOptions(argv)
    standard = options.view_as(StandardOptions)
    standard.streaming = True
    # Respect a runner given on the command line (e.g. --runner); only
    # default to local execution when none was specified.
    if not standard.runner:
        standard.runner = 'DirectRunner'

    source = subscriptionID if subscription is None else subscription

    with beam.Pipeline(options=options) as p:
        (
            p
            | 'Read from pub sub' >> beam.io.ReadFromPubSub(subscription=source)
            | 'Log results' >> beam.ParDo(FormatStreamData())
        )
if __name__ == '__main__':
    # setLevel alone installs no handler; without one, records go to
    # logging.lastResort, which only emits WARNING and above. basicConfig()
    # adds a stderr handler if none exists (and is a no-op otherwise).
    logging.basicConfig()
    logging.getLogger().setLevel(logging.INFO)
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment