Skip to content

Instantly share code, notes, and snippets.

@jarkin13
Created July 15, 2020 16:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jarkin13/e8a82ef3cf6d27e503988a0df0bda007 to your computer and use it in GitHub Desktop.
GCS
#!/usr/bin/env python
import argparse
import logging
import logging.config
import os
import shutil
import stat
import tempfile
from datetime import datetime
# from concurrent.futures import ThreadPoolExecutor

import pysftp
import pytz
from google.cloud import pubsub
from google.cloud import storage
# import google.cloud.logging
from google.cloud.exceptions import NotFound, Conflict
from parse import parse
# Logging is configured from the logging.conf file located next to this script.
this_dir = os.path.dirname(__file__)
path = os.path.join(this_dir, 'logging.conf')
logging.config.fileConfig(path, disable_existing_loggers=False)
LOGGER = logging.getLogger('bsd')

# Use the libyaml-backed loader when PyYAML was built with it, otherwise the
# pure-Python one.
# NOTE(review): yaml's Loader/CLoader can construct arbitrary Python objects
# from tagged YAML; if the config source is not fully trusted, SafeLoader /
# CSafeLoader would be enough for the plain mappings this script reads.
from yaml import load
try:
    from yaml import CLoader as Loader
except ImportError:
    from yaml import Loader
def force_utc(d_object):
    """Mark a naive datetime as UTC; return anything else unchanged.

    Objects without a ``tzinfo`` attribute (e.g. ``None``) and datetimes that
    already carry a timezone are returned as-is. A naive datetime is not
    converted, only labelled: its wall-clock fields are kept and ``pytz.UTC``
    is attached.
    """
    # A missing attribute yields False, which is "not None", so such objects
    # take the pass-through path, as do already-aware datetimes.
    if getattr(d_object, 'tzinfo', False) is not None:
        return d_object
    return d_object.replace(tzinfo=pytz.UTC)
def from_tz(d_object, tz):
    """Read naive ``d_object`` as wall-clock time in zone ``tz``; return it in UTC.

    Args:
        d_object: naive ``datetime`` to interpret.
        tz: zone name passed to ``pytz.timezone`` (e.g. ``'America/New_York'``).

    Returns:
        A timezone-aware ``datetime`` in ``pytz.UTC``.
    """
    zone = pytz.timezone(tz)
    localized = zone.localize(d_object)
    return localized.astimezone(pytz.UTC)
def sftp_list_dir(sftp, path=None, tz='UTC'):
    """List the non-directory entries of an SFTP directory with UTC mtimes.

    Sub-directories are skipped (mirroring ``gcs_list_dir``), because the
    sync only transfers files and cannot ``get`` a directory.

    Args:
        sftp: an open ``pysftp.Connection``.
        path: remote directory to list; falsy means ``'/'``.
        tz: zone name in which the server's modification times are read as
            wall-clock time; falsy (e.g. a missing ``timezone`` key in the job
            config) means ``'UTC'``.

    Returns:
        A list of ``{'file': entry name, 'modified': aware UTC datetime}``.
    """
    if not path:
        path = '/'
    if not tz:
        # Callers pass config.get('timezone'), which is None when the key is
        # absent; that overrides the default and pytz.timezone(None) raises.
        tz = 'UTC'
    files = []
    with sftp.cd(path):
        for attr in sftp.listdir_attr():
            # st_mode can be None if the server omits it; only skip entries
            # that are positively identified as directories.
            if attr.st_mode is not None and stat.S_ISDIR(attr.st_mode):
                continue
            # Render the epoch as a naive UTC wall clock, so the result does
            # not depend on the timezone of the machine running the sync, then
            # reinterpret that wall clock in the configured zone.
            wall_clock = datetime.fromtimestamp(attr.st_mtime, tz=pytz.UTC).replace(tzinfo=None)
            files.append({
                'file': attr.filename,
                'modified': from_tz(wall_clock, tz),
            })
    return files
def gcs_list_dir(bucket, path=None):
    """List the file objects under ``path`` in ``bucket`` with UTC mtimes.

    Objects whose names end in ``'/'`` (directory placeholders) are skipped.
    The modification time comes from the ``lastmodified`` custom-metadata
    entry that ``sync`` writes on upload (epoch seconds stored as a string).
    If that entry is absent or unparseable, the object's ``updated`` time is
    used instead.

    Args:
        bucket: ``google.cloud.storage.Bucket`` to list.
        path: object-name prefix passed to ``list_blobs``; ``None`` lists the
            whole bucket.

    Returns:
        A list of ``{'file': object name, 'modified': aware UTC datetime,
        '_blob': the Blob}`` dicts.
    """
    files = []
    for blob in bucket.list_blobs(prefix=path):
        # exclude directories
        if blob.name.endswith('/'):
            continue
        modified = None
        raw_mtime = (blob.metadata or {}).get('lastmodified')
        if raw_mtime is not None:
            try:
                # sync stores the value as a string, which fromtimestamp()
                # rejects, so convert first. It is an absolute epoch, so build
                # an explicitly UTC datetime rather than a host-local one.
                modified = datetime.fromtimestamp(float(raw_mtime), tz=pytz.UTC)
            except (TypeError, ValueError, OverflowError, OSError):
                LOGGER.warning(f"Ignoring unparseable lastmodified {raw_mtime!r} on {blob.name}")
        if modified is None:
            modified = force_utc(blob.updated)
        files.append({
            'file': blob.name,
            'modified': modified,
            '_blob': blob,
        })
    return files


def _connect_sftp(ftp_config):
    """Open the SFTP connection described by a job's ``ftp`` config section."""
    host = ftp_config.get('host')
    # Accept URL-style hosts such as "sftp://app.files.com"; pysftp expects a
    # bare hostname.
    if host and host.startswith('sftp://'):
        host = host[len('sftp://'):].rstrip('/')
    cnopts = pysftp.CnOpts()
    # RSA hack: host-key verification is disabled entirely.
    # NOTE(review): this allows man-in-the-middle attacks; consider pinning
    # the server's host key instead.
    cnopts.hostkeys = None
    return pysftp.Connection(
        host,
        port=ftp_config.get('port', 22),
        username=ftp_config.get('username'),
        password=ftp_config.get('password'),
        cnopts=cnopts,
    )


def _ftp_to_gcs(sftp, bucket, ftp_path, bucket_path, tz):
    """Upload FTP files that are missing from GCS or newer than their GCS copy."""
    LOGGER.info("GCS<FTP")
    # Group by basename: objects in nested "sub-folders" under the prefix are
    # compared by file name only.
    gcs_by_name = {}
    for gcs_file in gcs_list_dir(bucket, bucket_path or None):
        gcs_by_name.setdefault(os.path.basename(gcs_file['file']), []).append(gcs_file)
    pending = []
    for ftp_file in sftp_list_dir(sftp, ftp_path, tz):
        name = ftp_file['file']
        existing = gcs_by_name.get(name)
        if not existing:
            LOGGER.info(f"GCS<FTP: {name}")
            pending.append(ftp_file)
        elif any(gcs_file['modified'] < ftp_file['modified'] for gcs_file in existing):
            pending.append(ftp_file)
    # A private temporary directory avoids the CWD-relative ./tmp that could
    # not be reliably cleaned up, and is removed even if a transfer fails.
    with tempfile.TemporaryDirectory() as tmp_dir, sftp.cd(ftp_path):
        for ftp_file in pending:
            name = ftp_file['file']
            local_path = os.path.join(tmp_dir, name)
            sftp.get(name, localpath=local_path, preserve_mtime=True)
            # Store exactly the instant compared above, so gcs_list_dir reads
            # it back on the next run and an unchanged file is not re-uploaded.
            lastmodified = str(int(ftp_file['modified'].timestamp()))
            LOGGER.info(lastmodified)
            blob = bucket.blob(bucket_path + name)
            blob.metadata = {'lastmodified': lastmodified}
            blob.upload_from_filename(local_path)


def _gcs_to_ftp(sftp, bucket, ftp_path, bucket_path, tz):
    """Push GCS objects that are missing from FTP or newer than their FTP copy."""
    LOGGER.info("GCS>FTP")
    bucket_files = gcs_list_dir(bucket, bucket_path or None)
    # Entry names are unique within one FTP directory, so a plain dict works.
    ftp_by_name = {ftp_file['file']: ftp_file for ftp_file in sftp_list_dir(sftp, ftp_path, tz)}
    pending = []
    for gcs_file in bucket_files:
        # Nested objects are matched (and written) by basename only.
        name = os.path.basename(gcs_file['file'])
        ftp_file = ftp_by_name.get(name)
        if ftp_file is None:
            LOGGER.info(f"GCS>FTP: {name}")
            pending.append(gcs_file)
        elif gcs_file['modified'] > ftp_file['modified']:
            pending.append(gcs_file)
    with tempfile.TemporaryDirectory() as tmp_dir, sftp.cd(ftp_path):
        for gcs_file in pending:
            name = os.path.basename(gcs_file['file'])
            local_path = os.path.join(tmp_dir, name)
            with open(local_path, "wb") as file_obj:
                gcs_file['_blob'].download_to_file(file_obj)
            sftp.put(local_path, remotepath=name, preserve_mtime=True)


def sync(config, gcs):
    """Run every job listed under ``config['jobs']``.

    Each job names an SFTP endpoint (``ftp``), a GCS location (``gcs``) and a
    ``mode``:

    - ``"gcs<ftp"`` copies FTP -> GCS;
    - ``"gcs>ftp"`` copies GCS -> FTP;
    - ``"gcs<>ftp"`` does both, FTP -> GCS first.

    A file is copied when it is missing on the destination or newer on the
    source. Jobs with any other mode are skipped with a warning.

    Args:
        config: parsed YAML config mapping.
        gcs: ``google.cloud.storage.Client`` used to open the buckets.
    """
    for job in config.get('jobs', []):
        LOGGER.info(f"Running: {job.get('name')}")
        gcs_config = job.get('gcs')
        bucket = gcs.get_bucket(gcs_config['bucket'])
        ftp_config = job.get('ftp')
        mode = job.get('mode')
        if mode not in ("gcs<ftp", "gcs>ftp", "gcs<>ftp"):
            LOGGER.warning(f"Skipping job {job.get('name')!r}: unknown mode {mode!r}")
            continue
        # Normalise the GCS prefix to exactly one trailing '/'. Listings and
        # upload keys then agree, and 'folder' no longer matches 'folder-old/...'.
        gcs_prefix = (gcs_config.get('path') or '').rstrip('/')
        bucket_path = f"{gcs_prefix}/" if gcs_prefix else ''
        ftp_path = ftp_config.get('path') or '/'
        tz = ftp_config.get('timezone')
        # The connection is closed when the block exits, including on errors.
        with _connect_sftp(ftp_config) as sftp:
            if mode in ("gcs<ftp", "gcs<>ftp"):
                _ftp_to_gcs(sftp, bucket, ftp_path, bucket_path, tz)
            if mode in ("gcs>ftp", "gcs<>ftp"):
                _gcs_to_ftp(sftp, bucket, ftp_path, bucket_path, tz)
def main():
    """Command-line entry point: load the YAML job config and run ``sync``.

    The config is read from ``--cloud-config gs://BUCKET/PATH``. If that flag
    is absent, it is read from ``--local-config PATH``. ``--project-id``
    selects the project for the Cloud Storage client.

    Raises:
        SystemExit: (via ``parser.error``) when neither config flag is given,
            or when ``--cloud-config`` is not a ``gs://BUCKET/PATH`` URL.
        NotFound: when the ``--cloud-config`` object does not exist.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--project-id', type=str)
    parser.add_argument('--local-config', type=str)
    parser.add_argument('--cloud-config', type=str)
    args = parser.parse_args()
    if not (args.cloud_config or args.local_config):
        # Without this, ``config`` is never assigned below and sync() fails
        # with an UnboundLocalError.
        parser.error('one of --cloud-config or --local-config is required')
    gcs = storage.Client(project=args.project_id)
    # NOTE(review): ``Loader`` is yaml's (C)Loader, which can construct
    # arbitrary Python objects; the config only needs plain mappings, so a
    # SafeLoader would suffice if the config source is not fully trusted.
    if args.cloud_config:
        config_path = parse("gs://{bucket}/{path}", args.cloud_config)
        if config_path is None:
            parser.error(f'--cloud-config must look like gs://BUCKET/PATH, got {args.cloud_config!r}')
        bucket = gcs.get_bucket(config_path['bucket'])
        blob = bucket.get_blob(config_path['path'])
        if blob is None:
            # get_blob() returns None for a missing object; raise the same
            # exception type get_bucket() uses for a missing bucket.
            raise NotFound(f'Config object not found: {args.cloud_config}')
        config = load(blob.download_as_string(), Loader=Loader)
    else:
        with open(args.local_config, encoding='utf-8') as f:
            config = load(f, Loader=Loader)
    try:
        sync(config, gcs)
    finally:
        # Remove a CWD-relative 'tmp' staging directory, if one was left
        # behind. The path checked and the path deleted must be the same
        # one, otherwise the cleanup fails whenever the script is not run
        # from its own directory.
        shutil.rmtree('tmp', ignore_errors=True)


if __name__ == "__main__":
    main()
jobs:
  - name: Primary Sync
    ftp:
      # Bare hostname: pysftp connects to a host name, not an sftp:// URL.
      host: app.files.com
      port: 22
      username: xxx
      password: xxx
      path: Test folder/test
      # Zone in which the SFTP server's modification times are interpreted.
      timezone: America/New_York
    gcs:
      bucket: bsd-transfer-test
      path: 'test-folder'
    # One of: gcs<ftp (FTP -> GCS), gcs>ftp (GCS -> FTP), gcs<>ftp (both ways).
    mode: 'gcs<>ftp'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment