|
"""タスク処理サーバの雛形となるコード。 |
|
|
|
クライアントから時間のかかるタスク処理のリクエストを受け付け、結果を返してから接続を終了する |
|
サーバの実装について考える。要件として、 |
|
|
|
- クライアントが接続を切断したら、キャンセルと見なして、実行中のタスク処理は終了する。 |
|
- 実行中のタスクが異常終了したら、クライアントにそれを伝える。 |
|
|
|
を満たすものとする。2番めの要件は、たとえば C 言語で実装したライブラリを実行中に |
|
Segmentation Fault で異常終了した場合でも、それをクライアントへ伝えられるようにするということだ。 |
|
|
|
これらの要件を満たすサーバの雛形となるようなコードをこのモジュールで実装した。 |
|
タスク処理を子プロセスにまかせ、クライアントとの接続と子プロセスからの結果を受け取る接続との両方を |
|
`select` モジュールを利用して監視している。子プロセスの終了も監視しなくてはならないので、これはシグナルハンドラで |
|
`SIGCHLD` を処理することで実現した。 |
|
|
|
このモジュールは、1度に1つのクライアントのタスクしか処理できない。あるクライアントからのタスクを |
|
実行中に別のクライアントからの接続があっても、タスク開始はまたされるか、開始待ちのクライアントが多い場合は |
|
接続そのものが拒否される。 |
|
|
|
Example |
|
---------- |
|
接続を受け付けるアドレスとポート番号を指定して、コマンドラインから次のようにサーバを実行する。:: |
|
|
|
$ python task_server.py localhost 8000 |
|
|
|
クライアントはサーバへ接続後、``14,Hello,world!`` のようにスリープ秒数とエコーされるメッセージを |
|
カンマ区切りにして送信し、サーバからの応答を待つ。スリープすることで時間のかかるタスクをシミューレートしている。 |
|
|
|
タスク処理用の子プロセスを kill したり、処理中にクライアントから接続を切断したりして、要件どおりに |
|
動作するかを確認する。 |
|
|
|
クライアント側を Jupyter Notebook 上で実行する場合、サーバからの応答待ちでブロックしているときに |
|
Kernel を Interrupt しただけでは、接続が切断されないので、`KeyboardInterrupt` 例外をキャッチして |
|
コネクションを切断するようにしなければならない。Kernel の Restart や Shutdown だとプロセスが終了するので |
|
コネクションも切断される。 |
|
|
|
|
|
Notes |
|
-------- |
|
実装にあたって、考慮しないといけない点や落とし穴がいくつかあった。コード中のコメントも |
|
よく読むこと。 |
|
|
|
この雛形には問題点があることが後日発覚した。子プロセスが結果をパイプに書き込んで終了した場合、 |
|
`select.poll` が書き込みを検出してリターンした後にシグナルハンドラが実行されると想定していたが、 |
|
`select.poll` 中にシグナルハンドラが実行されてしまうことがある。元のコードでは、この状況を |
|
子プロセスが結果を書き込むことなく異常終了したと見なして、ハンドラが例外を raise していたため、 |
|
`poll` が再開して結果を取得することができず、タスクが失敗したことになってしまう。 |
|
|
|
""" |
|
from datetime import datetime |
|
import socket |
|
import multiprocessing as mp |
|
import sys |
|
import time |
|
import select |
|
import signal |
|
import traceback |
|
|
|
|
|
# クライアントから受信するタスク内容を表現するオブジェクトの最大サイズ |
|
_MAX_SIZE = 4096 |
|
|
|
# 親プロセスが子プロセスの終了を待ってブロックしているかどうかを示すフラグ。 |
|
_BLOCKING = False |
|
|
|
|
|
def sub_worker(conn, sleep_sec, echo_mes): |
|
"""子プロセスとして実行される Worker 関数。 |
|
|
|
`sleep_sec` 秒だけスリープ後、`echo_mes` を `conn` へ書き込む。 |
|
|
|
Notes |
|
------- |
|
意味のある仕事をするなら、`sleep_sec`, `echo_mes` ではなく、 |
|
その仕事に必要な引数を受け取り、実行し、結果を整数コードや文字列、 |
|
あるいは辞書等にして、`conn` を使って親プロセスへ送り、exitcode = 0 で |
|
正常終了する。シグナルによる異常終了等の場合、親プロセスは exitcode を見て |
|
対処する。 |
|
|
|
Parameters |
|
------------ |
|
conn : multiprocessing.connection.Connection |
|
親プロセスへ結果を返すための送信用の Connection。 |
|
|
|
sleep_sec : int |
|
スリープ時間(秒)。 |
|
|
|
echo_mes : str |
|
`conn` を通して親プロセスへ送信する文字列。 |
|
""" |
|
time.sleep(sleep_sec) |
|
conn.send(echo_mes) |
|
sys.exit(0) |
|
|
|
|
|
def main_worker(cli_sock): |
|
"""タスクを遂行する子プロセスと入出力の管理をする。 |
|
|
|
`cli_sock` からタスク内容を表すオブジェクトを読み出し、 |
|
子プロセスを起動してタスクの処理をまかせる。その後は、クライアント接続の切断、 |
|
タスクの終了、子プロセスの異常終了を監視する。タスクが成功、失敗に関わらず、 |
|
子プロセスが正常終了した場合は結果をクライアントへ送信する。 |
|
子プロセスが異常終了した場合、タスクが完了できなかったことをクライアントに通知する。 |
|
クライアント接続が切断されたときは、子プロセスを終了する。 |
|
|
|
Notes |
|
------ |
|
タスク処理をまかせる子プロセスから結果を受け取るために `multiprocessing.Pipe` を利用する。 |
|
この `Pipe` のコネクションと `cli_sock` を `select.poll()` で監視する。子プロセスの終了は、 |
|
`Pipe` のコネクションを監視しても検知できないので、`poll()` ではなく、`SIGCHLD` シグナルの |
|
ハンドラで検出する。 |
|
|
|
Parameters |
|
------------ |
|
cli_sock : socket.socket |
|
クライアントに接続済みのソケットオブジェクト。 |
|
""" |
|
global _BLOCKING |
|
|
|
# 実際に意味のある仕事をする場合、あらかじめ定義しておいたタスク内容を表現するオブジェクトを |
|
# 読み出す。 |
|
# ここでは、"10,Hello, world!" のような文字列を読み出し、"10" と "Hello, world!" に分ける。 |
|
# |
|
try: |
|
data = cli_sock.recv(_MAX_SIZE).decode('utf-8') |
|
sleep_sec, echo_mes = data.split(',', maxsplit=1) |
|
except: |
|
return |
|
|
|
conn1, conn2 = mp.Pipe(duplex=False) |
|
poll = select.poll() |
|
poll.register(cli_sock, select.POLLIN) |
|
poll.register(conn1, select.POLLIN) |
|
|
|
signal.signal(signal.SIGCHLD, sinchld_handler) |
|
|
|
proc = mp.Process(target=sub_worker, args=(conn2, int(sleep_sec), echo_mes), daemon=True) |
|
|
|
try: |
|
rdy = () |
|
# start と poll がアトミックに実行できるのが理想だが、それは無理のようだ。 |
|
# とはいえ、start と poll の間に子プロセスが終了することはないと想定するのは合理的だろう。 |
|
# poll でブロック中の間のみグローバルなフラグ _BLOCKING が True になるようにし、 |
|
# その間にシグナルハンドラが実行された場合は、子プロセスが結果を戻すことなく異常終了したと |
|
# 見なしてハンドラが InterruptedError を raise し、それによって poll を終了させるようにする。 |
|
# これにより、子プロセスが異常終了した場合でも poll が無限に待つことがないようにする。 |
|
# |
|
# 冒頭の Note に書いたように、子プロセスが結果を書き込んだ後で終了した場合でも |
|
# poll がシグナルハンドラで割り込まれてしまうことがあるので、この処理は正しくない。 |
|
# この場合でもシグナルハンドラが例外を raise しなければ poll が再開し、子プロセスの書き込みを |
|
# 検出できるのでうまくいくが(Python 3.5 以降)、子プロセスが本当に結果を書き込まずに異常終了すると |
|
# poll から戻ってこれなくなるのでやはりうまくいかない。 |
|
# |
|
proc.start() |
|
_BLOCKING = True |
|
rdy = poll.poll() |
|
_BLOCKING = False |
|
|
|
except InterruptedError: |
|
# 子プロセスが終了したことがブロック中に検知された。 |
|
print(traceback.format_exc()) |
|
|
|
if proc.exitcode < 0: |
|
try: |
|
cli_sock.sendall( |
|
'The Worker process was killed by Signal {}.'.format(-proc.exitcode).encode('utf-8')) |
|
except: |
|
pass |
|
elif proc.exitcode > 0: |
|
try: |
|
cli_sock.sendall( |
|
'The Worker process was terminated by unknown error'.encode('utf-8')) |
|
except: |
|
pass |
|
finally: |
|
_BLOCKING = False |
|
|
|
# 想定しているイベントは、クライアントからの切断と子プロセスからの送信だけである。 |
|
# どちらのイベントでも適切な処理をして直ちに抜ける。 |
|
# それ以外のイベント(たとえば、クライアントからの余計な入力)があった場合でも抜ける。 |
|
# |
|
for fd, ev in rdy: |
|
if (fd == cli_sock.fileno()) and (ev & select.POLLHUP): |
|
proc.terminate() |
|
break |
|
if (fd == conn1.fileno()) and (ev & select.POLLIN): |
|
try: |
|
cli_sock.sendall(conn1.recv().encode('utf-8')) |
|
except: |
|
pass |
|
break |
|
proc.terminate() |
|
break |
|
|
|
proc.join() |
|
proc.exitcode # これを入れないと、ゾンビプロセスになる。 |
|
conn1.close() |
|
conn2.close() |
|
|
|
|
|
def sinchld_handler(signo, _): |
|
print('Catch signal: ', str(signo)) |
|
if _BLOCKING: |
|
raise InterruptedError('Interrupted by signal {}.'.format(signo)) |
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
# クライアントからの接続要求を待ち受けるアドレスとポート番号を、この順で引数に与える |
|
args = sys.argv |
|
address = (args[1], int(args[2])) |
|
|
|
print('Starting the server at', datetime.now()) |
|
print('Waiting for a client to connect.') |
|
|
|
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|
server.bind(address) |
|
server.listen(3) |
|
|
|
try: |
|
while True: |
|
client, addr = server.accept() |
|
print('At', datetime.now(), 'accepted connection from', addr) |
|
|
|
main_worker(client) |
|
client.close() |
|
finally: |
|
server.close() |