Created
May 19, 2012 22:52
-
-
Save slezica/2732680 to your computer and use it in GitHub Desktop.
Markov stream generator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
from xiter import xiter | |
from collections import defaultdict | |
from random import random, choice | |
def grouped(it, group_size=2):
    """Yield overlapping windows of consecutive items as tuples.

    For ``group_size == 2`` the input ``a, b, c`` produces ``(a, b)`` and
    ``(b, c)``.

    Args:
        it: Any iterable (an iterator is accepted as well).
        group_size: Number of items in each yielded tuple. Values below 1
            behave like 1.

    Yields:
        Tuples of ``group_size`` consecutive items. Nothing is yielded when
        the input holds fewer than ``group_size`` items.
    """
    window = []
    for item in iter(it):
        window.append(item)
        # Filling the window inside the main loop (instead of priming it with
        # explicit next() calls) means a short input simply ends the loop
        # rather than leaking StopIteration out of the generator.
        if len(window) >= group_size:
            yield tuple(window)
            # Slide the window forward by one item.
            del window[0]
class ddict(defaultdict):
    """A ``defaultdict`` that prints like a plain ``dict``.

    Only the string conversions are replaced; missing-key handling is
    inherited from ``defaultdict`` unchanged.
    """

    __str__ = dict.__str__
    __repr__ = dict.__repr__
def transitions():
    """Return an empty two-level transition table.

    Looking up an unseen outer key creates a fresh inner ``ddict``, and
    looking up an unseen inner key yields ``0``.
    """
    def new_row():
        # int() returns 0, the default value for an unseen inner key.
        return ddict(int)

    return ddict(new_row)
def analize(iterable, order=1):
    """Count which item follows each run of ``order`` consecutive items.

    Args:
        iterable: The sequence of items to scan.
        order: Number of preceding items that make up a state.

    Returns:
        A table built by ``transitions()`` mapping each state (a tuple of
        ``order`` items) to a mapping of ``next item -> occurrence count``.
    """
    counts = transitions()
    windows = grouped(iter(iterable), order + 1)
    for window in windows:
        # The leading items form the state; the last item is what followed.
        state, follower = window[:-1], window[-1]
        counts[state][follower] += 1
    return counts
def markovify(T):
    """Turn a table of transition counts into transition probabilities.

    Args:
        T: Mapping of ``state -> {next item: weight}``, such as the table
            returned by ``analize``.

    Returns:
        A new table built by ``transitions()`` with the same keys, where the
        weights of every state are divided by that state's total weight.
        ``T`` itself is not modified.
    """
    probabilities = transitions()
    # items()/values() exist on both Python 2 and Python 3 dictionaries.
    for state, followers in T.items():
        # Convert once so each division below is a true (float) division.
        total = float(sum(followers.values()))
        for follower, weight in followers.items():
            probabilities[state][follower] = weight / total
    return probabilities
def choose(transitions):
    """Pick a key of ``transitions`` at random, weighted by its value.

    Args:
        transitions: Mapping of ``candidate -> weight``. The weights are
            expected to sum to 1, as produced by ``markovify``.

    Returns:
        One of the keys, or ``None`` when ``transitions`` is empty.
    """
    remaining = random()
    candidate = None
    for candidate, weight in transitions.items():
        remaining -= weight
        if remaining <= 0:
            return candidate
    # Floating-point rounding can leave a tiny positive remainder after all
    # weights have been subtracted; fall back to the last candidate instead
    # of silently returning None for a non-empty table.
    return candidate
def create(T, previous_states=None, length=128):
    """Generate a random stream of items by walking the transition table.

    Args:
        T: Mapping of ``state tuple -> {next item: probability}``, such as
            the table returned by ``markovify``.
        previous_states: Tuple of items to continue from. When omitted (or
            empty) a random state of ``T`` is chosen and its items are
            yielded first.
        length: Maximum number of items to generate after the starting
            state. Nothing at all is yielded when this is zero or negative.

    Yields:
        The items of the randomly chosen starting state (only when
        ``previous_states`` was not supplied), followed by up to ``length``
        generated items. Generation stops early when the current state has
        no recorded successor.
    """
    if length <= 0 or not T:
        return

    if not previous_states:
        # No starting point supplied: seed the stream with a random state.
        previous_states = choice(list(T))
        for state in previous_states:
            yield state

    # A loop replaces one recursion level per generated item, so long
    # streams neither hit the recursion limit nor forward every item
    # through a chain of nested generators.
    remaining = length
    while remaining > 0:
        # .get() avoids inserting empty rows into a defaultdict-based table.
        followers = T.get(previous_states)
        if not followers:
            # Dead end: this state was never followed by anything.
            return
        current_state = choose(followers)
        yield current_state
        previous_states = previous_states[1:] + (current_state,)
        remaining -= 1
if __name__ == "__main__":
    # Command line: <script> ORDER FILE
    if len(sys.argv) < 3:
        sys.exit("usage: %s ORDER FILE" % sys.argv[0])

    # The context manager closes the file as soon as it has been read.
    with open(sys.argv[2]) as source:
        # Collapse every run of whitespace into a single space.
        text = " ".join(source.read().split())

    M = markovify(analize(text, order = int(sys.argv[1])))
    # Parenthesised single-argument print works on Python 2 and Python 3.
    print("".join(create(M)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment