Skip to content

Instantly share code, notes, and snippets.

@shibacow
Last active November 26, 2017 18:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shibacow/c625de61349dd37c641318b824b6d880 to your computer and use it in GitHub Desktop.
Save shibacow/c625de61349dd37c641318b824b6d880 to your computer and use it in GitHub Desktop.
Cloud pubsub検証用コード
# Sample .env file; replace every placeholder value before running the scripts.
# Name of the Pub/Sub topic; common.py reads it as TOPIC to build the topic path.
TOPIC='[TOPIC_NAME]'
# Google Cloud project id; common.py reads it as PROJECT for both resource paths.
GOOGLE_CLOUD_PROJECT='[PROJECT_ID]'
# Name of the pull subscription; common.py reads it to build the subscription path.
SUBSCRIBE='[SUBSCRIBER_NAME]'
# NOTE(review): none of the scripts shown read MONGO_DB; they use the `pubsub`
# database directly -- confirm whether this entry is still needed.
MONGO_DB='MONGO_DB_NAME'
# Path to the service-account JSON key file used to create credentials.
JSON_KEY='PATH_TO_JSON_KEY'
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import os
import logging
logging.basicConfig(level=logging.INFO)
from datetime import datetime
from pymongo import MongoClient
from optparse import OptionParser
# Command-line interface: -p/--part picks the partition whose stored
# messages are checked (integer, defaults to 0).
parser = OptionParser()
parser.add_option("-p", "--part", dest="part", type="int", default=0)
options, args = parser.parse_args()
def main():
    """Report gaps in the ``count`` sequence stored for one partition.

    Loads every document of the ``pubsub.data`` collection whose ``part``
    equals ``options.part``, ordered by ascending ``count``, logs how many
    were found, and then logs the ``count`` of each document that is more
    than one above its predecessor (i.e. the first value after a gap).
    """
    client = MongoClient()
    collection = client.pubsub.data
    part = options.part
    docs = list(collection.find({"part": part}).sort([['count', 1]]))
    logging.info("part={} size={}".format(part, len(docs)))
    # Walk every adjacent pair, starting with (docs[0], docs[1]), so a gap
    # directly after the lowest stored count is reported as well.
    for previous, current in zip(docs, docs[1:]):
        if current['count'] - previous['count'] > 1:
            logging.info(current['count'])


if __name__ == '__main__':
    main()
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import os
from google.cloud import pubsub
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())
from google.oauth2 import service_account
# Settings taken from the process environment; each is None when unset.
PROJECT = os.environ.get('GOOGLE_CLOUD_PROJECT')
TOPIC = os.environ.get('TOPIC')
SUBSCRIBE = os.environ.get('SUBSCRIBE')
JSON_KEY = os.environ.get('JSON_KEY')
def init():
    """Create the Pub/Sub clients and resolve the configured resource paths.

    Credentials are loaded from the service-account file named by
    ``JSON_KEY`` and scoped to the cloud-platform scope; both clients
    share them.

    Returns:
        A tuple ``(publisher, topic, subscriber, subscription_path)`` where
        ``topic`` is built from ``PROJECT``/``TOPIC`` and
        ``subscription_path`` from ``PROJECT``/``SUBSCRIBE``.
    """
    scopes = ['https://www.googleapis.com/auth/cloud-platform']
    base_credentials = service_account.Credentials.from_service_account_file(
        JSON_KEY)
    creds = base_credentials.with_scopes(scopes)

    publisher = pubsub.PublisherClient(credentials=creds)
    topic = publisher.topic_path(PROJECT, TOPIC)

    subscriber = pubsub.SubscriberClient(credentials=creds)
    subscription_path = subscriber.subscription_path(PROJECT, SUBSCRIBE)

    return publisher, topic, subscriber, subscription_path
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import os
import json
import time
import logging
logging.basicConfig(level=logging.INFO)
from datetime import datetime
from common import init
from optparse import OptionParser
# Command-line interface:
#   -p/--part  integer partition tag stamped on every message (default 0)
#   -s/--size  number of messages to publish (default 0)
parser = OptionParser()
parser.add_option("-p", "--part", dest="part", type="int", default=0)
parser.add_option("-s", "--size", dest="size", type="int", default=0)
options, args = parser.parse_args()
def send(publisher, topic, i, part):
    """Publish one JSON-encoded test message and return the publish result.

    The message body is a UTF-8 encoded JSON object with the keys
    ``count`` (``i``), ``time`` (current local time formatted as
    ``%Y-%m-%d %H:%M:%S``) and ``part``.

    Args:
        publisher: Object exposing ``publish(topic, data)``.
        topic: Topic path handed to ``publisher.publish`` unchanged.
        i: Sequence number stored under ``count``.
        part: Partition tag stored under ``part``.

    Returns:
        Whatever ``publisher.publish`` returns (with google-cloud-pubsub
        this is a future), so callers can wait on it or inspect failures
        instead of the result being silently discarded.
    """
    payload = {
        'count': i,
        'time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'part': part,
    }
    data = json.dumps(payload).encode('utf-8')
    return publisher.publish(topic, data)
def main():
    """Publish ``options.size`` numbered messages tagged with ``options.part``.

    Progress is logged for every sequence number divisible by 1000.
    """
    publisher, topic, _subscriber, _subscription_path = init()
    part, size = options.part, options.size
    for seq in range(size):
        send(publisher, topic, seq, part)
        if not seq % 1000:
            logging.info("part={} i={}".format(part, seq))
    # NOTE(review): the source's indentation was lost; this pause is placed
    # after the loop, presumably to give background publishing time to
    # finish before the process exits -- confirm the intended nesting.
    time.sleep(5)


if __name__ == '__main__':
    main()
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import json
import time
import logging
logging.basicConfig(level=logging.INFO)
from datetime import datetime
from pymongo import MongoClient
from common import init
# MongoDB handles shared by the subscriber callback: client with default
# connection settings, the `pubsub` database and its `data` collection.
mp = MongoClient()
db = mp["pubsub"]
col = db["data"]
# Running total of messages handled by the callback in this process.
counter = 0
def receive_messages(subscriber, subscription_path):
    """Store every message from a pull subscription in MongoDB, forever.

    Each message body is decoded as UTF-8 JSON, its ``time`` string is
    parsed into a ``datetime``, ``received_time`` and ``received_counter``
    are added, and the document is inserted into ``col`` before the
    message is acknowledged. After subscribing, the calling thread loops
    indefinitely, logging the current time and running counter every
    five seconds.

    Args:
        subscriber: Client exposing ``subscribe(path, callback=...)``.
        subscription_path: Subscription path to listen on.
    """
    def callback(message):
        global counter
        payload = json.loads(message.data.decode('utf-8'))
        # Log only every 1000th sequence number to keep the output readable.
        if not payload['count'] % 1000:
            logging.info(payload)
        counter += 1
        payload.update({
            'time': datetime.strptime(payload['time'], "%Y-%m-%d %H:%M:%S"),
            'received_time': datetime.now(),
            'received_counter': counter,
        })
        col.insert_one(payload)
        # Acknowledge only once the document has been stored.
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback)
    print('Listening for messages on {}'.format(subscription_path))
    # subscribe() does not block, so keep this thread alive while messages
    # are handled in the background, emitting a periodic heartbeat.
    while True:
        logging.info("tm={} counter={}".format(datetime.now(), counter))
        time.sleep(5)
def main():
    """Set up the Pub/Sub clients and start consuming the subscription."""
    # Only the subscriber half of init()'s result is needed here.
    _publisher, _topic, subscriber, subscription_path = init()
    receive_messages(subscriber, subscription_path)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment