# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Preserved eventlet-aware ThreadPool from older Swift code."""
import os
import sys
import eventlet
from eventlet import sleep, Timeout, tpool, greenthread, greenio, event
import eventlet.queue
stdlib_queue = eventlet.patcher.original('Queue')
stdlib_threading = eventlet.patcher.original('threading')
class ThreadPoolDead(Exception):
    pass


class ThreadPool(object):
    """
    Perform blocking operations in background threads.

    Call its methods from within greenlets to green-wait for results without
    blocking the eventlet reactor (hopefully).
    """

    BYTE = 'a'.encode('utf-8')

    def __init__(self, nthreads=2):
        self.nthreads = nthreads
        self._run_queue = stdlib_queue.Queue()
        self._result_queue = stdlib_queue.Queue()
        self._threads = []
        self._alive = True

        if nthreads <= 0:
            return

        # We spawn a greenthread whose job it is to pull results from the
        # worker threads via a real Queue and send them to eventlet Events so
        # that the calling greenthreads can be awoken.
        #
        # Since each OS thread has its own collection of greenthreads, it
        # doesn't work to have the worker thread send stuff to the event, as
        # it then notifies its own thread-local eventlet hub to wake up, which
        # doesn't do anything to help out the actual calling greenthread over
        # in the main thread.
        #
        # Thus, each worker sticks its results into a result queue and then
        # writes a byte to a pipe, signaling the result-consuming greenlet (in
        # the main thread) to wake up and consume results.
        #
        # This is all stuff that eventlet.tpool does, but that code can't have
        # multiple instances instantiated. Since the object server uses one
        # pool per disk, we have to reimplement this stuff.
        _raw_rpipe, self.wpipe = os.pipe()
        self.rpipe = greenio.GreenPipe(_raw_rpipe, 'rb')

        for _junk in range(nthreads):
            thr = stdlib_threading.Thread(
                target=self._worker,
                args=(self._run_queue, self._result_queue))
            thr.daemon = True
            thr.start()
            self._threads.append(thr)

        # This is the result-consuming greenthread that runs in the main OS
        # thread, as described above.
        self._consumer_coro = greenthread.spawn_n(self._consume_results,
                                                  self._result_queue)

    def _worker(self, work_queue, result_queue):
        """
        Pulls an item from the queue and runs it, then puts the result into
        the result queue. Repeats forever.

        :param work_queue: queue from which to pull work
        :param result_queue: queue into which to place results
        """
        while True:
            item = work_queue.get()
            if item is None:
                break
            ev, func, args, kwargs = item
            try:
                result = func(*args, **kwargs)
                result_queue.put((ev, True, result))
            except BaseException:
                result_queue.put((ev, False, sys.exc_info()))
            finally:
                work_queue.task_done()
                os.write(self.wpipe, self.BYTE)

    def _consume_results(self, queue):
        """
        Runs as a greenthread in the same OS thread as callers of
        run_in_thread().

        Takes results from the worker OS threads and sends them to the waiting
        greenthreads.
        """
        while True:
            try:
                self.rpipe.read(1)
            except ValueError:
                # can happen at process shutdown when pipe is closed
                break

            while True:
                try:
                    ev, success, result = queue.get(block=False)
                except stdlib_queue.Empty:
                    break

                try:
                    if success:
                        ev.send(result)
                    else:
                        ev.send_exception(*result)
                finally:
                    queue.task_done()

    def run_in_thread(self, func, *args, **kwargs):
        """
        Runs ``func(*args, **kwargs)`` in a thread. Blocks the current
        greenlet until results are available.

        Exceptions thrown will be reraised in the calling thread.

        If the threadpool was initialized with nthreads=0, it invokes
        ``func(*args, **kwargs)`` directly, followed by eventlet.sleep() to
        ensure the eventlet hub has a chance to execute. It is more likely the
        hub will be invoked when queuing operations to an external thread.

        :returns: result of calling func
        :raises: whatever func raises
        """
        if not self._alive:
            raise ThreadPoolDead()

        if self.nthreads <= 0:
            result = func(*args, **kwargs)
            sleep()
            return result

        ev = event.Event()
        self._run_queue.put((ev, func, args, kwargs), block=False)

        # blocks this greenlet (and only *this* greenlet) until the real
        # thread calls ev.send().
        result = ev.wait()
        return result

    def _run_in_eventlet_tpool(self, func, *args, **kwargs):
        """
        Really run something in an external thread, even if we haven't got any
        threads of our own.
        """
        def inner():
            try:
                return (True, func(*args, **kwargs))
            except (Timeout, BaseException) as err:
                return (False, err)

        success, result = tpool.execute(inner)
        if success:
            return result
        else:
            raise result

    def force_run_in_thread(self, func, *args, **kwargs):
        """
        Runs ``func(*args, **kwargs)`` in a thread. Blocks the current
        greenlet until results are available.

        Exceptions thrown will be reraised in the calling thread.

        If the threadpool was initialized with nthreads=0, uses eventlet.tpool
        to run the function. This is in contrast to run_in_thread(), which
        will (in that case) simply execute func in the calling thread.

        :returns: result of calling func
        :raises: whatever func raises
        """
        if not self._alive:
            raise ThreadPoolDead()

        if self.nthreads <= 0:
            return self._run_in_eventlet_tpool(func, *args, **kwargs)
        else:
            return self.run_in_thread(func, *args, **kwargs)

    def terminate(self):
        """
        Releases the threadpool's resources (OS threads, greenthreads, pipes,
        etc.) and renders it unusable.

        Don't call run_in_thread() or force_run_in_thread() after calling
        terminate().
        """
        self._alive = False
        if self.nthreads <= 0:
            return

        for _junk in range(self.nthreads):
            self._run_queue.put(None)
        for thr in self._threads:
            thr.join()
        self._threads = []
        self.nthreads = 0

        greenthread.kill(self._consumer_coro)

        self.rpipe.close()
        os.close(self.wpipe)
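

# Hedged usage sketch, not part of the original Swift module: it shows how a
# caller running under eventlet might hand a blocking call to the pool and
# still see exceptions re-raised locally. The helper name and the '/tmp' path
# are illustrative assumptions, not anything the original code defines.
def _example_run_in_thread():
    pool = ThreadPool(nthreads=2)
    try:
        # Blocks only the calling greenlet; a worker OS thread performs the
        # blocking os.listdir() and the consumer greenthread wakes us up via
        # the pipe-and-result-queue mechanism described in __init__.
        return pool.run_in_thread(os.listdir, '/tmp')
    finally:
        # After terminate(), further run_in_thread() calls raise
        # ThreadPoolDead.
        pool.terminate()

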
class EventletCompatibleThreadPool(object):
    """
    It is dangerous to use the 'threading' module simultaneously with
    Eventlet, because Eventlet assumes an event-driven model with
    cooperatively yielding green threads as opposed to the threading
    module's real threads. See http://eventlet.net/doc/threading.html for
    reference.

    This class piggybacks on the ThreadPool class that has been inlined
    inside common.eventlet_threadpool since it was removed from
    swift.common.utils.

    By default, an EventletCompatibleThreadPool has only one thread, in order
    to make sure that its functionality doesn't use too much CPU in aggregate
    (which, like threaded I/O, can cause Eventlet reactor-thread starvation).
    If the functions being run in the thread pool are not CPU-intensive, a
    subclass of EventletCompatibleThreadPool can override N_THREADS, or a new
    value can be passed into the constructor.

    The constructor kwarg ``use_eventlet_aware_threadpool`` can be set to
    False to just run everything inline in the current thread.
    """

    N_THREADS = 1

    def __init__(self, *args, **kwargs):
        n_threads = kwargs.pop('n_threads', self.N_THREADS)
        use_eventlet_aware_threadpool = kwargs.pop(
            'use_eventlet_aware_threadpool', True)
        if use_eventlet_aware_threadpool:
            self.thread_pool = ThreadPool(n_threads)
        else:
            self.thread_pool = None

    def _run_in_thread_pool(self, func, *args, **kwargs):
        if self.thread_pool:
            return self.thread_pool.run_in_thread(func, *args, **kwargs)
        else:
            return func(*args, **kwargs)

    def terminate(self):
        """
        Indicates the thread pool will no longer be used and consumed
        resources may be freed.
        """
        if self.thread_pool:
            self.thread_pool.terminate()
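

# Hedged sketch, not part of the original gist: one way a class might mix in
# EventletCompatibleThreadPool and route a blocking call through
# _run_in_thread_pool(). The class name and its stat() method are
# hypothetical examples, not part of any Swift API.
class _ExampleDiskStat(EventletCompatibleThreadPool):
    N_THREADS = 1  # blocking I/O, not CPU-bound, so one worker thread suffices

    def stat(self, path):
        # Runs os.stat() in the pool's worker thread when the eventlet-aware
        # pool is enabled, or inline when constructed with
        # use_eventlet_aware_threadpool=False.
        return self._run_in_thread_pool(os.stat, path)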