Skip to content

Instantly share code, notes, and snippets.

@nishimotz
Created April 4, 2010 11:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nishimotz/9f927195fa3df2228160 to your computer and use it in GitHub Desktop.
Save nishimotz/9f927195fa3df2228160 to your computer and use it in GitHub Desktop.
# recog.py for python 2.6
# encoding: utf-8
# created by nishimotz 2010-04-04
# http://ja.nishimotz.com
import socket
import threading
import time
import subprocess
import os
import re
from xml.dom import minidom
class RecogThread(threading.Thread):
    """Thread that launches the julius executable and waits for it to exit.

    The thread is named 'recog_thread'. By default it changes into the
    dictation-kit directory and starts julius with ``-module 10500``; both
    the directory and the command line can be overridden.

    Parameters:
        workdir: directory to chdir into before launching. Defaults to
            ``DEFAULT_WORKDIR``.
        command: argument list handed to ``subprocess.Popen``. Defaults to
            ``DEFAULT_COMMAND``.

    Attributes:
        p: the ``subprocess.Popen`` object, or None until ``run`` starts it.
    """

    DEFAULT_WORKDIR = "c:\\work\\istc\\dictation-kit-v3.2-win"
    DEFAULT_COMMAND = (
        ".\\bin\\julius.exe -C fast.jconf -module 10500 -progout "
        "-outcode wlpsWLPSC -charconv euc-jp utf-8"
    ).split(" ")

    def __init__(self, workdir=None, command=None):
        threading.Thread.__init__(self, name='recog_thread')
        self.workdir = self.DEFAULT_WORKDIR if workdir is None else workdir
        self.command = list(
            self.DEFAULT_COMMAND if command is None else command)
        self.p = None

    def run(self):
        """Start the engine process and block until it terminates."""
        # NOTE: os.chdir changes the working directory of the whole process,
        # not just of this thread. It is kept because the default command
        # uses paths relative to the kit directory.
        os.chdir(self.workdir)
        self.p = subprocess.Popen(
            self.command,
            stdin=None,
            stdout=None,
            stderr=None)
        # The previous loop tested `returncode != 0`, which is already true
        # while the child is running (returncode is None), so the thread
        # ended at once. wait() returns only after the child has exited.
        self.p.wait()
class RecogManager:
    """control julius/julian

    Starts a RecogThread (which launches the engine) and talks to it over a
    TCP socket on 127.0.0.1:10500. Incoming data is treated as a sequence of
    XML fragments, each terminated by a line holding a single '.'.
    """

    def __init__(self):
        self.s = socket.socket()
        self.engine_thread = RecogThread()
        self.engine_thread.start()
        # Fixed pause after starting the engine thread
        # (presumably to let the engine open its port -- confirm).
        time.sleep(2)

    def connect(self):
        """Connect to the engine socket.

        Returns:
            True after a successful connect (followed by a 5 second pause),
            False if the connection attempt raised a socket error.
        """
        addr = ('127.0.0.1', 10500)
        try:
            self.s.connect(addr)
        except socket.error:
            return False
        time.sleep(5)
        return True

    def send(self, msg):
        """Send *msg* followed by a carriage return.

        Socket errors are reported by printing "send error"; they are not
        raised to the caller.
        """
        try:
            print("sending: " + msg)
            # Text that is not already a byte string is encoded as UTF-8.
            payload = msg if isinstance(msg, bytes) else msg.encode("utf-8")
            self.s.setblocking(1)
            # sendall() keeps writing until the whole command is out;
            # plain send() may transmit only part of it.
            self.s.sendall(payload + b"\r")
        except socket.error:
            print("send error")

    def receive(self):
        """Read from the socket until the data ends with ".\\n".

        Polls the non-blocking socket every 10 ms. Also returns, with
        whatever was collected so far, if the peer closes the connection.

        Returns:
            A list of minidom documents, one per fragment that parsed.
            Fragments that fail to parse are printed and skipped.
        """
        buf = b""
        while True:
            time.sleep(0.01)
            self.s.setblocking(0)
            try:
                chunk = self.s.recv(100)
            except socket.error:
                # Treated as "nothing to read yet"; any other socket error
                # is retried the same way.
                continue
            if not chunk:
                # Empty read: the peer closed the connection, so no
                # terminator will ever arrive.
                break
            buf += chunk
            if buf.endswith(b".\n"):
                break
        return self._parse_messages(buf)

    def _parse_messages(self, data):
        """Split *data* on ".\\n" and parse each non-empty piece as XML."""
        docs = []
        for piece in data.split(b".\n"):
            if not piece:
                continue
            # Bare <s> and </s> tokens are escaped so the fragment parses.
            # This runs on the reassembled piece, so a token that was split
            # across two recv() chunks is still caught.
            piece = piece.replace(b"<s>", b"&lt;s&gt;")
            piece = piece.replace(b"</s>", b"&lt;/s&gt;")
            try:
                docs.append(minidom.parseString(piece))
            except Exception:
                print("parse error: %r" % (piece,))
        return docs

    def wait_for_elem(self, name):
        """Block until a received document's first child is named *name*.

        Documents received in the meantime that do not match are discarded.

        Returns:
            The matching minidom document.
        """
        while True:
            for doc in self.receive():
                if doc.firstChild.nodeName == name:
                    return doc
            time.sleep(0.01)

    def close(self):
        """Send 'DIE', wait for the engine thread, then close the socket."""
        try:
            self.send('DIE')
            self.engine_thread.join()
        finally:
            # Close the socket even if the join is interrupted.
            self.s.close()
def print_as_sjis(s):
    """Print *s* after encoding it with the 'SJIS' codec."""
    encoded = s.encode('SJIS')
    print(encoded)
def to_xml(dom):
    """Return dom.toxml() with the '<?xml version="1.0" ?>' header removed."""
    serialized = dom.toxml()
    declaration = r'<\?xml version="1\.0" \?>'
    return re.sub(declaration, "", serialized)
def print_dom(dom):
    """Serialize *dom* with to_xml() and print it via print_as_sjis()."""
    xml_text = to_xml(dom)
    print_as_sjis(xml_text)
if __name__ == '__main__':
    # Local import: the module header never imports sys, so the original
    # sys.exit() call raised NameError whenever the connection failed.
    import sys

    reco = RecogManager()
    if not reco.connect():
        sys.exit(1)
    try:
        print_dom(reco.wait_for_elem("STARTPROC"))
        # Print whatever arrives during the next 100 receive() rounds.
        for _ in range(100):
            for d in reco.receive():
                print_dom(d)
    finally:
        # Always send DIE and release the socket, even if the loop raises.
        reco.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment