Skip to content

Instantly share code, notes, and snippets.

@andreycizov
Last active May 27, 2019 08:40
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andreycizov/ee59806a3ac6955c127e511c5e84d2b6 to your computer and use it in GitHub Desktop.
Save andreycizov/ee59806a3ac6955c127e511c5e84d2b6 to your computer and use it in GitHub Desktop.
Enabling Python 3.5 Coverage support for multiprocessing Pool
import logging
import os
from multiprocessing.pool import Pool as OrigPool
from coverage import Coverage
from multiprocessing.context import Process, DefaultContext, _concrete_contexts
logger = logging.getLogger(__name__)
class CoverageProcess(Process):
def run(self):
should_cov = os.environ.get('COVERAGE_PROCESS_START', False)
if should_cov:
logger.debug('Starting coverage')
cov = Coverage(data_suffix=True, config_file='.coveragerc')
cov._warn_no_data = True
cov._warn_unimported_source = True
cov.start()
try:
super().run()
finally:
logger.debug('Stopping coverage')
cov.stop()
cov.save()
else:
super().run()
class CoverageContext(DefaultContext):
Process = CoverageProcess
def Pool(*args, **kwargs):
ctx = CoverageContext(_concrete_contexts['fork'])
return OrigPool(*args, **kwargs, context=ctx)
@daviscabral
Copy link

@LucaCappelletti94
Copy link

LucaCappelletti94 commented May 27, 2019

I have managed to track coverage within a Pool simply by adding calls to close and join, as follows:

with Pool(min(cpu_count(), len(tasks))) as p:
       list(p.imap(worker, tasks))
       p.close()
       p.join()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment