Skip to content

Instantly share code, notes, and snippets.

@cbaziotis
Last active March 28, 2023 11:50
Show Gist options
  • Save cbaziotis/6428df359af27d58078ca5ed9792bd6d to your computer and use it in GitHub Desktop.
Save cbaziotis/6428df359af27d58078ca5ed9792bd6d to your computer and use it in GitHub Desktop.
Keras Layer that implements an Attention mechanism for temporal data. Supports Masking. Follows the work of Raffel et al. [https://arxiv.org/abs/1512.08756]
from keras import backend as K, initializers, regularizers, constraints
from keras.engine.topology import Layer
def dot_product(x, kernel):
    """
    Backend-agnostic dot product between an input tensor and a weight vector,
    so the same call works on both Theano and Tensorflow.

    Args:
        x (): input
        kernel (): weights

    Returns:
        `K.dot(x, kernel)`. On the Tensorflow backend the kernel is first
        given a trailing axis and that axis is squeezed out of the product.
    """
    if K.backend() != 'tensorflow':
        return K.dot(x, kernel)

    # todo: check that this is correct
    kernel_column = K.expand_dims(kernel)
    projected = K.dot(x, kernel_column)
    return K.squeeze(projected, axis=-1)
class Attention(Layer):
    def __init__(self,
                 W_regularizer=None, b_regularizer=None,
                 W_constraint=None, b_constraint=None,
                 bias=True,
                 return_attention=False,
                 **kwargs):
        """
        Keras Layer that implements an Attention mechanism for temporal data.
        Supports Masking.
        Follows the work of Raffel et al. [https://arxiv.org/abs/1512.08756]

        # Input shape
            3D tensor with shape: `(samples, steps, features)`.
        # Output shape
            2D tensor with shape: `(samples, features)`.
            With `return_attention=True`, a list `[output, attention]` where
            `attention` has shape `(samples, steps)`.

        # Arguments
            W_regularizer: regularizer for the attention weight vector.
            b_regularizer: regularizer for the bias vector.
            W_constraint: constraint for the attention weight vector.
            b_constraint: constraint for the bias vector.
            bias: whether to add a per-step bias before the `tanh`.
            return_attention: whether to also return the attention weights.
            kwargs: forwarded to the base `Layer` constructor.

        Just put it on top of an RNN Layer (GRU/LSTM/SimpleRNN) with return_sequences=True.
        The dimensions are inferred based on the output shape of the RNN.

        Note: The layer has been tested with Keras 1.x
        NOTE(review): this file uses the `initializers` module, which looks
        like the Keras 2 API - confirm which versions are actually supported.

        Example:
            # 1
            model.add(LSTM(64, return_sequences=True))
            model.add(Attention())
            # next add a Dense layer (for classification/regression) or whatever...

            # 2 - Get the attention scores
            hidden = LSTM(64, return_sequences=True)(words)
            sentence, word_scores = Attention(return_attention=True)(hidden)
        """
        self.supports_masking = True
        self.return_attention = return_attention
        self.init = initializers.get('glorot_uniform')

        self.W_regularizer = regularizers.get(W_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)

        self.W_constraint = constraints.get(W_constraint)
        self.b_constraint = constraints.get(b_constraint)

        self.bias = bias
        super(Attention, self).__init__(**kwargs)

    def build(self, input_shape):
        """Create the weight vector `W` (features,) and optional bias `b` (steps,)."""
        assert len(input_shape) == 3

        # `shape` is passed by keyword: in `add_weight` signatures where
        # `name` is the first positional parameter, a positional shape would
        # collide with the `name=` keyword given below.
        self.W = self.add_weight(shape=(input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 constraint=self.W_constraint)
        if self.bias:
            # One bias value per time step, so the number of steps
            # (input_shape[1]) must be known when the layer is built.
            self.b = self.add_weight(shape=(input_shape[1],),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     constraint=self.b_constraint)
        else:
            self.b = None

        self.built = True

    def compute_mask(self, input, input_mask=None):
        # do not pass the mask to the next layers
        return None

    def call(self, x, mask=None):
        """Return the attention-weighted sum of `x` over the steps axis.

        When `return_attention` is set, returns `[result, a]` where `a` holds
        the normalized attention weights.
        """
        # Unnormalized score per step.
        eij = dot_product(x, self.W)

        if self.bias:
            eij += self.b

        eij = K.tanh(eij)

        a = K.exp(eij)

        # apply mask after the exp. will be re-normalized next
        if mask is not None:
            # Cast the mask to floatX to avoid float64 upcasting in theano
            a *= K.cast(mask, K.floatx())

        # in some cases especially in the early stages of training the sum may be almost zero
        # and this results in NaN's. A workaround is to add a very small positive number ε to the sum.
        a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())

        # Give `a` a trailing axis so it broadcasts over the features of `x`.
        weighted_input = x * K.expand_dims(a)

        result = K.sum(weighted_input, axis=1)

        if self.return_attention:
            return [result, a]
        return result

    def compute_output_shape(self, input_shape):
        """Shape of `call`'s output; a list of two shapes with `return_attention`."""
        if self.return_attention:
            return [(input_shape[0], input_shape[-1]),
                    (input_shape[0], input_shape[1])]
        else:
            return input_shape[0], input_shape[-1]

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created.

        Without this, only the base `Layer` config is saved and the
        regularizers, constraints, `bias` and `return_attention` settings are
        lost on reload. Loading a saved model still needs the class to be
        supplied, e.g. `custom_objects={'Attention': Attention}`.
        """
        config = {
            'W_regularizer': regularizers.serialize(self.W_regularizer),
            'b_regularizer': regularizers.serialize(self.b_regularizer),
            'W_constraint': constraints.serialize(self.W_constraint),
            'b_constraint': constraints.serialize(self.b_constraint),
            'bias': self.bias,
            'return_attention': self.return_attention,
        }
        base_config = super(Attention, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))
@modestprophet
Copy link

Some folks in this thread asked about extracting the attention vector during inference. I believe I finally got that bit of functionality to work and have described the process here: https://stackoverflow.com/a/59276694/11133810

@rimchiha-fseg
Copy link

rimchiha-fseg commented Apr 28, 2020

I work in the named entity recognition domain.
I tried to implement the attention layer proposed in

https://bmcmedinformdecismak.biomedcentral.com/articles/10.1186/s12911-019-0933-6
The code of the attention layer is:
`from keras.engine.topology import Layer
from keras import backend as K, initializers, regularizers, constraints
def dot_product(x, kernel):
    """Dot product of `x` with the weight vector `kernel` on either backend.

    On the Tensorflow backend the kernel is given a trailing axis before the
    product and that axis is squeezed out of the result.
    """
    if K.backend() != 'tensorflow':
        return K.dot(x, kernel)

    # todo: check that this is correct
    kernel_column = K.expand_dims(kernel)
    projected = K.dot(x, kernel_column)
    return K.squeeze(projected, axis=-1)

class Attention(Layer):
    def __init__(self,
                 W_regularizer=None, b_regularizer=None,
                 W_constraint=None, b_constraint=None,
                 bias=True, return_attention=False, **kwargs):
        """
        Attention variant that appends a global context vector to every step.

        # Input shape
            3D tensor with shape: `(samples, steps, features)`.
        # Output shape
            3D tensor with shape: `(samples, steps, 2 * features)`: the input
            concatenated, at every step, with the attention-weighted sum of
            the whole sequence.

        # Arguments
            W_regularizer: regularizer for the attention weight vector.
            b_regularizer: regularizer for the bias vector.
            W_constraint: constraint for the attention weight vector.
            b_constraint: constraint for the bias vector.
            bias: whether to add a per-step bias before the `tanh`.
            return_attention: stored on the layer; `call` does not read it.
            kwargs: forwarded to the base `Layer` constructor.
        """
        self.supports_masking = True
        self.init = initializers.get('glorot_uniform')

        self.W_regularizer = regularizers.get(W_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)

        self.W_constraint = constraints.get(W_constraint)
        self.b_constraint = constraints.get(b_constraint)

        self.bias = bias
        self.return_attention = return_attention
        super(Attention, self).__init__(**kwargs)

    def build(self, input_shape):
        """Create the weight vector `W` (features,) and optional bias `b` (steps,)."""
        assert len(input_shape) == 3
        self.W = self.add_weight(shape=(input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 constraint=self.W_constraint)
        if self.bias:
            # One bias value per time step, so the number of steps
            # (input_shape[1]) must be known when the layer is built.
            self.b = self.add_weight(shape=(input_shape[1],),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     constraint=self.b_constraint)
        else:
            self.b = None

        self.built = True

    def compute_mask(self, input, input_mask=None):
        # do not pass the mask to the next layers
        return None

    def call(self, x, mask=None):
        """Concatenate `x` with the attention-weighted context at every step."""
        eij = dot_product(x, self.W)
        if self.bias:
            eij += self.b
        eij = K.tanh(eij)
        a = K.exp(eij)

        # apply mask after the exp. will be re-normalized next
        if mask is not None:
            # Cast the mask to floatX to avoid float64 upcasting in theano
            a *= K.cast(mask, K.floatx())

        # in some cases especially in the early stages of training the sum may be almost zero
        # and this results in NaN's. A workaround is to add a very small positive number ε to the sum.
        a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())

        # Expand exactly once: (samples, steps) -> (samples, steps, 1).
        # Expanding twice yields a rank-4 tensor whose product with `x`
        # broadcasts across the batch axis instead of weighting the steps.
        a = K.expand_dims(a)

        # Global context vector, kept as (samples, 1, features).
        c = K.sum(x * a, axis=1, keepdims=True)

        # Broadcast the context to every step: (samples, steps, features).
        c_per_step = c + K.zeros_like(x)

        # Backend concatenate, so no separate `tf` import is needed.
        new_output = K.concatenate([x, c_per_step], axis=-1)
        return new_output

    def compute_output_shape(self, input_shape):
        """The steps axis is kept and the feature axis doubles."""
        return input_shape[0], input_shape[1], 2 * input_shape[-1]
`

The model is:

from keras.models import Sequential
from keras import backend as K
from keras.models import Model
from keras.optimizers import Adam
from keras import initializers
import numpy as np
# LSTM is used below, so it is imported here with the other layers.
from keras.layers import Dense, Input, TimeDistributed, Embedding, Activation, Bidirectional, LSTM

# NOTE(review): MAX_LENGTH, word2index and tag2index_U are not defined in
# this snippet - presumably created during data preparation; confirm.
return_attention = True

# Token ids -> embeddings -> BiLSTM states for every step.
inp1 = Input(shape=(MAX_LENGTH,))
emb1 = Embedding(len(word2index), 128)(inp1)
bilstm2 = Bidirectional(LSTM(256, return_sequences=True))(emb1)

# Attention layer defined above, followed by a per-step tag classifier.
x = Attention(return_attention=True)(bilstm2)
dense2 = TimeDistributed(Dense(len(tag2index_U)))(x)
out2 = Activation('softmax')(dense2)

model = Model(inputs=inp1, outputs=out2)
model.compile(loss='categorical_crossentropy', optimizer=Adam(0.001), metrics=['accuracy'])
model.summary()

model

Both fit and evaluate run correctly with batch_size=1:
model.fit(train_sentences_X, train_sentences_Y ,batch_size=1, epochs=20)
score = model.evaluate(test_sentences_X, train_sentences_Y , batch_size=1 )
However, predict fails. The test sample is "i love paris", for which the result should be O O B-LOC:
predictions = model.predict(test_samples_X, batch_size=1, verbose=1)
This returns the following error:
`~\Anaconda3\lib\site-packages\keras\engine\training.py in predict(self, x, batch_size, verbose, steps, callbacks, max_queue_size, workers, use_multiprocessing)
1460 verbose=verbose,
1461 steps=steps,
-> 1462 callbacks=callbacks)
1463
1464 def train_on_batch(self, x, y,

~\Anaconda3\lib\site-packages\keras\engine\training_arrays.py in predict_loop(model, f, ins, batch_size, verbose, steps, callbacks)
330 outs.append(np.zeros(shape, dtype=batch_out.dtype))
331 for i, batch_out in enumerate(batch_outs):
--> 332 outs[i][batch_start:batch_end] = batch_out
333
334 batch_logs['outputs'] = batch_outs

ValueError: could not broadcast input array from shape (2,75,14) into shape (1,75,14)
`

@anoopkdcs
Copy link

Hi,
How can I change the attention code so that the attention distribution is frozen to uniform weights?

@visheshaylani
Copy link

image
I am getting this error while loading the model. How can I solve it? Please help.

@sofimukhtar
Copy link

image
I am getting this error while loading the model. How can I solve it? Please help.

Replace `W_regularizer` with `kernel_regularizer`.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment