# @overload with first filter being on hardware.
import contextlib
import ctypes
import operator
import threading
import numpy as np
from numba import njit, jit, types
from numba.extending import overload, intrinsic
from numba.core.extending_hardware import (
    JitDecorator,
    hardware_registry,
    GPU,
    current_target,
    hardware_target,
    CPU,
)
from numba.core import registry, utils, errors
from numba.core.dispatcher import Dispatcher
from numba.core.descriptors import TargetDescriptor
from numba.core import cpu, typing, cgutils
from numba.core.base import BaseContext
from numba.core.compiler_lock import global_compiler_lock
from numba.core.utils import cached_property
from numba.core import callconv, decorators
from numba.core import fastmathpass
from numba.core.codegen import BaseCPUCodegen, JITCodeLibrary
from numba.core.callwrapper import PyCallWrapper
from numba.core.imputils import RegistryLoader, Registry
from numba import _dynfunc
import llvmlite.binding as ll
from numba.core.runtime import rtsys
from numba.core import ir
from numba.core import compiler
from numba.core.compiler import CompilerBase, DefaultPassBuilder
from numba.core.compiler_machinery import FunctionPass, register_pass
from numba.core.typed_passes import PreLowerStripPhis
from numbers import Number

# This script demonstrates some ideas about how to add support to Numba to make
# it easier to specialise functionality for specific hardware. In this script a
# fake target, the Dummy Processing Unit (DPU), is employed for demonstrative
# purposes. The DPU borrows a lot from the CPU for ease, but is sufficiently
# removed to make it possible to show what is required to add a new target and
# use it. This code is experimental and relies on a horror show of hacks
# internally, don't expect things to work :)
#
# The way this all works relies on a couple of concepts:
#
# 1. That, where appropriate, familiar decorators such as `@overload` take a
#    new kwarg "hardware". This is a string token associated with a piece of
#    hardware in the hardware hierarchy and is present to denote that "this
#    function is for the noted hardware".
# 2. A hardware hierarchy. This is a conceptual necessity and is used to
#    describe how various pieces of physical hardware are related to each
#    other. The root of the hierarchy is "generic" hardware, from this extends
#    CPU and GPU, and from GPU extends CUDA and ROCm. Examples of functions
#    that could belong to each category:
#    - generic: the cpython `hash` function, this just calls `obj.__hash__`,
#      it's entirely unspecialised to hardware.
#    - cpu: numba's gdb support, requires fork(2), only available to call from
#      a CPU and linux.
#    - gpu: a theoretical but not yet written "sync" function that encapsulates
#      synchronisation across threads in a work unit.
#    - cuda: something CUDA specific, like cuda.atomic.add
#    - rocm: something ROCm specific, like roc.shared.array
#
# The following code demonstrates what is currently necessary to create the
# most basic support for a new hardware target, in this case the DPU. For
# demonstration purposes the DPU is arbitrarily declared a member of the GPU
# family of hardware (a small resolution-order sketch follows the registration
# below).

# Define a new target; this hardware extends GPU, which places the DPU in the
# hardware hierarchy.
class DPU(GPU):
    ...


# Register the DPU hardware hierarchy token in the hardware registry.
hardware_registry["dpu"] = DPU
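
# A minimal sketch, assuming `hardware_registry` supports plain dict-style
# lookup: the registry maps string tokens to hardware classes, and the Python
# MRO of the registered class gives exactly the "most specific first"
# resolution order described above. `_resolution_order` is a hypothetical
# helper for illustration, not part of Numba's API.
def _resolution_order(token):
    # Walk the MRO of the registered hardware class, skipping `object`,
    # to recover e.g. DPU -> GPU -> ... -> generic root.
    hw_class = hardware_registry[token]
    return [klass.__name__ for klass in hw_class.__mro__ if klass is not object]

# The "dpu" token resolves to a class that is a member of the GPU family.
assert issubclass(hardware_registry["dpu"], GPU)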

# Create a JIT DPU codegen for the DPU target
class JITDPUCodegen(BaseCPUCodegen):
    # This largely rips off the CPU for ease
    _library_class = JITCodeLibrary

    def _customize_tm_options(self, options):
        # Customize the target machine options.
        # As long as we don't want to ship the code to another machine,
        # we can specialize for this CPU.
        options["cpu"] = self._get_host_cpu_name()
        # LLVM 7 change: https://reviews.llvm.org/D47211#inline-425406
        # JIT needs static relocation on x86*
        # native target is already initialized from base class __init__
        arch = ll.Target.from_default_triple().name
        if arch.startswith("x86"):  # one of x86 or x86_64
            reloc_model = "static"
        elif arch.startswith("ppc"):
            reloc_model = "pic"
        else:
            reloc_model = "default"
        options["reloc"] = reloc_model
        options["codemodel"] = "jitdefault"

        # Set feature attributes (such as ISA extensions)
        # This overrides default feature selection by CPU model above
        options["features"] = self._tm_features

        # Deal with optional argument to ll.Target.create_target_machine
        sig = utils.pysignature(ll.Target.create_target_machine)
        if "jit" in sig.parameters:
            # Mark that this is making a JIT engine
            options["jit"] = True

    def _customize_tm_features(self):
        # For JIT target, we will use LLVM to get the feature map
        return self._get_host_cpu_features()

    def _add_module(self, module):
        self._engine.add_module(module)

    def set_env(self, env_name, env):
        """Set the environment address.

        Update the GlobalVariable named *env_name* to the address of *env*.
        """
        gvaddr = self._engine.get_global_value_address(env_name)
        envptr = (ctypes.c_void_p * 1).from_address(gvaddr)
        envptr[0] = ctypes.c_void_p(id(env))

# This is the function registry for the DPU target; it contains just this one.
dpu_function_registry = Registry()

# Implement a new context for the DPU target
class DPUContext(BaseContext):
    allow_dynamic_globals = True

    # Overrides
    def create_module(self, name):
        return self._internal_codegen._create_empty_module(name)

    @global_compiler_lock
    def init(self):
        self._internal_codegen = JITDPUCodegen("numba.exec")
        # Initialize NRT runtime
        rtsys.initialize(self)
        self.refresh()

    def refresh(self):
        registry = dpu_function_registry
        try:
            loader = self._registries[registry]
        except KeyError:
            loader = RegistryLoader(registry)
            self._registries[registry] = loader
        self.install_registry(registry)
        # Also refresh typing context, since @overload declarations can
        # affect it.
        self.typing_context.refresh()

    @property
    def target_data(self):
        return self._internal_codegen.target_data

    def codegen(self):
        return self._internal_codegen

    # Borrow the CPU call conv
    @cached_property
    def call_conv(self):
        return callconv.CPUCallConv(self)

    def get_env_body(self, builder, envptr):
        """
        From the given *envptr* (a pointer to a _dynfunc.Environment object),
        get a EnvBody allowing structured access to environment fields.
        """
        body_ptr = cgutils.pointer_add(
            builder, envptr, _dynfunc._impl_info["offsetof_env_body"]
        )
        return cpu.EnvBody(self, builder, ref=body_ptr, cast_ref=True)

    def get_env_manager(self, builder):
        envgv = self.declare_env_global(builder.module,
                                        self.get_env_name(self.fndesc))
        envarg = builder.load(envgv)
        pyapi = self.get_python_api(builder)
        pyapi.emit_environment_sentry(
            envarg, debug_msg=self.fndesc.env_name,
        )
        env_body = self.get_env_body(builder, envarg)
        return pyapi.get_env_manager(self.environment, env_body, envarg)

    def get_generator_state(self, builder, genptr, return_type):
        """
        From the given *genptr* (a pointer to a _dynfunc.Generator object),
        get a pointer to its state area.
        """
        return cgutils.pointer_add(
            builder,
            genptr,
            _dynfunc._impl_info["offsetof_generator_state"],
            return_type=return_type,
        )

    def post_lowering(self, mod, library):
        if self.fastmath:
            fastmathpass.rewrite_module(mod, self.fastmath)
        library.add_linking_library(rtsys.library)

    def create_cpython_wrapper(
        self, library, fndesc, env, call_helper, release_gil=False
    ):
        wrapper_module = self.create_module("wrapper")
        fnty = self.call_conv.get_function_type(fndesc.restype, fndesc.argtypes)
        wrapper_callee = wrapper_module.add_function(fnty, fndesc.llvm_func_name)
        builder = PyCallWrapper(
            self,
            wrapper_module,
            wrapper_callee,
            fndesc,
            env,
            call_helper=call_helper,
            release_gil=release_gil,
        )
        builder.build()
        library.add_ir_module(wrapper_module)

    def create_cfunc_wrapper(self, library, fndesc, env, call_helper):
        # There's no cfunc wrapper on the DPU.
        pass

    def get_executable(self, library, fndesc, env):
        """
        Returns
        -------
        (cfunc, fnptr)

        - cfunc
            callable function (Can be None)
        - fnptr
            callable function address
        - env
            an execution environment (from _dynfunc)
        """
        # Code generation
        baseptr = library.get_pointer_to_function(fndesc.llvm_func_name)
        fnptr = library.get_pointer_to_function(
            fndesc.llvm_cpython_wrapper_name)

        # Note: we avoid reusing the original docstring to avoid encoding
        # issues on Python 2, see issue #1908
        doc = "compiled wrapper for %r" % (fndesc.qualname,)
        cfunc = _dynfunc.make_function(
            fndesc.lookup_module(),
            fndesc.qualname.split(".")[-1],
            doc,
            fnptr,
            env,
            # objects to keepalive with the function
            (library,),
        )
        library.codegen.set_env(self.get_env_name(fndesc), env)
        return cfunc

# Nested contexts to help with isolating bits of compilations
class _NestedContext(object):
    _typing_context = None
    _target_context = None

    @contextlib.contextmanager
    def nested(self, typing_context, target_context):
        old_nested = self._typing_context, self._target_context
        try:
            self._typing_context = typing_context
            self._target_context = target_context
            yield
        finally:
            self._typing_context, self._target_context = old_nested

# Implement a DPU TargetDescriptor, this one borrows bits from the CPU
class DPUTarget(TargetDescriptor):
    options = cpu.CPUTargetOptions
    _nested = _NestedContext()

    @utils.cached_property
    def _toplevel_target_context(self):
        # Lazily-initialized top-level target context, for all threads
        return DPUContext(self.typing_context, self._target_name)

    @utils.cached_property
    def _toplevel_typing_context(self):
        # Lazily-initialized top-level typing context, for all threads
        return typing.Context()

    @property
    def target_context(self):
        """
        The target context for DPU targets.
        """
        nested = self._nested._target_context
        if nested is not None:
            return nested
        else:
            return self._toplevel_target_context

    @property
    def typing_context(self):
        """
        The typing context for DPU targets.
        """
        nested = self._nested._typing_context
        if nested is not None:
            return nested
        else:
            return self._toplevel_typing_context

    def nested_context(self, typing_context, target_context):
        """
        A context manager temporarily replacing the contexts with the
        given ones, for the current thread of execution.
        """
        return self._nested.nested(typing_context, target_context)
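
# A hedged sketch (hypothetical usage, not exercised in this demo) of the
# `nested_context` API defined above:
#
#     with dpu_target.nested_context(isolated_typing_ctx, isolated_target_ctx):
#         ...  # compilation in here sees the swapped-in contexts, per thread
#
# where `isolated_typing_ctx` and `isolated_target_ctx` are hypothetical,
# freshly constructed typing/target contexts used to isolate a compilation.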

# Create a DPU target instance
dpu_target = DPUTarget("dpu")


# Declare a dispatcher for the DPU target
class DPUDispatcher(Dispatcher):
    targetdescr = dpu_target


# Register the dispatcher for the DPU target; a lot of the code uses this
# registry internally to work out what to do regarding compilation.
registry.dispatcher_registry["dpu"] = DPUDispatcher

# Implement a jit decorator for the DPU target
class djit(JitDecorator):

    def __init__(self, *args, **kwargs):
        self._args = args
        self._kwargs = kwargs

    def __call__(self, *args):
        assert len(args) < 2
        if args:
            func = args[0]
        else:
            func = self._args[0]
        self.py_func = func
        # wrap in dispatcher
        return self.dispatcher_wrapper()

    def get_dispatcher(self):
        """
        Returns the dispatcher
        """
        return registry.dispatcher_registry["dpu"]

    def dispatcher_wrapper(self):
        disp = self.get_dispatcher()
        # Parse self._kwargs here
        topt = {}
        if "nopython" in self._kwargs:
            topt["nopython"] = True
        # It would be easy to specialise the default compilation pipeline for
        # this target here.
        pipeline_class = compiler.Compiler
        if "pipeline_class" in self._kwargs:
            pipeline_class = self._kwargs["pipeline_class"]
        return disp(
            py_func=self.py_func,
            targetoptions=topt,
            pipeline_class=pipeline_class,
        )

# Add it to the jit decorator registry; this is so that e.g. @overload can
# look up a JIT decorator to do the compilation work.
decorators.jit_registry["dpu"] = djit
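
# Illustrative sanity check: both registries now make the DPU discoverable
# by its "dpu" token, which is how the internals find the right machinery.
assert registry.dispatcher_registry["dpu"] is DPUDispatcher
assert decorators.jit_registry["dpu"] is djit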

# -------------- Case 1, want to compile for a new target, the DPU ----------
print(' Case 1 - Use DPU target '.center(80, '='))

# In this section you can try commenting out one or more of the overloads for
# 'my_func' to explore the effect of having a hardware hierarchy. The hierarchy
# for the DPU target is: DPU -> GPU -> Generic, where -> is 'extends from'.
# As a result, a DPU compiled function will try to use a DPU overload if
# available; if one isn't available but there's a GPU version, it will use
# that; and finally, if there's no GPU version either, it will use a generic
# one if available. If instead the function is compiled for the CPU, it will
# use the generic version as that's the only version available for CPU
# hardware.

# Uncomment one or the other to choose to run on CPU or DPU
target = 'cpu'
#target = "dpu"

def my_func(x):
    pass

# The DPU target "knows" nothing, add in some primitives for basic things...

# Need to register how to lower the dummy type used by @intrinsic...
@dpu_function_registry.lower_constant(types.Dummy)
def constant_dummy(context, builder, ty, pyval):
    return context.get_dummy_value()


# ... and how to deal with IntegerLiteral to Integer casts...
@dpu_function_registry.lower_cast(types.IntegerLiteral, types.Integer)
def literal_int_to_number(context, builder, fromty, toty, val):
    lit = context.get_constant_generic(
        builder, fromty.literal_type, fromty.literal_value,
    )
    return context.cast(builder, lit, fromty.literal_type, toty)


# ... and how to lower an Integer constant.
@dpu_function_registry.lower_constant(types.Integer)
def const_integer(context, builder, ty, pyval):
    lty = context.get_value_type(ty)
    return lty(pyval)

# In this example, the DPU actually subtracts when it's asked to 'add'!
@intrinsic(hardware="dpu")
def intrin_add(tyctx, x, y):
    sig = x(x, y)

    def codegen(cgctx, builder, tyargs, llargs):
        return builder.sub(*llargs)
    return sig, codegen

# Spell out how to overload 'add' for the DPU; it calls the DPU specific
# intrinsic.
@overload(operator.add, hardware="dpu")
def ol_add(x, y):
    if isinstance(x, types.Integer) and isinstance(y, types.Integer):
        def impl(x, y):
            return intrin_add(x, y)
        return impl


# Can be used by both CPU and DPU
@overload(my_func, hardware="generic")
def ol_my_func1(x):
    def impl(x):
        return 1 + x
    return impl


# Should be used by the DPU if there's no DPU specific one
@overload(my_func, hardware="gpu")
def ol_my_func2(x):
    def impl(x):
        return 10 + x
    return impl


# Should be used by the DPU only
@overload(my_func, hardware="dpu")
def ol_my_func3(x):
    def impl(x):
        return 100 + x
    return impl

jitter = djit if target == "dpu" else jit


# This is the demonstration function, it calls a version of the overloaded
# 'my_func' function.
@jitter(nopython=True)
def foo(x):
    return my_func(7)


print("foo(5) with %s" % target, foo(5))

# -------------- New use case: have a CPU func, want to swap to DPU ---------
print(' Case 2 - CPU to DPU offload '.center(80, '='))

# In this use case the CPU compilation pipeline is extended with a new
# compilation pass that runs just prior to lowering. The pass looks for
# function calls and, when it finds one, it checks whether there's a DPU
# function available that is a valid overload for the call. If there is, it
# swaps the CPU implementation out for the DPU implementation, producing an
# "offload" effect.

@register_pass(mutates_CFG=False, analysis_only=False)
class DispatcherSwitcher(FunctionPass):
    _name = "DispatcherSwitcher"

    def __init__(self):
        FunctionPass.__init__(self)

    def run_pass(self, state):
        func_ir = state.func_ir
        mutated = False
        for blk in func_ir.blocks.values():
            # Find the call expressions in the block and walk them; if there's
            # a DPU version of the callee then swap in a call to that.
            for call in blk.find_exprs("call"):
                function = state.typemap[call.func.name]
                tname = "dpu"
                # Note: the `hardware_target` context manager drives context
                # sensitive compilation; within it, the DPU target is in use.
                with hardware_target(tname):
                    try:
                        sig = function.get_call_type(
                            state.typingctx, state.calltypes[call].args, {}
                        )
                    except errors.UnsupportedError:
                        # No DPU overload is available for this call, leave
                        # it on the CPU.
                        continue
                    disp = registry.dispatcher_registry[tname]
                    # force compile
                    hw_ctx = disp.targetdescr.target_context
                    # This is a necessary hack at present so as to generate
                    # code into the same library, i.e. the DPU target is going
                    # to do its code generation into the CPU's library.
                    hw_ctx._codelib_stack = state.targetctx._codelib_stack
                    # All is good, so switch the IR node for one targeting
                    # this hardware. Should generate a new node, but for now
                    # just mutate the existing one:
                    # ir.Expr.call(call.func, call.args, call.kws, call.loc,
                    #              hardware='dpu')
                    call.hardware = tname
                    mutated = True
        return mutated  # return True if the IR was mutated, False if not.

# DPU offload compiler pipeline: the standard nopython pipeline with the
# DispatcherSwitcher pass added just before lowering (after
# PreLowerStripPhis), which produces the offload-to-DPU behaviour.
class DPUCompiler(CompilerBase):
    def define_pipelines(self):
        pm = DefaultPassBuilder.define_nopython_pipeline(self.state)
        pm.add_pass_after(DispatcherSwitcher, PreLowerStripPhis)
        pm.finalize()
        return [pm]

# This is the DPU function
@overload(np.sin, hardware="dpu")
def ol_np_sin_DPU(x):
    def dpu_sin_impl(x):
        return 314159.0
    return dpu_sin_impl


# Need to tell the DPU how to lower a Float constant.
@dpu_function_registry.lower_constant(types.Float)
def const_float(context, builder, ty, pyval):
    lty = context.get_value_type(ty)
    return lty(pyval)

# Demo it compiles fine on the DPU
@djit(nopython=True)
def foo(x):
    return np.sin(x)


# Pure DPU call.
print("DPU:", foo(5))


# Demo it compiles fine on the CPU
@jit(nopython=True)
def foo(x):
    return np.sin(x)


# Pure CPU call
print("CPU:", foo(5))


# Now compile for the CPU, but with the DispatcherSwitcher pass in place that
# switches CPU calls for DPU calls.
@jit(nopython=True, pipeline_class=DPUCompiler)
def foo(x):
    return np.sin(x), np.cos(x)  # np.sin is DPU, np.cos is CPU


print("Mixed: with CPU offload to DPU", foo(5))