Skip to content

Instantly share code, notes, and snippets.

@Gerzer
Last active January 18, 2018 23:42
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save Gerzer/370da8cbe0f6066811b5 to your computer and use it in GitHub Desktop.
Save Gerzer/370da8cbe0f6066811b5 to your computer and use it in GitHub Desktop.
Pythonista FTP sync
import ui
import console
import keychain
import ftplib
import os
import re
import time
from datetime import datetime
global cur_dir
cur_dir = os.path.abspath(os.getcwd())
ftp_files = []
def walkftp(ftp_inst, dirname):
global ftp_files
ftp_inst.cwd(dirname)
for name in ftp_inst.nlst(ftp_inst.pwd()):
path = os.path.join(ftp_inst.pwd(), name)
try:
ftp_inst.cwd(path)
ftp_inst.cwd('..')
ui.in_background(walkftp(ftp_inst, path))
except ftplib.error_perm:
ftp_files.append(path)
def save_handler(sender):
addr_file = open(os.path.join(cur_dir, 'ftp_addr.txt'), 'w')
name_file = open(os.path.join(cur_dir, 'ftp_name.txt'), 'w')
dir_file = open(os.path.join(cur_dir, 'ftp_dir.txt'), 'w')
root_dir_file = open(os.path.join(cur_dir, 'ftp_rootdir.txt'), 'w')
addr_file.write(root['addr'].text)
name_file.write(root['name'].text)
dir_file.write(root['dir'].text)
root_dir_file.write(os.path.abspath(root['rootdir'].text))
addr_file.close()
name_file.close()
dir_file.close()
root_dir_file.close()
keychain.set_password('ftpsync', root['name'].text, root['pswd'].text)
@ui.in_background
def sync_handler(sender):
try:
addr_file = open(os.path.join(cur_dir, 'ftp_addr.txt'), 'r')
name_file = open(os.path.join(cur_dir, 'ftp_name.txt'), 'r')
dir_file = open(os.path.join(cur_dir, 'ftp_dir.txt'), 'r')
root_dir_file = open(os.path.join(cur_dir, 'ftp_rootdir.txt'), 'r')
addr = addr_file.read()
name = name_file.read()
sync_dir = dir_file.read()
root_dir = root_dir_file.read()
addr_file.close()
name_file.close()
dir_file.close()
root_dir_file.close()
pswd = keychain.get_password('ftpsync', name)
except:
console.hud_alert('Couldn\'t find a saved FTP host', 'error', 2.3)
return
console.show_activity('Syncing with FTP server...')
os.chdir(root_dir)
time.sleep(0.35)
# try:
# state_file = open('ftp_state.txt', 'r')
# state = state_file.readlines()
# state_file.close()
# except IOError:
# state_file = open('ftp_state.txt', 'a+')
# state_file.close()
# state = []
if name == '#anonymous#' and pswd == '@anonymous@':
ftp_inst = ftplib.FTP(addr)
ftp_inst.login()
else:
ftp_inst = ftplib.FTP(addr)
ftp_inst.login(name, pswd)
try:
ftp_inst.nlst(ftp_inst.pwd())
ftp_inst.set_debuglevel(1)
walkftp(ftp_inst, sync_dir)
except:
pass
for ftp_file in ftp_files:
ftp_mod_date = datetime.strptime(ftp_inst.sendcmd('MDTM ' + ftp_file)[4:], "%Y%m%d%H%M%S")
local_file = ftp_file.replace(sync_dir + '/', '')[1:]
if os.path.exists(local_file) == True:
local_mod_date = datetime.fromtimestamp(os.path.getmtime(local_file))
if ftp_mod_date > local_mod_date:
ftp_inst.retrbinary('RETR ' + ftp_file, open(local_file, 'wb').write)
else:
pass
else:
head, tail = os.path.split(local_file)
if not os.path.exists(head) and head != '':
os.makedirs(head)
local_file_obj = open(local_file, 'a+')
local_file_obj.close()
ftp_inst.retrbinary('RETR ' + ftp_file, open(local_file, 'wb').write)
for local_dir, local_dirs, local_filenames in os.walk('.'):
if local_dir == './.Trash':
for local_filename in local_filenames:
local_file = os.path.join(local_dir, local_filename)
local_file = local_file.replace('./', '')
ftp_file = '/' + os.path.join(sync_dir, local_file)
if ftp_file in ftp_files:
ftp_inst.delete(ftp_file)
for local_trash_subdir in local_dirs:
local_full_dir = os.path.join(local_dir, local_trash_subdir)
local_full_dir = local_full_dir.replace('./', '')
ftp_dir = '/' + os.path.join(sync_dir, local_full_dir)
try:
ftp_inst.rmd(ftp_dir)
except:
pass
else:
for local_filename in local_filenames:
local_file = os.path.join(local_dir, local_filename)
local_file = local_file.replace('./', '')
ftp_file = '/' + os.path.join(sync_dir, local_file)
if ftp_file in ftp_files:
ftp_mod_date = datetime.strptime(ftp_inst.sendcmd('MDTM ' + ftp_file)[4:], "%Y%m%d%H%M%S")
local_mod_date = datetime.fromtimestamp(os.path.getmtime(local_file))
if local_mod_date > ftp_mod_date:
local_file_obj = open(local_file, 'rb')
head, tail = os.path.split(ftp_file)
if tail.startswith('.') == False:
ftp_inst.storbinary('STOR ' + ftp_file, local_file_obj)
local_file_obj.close()
else:
ftp_inst.cwd('/')
head, tail = os.path.split(ftp_file)
do_store_file = True
for dir_level in head.split('/'):
try:
ftp_inst.cwd(dir_level)
except:
ftp_inst.mkd(dir_level)
ftp_inst.cwd(dir_level)
if do_store_file == True:
local_file_obj = open(local_file, 'rb')
if tail.startswith('.') == False:
ftp_inst.storbinary('STOR ' + ftp_file, local_file_obj)
local_file_obj.close()
ftp_inst.close()
console.hide_activity()
def mode_handler(sender):
if sender.selected_index == 0:
root['rootdir'].enabled = False
root['addr'].enabled = False
root['name'].enabled = False
root['pswd'].enabled = False
root['anon'].enabled = False
root['save'].enabled = False
root['dir'].enabled = False
root['sync'].enabled = True
elif sender.selected_index == 1:
root['rootdir'].enabled = True
root['addr'].enabled = True
if root['anon'].value == False:
root['name'].enabled = True
root['pswd'].enabled = True
root['anon'].enabled = True
root['save'].enabled = True
root['dir'].enabled = True
root['sync'].enabled = False
def anon_handler(sender):
if sender.value == True:
root['name'].text = '#anonymous#'
root['pswd'].text = '@anonymous@'
root['name'].enabled = False
root['pswd'].enabled = False
elif sender.value == False:
root['name'].text = ''
root['pswd'].text = ''
root['name'].enabled = True
root['pswd'].enabled = True
root = ui.load_view()
try:
addr_file = open(os.path.join(cur_dir, 'ftp_addr.txt'), 'r')
name_file = open(os.path.join(cur_dir, 'ftp_name.txt'), 'r')
dir_file = open(os.path.join(cur_dir, 'ftp_dir.txt'), 'r')
root_dir_file = open(os.path.join(cur_dir, 'ftp_rootdir.txt'), 'r')
addr = addr_file.read()
name = name_file.read()
sync_dir = dir_file.read()
root_dir = root_dir_file.read()
addr_file.close()
name_file.close()
dir_file.close()
root_dir_file.close()
pswd = keychain.get_password('ftpsync', name)
except:
addr = ''
name = ''
pswd = ''
root_dir = ''
sync_dir = ''
root['addr'].enabled = False
root['addr'].text = addr
root['name'].enabled = False
root['name'].text = name
root['pswd'].enabled = False
root['pswd'].text = pswd
root['rootdir'].enabled = False
root['rootdir'].text = root_dir
root['anon'].enabled = False
if name == '#anonymous#' and pswd == '@anonymous@':
root['anon'].value = True
root['save'].enabled = False
root['dir'].enabled = False
root['dir'].text = sync_dir
root['mode'].action = mode_handler
root.present('sheet')
[{"class":"View","attributes":{"name":"FTP sync","tint_color":"RGBA(0.000000,0.478000,1.000000,1.000000)","background_color":"RGBA(1.000000,1.000000,1.000000,1.000000)","enabled":true,"border_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","flex":""},"frame":"{{0, 0}, {152, 392}}","nodes":[{"class":"TextField","attributes":{"alignment":"left","autocorrection_type":"no","font_size":17,"border_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","enabled":true,"flex":"","placeholder":"Server address","text_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","name":"addr","border_style":3,"spellchecking_type":"no","uuid":"D41E71EA-F9BE-4BF4-8A0E-4664A34DA47F"},"frame":"{{8, 38}, {136, 32}}","nodes":[]},{"class":"SegmentedControl","attributes":{"name":"mode","border_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","uuid":"F9F1EB6A-6071-48C9-AB58-73FC597764D5","enabled":true,"segments":"Sync|Edit","flex":"LR"},"frame":"{{22, 357}, {120, 29}}","nodes":[]},{"class":"TextField","attributes":{"alignment":"left","autocorrection_type":"no","font_size":17,"border_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","enabled":true,"flex":"","placeholder":"Username","text_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","name":"name","border_style":3,"spellchecking_type":"no","uuid":"7D4B39EA-FDC7-4BF8-9060-4301FF746C78"},"frame":"{{8, 78}, {136, 32}}","nodes":[]},{"class":"TextField","attributes":{"alignment":"left","autocorrection_type":"no","font_size":17,"border_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","enabled":true,"flex":"","placeholder":"Password","text_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","secure":true,"name":"pswd","border_style":3,"spellchecking_type":"no","uuid":"3903F267-28B5-43E9-920F-2AE569A09F58"},"frame":"{{8, 118}, {136, 32}}","nodes":[]},{"class":"Switch","attributes":{"enabled":true,"flex":"","name":"anon","value":false,"action":"anon_handler","border_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","uuid":"C0A89942-B88C-42B1-A451-F8446670E1EB"},"frame":"{{50, 238}, {51, 31}}","nodes":[]},{"class":"Label","attributes":{"font_size":17,"enabled":true,"text":"Anonymous","flex":"","name":"anonlabel","border_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","text_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","alignment":"center","uuid":"86E6E1F7-2F72-47C6-9EB0-7366E4F51189"},"frame":"{{8, 198}, {136, 32}}","nodes":[]},{"class":"Button","attributes":{"font_size":15,"enabled":true,"flex":"","font_bold":true,"name":"sync","uuid":"FFA3ADB5-B17D-4E81-9A30-F7F86431D854","border_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","action":"sync_handler","image_name":"ionicons-ios7-refresh-32","title":"Sync"},"frame":"{{36, 317}, {80, 32}}","nodes":[]},{"class":"Button","attributes":{"font_size":15,"enabled":true,"flex":"","font_bold":false,"name":"save","uuid":"51D6677C-10AB-46E3-A229-665692F43DB7","border_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","action":"save_handler","image_name":"ionicons-ios7-checkmark-32","title":"Save"},"frame":"{{36, 277}, {80, 32}}","nodes":[]},{"class":"TextField","attributes":{"alignment":"left","autocorrection_type":"no","font_size":17,"border_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","enabled":true,"flex":"","placeholder":"Sync directory","text_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","name":"dir","border_style":3,"spellchecking_type":"no","uuid":"76FE0AEB-1770-4452-98B1-E84B64CA75B7"},"frame":"{{8, 158}, {136, 32}}","nodes":[]},{"class":"TextField","attributes":{"alignment":"left","autocorrection_type":"no","font_size":17,"border_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","enabled":true,"flex":"","placeholder":"Local root path","text_color":"RGBA(0.000000,0.000000,0.000000,1.000000)","name":"rootdir","border_style":3,"spellchecking_type":"no","uuid":"1C9422C7-A825-4246-B0F3-D8935139A463","secure":false},"frame":"{{8, 0}, {136, 32}}","nodes":[]}]}]
@cclauss
Copy link

cclauss commented Sep 26, 2015

This is what I meant by naked exceptions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment