Skip to content

Instantly share code, notes, and snippets.

@epwalsh
Last active August 26, 2021 16:08
Show Gist options
  • Save epwalsh/e43b8af900d01534cbf921ab52bc0d67 to your computer and use it in GitHub Desktop.
Save epwalsh/e43b8af900d01534cbf921ab52bc0d67 to your computer and use it in GitHub Desktop.
MultiProcessDataLoader zombie workers
import signal
import logging
import time
from transformers import AutoTokenizer
from allennlp.data.instance import Instance
from allennlp.data.dataset_readers import DatasetReader
from allennlp.data.data_loaders import MultiProcessDataLoader
from allennlp.data.fields import TransformerTextField
from allennlp.data.vocabulary import Vocabulary
# Root-logger config at INFO so AllenNLP's loader/worker log lines are visible.
logging.basicConfig(level=logging.INFO)
class SigTermInterrupt(Exception):
    """Raised from the SIGTERM handler so the main thread can unwind normally."""
def handle_sigterm(signum, stack_frame):
    """Signal handler: turn an incoming SIGTERM into a catchable exception.

    Installed via ``signal.signal`` so that termination triggers normal
    exception unwinding (giving the data loader a chance to clean up its
    worker processes) instead of an abrupt process exit.
    """
    raise SigTermInterrupt
class MockDatasetReader(DatasetReader):
    """Synthetic ``DatasetReader`` that fabricates source/target text pairs.

    ``_read`` ignores its ``file_path`` and simply generates ``NUM_INSTANCES``
    instances, tokenized with a tiny dummy transformer model — enough to
    exercise ``MultiProcessDataLoader`` worker behavior without real data.
    """

    NUM_INSTANCES = 1000
    MODEL_NAME = "epwalsh/bert-xsmall-dummy"

    def __init__(self, **kwargs) -> None:
        # Sharding across distributed ranks / loader workers is handled
        # explicitly in _read via shard_iterable, so both flags are set.
        super().__init__(
            manual_distributed_sharding=True,
            manual_multiprocess_sharding=True,
            **kwargs,
        )
        self.tokenizer = AutoTokenizer.from_pretrained(self.MODEL_NAME)

    def _read(self, file_path: str):
        # file_path is irrelevant; every instance is generated on the fly.
        for idx in self.shard_iterable(range(self.NUM_INSTANCES)):
            yield self.text_to_instance(
                idx,
                f"Hi there, I'm the {idx}th instance",
                f"Hello, {idx}th instance!",
            )

    def text_to_instance(self, index: int, source: str, target: str) -> Instance:  # type: ignore
        """Tokenize the pair and wrap each side in a TransformerTextField."""
        fields = {
            "source": TransformerTextField(**self.tokenizer(source)),
            "target": TransformerTextField(**self.tokenizer(target)),
        }
        return Instance(fields)
def main():
    """Run a MultiProcessDataLoader to observe worker-process cleanup.

    Installs a SIGTERM handler first: without it, terminating this process
    can leave the loader's worker process behind as a zombie.
    """
    signal.signal(signal.SIGTERM, handle_sigterm)  # try commenting this out to see the issue

    reader = MockDatasetReader()
    loader = MultiProcessDataLoader(
        reader=reader,
        data_path="this doens't matter",
        num_workers=1,
        batch_size=2,
        max_instances_in_memory=8,
        start_method="fork",  # also try with "spawn"
    )
    loader.index_with(Vocabulary.from_pretrained_transformer(reader.MODEL_NAME))

    # Consume batches slowly so there is a window in which to kill the process.
    for batch_num, _batch in enumerate(loader, start=1):
        time.sleep(0.1)
        print(f"Processing batch {batch_num}")


if __name__ == "__main__":
    main()
"""
Run this file from the command line while watching the Python processes from another terminal:
```bash
watch -n0.5 "pgrep Python | xargs ps | grep -vE 'nvim|neovim'"
```
The 'nvim|neovim' is to filter out unrelated Python processes that I don't care about (in my case, nvim plugins).
"""
import subprocess
import os
import time
# Start load.py in its own process group (setpgrp) so signals aimed at the
# child don't also hit this monitoring script.
child = subprocess.Popen(["python", "load.py"], preexec_fn=os.setpgrp)

# Give the subprocess time to start its loader worker before interrupting.
time.sleep(10)

print("terminating the process")
child.terminate()

# Grace period for cleanup, then force-kill and reap the child.
time.sleep(2)
child.kill()
child.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment