@rossgoodwin, created August 29, 2017
word.camera python wrapper
import pika
import sys
import os
import re
import time
from datetime import datetime
import subprocess
from random import sample as rs, choice as rc
import threading
from collections import defaultdict
from hashids import Hashids
import shutil
import webbrowser
# import razer_rgb
# import serial
# import thermal
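# Pipeline overview (as implemented below):
#   narrate()            publishes 'hash#img_path' to the 'ImgPaths' queue for the densecap captioner
#   process_captions()   consumes 'Captions', drops <UNK> captions, samples or manually approves a
#                        subset, and re-publishes each one (prefixed with a seed text) to 'CaptionToExpand'
#   process_expansions() consumes 'Expansions', trims each expansion to its first complete sentence,
#                        and collects the results per image hash
#   publish()            renders the collected sentences into an HTML page, optionally uploading to S3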
class WordCamera(object):
VALID_IMG = set(['jpg', 'jpeg', 'png'])
def __init__(self, do_upload=False, img_orig_fp="", sentence_count=7, seed_ix=0, ebook_title="", ascii_img_path="", manual=False, looper=False, folderpath=""):
self.do_upload = do_upload
self.img_orig_fp = img_orig_fp
self.manual = manual
# ebook of results?
# self.ebook = ebook
self.ebook_title = ebook_title
self.folderpath = folderpath
# ascii img path
self.ascii_img_path = ascii_img_path
# Connect to RabbitMQ
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
self.channel = self.connection.channel()
possible_pre_seeds = [
"The dreams of men who would regard the scene,\nThe sorrows of the morn, whose waters wave,\n",
"~~~The arm of a person or thing and a frequency of a similar process within a postulated printed contest with the weapons of the post office.\n~|~",
"The door opened and the old man turned in his armchair to see whether he had been to the river bank.\n"
]
self.seed_ix = seed_ix
self.pre_seed = possible_pre_seeds[self.seed_ix]
# Serial to Arduino button
# self.ser = serial.Serial('/dev/ttyACM0')
# Queue names
queue_names = [
'ImgPaths',
'Captions',
'CaptionToExpand',
'Expansions'
]
# Declare and Purge Queues
for qn in queue_names:
self.channel.queue_declare(queue=qn)
self.channel.queue_purge(queue=qn)
# HashIds
self.hashids = Hashids()
# Unused captions (changes every image)
self.unused_captions = list()
self.unused_captions_per_graf = 0
# Class Variables
self.sentence_count = sentence_count
self.sentences = defaultdict(list)
self.img_dest = '/home/rg/projects/wc3/img'
self.template_path = '/home/rg/projects/wc3/template.html'
self.ebook_template_path = '/home/rg/projects/wc3/ebook_template.html'
self.thr1 = threading.Thread(target=self.consume)
self.thr1.start()
self.urls = list()
self.looper = looper
self.gogogo = True
if self.looper:
if self.img_orig_fp:
self.process_fp()
else:
self.thr2 = threading.Thread(target=self.loop)
self.thr2.start()
def process_fp(self):
if self.img_orig_fp.rsplit('.').pop().strip().lower() in self.VALID_IMG:
self.pre_narrate_individual(self.img_orig_fp)
else:
self.pre_narrate_folder()
def pre_narrate_folder(self):
for subdir, dirs, files in os.walk(self.img_orig_fp):
for f in files:
if f.rsplit('.', 1).pop().lower().strip() in self.VALID_IMG:
self.pre_narrate_individual(os.path.join(subdir, f))
def pre_narrate_individual(self, fp):
img_hash = self.hashids.encode(int(time.time()*1000))
fn = "%s.jpg" % img_hash
filepath = os.path.join(self.img_dest, fn)
shutil.copy2(fp, filepath)
self.narrate(filepath)
def loop(self):
while 1:
if self.gogogo:
trigger = raw_input('Capture? ')
if trigger:
self.capture()
self.gogogo = False
# curline = self.ser.readline().strip()
# if curline == 'b':
# self.capture()
# elif curline == 's':
# os.system('shutdown -h now')
def process_folder(self, fp):
self.folderpath = fp
for subdir, dirs, files in os.walk(fp):
for f in files:
if f.endswith('.jpg'):
filepath = os.path.join(subdir, f)
img_hash = self.hashids.encode(int(time.time()*1000))
new_fp = os.path.join(self.img_dest, img_hash+'.jpg')
shutil.copy2(filepath, new_fp)
self.narrate(new_fp)
time.sleep(0.1)
def capture(self):
img_hash = self.hashids.encode(int(time.time()*1000))
fn = "%s.jpg" % img_hash
filepath = os.path.join(self.img_dest, fn)
cmd_list = [
'fswebcam', '-r', '640x480', '--jpeg', '100',
'--no-banner', filepath
]
proc = subprocess.Popen(cmd_list)
proc.communicate()
#time.sleep(1)
# Narrate
return self.narrate(filepath)
def img2txt(self, img_path):
cmd_list = [
'/usr/local/bin/img2txt.py', img_path, '--maxLen=80',
'--targetAspect=0.4', '--bgcolor=#FFFFFF'
]
proc = subprocess.Popen(cmd_list, stdout=subprocess.PIPE)
        result = proc.stdout.read()  # ASCII rendering of the image; only used by the (commented-out) thermal printer
# thermal.basic_print(result)
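    # Print an ASCII preview of the image, derive its hash id from the filename, and
    # publish 'hash#img_path' to the 'ImgPaths' queue (consumed by densecap; see below).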
def narrate(self, img_path):
# Put printer in line print mode + feed paper
# thermal.line_print_mode()
# thermal.feed_paper()
# Print ascii image
if self.ascii_img_path:
self.img2txt(self.ascii_img_path)
else:
self.img2txt(img_path)
# self.img2txt()
# Paper feed
# thermal.feed_paper()
# Establish unique hash id for image
img_hash = img_path.rsplit('/', 1).pop().rsplit('.', 1)[0]
# Send img_path to densecap
self.channel.basic_publish(
exchange = '',
routing_key = 'ImgPaths',
body = img_hash + '#' + img_path
)
print img_hash
return img_hash
def approve(self, text):
print('CANDIDATE: %s' % text)
isApproved = raw_input('Approve? (y/n)\n')
return isApproved and isApproved.strip().lower() != 'n'
def consume(self):
# Bind methods to consumption queues
self.channel.basic_consume(self.process_captions, queue='Captions')
self.channel.basic_consume(self.process_expansions, queue='Expansions')
# Go
self.channel.start_consuming()
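    # Callback for the 'Captions' queue. The body is 'hash#comma,separated,captions';
    # captions containing <UNK> are dropped, a subset is sampled (or manually approved),
    # capitalized, prefixed with the seed text, and re-published to 'CaptionToExpand'.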
def process_captions(self, ch, method, properties, body):
def int_to_enc(i):
return "{0:b}".format(i).replace('0', '~').replace('1', '|')
img_hash, csv = body.split('#', 1)
print img_hash, "CAPTIONED"
captions_raw = filter(lambda c: not '<UNK>' in c, list(set(csv.split(','))))
if self.manual or len(captions_raw) <= self.sentence_count:
captions_cut = captions_raw
else:
captions_cut = rs(captions_raw, self.sentence_count*2)
self.unused_captions = list(set(captions_raw) - set(captions_cut))
self.unused_captions_per_graf = len(self.unused_captions) / self.sentence_count
captions = map(
# lambda (i, x): ,
lambda (i, x): x[0].upper() + x[1:] if self.seed_ix != 1 else int_to_enc(i%8) + x[0].upper() + x[1:],
enumerate(captions_cut)
)
approved_captions = list()
for c in captions:
approved = True
if self.manual:
if len(approved_captions) > self.sentence_count*2:
approved = False
else:
approved = self.approve(c)
if approved:
approved_captions.append(c)
for c in approved_captions:
self.channel.basic_publish(
exchange = '',
routing_key = 'CaptionToExpand',
body = img_hash + '#' + self.pre_seed + c
)
# time.sleep(5)
# self.channel.basic_publish(
# exchange = '',
# routing_key = 'Expansions',
# body = 'END#END'
# )
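    # Callback for the 'Expansions' queue. Strips the seed prefix, keeps the first paragraph,
    # trims it to its first complete sentence (or appends '...' when no sentence boundary is
    # found), and stores it under the image's hash id; in looper mode the page is published
    # once sentence_count results have arrived.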
def process_expansions(self, ch, method, properties, body):
img_hash, expansion = body.decode('utf8').split('#', 1)
# print(expansion)
expansion = expansion[len(self.pre_seed):]
grafs = expansion.strip().split('\n')
# if len(grafs) > 1:
# first_graf = '\n'.join(grafs[:-1])
# else:
# first_graf = grafs[0]
first_graf = grafs[0].strip()
first_graf = first_graf.replace('|', '').replace('~', '').replace('<UNK>', '(?)')
def split_on_punc(punc, graf):
# changed from rsplit to split to make shorter sentences
reg_exp = r'\b' + re.escape(punc) + r'\s'
complete_sents_no_punc = re.split(reg_exp, graf, maxsplit=1)[0]
complete_sents = complete_sents_no_punc + punc.strip()
return complete_sents[0].upper() + complete_sents[1:]
result = None
all_punc_set = set(['.', '!', '?', ',', ';', ':'])
# if len(grafs) > 1:
# result = first_graf[0].upper() + first_graf[1:]
# result = result.strip()
# if not result[-1] in all_punc_set:
# result += '.'
if '. ' in first_graf:
result = split_on_punc('.', first_graf)
elif '? ' in first_graf:
result = split_on_punc('?', first_graf)
elif '! ' in first_graf:
result = split_on_punc('!', first_graf)
elif first_graf and first_graf[-1] in all_punc_set:
result = first_graf[0].upper() + first_graf[1:]
else:
result = first_graf[0].upper() + first_graf[1:] + '...'
# else:
# result = first_graf[0].upper() + first_graf[1:].rstrip() + '...'
# if self.unused_captions and rc([True, False, False, False, False]):
# graf_captions = list()
# for _ in range(self.unused_captions_per_graf):
# graf_captions.append( self.unused_captions.pop() )
# graf = ', '.join(graf_captions)
# self.sentences[img_hash].append( graf[0].upper() + graf[1:] + '.' )
if self.looper and result:
print "MAIN BLOCK RUNNING"
approved = True
if self.manual:
approved = self.approve(result)
if approved:
print "APPEND RESULT TO SENTENCES"
print result
self.sentences[img_hash].append(result)
print self.sentences[img_hash]
with open(os.path.join('/home/rg/projects/wc3/pages', img_hash+'.txt'), 'a') as outfile:
outfile.write(result.encode('utf8')+'\n')
# thermal.thermal_print(result)
# thermal.line_break()
else:
self.sentences[img_hash].append("")
if self.looper and len(self.sentences[img_hash]) == self.sentence_count:
if self.ebook_title:
self.urls.append( self.publish(img_hash) )
else:
self.publish(img_hash)
print(img_hash, len(self.sentences[img_hash]))
def get_text(self, img_hash):
return ' '.join(self.sentences[img_hash])
def change_sentence_count(self, new_count):
self.sentence_count = new_count
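    # Render the sentences collected for one image into an HTML page from template_path;
    # upload it to S3 when do_upload is set, otherwise write it locally and open it in a
    # browser. Returns the page URL, or the local file path when an ebook is being built.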
def publish(self, img_hash):
if self.looper:
self.gogogo = True
approved_sents = filter(lambda x: x, self.sentences[img_hash])
now = str(datetime.now())
signature = 'word.camera by Ross Goodwin | %s' % now
# thermal.line_break()
# thermal.basic_print( '_'*len(signature) )
# thermal.basic_print( signature )
# thermal.line_break()
# thermal.line_break()
if self.do_upload:
from upload_to_s3 import upload
from string import Template
# from datetime import datetime
def chunks(l, n):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(l), n):
yield l[i:i + n]
with open(self.template_path, 'r') as infile:
html_temp = Template(infile.read())
img_fp = os.path.join(self.img_dest, img_hash+'.jpg')
if self.do_upload:
img_web_url = upload(img_fp)
else:
img_web_url = 'file://'+img_fp
if approved_sents:
last_line = approved_sents[-1].strip()
if last_line:
if last_line[-1] in set([',', ';', ':']):
approved_sents[-1] = last_line[:-1] + '.'
elif not last_line[-1] in set(['.', '!', '?', ',', ';', ':']):
approved_sents[-1] = last_line + '.'
body_text = '<p>%s</p>' % '</p><p>'.join(
map(lambda x: '<br>'.join(x).replace('\n', '<br>'), list(chunks(approved_sents, 5)))
# self.sentences[img_hash]
)
title = str(datetime.now()).split(' ', 1)[0] + '_' + img_hash
html_str = html_temp.substitute(title=title, img_url=img_web_url, body=body_text)
if self.folderpath:
html_fn = os.path.join(self.folderpath, title+'.html')
else:
html_fn = os.path.join('/home/rg/projects/wc3/pages', title+'.html')
with open(html_fn, 'w') as outfile:
outfile.write(html_str.encode('utf8'))
if self.do_upload:
result_url = upload(html_fn)
os.remove(html_fn)
else:
result_url = 'file://'+html_fn
if not self.ebook_title:
webbrowser.open(result_url)
# razer_rgb.scrolling_text(body_text.split(',', 1)[0], bg_color=(0,0,0), text_color=(255,255,255), speed=20, variety=64)
return result_url
else:
return html_fn
def publish_all(self):
urls = map(self.publish, self.sentences.keys())
if not self.ebook_title:
for u in urls:
print u
return urls
else:
self.create_ebook(self.ebook_title, urls)
    def create_ebook(self, ebook_title=None, urls=None):
        # Accept explicit arguments (publish_all and the commented-out main block call it
        # that way) or fall back to the values accumulated on the instance.
        ebook_title = ebook_title or self.ebook_title
        urls = urls or self.urls
from bs4 import BeautifulSoup
from string import Template
def get_file_text(fp):
with open(fp) as f:
soup = BeautifulSoup(f.read().strip())
return str(soup.select('div.col-md-8')[0])
        ebook_html = '\n\n'.join(map(get_file_text, urls))
with open(self.ebook_template_path, 'r') as infile:
html_temp = Template(infile.read())
        ebook_html_text = html_temp.substitute(title=ebook_title, body=ebook_html)
        ebook_html_path = '/home/rg/projects/wc3/pages/ebook.html'
        ebook_output_path = '/home/rg/projects/wc3/pages/'+ebook_title+'.epub'
with open(ebook_html_path, 'w') as outfile:
outfile.write(ebook_html_text)
proc = subprocess.Popen(['ebook-convert', ebook_html_path, ebook_output_path])
proc.communicate()
print ebook_output_path
return ebook_output_path
# os.remove(ebook_html_path)
# def process_folder(self, folderpath):
# for root, dirs, files in os.walk(folderpath):
# for f in files:
# if f.endswith('.jpg'):
# img_path = os.path.join(root, f)
# print img_path
# self.narrate(img_path)
# time.sleep(0.1)
# num_sents, seed_id = map(int, sys.argv[1:3])
# allowed_ext = set(['tif', 'tiff', 'png', 'jpg', 'jpeg'])
# if len(sys.argv) > 3:
# fp = sys.argv[3]
# fp_ext = fp.rsplit('.', 1).pop().lower()
# if fp_ext in allowed_ext:
# if image file
# wc = WordCamera(img_orig_fp=fp, sentence_count=num_sents, seed_ix=seed_id, manual=False, looper=True)
# else:
# if folder
# cameras = list()
# wc = False
# for subdir, dirs, files in os.walk(fp):
# for f in files:
# f_ext = f.rsplit('.', 1).pop().lower()
# if f_ext in allowed_ext:
# img_fp = os.path.join(subdir, f)
# if not wc:
# wc = WordCamera(img_orig_fp=img_fp, sentence_count=num_sents, seed_ix=seed_id, manual=False, looper=True, folderpath=subdir)
# else:
# time.sleep(1.5)
# # with open('big_picture_2016/page_paths.txt') as infile:
# filepaths = filter(lambda y: y, map(lambda x: x.strip(), infile.read().strip().split('\n')))
# wc.create_ebook(sys.argv[4], filepaths)
# else:
# wc = WordCamera(sentence_count=num_sents, seed_ix=seed_id, manual=True, looper=True)
# if len(sys.argv) > 3:
# folderpath = sys.argv[3]
# wc.process_folder(folderpath)
# trigger = raw_input('Publish? ')
# if trigger:
# wc.publish_all()
# else:
# wc = WordCamera(sentence_count=num_sents, seed_ix=seed_id, manual=True, looper=True)
# time.sleep(num_sents*3)
# razer_rgb.write_to_file('brightness', '255')
# razer_rgb.perlin_noise(secs=num_sents*3)
# wc.publish(imghash)
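# Usage sketch (not part of the original gist): a minimal way this class appears to be
# driven, reconstructed from the commented-out main block above. It assumes RabbitMQ is
# running on localhost and that separate workers already service the 'ImgPaths' and
# 'CaptionToExpand' queues; the sentence count and seed index come from the command line.
if __name__ == '__main__':
    num_sents, seed_id = map(int, sys.argv[1:3])
    wc = WordCamera(sentence_count=num_sents, seed_ix=seed_id, manual=True, looper=True)
    # loop() runs in a background thread, prompting 'Capture? ' and calling capture()
    # for each webcam frame; publish_all() writes one HTML page per captured image.
    trigger = raw_input('Publish? ')
    if trigger:
        wc.publish_all()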