Skip to content

Instantly share code, notes, and snippets.

@gxercavins
Last active August 14, 2019 16:09
Show Gist options
  • Save gxercavins/825e4bd43526d778704a2cb9f92acdb2 to your computer and use it in GitHub Desktop.
Save gxercavins/825e4bd43526d778704a2cb9f92acdb2 to your computer and use it in GitHub Desktop.
SO question 57496400
import apache_beam as beam
import logging
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
# Google Cloud project ID that main() passes to the pipeline as --project.
# NOTE(review): 'PROJECT_ID' looks like a placeholder -- replace before running.
PROJECT = 'PROJECT_ID'
class Sentiment(beam.DoFn):
    """Score the sentiment of comma-separated ``name,title,text`` records.

    The first two comma-separated fields of each input string are copied to
    the output; the remainder is sent to the Cloud Natural Language API for
    sentiment analysis.
    """

    # Created on first use and then reused for every later element handled by
    # this instance, instead of building a new client per element. Starting
    # as None keeps the client object out of the DoFn's state until the DoFn
    # actually runs.
    _client = None

    def process(self, element):
        """Analyze one record and emit its sentiment.

        Args:
            element: A string of the form ``"name,title,text"``. Only the
                first two commas act as separators, so ``text`` may itself
                contain commas.

        Returns:
            A one-item list holding a dict with the keys ``name``, ``title``,
            ``magnitude`` and ``score``.

        Raises:
            IndexError: If ``element`` has fewer than three fields.
        """
        # maxsplit=2: an unbounded split would cut the free text at its first
        # comma and only that fragment would be analyzed.
        fields = element.split(",", 2)

        if self._client is None:
            self._client = language.LanguageServiceClient()

        document = types.Document(content=fields[2],
                                  type=enums.Document.Type.PLAIN_TEXT)
        sentiment = self._client.analyze_sentiment(document).document_sentiment

        return [{
            'name': fields[0],
            'title': fields[1],
            'magnitude': sentiment.magnitude,
            'score': sentiment.score
        }]
class LogFn(beam.DoFn):
    """Pass-through DoFn that logs every element it receives."""

    def process(self, element):
        """Log ``element`` at INFO level, then emit it unchanged."""
        logging.info(element)
        yield element
def main():
    """Build the sentiment pipeline and submit it with hard-coded options.

    The pipeline creates a single in-memory record, runs it through
    ``Sentiment`` and logs the result with ``LogFn``. ``run()`` is called
    without waiting on the returned result.
    """
    bucket = 'BUCKET_NAME'
    pipeline_args = [
        '--project={0}'.format(PROJECT),
        '--staging_location=gs://{0}/staging/'.format(bucket),
        '--temp_location=gs://{0}/tmp/'.format(bucket),
        '--runner=DataflowRunner',
        '--job_name=examplejob2',
        '--save_main_session',
        '--requirements_file=requirements.txt'
    ]

    pipeline = beam.Pipeline(argv=pipeline_args)

    # Same three steps as a single chained expression, named one at a time.
    records = pipeline | 'ReadData' >> beam.Create(
        ['Guillem, SO question 57496400, I dont like this answer :('])
    scored = records | 'ParseCSV' >> beam.ParDo(Sentiment())
    _ = scored | 'LogResults' >> beam.ParDo(LogFn())

    pipeline.run()
# Run the pipeline only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment