Last active May 10, 2024 16:59
Hierarchical Softmax CNN Classification
import torch
import torch.nn as nn
import torch.nn.init as init
# Dropout probability handed to the nn.Dropout layer built in FlatCnnLayer.__init__.
dropout_prob = 0.5
class FlatCnnLayer(nn.Module):
    """Parallel convolution + max-pool branches whose outputs are concatenated.

    One branch is built per entry of ``filter_sizes``.  Each branch is a
    ``Conv2d`` with a ``(filter_size, embedding_size)`` kernel followed by a
    ``MaxPool2d`` whose window covers ``sequence_length - filter_size + 1``
    rows.  ``forward`` concatenates the branch outputs along dim 1, flattens
    everything but the batch dimension and applies dropout.

    Args:
        embedding_size: width of the convolution kernels.
        sequence_length: used to size the pooling window of every branch.
        filter_sizes: kernel heights, one branch per value.
        out_channels: number of output channels of every convolution.
    """

    def __init__(self, embedding_size, sequence_length, filter_sizes=(3, 4, 5), out_channels=128):
        super(FlatCnnLayer, self).__init__()
        self.embedding_size = embedding_size
        self.sequence_length = sequence_length
        self.out_channels = out_channels

        # Register one conv/pool branch per filter size so that forward()
        # has something to iterate over and the parameters are tracked.
        self.filter_layers = nn.ModuleList()
        for filter_size in filter_sizes:
            self.filter_layers.append(self._make_filter_layer(filter_size))

        # dropout_prob is the module-level constant defined above this class.
        self.dropout = nn.Dropout(p=dropout_prob)

        # In-place initialisers: the un-suffixed init.normal / init.constant
        # are deprecated aliases.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                init.normal_(m.weight, mean=0, std=0.1)
                init.constant_(m.bias, 0.1)

    def forward(self, x):
        """Return the flattened, dropout-regularised features for ``x``.

        # assumes x is (batch, 1, sequence_length, embedding_size): the
        # convolutions take 1 input channel and the pooling window is sized
        # from sequence_length -- TODO confirm against the caller.
        """
        pools = [filter_layer(x) for filter_layer in self.filter_layers]
        x = torch.cat(pools, dim=1)
        # Keep the batch dimension, flatten the rest.
        x = x.view(x.size()[0], -1)
        x = self.dropout(x)
        return x

    def _make_filter_layer(self, filter_size):
        """Build the conv + max-pool branch for one ``filter_size``."""
        return nn.Sequential(
            nn.Conv2d(1, self.out_channels, (filter_size, self.embedding_size)),
            nn.MaxPool2d((self.sequence_length - filter_size + 1, 1), stride=1),
        )
import multiprocessing

import numpy as np
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import DataLoader, TensorDataset

from FlatCnnLayer import FlatCnnLayer
from TreeTools import TreeTools
# Mini-batch size passed to both the training and the test DataLoader in fit().
batch_size = 128
# Number of passes over the training set performed by fit().
n_epochs = 200
# fit() prints the accumulated loss on the first epoch and on every
# display_step-th epoch.
display_step = 5
# DataLoader worker processes: all CPU cores but one, never fewer than one.
N_WORKERS = max(1, multiprocessing.cpu_count() - 1)
class HierarchicalTextClassifyCnnNet(nn.Module):
    """CNN text classifier with a hierarchical (tree-structured) softmax.

    Every sub-tree yielded by ``TreeTools.get_subtrees(tree)`` gets its own
    ``nn.Linear`` whose output size is the number of children of that
    sub-tree.  A target value is mapped to the path of child indices leading
    to it and to the indices of the linear layers visited along that path;
    the loss of a sample is the sum of the cross-entropy losses of those
    per-node decisions.

    Args:
        embedding_size: forwarded to ``FlatCnnLayer``.
        sequence_length: forwarded to ``FlatCnnLayer``.
        tree: nested ``(value, subtrees)`` structure understood by ``TreeTools``.
        filter_sizes: forwarded to ``FlatCnnLayer``; also sizes the linear inputs.
        out_channels: forwarded to ``FlatCnnLayer``; also sizes the linear inputs.
    """

    def __init__(self, embedding_size, sequence_length, tree, filter_sizes=(3, 4, 5), out_channels=128):
        super(HierarchicalTextClassifyCnnNet, self).__init__()
        self._tree_tools = TreeTools()
        self.tree = tree

        # create a weight matrix and bias vector for each node in the tree
        self.fc = nn.ModuleList([
            nn.Linear(out_channels * len(filter_sizes), len(subtree[1]))
            for subtree in self._tree_tools.get_subtrees(tree)
        ])

        # value -> (path of child indices, indices into self.fc along the path)
        self.value_to_path_and_nodes_dict = {}
        for path, value in self._tree_tools.get_paths(tree):
            nodes = self._tree_tools.get_nodes(tree, path)
            self.value_to_path_and_nodes_dict[value] = path, nodes

        self.flat_layer = FlatCnnLayer(embedding_size, sequence_length, filter_sizes=filter_sizes,
                                       out_channels=out_channels)
        self.features = nn.Sequential(self.flat_layer)

        # In-place initialisers: the un-suffixed variants are deprecated aliases.
        for m in self.modules():
            if isinstance(m, nn.Linear):
                init.xavier_uniform_(m.weight, gain=np.sqrt(2.0))
                init.constant_(m.bias, 0.1)

    def forward(self, inputs, targets):
        """Return ``(losses, predicts)``, one entry per sample of the batch.

        ``targets`` is required even at evaluation time because it selects
        which tree nodes are scored for each sample.  Both results are real
        lists (not lazy ``map`` objects) so callers can iterate them more
        than once.
        """
        features = self.features(inputs)
        predicts = [self._get_predicts(feature, target) for feature, target in zip(features, targets)]
        losses = [self._get_loss(predict, target) for predict, target in zip(predicts, targets)]
        return losses, predicts

    def _get_loss(self, predicts, label):
        """Sum the cross-entropy of every decision on the path to ``label``."""
        path, _ = self.value_to_path_and_nodes_dict[int(label)]
        criterion = nn.CrossEntropyLoss()

        node_losses = []
        for predict, p in zip(predicts, path):
            # Build the class index on the same device as the logits instead
            # of unconditionally moving it to CUDA.
            target = torch.tensor([p], dtype=torch.long, device=predict.device)
            node_losses.append(criterion(predict.unsqueeze(0), target))
        return torch.sum(torch.stack(node_losses))

    def _get_predicts(self, feature, label):
        """Return the logits of every node on the path to ``label``."""
        _, nodes = self.value_to_path_and_nodes_dict[int(label)]
        return [self.fc[n](feature) for n in nodes]
def fit(model, data, save_path):
    """Train ``model``, print its test accuracy and save its weights.

    Args:
        model: module whose ``forward(inputs, labels)`` returns
            ``(losses, predicts)`` with one entry per sample, and which
            exposes ``value_to_path_and_nodes_dict`` (read by
            ``_check_predicts``).
        data: mapping with the numpy arrays ``'X_train'``, ``'X_test'``,
            ``'Y_train'`` and ``'Y_test'``.
        save_path: destination passed to ``torch.save``.

    Side effects: moves ``model`` to CUDA when available, prints progress,
    writes ``save_path`` and leaves ``model`` in eval mode.
    """
    use_cuda = torch.cuda.is_available()
    if use_cuda:
        model = model.cuda()

    optimizer = optim.Adam(model.parameters(), lr=0.001)

    x_train = torch.from_numpy(data['X_train']).float()
    x_test = torch.from_numpy(data['X_test']).float()
    y_train = torch.from_numpy(data['Y_train']).int()
    y_test = torch.from_numpy(data['Y_test']).int()

    train_set = TensorDataset(x_train, y_train)
    test_set = TensorDataset(x_test, y_test)
    # NOTE(review): the original train_loader call was cut off after
    # num_workers; any further keyword argument it carried is unknown.
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=N_WORKERS)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=N_WORKERS)

    model.train()
    for epoch in range(1, n_epochs + 1):  # loop over the dataset multiple times
        acc_loss = 0.0

        for inputs, labels in train_loader:
            # Call is_available(): the bare function object is always truthy.
            if use_cuda:
                inputs, labels = inputs.cuda(), labels.cuda()

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            losses, _ = model(inputs, labels)
            loss = torch.mean(torch.stack(list(losses), dim=0))
            acc_loss += loss.item()
            loss.backward()
            optimizer.step()

        # print statistics
        if epoch % display_step == 0 or epoch == 1:
            print('[%3d] loss: %.5f' %
                  (epoch, acc_loss / len(train_set)))

    print('\rFinished Training\n')

    # Evaluate with dropout disabled and without building autograd graphs.
    model.eval()
    nb_test_corrects, nb_test_samples = 0, 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            if use_cuda:
                inputs, labels = inputs.cuda(), labels.cuda()

            # The labels are passed in because the model uses them to pick
            # the tree nodes it scores for each sample.
            _, predicts = model(inputs, labels)

            nb_test_samples += labels.size(0)
            for predicted, label in zip(predicts, labels):
                nb_test_corrects += _check_predicts(model, predicted, label)

    # Guard against an empty test set instead of dividing by zero.
    accuracy = 100.0 * nb_test_corrects / nb_test_samples if nb_test_samples else 0.0
    print('Accuracy of the network {:.2f}% ({:d} / {:d})'.format(
        accuracy,
        nb_test_corrects,
        nb_test_samples))

    # NOTE(review): only "), save_path)" survived of the original save call;
    # saving the full state_dict is an assumption -- confirm what loaders expect.
    torch.save(model.state_dict(), save_path)
def _check_predicts(model, predicts, label):
    """Return 1 if every per-node prediction follows the path to ``label``, else 0.

    Args:
        model: object exposing ``value_to_path_and_nodes_dict``, mapping a
            label value to ``(path, nodes)``.
        predicts: one score vector per decision on the path; each must
            support ``.argmax()`` (torch tensors and numpy arrays both do).
        label: the true label; anything ``int()`` accepts (Python int,
            numpy scalar, single-element tensor).
    """
    path, _ = model.value_to_path_and_nodes_dict[int(label)]
    for predict, p in zip(predicts, path):
        # A single wrong branch anywhere on the path makes the sample wrong.
        if int(predict.argmax()) != p:
            return 0
    return 1
# (value, subtrees)
class TreeTools:
    """Helpers for trees encoded as nested ``(value, subtrees)`` pairs.

    A node is indexable: element 0 is its value and element 1 is its list of
    child nodes.  A node whose element 1 is not a list is a leaf.  Nodes that
    do have a child list ("internal" nodes) are numbered in the order
    ``get_subtrees`` yields them, the root being number 0.
    """

    def __init__(self):
        # Memoization for _count_nodes: id(branches) -> (branches, size).
        # The list itself is stored too, because id() values are only unique
        # among live objects and may be recycled once a list is freed.
        self._count_nodes_dict = {}

    # Return tree is leave or not
    @staticmethod
    def _is_not_leave(tree):
        """Return True when ``tree`` has a list of children."""
        return isinstance(tree[1], list)

    def get_subtrees(self, tree):
        """Yield ``tree`` and then, depth first, every descendant that has children."""
        yield tree
        if self._is_not_leave(tree):
            for subtree in tree[1]:
                if self._is_not_leave(subtree):
                    for x in self.get_subtrees(subtree):
                        yield x

    # Returns pairs of paths and values of a tree
    def get_paths(self, tree):
        """Yield ``(path, value)`` for every node below the root.

        ``path`` is the list of child indices leading from the root to the
        node; children are visited in order, parents before descendants.
        """
        for i, subtree in enumerate(tree[1]):
            yield [i], subtree[0]
            if self._is_not_leave(subtree):
                for path, value in self.get_paths(subtree):
                    yield [i] + path, value

    # Returns the number of nodes in a tree (not including root)
    def count_nodes(self, tree):
        """Return how many nodes with children lie below the root of ``tree``."""
        return self._count_nodes(tree[1])

    def _count_nodes(self, branches):
        """Count the nodes with children in ``branches``, descendants included.

        Results are cached per list object, so a list must not be mutated
        after it has been counted.
        """
        cached = self._count_nodes_dict.get(id(branches))
        if cached is not None and cached[0] is branches:
            return cached[1]
        size = 0
        for node in branches:
            if self._is_not_leave(node):
                size += 1 + self._count_nodes(node[1])
        self._count_nodes_dict[id(branches)] = (branches, size)
        return size

    # Returns all the nodes in a path
    def get_nodes(self, tree, path):
        """Return the numbers of the nodes at which each decision of ``path`` is taken.

        The result has one entry per decision, starting with 0 for the root.
        """
        next_node = 0
        nodes = []
        for decision in path:
            # A leaf has no children to choose from; stop rather than index into it.
            if not self._is_not_leave(tree):
                break
            nodes.append(next_node)
            # Skip this node plus every numbered node inside the siblings that
            # precede the chosen child.  Only the siblings' own child lists go
            # through the cache, never the temporary slice.
            skipped = 0
            for sibling in tree[1][:decision]:
                if self._is_not_leave(sibling):
                    skipped += 1 + self._count_nodes(sibling[1])
            next_node += 1 + skipped
            tree = tree[1][decision]
        return nodes
It is not quite clear to me why you need a separate layer for each internal node in the target-word partitioning tree:

# create a weight matrix and bias vector for each node in the tree
        self.fc = nn.ModuleList([nn.Linear(out_channels * len(filter_sizes), len(subtree[1])) for subtree in

I suppose that the internal node representation v′ ("v prime") means that each internal node has only one neuron in the neural network layer. That should be:
Hidden Layer > Internal Nodes Layer > Target Words Layer

How does that sound?

Could you please provide a sample data file, or describe your input data format, to make the tree structure and the code easier to understand?

