Skip to content

Instantly share code, notes, and snippets.

@kumagi
Created October 10, 2015 05:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save kumagi/dc55bc4484f3e452b3c1 to your computer and use it in GitHub Desktop.
Save kumagi/dc55bc4484f3e452b3c1 to your computer and use it in GitHub Desktop.
39, 40行目のコメントアウトを切り替えると動かなくなる。その場合は67行目を68行目で置き換えれば動く。
#!/usr/bin/env python
from __future__ import print_function
from flask import Flask, render_template, Response
import socket
from contextlib import closing
from threading import Thread
from time import sleep
PORT = 4444
app = Flask(__name__)
shared = [None]
def udp_fire():
""" sender """
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
with closing(sock):
while True:
sock.sendto("this is udp", ("127.0.0.1", PORT))
sleep(0.5)
def recv_mock():
""" receiver """
while True:
yield "this is mock"
sleep(0.5)
def recv_udp():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
with closing(sock):
sock.bind(("0.0.0.0", PORT))
while True:
yield sock.recv(14000)
def prod():
# switch these two data
gen = (
#recv_mock()
recv_udp()
)
while True:
data = gen.next()
shared[0] = data
print("prod(): [{d}]".format(d=shared[0]))
def cons():
while True:
data = shared[0]
print("cons(): [{d}]".format(d=data))
yield data
consumer = cons()
@app.route('/')
def index():
return consumer.next()
if __name__ == '__main__':
udp_sender = Thread(target=udp_fire)
udp_sender.daemon = True
udp_sender.start()
producer = Thread(target=prod)
producer.daemon = True
producer.start()
app.run(host='0.0.0.0', debug=True)
# app.run(host='0.0.0.0') # <- it works well with UDP
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment