@cbaziotis
Last active March 28, 2023 11:50
Keras Layer that implements an Attention mechanism for temporal data. Supports Masking. Follows the work of Raffel et al. [https://arxiv.org/abs/1512.08756]
from keras import backend as K, initializers, regularizers, constraints
from keras.engine.topology import Layer


def dot_product(x, kernel):
    """
    Wrapper for dot product operation, in order to be compatible with both
    Theano and Tensorflow
    Args:
        x (): input
        kernel (): weights
    Returns:
    """
    if K.backend() == 'tensorflow':
        # todo: check that this is correct
        return K.squeeze(K.dot(x, K.expand_dims(kernel)), axis=-1)
    else:
        return K.dot(x, kernel)


class Attention(Layer):
    def __init__(self,
                 W_regularizer=None, b_regularizer=None,
                 W_constraint=None, b_constraint=None,
                 bias=True,
                 return_attention=False,
                 **kwargs):
        """
        Keras Layer that implements an Attention mechanism for temporal data.
        Supports Masking.
        Follows the work of Raffel et al. [https://arxiv.org/abs/1512.08756]
        # Input shape
            3D tensor with shape: `(samples, steps, features)`.
        # Output shape
            2D tensor with shape: `(samples, features)`.
        :param kwargs:
        Just put it on top of an RNN Layer (GRU/LSTM/SimpleRNN) with return_sequences=True.
        The dimensions are inferred based on the output shape of the RNN.
        Note: The layer has been tested with Keras 1.x
        Example:
            # 1
            model.add(LSTM(64, return_sequences=True))
            model.add(Attention())
            # next add a Dense layer (for classification/regression) or whatever...
            # 2 - Get the attention scores
            hidden = LSTM(64, return_sequences=True)(words)
            sentence, word_scores = Attention(return_attention=True)(hidden)
        """
        self.supports_masking = True
        self.return_attention = return_attention
        self.init = initializers.get('glorot_uniform')

        self.W_regularizer = regularizers.get(W_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)

        self.W_constraint = constraints.get(W_constraint)
        self.b_constraint = constraints.get(b_constraint)

        self.bias = bias
        super(Attention, self).__init__(**kwargs)

    def build(self, input_shape):
        assert len(input_shape) == 3

        self.W = self.add_weight((input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 constraint=self.W_constraint)
        if self.bias:
            self.b = self.add_weight((input_shape[1],),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     constraint=self.b_constraint)
        else:
            self.b = None

        self.built = True

    def compute_mask(self, input, input_mask=None):
        # do not pass the mask to the next layers
        return None

    def call(self, x, mask=None):
        eij = dot_product(x, self.W)

        if self.bias:
            eij += self.b

        eij = K.tanh(eij)

        a = K.exp(eij)

        # apply mask after the exp. will be re-normalized next
        if mask is not None:
            # Cast the mask to floatX to avoid float64 upcasting in theano
            a *= K.cast(mask, K.floatx())

        # in some cases especially in the early stages of training the sum may be almost zero
        # and this results in NaN's. A workaround is to add a very small positive number ε to the sum.
        # a /= K.cast(K.sum(a, axis=1, keepdims=True), K.floatx())
        a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())

        weighted_input = x * K.expand_dims(a)

        result = K.sum(weighted_input, axis=1)

        if self.return_attention:
            return [result, a]
        return result

    def compute_output_shape(self, input_shape):
        if self.return_attention:
            return [(input_shape[0], input_shape[-1]),
                    (input_shape[0], input_shape[1])]
        else:
            return input_shape[0], input_shape[-1]

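
For readers who want to check the arithmetic in `call` without a Keras backend, here is a minimal NumPy sketch of the same steps (score, tanh, exp, mask, renormalize, weighted sum). The function name `attention_forward` and the `eps` parameter are assumptions made for this illustration; `eps` stands in for `K.epsilon()`. It is not part of the layer itself.

```python
import numpy as np


def attention_forward(x, W, b=None, mask=None, eps=1e-7):
    """NumPy mirror of the steps in Attention.call (illustrative only).

    x: (samples, steps, features), W: (features,), b: broadcastable to
    (samples, steps), mask: boolean (samples, steps).
    """
    eij = x @ W                           # one score per time step
    if b is not None:
        eij = eij + b
    eij = np.tanh(eij)
    a = np.exp(eij)
    if mask is not None:
        a = a * mask.astype(x.dtype)      # zero out padded steps after the exp
    a = a / (a.sum(axis=1, keepdims=True) + eps)   # renormalize over steps
    result = (x * a[..., None]).sum(axis=1)        # (samples, features)
    return result, a


rng = np.random.default_rng(0)
x = rng.normal(size=(2, 4, 3))
W = rng.normal(size=(3,))
mask = np.array([[1, 1, 1, 0], [1, 1, 0, 0]], dtype=bool)
context, weights = attention_forward(x, W, mask=mask)
```

Masked steps receive exactly zero weight, and the remaining weights sum to (almost) one per sample; the small shortfall comes from the epsilon added to the denominator.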
@jiexiongseeker
jiexiongseeker commented Feb 12, 2018

Hi, thanks for your implementation. However, compared to the original paper, your code implements the bias differently.
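
For reference, this is the formulation as I read it in Raffel and Ellis (worth checking against the paper itself). The symbols $h_t$ (hidden state at step $t$), $T$ (number of steps) and $D$ (feature size) are my notation for this sketch.

```latex
e_t = a(h_t), \qquad
\alpha_t = \frac{\exp(e_t)}{\sum_{k=1}^{T} \exp(e_k)}, \qquad
c = \sum_{t=1}^{T} \alpha_t h_t,
\qquad \text{with } a(h_t) = \tanh(W h_t + b),\; W \in \mathbb{R}^{D},\; b \in \mathbb{R}.
```

Under that reading the bias is a single scalar shared by all time steps, whereas the gist creates `self.b` with shape `(input_shape[1],)`, that is, one bias per time step.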

@gregkoytiger
gregkoytiger commented Apr 30, 2018

To fully support saving / loading, I believe the Attention layer requires the following:

    def get_config(self):
        config = {
            'return_attention': self.return_attention,
            'W_regularizer': regularizers.serialize(self.W_regularizer),
            'b_regularizer': regularizers.serialize(self.b_regularizer),
            'W_constraint': constraints.serialize(self.W_constraint),
            'b_constraint': constraints.serialize(self.b_constraint),
            'bias': self.bias
        }
   
        base_config = super(Attention, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))
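
The round trip that `get_config` enables can be sketched in plain Python. `BaseLayer` and `ToyAttention` below are hypothetical stand-ins written for this illustration, not Keras classes; they only show how the merged config dictionary is enough to rebuild an equivalent object through the constructor.

```python
class BaseLayer:
    """Hypothetical stand-in for a Keras-style base layer (not the real class)."""

    def __init__(self, name=None):
        self.name = name or self.__class__.__name__.lower()

    def get_config(self):
        return {'name': self.name}

    @classmethod
    def from_config(cls, config):
        # Rebuild the layer by passing the saved config back to __init__.
        return cls(**config)


class ToyAttention(BaseLayer):
    def __init__(self, bias=True, return_attention=False, **kwargs):
        self.bias = bias
        self.return_attention = return_attention
        super().__init__(**kwargs)

    def get_config(self):
        config = {'bias': self.bias,
                  'return_attention': self.return_attention}
        base_config = super().get_config()
        # Same merge pattern as in the snippet above.
        return dict(list(base_config.items()) + list(config.items()))


layer = ToyAttention(bias=False, return_attention=True, name='att')
restored = ToyAttention.from_config(layer.get_config())
```

Any constructor argument left out of the config silently falls back to its default on reload, which is why every argument of `__init__` is listed.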

@ronggong
ronggong commented May 30, 2018

https://gist.github.com/cbaziotis/6428df359af27d58078ca5ed9792bd6d#gistcomment-2343639

@WladimirSidorenko, have you found a solution for variable-length input? I think the problem is the bias; if you set bias=False, it works.

Update: I don't understand why the bias vector has the size of the input's second dimension, which is the number of time steps.
The output of eij = dot_product(x, self.W) has shape (samples, steps), and I doubt we need to learn a separate bias for each time step. Shouldn't we just define the bias as:

self.b = self.add_weight((1,),
                         initializer='zero',
                         name='{}_b'.format(self.name),
                         regularizer=self.b_regularizer,
                         constraint=self.b_constraint)
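
A small NumPy sketch of why the bias shape matters for variable-length input. NumPy broadcasting is used here as a stand-in for the backend's broadcasting, and the sizes (5 steps at build time, 4 steps in a later batch) are made up for illustration.

```python
import numpy as np

features = 3
W = np.ones(features)

b_per_step = np.zeros(5)   # shape (steps,), fixed once build() has seen 5 steps
b_scalar = np.zeros(1)     # shape (1,), as proposed in the comment above

short_batch = np.ones((2, 4, features))   # a later batch with only 4 steps
scores = short_batch @ W                  # shape (2, 4)

# The scalar bias broadcasts over any number of steps.
scalar_shape = (scores + b_scalar).shape

# The per-step bias only fits sequences of exactly 5 steps.
try:
    scores + b_per_step
    per_step_ok = True
except ValueError:
    per_step_ok = False
```

A bias of shape `(1,)` is independent of the sequence length, which also matches a single scalar bias shared across time steps.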

@ant1pink
@cbaziotis
hidden = LSTM(64, return_sequences=True)(words)
sentence, word_scores = Attention(return_attention=True)(hidden)
output = Dense(1, activation='sigmoid')(sentence)
In this case, when I train the model on a binary classification problem, how do I capture 'word_scores'?
When I do this:
attention_model = Model(input=model.input, output=model.layers[-2].output)
I get 'sentence' rather than 'word_scores'.

@barbarasilveiraf
@ant1pink,

Did you get the "word_scores"?

Thanks.

@EricRiveraLopez
Hello! I am using your code, but I have a problem when loading a model with the Attention layer. The error is:

ValueError: Unknown layer: AttentionDecoder

Previously, I trained a recurrent neural network (LSTM):

adam = keras.optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0, amsgrad=False)
# start building the network graph here
model = Sequential()
# start adding layers to the model
model.add(LSTM(output_dim=300,
               input_shape=x_train.shape[1:],
               return_sequences=True,
               activation='hard_sigmoid',
               dropout=0.2))
# second layer
model.add(LSTM(output_dim=300,
               input_shape=x_train.shape[1:],
               return_sequences=True,
               activation='hard_sigmoid',
               dropout=0.2))
model.add(AttentionDecoder(300, 300))
model.compile(loss='mean_squared_error', optimizer=adam, metrics=['mean_absolute_percentage_error'])
model.summary()

Can somebody help me?
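
An "Unknown layer" error of this kind generally means the loader could not map the saved class name back to a Python class. The toy sketch below models that lookup in plain Python; `KNOWN_LAYERS` and `resolve_layer` are hypothetical names and this is not Keras's implementation. In Keras the analogous hook is the `custom_objects` argument of `load_model`, shown only in a comment because it needs a saved model file.

```python
# Hypothetical registry of class names the loader already knows about.
KNOWN_LAYERS = {'LSTM': object, 'Dense': object}


class AttentionDecoder:
    """Placeholder for the user's custom layer class."""


def resolve_layer(class_name, custom_objects=None):
    """Toy name-to-class lookup; custom_objects extends the registry."""
    registry = dict(KNOWN_LAYERS)
    registry.update(custom_objects or {})
    if class_name not in registry:
        raise ValueError('Unknown layer: ' + class_name)
    return registry[class_name]


try:
    resolve_layer('AttentionDecoder')
    message = None
except ValueError as exc:
    message = str(exc)

resolved = resolve_layer('AttentionDecoder',
                         custom_objects={'AttentionDecoder': AttentionDecoder})

# With Keras, the corresponding call would look like:
# model = load_model('model.h5',
#                    custom_objects={'AttentionDecoder': AttentionDecoder})
```

The file name `model.h5` is a placeholder. The custom class must be importable in the loading script, and it needs a matching `get_config` for its constructor arguments to be restored.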

@williamgilpin
Hello, I tried using the current version, and I kept getting this error:

TypeError: add_weight() got multiple values for argument 'name'

It turns out that this results from using eager execution in the latest version of TensorFlow. The solution was to modify the call to add_weight so that the first argument is passed explicitly by name, as shape:


    def build(self, input_shape):
        assert len(input_shape) == 3

        self.W = self.add_weight(shape=(input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 #shape=(input_shape[-1], input_shape[1]),
                                 constraint=self.W_constraint)
        if self.bias:
            self.b = self.add_weight(shape=(input_shape[1],),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     #shape=(input_shape[-1],),
                                     constraint=self.b_constraint)
        else:
            self.b = None

        self.built = True
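
One possible reading of this error, independent of eager execution, is a change in parameter order: if the newer `add_weight` takes `name` as its first positional parameter (an assumption here, worth checking against the installed version), then a positional shape tuple lands in `name` and collides with the `name=` keyword. The stand-in function `add_weight_new` below is hypothetical and only reproduces the Python-level mechanism.

```python
def add_weight_new(name=None, shape=None, initializer=None):
    """Hypothetical stand-in whose first positional parameter is `name`."""
    return {'name': name, 'shape': shape, 'initializer': initializer}


# Old call style: the shape tuple is passed positionally, so it binds to `name`,
# and the explicit name= keyword then supplies `name` a second time.
try:
    add_weight_new((64,), initializer='zeros', name='att_W')
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# Fixed call style: every argument is passed by keyword.
fixed = add_weight_new(shape=(64,), initializer='zeros', name='att_W')
```

Passing every argument by keyword works regardless of the positional order, which is why the `shape=` fix is robust across versions.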

@modestprophet
Some folks in this thread asked about extracting the attention vector during inference. I believe I finally got that bit of functionality to work and have described the process here: https://stackoverflow.com/a/59276694/11133810

@rimchiha-fseg
rimchiha-fseg commented Apr 28, 2020

I work on named entity recognition.
I tried to implement the attention layer proposed in

https://bmcmedinformdecismak.biomedcentral.com/articles/10.1186/s12911-019-0933-6
The code of the attention layer:
`from keras.engine.topology import Layer
from keras import backend as K, initializers, regularizers, constraints
def dot_product(x, kernel):

    if K.backend() == 'tensorflow':
        # todo: check that this is correct
        return K.squeeze(K.dot(x, K.expand_dims(kernel)), axis=-1)
    else:
        return K.dot(x, kernel)

class Attention(Layer):
    def __init__(self,
                 W_regularizer=None, b_regularizer=None,
                 W_constraint=None, b_constraint=None,
                 bias=True, return_attention=False, **kwargs):

        self.supports_masking = True
        self.init = initializers.get('glorot_uniform')

        self.W_regularizer = regularizers.get(W_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)

        self.W_constraint = constraints.get(W_constraint)
        self.b_constraint = constraints.get(b_constraint)

        self.bias = bias
        self.return_attention = return_attention
        super(Attention, self).__init__(**kwargs)

    def build(self, input_shape):
        assert len(input_shape) == 3
        print()
        self.W = self.add_weight(shape=(input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 #shape=(input_shape[-1], input_shape[1]),
                                 constraint=self.W_constraint)
        if self.bias:
            self.b = self.add_weight(shape=(input_shape[1],),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     #shape=(input_shape[-1],),
                                     constraint=self.b_constraint)
        else:
            self.b = None

        self.built = True

    def compute_mask(self, input, input_mask=None):
        # do not pass the mask to the next layers
        return None

    def call(self, x, mask=None):
        eij = dot_product(x, self.W)
        print("x:", x)
        print("intiale eij", eij)
        if self.bias:
            eij += self.b
        print("first eij:", eij)
        eij = K.tanh(eij)
        print("eij:", eij)
        a = K.exp(eij)

        # apply mask after the exp. will be re-normalized next
        if mask is not None:
            # Cast the mask to floatX to avoid float64 upcasting in theano
            a *= K.cast(mask, K.floatx())

        # in some cases especially in the early stages of training the sum may be almost zero
        # and this results in NaN's. A workaround is to add a very small positive number ε to the sum.
        # a /= K.cast(K.sum(a, axis=1, keepdims=True), K.floatx())
        a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())

        a = K.expand_dims(a)
        print("alpha", a.shape)
        print(K.expand_dims(a))
        #weighted_input =dot_product(x,a)
        c = K.sum(x * K.expand_dims(a), axis=1)
        print("global vector", c.shape)
        new_output = tf.concat([x, c], axis=2)
        print("new_output", new_output.shape)
        #z=K.tanh(new_output)

        #print(z.shape)
        #return K.sum(weighted_input, axis=1)
        return new_output
`

The model is:

from keras.models import Sequential
from keras import backend as K
from keras.models import Model
from keras.optimizers import Adam
from keras import initializers
import numpy as np
from keras.layers import Dense, Input, TimeDistributed, Embedding, Activation, Bidirectional

return_attention = True
inp1 = Input(shape=(MAX_LENGTH,))
emb1 = Embedding(len(word2index), 128)(inp1)
bilstm2 = Bidirectional(LSTM(256, return_sequences=True))(emb1)
x = Attention(return_attention=True)(bilstm2)
dense2 = TimeDistributed(Dense(len(tag2index_U)))(x)
out2 = Activation('softmax')(dense2)
model = Model(inputs=inp1, outputs=out2)
model.compile(loss='categorical_crossentropy', optimizer=Adam(0.001), metrics=['accuracy'])
model.summary()

fit and evaluate run correctly with batch_size=1:
model.fit(train_sentences_X, train_sentences_Y ,batch_size=1, epochs=20)
score = model.evaluate(test_sentences_X, train_sentences_Y , batch_size=1 )
But predict fails. For the test sample "i love paris", the result should be O O B-LOC:
predictions = model.predict(test_samples_X, batch_size=1, verbose=1)
It returns the following error:
`~\Anaconda3\lib\site-packages\keras\engine\training.py in predict(self, x, batch_size, verbose, steps, callbacks, max_queue_size, workers, use_multiprocessing)
1460 verbose=verbose,
1461 steps=steps,
-> 1462 callbacks=callbacks)
1463
1464 def train_on_batch(self, x, y,

~\Anaconda3\lib\site-packages\keras\engine\training_arrays.py in predict_loop(model, f, ins, batch_size, verbose, steps, callbacks)
330 outs.append(np.zeros(shape, dtype=batch_out.dtype))
331 for i, batch_out in enumerate(batch_outs):
--> 332 outs[i][batch_start:batch_end] = batch_out
333
334 batch_logs['outputs'] = batch_outs

ValueError: could not broadcast input array from shape (2,75,14) into shape (1,75,14)
`
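
One thing that may be worth checking in the `call` method above: `K.expand_dims` is applied to `a` twice before the multiplication with `x`, so the weights become 4D. The NumPy sketch below walks through the resulting shapes; NumPy broadcasting is used as a stand-in for the backend, `doubled_expand_shape` is a name made up for this illustration, and `features=8` is an arbitrary size. It does not by itself explain the `(2,75,14)` shape in the traceback, but it suggests why the layer only behaves with a batch size of 1.

```python
import numpy as np


def doubled_expand_shape(batch, steps=75, features=8):
    """Shape of x * a when a has been expanded twice, as in the call() above."""
    x = np.zeros((batch, steps, features))
    a = np.zeros((batch, steps))
    a = a[..., None]    # first expand_dims  -> (batch, steps, 1)
    a = a[..., None]    # second expand_dims -> (batch, steps, 1, 1)
    return (x * a).shape


# With one sample the mismatched ranks still broadcast, but to a 4D tensor.
shape_b1 = doubled_expand_shape(batch=1)

# With two samples the batch axis of x lines up with the steps axis of a.
try:
    doubled_expand_shape(batch=2)
    batch2_ok = True
except ValueError:
    batch2_ok = False
```

Expanding `a` only once gives `(batch, steps, 1)`, which broadcasts against `x` for any batch size, as in the original gist.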

@anoopkdcs
Hi,
How can I change the attention code so that the attention distribution is frozen to uniform weights?
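
A minimal NumPy sketch of what frozen uniform attention computes, assuming "uniform" means equal weight on every unmasked time step. The function name `uniform_attention` is made up for this illustration. With uniform weights the layer reduces to a masked mean over time and has nothing left to learn.

```python
import numpy as np


def uniform_attention(x, mask=None):
    """Average x over the unmasked time steps with equal weights.

    x: (samples, steps, features), mask: boolean (samples, steps) or None.
    """
    samples, steps, _ = x.shape
    if mask is None:
        mask = np.ones((samples, steps), dtype=bool)
    m = mask.astype(x.dtype)
    # Guard against fully masked rows to avoid dividing by zero.
    a = m / np.maximum(m.sum(axis=1, keepdims=True), 1.0)
    return (x * a[..., None]).sum(axis=1), a


x = np.arange(12, dtype=float).reshape(1, 4, 3)
mask = np.array([[True, True, False, False]])
context, weights = uniform_attention(x, mask)
```

In the Keras layer the equivalent change would be to skip the learned scores, for example by replacing `a = K.exp(eij)` with `a = K.ones_like(eij)` and leaving the masking and normalization as they are; treat that as a suggestion to try rather than a tested patch.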

@visheshaylani
image
I am getting this error while loading the model. How can I solve it? Please help.

@sofimukhtar
image
I am getting this error while loading the model. How can I solve it? Please help.

Replace W_regularizer with kernel_regularizer.