Skip to content

Instantly share code, notes, and snippets.

@fortune
Last active July 26, 2021 03:56
Show Gist options
  • Save fortune/2d6430cf7f1536401aeedfa8d920598d to your computer and use it in GitHub Desktop.
Save fortune/2d6430cf7f1536401aeedfa8d920598d to your computer and use it in GitHub Desktop.
A sample code of Python's select module

Python の select モジュールを使ったコードサンプル

クライアントからの時間のかかるタスク処理要求を受け付けるサーバを、select モジュールによる入出力の多重化を利用して実装してみた。子プロセスの終了監視のために signal モジュールも利用している。

最初の実装にはバグがあったので、それは failed_task_server.py というファイル名で残した。select モジュールの poll で書き込みを監視している子プロセスが終了した場合、パイプに書き込んで正常終了したときでも、たまに poll が書き込みを検出して戻る前にシグナルハンドラで割り込まれてしまうことがあるのが問題だった。修正したものをあらためて task_server.py とした。

シグナルハンドラは、コード中のどこでも呼び出される可能性があるので、例外を raise する場合には注意しなければならないことの確認のためにテスト用のスクリプト sig_handler_test.py を作った。

"""タスク処理サーバの雛形となるコード。
クライアントから時間のかかるタスク処理のリクエストを受け付け、結果を返してから接続を終了する
サーバの実装について考える。要件として、
- クライアントが接続を切断したら、キャンセルと見なして、実行中のタスク処理は終了する。
- 実行中のタスクが異常終了したら、クライアントにそれを伝える。
を満たすものとする。2番めの要件は、たとえば C 言語で実装したライブラリを実行中に
Segmentation Fault で異常終了した場合でも、それをクライアントへ伝えられるようにするということだ。
これらの要件を満たすサーバの雛形となるようなコードをこのモジュールで実装した。
タスク処理を子プロセスにまかせ、クライアントとの接続と子プロセスからの結果を受け取る接続との両方を
`select` モジュールを利用して監視している。子プロセスの終了も監視しなくてはならないので、これはシグナルハンドラで
`SIGCHLD` を処理することで実現した。
このモジュールは、1度に1つのクライアントのタスクしか処理できない。あるクライアントからのタスクを
実行中に別のクライアントからの接続があっても、タスク開始はまたされるか、開始待ちのクライアントが多い場合は
接続そのものが拒否される。
Example
----------
接続を受け付けるアドレスとポート番号を指定して、コマンドラインから次のようにサーバを実行する。::
$ python task_server.py localhost 8000
クライアントはサーバへ接続後、``14,Hello,world!`` のようにスリープ秒数とエコーされるメッセージを
カンマ区切りにして送信し、サーバからの応答を待つ。スリープすることで時間のかかるタスクをシミューレートしている。
タスク処理用の子プロセスを kill したり、処理中にクライアントから接続を切断したりして、要件どおりに
動作するかを確認する。
クライアント側を Jupyter Notebook 上で実行する場合、サーバからの応答待ちでブロックしているときに
Kernel を Interrupt しただけでは、接続が切断されないので、`KeyboardInterrupt` 例外をキャッチして
コネクションを切断するようにしなければならない。Kernel の Restart や Shutdown だとプロセスが終了するので
コネクションも切断される。
Notes
--------
実装にあたって、考慮しないといけない点や落とし穴がいくつかあった。コード中のコメントも
よく読むこと。
この雛形には問題点があることが後日発覚した。子プロセスが結果をパイプに書き込んで終了した場合、
`select.poll` が書き込みを検出してリターンした後にシグナルハンドラが実行されると想定していたが、
`select.poll` 中にシグナルハンドラが実行されてしまうことがある。元のコードでは、この状況を
子プロセスが結果を書き込むことなく異常終了したと見なして、ハンドラが例外を raise していたため、
`poll` が再開して結果を取得することができず、タスクが失敗したことになってしまう。
"""
from datetime import datetime
import socket
import multiprocessing as mp
import sys
import time
import select
import signal
import traceback
# クライアントから受信するタスク内容を表現するオブジェクトの最大サイズ
_MAX_SIZE = 4096
# 親プロセスが子プロセスの終了を待ってブロックしているかどうかを示すフラグ。
_BLOCKING = False
def sub_worker(conn, sleep_sec, echo_mes):
"""子プロセスとして実行される Worker 関数。
`sleep_sec` 秒だけスリープ後、`echo_mes` を `conn` へ書き込む。
Notes
-------
意味のある仕事をするなら、`sleep_sec`, `echo_mes` ではなく、
その仕事に必要な引数を受け取り、実行し、結果を整数コードや文字列、
あるいは辞書等にして、`conn` を使って親プロセスへ送り、exitcode = 0 で
正常終了する。シグナルによる異常終了等の場合、親プロセスは exitcode を見て
対処する。
Parameters
------------
conn : multiprocessing.connection.Connection
親プロセスへ結果を返すための送信用の Connection。
sleep_sec : int
スリープ時間(秒)。
echo_mes : str
`conn` を通して親プロセスへ送信する文字列。
"""
time.sleep(sleep_sec)
conn.send(echo_mes)
sys.exit(0)
def main_worker(cli_sock):
"""タスクを遂行する子プロセスと入出力の管理をする。
`cli_sock` からタスク内容を表すオブジェクトを読み出し、
子プロセスを起動してタスクの処理をまかせる。その後は、クライアント接続の切断、
タスクの終了、子プロセスの異常終了を監視する。タスクが成功、失敗に関わらず、
子プロセスが正常終了した場合は結果をクライアントへ送信する。
子プロセスが異常終了した場合、タスクが完了できなかったことをクライアントに通知する。
クライアント接続が切断されたときは、子プロセスを終了する。
Notes
------
タスク処理をまかせる子プロセスから結果を受け取るために `multiprocessing.Pipe` を利用する。
この `Pipe` のコネクションと `cli_sock` を `select.poll()` で監視する。子プロセスの終了は、
`Pipe` のコネクションを監視しても検知できないので、`poll()` ではなく、`SIGCHLD` シグナルの
ハンドラで検出する。
Parameters
------------
cli_sock : socket.socket
クライアントに接続済みのソケットオブジェクト。
"""
global _BLOCKING
# 実際に意味のある仕事をする場合、あらかじめ定義しておいたタスク内容を表現するオブジェクトを
# 読み出す。
# ここでは、"10,Hello, world!" のような文字列を読み出し、"10" と "Hello, world!" に分ける。
#
try:
data = cli_sock.recv(_MAX_SIZE).decode('utf-8')
sleep_sec, echo_mes = data.split(',', maxsplit=1)
except:
return
conn1, conn2 = mp.Pipe(duplex=False)
poll = select.poll()
poll.register(cli_sock, select.POLLIN)
poll.register(conn1, select.POLLIN)
signal.signal(signal.SIGCHLD, sinchld_handler)
proc = mp.Process(target=sub_worker, args=(conn2, int(sleep_sec), echo_mes), daemon=True)
try:
rdy = ()
# start と poll がアトミックに実行できるのが理想だが、それは無理のようだ。
# とはいえ、start と poll の間に子プロセスが終了することはないと想定するのは合理的だろう。
# poll でブロック中の間のみグローバルなフラグ _BLOCKING が True になるようにし、
# その間にシグナルハンドラが実行された場合は、子プロセスが結果を戻すことなく異常終了したと
# 見なしてハンドラが InterruptedError を raise し、それによって poll を終了させるようにする。
# これにより、子プロセスが異常終了した場合でも poll が無限に待つことがないようにする。
#
# 冒頭の Note に書いたように、子プロセスが結果を書き込んだ後で終了した場合でも
# poll がシグナルハンドラで割り込まれてしまうことがあるので、この処理は正しくない。
# この場合でもシグナルハンドラが例外を raise しなければ poll が再開し、子プロセスの書き込みを
# 検出できるのでうまくいくが(Python 3.5 以降)、子プロセスが本当に結果を書き込まずに異常終了すると
# poll から戻ってこれなくなるのでやはりうまくいかない。
#
proc.start()
_BLOCKING = True
rdy = poll.poll()
_BLOCKING = False
except InterruptedError:
# 子プロセスが終了したことがブロック中に検知された。
print(traceback.format_exc())
if proc.exitcode < 0:
try:
cli_sock.sendall(
'The Worker process was killed by Signal {}.'.format(-proc.exitcode).encode('utf-8'))
except:
pass
elif proc.exitcode > 0:
try:
cli_sock.sendall(
'The Worker process was terminated by unknown error'.encode('utf-8'))
except:
pass
finally:
_BLOCKING = False
# 想定しているイベントは、クライアントからの切断と子プロセスからの送信だけである。
# どちらのイベントでも適切な処理をして直ちに抜ける。
# それ以外のイベント(たとえば、クライアントからの余計な入力)があった場合でも抜ける。
#
for fd, ev in rdy:
if (fd == cli_sock.fileno()) and (ev & select.POLLHUP):
proc.terminate()
break
if (fd == conn1.fileno()) and (ev & select.POLLIN):
try:
cli_sock.sendall(conn1.recv().encode('utf-8'))
except:
pass
break
proc.terminate()
break
proc.join()
proc.exitcode # これを入れないと、ゾンビプロセスになる。
conn1.close()
conn2.close()
def sinchld_handler(signo, _):
print('Catch signal: ', str(signo))
if _BLOCKING:
raise InterruptedError('Interrupted by signal {}.'.format(signo))
if __name__ == '__main__':
# クライアントからの接続要求を待ち受けるアドレスとポート番号を、この順で引数に与える
args = sys.argv
address = (args[1], int(args[2]))
print('Starting the server at', datetime.now())
print('Waiting for a client to connect.')
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(address)
server.listen(3)
try:
while True:
client, addr = server.accept()
print('At', datetime.now(), 'accepted connection from', addr)
main_worker(client)
client.close()
finally:
server.close()
"""シグナルハンドラから例外を raise する実験
基本的にシグナルハンドラがどこで実行されるかはわからない。
なので、シグナルハンドラが例外を raise する場合は注意する必要がある。
シグナルハンドラから普通にリターンすると、ハンドラによっていわば割り込まれた箇所から
再開される。システムコール中にハンドラに割り込まれた場合も、そのシステムコール中で
再開される(Python 3.5 から。それ以前は、InterruptedError がシステムコールから raise された)。
ハンドラが例外を raise すると、割り込まれた箇所からその例外が raise されることになる。
システムコール実行中だった場合、そのシステムコールから raise される。
Example
-----------
子プロセスに Sleep させる秒数を指定して次のように実行する。
$ python sig_handler_test.py 5
Python プロセスは、子プロセスの終了を `SIGCHLD` に対するシグナルハンドラで監視するように
設定し、指定された秒数だけスリープするだけの子プロセスを開始し、無限ループに入る。
シグナルハンドラは、`InterruptedError` を raise する。シグナルハンドラで割り込まれたのが
try ブロックの中かそうでないかで結果が変わってくる。
"""
import multiprocessing as mp
import sys
import time
import signal
import traceback
def sub_worker(sleep_sec):
"""子プロセスが実行するタスクとなる関数
タスクといっても `sleep_sec` 秒だけスリープするだけ。
"""
time.sleep(sleep_sec)
sys.exit(0)
def main_worker(sleep_sec):
"""タスクを実行する子プロセスを実行する。
"""
signal.signal(signal.SIGCHLD, sinchld_handler)
proc = mp.Process(target=sub_worker, args=(sleep_sec,), daemon=True)
proc.start()
i = 0
while True:
# 無限ループ中に子プロセスが終了し、SIGCHLD によるシグナルハンドラで
# 割り込まれる。割り込まれたのが try 中ならばプロセスは終了しないが、
# try 中でなければ、ハンドラの raise した例外でこのプロセスが終了する。
try:
i += 1
except InterruptedError:
print('i = {}'.format(i))
print(traceback.format_exc())
def sinchld_handler(signo, _):
print('Catch signal: ', str(signo))
raise InterruptedError('Interrupted by signal {}.'.format(signo))
if __name__ == '__main__':
main_worker(int(sys.argv[1]))
"""タスク処理サーバの雛形となるコード。
クライアントから時間のかかるタスク処理のリクエストを受け付け、結果を返してから接続を終了する
サーバの実装について考える。要件として、
- クライアントが接続を切断したら、キャンセルと見なして、実行中のタスク処理は終了する。
- 実行中のタスクが異常終了したら、クライアントにそれを伝える。
を満たすものとする。2番めの要件は、たとえば C 言語で実装したライブラリを実行中に
Segmentation Fault で異常終了した場合でも、それをクライアントへ伝えられるようにするということだ。
これらの要件を満たすサーバの雛形となるようなコードをこのモジュールで実装した。
タスク処理を子プロセスにまかせ、クライアントとの接続と子プロセスからの結果を受け取る接続との両方を
`select` モジュールを利用して監視している。子プロセスの終了も監視しなくてはならないので、これはシグナルハンドラで
`SIGCHLD` を処理することで実現した。
このモジュールは、1度に1つのクライアントのタスクしか処理できない。あるクライアントからのタスクを
実行中に別のクライアントからの接続があっても、タスク開始はまたされるか、開始待ちのクライアントが多い場合は
接続そのものが拒否される。
Example
----------
接続を受け付けるアドレスとポート番号を指定して、コマンドラインから次のようにサーバを実行する。::
$ python task_server.py localhost 8000
クライアントはサーバへ接続後、``14,Hello,world!`` のようにスリープ秒数とエコーされるメッセージを
カンマ区切りにして送信し、サーバからの応答を待つ。スリープすることで時間のかかるタスクをシミューレートしている。
タスク処理用の子プロセスを kill したり、処理中にクライアントから接続を切断したりして、要件どおりに
動作するかを確認する。
クライアント側を Jupyter Notebook 上で実行する場合、サーバからの応答待ちでブロックしているときに
Kernel を Interrupt しただけでは、接続が切断されないので、`KeyboardInterrupt` 例外をキャッチして
コネクションを切断するようにしなければならない。Kernel の Restart や Shutdown だとプロセスが終了するので
コネクションも切断される。
Notes
--------
実装にあたって、考慮しないといけない点や落とし穴がいくつかあった。コード中のコメントも
よく読むこと。
"""
from datetime import datetime
import socket
import multiprocessing as mp
import sys
import time
import select
import signal
import traceback
# クライアントから受信するタスク内容を表現するオブジェクトの最大サイズ
_MAX_SIZE = 4096
_CUR_WORKER = None
def sub_worker(conn, sleep_sec, echo_mes):
"""子プロセスとして実行される Worker 関数。
`sleep_sec` 秒だけスリープ後、`echo_mes` を `conn` へ書き込む。
Notes
-------
意味のある仕事をするなら、`sleep_sec`, `echo_mes` ではなく、
その仕事に必要な引数を受け取り、実行し、結果を整数コードや文字列、
あるいは辞書等にして、`conn` を使って親プロセスへ送り、exitcode = 0 で
正常終了する。シグナルによる異常終了等の場合、親プロセスは exitcode を見て
対処する。
Parameters
------------
conn : multiprocessing.connection.Connection
親プロセスへ結果を返すための送信用の Connection。
sleep_sec : int
スリープ時間(秒)。
echo_mes : str
`conn` を通して親プロセスへ送信する文字列。
"""
time.sleep(sleep_sec)
conn.send(echo_mes)
sys.exit(0)
def main_worker(cli_sock):
"""タスクを遂行する子プロセスと入出力の管理をする。
`cli_sock` からタスク内容を表すオブジェクトを読み出し、
子プロセスを起動してタスクの処理をまかせる。その後は、クライアント接続の切断、
タスクの終了、子プロセスの異常終了を監視する。タスクが成功、失敗に関わらず、
子プロセスが正常終了した場合は結果をクライアントへ送信する。
子プロセスが異常終了した場合、タスクが完了できなかったことをクライアントに通知する。
クライアント接続が切断されたときは、子プロセスを終了する。
Notes
------
タスク処理をまかせる子プロセスから結果を受け取るために `multiprocessing.Pipe` を利用する。
この `Pipe` のコネクションと `cli_sock` を `select.poll()` で監視する。子プロセスの終了は、
`Pipe` のコネクションを監視しても検知できないので、`poll()` ではなく、`SIGCHLD` シグナルの
ハンドラで検出する。
Parameters
------------
cli_sock : socket.socket
クライアントに接続済みのソケットオブジェクト。
"""
global _CUR_WORKER
# 実際に意味のある仕事をする場合、あらかじめ定義しておいたタスク内容を表現するオブジェクトを
# 読み出す。
# ここでは、"10,Hello, world!" のような文字列を読み出し、"10" と "Hello, world!" に分ける。
#
try:
data = cli_sock.recv(_MAX_SIZE).decode('utf-8')
sleep_sec, echo_mes = data.split(',', maxsplit=1)
except:
return
conn1, conn2 = mp.Pipe(duplex=False)
poll = select.poll()
poll.register(cli_sock, select.POLLIN)
poll.register(conn1, select.POLLIN)
signal.signal(signal.SIGCHLD, sinchld_handler)
proc = mp.Process(target=sub_worker, args=(conn2, int(sleep_sec), echo_mes), daemon=True)
try:
rdy = ()
# シグナルハンドラは、poll でブロック中に子プロセスが結果を書き込まずに異常終了(exitcode != 0)した
# 場合には InterruptedError を raise して poll に割り込まねばならない。そのためにグローバルな
# _CUR_WORKER でクリティカルな領域を囲むようにする。
#
# シグナルハンドラは、監視中のプロセスが存在して(_CUR_WORKER is not None)、それが異常終了(exitcode != 0)した
# 場合のみ InterruptedError を raise する。これにより、確実に対象となる子プロセスがクリティカル領域で異常終了した場合には
# except ブロックに入ることができる。
#
# 仮に poll 実行中に子プロセスが結果を書き戻して正常終了したとき、poll が書き込みを検出する前に
# シグナルハンドラが実行されたとしても、ハンドラは例外を raise しないので、poll は再開されて
# 子プロセスの書き込みを検出して普通にリターンできる(Python 3.5 以降)なので問題はない。
#
_CUR_WORKER = proc
proc.start()
rdy = poll.poll()
_CUR_WORKER = None
except InterruptedError:
_CUR_WORKER = None
# 子プロセスが終了したことがブロック中に検知された。
print(traceback.format_exc())
if proc.exitcode < 0:
try:
cli_sock.sendall(
'The Worker process was killed by Signal {}.'.format(-proc.exitcode).encode('utf-8'))
except:
pass
elif proc.exitcode > 0:
try:
cli_sock.sendall(
'The Worker process was terminated by unknown error'.encode('utf-8'))
except:
pass
finally:
_CUR_WORKER = None
# 想定しているイベントは、クライアントからの切断と子プロセスからの送信だけである。
# どちらのイベントでも適切な処理をして直ちに抜ける。
# それ以外のイベント(たとえば、クライアントからの余計な入力)があった場合でも抜ける。
#
for fd, ev in rdy:
if (fd == cli_sock.fileno()) and (ev & select.POLLHUP):
proc.terminate()
break
if (fd == conn1.fileno()) and (ev & select.POLLIN):
try:
cli_sock.sendall(conn1.recv().encode('utf-8'))
except:
pass
break
proc.terminate()
break
proc.join()
proc.exitcode # これを入れないと、ゾンビプロセスになる。
conn1.close()
conn2.close()
def sinchld_handler(signo, _):
print('Catch signal: ', str(signo))
if _CUR_WORKER is None:
return
exitcode = _CUR_WORKER.exitcode
if exitcode is not None and exitcode != 0:
raise InterruptedError('Interrupted by signal {}.'.format(signo))
if __name__ == '__main__':
# クライアントからの接続要求を待ち受けるアドレスとポート番号を、この順で引数に与える
args = sys.argv
address = (args[1], int(args[2]))
print('Starting the server at', datetime.now())
print('Waiting for a client to connect.')
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(address)
server.listen(3)
try:
while True:
client, addr = server.accept()
print('At', datetime.now(), 'accepted connection from', addr)
main_worker(client)
client.close()
finally:
server.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment