Created
May 27, 2018 20:05
-
-
Save kaxil/f8af9c6215edd1bcc4c790e4e9949e05 to your computer and use it in GitHub Desktop.
Google Cloud Pub/Sub to Bokeh Dashboard - Streaming Dashboard
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# User module to receive tweets | |
from recevie_tweets_pubsub import receive_tweets | |
import pandas | |
from bokeh.io import curdoc | |
from bokeh.models import ColumnDataSource | |
from bokeh.models import DatetimeTickFormatter | |
from bokeh.plotting import figure, output_file | |
import sys | |
import os | |
sys.path.append(os.getcwd()) | |
import settings | |
# Dashboard configuration, read from the project's ``settings`` module.
TWEET_FILTER = settings.TWEET_DASHBOARD_FILTER
GCP_PROJECT_NAME = settings.GCP_PROJECT_NAME
PUBSUB_SUBSCRIPTION_NAME = settings.PUBSUB_SUBSCRIPTION_DASHBOARD_NAME

# Data source shared by both glyphs below; starts empty and is filled by
# streaming new points into it.
source = ColumnDataSource({'x': [], 'y': []})

# Create the figure with a datetime x-axis.
p = figure(x_axis_type="datetime", width=1080, height=600)
p.title.text = "Tweet Count"

# Draw the same series twice -- as markers and as a connecting line -- under
# one legend entry named after the tweet filter.
p.circle(x='x', y='y', source=source, legend=TWEET_FILTER, size=5, color='red')
p.line(x='x', y='y', source=source, legend=TWEET_FILTER, line_width=2, color='red')

# Axis labels, and a single HH:MM:SS tick format at every listed time scale.
p.xaxis.axis_label = 'Time'
p.yaxis.axis_label = 'Tweets'
p.xaxis.formatter = DatetimeTickFormatter(
    hours=["%H:%M:%S"],
    minutes=["%H:%M:%S"],
    minsec=["%H:%M:%S"],
    seconds=["%H:%M:%S"],
)

# Make the grid lines fully transparent; shade the horizontal bands instead.
p.grid.grid_line_alpha = 0
p.ygrid.band_fill_alpha = 0.1
p.ygrid.band_fill_color = "olive"
def update_data():
    """Fetch the latest tweet-count message and stream it into the plot.

    Calls ``receive_tweets`` with the configured project, subscription and
    tweet filter, and uses only the first message of the result. The
    message's ``'timestamp'`` becomes the x value and the integer parsed from
    its ``'data'`` becomes the y value of one new point appended to ``source``.

    If the result contains no message, the call returns without touching the
    plot instead of raising ``IndexError``.

    Returns:
        None.

    Raises:
        KeyError: if the message lacks ``'timestamp'`` or ``'data'``.
        IndexError, ValueError: if ``'data'`` does not contain an integer
            after its first colon.
    """
    messages = receive_tweets(project=GCP_PROJECT_NAME,
                              subscription_name=PUBSUB_SUBSCRIPTION_NAME,
                              tweet_filter=TWEET_FILTER)
    # Nothing may have arrived since the previous call; skip this update
    # rather than failing on the [0] lookup.
    # NOTE(review): assumes receive_tweets returns an empty sequence when
    # there are no messages -- confirm against its implementation.
    try:
        message = messages[0]
    except IndexError:
        return

    # The timestamp is interpreted as seconds since the Unix epoch.
    tweet_time = pandas.to_datetime(message['timestamp'], unit='s')
    print('Tweet Time: ', tweet_time)

    # The count is the text between the first and second colon of the payload.
    tweets_count = int(str(message['data']).strip().split(':')[1])
    new_data = dict(x=[tweet_time], y=[tweets_count])

    # rollover=10 caps each column at 10 entries: once exceeded, the oldest
    # points are discarded. It is a point count, not a time window.
    source.stream(new_data=new_data, rollover=10)
# Attach the figure to the current Bokeh document.
curdoc().add_root(p)

# Refresh the data every 10 seconds; the period is given in milliseconds.
curdoc().add_periodic_callback(
    callback=update_data,
    period_milliseconds=10 * 1000,
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment