Created
August 29, 2017 22:39
-
-
Save rossgoodwin/2d20d599ce6cf45cfe4e2128f7a6b448 to your computer and use it in GitHub Desktop.
word.camera python wrapper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pika | |
import sys | |
import os | |
import re | |
import time | |
from datetime import datetime | |
import subprocess | |
from random import sample as rs, choice as rc | |
import threading | |
from collections import defaultdict | |
from hashids import Hashids | |
import shutil | |
import webbrowser | |
# import razer_rgb | |
# import serial | |
# import thermal | |
class WordCamera(object):
    """Photographic-narration pipeline.

    Captures (or copies) an image, sends it through a RabbitMQ-backed
    caption -> expansion worker chain, collects the expanded sentences,
    and publishes the result as an HTML page (optionally uploaded to S3,
    optionally gathered into an epub).
    """

    # Image file extensions accepted as input (compared lowercase, no dot).
    VALID_IMG = set(['jpg', 'jpeg', 'png'])
    def __init__(self, do_upload=False, img_orig_fp="", sentence_count=7, seed_ix=0, ebook_title="", ascii_img_path="", manual=False, looper=False, folderpath=""):
        """Connect to RabbitMQ, reset the work queues, and start worker threads.

        do_upload      -- upload results (via project module upload_to_s3) instead of local file:// URLs
        img_orig_fp    -- optional image file or folder to narrate immediately (looper mode)
        sentence_count -- number of expanded sentences collected per image
        seed_ix        -- index into the hard-coded pre-seed texts below
        ebook_title    -- if set, published page paths are collected for ebook assembly
        ascii_img_path -- optional fixed image used for the ASCII-art rendering
        manual         -- interactively approve captions/sentences via raw_input
        looper         -- run the interactive capture loop / full pipeline
        folderpath     -- output folder for generated HTML pages
        """
        self.do_upload = do_upload
        self.img_orig_fp = img_orig_fp
        self.manual = manual
        # ebook of results?
        # self.ebook = ebook
        self.ebook_title = ebook_title
        self.folderpath = folderpath
        # ascii img path
        self.ascii_img_path = ascii_img_path
        # Connect to RabbitMQ (assumes a broker on localhost)
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost')
        )
        self.channel = self.connection.channel()
        # Candidate seed texts prepended to every caption before expansion;
        # index 1 carries the decorative '~'/'|' glitch characters that are
        # stripped back out in process_expansions.
        possible_pre_seeds = [
            "The dreams of men who would regard the scene,\nThe sorrows of the morn, whose waters wave,\n",
            "~~~The arm of a person or thing and a frequency of a similar process within a postulated printed contest with the weapons of the post office.\n~|~",
            "The door opened and the old man turned in his armchair to see whether he had been to the river bank.\n"
        ]
        self.seed_ix = seed_ix
        self.pre_seed = possible_pre_seeds[self.seed_ix]
        # Serial to Arduino button
        # self.ser = serial.Serial('/dev/ttyACM0')
        # Queue names
        queue_names = [
            'ImgPaths',
            'Captions',
            'CaptionToExpand',
            'Expansions'
        ]
        # Declare and Purge Queues (drop any messages left by a previous run)
        for qn in queue_names:
            self.channel.queue_declare(queue=qn)
            self.channel.queue_purge(queue=qn)
        # HashIds generator for unique image filenames
        self.hashids = Hashids()
        # Unused captions (changes every image)
        self.unused_captions = list()
        self.unused_captions_per_graf = 0
        # Class Variables
        self.sentence_count = sentence_count
        # img_hash -> list of collected sentence strings
        self.sentences = defaultdict(list)
        self.img_dest = '/home/rg/projects/wc3/img'
        self.template_path = '/home/rg/projects/wc3/template.html'
        self.ebook_template_path = '/home/rg/projects/wc3/ebook_template.html'
        # Consumer thread: blocks in pika's start_consuming loop, so queue
        # callbacks run on this thread, not the main thread.
        self.thr1 = threading.Thread(target=self.consume)
        self.thr1.start()
        self.urls = list()
        self.looper = looper
        # Gate for the capture loop; publish() re-arms it.
        self.gogogo = True
        if self.looper:
            if self.img_orig_fp:
                self.process_fp()
            else:
                # Interactive capture loop on its own thread.
                self.thr2 = threading.Thread(target=self.loop)
                self.thr2.start()
def process_fp(self): | |
if self.img_orig_fp.rsplit('.').pop().strip().lower() in self.VALID_IMG: | |
self.pre_narrate_individual(self.img_orig_fp) | |
else: | |
self.pre_narrate_folder() | |
def pre_narrate_folder(self): | |
for subdir, dirs, files in os.walk(self.img_orig_fp): | |
for f in files: | |
if f.rsplit('.', 1).pop().lower().strip() in self.VALID_IMG: | |
self.pre_narrate_individual(os.path.join(subdir, f)) | |
def pre_narrate_individual(self, fp): | |
img_hash = self.hashids.encode(int(time.time()*1000)) | |
fn = "%s.jpg" % img_hash | |
filepath = os.path.join(self.img_dest, fn) | |
shutil.copy2(fp, filepath) | |
self.narrate(filepath) | |
def loop(self): | |
while 1: | |
if self.gogogo: | |
trigger = raw_input('Capture? ') | |
if trigger: | |
self.capture() | |
self.gogogo = False | |
# curline = self.ser.readline().strip() | |
# if curline == 'b': | |
# self.capture() | |
# elif curline == 's': | |
# os.system('shutdown -h now') | |
def process_folder(self, fp): | |
self.folderpath = fp | |
for subdir, dirs, files in os.walk(fp): | |
for f in files: | |
if f.endswith('.jpg'): | |
filepath = os.path.join(subdir, f) | |
img_hash = self.hashids.encode(int(time.time()*1000)) | |
new_fp = os.path.join(self.img_dest, img_hash+'.jpg') | |
shutil.copy2(filepath, new_fp) | |
self.narrate(new_fp) | |
time.sleep(0.1) | |
def capture(self): | |
img_hash = self.hashids.encode(int(time.time()*1000)) | |
fn = "%s.jpg" % img_hash | |
filepath = os.path.join(self.img_dest, fn) | |
cmd_list = [ | |
'fswebcam', '-r', '640x480', '--jpeg', '100', | |
'--no-banner', filepath | |
] | |
proc = subprocess.Popen(cmd_list) | |
proc.communicate() | |
#time.sleep(1) | |
# Narrate | |
return self.narrate(filepath) | |
def img2txt(self, img_path): | |
cmd_list = [ | |
'/usr/local/bin/img2txt.py', img_path, '--maxLen=80', | |
'--targetAspect=0.4', '--bgcolor=#FFFFFF' | |
] | |
proc = subprocess.Popen(cmd_list, stdout=subprocess.PIPE) | |
result = proc.stdout.read() | |
# thermal.basic_print(result) | |
    def narrate(self, img_path):
        """Start narration for *img_path*: emit its ASCII rendering, then
        enqueue the path on the 'ImgPaths' queue for the captioning worker
        (the results come back asynchronously via process_captions).

        Returns the image's hash id (its filename stem).
        """
        # Put printer in line print mode + feed paper
        # thermal.line_print_mode()
        # thermal.feed_paper()
        # Print ascii image (a fixed stand-in image may be configured)
        if self.ascii_img_path:
            self.img2txt(self.ascii_img_path)
        else:
            self.img2txt(img_path)
        # self.img2txt()
        # Paper feed
        # thermal.feed_paper()
        # Establish unique hash id for image
        # (filename without extension, assigned by the capture/copy step;
        #  assumes '/'-separated paths)
        img_hash = img_path.rsplit('/', 1).pop().rsplit('.', 1)[0]
        # Send img_path to densecap
        self.channel.basic_publish(
            exchange = '',
            routing_key = 'ImgPaths',
            body = img_hash + '#' + img_path
        )
        print img_hash
        return img_hash
def approve(self, text): | |
print('CANDIDATE: %s' % text) | |
isApproved = raw_input('Approve? (y/n)\n') | |
return isApproved and isApproved.strip().lower() != 'n' | |
    def consume(self):
        """Consumer-thread entry point: bind the queue callbacks and block
        in pika's consume loop.

        NOTE(review): this is the old pika (<1.0) callback-first
        basic_consume signature; the callbacks below run on this thread,
        not the main thread — confirm against the installed pika version.
        """
        # Bind methods to consumption queues
        self.channel.basic_consume(self.process_captions, queue='Captions')
        self.channel.basic_consume(self.process_expansions, queue='Expansions')
        # Go
        self.channel.start_consuming()
    def process_captions(self, ch, method, properties, body):
        """Queue callback for 'Captions'.

        *body* is '<img_hash>#<comma-separated captions>'. Dedupes and
        filters the captions, optionally samples and manually approves them,
        then republishes each survivor on 'CaptionToExpand' prefixed with
        the seed text for the expansion worker.
        """
        def int_to_enc(i):
            # Binary rendering of i with 0 -> '~' and 1 -> '|': the
            # decorative glitch prefix used only when seed_ix == 1.
            return "{0:b}".format(i).replace('0', '~').replace('1', '|')
        img_hash, csv = body.split('#', 1)
        print img_hash, "CAPTIONED"
        # Dedupe, and drop captions containing the model's unknown-token marker.
        captions_raw = filter(lambda c: not '<UNK>' in c, list(set(csv.split(','))))
        if self.manual or len(captions_raw) <= self.sentence_count:
            captions_cut = captions_raw
        else:
            # Random sample of twice the target count so approval/expansion
            # has spare material to work with.
            captions_cut = rs(captions_raw, self.sentence_count*2)
        self.unused_captions = list(set(captions_raw) - set(captions_cut))
        # Python 2 integer division: floor of unused captions per paragraph.
        self.unused_captions_per_graf = len(self.unused_captions) / self.sentence_count
        captions = map(
            # lambda (i, x): ,
            # Capitalize each caption; for seed_ix 1, prepend the '~|'
            # binary decoration. (Python-2-only tuple-unpacking lambda.)
            lambda (i, x): x[0].upper() + x[1:] if self.seed_ix != 1 else int_to_enc(i%8) + x[0].upper() + x[1:],
            enumerate(captions_cut)
        )
        approved_captions = list()
        for c in captions:
            approved = True
            if self.manual:
                # Stop asking once we already hold 2x the sentence target.
                if len(approved_captions) > self.sentence_count*2:
                    approved = False
                else:
                    approved = self.approve(c)
            if approved:
                approved_captions.append(c)
        for c in approved_captions:
            self.channel.basic_publish(
                exchange = '',
                routing_key = 'CaptionToExpand',
                body = img_hash + '#' + self.pre_seed + c
            )
        # time.sleep(5)
        # self.channel.basic_publish(
        #     exchange = '',
        #     routing_key = 'Expansions',
        #     body = 'END#END'
        # )
def process_expansions(self, ch, method, properties, body): | |
img_hash, expansion = body.decode('utf8').split('#', 1) | |
# print(expansion) | |
expansion = expansion[len(self.pre_seed):] | |
grafs = expansion.strip().split('\n') | |
# if len(grafs) > 1: | |
# first_graf = '\n'.join(grafs[:-1]) | |
# else: | |
# first_graf = grafs[0] | |
first_graf = grafs[0].strip() | |
first_graf = first_graf.replace('|', '').replace('~', '').replace('<UNK>', '(?)') | |
def split_on_punc(punc, graf): | |
# changed from rsplit to split to make shorter sentences | |
reg_exp = r'\b' + re.escape(punc) + r'\s' | |
complete_sents_no_punc = re.split(reg_exp, graf, maxsplit=1)[0] | |
complete_sents = complete_sents_no_punc + punc.strip() | |
return complete_sents[0].upper() + complete_sents[1:] | |
result = None | |
all_punc_set = set(['.', '!', '?', ',', ';', ':']) | |
# if len(grafs) > 1: | |
# result = first_graf[0].upper() + first_graf[1:] | |
# result = result.strip() | |
# if not result[-1] in all_punc_set: | |
# result += '.' | |
if '. ' in first_graf: | |
result = split_on_punc('.', first_graf) | |
elif '? ' in first_graf: | |
result = split_on_punc('?', first_graf) | |
elif '! ' in first_graf: | |
result = split_on_punc('!', first_graf) | |
elif first_graf and first_graf[-1] in all_punc_set: | |
result = first_graf[0].upper() + first_graf[1:] | |
else: | |
result = first_graf[0].upper() + first_graf[1:] + '...' | |
# else: | |
# result = first_graf[0].upper() + first_graf[1:].rstrip() + '...' | |
# if self.unused_captions and rc([True, False, False, False, False]): | |
# graf_captions = list() | |
# for _ in range(self.unused_captions_per_graf): | |
# graf_captions.append( self.unused_captions.pop() ) | |
# graf = ', '.join(graf_captions) | |
# self.sentences[img_hash].append( graf[0].upper() + graf[1:] + '.' ) | |
if self.looper and result: | |
print "MAIN BLOCK RUNNING" | |
approved = True | |
if self.manual: | |
approved = self.approve(result) | |
if approved: | |
print "APPEND RESULT TO SENTENCES" | |
print result | |
self.sentences[img_hash].append(result) | |
print self.sentences[img_hash] | |
with open(os.path.join('/home/rg/projects/wc3/pages', img_hash+'.txt'), 'a') as outfile: | |
outfile.write(result.encode('utf8')+'\n') | |
# thermal.thermal_print(result) | |
# thermal.line_break() | |
else: | |
self.sentences[img_hash].append("") | |
if self.looper and len(self.sentences[img_hash]) == self.sentence_count: | |
if self.ebook_title: | |
self.urls.append( self.publish(img_hash) ) | |
else: | |
self.publish(img_hash) | |
print(img_hash, len(self.sentences[img_hash])) | |
def get_text(self, img_hash): | |
return ' '.join(self.sentences[img_hash]) | |
    def change_sentence_count(self, new_count):
        """Set how many expanded sentences are collected per image."""
        self.sentence_count = new_count
    def publish(self, img_hash):
        """Render the collected sentences for *img_hash* into an HTML page.

        Writes <date>_<hash>.html to self.folderpath (or the default pages
        dir), optionally uploads the page and image to S3, opens the result
        in a browser unless building an ebook, and returns its URL. In
        ebook mode it returns the local file path for later scraping.
        Also re-arms the capture loop via self.gogogo.

        NOTE(review): the flattened source is ambiguous about whether the
        Template import sat inside the do_upload branch; it is reconstructed
        at method level here because Template is used unconditionally below.
        """
        if self.looper:
            self.gogogo = True
        # Drop the empty-string placeholders left by rejected sentences.
        approved_sents = filter(lambda x: x, self.sentences[img_hash])
        now = str(datetime.now())
        # Only consumed by the commented-out thermal printer calls below.
        signature = 'word.camera by Ross Goodwin | %s' % now
        # thermal.line_break()
        # thermal.basic_print( '_'*len(signature) )
        # thermal.basic_print( signature )
        # thermal.line_break()
        # thermal.line_break()
        if self.do_upload:
            # Project-local S3 helper; imported lazily so offline runs work.
            from upload_to_s3 import upload
        from string import Template
        # from datetime import datetime
        def chunks(l, n):
            """Yield successive n-sized chunks from l."""
            for i in range(0, len(l), n):
                yield l[i:i + n]
        with open(self.template_path, 'r') as infile:
            html_temp = Template(infile.read())
        img_fp = os.path.join(self.img_dest, img_hash+'.jpg')
        if self.do_upload:
            img_web_url = upload(img_fp)
        else:
            img_web_url = 'file://'+img_fp
        # Ensure the final sentence ends with terminal punctuation.
        if approved_sents:
            last_line = approved_sents[-1].strip()
            if last_line:
                if last_line[-1] in set([',', ';', ':']):
                    approved_sents[-1] = last_line[:-1] + '.'
                elif not last_line[-1] in set(['.', '!', '?', ',', ';', ':']):
                    approved_sents[-1] = last_line + '.'
        # Five sentences per <p>, <br>-separated inside each paragraph.
        body_text = '<p>%s</p>' % '</p><p>'.join(
            map(lambda x: '<br>'.join(x).replace('\n', '<br>'), list(chunks(approved_sents, 5)))
            # self.sentences[img_hash]
        )
        title = str(datetime.now()).split(' ', 1)[0] + '_' + img_hash
        html_str = html_temp.substitute(title=title, img_url=img_web_url, body=body_text)
        if self.folderpath:
            html_fn = os.path.join(self.folderpath, title+'.html')
        else:
            html_fn = os.path.join('/home/rg/projects/wc3/pages', title+'.html')
        with open(html_fn, 'w') as outfile:
            outfile.write(html_str.encode('utf8'))
        if self.do_upload:
            result_url = upload(html_fn)
            # Local copy is redundant once uploaded.
            os.remove(html_fn)
        else:
            result_url = 'file://'+html_fn
        if not self.ebook_title:
            webbrowser.open(result_url)
            # razer_rgb.scrolling_text(body_text.split(',', 1)[0], bg_color=(0,0,0), text_color=(255,255,255), speed=20, variety=64)
            return result_url
        else:
            return html_fn
def publish_all(self): | |
urls = map(self.publish, self.sentences.keys()) | |
if not self.ebook_title: | |
for u in urls: | |
print u | |
return urls | |
else: | |
self.create_ebook(self.ebook_title, urls) | |
    def create_ebook(self):
        """Assemble the pages listed in self.urls into an epub.

        Scrapes the main content div out of each saved HTML page, wraps the
        concatenation in the ebook template, and shells out to Calibre's
        ebook-convert. Returns the epub's output path.

        NOTE(review): self.urls must hold local file paths (the ebook-mode
        return value of publish), not http/file URLs — get_file_text()
        open()s each entry directly.
        """
        from bs4 import BeautifulSoup
        from string import Template
        def get_file_text(fp):
            # div.col-md-8 holds the page's text content in the template.
            with open(fp) as f:
                soup = BeautifulSoup(f.read().strip())
                return str(soup.select('div.col-md-8')[0])
        ebook_html = '\n\n'.join(map(get_file_text, self.urls))
        with open(self.ebook_template_path, 'r') as infile:
            html_temp = Template(infile.read())
        ebook_html_text = html_temp.substitute(title=self.ebook_title, body=ebook_html)
        ebook_html_path = '/home/rg/projects/wc3/pages/ebook.html'
        ebook_output_path = '/home/rg/projects/wc3/pages/'+self.ebook_title+'.epub'
        with open(ebook_html_path, 'w') as outfile:
            outfile.write(ebook_html_text)
        # Calibre CLI does the actual HTML -> epub conversion.
        proc = subprocess.Popen(['ebook-convert', ebook_html_path, ebook_output_path])
        proc.communicate()
        print ebook_output_path
        return ebook_output_path
        # os.remove(ebook_html_path)
# def process_folder(self, folderpath): | |
# for root, dirs, files in os.walk(folderpath): | |
# for f in files: | |
# if f.endswith('.jpg'): | |
# img_path = os.path.join(root, f) | |
# print img_path | |
# self.narrate(img_path) | |
# time.sleep(0.1) | |
# num_sents, seed_id = map(int, sys.argv[1:3]) | |
# allowed_ext = set(['tif', 'tiff', 'png', 'jpg', 'jpeg']) | |
# if len(sys.argv) > 3: | |
# fp = sys.argv[3] | |
# fp_ext = fp.rsplit('.', 1).pop().lower() | |
# if fp_ext in allowed_ext: | |
# if image file | |
# wc = WordCamera(img_orig_fp=fp, sentence_count=num_sents, seed_ix=seed_id, manual=False, looper=True) | |
# else: | |
# if folder | |
# cameras = list() | |
# wc = False | |
# for subdir, dirs, files in os.walk(fp): | |
# for f in files: | |
# f_ext = f.rsplit('.', 1).pop().lower() | |
# if f_ext in allowed_ext: | |
# img_fp = os.path.join(subdir, f) | |
# if not wc: | |
# wc = WordCamera(img_orig_fp=img_fp, sentence_count=num_sents, seed_ix=seed_id, manual=False, looper=True, folderpath=subdir) | |
# else: | |
# time.sleep(1.5) | |
# # with open('big_picture_2016/page_paths.txt') as infile: | |
# filepaths = filter(lambda y: y, map(lambda x: x.strip(), infile.read().strip().split('\n'))) | |
# wc.create_ebook(sys.argv[4], filepaths) | |
# else: | |
# wc = WordCamera(sentence_count=num_sents, seed_ix=seed_id, manual=True, looper=True) | |
# if len(sys.argv) > 3: | |
# folderpath = sys.argv[3] | |
# wc.process_folder(folderpath) | |
# trigger = raw_input('Publish? ') | |
# if trigger: | |
# wc.publish_all() | |
# else: | |
# wc = WordCamera(sentence_count=num_sents, seed_ix=seed_id, manual=True, looper=True) | |
# time.sleep(num_sents*3) | |
# razer_rgb.write_to_file('brightness', '255') | |
# razer_rgb.perlin_noise(secs=num_sents*3) | |
# wc.publish(imghash) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment