@masahiro-mi
Created January 27, 2016 06:10
#! python3
# coding:utf-8
import sys
import random
import math

import MeCab
import numpy
from pybrain.structure import RecurrentNetwork, FullConnection
from pybrain.structure.modules import LSTMLayer, TanhLayer, LinearLayer, SoftmaxLayer
from pybrain.datasets import SequentialDataSet, UnsupervisedDataSet
from pybrain.supervised import RPropMinusTrainer
from sklearn.feature_extraction.text import CountVectorizer
# MeCab tokenizer in wakati (space-separated) mode, available for splitting
# raw Japanese text into words.
_mecab = MeCab.Tagger('-Owakati')


def mecab(text):
    r = _mecab.parse(text.strip()).split()
    if len(r) == 0:
        r = ['<None>']
    return r


# Identity analyzer: CountVectorizer then treats each token as its own
# "document", which yields a one-hot encoding per word.
def dummy(word):
    return [word]
if __name__ == '__main__':
    # Corpus file: pre-tokenized text, one sentence of whitespace-separated
    # words per line.
    filename = sys.argv[1]

    # Collect the vocabulary, plus the sentence-boundary tokens <s> and </s>.
    train_word = []
    with open(filename, 'r') as f:
        for line in f:
            for w in line.split():
                train_word.append(w)
    train_word.append('<s>')
    train_word.append('</s>')
    # One-hot vectorizer over the vocabulary.
    vectorizer = CountVectorizer(analyzer=dummy, dtype=numpy.float64)
    vectorizer.fit(train_word)
    print('Vocab : ' + str(len(vectorizer.vocabulary_)))

    # vocab_list[i] is the word whose one-hot index is i, used later to map
    # network outputs back to words.
    vocab_list = []
    for k, v in sorted(vectorizer.vocabulary_.items(), key=lambda x: x[1]):
        vocab_list.append(k)
    voc_size = len(vectorizer.vocabulary_)
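    # For illustration (an assumption about the encoding, easy to verify):
    # with the identity analyzer, each word maps to a single one-hot row, e.g.
    #   v = vectorizer.transform(['<s>']).todense()   # shape (1, voc_size)
    #   assert v.sum() == 1.0 and v[0, vectorizer.vocabulary_['<s>']] == 1.0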
    # Training data: sequences of (current word, next word) one-hot pairs,
    # one PyBrain sequence per sentence.
    ds = SequentialDataSet(voc_size, voc_size)
    with open(filename, 'r') as f:
        for line in f:
            ds.newSequence()
            # insert <s> as the first input of the sentence
            p = numpy.array(vectorizer.transform(['<s>']).todense()).reshape(-1)
            for w in line.split():
                n = numpy.array(vectorizer.transform([w]).todense()).reshape(-1)
                ds.addSample(p, n)
                p = n
            # append </s> as the final target
            ds.addSample(p, numpy.array(vectorizer.transform(['</s>']).todense()).reshape(-1))
    # Network: one-hot input -> tanh compression layer -> LSTM layer ->
    # softmax over the vocabulary. Layer widths are powers of two.
    _hiddensize = 12
    _compsize = _hiddensize - 1
    net = RecurrentNetwork()
    net.addInputModule(LinearLayer(voc_size, name='in'))
    net.addModule(TanhLayer(2 ** _compsize, name='comp'))
    net.addModule(LSTMLayer(2 ** _hiddensize, name='lstm'))
    net.addOutputModule(SoftmaxLayer(voc_size, name='out'))
    net.addConnection(FullConnection(net['in'], net['comp'], name='in_to_comp'))
    net.addConnection(FullConnection(net['comp'], net['lstm'], name='comp_to_lstm'))
    net.addConnection(FullConnection(net['lstm'], net['out'], name='lstm_to_out'))
    net.addRecurrentConnection(FullConnection(net['lstm'], net['lstm'], name='recurrent_lstm'))
    net.sortModules()
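    # Illustrative sanity check (not part of the pipeline): once sortModules()
    # has run, the recurrent network can be stepped one token at a time, e.g.
    #   net.reset()
    #   out = net.activate(numpy.array(vectorizer.transform(['<s>']).todense()).reshape(-1))
    #   # out is a voc_size softmax vector: P(next word | <s>)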
    # Training: shuffle the sequences each epoch and train on `batches`
    # chunks. 2 ** 0 == 1, so as written everything goes into a single batch;
    # raise the exponent to split an epoch into smaller batches.
    batches = min(2 ** 0, ds.getNumSequences())
    for epochs in range(0, 100):
        print('Epoch : ' + str(epochs))
        ind = [i for i in range(0, ds.getNumSequences())]
        random.shuffle(ind)
        batch_size = int(len(ind) / batches)
        for batch_count in range(0, batches):
            print(str(batch_count + 1) + ' / ' + str(batches))
            # Copy the selected sequences into a fresh dataset for this batch.
            batch = SequentialDataSet(voc_size, voc_size)
            for i in ind[batch_size * batch_count : min(len(ind), batch_size * (batch_count + 1))]:
                batch.newSequence()
                for _a, _b in ds.getSequenceIterator(i):
                    batch.addSample(_a, _b)
            trainer = RPropMinusTrainer(net, dataset=batch)
            trainer.train()
            print('Error: ' + str(trainer.testOnData()))
        # Test generation after each epoch: best-first search over partial
        # sentences. Each candidate is (word sequence, cumulative negative log
        # probability); expand the lowest-cost candidate until it emits </s>
        # or grows too long.
        print('Test generation :')
        generation_max = 1000
        output_candidates = [(['<s>'], 0.0)]
        for _x in range(0, generation_max):
            # Pick the candidate with the smallest cost.
            output_seq = None
            output_per = float('inf')
            output_no = -1
            for i, (s, p) in enumerate(output_candidates):
                if output_per >= p:
                    output_seq = s
                    output_per = p
                    output_no = i
            print(' '.join(output_seq) + ' / ' + str(output_per) + ' (' + str(len(output_candidates)) + ')', end='\r', flush=True)
            if output_seq[-1] == '</s>' or len(output_seq) > 20:
                print(' '.join(output_seq) + ' / ' + str(output_per) + ' (' + str(len(output_candidates)) + ')', end='\n\n', flush=True)
                break
            output_candidates.pop(output_no)
            # Re-run the network over the whole prefix to get P(next word).
            net.reset()
            ts = UnsupervisedDataSet(voc_size)
            for w in output_seq:
                n = numpy.array(vectorizer.transform([w]).todense()[0]).reshape(-1)
                ts.addSample(n)
            # The last activation is the softmax distribution after the final
            # word; extend the candidate with every word of nonzero probability.
            for s, w in zip(net.activateOnDataset(ts)[-1], vocab_list):
                if s > 0.0:
                    output_candidates.append((output_seq + [w], output_per - math.log(s)))
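
Usage (a sketch of how to run it, assuming PyBrain, scikit-learn, numpy, and
the MeCab Python binding are installed; the file name rnnlm.py is hypothetical):

    python3 rnnlm.py corpus.txt

where corpus.txt is pre-tokenized text with one sentence of space-separated
words per line. Each epoch prints the training error followed by a test
sentence generated from <s>.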