Transformer that returns attention maps
import copy
from typing import Optional, Any, Union, Callable

import torch
from torch import Tensor
from .. import functional as F
from .module import Module
from .activation import MultiheadAttention
from .container import ModuleList
from ..init import xavier_uniform_
from .dropout import Dropout
from .linear import Linear
from .normalization import LayerNorm

__all__ = ['Transformer', 'TransformerEncoder', 'TransformerDecoder', 'TransformerEncoderLayer',
           'TransformerDecoderLayer']


class Transformer(Module):
    r"""A transformer model. User is able to modify the attributes as needed. The architecture
    is based on the paper "Attention Is All You Need". Ashish Vaswani, Noam Shazeer,
    Niki Parmar, Jakob Uszkoreit, Llion Jones, Aidan N Gomez, Lukasz Kaiser, and
    Illia Polosukhin. 2017. Attention is all you need. In Advances in Neural Information
    Processing Systems, pages 6000-6010.

    Args:
        d_model: the number of expected features in the encoder/decoder inputs (default=512).
        nhead: the number of heads in the multiheadattention models (default=8).
        num_encoder_layers: the number of sub-encoder-layers in the encoder (default=6).
        num_decoder_layers: the number of sub-decoder-layers in the decoder (default=6).
        dim_feedforward: the dimension of the feedforward network model (default=2048).
        dropout: the dropout value (default=0.1).
        activation: the activation function of encoder/decoder intermediate layer, can be a string
            ("relu" or "gelu") or a unary callable. Default: relu
        custom_encoder: custom encoder (default=None).
        custom_decoder: custom decoder (default=None).
        layer_norm_eps: the eps value in layer normalization components (default=1e-5).
        batch_first: If ``True``, then the input and output tensors are provided
            as (batch, seq, feature). Default: ``False`` (seq, batch, feature).
        norm_first: if ``True``, encoder and decoder layers will perform LayerNorms before
            other attention and feedforward operations, otherwise after. Default: ``False`` (after).
    Examples::
        >>> transformer_model = nn.Transformer(nhead=16, num_encoder_layers=12)
        >>> src = torch.rand((10, 32, 512))
        >>> tgt = torch.rand((20, 32, 512))
        >>> out, enc_attn, dec_self_attn, dec_cross_attn = transformer_model(src, tgt)

    Note: A full example of applying the nn.Transformer module to a word language model is available at
    https://github.com/pytorch/examples/tree/master/word_language_model
    """
    def __init__(self, d_model: int = 512, nhead: int = 8, num_encoder_layers: int = 6,
                 num_decoder_layers: int = 6, dim_feedforward: int = 2048, dropout: float = 0.1,
                 activation: Union[str, Callable[[Tensor], Tensor]] = F.relu,
                 custom_encoder: Optional[Any] = None, custom_decoder: Optional[Any] = None,
                 layer_norm_eps: float = 1e-5, batch_first: bool = False, norm_first: bool = False,
                 device=None, dtype=None) -> None:
        factory_kwargs = {'device': device, 'dtype': dtype}
        super().__init__()
        torch._C._log_api_usage_once(f"torch.nn.modules.{self.__class__.__name__}")

        if custom_encoder is not None:
            self.encoder = custom_encoder
        else:
            encoder_layer = TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout,
                                                    activation, layer_norm_eps, batch_first, norm_first,
                                                    **factory_kwargs)
            encoder_norm = LayerNorm(d_model, eps=layer_norm_eps, **factory_kwargs)
            self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers, encoder_norm)

        if custom_decoder is not None:
            self.decoder = custom_decoder
        else:
            decoder_layer = TransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout,
                                                    activation, layer_norm_eps, batch_first, norm_first,
                                                    **factory_kwargs)
            decoder_norm = LayerNorm(d_model, eps=layer_norm_eps, **factory_kwargs)
            self.decoder = TransformerDecoder(decoder_layer, num_decoder_layers, decoder_norm)

        self._reset_parameters()

        self.d_model = d_model
        self.nhead = nhead

        self.batch_first = batch_first
    def forward(self, src: Tensor, tgt: Tensor, src_mask: Optional[Tensor] = None, tgt_mask: Optional[Tensor] = None,
                memory_mask: Optional[Tensor] = None, src_key_padding_mask: Optional[Tensor] = None,
                tgt_key_padding_mask: Optional[Tensor] = None,
                memory_key_padding_mask: Optional[Tensor] = None,
                need_weights: bool = False) -> Tensor:
        r"""Take in and process masked source/target sequences.

        Args:
            src: the sequence to the encoder (required).
            tgt: the sequence to the decoder (required).
            src_mask: the additive mask for the src sequence (optional).
            tgt_mask: the additive mask for the tgt sequence (optional).
            memory_mask: the additive mask for the encoder output (optional).
            src_key_padding_mask: the Tensor mask for src keys per batch (optional).
            tgt_key_padding_mask: the Tensor mask for tgt keys per batch (optional).
            memory_key_padding_mask: the Tensor mask for memory keys per batch (optional).
            need_weights: if ``True``, the per-layer attention maps are also returned (optional).

        Shape:
            - src: :math:`(S, E)` for unbatched input, :math:`(S, N, E)` if `batch_first=False` or
              `(N, S, E)` if `batch_first=True`.
            - tgt: :math:`(T, E)` for unbatched input, :math:`(T, N, E)` if `batch_first=False` or
              `(N, T, E)` if `batch_first=True`.
            - src_mask: :math:`(S, S)` or :math:`(N\cdot\text{num\_heads}, S, S)`.
            - tgt_mask: :math:`(T, T)` or :math:`(N\cdot\text{num\_heads}, T, T)`.
            - memory_mask: :math:`(T, S)`.
            - src_key_padding_mask: :math:`(S)` for unbatched input otherwise :math:`(N, S)`.
            - tgt_key_padding_mask: :math:`(T)` for unbatched input otherwise :math:`(N, T)`.
            - memory_key_padding_mask: :math:`(S)` for unbatched input otherwise :math:`(N, S)`.

            Note: [src/tgt/memory]_mask ensures that position i is allowed to attend the unmasked
            positions. If a BoolTensor is provided, positions with ``True``
            are not allowed to attend while ``False`` values will be unchanged. If a FloatTensor
            is provided, it will be added to the attention weight.
            [src/tgt/memory]_key_padding_mask provides specified elements in the key to be ignored by
            the attention. If a BoolTensor is provided, the positions with the
            value of ``True`` will be ignored while the position with the value of ``False`` will be unchanged.

            - output: :math:`(T, E)` for unbatched input, :math:`(T, N, E)` if `batch_first=False` or
              `(N, T, E)` if `batch_first=True`.

            Note: Due to the multi-head attention architecture in the transformer model,
            the output sequence length of a transformer is same as the input sequence
            (i.e. target) length of the decoder.

            where S is the source sequence length, T is the target sequence length, N is the
            batch size, E is the feature number

        Examples:
            >>> # xdoctest: +SKIP
            >>> output, enc_attn, dec_self_attn, dec_cross_attn = transformer_model(src, tgt, src_mask=src_mask, tgt_mask=tgt_mask)
        """

        is_batched = src.dim() == 3
        if not self.batch_first and src.size(1) != tgt.size(1) and is_batched:
            raise RuntimeError("the batch number of src and tgt must be equal")
        elif self.batch_first and src.size(0) != tgt.size(0) and is_batched:
            raise RuntimeError("the batch number of src and tgt must be equal")

        if src.size(-1) != self.d_model or tgt.size(-1) != self.d_model:
            raise RuntimeError("the feature number of src and tgt must be equal to d_model")

        # The modified encoder/decoder return their per-layer attention maps alongside the output,
        # so unpack the tuples here and propagate the maps to the caller.
        memory, enc_attn_maps = self.encoder(src, mask=src_mask, src_key_padding_mask=src_key_padding_mask,
                                             need_weights=need_weights)
        output, dec_attn_maps_sa, dec_attn_maps_mha = self.decoder(tgt, memory, tgt_mask=tgt_mask,
                                                                   memory_mask=memory_mask,
                                                                   tgt_key_padding_mask=tgt_key_padding_mask,
                                                                   memory_key_padding_mask=memory_key_padding_mask,
                                                                   need_weights=need_weights)
        return output, enc_attn_maps, dec_attn_maps_sa, dec_attn_maps_mha
    @staticmethod
    def generate_square_subsequent_mask(sz: int, device='cpu') -> Tensor:
        r"""Generate a square mask for the sequence. The masked positions are filled with float('-inf').
        Unmasked positions are filled with float(0.0).
        """
        return torch.triu(torch.full((sz, sz), float('-inf'), device=device), diagonal=1)

    def _reset_parameters(self):
        r"""Initiate parameters in the transformer model."""
        for p in self.parameters():
            if p.dim() > 1:
                xavier_uniform_(p)


class TransformerEncoder(Module):
    r"""TransformerEncoder is a stack of N encoder layers. Users can build the
    BERT(https://arxiv.org/abs/1810.04805) model with corresponding parameters.

    Args:
        encoder_layer: an instance of the TransformerEncoderLayer() class (required).
        num_layers: the number of sub-encoder-layers in the encoder (required).
        norm: the layer normalization component (optional).
        enable_nested_tensor: if True, input will automatically convert to nested tensor
            (and convert back on output). This will improve the overall performance of
            TransformerEncoder when padding rate is high. Default: ``True`` (enabled).
    Examples::
        >>> encoder_layer = nn.TransformerEncoderLayer(d_model=512, nhead=8)
        >>> transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=6)
        >>> src = torch.rand(10, 32, 512)
        >>> out, attn_maps = transformer_encoder(src, need_weights=True)
    """
    __constants__ = ['norm']

    def __init__(self, encoder_layer, num_layers, norm=None, enable_nested_tensor=True, mask_check=True):
        super().__init__()
        torch._C._log_api_usage_once(f"torch.nn.modules.{self.__class__.__name__}")
        self.layers = _get_clones(encoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm
        self.enable_nested_tensor = enable_nested_tensor
        self.mask_check = mask_check
    def forward(
            self,
            src: Tensor,
            mask: Optional[Tensor] = None,
            src_key_padding_mask: Optional[Tensor] = None,
            is_causal: Optional[bool] = None,
            need_weights: bool = False) -> Tensor:
        r"""Pass the input through the encoder layers in turn.

        Args:
            src: the sequence to the encoder (required).
            mask: the mask for the src sequence (optional).
            is_causal: If specified, applies a causal mask as mask (optional)
                and ignores attn_mask for computing scaled dot product attention.
                Default: ``False``.
            src_key_padding_mask: the mask for the src keys per batch (optional).
            need_weights: if ``True``, each layer also returns its self-attention map (optional).

        Shape:
            see the docs in Transformer class.
        """
        src_key_padding_mask = F._canonical_mask(
            mask=src_key_padding_mask,
            mask_name="src_key_padding_mask",
            other_type=F._none_or_dtype(mask),
            other_name="mask",
            target_type=src.dtype
        )

        output = src
        convert_to_nested = False
        first_layer = self.layers[0]
        src_key_padding_mask_for_layers = src_key_padding_mask
        why_not_sparsity_fast_path = ''
        str_first_layer = "self.layers[0]"
        if not isinstance(first_layer, torch.nn.TransformerEncoderLayer):
            why_not_sparsity_fast_path = f"{str_first_layer} was not TransformerEncoderLayer"
        elif first_layer.norm_first:
            why_not_sparsity_fast_path = f"{str_first_layer}.norm_first was True"
        elif first_layer.training:
            why_not_sparsity_fast_path = f"{str_first_layer} was in training mode"
        elif not first_layer.self_attn.batch_first:
            why_not_sparsity_fast_path = f"{str_first_layer}.self_attn.batch_first was not True"
        elif not first_layer.self_attn._qkv_same_embed_dim:
            why_not_sparsity_fast_path = f"{str_first_layer}.self_attn._qkv_same_embed_dim was not True"
        elif not first_layer.activation_relu_or_gelu:
            why_not_sparsity_fast_path = f"{str_first_layer}.activation_relu_or_gelu was not True"
        elif not (first_layer.norm1.eps == first_layer.norm2.eps):
            why_not_sparsity_fast_path = f"{str_first_layer}.norm1.eps was not equal to {str_first_layer}.norm2.eps"
        elif not src.dim() == 3:
            why_not_sparsity_fast_path = f"input not batched; expected src.dim() of 3 but got {src.dim()}"
        elif not self.enable_nested_tensor:
            why_not_sparsity_fast_path = "enable_nested_tensor was not True"
        elif need_weights:
            # Attention maps are not available on the nested-tensor path, so skip it when they are requested.
            why_not_sparsity_fast_path = "need_weights was True"
        elif src_key_padding_mask is None:
            why_not_sparsity_fast_path = "src_key_padding_mask was None"
        elif (((not hasattr(self, "mask_check")) or self.mask_check)
                and not torch._nested_tensor_from_mask_left_aligned(src, src_key_padding_mask.logical_not())):
            why_not_sparsity_fast_path = "mask_check enabled, and src and src_key_padding_mask was not left aligned"
        elif output.is_nested:
            why_not_sparsity_fast_path = "NestedTensor input is not supported"
        elif mask is not None:
            why_not_sparsity_fast_path = "src_key_padding_mask and mask were both supplied"
        elif first_layer.self_attn.num_heads % 2 == 1:
            why_not_sparsity_fast_path = "num_head is odd"
        elif torch.is_autocast_enabled():
            why_not_sparsity_fast_path = "autocast is enabled"
        if not why_not_sparsity_fast_path:
            tensor_args = (
                src,
                first_layer.self_attn.in_proj_weight,
                first_layer.self_attn.in_proj_bias,
                first_layer.self_attn.out_proj.weight,
                first_layer.self_attn.out_proj.bias,
                first_layer.norm1.weight,
                first_layer.norm1.bias,
                first_layer.norm2.weight,
                first_layer.norm2.bias,
                first_layer.linear1.weight,
                first_layer.linear1.bias,
                first_layer.linear2.weight,
                first_layer.linear2.bias,
            )

            if torch.overrides.has_torch_function(tensor_args):
                why_not_sparsity_fast_path = "some Tensor argument has_torch_function"
            elif not (src.is_cuda or 'cpu' in str(src.device)):
                why_not_sparsity_fast_path = "src is neither CUDA nor CPU"
            elif torch.is_grad_enabled() and any(x.requires_grad for x in tensor_args):
                why_not_sparsity_fast_path = ("grad is enabled and at least one of query or the "
                                              "input/output projection weights or biases requires_grad")

            if (not why_not_sparsity_fast_path) and (src_key_padding_mask is not None):
                convert_to_nested = True
                output = torch._nested_tensor_from_mask(output, src_key_padding_mask.logical_not(), mask_check=False)
                src_key_padding_mask_for_layers = None
        # Prevent type refinement
        make_causal = (is_causal is True)

        if is_causal is None:
            if mask is not None:
                sz = mask.size(0)
                causal_comparison = torch.triu(
                    torch.ones(sz, sz, device=mask.device) * float('-inf'), diagonal=1
                ).to(mask.dtype)

                if torch.equal(mask, causal_comparison):
                    make_causal = True

        is_causal = make_causal
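        # Collect one attention map per encoder layer. Each entry is the layer's self-attention
        # weights when need_weights is True, and None otherwise.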
        attn_maps = []
        for mod in self.layers:
            output, attn_map = mod(output, src_mask=mask, is_causal=is_causal,
                                   src_key_padding_mask=src_key_padding_mask_for_layers, need_weights=need_weights)
            attn_maps.append(attn_map)

        if convert_to_nested:
            output = output.to_padded_tensor(0.)

        if self.norm is not None:
            output = self.norm(output)

        return output, attn_maps


class TransformerDecoder(Module):
    r"""TransformerDecoder is a stack of N decoder layers

    Args:
        decoder_layer: an instance of the TransformerDecoderLayer() class (required).
        num_layers: the number of sub-decoder-layers in the decoder (required).
        norm: the layer normalization component (optional).
    Examples::
        >>> decoder_layer = nn.TransformerDecoderLayer(d_model=512, nhead=8)
        >>> transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=6)
        >>> memory = torch.rand(10, 32, 512)
        >>> tgt = torch.rand(20, 32, 512)
        >>> out, self_attn_maps, cross_attn_maps = transformer_decoder(tgt, memory, need_weights=True)
    """
    __constants__ = ['norm']

    def __init__(self, decoder_layer, num_layers, norm=None):
        super().__init__()
        torch._C._log_api_usage_once(f"torch.nn.modules.{self.__class__.__name__}")
        self.layers = _get_clones(decoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm
    def forward(self, tgt: Tensor, memory: Tensor, tgt_mask: Optional[Tensor] = None,
                memory_mask: Optional[Tensor] = None, tgt_key_padding_mask: Optional[Tensor] = None,
                memory_key_padding_mask: Optional[Tensor] = None,
                need_weights: bool = False) -> Tensor:
        r"""Pass the inputs (and mask) through the decoder layer in turn.

        Args:
            tgt: the sequence to the decoder (required).
            memory: the sequence from the last layer of the encoder (required).
            tgt_mask: the mask for the tgt sequence (optional).
            memory_mask: the mask for the memory sequence (optional).
            tgt_key_padding_mask: the mask for the tgt keys per batch (optional).
            memory_key_padding_mask: the mask for the memory keys per batch (optional).
            need_weights: if ``True``, each layer also returns its self- and cross-attention maps (optional).

        Shape:
            see the docs in Transformer class.
        """
        output = tgt
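        # Collect one self-attention map and one cross-attention (decoder-encoder) map per layer;
        # entries are None when need_weights is False.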
        attn_maps_sa = []
        attn_maps_mha = []
        for mod in self.layers:
            output, attn_map_sa, attn_map_mha = mod(output, memory, tgt_mask=tgt_mask,
                                                    memory_mask=memory_mask,
                                                    tgt_key_padding_mask=tgt_key_padding_mask,
                                                    memory_key_padding_mask=memory_key_padding_mask,
                                                    need_weights=need_weights)
            attn_maps_sa.append(attn_map_sa)
            attn_maps_mha.append(attn_map_mha)

        if self.norm is not None:
            output = self.norm(output)

        return output, attn_maps_sa, attn_maps_mha


class TransformerEncoderLayer(Module):
    r"""TransformerEncoderLayer is made up of self-attn and feedforward network.
    This standard encoder layer is based on the paper "Attention Is All You Need".
    Ashish Vaswani, Noam Shazeer, Niki Parmar, Jakob Uszkoreit, Llion Jones, Aidan N Gomez,
    Lukasz Kaiser, and Illia Polosukhin. 2017. Attention is all you need. In Advances in
    Neural Information Processing Systems, pages 6000-6010. Users may modify or implement
    in a different way during application.

    Args:
        d_model: the number of expected features in the input (required).
        nhead: the number of heads in the multiheadattention models (required).
        dim_feedforward: the dimension of the feedforward network model (default=2048).
        dropout: the dropout value (default=0.1).
        activation: the activation function of the intermediate layer, can be a string
            ("relu" or "gelu") or a unary callable. Default: relu
        layer_norm_eps: the eps value in layer normalization components (default=1e-5).
        batch_first: If ``True``, then the input and output tensors are provided
            as (batch, seq, feature). Default: ``False`` (seq, batch, feature).
        norm_first: if ``True``, layer norm is done prior to attention and feedforward
            operations, respectively. Otherwise it's done after. Default: ``False`` (after).
    Examples::
        >>> encoder_layer = nn.TransformerEncoderLayer(d_model=512, nhead=8)
        >>> src = torch.rand(10, 32, 512)
        >>> out, attn_map = encoder_layer(src, need_weights=True)

    Alternatively, when ``batch_first`` is ``True``:
        >>> encoder_layer = nn.TransformerEncoderLayer(d_model=512, nhead=8, batch_first=True)
        >>> src = torch.rand(32, 10, 512)
        >>> out, attn_map = encoder_layer(src, need_weights=True)
    Fast path:
        forward() will use a special optimized implementation described in
        `FlashAttention: Fast and Memory-Efficient Exact Attention with IO-Awareness`_ if all of the following
        conditions are met:

        - Either autograd is disabled (using ``torch.inference_mode`` or ``torch.no_grad``) or no tensor
          argument ``requires_grad``
        - training is disabled (using ``.eval()``)
        - batch_first is ``True`` and the input is batched (i.e., ``src.dim() == 3``)
        - activation is one of: ``"relu"``, ``"gelu"``, ``torch.functional.relu``, or ``torch.functional.gelu``
        - at most one of ``src_mask`` and ``src_key_padding_mask`` is passed
        - if src is a `NestedTensor <https://pytorch.org/docs/stable/nested.html>`_, neither ``src_mask``
          nor ``src_key_padding_mask`` is passed
        - the two ``LayerNorm`` instances have a consistent ``eps`` value (this will naturally be the case
          unless the caller has manually modified one without modifying the other)

        If the optimized implementation is in use, a
        `NestedTensor <https://pytorch.org/docs/stable/nested.html>`_ can be
        passed for ``src`` to represent padding more efficiently than using a padding
        mask. In this case, a `NestedTensor <https://pytorch.org/docs/stable/nested.html>`_ will be
        returned, and an additional speedup proportional to the fraction of the input that
        is padding can be expected.

    .. _`FlashAttention: Fast and Memory-Efficient Exact Attention with IO-Awareness`:
        https://arxiv.org/abs/2205.14135
    """
    __constants__ = ['batch_first', 'norm_first']

    def __init__(self, d_model: int, nhead: int, dim_feedforward: int = 2048, dropout: float = 0.1,
                 activation: Union[str, Callable[[Tensor], Tensor]] = F.relu,
                 layer_norm_eps: float = 1e-5, batch_first: bool = False, norm_first: bool = False,
                 device=None, dtype=None) -> None:
        factory_kwargs = {'device': device, 'dtype': dtype}
        super().__init__()
        self.self_attn = MultiheadAttention(d_model, nhead, dropout=dropout, batch_first=batch_first,
                                            **factory_kwargs)
        # Implementation of Feedforward model
        self.linear1 = Linear(d_model, dim_feedforward, **factory_kwargs)
        self.dropout = Dropout(dropout)
        self.linear2 = Linear(dim_feedforward, d_model, **factory_kwargs)

        self.norm_first = norm_first
        self.norm1 = LayerNorm(d_model, eps=layer_norm_eps, **factory_kwargs)
        self.norm2 = LayerNorm(d_model, eps=layer_norm_eps, **factory_kwargs)
        self.dropout1 = Dropout(dropout)
        self.dropout2 = Dropout(dropout)

        # Legacy string support for activation function.
        if isinstance(activation, str):
            activation = _get_activation_fn(activation)

        # We can't test self.activation in forward() in TorchScript,
        # so stash some information about it instead.
        if activation is F.relu or isinstance(activation, torch.nn.ReLU):
            self.activation_relu_or_gelu = 1
        elif activation is F.gelu or isinstance(activation, torch.nn.GELU):
            self.activation_relu_or_gelu = 2
        else:
            self.activation_relu_or_gelu = 0
        self.activation = activation

    def __setstate__(self, state):
        super().__setstate__(state)
        if not hasattr(self, 'activation'):
            self.activation = F.relu
    def forward(
            self,
            src: Tensor,
            src_mask: Optional[Tensor] = None,
            src_key_padding_mask: Optional[Tensor] = None,
            is_causal: bool = False,
            need_weights: bool = False) -> Tensor:
        r"""Pass the input through the encoder layer.

        Args:
            src: the sequence to the encoder layer (required).
            src_mask: the mask for the src sequence (optional).
            is_causal: If specified, applies a causal mask as src_mask.
                Default: ``False``.
            src_key_padding_mask: the mask for the src keys per batch (optional).
            need_weights: if ``True``, also return the self-attention map (optional).

        Shape:
            see the docs in Transformer class.
        """
        src_key_padding_mask = F._canonical_mask(
            mask=src_key_padding_mask,
            mask_name="src_key_padding_mask",
            other_type=F._none_or_dtype(src_mask),
            other_name="src_mask",
            target_type=src.dtype
        )
        # see Fig. 1 of https://arxiv.org/pdf/2002.04745v1.pdf
        why_not_sparsity_fast_path = ''
        if not src.dim() == 3:
            why_not_sparsity_fast_path = f"input not batched; expected src.dim() of 3 but got {src.dim()}"
        elif self.training:
            why_not_sparsity_fast_path = "training is enabled"
        elif not self.self_attn.batch_first:
            why_not_sparsity_fast_path = "self_attn.batch_first was not True"
        elif not self.self_attn._qkv_same_embed_dim:
            why_not_sparsity_fast_path = "self_attn._qkv_same_embed_dim was not True"
        elif not self.activation_relu_or_gelu:
            why_not_sparsity_fast_path = "activation_relu_or_gelu was not True"
        elif not (self.norm1.eps == self.norm2.eps):
            why_not_sparsity_fast_path = "norm1.eps is not equal to norm2.eps"
        elif src.is_nested and (src_key_padding_mask is not None or src_mask is not None):
            why_not_sparsity_fast_path = "src_key_padding_mask and src_mask are not supported with NestedTensor input"
        elif self.self_attn.num_heads % 2 == 1:
            why_not_sparsity_fast_path = "num_head is odd"
        elif torch.is_autocast_enabled():
            why_not_sparsity_fast_path = "autocast is enabled"
        elif need_weights:
            # The fused fast path cannot expose attention weights, so fall back to the standard path.
            why_not_sparsity_fast_path = "need_weights was True"
        if not why_not_sparsity_fast_path:
            tensor_args = (
                src,
                self.self_attn.in_proj_weight,
                self.self_attn.in_proj_bias,
                self.self_attn.out_proj.weight,
                self.self_attn.out_proj.bias,
                self.norm1.weight,
                self.norm1.bias,
                self.norm2.weight,
                self.norm2.bias,
                self.linear1.weight,
                self.linear1.bias,
                self.linear2.weight,
                self.linear2.bias,
            )

            # We have to use list comprehensions below because TorchScript does not support
            # generator expressions.
            if torch.overrides.has_torch_function(tensor_args):
                why_not_sparsity_fast_path = "some Tensor argument has_torch_function"
            elif not all((x.is_cuda or 'cpu' in str(x.device)) for x in tensor_args):
                why_not_sparsity_fast_path = "some Tensor argument is neither CUDA nor CPU"
            elif torch.is_grad_enabled() and any(x.requires_grad for x in tensor_args):
                why_not_sparsity_fast_path = ("grad is enabled and at least one of query or the "
                                              "input/output projection weights or biases requires_grad")
            if not why_not_sparsity_fast_path:
                merged_mask, mask_type = self.self_attn.merge_masks(src_mask, src_key_padding_mask, src)
                # The fused kernel returns only the layer output, so pair it with None in place of the
                # attention map so callers can always unpack (output, attn_map).
                return torch._transformer_encoder_layer_fwd(
                    src,
                    self.self_attn.embed_dim,
                    self.self_attn.num_heads,
                    self.self_attn.in_proj_weight,
                    self.self_attn.in_proj_bias,
                    self.self_attn.out_proj.weight,
                    self.self_attn.out_proj.bias,
                    self.activation_relu_or_gelu == 2,
                    self.norm_first,
                    self.norm1.eps,
                    self.norm1.weight,
                    self.norm1.bias,
                    self.norm2.weight,
                    self.norm2.bias,
                    self.linear1.weight,
                    self.linear1.bias,
                    self.linear2.weight,
                    self.linear2.bias,
                    merged_mask,
                    mask_type,
                ), None
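        # Standard (non-fused) path: run self-attention and the feedforward block explicitly so the
        # attention map from MultiheadAttention can be returned alongside the output.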
        x = src
        if self.norm_first:
            _x, attn_map = self._sa_block(self.norm1(x), src_mask, src_key_padding_mask, need_weights)
            x = x + _x
            x = x + self._ff_block(self.norm2(x))
        else:
            _x, attn_map = self._sa_block(x, src_mask, src_key_padding_mask, need_weights)
            x = self.norm1(x + _x)
            x = self.norm2(x + self._ff_block(x))

        return x, attn_map

    # self-attention block
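    # When need_weights is False, nn.MultiheadAttention returns None for the weights, so the
    # attention map propagated up the stack is simply None in that case.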
    def _sa_block(self, x: Tensor,
                  attn_mask: Optional[Tensor], key_padding_mask: Optional[Tensor],
                  need_weights: Optional[bool] = False) -> Tensor:
        x, attn_map = self.self_attn(x, x, x,
                                     attn_mask=attn_mask,
                                     key_padding_mask=key_padding_mask,
                                     need_weights=need_weights)
        return self.dropout1(x), attn_map

    # feed forward block
    def _ff_block(self, x: Tensor) -> Tensor:
        x = self.linear2(self.dropout(self.activation(self.linear1(x))))
        return self.dropout2(x)


class TransformerDecoderLayer(Module):
    r"""TransformerDecoderLayer is made up of self-attn, multi-head-attn and feedforward network.
    This standard decoder layer is based on the paper "Attention Is All You Need".
    Ashish Vaswani, Noam Shazeer, Niki Parmar, Jakob Uszkoreit, Llion Jones, Aidan N Gomez,
    Lukasz Kaiser, and Illia Polosukhin. 2017. Attention is all you need. In Advances in
    Neural Information Processing Systems, pages 6000-6010. Users may modify or implement
    in a different way during application.

    Args:
        d_model: the number of expected features in the input (required).
        nhead: the number of heads in the multiheadattention models (required).
        dim_feedforward: the dimension of the feedforward network model (default=2048).
        dropout: the dropout value (default=0.1).
        activation: the activation function of the intermediate layer, can be a string
            ("relu" or "gelu") or a unary callable. Default: relu
        layer_norm_eps: the eps value in layer normalization components (default=1e-5).
        batch_first: If ``True``, then the input and output tensors are provided
            as (batch, seq, feature). Default: ``False`` (seq, batch, feature).
        norm_first: if ``True``, layer norm is done prior to self attention, multihead
            attention and feedforward operations, respectively. Otherwise it's done after.
            Default: ``False`` (after).
    Examples::
        >>> decoder_layer = nn.TransformerDecoderLayer(d_model=512, nhead=8)
        >>> memory = torch.rand(10, 32, 512)
        >>> tgt = torch.rand(20, 32, 512)
        >>> out, self_attn_map, cross_attn_map = decoder_layer(tgt, memory, need_weights=True)

    Alternatively, when ``batch_first`` is ``True``:
        >>> decoder_layer = nn.TransformerDecoderLayer(d_model=512, nhead=8, batch_first=True)
        >>> memory = torch.rand(32, 10, 512)
        >>> tgt = torch.rand(32, 20, 512)
        >>> out, self_attn_map, cross_attn_map = decoder_layer(tgt, memory, need_weights=True)
    """
    __constants__ = ['batch_first', 'norm_first']

    def __init__(self, d_model: int, nhead: int, dim_feedforward: int = 2048, dropout: float = 0.1,
                 activation: Union[str, Callable[[Tensor], Tensor]] = F.relu,
                 layer_norm_eps: float = 1e-5, batch_first: bool = False, norm_first: bool = False,
                 device=None, dtype=None) -> None:
        factory_kwargs = {'device': device, 'dtype': dtype}
        super().__init__()
        self.self_attn = MultiheadAttention(d_model, nhead, dropout=dropout, batch_first=batch_first,
                                            **factory_kwargs)
        self.multihead_attn = MultiheadAttention(d_model, nhead, dropout=dropout, batch_first=batch_first,
                                                 **factory_kwargs)
        # Implementation of Feedforward model
        self.linear1 = Linear(d_model, dim_feedforward, **factory_kwargs)
        self.dropout = Dropout(dropout)
        self.linear2 = Linear(dim_feedforward, d_model, **factory_kwargs)

        self.norm_first = norm_first
        self.norm1 = LayerNorm(d_model, eps=layer_norm_eps, **factory_kwargs)
        self.norm2 = LayerNorm(d_model, eps=layer_norm_eps, **factory_kwargs)
        self.norm3 = LayerNorm(d_model, eps=layer_norm_eps, **factory_kwargs)
        self.dropout1 = Dropout(dropout)
        self.dropout2 = Dropout(dropout)
        self.dropout3 = Dropout(dropout)

        # Legacy string support for activation function.
        if isinstance(activation, str):
            self.activation = _get_activation_fn(activation)
        else:
            self.activation = activation

    def __setstate__(self, state):
        if 'activation' not in state:
            state['activation'] = F.relu
        super().__setstate__(state)
    def forward(
        self,
        tgt: Tensor,
        memory: Tensor,
        tgt_mask: Optional[Tensor] = None,
        memory_mask: Optional[Tensor] = None,
        tgt_key_padding_mask: Optional[Tensor] = None,
        memory_key_padding_mask: Optional[Tensor] = None,
        tgt_is_causal: bool = False,
        memory_is_causal: bool = False,
        need_weights: bool = False,
    ) -> Tensor:
        r"""Pass the inputs (and mask) through the decoder layer.

        Args:
            tgt: the sequence to the decoder layer (required).
            memory: the sequence from the last layer of the encoder (required).
            tgt_mask: the mask for the tgt sequence (optional).
            memory_mask: the mask for the memory sequence (optional).
            tgt_key_padding_mask: the mask for the tgt keys per batch (optional).
            memory_key_padding_mask: the mask for the memory keys per batch (optional).
            tgt_is_causal: If specified, applies a causal mask as tgt mask.
                Mutually exclusive with providing tgt_mask. Default: ``False``.
            memory_is_causal: If specified, applies a causal mask as memory mask.
                Mutually exclusive with providing memory_mask. Default: ``False``.
            need_weights: if ``True``, also return the self- and cross-attention maps (optional).

        Shape:
            see the docs in Transformer class.
        """
        # see Fig. 1 of https://arxiv.org/pdf/2002.04745v1.pdf
        x = tgt
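        # Both residual branches are computed explicitly so that the self-attention map (over tgt)
        # and the cross-attention map (tgt attending to memory) can be returned with the output.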
        if self.norm_first:
            _x, attn_map_sa = self._sa_block(self.norm1(x), tgt_mask, tgt_key_padding_mask, tgt_is_causal,
                                             need_weights=need_weights)
            x = x + _x
            _x, attn_map_mha = self._mha_block(self.norm2(x), memory, memory_mask, memory_key_padding_mask,
                                               memory_is_causal, need_weights=need_weights)
            x = x + _x
            x = x + self._ff_block(self.norm3(x))
        else:
            _x, attn_map_sa = self._sa_block(x, tgt_mask, tgt_key_padding_mask, tgt_is_causal,
                                             need_weights=need_weights)
            x = self.norm1(x + _x)
            _x, attn_map_mha = self._mha_block(x, memory, memory_mask, memory_key_padding_mask,
                                               memory_is_causal, need_weights=need_weights)
            x = self.norm2(x + _x)
            x = self.norm3(x + self._ff_block(x))

        return x, attn_map_sa, attn_map_mha
    # self-attention block
    def _sa_block(self, x: Tensor,
                  attn_mask: Optional[Tensor], key_padding_mask: Optional[Tensor],
                  is_causal: bool = False, need_weights: bool = False) -> Tensor:
        x, attn_map = self.self_attn(x, x, x,
                                     attn_mask=attn_mask,
                                     key_padding_mask=key_padding_mask,
                                     is_causal=is_causal,
                                     need_weights=need_weights)
        return self.dropout1(x), attn_map

    # multihead attention block
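    # Cross-attention over the encoder memory. With need_weights=True, nn.MultiheadAttention returns
    # weights of shape (N, T, S) for batched input, averaged over heads by default; that tensor is
    # the map returned here.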
    def _mha_block(self, x: Tensor, mem: Tensor,
                   attn_mask: Optional[Tensor], key_padding_mask: Optional[Tensor],
                   is_causal: bool = False, need_weights: bool = False) -> Tensor:
        x, attn_map = self.multihead_attn(x, mem, mem,
                                          attn_mask=attn_mask,
                                          key_padding_mask=key_padding_mask,
                                          is_causal=is_causal,
                                          need_weights=need_weights)
        return self.dropout2(x), attn_map

    # feed forward block
    def _ff_block(self, x: Tensor) -> Tensor:
        x = self.linear2(self.dropout(self.activation(self.linear1(x))))
        return self.dropout3(x)


def _get_clones(module, N):
    # FIXME: copy.deepcopy() is not defined on nn.module
    return ModuleList([copy.deepcopy(module) for i in range(N)])


def _get_activation_fn(activation: str) -> Callable[[Tensor], Tensor]:
    if activation == "relu":
        return F.relu
    elif activation == "gelu":
        return F.gelu

    raise RuntimeError("activation should be relu/gelu, not {}".format(activation))
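
A minimal usage sketch (not part of the module above), assuming this file is dropped in as torch/nn/modules/transformer.py so the modified classes are picked up as torch.nn.TransformerEncoder etc.; the variable names and sizes are illustrative only.

import torch
import torch.nn as nn

# Build a small encoder; batch_first=True so src is (batch, seq, feature).
encoder_layer = nn.TransformerEncoderLayer(d_model=64, nhead=4, batch_first=True)
encoder = nn.TransformerEncoder(encoder_layer, num_layers=2)
encoder.eval()

src = torch.rand(8, 16, 64)  # (N, S, E)
with torch.no_grad():
    out, attn_maps = encoder(src, need_weights=True)

# attn_maps holds one entry per layer: that layer's self-attention weights,
# shape (N, S, S), averaged over heads by nn.MultiheadAttention's default.
print(out.shape, len(attn_maps), attn_maps[0].shape)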