Skip to content

Instantly share code, notes, and snippets.

@knzm
Forked from kumagi/flask_debug.py
Last active October 10, 2015 11:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save knzm/3ecb2d5189fd376ec2fc to your computer and use it in GitHub Desktop.
Save knzm/3ecb2d5189fd376ec2fc to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from __future__ import print_function
from flask import Flask, render_template, Response
import socket
from contextlib import closing
from threading import Thread
from time import sleep
import logging
logger = None
PORT = 4444
app = Flask(__name__)
shared = [None]
def udp_fire():
""" sender """
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
with closing(sock):
while True:
sock.sendto("this is udp", ("127.0.0.1", PORT))
sleep(0.5)
def recv_mock():
""" receiver """
while True:
yield "this is mock"
sleep(0.5)
def recv_udp():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
with closing(sock):
sock.bind(("0.0.0.0", PORT))
while True:
yield sock.recv(14000)
# while True:
# try:
# sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# with closing(sock):
# sock.bind(("0.0.0.0", PORT))
# while True:
# yield sock.recv(14000)
# except socket.error, e:
# if e.errno != 48:
# raise
# # Address already in use
# logger.error(e)
# sleep(1)
def prod():
# switch these two data
gen = (
#recv_mock()
recv_udp()
)
while True:
data = gen.next()
shared[0] = data
logger.info("prod(): [{d}]".format(d=shared[0]))
def cons():
while True:
data = shared[0]
logger.info("cons(): [{d}]".format(d=data))
yield data
consumer = cons()
@app.route('/')
def index():
return consumer.next()
def start_threads():
udp_sender = Thread(target=udp_fire)
udp_sender.daemon = True
udp_sender.start()
producer = Thread(target=prod)
producer.daemon = True
producer.start()
if __name__ == '__main__':
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s (pid=%(process)d,%(threadName)s) %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
use_reloader = True
import os
if use_reloader and os.environ.get('WERKZEUG_RUN_MAIN') != 'true':
# This is a reloader process itself.
pass
else:
start_threads()
app.run(host='0.0.0.0', debug=True, use_reloader=use_reloader)
# app.run(host='0.0.0.0') # <- it works well with UDP
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment