Skip to content

Instantly share code, notes, and snippets.

@moorepants
Created August 19, 2013 02:21
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save moorepants/6265289 to your computer and use it in GitHub Desktop.
Save moorepants/6265289 to your computer and use it in GitHub Desktop.
Image uploader for localwikis.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import getpass
import os
import shutil
import tempfile

import slumber

from upload_tagged_photos import ImageUploader
def TestUploadWiki():
url = "http://clevelandwiki.org/api/"
user_name = raw_input("What is your username?")
api_key = getpassword("What is your api_key?")
def setup(self):
self.api = slumber.API(self.url, auth=(self.user_name,
self.password))
# create two directories with some images, some should have correct
# tags
# create test page
page_dict = {
"content": "<p>The Upload Test Page.</p>",
"name": "Upload Test Page",
}
test_page = self.api.page.post(page_dict)
test_page_slug = self.api.page(test_page['id'])['slug']
# add an image to it
with open('image.jpg') as f:
self.api.file.post(name='image.jpg', slug=test_page_slug,
files={'file': f})
self.uploader = ImageUploader(self.api_url,
user_name=self.user_name,
api_key=self.api_key)
def test_init(self):
uploader = ImageUploader(self.api_url, user_name=self.user_name,
api_key=self.api_key)
assert self.api._store['base_url'] == self.api_url
assert self.api._store['format'] == 'json'
assert sefl.api._store['session'].auth[0] == self.user_name
assert sefl.api._store['session'].auth[1] == self.api_key
def test_remove_tmp_dirs():
directories = ['localwikidir1', 'localwikidir1']
for directory in directories:
os.mkdir(os.path.join('/tmp', directory,
self.uploader._tmp_dir_name))
files = ['file1.jpg', 'file2.jpg']
file_paths = []
for directory, file_name in zip(directores, files):
file_path = os.path.join('/tmp', directory,
self.uploader._tmp_dir_name, file_name)
with open(file_path, 'w') as f:
pass
file_path.append()
assert os.exists(file_path)
self.uploader.remove_tmp_dirs(file_paths)
for file_path in file_paths:
assert not os.exists(os.split(file_path)[0])
assert not os.exists(file_path)
for directory in directories:
shutil.rmtree(os.path.join('/tmp', directory))
def test_find_files_in_page(self):
assert find_files_in_page('this will never be the name of a page') is None
files_in_page = find_files_in_page('Upload Test Page')
assert files_in_page[0]['name'] = 'image.jpg'
assert files_in_page[0]['slug'] = 'upload test page'
def teardown():
# remove the files attached to the test page
# delete the test page and all versions
# delete the tmp image directories
test_page = api.page('Upload Test Page').get()
test_file_id = \
api.files.get(slug=test_page['slug'])['objects'][0]['id']
self.api.files.id.delete()
self.api.page('Upload Test Page').delete()
def test_find_wiki_images():
os.mkdir('/tmp/wiki-images')
upload_wiki_images('dir'])
upload_wiki_images(['dir1', 'dir2'])
upload_wiki_images('dir'], create_new_page=True, embed_image_on_create=True)
wiki_images = find_wiki_images('dir')
wiki_images = find_wiki_images(['dir1', 'dir2'])
wiki_images['/tmp/cw/test_image.jpg'] == ['page:Test Page']
check_if_file_exists(file_name)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import shutil
import getpass
import slumber
from gi.repository import GExiv2
class ImageUploader(object):
    """Uploads keyword-tagged local images to a localwiki site.

    Images are selected by the keywords stored in their
    ``Iptc.Application2.Keywords`` tag: one keyword marks the image as
    belonging to the wiki and further prefixed keywords name the pages the
    image should be attached to.
    """

    # Name of the scratch directory that is created beside an image to hold
    # its rotated copy.
    _tmp_dir_name = '.localwiki'
    # File name extensions that are treated as images (compared lower-cased).
    _file_extensions = ['.png', '.jpg', '.gif']

    def __init__(self, api_url, user_name=None, api_key=None):
        """Initializes the uploader.

        Parameters
        ==========
        api_url : string
            This should point to your API and have a trailing slash, e.g.
            'http://clevelandwiki.org/api/'.
        user_name : string, optional, default=None
            The user name for your API key. If you don't provide this here,
            then you will be prompted to enter it on initialization.
        api_key : string, optional, default=None
            The api_key for this user. If you don't provide this here, then
            you will be prompted to enter it on initialization.

        """
        # The keyword-only argument in upload() makes this module Python 3
        # only, so the Python 3 prompt function is used here.
        if user_name is None:
            user_name = input("What is your username?\n")

        if api_key is None:
            api_key = getpass.getpass("What is your api_key?\n")

        self.api = slumber.API(api_url, auth=(user_name, api_key))

        # Search settings; upload() overwrites them on every call.
        self.directories = []
        self.main_keyword = None
        self.page_keyword_prefix = "page:"

    def upload(self, main_keyword, *directories, page_keyword_prefix="page:"):
        """Uploads all the new files in the specified directories with the
        proper tags to localwiki and creates new pages if needed.

        Parameters
        ==========
        main_keyword : string
            The keyword embedded in Iptc.Application2.Keywords that
            identifies your image as one that belongs on your localwiki
            site, e.g. 'cleveland wiki'.
        directories : str
            The directories to search for images.
        page_keyword_prefix : string, optional, default="page:"
            This is the prefix for the keyword embedded in your image which
            contains the page name where the image belongs, e.g. if your
            image belongs on the front page then your keyword should look
            like "page:Front Page".

        """
        self.directories = list(directories)
        self.main_keyword = main_keyword
        self.page_keyword_prefix = page_keyword_prefix

        wiki_images = self.find_localwiki_images()

        try:
            for file_path, page_names in wiki_images.items():
                file_name = os.path.basename(file_path)
                for page_name in page_names:
                    page = self.create_page(page_name)
                    if self.file_exists_on_server(file_name):
                        print("{} already exists on the localwiki.".format(
                            file_path))
                    else:
                        self.upload_image(page, file_path)
                        self.embed_image(page_name, file_name)
        finally:
            # Rotated copies are scratch data; remove them even when an
            # upload fails part way through.
            print('Cleaning up image rotations.')
            self.remove_tmp_dirs(wiki_images.keys())

        print('Done.')

    def remove_tmp_dirs(self, file_paths):
        """Removes any of the temporary directories used to rotate images.

        Parameters
        ==========
        file_paths : iterable of strings
            Paths to the original images. The temporary directory that sits
            beside each image, if there is one, is deleted.

        """
        touched_directories = {os.path.split(path)[0] for path in file_paths}

        for directory in touched_directories:
            tmp_dir = os.path.join(directory, self._tmp_dir_name)
            if os.path.exists(tmp_dir):
                shutil.rmtree(tmp_dir)

    def find_localwiki_images(self):
        """Returns a dictionary mapping local image paths to local wiki page
        names for all images in the provided directories that have the
        correct tags."""
        wiki_images = {}
        for directory in self.directories:
            wiki_images.update(
                self.find_localwiki_images_in_directory(directory))
        return wiki_images

    def find_localwiki_images_in_directory(self, directory):
        """Returns a dictionary mapping local image paths to local wiki page
        names.

        Parameters
        ==========
        directory : string
            The path to the directory that should be scanned for localwiki
            images.

        Returns
        =======
        wiki_images : dictionary
            Maps the path of each image that carries the main keyword to
            the list of page names found in its prefixed keywords.

        """
        extensions = tuple(self._file_extensions)
        prefix = self.page_keyword_prefix

        wiki_images = {}
        for file_name in os.listdir(directory):
            # Lower-case first so that e.g. 'IMG_0001.JPG' is found too.
            if not file_name.lower().endswith(extensions):
                continue
            # listdir gives bare names; the metadata reader needs the path.
            file_path = os.path.join(directory, file_name)
            metadata = GExiv2.Metadata(file_path)
            keywords = \
                metadata.get_tag_multiple('Iptc.Application2.Keywords')
            if self.main_keyword in keywords:
                # Strip only the prefix so that page names which contain a
                # colon themselves stay intact.
                wiki_images[file_path] = \
                    [keyword[len(prefix):] for keyword in keywords if
                     keyword.startswith(prefix)]
        return wiki_images

    def create_page(self, page_name):
        """Creates a new blank page on the server with the provided page
        name and returns the data received from the post, unless it already
        exists, in which case it returns the existing page.

        Parameters
        ==========
        page_name : string
            The name of the page to create.

        Returns
        =======
        page_dict : dictionary
            The existing page, or the response from the post.

        """
        try:
            return self.api.page(page_name).get()
        except slumber.exceptions.HttpClientError:
            print("Creating the new page: {}".format(page_name))
            page_dict = {
                "content": "<p>Please add some content to help describe this page.</p>",
                "name": page_name,
            }
            return self.api.page.post(page_dict)

    def find_files_in_page(self, page_name):
        """Returns a list of dictionaries, one for each file, attached to a
        page and returns None if the page doesn't exist.

        Parameters
        ==========
        page_name : string
            The name of the page to look up.

        Returns
        =======
        files : list of dictionaries or None
            The response dictionaries, one for each file associated with the
            page. None if the page doesn't exist.

        """
        try:
            slug = self.api.page(page_name).get()['slug']
        except slumber.exceptions.HttpClientError:
            return None
        else:
            # Same 'file' resource that the upload and existence check use.
            return self.api.file.get(slug=slug)['objects']

    def file_exists_on_server(self, file_name):
        """Returns true if the file already exists on the server.

        Parameters
        ==========
        file_name : string
            The name of the file, as stored in localwiki.

        """
        # NOTE(review): this inspects a single list response; if the server
        # paginates the listing, files on later pages are missed - confirm.
        file_list = self.api.file.get()['objects']
        return any(file_dict['name'] == file_name for file_dict in file_list)

    def rotate_image(self, file_path):
        """Creates a temporary directory beside the file, copies the file
        into the directory, and rotates it based on the EXIF orientation
        data.

        Parameters
        ==========
        file_path : string
            The path to the image file.

        Returns
        =======
        tmp_file_path : string
            The path to the copy inside the temporary directory.

        """
        import subprocess

        directory, file_name = os.path.split(file_path)
        tmp_dir = os.path.join(directory, self._tmp_dir_name)
        # Several images can share one directory, so it may exist already.
        if not os.path.isdir(tmp_dir):
            os.mkdir(tmp_dir)
        tmp_file_path = os.path.join(tmp_dir, file_name)
        shutil.copyfile(file_path, tmp_file_path)
        try:
            # An argument list (no shell) keeps paths with spaces or shell
            # metacharacters intact.
            subprocess.call(['jhead', '-ft', '-autorot', tmp_file_path])
        except OSError:
            # jhead is not installed; the unrotated copy is still usable.
            print("Could not run jhead, {} was not rotated.".format(
                file_name))
        return tmp_file_path

    def upload_image(self, page, file_path):
        """Uploads the image to the server.

        If the EXIF orientation says that the image is rotated, a rotated
        copy from the temporary directory is uploaded instead of the
        original. The copy is created first if it does not exist yet.

        Parameters
        ==========
        page : dictionary
            The response dictionary for a page.
        file_path : string
            The path to the image file.

        """
        metadata = GExiv2.Metadata(file_path)
        directory, file_name = os.path.split(file_path)
        tmp_file_path = os.path.join(directory, self._tmp_dir_name,
                                     file_name)

        orientation_tag = 'Exif.Image.Orientation'
        # Images without an orientation tag are uploaded as they are.
        needs_rotation = (metadata.has_tag(orientation_tag) and
                          metadata.get_tag_string(orientation_tag) != '1')

        if needs_rotation:
            if not os.path.isfile(tmp_file_path):
                self.rotate_image(file_path)
            upload_path = tmp_file_path
        else:
            upload_path = file_path

        print('Uploading {} to {}'.format(file_name, page['name']))
        # Binary mode: image bytes must not pass through text decoding.
        with open(upload_path, 'rb') as image:
            # slumber sends keyword arguments as query parameters, so the
            # form fields are passed as the data dictionary instead.
            self.api.file.post({'name': file_name, 'slug': page['slug']},
                               files={'file': image})
        print('Done.')

    def embed_image(self, page_name, image_name, caption='Caption me!'):
        """Appends HTML to the page that embeds the attached image.

        Parameters
        ==========
        page_name : string
            The name of the page to modify.
        image_name : string
            The name of an image that is attached to the page.
        caption : string, optional, default='Caption me!'
            The caption that is displayed under the image.

        Returns
        =======
        page_response : dictionary
            The response from the patch.

        """
        current_content = self.api.page(page_name).get()['content']
        html = \
"""
<p>
  <span class="image_frame image_frame_border">
    <img src="_files/{}" style="width: 300px; height: 225px;" />
    <span class="image_caption" style="width: 300px;">
      {}
    </span>
  </span>
</p>
""".format(image_name, caption)
        return self.api.page(page_name).patch({'content': current_content +
                                               html})
def build_argument_parser():
    """Returns the command line parser for the uploader script.

    The parser takes the API url, the main keyword and any number of
    directories as positional arguments, plus an optional ``--prefix`` for
    the page name keyword which defaults to 'page:'.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='Upload images to local wiki.')
    parser.add_argument('url', type=str,
        help="The API url with a trailing slash, e.g http://clevelandwiki.org/api/")
    parser.add_argument('keyword', type=str,
        help="The main keyword to look for, e.g. 'cleveland wiki'.")
    parser.add_argument('directories', type=str, nargs='*',
        help="The directories to search.")
    parser.add_argument('--prefix', type=str, default='page:',
        help="The keyword page name prefix, the default is 'page:'.")
    return parser


def main(argv=None):
    """Parses the command line and uploads the tagged images.

    Parameters
    ==========
    argv : list of strings, optional, default=None
        The command line arguments without the program name. The arguments
        of the running process are used if this is None.

    """
    args = build_argument_parser().parse_args(argv)

    # No credentials are passed, so the uploader prompts for them.
    uploader = ImageUploader(args.url)
    uploader.upload(args.keyword, *args.directories,
                    page_keyword_prefix=args.prefix)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment