-
-
Save stuartarchibald/7051191e0f0cb09ba9c2d42c7c21ced6 to your computer and use it in GitHub Desktop.
@overload with first filter being on hardware.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import contextlib | |
import ctypes | |
import operator | |
import threading | |
import numpy as np | |
from numba import njit, jit, types | |
from numba.extending import overload, intrinsic | |
from numba.core.extending_hardware import ( | |
JitDecorator, | |
hardware_registry, | |
GPU, | |
current_target, | |
hardware_target, | |
CPU, | |
) | |
from numba.core import registry, utils, errors | |
from numba.core.dispatcher import Dispatcher | |
from numba.core.descriptors import TargetDescriptor | |
from numba.core import cpu, typing, cgutils | |
from numba.core.base import BaseContext | |
from numba.core.compiler_lock import global_compiler_lock | |
from numba.core.utils import cached_property | |
from numba.core import callconv, decorators | |
from numba.core.codegen import BaseCPUCodegen, JITCodeLibrary | |
from numba.core.callwrapper import PyCallWrapper | |
from numba.core.imputils import RegistryLoader, Registry | |
from numba import _dynfunc | |
import llvmlite.binding as ll | |
from llvmlite import ir | |
from numba.core.runtime import rtsys | |
from numba import njit | |
from numba.core import ir | |
from numba.core import compiler | |
from numba.core.compiler import CompilerBase, DefaultPassBuilder | |
from numba.core.compiler_machinery import FunctionPass, register_pass | |
from numba.core.typed_passes import PreLowerStripPhis | |
from numbers import Number | |
# This script demonstrates some ideas about how to add support to Numba to make | |
# it easier to specialise functionality on specific hardware. In this script a | |
# fake target, the Dummy Processing Unit (DPU), is employed for demonstrative | |
# purposes. The DPU borrows a lot from the CPU for ease, but is sufficiently | |
# removed to make it possible to show what is required to add a new target and | |
# use it. This code is experimental and relies on a horror show of hacks | |
# internally, don't expect things to work :) | |
# | |
# The way this all works relies on a couple of concepts: | |
# | |
# 1. That, where appropriate, familiar decorators such as `@overload` take a new | |
#    kwarg "hardware". This is a string token associated with a piece of hardware
# in the hardware hierarchy and is present to denote that "this function is | |
# for the noted hardware". | |
# 2. A hardware hierarchy, this is a conceptual necessity and is used to | |
# describe how various pieces of physical hardware are related to each other. | |
# The root of the hierarchy is "generic" hardware, from this extends CPU and | |
# GPU, and from GPU extends CUDA and ROCm. Examples of functions that could | |
# belong to each category: | |
# - generic: the cpython `hash` function, this just calls `obj.__hash__`, | |
# it's entirely unspecialised to hardware. | |
# - cpu: numba's gdb support, requires fork(2), only available to call from | |
# a CPU and linux. | |
# - gpu: a theoretical but not yet written "sync" function that encapsulates | |
# synchronisation across threads in a work unit. | |
# - cuda: something CUDA specific, like cuda.atomic.add | |
# - rocm: something ROCm specific, like roc.shared.array | |
# | |
# The following code demonstrates what is currently necessary to create the most | |
# basic support for a new hardware target, in this case the DPU. For | |
# demonstration purposes the DPU is arbitrarily declared a member of the | |
# GPU family of hardware. | |
# Define a new target, this hardware extends GPU, this places the DPU in the
# hardware hierarchy.
class DPU(GPU):
    # Marker class only: no behaviour of its own, its position in the class
    # hierarchy (DPU -> GPU -> generic) is what drives overload resolution.
    ...


# register the dpu hardware hierarchy token in the hardware registry, making
# the string "dpu" usable as a `hardware=` kwarg throughout.
hardware_registry["dpu"] = DPU
# Create a JIT DPU codegen for the DPU target
class JITDPUCodegen(BaseCPUCodegen):
    """In-process JIT code generator for the DPU target.

    This largely rips off the CPU for ease: it targets the host machine's
    triple/CPU and emits code directly into the current process.
    """

    # Compiled code lives in a JIT library (in-memory), same as the CPU path.
    _library_class = JITCodeLibrary

    def _customize_tm_options(self, options):
        """Populate *options* for llvmlite's target-machine creation."""
        # Customize the target machine options.
        # As long as we don't want to ship the code to another machine,
        # we can specialize for this CPU.
        options["cpu"] = self._get_host_cpu_name()
        # LLVM 7 change: # https://reviews.llvm.org/D47211#inline-425406
        # JIT needs static relocation on x86*
        # native target is already initialized from base class __init__
        arch = ll.Target.from_default_triple().name
        if arch.startswith("x86"):  # one of x86 or x86_64
            reloc_model = "static"
        elif arch.startswith("ppc"):
            reloc_model = "pic"
        else:
            reloc_model = "default"
        options["reloc"] = reloc_model
        options["codemodel"] = "jitdefault"
        # Set feature attributes (such as ISA extensions)
        # This overrides default feature selection by CPU model above
        options["features"] = self._tm_features
        # Deal with optional argument to ll.Target.create_target_machine
        # (signature varies across llvmlite versions).
        sig = utils.pysignature(ll.Target.create_target_machine)
        if "jit" in sig.parameters:
            # Mark that this is making a JIT engine
            options["jit"] = True

    def _customize_tm_features(self):
        # For JIT target, we will use LLVM to get the feature map
        return self._get_host_cpu_features()

    def _add_module(self, module):
        # Hand a finalized LLVM module to the execution engine.
        self._engine.add_module(module)

    def set_env(self, env_name, env):
        """Set the environment address.
        Update the GlobalVariable named *env_name* to the address of *env*.
        """
        gvaddr = self._engine.get_global_value_address(env_name)
        # Write id(env) (the CPython object address) into the global slot.
        envptr = (ctypes.c_void_p * 1).from_address(gvaddr)
        envptr[0] = ctypes.c_void_p(id(env))
# This is the function registry for the dpu, it just has one, this one!
# Lowering implementations for the DPU target are registered against it below.
dpu_function_registry = Registry()
# Implement a new context for the DPU target
class DPUContext(BaseContext):
    """Lowering (target) context for the DPU.

    Mirrors the CPU context closely: code is JIT'd into the current process,
    the CPU calling convention is reused, and the CPython environment
    machinery from _dynfunc provides the execution environment.
    """

    # Code runs in-process, so embedding dynamic (Python object) globals is
    # safe here, as it is for the CPU JIT.
    allow_dynamic_globals = True

    # Overrides
    def create_module(self, name):
        # Fresh empty LLVM module owned by this context's codegen.
        return self._internal_codegen._create_empty_module(name)

    @global_compiler_lock
    def init(self):
        """One-time initialisation: codegen, NRT and lowering registries."""
        self._internal_codegen = JITDPUCodegen("numba.exec")
        # Initialize NRT runtime
        rtsys.initialize(self)
        self.refresh()

    def refresh(self):
        """(Re-)install the DPU lowering registry; new @overload/@lower
        entries may have appeared since the last refresh."""
        registry = dpu_function_registry
        try:
            loader = self._registries[registry]
        except KeyError:
            loader = RegistryLoader(registry)
            self._registries[registry] = loader
        self.install_registry(registry)
        # Also refresh typing context, since @overload declarations can
        # affect it.
        self.typing_context.refresh()

    @property
    def target_data(self):
        # LLVM data-layout of the JIT target machine.
        return self._internal_codegen.target_data

    def codegen(self):
        return self._internal_codegen

    # Borrow the CPU call conv
    @cached_property
    def call_conv(self):
        return callconv.CPUCallConv(self)

    def get_env_body(self, builder, envptr):
        """
        From the given *envptr* (a pointer to a _dynfunc.Environment object),
        get a EnvBody allowing structured access to environment fields.
        """
        body_ptr = cgutils.pointer_add(
            builder, envptr, _dynfunc._impl_info["offsetof_env_body"]
        )
        return cpu.EnvBody(self, builder, ref=body_ptr, cast_ref=True)

    def get_env_manager(self, builder):
        """Load the environment global for the current function and wrap it
        in an environment manager for object-mode style interactions."""
        envgv = self.declare_env_global(builder.module, self.get_env_name(self.fndesc))
        envarg = builder.load(envgv)
        pyapi = self.get_python_api(builder)
        # Emits a runtime guard that the environment pointer is non-NULL.
        pyapi.emit_environment_sentry(
            envarg, debug_msg=self.fndesc.env_name,
        )
        env_body = self.get_env_body(builder, envarg)
        return pyapi.get_env_manager(self.environment, env_body, envarg)

    def get_generator_state(self, builder, genptr, return_type):
        """
        From the given *genptr* (a pointer to a _dynfunc.Generator object),
        get a pointer to its state area.
        """
        return cgutils.pointer_add(
            builder,
            genptr,
            _dynfunc._impl_info["offsetof_generator_state"],
            return_type=return_type,
        )

    def post_lowering(self, mod, library):
        if self.fastmath:
            # NOTE(review): `fastmathpass` is never imported in this file, so
            # enabling fastmath would raise NameError here — confirm intent.
            fastmathpass.rewrite_module(mod, self.fastmath)
        # Make the NRT (memory management) symbols available to this library.
        library.add_linking_library(rtsys.library)

    def create_cpython_wrapper(
        self, library, fndesc, env, call_helper, release_gil=False
    ):
        """Build the CPython-callable wrapper around the compiled function."""
        wrapper_module = self.create_module("wrapper")
        fnty = self.call_conv.get_function_type(fndesc.restype, fndesc.argtypes)
        wrapper_callee = wrapper_module.add_function(fnty, fndesc.llvm_func_name)
        builder = PyCallWrapper(
            self,
            wrapper_module,
            wrapper_callee,
            fndesc,
            env,
            call_helper=call_helper,
            release_gil=release_gil,
        )
        builder.build()
        library.add_ir_module(wrapper_module)

    def create_cfunc_wrapper(self, library, fndesc, env, call_helper):
        # There's no cfunc wrapper on the dpu
        pass

    def get_executable(self, library, fndesc, env):
        """
        Return the CPython-callable function object (from _dynfunc) for the
        compiled function described by *fndesc*, bound to environment *env*.

        NOTE(review): an earlier docstring described a (cfunc, fnptr, env)
        tuple, but only `cfunc` is returned; `baseptr` below is also unused.
        """
        # Code generation
        baseptr = library.get_pointer_to_function(fndesc.llvm_func_name)
        fnptr = library.get_pointer_to_function(fndesc.llvm_cpython_wrapper_name)
        # Note: we avoid reusing the original docstring to avoid encoding
        # issues on Python 2, see issue #1908
        doc = "compiled wrapper for %r" % (fndesc.qualname,)
        cfunc = _dynfunc.make_function(
            fndesc.lookup_module(),
            fndesc.qualname.split(".")[-1],
            doc,
            fnptr,
            env,
            # objects to keepalive with the function
            (library,),
        )
        # Patch the environment global in the generated code to point at env.
        library.codegen.set_env(self.get_env_name(fndesc), env)
        return cfunc
# Nested contexts to help with isolatings bits of compilations | |
class _NestedContext(object): | |
_typing_context = None | |
_target_context = None | |
@contextlib.contextmanager | |
def nested(self, typing_context, target_context): | |
old_nested = self._typing_context, self._target_context | |
try: | |
self._typing_context = typing_context | |
self._target_context = target_context | |
yield | |
finally: | |
self._typing_context, self._target_context = old_nested | |
# Implement a DPU TargetDescriptor, this one borrows bits from the CPU
class DPUTarget(TargetDescriptor):
    """Target descriptor for the DPU.

    Provides lazily-created top-level typing/target contexts, with support
    for temporarily swapping them out per-thread via `nested_context`.
    """

    # Reuse the CPU's set of recognised target options for simplicity.
    options = cpu.CPUTargetOptions
    _nested = _NestedContext()

    @utils.cached_property
    def _toplevel_target_context(self):
        # Lazily-initialized top-level target context, for all threads
        return DPUContext(self.typing_context, self._target_name)

    @utils.cached_property
    def _toplevel_typing_context(self):
        # Lazily-initialized top-level typing context, for all threads
        return typing.Context()

    @property
    def target_context(self):
        """
        The target context for DPU targets.
        """
        nested = self._nested._target_context
        if nested is not None:
            return nested
        else:
            return self._toplevel_target_context

    @property
    def typing_context(self):
        """
        The typing context for DPU targets.
        """
        nested = self._nested._typing_context
        if nested is not None:
            return nested
        else:
            return self._toplevel_typing_context

    def nested_context(self, typing_context, target_context):
        """
        A context manager temporarily replacing the contexts with the
        given ones, for the current thread of execution.
        """
        return self._nested.nested(typing_context, target_context)
# Create a DPU target instance
dpu_target = DPUTarget("dpu")


# Declare a dispatcher for the DPU target
class DPUDispatcher(Dispatcher):
    # Point the standard dispatch machinery at the DPU target descriptor.
    targetdescr = dpu_target


# Register a dispatcher for the DPU target, a lot of the code uses this
# internally to work out what to do RE compilation
registry.dispatcher_registry["dpu"] = DPUDispatcher
# Implement a dispatcher for the DPU target
class djit(JitDecorator):
    """JIT decorator for the DPU target.

    Usable both bare (``@djit``) and parameterised (``@djit(nopython=True)``);
    wraps the decorated function in the DPU dispatcher registered above.
    """

    def __init__(self, *args, **kwargs):
        self._args = args
        self._kwargs = kwargs

    def __call__(self, *args):
        # Bare use: the function was captured in __init__'s *args.
        # Parameterised use: it arrives here as the single positional arg.
        assert len(args) < 2
        if args:
            func = args[0]
        else:
            func = self._args[0]
        self.py_func = func
        # wrap in dispatcher
        return self.dispatcher_wrapper()

    def get_dispatcher(self):
        """
        Returns the dispatcher
        """
        return registry.dispatcher_registry["dpu"]

    def dispatcher_wrapper(self):
        """Build and return the dispatcher instance for self.py_func."""
        disp = self.get_dispatcher()
        # Parse self._kwargs here
        topt = {}
        if "nopython" in self._kwargs:
            topt["nopython"] = True
        # It would be easy to specialise the default compilation pipeline for
        # this target here.
        pipeline_class = compiler.Compiler
        if "pipeline_class" in self._kwargs:
            pipeline_class = self._kwargs["pipeline_class"]
        # BUG FIX: previously `compiler.Compiler` was passed unconditionally,
        # silently discarding a user-supplied `pipeline_class` kwarg.
        return disp(
            py_func=self.py_func,
            targetoptions=topt,
            pipeline_class=pipeline_class,
        )


# add it to the decorator registry, this is so e.g. @overload can look up a
# JIT function to do the compilation work.
decorators.jit_registry["dpu"] = djit
# -------------- Case 1, want to compile for a new target, the DPU ---------
print(' Case 1 - Use DPU target '.center(80, '='))
# In this section you can try commenting one or more of the overloads for
# 'my_func' to explore the effect of having a hardware hierarchy. The hierarchy
# for the DPU target is: DPU -> GPU -> Generic; where -> is 'extends from'.
# As a result, a DPU compiled function will try and use a DPU overload if
# available, if it's not available but there's a GPU version, it will use that
# and finally, if there's no GPU version it will use a generic one if it is
# available. In this case, if the CPU compiled version is used, it will use the
# generic version as it's the only version available for the CPU hardware.

# Uncomment one or the other to choose to run on CPU or DPU
target = 'cpu'
#target = "dpu"
def my_func(x):
    """Pure-Python stub; real implementations are supplied per-hardware via
    the @overload declarations below."""
    return None
# The DPU target "knows" nothing, add in some primitives for basic things...
# need to register how to lower dummy for @intrinsic
@dpu_function_registry.lower_constant(types.Dummy)
def constant_dummy(context, builder, ty, pyval):
    # Dummy types carry no runtime value; emit the context's placeholder.
    return context.get_dummy_value()
# and how to deal with IntegerLiteral to Integer casts
@dpu_function_registry.lower_cast(types.IntegerLiteral, types.Integer)
def literal_int_to_number(context, builder, fromty, toty, val):
    # Materialise the literal's compile-time value as a constant, then cast
    # it from the literal's underlying type to the requested integer type.
    lit = context.get_constant_generic(
        builder, fromty.literal_type, fromty.literal_value,
    )
    return context.cast(builder, lit, fromty.literal_type, toty)
# and how to lower an Int constant
# NOTE(review): the name says "float" but this registration is for
# types.Integer; registration keys on the decorator argument, not the name,
# so it works — consider renaming to `const_int` for clarity.
@dpu_function_registry.lower_constant(types.Integer)
def const_float(context, builder, ty, pyval):
    lty = context.get_value_type(ty)
    return lty(pyval)
# In this example, the DPU actually subtracts when it's asked to 'add'!
@intrinsic(hardware="dpu")
def intrin_add(tyctx, x, y):
    """DPU 'add' intrinsic — deliberately lowers to a subtraction so DPU
    results are distinguishable from CPU ones in the demo output."""
    signature = x(x, y)

    def _codegen(cgctx, builder, argtypes, args):
        lhs, rhs = args
        return builder.sub(lhs, rhs)

    return signature, _codegen
# Spell out how to overload 'add', call the dpu specific intrinsic
@overload(operator.add, hardware="dpu")
def ol_add(x, y):
    """DPU overload of operator.add; only integer/integer is supported."""
    # Guard clause: decline (return None) for non-integer operand types.
    if not (isinstance(x, types.Integer) and isinstance(y, types.Integer)):
        return None

    def impl(x, y):
        return intrin_add(x, y)

    return impl
# Can be used by both CPU and DPU
@overload(my_func, hardware="generic")
def ol_my_func1(x):
    """Generic implementation, valid on any hardware: my_func(x) -> 1 + x."""

    def generic_impl(x):
        return 1 + x

    return generic_impl
# Should be used by the DPU if there's no dpu specific one
@overload(my_func, hardware="gpu")
def ol_my_func2(x):
    """GPU-family implementation: my_func(x) -> 10 + x."""

    def gpu_impl(x):
        return 10 + x

    return gpu_impl
## Should be used by the DPU only
@overload(my_func, hardware="dpu")
def ol_my_func3(x):
    """DPU-specific implementation: my_func(x) -> 100 + x."""

    def dpu_impl(x):
        return 100 + x

    return dpu_impl
# Pick the jitter matching the chosen target: djit for the DPU, numba's
# standard jit otherwise.
jitter = djit if target == "dpu" else jit


# This is the demonstration function, it calls a version of the overloaded
# 'my_func' function.
@jitter(nopython=True)
def foo(x):
    # NOTE: the argument is ignored; my_func is always called with literal 7.
    return my_func(7)


print("foo(5) with %s" % target, foo(5))
# -------------- new usecase, have CPU func, want to swap to DPU --------- | |
print(' Case 2 - CPU to DPU offload '.center(80, '=')) | |
# In this use case the CPU compilation pipeline is extended with a new | |
# compilation pass that runs just prior to lowering. The pass looks for function | |
# calls and when it finds one it sees if there's a DPU function available that | |
# is a valid overload for the function call. If there is one then it swaps the | |
# CPU implementation out for a DPU implementation. This produces an "offload"
# effect. | |
@register_pass(mutates_CFG=False, analysis_only=False)
class DispatcherSwitcher(FunctionPass):
    """Compiler pass that rewrites call sites to target the DPU.

    For each call expression in the IR, if the callee type-checks for the
    DPU target, the call node is tagged with `hardware="dpu"` so lowering
    uses the DPU implementation — producing an offload effect.
    """

    _name = "DispatcherSwitcher"

    def __init__(self):
        FunctionPass.__init__(self)

    def run_pass(self, state):
        func_ir = state.func_ir
        mutated = False
        for blk in func_ir.blocks.values():
            # find the assignment nodes in the block and walk them, if there's a
            # DPU version then swap out for a call to that
            for call in blk.find_exprs("call"):
                function = state.typemap[call.func.name]
                tname = "dpu"
                # Note: `hardware_target` context-driven compilation can be
                # done here, while the DPU target is in use.
                with hardware_target(tname):
                    try:
                        # Type-check the call against the DPU target; if it
                        # doesn't resolve there, leave this call alone.
                        sig = function.get_call_type(
                            state.typingctx, state.calltypes[call].args, {}
                        )
                    except errors.UnsupportedError:
                        continue
                    disp = registry.dispatcher_registry[tname]
                    # force compile
                    hw_ctx = disp.targetdescr.target_context
                    # This is a necessary hack at present so as to generate
                    # code into the same library. I.e. the DPU target is going
                    # to do code gen into the CPUs lib.
                    hw_ctx._codelib_stack = state.targetctx._codelib_stack
                    # All is good, so switch IR node for one targeting this
                    # hardware. Should generate this, but for now just mutate:
                    # ir.Expr.call(call.func, call.args, call.kws, call.loc, hardware='dpu')
                    call.hardware = tname
                    mutated = True
        return mutated  # return True if the IR was mutated, False if not.
# DPU compiler pipeline, compiles with offload to the DPU target
class DPUCompiler(CompilerBase):
    """Standard nopython pipeline plus the DPU offload-switching pass."""

    def define_pipelines(self):
        # Insert the switcher just before lowering, after phi stripping.
        pipeline = DefaultPassBuilder.define_nopython_pipeline(self.state)
        pipeline.add_pass_after(DispatcherSwitcher, PreLowerStripPhis)
        pipeline.finalize()
        return [pipeline]
# This is the DPU function
@overload(np.sin, hardware="dpu")
def ol_np_sin_DPU(x):
    """DPU replacement for np.sin — returns a fixed, recognisable sentinel
    so offloaded calls are obvious in the demo output."""

    def _sentinel_sin(x):
        return 314159.0

    return _sentinel_sin
# Need to tell the DPU how to lower a float and a dummy var
@dpu_function_registry.lower_constant(types.Float)
def const_float(context, builder, ty, pyval):
    # NOTE(review): this rebinds the module-level name `const_float` from the
    # Integer version above — harmless, since registration happens at
    # decoration time, but worth disambiguating.
    value_type = context.get_value_type(ty)
    return value_type(pyval)
# Demo it compiles fine on the DPU
@djit(nopython=True)
def foo(x):
    return np.sin(x)


# Pure DPU call: expect the sentinel 314159.0 from the DPU overload.
print("DPU:", foo(5))


# Demo it compiles fine on the CPU
@jit(nopython=True)
def foo(x):
    return np.sin(x)


# Pure CPU call: expect the real sin(5).
print("CPU:", foo(5))


# Now compile for CPU, but with the DispatcherSwitcher pass in place that
# switches CPU calls for DPU calls
@jit(nopython=True, pipeline_class=DPUCompiler)
def foo(x):
    return np.sin(x), np.cos(x)  # np.sin is DPU, np.cos is CPU


print("Mixed: with CPU offload to DPU", foo(5))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment