Skip to content

Instantly share code, notes, and snippets.

@CyrusF
Last active December 4, 2019 19:07
Show Gist options
  • Select an option

  • Save CyrusF/35538c61ca7525e4566d9d8171f70e59 to your computer and use it in GitHub Desktop.

Select an option

Save CyrusF/35538c61ca7525e4566d9d8171f70e59 to your computer and use it in GitHub Desktop.
课设III
import requests
import json
import time
import base64
from threading import Thread
#request:
# {
# "ts": 0,
# "content": "",
# "delay": 0
# }
#
#submit response:
# {
# "status": "",
# "message": "" # quota status message
# }
#query response:
# {
# "content": ""
# }
class ClientRequest:
content = ""
delay = 0 # ms
target = "" # server's uri
_got_result = False
_result = ""
_ts = -1
def __init__(self, content, delay, target):
self.content = content
self.delay = delay
self.target = target
def get_id(self):
return self._ts
def send(self):
self._ts = time.time_ns()
body = json.dumps({
"ts": self._ts,
"content": self.content,
"delay": self.delay
})
response = json.loads(requests.post(self.target+"/submit", json=body).content)
return response
def get(self):
body = json.dumps({
"ts": self._ts,
})
response = json.loads(requests.post(self.target+"/query", json=body).content)
if response["status"] == "ok":
self._result = bytes(response["content"], encoding="utf8")
self._got_result = True
return response
def verify(self):
if self._got_result:
decoded_content = str(base64.b64decode(self._result), encoding="utf8")
return decoded_content == self.content
else:
return None
TIMER = int(time.time()*1000)
def new_request(content, delay, interval):
c = ClientRequest(content, delay, "http://127.0.0.1:2333")
response = c.send()
print("Task [%s] submitted, task status = [%s], quota message = [%s]" % (c.get_id(), response["status"], response["message"]))
while c.get()["status"] != "ok":
time.sleep(interval/1000.0)
print("[%07d] Task [%s] result: %s" % (int(time.time()*1000-TIMER), c.get_id(), c.verify()))
def background_request(content, delay, interval):
p = Thread(target=new_request, args=(content, delay, interval))
p.start()
def test_1():
background_request("fuck", 2000, 500)
background_request("fuck", 2000, 500)
background_request("fuck", 2000, 500)
background_request("fuck", 2000, 500)
background_request("fuck", 2000, 500)
background_request("fuck", 2000, 500)
background_request("fuck", 2000, 500)
# test "set expansion timer" (no expansion)
# expected 5+2
def test_2():
background_request("fuck", 3000, 500)
time.sleep(1)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
# test "delay expansion"
# expected 1+4+1+1
def test_3():
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
time.sleep(2.5)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
# test "do expansion"
# expected: 5+10+2
def test_4():
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
time.sleep(2.5)
background_request("fuck", 3000, 500)
time.sleep(1)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
time.sleep(3.5)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
# test "do expansion" and "delay reduction" and "cancel reduction"
# expected: 5+(1+9)+7
def test_5():
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
time.sleep(2.5)
background_request("fuck", 3000, 500)
time.sleep(1)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
time.sleep(3.5)
background_request("fuck", 3000, 500)
time.sleep(3.5)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
background_request("fuck", 3000, 500)
# test "do expansion" and "delay reduction" and "set expansion timer" (no expansion) again
# expected: 5+(1+9)+5+2
if __name__ == "__main__":
# test_1()
# test_2()
# test_3()
# test_4()
# test_5()
background_request("fuck", 2000, 500)
from flask import Flask, render_template, render_template_string, request, jsonify
import json
import time
import base64
from threading import Thread, Lock
import copy
app = Flask(__name__)
QUOTA_MAX = 80
QUOTA_MIN = 5
QUOTA_NOW = QUOTA_MIN
QUOTA_LIMIT = 80 # expansion_limit = reduction_limit = QUOTA_NOW * QUOTA_LIMIT/100
QUOTA_STEP = 100 # expansion_step = (1+QUOTA_STEP/100), reduction_step = 1/(1+QUOTA_STEP/100)
QUOTA_DELAY = 2000 # ms
THREAD_POOL = {}
TASK_WAITING = {}
TASK_ACTIVATED = {}
QUOTA_TIMER = 0
QUOTA_TIME_NONE = "none"
QUOTA_TIME_EXPANSION = "expansion"
QUOTA_TIME_REDUCTION = "reduction"
QUOTA_TIMER_STATUS = QUOTA_TIME_NONE
QUOTA_MUTEX = Lock()
QUOTA_ALERT = False
class Task:
content = ""
delay = 0
_done = False
_result = ""
def __init__(self, content, delay):
self.content = content
self.delay = delay
def run(self):
time.sleep(self.delay/1000.0)
self._result = str(base64.b64encode(bytes(self.content, encoding="utf8")), encoding="utf8")
self._done = True
def get_result(self):
return self._done, self._result
def alert():
global QUOTA_ALERT
QUOTA_ALERT = True
print("Thread quota exceeded alert")
def executor(task_id):
global QUOTA_NOW
global QUOTA_TIMER_STATUS
global QUOTA_TIMER
global QUOTA_ALERT
THREAD_POOL[task_id].run()
THREAD_POOL.pop(task_id)
while len(THREAD_POOL) < QUOTA_NOW and len(TASK_WAITING) > 0:
new_task_id = copy.deepcopy(min(TASK_WAITING.keys()))
new_task = copy.deepcopy(TASK_WAITING[new_task_id])
TASK_WAITING.pop(new_task_id)
controller(new_task, new_task_id)
if len(THREAD_POOL) < int(QUOTA_NOW * QUOTA_LIMIT/100.0):
QUOTA_ALERT = False
if QUOTA_TIMER_STATUS == QUOTA_TIME_REDUCTION:
if int(time.time()*1000) - QUOTA_TIMER > QUOTA_DELAY:
QUOTA_NOW = max(QUOTA_MIN, int(QUOTA_NOW / (1 + QUOTA_STEP/100.0))) # no less than QUOTA_MIN
QUOTA_TIMER_STATUS = QUOTA_TIME_NONE
else:
pass
else:
QUOTA_TIMER_STATUS = QUOTA_TIME_REDUCTION
QUOTA_TIMER = int(time.time()*1000)
elif len(THREAD_POOL) == int(QUOTA_NOW * QUOTA_LIMIT):
QUOTA_ALERT = False
QUOTA_TIMER_STATUS = QUOTA_TIME_NONE
else:
pass
def controller(task, task_id):
QUOTA_MUTEX.acquire()
global QUOTA_NOW
global QUOTA_TIMER_STATUS
global QUOTA_TIMER
if len(THREAD_POOL) + 1 > int(QUOTA_NOW * QUOTA_LIMIT/100.0):
if QUOTA_TIMER_STATUS == QUOTA_TIME_EXPANSION:
if int(time.time()*1000) - QUOTA_TIMER > QUOTA_DELAY:
if QUOTA_NOW == QUOTA_MAX:
TASK_WAITING[task_id] = task
alert()
QUOTA_MUTEX.release()
return "error", "exceeded"
else:
QUOTA_NOW = min(QUOTA_MAX, int(QUOTA_NOW * (1 + QUOTA_STEP/100.0))) # no more than QUOTA_MAX
if len(THREAD_POOL) < int(QUOTA_NOW):
TASK_ACTIVATED[task_id] = task
THREAD_POOL[task_id] = TASK_ACTIVATED[task_id]
p = Thread(target=executor, args=(task_id,))
p.start()
else:
TASK_WAITING[task_id] = task
QUOTA_TIMER_STATUS = QUOTA_TIME_NONE
QUOTA_MUTEX.release()
return "ok", "do expansion"
else:
if len(THREAD_POOL) < int(QUOTA_NOW):
TASK_ACTIVATED[task_id] = task
THREAD_POOL[task_id] = TASK_ACTIVATED[task_id]
p = Thread(target=executor, args=(task_id,))
p.start()
else:
TASK_WAITING[task_id] = task
QUOTA_MUTEX.release()
return "ok", "delay expansion"
else:
QUOTA_TIMER_STATUS = QUOTA_TIME_EXPANSION
QUOTA_TIMER = int(time.time()*1000)
if len(THREAD_POOL) < int(QUOTA_NOW):
TASK_ACTIVATED[task_id] = task
THREAD_POOL[task_id] = TASK_ACTIVATED[task_id]
p = Thread(target=executor, args=(task_id,))
p.start()
else:
TASK_WAITING[task_id] = task
QUOTA_MUTEX.release()
return "ok", "set expansion timer"
elif len(THREAD_POOL) + 1 == int(QUOTA_NOW * QUOTA_LIMIT):
TASK_ACTIVATED[task_id] = task
THREAD_POOL[task_id] = TASK_ACTIVATED[task_id]
p = Thread(target=executor, args=(task_id,))
p.start()
QUOTA_TIMER_STATUS = QUOTA_TIME_NONE
QUOTA_MUTEX.release()
return "ok", "cancel operation"
else:
TASK_ACTIVATED[task_id] = task
THREAD_POOL[task_id] = TASK_ACTIVATED[task_id]
p = Thread(target=executor, args=(task_id,))
p.start()
QUOTA_MUTEX.release()
return "ok", "normal"
@app.route("/", methods=["GET"])
def panel():
return render_template_string("""
<html><body>
<h1><center>Status Panel</h1></center>
<hr>
<pre>
Quota_now: %d
All_task: %d
Waiting_task: %d
<hr>
Quota_max: %d
Quota_min: %d
Quota_limit: %d%% (expansion_limit = reduction_limit = Quota_now * Quota_limit)
Quota_step: %d%% (expansion_result = Quota_now * (1+Quota_step), reduction_result = Quota_now / (1+Quota_step))
Quota_delay: %dms (After Quota_delay of higher/lower than limit, do operation of quota)
</pre>
<hr>
<center>Python HTTP Server</center>
</body></html>""" % (QUOTA_NOW, len(TASK_ACTIVATED), len(TASK_WAITING), QUOTA_MAX, QUOTA_MIN, QUOTA_LIMIT, QUOTA_STEP, QUOTA_DELAY))
@app.route("/submit", methods=["POST"])
def submit():
request_body = request.json
request_json = json.loads(request_body)
result = controller(Task(request_json["content"], request_json["delay"]), request_json["ts"])
return jsonify({
"status": result[0],
"message": result[1]
})
@app.route("/query", methods=["POST"])
def query():
print(len(THREAD_POOL), QUOTA_NOW)
request_body = request.json
request_json = json.loads(request_body)
if request_json["ts"] in TASK_ACTIVATED.keys():
result = TASK_ACTIVATED[request_json["ts"]].get_result()
if result[0]:
return jsonify({
"status": "ok",
"content": result[1]
})
else:
return jsonify({
"status": "working",
"content": ""
})
else:
return jsonify({
"status": "waiting",
"content": ""
})
if __name__ == "__main__":
app.run(debug=False, host="127.0.0.1", port=2333)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment