A wrapper for a Rasa-NLU-style StarSpace intent classifier, written in TensorFlow. Based on the StarSpace idea from https://arxiv.org/abs/1709.03856.
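At its core the classifier follows the StarSpace recipe: embed the message and the candidate intents into the same space, score them by cosine similarity, and train with a max-margin loss. A minimal NumPy sketch of that loss, for orientation only (the function and argument names here are hypothetical and not part of the classifier below):

import numpy as np

def starspace_style_loss(msg_emb, intent_embs, mu_pos=0.8, mu_neg=-0.4):
    """Hinge loss on the positive intent and on the hardest negative."""
    # row 0 of intent_embs is the correct intent, the rest are negatives
    msg = msg_emb / np.linalg.norm(msg_emb)
    ints = intent_embs / np.linalg.norm(intent_embs, axis=-1, keepdims=True)
    sim = ints.dot(msg)  # cosine similarities, shape (1 + num_neg,)
    return (max(0.0, mu_pos - sim[0]) +
            max(0.0, mu_neg + sim[1:].max()))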
# requirements.txt
cloudpickle
tensorflow
spacy
numpy
scikit-learn
# starspace_intent_classifier.py

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import io
import os
import logging

import cloudpickle as pickle
import numpy as np
import spacy

# import tensorflow defensively so a missing install can be reported
# with a helpful message (see _check_tensorflow below)
try:
    import tensorflow as tf
except ImportError:
    tf = None

logger = logging.getLogger(__name__)
# tf.logging.set_verbosity(1)
class EmbeddingIntentClassifier(object):
    name = "intent_classifier_starspace"

    def __init__(self,
                 inv_intent_dict=None,
                 encoded_all_intents=None,
                 session=None,
                 graph=None,
                 intent_placeholder=None,
                 embedding_placeholder=None,
                 similarity_op=None,
                 vectorizer=None,
                 use_word_vectors=False):
        """Declare instance variables with default values"""
        self._check_tensorflow()
        self.component_config = {
            # nn architecture
            "num_hidden_layers_a": 2,
            "hidden_layer_size_a": [256, 128],
            "num_hidden_layers_b": 0,
            "hidden_layer_size_b": [],
            "batch_size": 32,
            "epochs": 300,
            # embedding parameters
            "embed_dim": 10,
            "mu_pos": 0.8,  # should be 0.0 < ... < 1.0 for 'cosine'
            "mu_neg": -0.4,  # should be -1.0 < ... < 1.0 for 'cosine'
            "similarity_type": 'cosine',  # string 'cosine' or 'inner'
            "num_neg": 10,
            "use_max_sim_neg": True,  # flag which loss function to use
            # regularization
            "C2": 0.002,
            "C_emb": 0.8,
            "droprate": 0.2,
            # flag if tokenize intents
            "intent_tokenization_flag": False,
            "intent_split_symbol": '_'
        }
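        # Note: these defaults mirror Rasa NLU's embedding intent
        # classifier; edit this dict before calling train() to
        # experiment with, e.g., a larger "embed_dim" or more "epochs".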
        # nn architecture parameters
        self._load_nn_architecture_params()
        # embedding parameters
        self._load_embedding_params()
        # regularization
        self._load_regularization_params()
        # flag if tokenize intents
        self._load_flag_if_tokenize_intents()

        # check if hidden_layer_sizes are valid
        (self.num_hidden_layers_a,
         self.hidden_layer_size_a) = self._check_hidden_layer_sizes(
                self.num_hidden_layers_a,
                self.hidden_layer_size_a,
                name='a')
        (self.num_hidden_layers_b,
         self.hidden_layer_size_b) = self._check_hidden_layer_sizes(
                self.num_hidden_layers_b,
                self.hidden_layer_size_b,
                name='b')

        # transform numbers to intents
        self.inv_intent_dict = inv_intent_dict
        # encode all intents with numbers
        self.encoded_all_intents = encoded_all_intents

        # tf related instances
        self.session = session
        self.graph = graph
        self.intent_placeholder = intent_placeholder
        self.embedding_placeholder = embedding_placeholder
        self.similarity_op = similarity_op

        self.nlp = spacy.load('en')
        self.vect = vectorizer
        self.use_word_vectors = use_word_vectors
    def _load_nn_architecture_params(self):
        self.num_hidden_layers_a = self.component_config['num_hidden_layers_a']
        self.hidden_layer_size_a = self.component_config['hidden_layer_size_a']
        self.num_hidden_layers_b = self.component_config['num_hidden_layers_b']
        self.hidden_layer_size_b = self.component_config['hidden_layer_size_b']
        self.batch_size = self.component_config['batch_size']
        self.epochs = self.component_config['epochs']

    def _load_embedding_params(self):
        self.embed_dim = self.component_config['embed_dim']
        self.mu_pos = self.component_config['mu_pos']
        self.mu_neg = self.component_config['mu_neg']
        self.similarity_type = self.component_config['similarity_type']
        self.num_neg = self.component_config['num_neg']
        self.use_max_sim_neg = self.component_config['use_max_sim_neg']

    def _load_regularization_params(self):
        self.C2 = self.component_config['C2']
        self.C_emb = self.component_config['C_emb']
        self.droprate = self.component_config['droprate']

    def _load_flag_if_tokenize_intents(self):
        self.intent_tokenization_flag = self.component_config[
            'intent_tokenization_flag']
        self.intent_split_symbol = self.component_config[
            'intent_split_symbol']
        if self.intent_tokenization_flag and not self.intent_split_symbol:
            logger.warning("intent_split_symbol was not specified, "
                           "so intent tokenization will be ignored")
            self.intent_tokenization_flag = False
    @staticmethod
    def _check_hidden_layer_sizes(num_layers, layer_size, name=''):
        num_layers = int(num_layers)

        if num_layers < 0:
            logger.error("num_hidden_layers_{} = {} < 0. "
                         "Set it to 0".format(name, num_layers))
            num_layers = 0

        if isinstance(layer_size, list) and len(layer_size) != num_layers:
            if len(layer_size) == 0:
                raise ValueError("hidden_layer_size_{} = {} "
                                 "is an empty list, "
                                 "while num_hidden_layers_{} = {} > 0"
                                 "".format(name, layer_size,
                                           name, num_layers))

            logger.error("The length of hidden_layer_size_{} = {} "
                         "does not correspond to num_hidden_layers_{} "
                         "= {}. Set hidden_layer_size_{} to "
                         "the first element = {} for all layers"
                         "".format(name, len(layer_size),
                                   name, num_layers,
                                   name, layer_size[0]))
            layer_size = layer_size[0]

        if not isinstance(layer_size, list):
            layer_size = [layer_size for _ in range(num_layers)]

        return num_layers, layer_size
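    # Illustrative examples of the normalization above:
    #   _check_hidden_layer_sizes(2, 256)        -> (2, [256, 256])
    #   _check_hidden_layer_sizes(2, [256, 128]) -> (2, [256, 128])
    #   _check_hidden_layer_sizes(3, [256, 128]) -> (3, [256, 256, 256])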
    @staticmethod
    def _check_tensorflow():
        if tf is None:
            raise ImportError(
                'Failed to import `tensorflow`. '
                'Please install `tensorflow`. '
                'For example with `pip install tensorflow`.')
    # training data helpers:
    @staticmethod
    def _create_intent_dict(training_data):
        """Create intent dictionary"""
        distinct_intents = set([example.get("intent")
                                for example in
                                training_data.get("intent_examples")])
        return {intent: idx
                for idx, intent in enumerate(sorted(distinct_intents))}

    @staticmethod
    def _create_intent_token_dict(intents, intent_split_symbol):
        """Create intent token dictionary"""
        distinct_tokens = set([token
                               for intent in intents
                               for token in intent.split(intent_split_symbol)])
        return {token: idx
                for idx, token in enumerate(sorted(distinct_tokens))}

    def _create_encoded_intents(self, intent_dict):
        """Create matrix with intents encoded in rows as bag of words;
        if intent_tokenization_flag = False this is the identity matrix"""
        if self.intent_tokenization_flag:
            intent_token_dict = self._create_intent_token_dict(
                list(intent_dict.keys()), self.intent_split_symbol)

            encoded_all_intents = np.zeros((len(intent_dict),
                                            len(intent_token_dict)))
            for key, idx in intent_dict.items():
                for t in key.split(self.intent_split_symbol):
                    encoded_all_intents[idx, intent_token_dict[t]] = 1
            return encoded_all_intents
        else:
            return np.eye(len(intent_dict))
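    # Illustrative example: with intent_tokenization_flag=True and split
    # symbol '_', intents like 'greet_morning' and 'greet_evening' share
    # the 'greet' token, so their bag-of-token rows overlap and related
    # intents can end up close together in the embedding space.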
    # data helpers:
    def _create_all_Y(self, size):
        # stack encoded_all_intents on top of each other
        # to create candidates for training examples
        # to calculate training accuracy
        all_Y = np.stack([self.encoded_all_intents for _ in range(size)])
        return all_Y

    def _prepare_data_for_training(self, training_data, intent_dict):
        """Prepare data for training"""
        X = np.stack([e.get("text_features")
                      for e in training_data.get("intent_examples")])
        intents_for_X = np.array([intent_dict[e.get("intent")]
                                  for e in
                                  training_data.get("intent_examples")])
        Y = np.stack([self.encoded_all_intents[intent_idx]
                      for intent_idx in intents_for_X])
        all_Y = self._create_all_Y(X.shape[0])
        helper_data = intents_for_X, all_Y
        return X, Y, helper_data
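    # Resulting shapes (n = number of training examples):
    #   X:     (n, num_text_features)
    #   Y:     (n, intent_encoding_dim)
    #   all_Y: (n, num_intents, intent_encoding_dim)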
    # tf helpers:
    def _create_tf_embed_nn(self, x_in, is_training,
                            num_layers, layer_size, name):
        """Create embed nn for layer with name"""
        reg = tf.contrib.layers.l2_regularizer(self.C2)
        x = x_in
        for i in range(num_layers):
            x = tf.layers.dense(inputs=x,
                                units=layer_size[i],
                                activation=tf.nn.relu,
                                kernel_regularizer=reg,
                                name='hidden_layer_{}_{}'.format(name, i))
            x = tf.layers.dropout(x, rate=self.droprate, training=is_training)
        x = tf.layers.dense(inputs=x,
                            units=self.embed_dim,
                            kernel_regularizer=reg,
                            name='embed_layer_{}'.format(name))
        return x
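    # Both the message side ('a') and the intent side ('b') pass through
    # this network, so messages and intents land in the same
    # embed_dim-dimensional space and can be compared directly.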
    def _tf_sim(self, a, b):
        """Define similarity"""
        if self.similarity_type == 'cosine':
            a = tf.nn.l2_normalize(a, -1)
            b = tf.nn.l2_normalize(b, -1)

        if self.similarity_type == 'cosine' or self.similarity_type == 'inner':
            sim = tf.reduce_sum(tf.expand_dims(a, 1) * b, -1)
            # similarity between intent embeddings
            sim_emb = tf.reduce_sum(b[:, 0:1, :] * b[:, 1:, :], -1)
            return sim, sim_emb
        else:
            raise ValueError("Wrong similarity type {}, "
                             "should be 'cosine' or 'inner'"
                             "".format(self.similarity_type))
    def _tf_loss(self, sim, sim_emb):
        """Define loss"""
        if self.use_max_sim_neg:
            max_sim_neg = tf.reduce_max(sim[:, 1:], -1)
            loss = tf.reduce_mean(tf.maximum(0., self.mu_pos - sim[:, 0]) +
                                  tf.maximum(0., self.mu_neg + max_sim_neg))
        else:
            # create an array for mu
            mu = self.mu_neg * np.ones(self.num_neg + 1)
            mu[0] = self.mu_pos

            factors = tf.concat([-1 * tf.ones([1, 1]),
                                 tf.ones([1, tf.shape(sim)[1] - 1])], 1)
            max_margin = tf.maximum(0., mu + factors * sim)
            loss = tf.reduce_mean(tf.reduce_sum(max_margin, -1))

        max_sim_emb = tf.maximum(0., tf.reduce_max(sim_emb, -1))
        loss = (loss +
                # penalize max similarity between intent embeddings
                tf.reduce_mean(max_sim_emb) * self.C_emb +
                # add regularization losses
                tf.losses.get_regularization_loss())
        return loss
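    # sim has shape (batch, 1 + num_neg): column 0 is the similarity to
    # the correct intent, the rest are the sampled negatives. With
    # use_max_sim_neg=True the per-example loss is the StarSpace-style
    # max-margin objective
    #   max(0, mu_pos - sim_pos) + max(0, mu_neg + max_neg_sim),
    # pulling the correct intent above mu_pos and pushing the hardest
    # negative below -mu_neg.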
    def _create_tf_graph(self, a_in, b_in, is_training):
        """Create tf graph for training"""
        a = self._create_tf_embed_nn(a_in, is_training,
                                     self.num_hidden_layers_a,
                                     self.hidden_layer_size_a,
                                     name='a')
        b = self._create_tf_embed_nn(b_in, is_training,
                                     self.num_hidden_layers_b,
                                     self.hidden_layer_size_b,
                                     name='b')
        sim, sim_emb = self._tf_sim(a, b)
        loss = self._tf_loss(sim, sim_emb)
        return sim, loss

    # training helpers:
    def _create_batch_b(self, batch_pos_b, intent_ids):
        """Create batch of intents, where the first is the correct intent
        and the rest are wrong intents sampled randomly"""
        batch_pos_b = batch_pos_b[:, np.newaxis, :]

        # sample negatives
        batch_neg_b = np.zeros((batch_pos_b.shape[0], self.num_neg,
                                batch_pos_b.shape[-1]))
        for b in range(batch_pos_b.shape[0]):
            # create negative indexes out of possible ones
            # except for correct index of b
            negative_indexes = [i for i in range(
                                    self.encoded_all_intents.shape[0])
                                if i != intent_ids[b]]
            negs = np.random.choice(negative_indexes, size=self.num_neg)

            batch_neg_b[b] = self.encoded_all_intents[negs]

        return np.concatenate([batch_pos_b, batch_neg_b], 1)
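    # The returned batch has shape (batch_size, 1 + num_neg,
    # intent_encoding_dim): index 0 along axis 1 is the correct intent,
    # the remaining num_neg entries are randomly sampled wrong intents.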
    def _train_tf(self, X, Y, helper_data,
                  sess, a_in, b_in, sim,
                  loss, is_training, train_op):
        """Train tf graph"""
        sess.run(tf.global_variables_initializer())

        intents_for_X, all_Y = helper_data
        batches_per_epoch = (len(X) // self.batch_size +
                             int(len(X) % self.batch_size > 0))
        for ep in range(self.epochs):
            indices = np.random.permutation(len(X))
            sess_out = {}
            for i in range(batches_per_epoch):
                start_idx = i * self.batch_size
                end_idx = (i + 1) * self.batch_size
                batch_a = X[indices[start_idx:end_idx]]
                batch_pos_b = Y[indices[start_idx:end_idx]]
                intents_for_b = intents_for_X[indices[start_idx:end_idx]]
                # add negatives
                batch_b = self._create_batch_b(batch_pos_b, intents_for_b)

                sess_out = sess.run({'loss': loss, 'train_op': train_op},
                                    feed_dict={a_in: batch_a,
                                               b_in: batch_b,
                                               is_training: True})
            if (ep + 1) % 10 == 0:
                self._output_training_stat(X, intents_for_X, all_Y,
                                           sess, a_in, b_in,
                                           sim, is_training,
                                           ep, sess_out)

    def _output_training_stat(self,
                              X, intents_for_X, all_Y,
                              sess, a_in, b_in, sim, is_training,
                              ep, sess_out):
        """Output training statistics"""
        train_sim = sess.run(sim, feed_dict={a_in: X,
                                             b_in: all_Y,
                                             is_training: False})
        train_acc = np.mean(np.argmax(train_sim, -1) == intents_for_X)
        logger.info("epoch {} / {}: loss {}, train accuracy: {:.3f}"
                    "".format((ep + 1), self.epochs,
                              sess_out.get('loss'), train_acc))
    def _lemmatize(self, message):
        return ' '.join([t.lemma_ for t in message])

    def prepare_training_data(self, X, y):
        from sklearn.feature_extraction.text import CountVectorizer
        import re

        training_data = {
            "intent_examples": []
        }
        # use even single character word as a token
        self.vect = CountVectorizer(token_pattern=r'(?u)\b\w+\b',
                                    strip_accents=None,
                                    stop_words=None,
                                    ngram_range=(1, 1),
                                    max_df=1.0,
                                    min_df=1,
                                    max_features=None,
                                    preprocessor=lambda s: re.sub(
                                        r'\b[0-9]+\b', 'NUMBER', s.lower()))

        spacy_docs = [self.nlp(x) for x in X]
        lem_exs = [self._lemmatize(x) for x in spacy_docs]
        self.vect = self.vect.fit(lem_exs)
        X = self.vect.transform(lem_exs).toarray()

        for i, intent in enumerate(y):
            # create bag of words for each example, optionally appending
            # the spaCy document vector
            training_data["intent_examples"].append({
                "text_features": (np.hstack((X[i], spacy_docs[i].vector))
                                  if self.use_word_vectors else X[i]),
                "intent": intent
            })
        return training_data
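    # Illustrative example: for X = ["hello how are you ?"] the text is
    # lemmatized, digit runs are mapped to 'NUMBER', and the resulting
    # bag-of-words counts (plus the spaCy document vector when
    # use_word_vectors=True) become the example's "text_features".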
    def train(self, X, y):
        """Train the embedding intent classifier on a data set."""
        training_data = self.prepare_training_data(X, y)
        intent_dict = self._create_intent_dict(training_data)
        if len(intent_dict) < 2:
            logger.error("Can not train an intent classifier. "
                         "Need at least 2 different classes. "
                         "Skipping training of intent classifier.")
            return

        self.inv_intent_dict = {v: k for k, v in intent_dict.items()}
        self.encoded_all_intents = self._create_encoded_intents(intent_dict)

        X, Y, helper_data = self._prepare_data_for_training(
            training_data, intent_dict)

        # check if number of negatives is less than number of intents
        logger.debug("Check if num_neg {} is smaller than "
                     "number of intents {}, "
                     "else set num_neg to the number of intents - 1"
                     "".format(self.num_neg,
                               self.encoded_all_intents.shape[0]))
        self.num_neg = min(self.num_neg,
                           self.encoded_all_intents.shape[0] - 1)

        self.graph = tf.Graph()
        with self.graph.as_default():
            a_in = tf.placeholder(tf.float32, (None, X.shape[-1]),
                                  name='a')
            b_in = tf.placeholder(tf.float32, (None, None, Y.shape[-1]),
                                  name='b')
            self.embedding_placeholder = a_in
            self.intent_placeholder = b_in

            is_training = tf.placeholder_with_default(False, shape=())

            sim, loss = self._create_tf_graph(a_in, b_in, is_training)
            self.similarity_op = sim

            train_op = tf.train.AdamOptimizer().minimize(loss)

            # train tensorflow graph
            sess = tf.Session()
            self.session = sess
            self._train_tf(X, Y, helper_data,
                           sess, a_in, b_in, sim,
                           loss, is_training, train_op)
    # process helpers
    def _calculate_message_sim(self, X, all_Y):
        """Load tf graph and calculate message similarities"""
        a_in = self.embedding_placeholder
        b_in = self.intent_placeholder
        sim = self.similarity_op
        sess = self.session

        message_sim = sess.run(sim, feed_dict={a_in: X,
                                               b_in: all_Y})
        message_sim = message_sim.flatten()  # sim is a matrix

        intent_ids = message_sim.argsort()[::-1]
        message_sim[::-1].sort()

        # transform sim to python list for JSON serializing
        message_sim = message_sim.tolist()
        return intent_ids, message_sim

    def transform(self, query):
        spacy_doc = self.nlp(query)
        vectorized = self.vect.transform([self._lemmatize(spacy_doc)]).toarray()
        # return a 1-d feature vector in both branches
        return {
            "text_features": (np.hstack((vectorized[0], spacy_doc.vector))
                              if self.use_word_vectors else vectorized[0])
        }
    def process(self, query, INTENT_RANKING_LENGTH=5):
        """Return the most likely intent and its similarity to the input."""
        message = self.transform(query)
        intent = {"intent": None, "confidence": 0.0}
        intent_ranking = []

        if self.session is None:
            logger.error("There is no trained tf.session: "
                         "component is either not trained or "
                         "didn't receive enough training data")
        else:
            # get features (bag of words) for a message
            X = message.get("text_features").reshape(1, -1)
            # stack encoded_all_intents on top of each other
            # to create candidates for test examples
            all_Y = self._create_all_Y(X.shape[0])
            # load tf graph and session
            intent_ids, message_sim = self._calculate_message_sim(X, all_Y)

            if intent_ids.size > 0:
                intent = {"intent": self.inv_intent_dict[intent_ids[0]],
                          "confidence": message_sim[0]}
                ranking = list(zip(list(intent_ids), message_sim))
                ranking = ranking[:INTENT_RANKING_LENGTH]
                intent_ranking = [{"intent": self.inv_intent_dict[intent_idx],
                                   "confidence": score}
                                  for intent_idx, score in ranking]
        return intent, intent_ranking
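    # process() returns a tuple like
    #   ({"intent": "hello", "confidence": 0.87},
    #    [{"intent": "hello", ...}, {"intent": "bye", ...}])
    # (the values shown are illustrative).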
    @classmethod
    def load(cls, model_dir=None, use_word_vectors=False):
        if model_dir:
            file_name = cls.name + ".ckpt"
            checkpoint = os.path.join(model_dir, file_name)

            if not os.path.exists(os.path.join(model_dir, "checkpoint")):
                logger.warning("Failed to load nlu model. Maybe path {} "
                               "doesn't exist"
                               "".format(os.path.abspath(model_dir)))
                return EmbeddingIntentClassifier()

            graph = tf.Graph()
            with graph.as_default():
                sess = tf.Session()
                saver = tf.train.import_meta_graph(checkpoint + '.meta')
                saver.restore(sess, checkpoint)

                embedding_placeholder = tf.get_collection(
                    'embedding_placeholder')[0]
                intent_placeholder = tf.get_collection(
                    'intent_placeholder')[0]
                similarity_op = tf.get_collection(
                    'similarity_op')[0]

            with io.open(os.path.join(
                    model_dir,
                    cls.name + "_inv_intent_dict.pkl"), 'rb') as f:
                inv_intent_dict = pickle.load(f)
            with io.open(os.path.join(
                    model_dir,
                    cls.name + "_encoded_all_intents.pkl"), 'rb') as f:
                encoded_all_intents = pickle.load(f)
            with io.open(os.path.join(
                    model_dir,
                    cls.name + "_inv_count_vectorizer.pkl"), 'rb') as f:
                vect = pickle.load(f)

            return EmbeddingIntentClassifier(
                inv_intent_dict=inv_intent_dict,
                encoded_all_intents=encoded_all_intents,
                session=sess,
                graph=graph,
                intent_placeholder=intent_placeholder,
                embedding_placeholder=embedding_placeholder,
                similarity_op=similarity_op,
                vectorizer=vect,
                use_word_vectors=use_word_vectors
            )
        else:
            logger.warning("Failed to load nlu model: no model_dir given")
            return EmbeddingIntentClassifier()
    def persist(self, model_dir):
        """Persist this model into the passed directory.
        Return the metadata necessary to load the model again."""
        if self.session is None:
            return {"classifier_file": None}

        checkpoint = os.path.join(model_dir, self.name + ".ckpt")
        try:
            # the checkpoint files are written into model_dir itself
            os.makedirs(model_dir)
        except OSError as e:
            # be happy if someone already created the path
            import errno
            if e.errno != errno.EEXIST:
                raise

        with self.graph.as_default():
            self.graph.clear_collection('embedding_placeholder')
            self.graph.add_to_collection('embedding_placeholder',
                                         self.embedding_placeholder)

            self.graph.clear_collection('intent_placeholder')
            self.graph.add_to_collection('intent_placeholder',
                                         self.intent_placeholder)

            self.graph.clear_collection('similarity_op')
            self.graph.add_to_collection('similarity_op',
                                         self.similarity_op)

            saver = tf.train.Saver()
            saver.save(self.session, checkpoint)

        with io.open(os.path.join(
                model_dir,
                self.name + "_inv_intent_dict.pkl"), 'wb') as f:
            pickle.dump(self.inv_intent_dict, f)
        with io.open(os.path.join(
                model_dir,
                self.name + "_encoded_all_intents.pkl"), 'wb') as f:
            pickle.dump(self.encoded_all_intents, f)
        with io.open(os.path.join(
                model_dir,
                self.name + "_inv_count_vectorizer.pkl"), 'wb') as f:
            pickle.dump(self.vect, f)

        return {"classifier_file": self.name + ".ckpt"}
# usage example
# run `python -m spacy download en` once to install the spaCy English model
# replace X and y with your training data
from starspace_intent_classifier import EmbeddingIntentClassifier

X = ["hello how are you ?", "goodbye"]
y = ["hello", "bye"]

intent_classifier = EmbeddingIntentClassifier()
intent_classifier.train(X, y)
# e.g. app.config["MODELS_DIR"] in a Flask app, or any writable directory
intent_classifier.persist(model_dir="models")

intent, suggestions = intent_classifier.process("hello")
print(intent)
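# A minimal round-trip sketch (assuming the model was persisted to
# "models" as above):
loaded = EmbeddingIntentClassifier.load(model_dir="models")
intent, ranking = loaded.process("hello how are you ?")
print(intent, ranking)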