'''
Created on Aug 28, 2013
@author: cgrimm
'''
import cPickle as pickle
import time
import hashlib
import sys
from datetime import datetime, tzinfo, timedelta
from gettext import gettext as _
from email.utils import parsedate
from swift.common.utils import log_traceback, lock_path
from swift.common.exceptions import PathNotDir, DiskFileNotExist, DiskFileCollision
from boto.s3.key import Key
from boto.exception import S3ResponseError, S3DataError
S3OBJECT_SYSTEM_META = set('Content-Length Content-Type deleted ETag'.split())
HASH_FILE = 'hashes.pkl'
PICKLE_PROTOCOL = 2
ONE_WEEK = 604800
PATH_SEP = '/'
class S3Http(object):
"""
Thin wrapper around a boto S3 bucket that exposes filesystem-like helpers
(exists, unlink, listdir, read, write, ...) for the Swift object server.
:param bucket: Boto S3 bucket object that all operations should use.
:param logger: Swift logger used for all messages.
:param timeout: lock timeout, in seconds, used when locking partition folders.
"""
def __init__(self, bucket, logger, timeout=60):
self.logger = logger
self.bucket = bucket
self.timeout = timeout
self.logger.notice(_("ENTERING: s3http.S3Http.__init__(%(bucketName)s, ...)"),
{'bucketName': self.bucket.name})
"""
Parses out the directory path of the path passed in. The result will not contain a trailing PATH_SEP.
:param path: absolute or relative path from which to extract the directory path
"""
def dirname(self, path):
return PATH_SEP.join(path.split(PATH_SEP)[:-1]) if path else None
"""
Parses out the base file name from the path passed in.
:param path: absolute or relative path from which to extract the base file name
"""
def basename(self, path):
return path.split(PATH_SEP)[-1] if path else None
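# Illustrative example (hypothetical key): for
#   '/srv/node/sdb1/objects/123/f00d/abc123-1377692762.00000.data'
# dirname() returns '/srv/node/sdb1/objects/123/f00d' and basename() returns
# 'abc123-1377692762.00000.data'.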
"""
Constructs an S3 path from the variable number of arguments passed in.
:param args: variable number of path components to join together into a single path.
"""
def join(self, *args):
# First remove all trailing folder specifiers to avoid doubles.
# This also ensures that the result never has a trailing PATH_SEP.
new_args = list()
isFirst = True
for oneArg in args:
# Skip over empty args.
if not oneArg or 0 == len(oneArg):
continue
# Process non-empty args.
if not isFirst:
new_args.append(oneArg.rstrip(PATH_SEP).lstrip(PATH_SEP))
else:
new_args.append(oneArg.rstrip(PATH_SEP))
isFirst = False
return PATH_SEP.join(new_args)
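# Illustrative example (hypothetical components): join('/srv/node', 'sdb1/', '/objects')
# returns '/srv/node/sdb1/objects'; the leading PATH_SEP of the first component is kept
# while embedded and trailing separators are normalized away.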
"""
Returns True or False as to whether the specified path matches an object in the S3 bucket.
If an exact match is not requested, it will return True if any keys in the
bucket start with the specified path.
:param path: S3 path to check for existence
:param exactMatch: Boolean indicating whether only an exact match of the path
should count, or whether a prefix match is sufficient.
"""
def exists(self, path, exactMatch=True):
self.logger.notice(_('ENTERING: s3http.exists(%(inPath)s, %(inExactMatch)s)'), {'inPath': path, 'inExactMatch': str(exactMatch)})
s3path = path.lstrip(PATH_SEP)
if exactMatch:
retval = True if self.bucket.get_key(s3path) else False
else:
try:
# See if there is anything there
iter(self.bucket.list(s3path)).next()
retval = True #Yes there is
except StopIteration:
retval = False # No there isn't
return retval
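# Illustrative example (hypothetical keys): with an object stored under 'a/b/c-1.data',
# exists('/a/b/c-1.data') is True, exists('/a/b/c') is False, and
# exists('/a/b/c', exactMatch=False) is True because at least one key starts with
# that prefix.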
"""
Deletes the object from the bucket as indicated by path
:param path: S3 path for object to remove.
"""
def unlink(self, path):
self.logger.notice(_('ENTERING: s3http.unlink(%(inPath)s)'), {'inPath': path})
key = self.bucket.get_key(path.lstrip(PATH_SEP))
# Fail silently when there is nothing to delete.
if not key:
return False
key.delete()
# Make sure the delete removed what was asked for.
if key.exists():
return False # Oops.. Still exists. Might be a non-empty folder.
return True
"""
Deletes the entire folder structure from the bucket as indicated by path
:param path: S3 path to remove.
"""
def rmtree(self, path, ignore_errors=False, onerror=None):
"""Recursively delete a directory tree.
If ignore_errors is set, errors are ignored; otherwise, if onerror
is set, it is called to handle the error with arguments (func,
path, exc_info) where func is os.listdir, os.remove, or os.rmdir;
path is the argument to that function that caused it to fail; and
exc_info is a tuple returned by sys.exc_info(). If ignore_errors
is false and onerror is None, an exception is raised.
"""
self.logger.notice(_('ENTERING: s3http.rmtree(%(inPath)s)'), {'inPath': path})
if ignore_errors:
def onerror(*args):
pass
elif onerror is None:
def onerror(*args):
raise
allNames = []
try:
allNames = self.listdir(path)
fileNames = self.listdir(path, filesOnly=True)
for name in allNames:
fullname = self.join(path, name)
if name in fileNames:
# It is a regular object
if not self.unlink(fullname):
self.logger.info(_('WARNING: In s3http.rmtree(%(inPath)s), failed to s3http.unlink(%(inFullName)s)'),
{'inPath': path, 'inFullName': fullname})
if not ignore_errors:
raise Exception('Unexpected s3http.unlink failure. Method returned False.')
else:
# it is a Folder.
self.rmtree(fullname, ignore_errors, onerror)
# Now remove ourselves.
if not self.rmdir(path):
self.logger.info(_('WARNING: In s3http.rmtree(%(inPath)s), failed to s3http.rmdir(%(inFullName)s)'),
{'inPath': path, 'inFullName': path})
if not ignore_errors:
raise Exception('Unexpected s3http.rmdir failure. Method returned False.')
except S3ResponseError:
onerror(self.rmdir, path, sys.exc_info())
"""
Return the epoch time representation of the S3 object last_modified time stamp
:param path: S3 path for object to retrieve time value
:return integer representing the epoch time value
"""
def getmtime(self, path):
"""
A tzinfo class for datetime objects that returns a 0 timedelta (UTC time)
"""
class _UTC(tzinfo):
def dst(self, dt):
return timedelta(0)
utcoffset = dst
def tzname(self, dt):
return 'UTC'
self.logger.notice(_('ENTERING: s3http.getmtime(%(inPath)s)'), {'inPath': path})
Key = self.bucket.get_key(path.lstrip(PATH_SEP))
# Didn't find anything.
if not Key:
return None
# Take the string date and translate to epoch time.
try:
parts = parsedate(Key.last_modified)[:7]
date = datetime(*(parts + (_UTC(),)))
except Exception:
return None
if date.year < 1970:
raise ValueError('Somehow an invalid year')
return int(date.strftime('%s'))
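# Illustrative example (hypothetical object): for a key whose last_modified value is
# 'Wed, 28 Aug 2013 17:20:45 GMT', getmtime() parses the date and returns it as an
# integer epoch value; it returns None if the key does not exist or the date cannot
# be parsed.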
"""
Create all folders that do not yet exist for the path specified.
NOTE: This is equivalent to mkdir() below.
:param path: S3 path for folder to be created.
:return True if successful, otherwise False
"""
def mkdirs(self, path):
self.logger.notice(_('ENTERING: s3http.mkdirs(%(inPath)s)'), {'inPath': path})
return self.mkdir(path)
"""
Create all folders that do not yet exist for the path specified.
NOTE: This is equivalent to mkdirs() above.
:param path: S3 path for folder to be created.
:return True if successful, otherwise False
"""
def mkdir(self, path):
self.logger.notice(_('ENTERING: s3http.mkdir(%(inPath)s)'), {'inPath': path})
try:
newKey = Key(self.bucket)
newKey.key = path.lstrip(PATH_SEP).rstrip(PATH_SEP) + PATH_SEP
newKey.set_contents_from_string('')
except S3DataError, e: # Throws this error if already exists.
self.logger.info(_('WARNING: In s3http.mkdir(%(inPath)s) received an S3DataError that implies a False return value. Reason: (%(inReason)s)'),
{'inPath': path, 'inReason': e.reason})
return False
return True
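# Note on folder emulation (convention used throughout this wrapper): S3 has no real
# directories, so a "folder" is represented by a zero-byte object whose key is the
# folder path terminated with PATH_SEP. rmdir() below simply deletes that marker key.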
"""
Remove the specified folder from S3 bucket.
:param path: S3 path for folder to be removed.
:return True if successful, otherwise False
"""
def rmdir(self, path):
self.logger.notice(_('ENTERING: s3http.rmdir(%(inPath)s)'), {'inPath': path})
return self.unlink(path.rstrip(PATH_SEP) + PATH_SEP)
"""
Return a list of items in the given folder path. In default mode, it will return just the
item names relative to the folder specification, including the names of any sub-folders
that reside directly in the provided folder. Only content at the
specified folder level is returned.
Can be called with optional parameters to change whether to return full paths in the list
and/or to exclude sub-folder names from the list.
:param path_prefix: the starting folder for objects to locate. The trailing PATH_SEP is added
if not provided.
:return list of items that match the criteria
"""
def listdir(self, path_prefix, fullPath=False, filesOnly=False):
self.logger.notice(_('ENTERING: s3http.listdir(%(inPath)s, %(inFullPath)s, %(inFilesOnly)s)'),
{'inPath': path_prefix, 'inFullPath': fullPath, 'inFilesOnly': filesOnly})
folderpath = path_prefix.rstrip(PATH_SEP) + PATH_SEP # Make sure it has a PATH_SEP on the end
s3_folderpath = folderpath.lstrip(PATH_SEP) # Remove leading slash for sending to S3
uniqueList = dict() # Just be explicit for the benefit of the reader.
front = folderpath if fullPath else ''
for item in self.bucket.list(s3_folderpath):
# Don't consider if the result is for a folder.
if filesOnly and item.key.endswith(PATH_SEP):
continue
pieces = item.key.replace(s3_folderpath, '', 1).split(PATH_SEP)
if pieces and len(pieces) >= 2 and pieces[1]:
continue # Too deep just skip and look for more
if pieces[0]:
uniqueList[front + pieces[0]] = ''
return uniqueList.keys()
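# Illustrative example (hypothetical keys): given folder-marker and object keys
#   'srv/node/sdb1/objects/123/'
#   'srv/node/sdb1/objects/123/f00d/'
#   'srv/node/sdb1/objects/123/f00d/abc123-1377692762.00000.data'
# listdir('/srv/node/sdb1/objects/123') returns ['f00d'] (the immediate sub-folder),
# while listdir('/srv/node/sdb1/objects/123', filesOnly=True) returns [] because the
# only non-folder key sits one level deeper.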
"""
Return a list of items that start with the given path prefix. In default mode, it will
return just the item names from the context of the folder part of the specification
and will not return subfolder names.
Can be called with optional parameters to change whether to return full paths and
whether to include sub-folder items.
:param path_prefix: the starting path for objects to locate. If it does not end with a PATH_SEP,
all objects that start with that path will be included, which may include any subfolders.
If it ends with a PATH_SEP, only items rooted at the specified folder are considered and
parent folder items that happen to match are not included.
:return list of items that match the criteria
"""
def listfiles(self, path_prefix, fullPath=False, filesOnly=True):
self.logger.notice(_('ENTERING: s3http.listfiles(%(inPath)s, %(inFullPath)s, %(inFilesOnly)s)'),
{'inPath': path_prefix, 'inFullPath': fullPath, 'inFilesOnly': filesOnly})
s3_path_prefix = path_prefix.lstrip(PATH_SEP)
uniqueList = dict() # Just be explicit for the benefit of the reader
# determine what part we need to include with the result.
if fullPath:
front = path_prefix
else:
if path_prefix.endswith(PATH_SEP):
front = ''
else:
parts = path_prefix.split(PATH_SEP)
front = parts[len(parts) - 1] if len(parts) >= 1 else ''
# Loop through all items returned based on the path.
for item in self.bucket.list(s3_path_prefix):
# Skip if the result is for a folder and we should not report on them.
if filesOnly and item.key.endswith(PATH_SEP):
continue
# Special case where we matched exactly what we are looking for.
if item.key == s3_path_prefix:
# But avoid adding an empty string as a path. This will happen
# when the full path is not requested and a folder s3_path_prefix
# was provided.
if len(front) > 0:
uniqueList[front] = ''
else:
# Not an exact match, but need to formulate the ending of the item
# found.
pieces = item.key.replace(s3_path_prefix, '', 1).split(PATH_SEP)
if pieces and len(pieces) >= 2 and pieces[1]:
continue # Too deep just skip and look for more
if pieces[0]:
uniqueList[front + pieces[0]] = ''
return uniqueList.keys()
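# Illustrative example (hypothetical keys): with the object key
#   'srv/node/sdb1/objects/123/f00d/abc123-1377692762.00000.data'
# listfiles('/srv/node/sdb1/objects/123/f00d/abc123') returns
# ['abc123-1377692762.00000.data'], i.e. the per-object file names that S3Object uses
# to locate its .data and .ts candidates.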
"""
Read the object contents and return. Can specify the size of the return and/or
where to start in the object.
:param path - S3 path to the object in the bucket.
:param size - Number of bytes to return in the result
:param startPos - starting 0 based offset into the object to start
returning content.
:return string of bytes
"""
def read(self, path, size=None, startPos=None):
self.logger.notice(_('ENTERING: s3http.read(%(inPath)s, %(inSize)s, %(inStartPos)s)'),
{'inPath': path, 'inSize': size, 'inStartPos': startPos})
# Short circuit if a zero (or negative) size was specified.
if size != None and size <= 0:
return None
# Prepare the object request specifying the HTTP Range read as required
# by the input parameters.
objectKey = Key(self.bucket)
objectKey.key = path.lstrip(PATH_SEP)
headers = dict()
if startPos or size:
startPos = startPos if startPos else 0
endPos = startPos + size - 1 if size else ''
headers['Range'] = "bytes=" \
+ str(startPos) + '-' + str(endPos)
retval = None
try:
# Go retrieve the content.
retval = objectKey.get_contents_as_string(headers)
except (S3ResponseError) as e:
if e.error_code == "InvalidRange":
# Trying to read after EOF, just fall through and return None.
pass
else:
raise
return retval
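# Illustrative examples of the Range header built above (hypothetical values):
#   read(path, size=4096, startPos=8192) sends 'Range: bytes=8192-12287'
#   read(path, startPos=8192)            sends 'Range: bytes=8192-'
#   read(path, size=4096)                sends 'Range: bytes=0-4095'
# A read past the end of the object results in an InvalidRange error, which is swallowed
# so that None is returned, mimicking a read at EOF.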
"""
Create an object specified by the path from the content pointed to by the file pointer.
:param path - S3 path of object to create.
:param fp - File Pointer to read content from. Must be open with 'rb' mode.
"""
def write(self, path, fp):
self.logger.notice(_('ENTERING: s3http.write(%(inPath)s, ...)'),
{'inPath': path})
newKey = Key(self.bucket)
newKey.key = path.lstrip(PATH_SEP)
newKey.set_contents_from_file(fp)
"""
Create an object specified by the path from the string content.
:param path - S3 path of object to create.
:param content - String of content to write to the object.
"""
def write_from_string(self, path, content):
self.logger.notice(_('ENTERING: s3http.write_from_string(%(inPath)s, ...)'),
{'inPath': path})
newKey = Key(self.bucket)
newKey.key = path.lstrip(PATH_SEP)
newKey.set_contents_from_string(content)
"""
Write the user metadata to the already existing S3 object. If any metadata
specified is S3 system metadata it will not be written as user metadata.
:param path - S3 path of object to write metadata to.
:param metadata - dictionary of metadata to write
"""
def write_metadata(self, path, metadata):
self.logger.notice(_('ENTERING: s3http.write_metadata(%(inPath)s, ...)'),
{'inPath': path})
objectKey = self.bucket.get_key(path.lstrip(PATH_SEP))
if not objectKey:
raise DiskFileNotExist('S3 Object does not exist when attempting write object metadata: ' + path.lstrip(PATH_SEP))
for k,v in metadata.iteritems():
if k not in S3OBJECT_SYSTEM_META:
objectKey.set_metadata(k.lower(), str(v))
# Special case for HCP: it does not maintain Content-Type,
# so we need to record it as user metadata.
if 'Content-Type' in metadata:
objectKey.set_metadata('x-content-type', metadata['Content-Type'])
objectKey.copy(objectKey.bucket.name, objectKey.name, objectKey.metadata, preserve_acl=True)
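# Note on the copy above: S3 user metadata cannot be modified in place, so the object
# is copied onto itself with the replacement metadata. Each key set via set_metadata()
# is stored by S3 as an 'x-amz-meta-<name>' header on the object.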
"""
Read the metadata from the existing S3 object. Any system metadata will also
be returned in the dictionary as well.
:param path - S3 path to read the metadata from.
"""
def read_metadata(self, path):
self.logger.notice(_('ENTERING: s3http.read_metadata(%(inPath)s)'),
{'inPath': path})
objectKey = self.bucket.get_key(path.lstrip(PATH_SEP))
if not objectKey:
raise DiskFileNotExist('S3 Object does not exist when attempting read object metadata: ' + path.lstrip(PATH_SEP))
metadata = objectKey.metadata
# S3 headers are case-insensitive; need some translations.
if 'x-timestamp' in metadata:
metadata['X-Timestamp'] = metadata['x-timestamp']
del metadata['x-timestamp']
if 'x-delete-at' in metadata:
metadata['X-Delete-At'] = metadata['x-delete-at']
del metadata['x-delete-at']
metadata['ETag'] = objectKey.etag.lstrip('"').rstrip('"')
metadata['Content-Length'] = objectKey.size
# Special case for HCP: it does not maintain Content-Type,
# so it is recorded as user metadata.
if 'x-content-type' in metadata:
metadata['Content-Type'] = metadata['x-content-type']
del metadata['x-content-type']
else:
metadata['Content-Type'] = objectKey.content_type
if objectKey.content_encoding:
metadata['Content-Encoding'] = objectKey.content_encoding
return metadata
def copy(self, inSrc, inDst):
self.logger.notice(_('ENTERING: s3http.copy(%(inSrc)s, %(inDst)s)'),
{'inSrc': inSrc, 'inDst': inDst})
srcPath = inSrc.lstrip(PATH_SEP)
dstPath = inDst.lstrip(PATH_SEP)
srcObjectKey = self.bucket.get_key(srcPath)
if not srcObjectKey:
self.logger.error(_('ERROR: Requested copy of non-existent file "%(inSrc)s" '), {'inSrc': srcPath})
raise DiskFileNotExist('S3 Object does not exist when copy object: ' + srcPath)
if self.exists(inDst):
self.logger.error(_('ERROR: Requested copy to existing file "%(inDst)s" '), {'inDst': dstPath})
raise DiskFileCollision('S3 Object copy destination already exists : ' + inDst)
srcObjectKey.copy(srcObjectKey.bucket.name, dstPath, srcObjectKey.metadata, preserve_acl=True)
"""
Invalidates the hash for a suffix_dir in the partition's hashes file.
:param suffix_dir: absolute path to suffix dir whose hash needs
invalidating
"""
def invalidate_hash(self, suffix_dir):
self.logger.notice(_('ENTERING: s3http.invalidate_hash(%(inSuffix_Dir)s)'),
{'inSuffix_Dir': suffix_dir})
# Just to make the code a little bit easier to read
suffix = self.basename(suffix_dir)
partition_dir = self.dirname(suffix_dir)
hashes_file = self.join(partition_dir, HASH_FILE)
with lock_path(partition_dir, timeout=self.timeout):
try:
pickleInfo = self.read(hashes_file)
hashes = pickle.loads(pickleInfo)
if suffix in hashes and not hashes[suffix]:
return
except Exception as err:
# Just ignore NotFound errors when trying to read. Nothing to invalidate
if isinstance(err, S3ResponseError) and err.error_code == "NoSuchKey":
return
self.logger.error(_('ERROR: Failure trying to invalidate_hash in file "%(hashfile)s" '), {'hashfile': hashes_file})
log_traceback(self.logger)
return
hashes[suffix] = None
self.write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
self.logger.notice(_('RELEASING lock on folder "%(folder)s"'), {'folder': partition_dir})
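# Sketch of the hashes.pkl contents maintained here (hypothetical values): a dict keyed
# by 4-character suffix, e.g. {'f00d': '<md5 of file names>', 'beef': None}, where None
# marks a suffix whose hash must be recalculated by get_hashes() below.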
"""
Performs reclamation and returns an md5 of all (remaining) files.
:param path: S3 path of the suffix folder to hash
:param reclaim_age: age in seconds at which to remove tombstones
:raises PathNotDir: if given path is not a valid folder
:raises S3ResponseError: for other S3 errors
"""
def hash_suffix(self, path, reclaim_age):
self.logger.notice(_('ENTERING: s3http.hash_suffix(%(inPath)s, %(inAge)s)'),
{'inPath': path, 'inAge': reclaim_age})
md5 = hashlib.md5()
if not self.exists(path.rstrip(PATH_SEP) + PATH_SEP):
raise PathNotDir()
# Take the file content in the suffix dir and build a sorted list of
# unique object hashes.
path_obj_list = set()
try:
for x in self.listdir(path, filesOnly=True):
path_obj_list.add(x.split('-')[0])
path_obj_list = sorted(path_obj_list)
except S3ResponseError, err:
if err.error_code == "NoSuchKey":
raise PathNotDir()
raise
for hsh in path_obj_list:
hsh_path = self.join(path, hsh)
try:
files = self.listfiles(hsh_path)
except S3ResponseError, err:
if err.error_code == "NoSuchKey":
# TODO Need some logging.
# logging.exception(
# _('Quarantined %s to %s because it is not a directory') %
# (hsh_path, quar_path))
continue
raise
if len(files) == 1:
if files[0].endswith('.ts'):
# remove tombstones older than reclaim_age
ts = files[0].split('-')[1].rsplit('.', 1)[0]
if (time.time() - float(ts)) > reclaim_age:
self.unlink(self.join(hsh_path, files[0]))
files.remove(files[0])
elif files:
files.sort(reverse=True)
meta = data = tomb = None
for filename in list(files):
if not meta and filename.endswith('.meta'):
meta = filename
if not data and filename.endswith('.data'):
data = filename
if not tomb and filename.endswith('.ts'):
tomb = filename
if (filename < tomb or # any file older than tomb
filename < data or # any file older than data
(filename.endswith('.meta') and
filename < meta)): # old meta
self.unlink(self.join(hsh_path, filename))
files.remove(filename)
for filename in files:
md5.update(filename)
try:
self.rmdir(path)
except S3ResponseError:
pass
return md5.hexdigest()
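# Note on the naming convention relied on above (see also S3Writer.put): object files
# are stored as '<name_hash>-<timestamp><extension>'. The md5 name hash contains no '-',
# so split('-')[0] yields the object hash and split('-')[1] yields '<timestamp><extension>'
# for tombstone reclamation.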
"""
Get a list of hashes for the suffix dir. do_listdir causes it to mistrust
the hash cache for suffix existence at the (unexpectedly high) cost of a
listdir. reclaim_age is just passed on to hash_suffix.
:param partition_dir: absolute path of partition to get hashes for
:param recalculate: list of suffixes which should be recalculated when got
:param do_listdir: force existence check for all hashes in the partition
:param reclaim_age: age at which to remove tombstones
:returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
"""
def get_hashes(self, partition_dir, recalculate=None, do_listdir=False,
reclaim_age=ONE_WEEK):
self.logger.notice(_('ENTERING: s3http.get_hashes(%(inPartDir)s, ...)'),
{'inPartDir': partition_dir})
hashed = 0
hashes_file = self.join(partition_dir, HASH_FILE)
modified = False
force_rewrite = False
hashes = {}
mtime = -1
if recalculate is None:
recalculate = []
try:
hashes = pickle.loads(self.read(hashes_file))
mtime = self.getmtime(hashes_file)
except Exception:
do_listdir = True
force_rewrite = True
if do_listdir:
for suff in self.listdir(partition_dir):
if len(suff) == 4:
hashes.setdefault(suff, None)
modified = True
hashes.update((hash_, None) for hash_ in recalculate)
for suffix, hash_ in hashes.items():
if not hash_:
suffix_dir = self.join(partition_dir, suffix)
try:
hashes[suffix] = self.hash_suffix(suffix_dir, reclaim_age)
hashed += 1
except PathNotDir:
del hashes[suffix]
except Exception:
log_traceback(self.logger)
pass
modified = True
if modified:
with lock_path(partition_dir, timeout=self.timeout):
if force_rewrite or not self.exists(hashes_file) or \
self.getmtime(hashes_file) == mtime:
self.write_pickle(
hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
return hashed, hashes
return self.get_hashes(partition_dir, recalculate, do_listdir,
reclaim_age)
else:
return hashed, hashes
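# Illustrative return value (hypothetical): get_hashes('/srv/node/sdb1/objects/123')
# might return (1, {'f00d': '<md5 of remaining file names>'}), i.e. the number of suffix
# folders that were (re)hashed plus the full suffix-to-hash mapping persisted in
# hashes.pkl.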
"""
Ensure that a pickle file gets written to the S3 bucket. Because some S3
targets are WORM (e.g. HCP), any existing object at the destination is
deleted first and the pickle is then written directly to its final location.
:param obj: python object to be pickled
:param dest: path of final destination file
:param tmp: unused; kept for interface compatibility with swift's write_pickle
:param pickle_protocol: protocol to pickle the obj with, defaults to 0
"""
def write_pickle(self, obj, dest, tmp=None, pickle_protocol=0):
self.logger.notice(_('ENTERING: s3http.write_pickle(%(inObj)s, %(inDest)s, ...)'),
{'inObj': obj, 'inDest': dest})
pkl_value = pickle.dumps(obj, pickle_protocol)
# Some S3 targets are WORM e.g. HCP
if self.exists(dest, exactMatch=True):
self.unlink(dest)
try:
self.write_from_string(dest, pkl_value)
except Exception:
self.logger.error(_('ERROR: Failure trying to write_pickle in file "%(hashfile)s"'), {'hashfile': dest})
log_traceback(self.logger)
raise
# Copyright (c) 2013 Hitachi Data Systems, Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Swift S3 Object Interface for Swift Object Server"""
import os
import time
import hashlib
from gettext import gettext as _
from os.path import exists, join
from tempfile import mkstemp
from contextlib import contextmanager
from swift.common.utils import mkdirs, normalize_timestamp, \
hash_path, fallocate, fsync, \
fdatasync, drop_buffer_cache, ThreadPool
from swift.common.exceptions import DiskFileCollision, DiskFileNoSpace, \
DiskFileNotExist
from swift.common.swob import multi_range_iterator
from swift.common.utils import log_traceback
class S3Writer(object):
"""
Encapsulation of the write context for servicing PUT REST API
requests. Serves as the context manager object returned by S3Object's
writer() method.
"""
def __init__(self, s3object, fd, tmppath, threadpool):
self.s3object = s3object
self.logger = s3object.logger
self.s3_http_wrapper = s3object.s3_http_wrapper
self.fd = fd
self.tmppath = tmppath
self.upload_size = 0
self.last_sync = 0
self.threadpool = threadpool
self.logger.notice("ENTERING: s3object.S3Writer.__init__(...)")
def write(self, chunk):
"""
Write a chunk of data into the temporary file.
:param chunk: the chunk of data to write as a string object
"""
def _write_entire_chunk(chunk):
while chunk:
written = os.write(self.fd, chunk)
self.upload_size += written
chunk = chunk[written:]
self.logger.notice("ENTERING: s3object.S3Writer.write(...)")
self.s3object.hasWritableData = True
#TODO: Can't use the thread pool for some reason...
#self.threadpool.run_in_thread(_write_entire_chunk, chunk)
_write_entire_chunk(chunk)
# For large files sync every 512MB (by default) written
diff = self.upload_size - self.last_sync
if diff >= self.s3object.bytes_per_sync:
#TODO: Can't use the thread pool for some reason
#self.threadpool.force_run_in_thread(fdatasync, self.fd)
fdatasync(self.fd)
drop_buffer_cache(self.fd, self.last_sync, diff)
self.last_sync = self.upload_size
def put(self, metadata, extension='.data'):
"""
Finalize writing the file on disk, and renames it from the temp file
to the real location. This should be called after the data has been
written to the temp file.
:param metadata: dictionary of metadata to be written
:param extension: extension to be used when making the file
"""
self.logger.notice("ENTERING: s3object.S3Writer.put(...)")
def finalize_put():
self.logger.notice("ENTERING: s3object.S3Writer.put.finalize_put(...)")
objectName = self.s3object.objectnamebase + '-' + timestamp + extension
try:
# Just to make the code a little bit easier to read.
s3io = self.s3_http_wrapper
# We call fsync() before calling drop_cache() to lower the amount
# of redundant work the drop cache code will perform on the pages
# (now that after fsync the pages will be all clean).
fsync(self.fd)
# From the Department of the Redundancy Department, make sure
# we call drop_cache() after fsync() to avoid redundant work
# (pages all clean).
drop_buffer_cache(self.fd, 0, self.upload_size)
s3io.invalidate_hash(s3io.dirname(self.s3object.objectnamebase))
fp = None
if self.s3object.hasWritableData:
fp = open(self.tmppath)
s3io.write(objectName, fp)
fp.close()
else:
# Might need to create an empty object to attach metadata to.
if not s3io.exists(objectName):
srcObjectName = None
files = sorted(self.s3_http_wrapper.listfiles(self.s3object.objectnamebase), reverse=True)
for afile in files:
if afile.endswith('.ts') and objectName.endswith('.ts'):
srcObjectName = s3io.join(self.s3object.datadir, afile)
break
if afile.endswith('.data') and objectName.endswith('.data'):
srcObjectName = s3io.join(self.s3object.datadir, afile)
break
if srcObjectName:
s3io.copy(srcObjectName, objectName)
else:
s3io.write_from_string(objectName, '')
s3io.write_metadata(objectName, metadata)
except Exception:
self.logger.error(_('ERROR: Failed to write content to S3 object %(name)s '),
{'name': objectName})
log_traceback(self.logger)
raise
if not self.tmppath:
raise ValueError("tmppath is unusable.")
timestamp = normalize_timestamp(metadata['X-Timestamp'])
metadata['name'] = self.s3object.name
# TODO:
# Can't use force_run_in_thread for some reason; it gets completely wedged.
# Honestly, it is not clear how much it helps anyway, since it waits until the
# work is completely done, I think...
#self.threadpool.force_run_in_thread(finalize_put)
finalize_put()
self.s3object.metadata = metadata
class S3Object(object):
"""
Manage object files on an S3 target.
:param s3_http_wrapper: S3Http wrapper used for all S3 operations
:param path: path to devices on the node
:param device: device name
:param partition: partition on the device the object lives in
:param account: account name for the object
:param container: container name for the object
:param obj: object name for the object
:param keep_data_fp: if True, don't close the fp, otherwise close it
:param disk_chunk_size: size of chunks on file reads
:param bytes_per_sync: number of bytes between fdatasync calls
:param iter_hook: called when __iter__ returns a chunk
:param threadpool: thread pool in which to do blocking operations
:raises DiskFileCollision: on md5 collision
"""
def __init__(self, s3_http_wrapper, path, device, partition, account, container, obj,
logger, keep_data_fp=False, disk_chunk_size=65536,
bytes_per_sync=(512 * 1024 * 1024), iter_hook=None,
threadpool=None, obj_dir='objects', mount_check=False, tmpdir=None):
self.s3_http_wrapper = s3_http_wrapper
self.disk_chunk_size = disk_chunk_size
self.bytes_per_sync = bytes_per_sync
self.iter_hook = iter_hook
self.name = '/' + '/'.join((account, container, obj))
name_hash = hash_path(account, container, obj)
self.datadir = self.s3_http_wrapper.join(
path, device, obj_dir, partition, name_hash[-4:])
self.objectnamebase = self.s3_http_wrapper.join(self.datadir, name_hash)
self.device_path = join(path, device)
self.tmpdir = tmpdir if tmpdir else join(path, device, 'tmp')
self.logger = logger
self.metadata = {}
self.data_file = None
self.iter_etag = None
self.started_at_0 = False
self.tell_pos = 0
self.read_to_eof = False
self.threadpool = threadpool or ThreadPool(nthreads=0)
self.hasWritableData = False
self.logger.notice(_("ENTERING: s3object.S3Object.__init__(... name='%(name)s' )"),
{'name': self.name})
if not self.s3_http_wrapper.exists(self.objectnamebase, exactMatch=False):
return
files = sorted(self.s3_http_wrapper.listfiles(self.objectnamebase), reverse=True)
for afile in files:
if afile.endswith('.ts'):
self.data_file = None
self.metadata = self.s3_http_wrapper.read_metadata(self.s3_http_wrapper.join(self.datadir, afile))
self.metadata['deleted'] = True
break
if afile.endswith('.data') and not self.data_file:
self.data_file = self.s3_http_wrapper.join(self.datadir, afile)
break
if not self.data_file:
return
self.metadata = self.s3_http_wrapper.read_metadata(self.data_file)
if 'name' in self.metadata:
if self.metadata['name'] != self.name:
self.logger.error(_('Client path %(client)s does not match '
'path stored in object metadata %(meta)s'),
{'client': self.name,
'meta': self.metadata['name']})
raise DiskFileCollision('Client path does not match path '
'stored in object metadata')
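# Sketch of the resulting S3 key layout for one object (hypothetical values):
#   <devices>/<device>/objects/<partition>/<name_hash[-4:]>/<name_hash>-<timestamp>.data
# e.g. '/srv/node/sdb1/objects/123/f00d/abc...f00d-1377692762.00000.data', with .ts
# tombstones following the same naming pattern.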
def __iter__(self):
"""Returns an iterator over the data file."""
self.logger.notice(_("ENTERING: s3object.S3Object.__iter__(... name='%(name)s' )"),
{'name': self.name})
try:
read = 0
self.started_at_0 = False
self.read_to_eof = False
if self.tell_pos == 0:
self.started_at_0 = True
self.iter_etag = hashlib.md5()
while True:
#TODO: Can't use thread pool for some reason
#chunk = self.threadpool.run_in_thread(
# self.s3_http_wrapper.read, self.data_file, self.disk_chunk_size, self.tell_pos)
chunk = self.s3_http_wrapper.read(self.data_file, self.disk_chunk_size, self.tell_pos)
if chunk:
if self.iter_etag:
self.iter_etag.update(chunk)
read += len(chunk)
self.tell_pos += len(chunk)
yield chunk
if self.iter_hook:
self.iter_hook()
else:
self.read_to_eof = True
break
finally:
pass
def app_iter_range(self, start, stop):
"""Returns an iterator over the data file for range (start, stop)"""
self.logger.notice(_("ENTERING: s3object.S3Object.app_iter_range(... name='%(name)s' )"),
{'name': self.name})
if start or start == 0:
self.tell_pos = start
if stop is not None:
length = stop - start
else:
length = None
try:
for chunk in self:
if length is not None:
length -= len(chunk)
if length < 0:
# Chop off the extra:
yield chunk[:length]
break
yield chunk
finally:
pass
def app_iter_ranges(self, ranges, content_type, boundary, size):
"""Returns an iterator over the data file for a set of ranges"""
self.logger.notice(_("ENTERING: s3object.S3Object.app_iter_ranges(... name='%(name)s' )"),
{'name': self.name})
if not ranges:
yield ''
else:
try:
for chunk in multi_range_iterator(
ranges, content_type, boundary, size,
self.app_iter_range):
yield chunk
finally:
pass
def is_deleted(self):
"""
Check if the file is deleted.
:returns: True if the file doesn't exist or has been flagged as
deleted.
"""
return not self.data_file or 'deleted' in self.metadata
def is_expired(self):
"""
Check if the file is expired.
:returns: True if the file has an X-Delete-At in the past
"""
return ('X-Delete-At' in self.metadata and
int(self.metadata['X-Delete-At']) <= time.time())
@contextmanager
def writer(self, size=None):
"""
Context manager to write a file. We create a temporary file first, and
then return a S3Writer object to encapsulate the state.
:param size: optional initial size of file to explicitly allocate on
disk
:raises DiskFileNoSpace: if a size is specified and allocation fails
"""
self.logger.notice(_("ENTERING: s3object.S3Object.writer(... name='%(name)s' )"),
{'name': self.name})
# Only mark this object as having writable data if a writer has been
# requested with a non-zero size at least once.
self.hasWritableData = True if size else self.hasWritableData
if not exists(self.tmpdir):
mkdirs(self.tmpdir)
fd, tmppath = mkstemp(dir=self.tmpdir)
try:
if size is not None and size > 0:
try:
fallocate(fd, size)
except OSError:
raise DiskFileNoSpace(_(
'Failed to reserve local scratch space for object in folder %(folder)s '),
{'folder': self.tmpdir})
yield S3Writer(self, fd, tmppath, self.threadpool)
finally:
try:
os.close(fd)
except OSError:
pass
try:
os.unlink(tmppath)
except OSError:
pass
def put_metadata(self, metadata, tombstone=False):
"""
Short hand for putting metadata onto .data and .ts files.
:param metadata: dictionary of metadata to be written
:param tombstone: whether or not we are writing a tombstone
"""
self.logger.notice(_("ENTERING: s3object.S3Object.put_metadata(... name='%(name)s' )"),
{'name': self.name})
extension = '.ts' if tombstone else '.data'
with self.writer() as writer:
writer.put(metadata, extension=extension)
def unlinkold(self, timestamp):
"""
Remove any older versions of the object file. Any file that has an
older timestamp than timestamp will be deleted.
:param timestamp: timestamp to compare with each file
"""
self.logger.notice(_("ENTERING: s3object.S3Object.unlinkold(... name='%(name)s' )"),
{'name': self.name})
timestamp = normalize_timestamp(timestamp)
def _unlinkold():
for fname in self.s3_http_wrapper.listfiles(self.objectnamebase):
if fname.split('-')[1] < timestamp:
self.s3_http_wrapper.unlink(self.s3_http_wrapper.join(self.datadir, fname))
#TODO: Can't use thread pool for some reason.
#self.threadpool.run_in_thread(_unlinkold)
_unlinkold()
# This function is a no-op and just for interface compatibility
def quarantine(self):
pass
# This function is a no-op and just for interface compatibility
def close(self):
pass
def get_data_file_size(self):
"""
Returns the size read from metadata. Raises an exception if
self.data_file does not exist.
:returns: file size as an int
"""
if self.data_file:
return int(self.metadata['Content-Length'])
else:
raise DiskFileNotExist()
# Copyright (c) 2010-2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Object Server for Swift """
from __future__ import with_statement
import cPickle as pickle
import os
import time
import traceback
from collections import defaultdict
from datetime import datetime
from gettext import gettext as _
from hashlib import md5
from urllib import unquote
from eventlet import sleep, Timeout
from swift.common.utils import mkdirs, normalize_timestamp, public, \
hash_path, split_path, get_logger, write_pickle, \
config_true_value, validate_device_partition, timing_stats, \
ThreadPool, replication, log_traceback
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_object_creation, check_mount, \
check_float, check_utf8
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
DiskFileNotExist, DiskFileCollision, DiskFileNoSpace, \
DiskFileDeviceUnavailable
from swift.common.http import is_success
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
HTTPInternalServerError, HTTPNoContent, HTTPNotFound, HTTPNotModified, \
HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \
HTTPClientDisconnect, HTTPMethodNotAllowed, Request, Response, UTC, \
HTTPInsufficientStorage, HTTPForbidden, HTTPException, HeaderKeyDict, \
HTTPConflict
from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFile, \
get_hashes
from swift.obj.s3object import S3Object
from boto.s3.connection import S3Connection
from swift.obj.s3http import S3Http
from boto.exception import S3ResponseError
DATADIR = 'objects'
ASYNCDIR = 'async_pending'
MAX_OBJECT_NAME_LENGTH = 1024
def _parse_path(request, minsegs=5, maxsegs=5):
"""
Utility function to split and validate the request path.
:returns: result of split_path if everything's okay
:raises: HTTPBadRequest if something's not okay
"""
try:
segs = split_path(unquote(request.path), minsegs, maxsegs, True)
validate_device_partition(segs[0], segs[1])
return segs
except ValueError as err:
raise HTTPBadRequest(body=str(err), request=request,
content_type='text/plain')
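# Illustrative example (hypothetical request path): for a request to
# '/sda1/1234/AUTH_test/cont/obj', _parse_path() returns the segments
# ['sda1', '1234', 'AUTH_test', 'cont', 'obj'].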
class ObjectController(object):
"""Implements the WSGI application for the Swift Object Server."""
def __init__(self, conf):
"""
Creates a new WSGI application for the Swift Object Server. An
example configuration is given at
<source-dir>/etc/object-server.conf-sample or
/etc/swift/object-server.conf-sample.
"""
self.logger = get_logger(conf, log_route='object-server')
self.devices = conf.get('devices', '/srv/node/')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.node_timeout = int(conf.get('node_timeout', 3))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536))
self.network_chunk_size = int(conf.get('network_chunk_size', 65536))
self.keep_cache_size = int(conf.get('keep_cache_size', 5242880))
self.keep_cache_private = \
config_true_value(conf.get('keep_cache_private', 'false'))
self.log_requests = config_true_value(conf.get('log_requests', 'true'))
self.max_upload_time = int(conf.get('max_upload_time', 86400))
self.slow = int(conf.get('slow', 0))
self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
replication_server = conf.get('replication_server', None)
if replication_server is not None:
replication_server = config_true_value(replication_server)
self.replication_server = replication_server
self.threads_per_disk = int(conf.get('threads_per_disk', '0'))
self.threadpools = defaultdict(
lambda: ThreadPool(nthreads=self.threads_per_disk))
default_allowed_headers = '''
content-disposition,
content-encoding,
x-delete-at,
x-object-manifest,
x-static-large-object,
'''
extra_allowed_headers = [
header.strip().lower() for header in conf.get(
'allowed_headers', default_allowed_headers).split(',')
if header.strip()
]
self.allowed_headers = set()
for header in extra_allowed_headers:
if header not in DATAFILE_SYSTEM_META:
self.allowed_headers.add(header)
self.expiring_objects_account = \
(conf.get('auto_create_account_prefix') or '.') + \
'expiring_objects'
self.expiring_objects_container_divisor = \
int(conf.get('expiring_objects_container_divisor') or 86400)
self.storage_type = (conf.get('storage_type') or 'DiskFile')
if self.storage_type == 'S3Object':
# TODO: Should validate configuration has the required information and fail gracefully.
self.s3_local_tmpdir = (conf.get('s3_local_tmpdir') or '/tmp')
self.s3_connection = S3Connection(conf.get('s3_access_key'),
conf.get('s3_secret_key'),
host=conf.get('s3_endpoint'))
self.s3_bucket = self.s3_connection.get_bucket(conf.get('s3_bucket'))
self.s3_http_wrapper = S3Http(self.s3_bucket,
self.logger,
timeout=int(conf.get('s3_lock_timeout', '10')))
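# Illustrative object-server.conf additions for the S3 backend (hypothetical values;
# option names match the conf.get() calls above):
#   [app:object-server]
#   storage_type = S3Object
#   s3_endpoint = s3.example.com
#   s3_access_key = <access key>
#   s3_secret_key = <secret key>
#   s3_bucket = swift-objects
#   s3_local_tmpdir = /tmp
#   s3_lock_timeout = 10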
def _diskfile(self, device, partition, account, container, obj, **kwargs):
"""Utility method for instantiating a DiskFile."""
kwargs.setdefault('mount_check', self.mount_check)
kwargs.setdefault('bytes_per_sync', self.bytes_per_sync)
kwargs.setdefault('disk_chunk_size', self.disk_chunk_size)
kwargs.setdefault('threadpool', self.threadpools[device])
kwargs.setdefault('obj_dir', DATADIR)
return DiskFile(self.devices, device, partition, account,
container, obj, self.logger, **kwargs) \
if self.storage_type != 'S3Object' \
else S3Object(self.s3_http_wrapper, self.devices,
device, partition, account,
container, obj, self.logger,
tmpdir=self.s3_local_tmpdir,
**kwargs)
def async_update(self, op, account, container, obj, host, partition,
contdevice, headers_out, objdevice):
"""
Sends or saves an async update.
:param op: operation performed (ex: 'PUT', or 'DELETE')
:param account: account name for the object
:param container: container name for the object
:param obj: object name
:param host: host that the container is on
:param partition: partition that the container is on
:param contdevice: device name that the container is on
:param headers_out: dictionary of headers to send in the container
request
:param objdevice: device name that the object is in
"""
headers_out['user-agent'] = 'obj-server %s' % os.getpid()
full_path = '/%s/%s/%s' % (account, container, obj)
if all([host, partition, contdevice]):
try:
with ConnectionTimeout(self.conn_timeout):
ip, port = host.rsplit(':', 1)
conn = http_connect(ip, port, contdevice, partition, op,
full_path, headers_out)
with Timeout(self.node_timeout):
response = conn.getresponse()
response.read()
if is_success(response.status):
return
else:
self.logger.error(_(
'ERROR Container update failed '
'(saving for async update later): %(status)d '
'response from %(ip)s:%(port)s/%(dev)s'),
{'status': response.status, 'ip': ip, 'port': port,
'dev': contdevice})
except (Exception, Timeout):
self.logger.exception(_(
'ERROR container update failed with '
'%(ip)s:%(port)s/%(dev)s (saving for async update later)'),
{'ip': ip, 'port': port, 'dev': contdevice})
async_dir = os.path.join(self.devices, objdevice, ASYNCDIR)
ohash = hash_path(account, container, obj)
self.logger.increment('async_pendings')
self.threadpools[objdevice].run_in_thread(
write_pickle,
{'op': op, 'account': account, 'container': container,
'obj': obj, 'headers': headers_out},
os.path.join(async_dir, ohash[-3:], ohash + '-' +
normalize_timestamp(headers_out['x-timestamp'])),
os.path.join(self.devices, objdevice, 'tmp'))
def container_update(self, op, account, container, obj, request,
headers_out, objdevice):
"""
Update the container when objects are updated.
:param op: operation performed (ex: 'PUT', or 'DELETE')
:param account: account name for the object
:param container: container name for the object
:param obj: object name
:param request: the original request object driving the update
:param headers_out: dictionary of headers to send in the container
request(s)
:param objdevice: device name that the object is in
"""
headers_in = request.headers
conthosts = [h.strip() for h in
headers_in.get('X-Container-Host', '').split(',')]
contdevices = [d.strip() for d in
headers_in.get('X-Container-Device', '').split(',')]
contpartition = headers_in.get('X-Container-Partition', '')
if len(conthosts) != len(contdevices):
# This shouldn't happen unless there's a bug in the proxy,
# but if there is, we want to know about it.
self.logger.error(_('ERROR Container update failed: different '
'numbers of hosts and devices in request: '
'"%s" vs "%s"' %
(headers_in.get('X-Container-Host', ''),
headers_in.get('X-Container-Device', ''))))
return
if contpartition:
updates = zip(conthosts, contdevices)
else:
updates = []
headers_out['x-trans-id'] = headers_in.get('x-trans-id', '-')
headers_out['referer'] = request.as_referer()
for conthost, contdevice in updates:
self.async_update(op, account, container, obj, conthost,
contpartition, contdevice, headers_out,
objdevice)
def delete_at_update(self, op, delete_at, account, container, obj,
request, objdevice):
"""
Update the expiring objects container when objects are updated.
:param op: operation performed (ex: 'PUT', or 'DELETE')
:param account: account name for the object
:param container: container name for the object
:param obj: object name
:param request: the original request driving the update
:param objdevice: device name that the object is in
"""
# Quick cap that will work from now until Sat Nov 20 17:46:39 2286
# At that time, Swift will be so popular and pervasive I will have
# created income for thousands of future programmers.
delete_at = max(min(delete_at, 9999999999), 0)
updates = [(None, None)]
partition = None
hosts = contdevices = [None]
headers_in = request.headers
headers_out = HeaderKeyDict({
'x-timestamp': headers_in['x-timestamp'],
'x-trans-id': headers_in.get('x-trans-id', '-'),
'referer': request.as_referer()})
if op != 'DELETE':
delete_at_container = headers_in.get('X-Delete-At-Container', None)
if not delete_at_container:
self.logger.warning(
'X-Delete-At-Container header must be specified for '
'expiring objects background %s to work properly. Making '
'best guess as to the container name for now.' % op)
# TODO(gholt): In a future release, change the above warning to
# a raised exception and remove the guess code below.
delete_at_container = str(
delete_at / self.expiring_objects_container_divisor *
self.expiring_objects_container_divisor)
partition = headers_in.get('X-Delete-At-Partition', None)
hosts = headers_in.get('X-Delete-At-Host', '')
contdevices = headers_in.get('X-Delete-At-Device', '')
updates = [upd for upd in
zip((h.strip() for h in hosts.split(',')),
(c.strip() for c in contdevices.split(',')))
if all(upd) and partition]
if not updates:
updates = [(None, None)]
headers_out['x-size'] = '0'
headers_out['x-content-type'] = 'text/plain'
headers_out['x-etag'] = 'd41d8cd98f00b204e9800998ecf8427e'
else:
# DELETEs of old expiration data have no way of knowing what the
# old X-Delete-At-Container was at the time of the initial setting
# of the data, so a best guess is made here.
# Worst case is a DELETE is issued now for something that doesn't
# exist there and the original data is left where it is, where
# it will be ignored when the expirer eventually tries to issue the
# object DELETE later since the X-Delete-At value won't match up.
delete_at_container = str(
delete_at / self.expiring_objects_container_divisor *
self.expiring_objects_container_divisor)
for host, contdevice in updates:
self.async_update(
op, self.expiring_objects_account, delete_at_container,
'%s-%s/%s/%s' % (delete_at, account, container, obj),
host, partition, contdevice, headers_out, objdevice)
@public
@timing_stats()
def POST(self, request):
"""Handle HTTP POST requests for the Swift Object Server."""
#TODO: It still does not appear that this is called, even with a metadata update.
#TODO: This is the only method that calls diskfile.put_metadata for the
#TODO: .meta file, and put_metadata really is not implemented correctly.
device, partition, account, container, obj = _parse_path(request)
if 'x-timestamp' not in request.headers or \
not check_float(request.headers['x-timestamp']):
return HTTPBadRequest(body='Missing timestamp', request=request,
content_type='text/plain')
new_delete_at = int(request.headers.get('X-Delete-At') or 0)
if new_delete_at and new_delete_at < time.time():
return HTTPBadRequest(body='X-Delete-At in past', request=request,
content_type='text/plain')
try:
disk_file = self._diskfile(device, partition, account, container,
obj)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
if disk_file.is_deleted() or disk_file.is_expired():
return HTTPNotFound(request=request)
try:
disk_file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
disk_file.quarantine()
return HTTPNotFound(request=request)
orig_timestamp = disk_file.metadata.get('X-Timestamp', '0')
if orig_timestamp >= request.headers['x-timestamp']:
return HTTPConflict(request=request)
metadata = {'X-Timestamp': request.headers['x-timestamp']}
metadata.update(val for val in request.headers.iteritems()
if val[0].startswith('X-Object-Meta-'))
for header_key in self.allowed_headers:
if header_key in request.headers:
header_caps = header_key.title()
metadata[header_caps] = request.headers[header_key]
old_delete_at = int(disk_file.metadata.get('X-Delete-At') or 0)
if old_delete_at != new_delete_at:
if new_delete_at:
self.delete_at_update('PUT', new_delete_at, account, container,
obj, request, device)
if old_delete_at:
self.delete_at_update('DELETE', old_delete_at, account,
container, obj, request, device)
disk_file.put_metadata(metadata)
return HTTPAccepted(request=request)
@public
@timing_stats()
def PUT(self, request):
"""Handle HTTP PUT requests for the Swift Object Server."""
device, partition, account, container, obj = _parse_path(request)
if 'x-timestamp' not in request.headers or \
not check_float(request.headers['x-timestamp']):
return HTTPBadRequest(body='Missing timestamp', request=request,
content_type='text/plain')
error_response = check_object_creation(request, obj)
if error_response:
return error_response
new_delete_at = int(request.headers.get('X-Delete-At') or 0)
if new_delete_at and new_delete_at < time.time():
return HTTPBadRequest(body='X-Delete-At in past', request=request,
content_type='text/plain')
try:
fsize = request.message_length()
except ValueError as e:
return HTTPBadRequest(body=str(e), request=request,
content_type='text/plain')
try:
disk_file = self._diskfile(device, partition, account, container,
obj)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
old_delete_at = int(disk_file.metadata.get('X-Delete-At') or 0)
orig_timestamp = disk_file.metadata.get('X-Timestamp')
if orig_timestamp and orig_timestamp >= request.headers['x-timestamp']:
return HTTPConflict(request=request)
upload_expiration = time.time() + self.max_upload_time
etag = md5()
elapsed_time = 0
try:
with disk_file.writer(size=fsize) as writer:
reader = request.environ['wsgi.input'].read
for chunk in iter(lambda: reader(self.network_chunk_size), ''):
start_time = time.time()
if start_time > upload_expiration:
self.logger.increment('PUT.timeouts')
return HTTPRequestTimeout(request=request)
etag.update(chunk)
writer.write(chunk)
sleep()
elapsed_time += time.time() - start_time
upload_size = writer.upload_size
if upload_size:
self.logger.transfer_rate(
'PUT.' + device + '.timing', elapsed_time,
upload_size)
if fsize is not None and fsize != upload_size:
return HTTPClientDisconnect(request=request)
etag = etag.hexdigest()
if 'etag' in request.headers and \
request.headers['etag'].lower() != etag:
return HTTPUnprocessableEntity(request=request)
metadata = {
'X-Timestamp': request.headers['x-timestamp'],
'Content-Type': request.headers['content-type'],
'ETag': etag,
'Content-Length': str(upload_size),
}
metadata.update(val for val in request.headers.iteritems()
if val[0].lower().startswith('x-object-meta-')
and len(val[0]) > 14)
for header_key in self.allowed_headers:
if header_key in request.headers:
header_caps = header_key.title()
metadata[header_caps] = request.headers[header_key]
writer.put(metadata)
except DiskFileNoSpace, e:
return HTTPInsufficientStorage(drive=device, request=request)
except S3ResponseError, s3resp:
if s3resp.error_code == "MetadataTooLarge":
return HTTPBadRequest(body='Metadata exceeds allowed size', request=request,
content_type='text/plain')
else:
raise # Crud. Nothing we were expecting.
disk_file.unlinkold(metadata['X-Timestamp'])
if old_delete_at != new_delete_at:
if new_delete_at:
self.delete_at_update(
'PUT', new_delete_at, account, container, obj,
request, device)
if old_delete_at:
self.delete_at_update(
'DELETE', old_delete_at, account, container, obj,
request, device)
if not orig_timestamp or \
orig_timestamp < request.headers['x-timestamp']:
self.container_update(
'PUT', account, container, obj, request,
HeaderKeyDict({
'x-size': disk_file.metadata['Content-Length'],
'x-content-type': disk_file.metadata['Content-Type'],
'x-timestamp': disk_file.metadata['X-Timestamp'],
'x-etag': disk_file.metadata['ETag']}),
device)
resp = HTTPCreated(request=request, etag=etag)
return resp
@public
@timing_stats()
def GET(self, request):
"""Handle HTTP GET requests for the Swift Object Server."""
device, partition, account, container, obj = _parse_path(request)
try:
disk_file = self._diskfile(device, partition, account, container,
obj, keep_data_fp=True, iter_hook=sleep)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
if disk_file.is_deleted() or disk_file.is_expired():
if request.headers.get('if-match') == '*':
return HTTPPreconditionFailed(request=request)
else:
return HTTPNotFound(request=request)
try:
file_size = disk_file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
disk_file.quarantine()
return HTTPNotFound(request=request)
if request.headers.get('if-match') not in (None, '*') and \
disk_file.metadata['ETag'] not in request.if_match:
disk_file.close()
return HTTPPreconditionFailed(request=request)
if request.headers.get('if-none-match') is not None:
if disk_file.metadata['ETag'] in request.if_none_match:
resp = HTTPNotModified(request=request)
resp.etag = disk_file.metadata['ETag']
disk_file.close()
return resp
try:
if_unmodified_since = request.if_unmodified_since
except (OverflowError, ValueError):
# catches timestamps before the epoch
return HTTPPreconditionFailed(request=request)
if if_unmodified_since and \
datetime.fromtimestamp(
float(disk_file.metadata['X-Timestamp']), UTC) > \
if_unmodified_since:
disk_file.close()
return HTTPPreconditionFailed(request=request)
try:
if_modified_since = request.if_modified_since
except (OverflowError, ValueError):
# catches timestamps before the epoch
return HTTPPreconditionFailed(request=request)
if if_modified_since and \
datetime.fromtimestamp(
float(disk_file.metadata['X-Timestamp']), UTC) < \
if_modified_since:
disk_file.close()
return HTTPNotModified(request=request)
response = Response(app_iter=disk_file,
request=request, conditional_response=True)
response.headers['Content-Type'] = disk_file.metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in disk_file.metadata.iteritems():
if key.lower().startswith('x-object-meta-') or \
key.lower() in self.allowed_headers:
response.headers[key] = value
response.etag = disk_file.metadata['ETag']
response.last_modified = float(disk_file.metadata['X-Timestamp'])
response.content_length = file_size
if response.content_length < self.keep_cache_size and \
(self.keep_cache_private or
('X-Auth-Token' not in request.headers and
'X-Storage-Token' not in request.headers)):
disk_file.keep_cache = True
if 'Content-Encoding' in disk_file.metadata:
response.content_encoding = disk_file.metadata['Content-Encoding']
response.headers['X-Timestamp'] = disk_file.metadata['X-Timestamp']
return request.get_response(response)
@public
@timing_stats(sample_rate=0.8)
def HEAD(self, request):
"""Handle HTTP HEAD requests for the Swift Object Server."""
device, partition, account, container, obj = _parse_path(request)
try:
disk_file = self._diskfile(device, partition, account, container,
obj)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
if disk_file.is_deleted() or disk_file.is_expired():
return HTTPNotFound(request=request)
try:
file_size = disk_file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
disk_file.quarantine()
return HTTPNotFound(request=request)
response = Response(request=request, conditional_response=True)
response.headers['Content-Type'] = disk_file.metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in disk_file.metadata.iteritems():
if key.lower().startswith('x-object-meta-') or \
key.lower() in self.allowed_headers:
response.headers[key] = value
response.etag = disk_file.metadata['ETag']
response.last_modified = float(disk_file.metadata['X-Timestamp'])
# Needed for container sync feature
response.headers['X-Timestamp'] = disk_file.metadata['X-Timestamp']
response.content_length = file_size
if 'Content-Encoding' in disk_file.metadata:
response.content_encoding = disk_file.metadata['Content-Encoding']
return response
@public
@timing_stats()
def DELETE(self, request):
"""Handle HTTP DELETE requests for the Swift Object Server."""
device, partition, account, container, obj = _parse_path(request)
if 'x-timestamp' not in request.headers or \
not check_float(request.headers['x-timestamp']):
return HTTPBadRequest(body='Missing timestamp', request=request,
content_type='text/plain')
try:
disk_file = self._diskfile(device, partition, account, container,
obj)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
if 'x-if-delete-at' in request.headers and \
int(request.headers['x-if-delete-at']) != \
int(disk_file.metadata.get('X-Delete-At') or 0):
return HTTPPreconditionFailed(
request=request,
body='X-If-Delete-At and X-Delete-At do not match')
old_delete_at = int(disk_file.metadata.get('X-Delete-At') or 0)
if old_delete_at:
self.delete_at_update('DELETE', old_delete_at, account,
container, obj, request, device)
orig_timestamp = disk_file.metadata.get('X-Timestamp', 0)
req_timestamp = request.headers['X-Timestamp']
if disk_file.is_deleted() or disk_file.is_expired():
response_class = HTTPNotFound
else:
if orig_timestamp < req_timestamp:
response_class = HTTPNoContent
else:
response_class = HTTPConflict
if orig_timestamp < req_timestamp:
disk_file.put_metadata({'X-Timestamp': req_timestamp},
tombstone=True)
disk_file.unlinkold(req_timestamp)
self.container_update(
'DELETE', account, container, obj, request,
HeaderKeyDict({'x-timestamp': req_timestamp}),
device)
resp = response_class(request=request)
return resp
@public
@replication
@timing_stats(sample_rate=0.1)
def REPLICATE(self, request):
"""
Handle REPLICATE requests for the Swift Object Server. This is used
by the object replicator to get hashes for directories.
"""
if self.storage_type != 'S3Object':
device, partition, suffix = _parse_path(request, 2, 3)
if self.mount_check and not check_mount(self.devices, device):
return HTTPInsufficientStorage(drive=device, request=request)
path = os.path.join(self.devices, device, DATADIR, partition)
if not os.path.exists(path):
mkdirs(path)
suffixes = suffix.split('-') if suffix else []
_junk, hashes = self.threadpools[device].force_run_in_thread(
get_hashes, path, recalculate=suffixes)
else:
# Using S3 to store content.
# Use this instead of the _parse_path to avoid validating path
# on the local server.
try:
device, partition, suffix = split_path(unquote(request.path), 2, 3, True)
except ValueError as err:
raise HTTPBadRequest(body=str(err), request=request,
content_type='text/plain')
s3io = self.s3_http_wrapper # just for readability
path = s3io.join(self.devices, device, DATADIR, partition)
if not s3io.exists(path + "/"):
s3io.mkdirs(path)
suffixes = suffix.split('-') if suffix else []
_junk, hashes = self.threadpools[device].force_run_in_thread(
s3io.get_hashes, path, recalculate=suffixes)
return Response(body=pickle.dumps(hashes))
def __call__(self, env, start_response):
"""WSGI Application entry point for the Swift Object Server."""
start_time = time.time()
req = Request(env)
self.logger.txn_id = req.headers.get('x-trans-id', None)
if not check_utf8(req.path_info):
res = HTTPPreconditionFailed(body='Invalid UTF8 or contains NULL')
else:
try:
# disallow methods which have not been marked 'public'
try:
method = getattr(self, req.method)
getattr(method, 'publicly_accessible')
replication_method = getattr(method, 'replication', False)
if (self.replication_server is not None and
self.replication_server != replication_method):
raise AttributeError('Not allowed method.')
except AttributeError:
res = HTTPMethodNotAllowed()
else:
res = method(req)
except DiskFileCollision:
res = HTTPForbidden(request=req)
except HTTPException as error_response:
res = error_response
except (Exception, Timeout):
self.logger.exception(_(
'ERROR __call__ error with %(method)s'
' %(path)s '), {'method': req.method, 'path': req.path})
log_traceback(self.logger)
res = HTTPInternalServerError(body=traceback.format_exc())
trans_time = time.time() - start_time
if self.log_requests:
log_line = '%s - - [%s] "%s %s" %s %s "%s" "%s" "%s" %.4f' % (
req.remote_addr,
time.strftime('%d/%b/%Y:%H:%M:%S +0000',
time.gmtime()),
req.method, req.path, res.status.split()[0],
res.content_length or '-', req.referer or '-',
req.headers.get('x-trans-id', '-'),
req.user_agent or '-',
trans_time)
if req.method == 'REPLICATE':
self.logger.debug(log_line)
else:
self.logger.info(log_line)
if req.method in ('PUT', 'DELETE'):
slow = self.slow - trans_time
if slow > 0:
sleep(slow)
return res(env, start_response)
def app_factory(global_conf, **local_conf):
"""paste.deploy app factory for creating WSGI object server apps"""
conf = global_conf.copy()
conf.update(local_conf)
return ObjectController(conf)