@amit-jain · Created October 18, 2016 12:41
CloudDataStore implementation using jclouds 1.9.2
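A minimal usage sketch of how one might wire up the CloudDataStore from the classes below; the properties file path, home directory, and secret are placeholders, and the expected keys are the S3Constants/provider keys that CloudUtils.getProperties maps to jclouds settings.

// Usage sketch (assumptions: aws.properties carries the S3Constants keys plus a
// "provider" entry; the file path, home directory, and secret are hypothetical).
import java.io.FileInputStream;
import java.util.Properties;
import org.apache.jackrabbit.oak.blob.cloud.CloudDataStore;

public class CloudDataStoreExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.load(new FileInputStream("aws.properties"));
        CloudDataStore ds = new CloudDataStore();
        ds.setProperties(props);            // translated to jclouds settings by CloudUtils
        ds.setSecret("some-shared-secret"); // used by getOrCreateReferenceKey()
        ds.init("/path/to/datastore-home"); // creates the CloudBackend and the container
    }
}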
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.blob.cloud;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.core.data.DataIdentifier;
import org.apache.jackrabbit.core.data.DataRecord;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobMetadata;
import org.jclouds.blobstore.domain.PageSet;
import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.blobstore.options.CopyOptions;
import org.jclouds.blobstore.options.ListContainerOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.lang.Thread.currentThread;
import static org.apache.jackrabbit.oak.blob.cloud.CloudUtils.configuredLocation;
import static org.apache.jackrabbit.oak.blob.cloud.CloudUtils.getBlobStore;
import static org.jclouds.Constants.PROPERTY_CREDENTIAL;
import static org.jclouds.Constants.PROPERTY_IDENTITY;
import static org.jclouds.blobstore.options.CopyOptions.builder;
import static org.jclouds.blobstore.options.ListContainerOptions.NONE;
import static org.jclouds.blobstore.options.PutOptions.Builder.multipart;
/**
* A data store backend that stores data on a cloud blob store.
*/
public class CloudBackend implements SharedBackend {
/**
* Logger instance.
*/
private static final Logger LOG = LoggerFactory.getLogger(CloudBackend.class);
public static final String DASH = "-";
private static final String KEY_PREFIX = "dataStore_";
private static final String META_KEY = "META";
private static final String META_KEY_PREFIX = "META/";
private String bucket;
private Properties properties;
private Date startTime;
private BlobStore blobStore;
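/**
* Initializes the backend: builds the jclouds {@link BlobStore} from the configured
* properties, creates the container if it does not already exist and, if requested,
* renames keys from the old dataStore_ format to the new format.
*/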
public void init() throws DataStoreException {
LOG.debug("init");
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
startTime = new Date();
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
this.blobStore = getBlobStore(properties);
if (bucket == null || "".equals(bucket.trim())) {
bucket = properties.getProperty("bucket");
}
if (!blobStore.containerExists(bucket)) {
blobStore.createContainerInLocation(configuredLocation(properties), bucket);
LOG.info("Created bucket [{}] in [{}] ", bucket);
} else {
LOG.info("Using bucket [{}] in [{}] ", bucket);
}
String renameKeyProp = properties.getProperty("renameKeys");
boolean renameKeyBool = (renameKeyProp == null || "".equals(renameKeyProp)) ?
false :
Boolean.parseBoolean(renameKeyProp);
LOG.info("Rename keys [{}]", renameKeyBool);
if (renameKeyBool) {
renameKeys();
}
LOG.debug("Cloud backend initialized in [{}] ms",
(System.currentTimeMillis() - startTime.getTime()));
} catch (Exception e) {
LOG.debug(" error ", e);
Map<String, String> filteredMap = Maps.newHashMap();
if (properties != null) {
filteredMap =
Maps.filterKeys(Maps.fromProperties(properties), new Predicate<String>() {
@Override public boolean apply(String input) {
return !input.equals(PROPERTY_IDENTITY) && !input
.equals(PROPERTY_CREDENTIAL);
}
});
}
throw new DataStoreException("Could not initialize cloud blobstore from " + filteredMap,
e);
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
/**
* Uploads the file to the cloud blob store. If the file size is greater than 5MB, it
* is uploaded using parallel multipart connections.
*/
@Override
public void write(DataIdentifier identifier, File file) throws DataStoreException {
long start = System.currentTimeMillis();
String key = getKeyName(identifier);
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
try {
BlobMetadata metadata = blobStore.blobMetadata(bucket, key);
// check if the same record already exists
if (metadata != null) {
long l = metadata.getSize();
if (l != file.length()) {
throw new DataStoreException(
"Collision: " + key + " new length: " + file.length() + " old length: "
+ l);
}
LOG.debug("[{}]'s exists, lastmodified = [{}]", key,
metadata.getLastModified().getTime());
String eTag = blobStore.copyBlob(bucket, key, bucket, key,
builder().contentMetadata(metadata.getContentMetadata()).build());
LOG.debug("Blob [{}] already exists, refreshed with [{}]", key, eTag);
} else {
Blob blob = blobStore.blobBuilder(key).payload(file).build();
String eTag = blobStore.putBlob(bucket, blob, multipart());
LOG.debug("Blob [{}] with eTag [{}] ", key, eTag);
}
} catch (Exception e) {
throw new DataStoreException(e);
}
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
LOG.debug("write of [{}], length=[{}], in [{}]ms",
new Object[] {identifier, file.length(), (System.currentTimeMillis() - start)});
}
/**
* Check if record identified by identifier exists in the cloud blob store.
*/
@Override
public boolean exists(DataIdentifier identifier) throws DataStoreException {
long start = System.currentTimeMillis();
String key = getKeyName(identifier);
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
boolean exists = blobStore.blobExists(bucket, key);
LOG.trace("exists [{}]: [{}] took [{}] ms.", identifier, exists,
(System.currentTimeMillis() - start));
return exists;
} catch (Exception e) {
throw new DataStoreException(
"Error occured in checking existence for key [" + identifier.toString() + "]", e);
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
@Override
public InputStream read(DataIdentifier identifier) throws DataStoreException {
long start = System.currentTimeMillis();
String key = getKeyName(identifier);
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
Blob blob = blobStore.getBlob(bucket, key);
if (blob == null) {
throw new DataStoreException("Object not found: " + key);
}
InputStream in = blob.getPayload().openStream();
LOG.debug("[{}] read took [{}]ms", identifier, (System.currentTimeMillis() - start));
return in;
} catch (Exception e) {
throw new DataStoreException("Error reading blob for [{}]: " + key, e);
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
@Override
public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
return new RecordsIterator<DataIdentifier>(new Function<StorageMetadata, DataIdentifier>() {
@Override public DataIdentifier apply(StorageMetadata input) {
return new DataIdentifier(getIdentifierName(input.getName()));
}
});
}
@Override
public void deleteRecord(DataIdentifier identifier) throws DataStoreException {
long start = System.currentTimeMillis();
String key = getKeyName(identifier);
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
blobStore.removeBlob(bucket, key);
LOG.debug("Identifier [{}] deleted. It took [{}]ms.",
new Object[] {identifier, (System.currentTimeMillis() - start)});
} catch (Exception e) {
throw new DataStoreException("Could not delete dataIdentifier " + identifier, e);
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
@Override
public void close() {
blobStore.getContext().close();
LOG.info("CloudBackend closed.");
}
public String getBucket() {
return bucket;
}
public void setBucket(String bucket) {
this.bucket = bucket;
}
/**
* Properties used to configure the backend. If provided explicitly
* before init() is invoked, they take precedence over other configuration.
*
* @param properties to configure the backend
*/
public void setProperties(Properties properties) {
this.properties = CloudUtils.getProperties(properties);
}
@Override
public void addMetadataRecord(final InputStream input, final String name)
throws DataStoreException {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
File temp = File.createTempFile("metadata", null);
temp.deleteOnExit();
FileUtils.copyInputStreamToFile(input, temp);
IOUtils.closeQuietly(input);
Blob blob = blobStore.blobBuilder(addMetaKeyPrefix(name)).payload(temp).build();
String eTag = blobStore.putBlob(bucket, blob, multipart());
LOG.debug("Uploaded [{}] with eTag [{}]", name, eTag);
} catch (Exception e) {
LOG.error("Error in uploading metadata [{}]", name, e);
throw new DataStoreException("Error in uploading", e);
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
@Override
public void addMetadataRecord(File input, String name) throws DataStoreException {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
Blob blob = blobStore.blobBuilder(addMetaKeyPrefix(name)).payload(input).build();
String eTag = blobStore.putBlob(bucket, blob, multipart());
LOG.debug("Uploaded [{}] with eTag [{}]", name, eTag);
} catch (Exception e) {
LOG.error("Error in uploading metadata [{}]", name, e);
throw new DataStoreException("Error in uploading", e);
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
@Override
public DataRecord getMetadataRecord(String name) {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
BlobMetadata metadata = blobStore.blobMetadata(bucket, addMetaKeyPrefix(name));
return new CloudDataRecord(blobStore, bucket, name,
metadata.getLastModified().getTime(), metadata.getSize(), true);
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
@Override
public List<DataRecord> getAllMetadataRecords(String prefix) {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
Iterator<DataRecord> iter =
new RecordsIterator<DataRecord>(new Function<StorageMetadata, DataRecord>() {
@Override public DataRecord apply(StorageMetadata input) {
return new CloudDataRecord(blobStore, bucket,
stripMetaKeyPrefix(input.getName()), input.getLastModified().getTime(),
input.getSize(), true);
}
}, true, prefix);
return Lists.newArrayList(iter);
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
@Override
public boolean deleteMetadataRecord(String name) {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
blobStore.removeBlob(bucket, addMetaKeyPrefix(name));
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
return true;
}
@Override
public void deleteAllMetadataRecords(String prefix) {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
List<String> deleteList = Lists.newArrayList();
List<DataRecord> list = getAllMetadataRecords(prefix);
for (DataRecord rec : list) {
deleteList.add(META_KEY_PREFIX + rec.getIdentifier().toString());
}
if (deleteList.size() > 0) {
blobStore.removeBlobs(bucket, deleteList);
}
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
@Override
public Iterator<DataRecord> getAllRecords() {
return new RecordsIterator<DataRecord>(new Function<StorageMetadata, DataRecord>() {
@Override public DataRecord apply(StorageMetadata input) {
return new CloudDataRecord(blobStore, bucket, getIdentifierName(input.getName()),
input.getLastModified().getTime(), input.getSize());
}
});
}
@Override
public DataRecord getRecord(DataIdentifier identifier) throws DataStoreException {
long start = System.currentTimeMillis();
String key = getKeyName(identifier);
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
BlobMetadata metadata = blobStore.blobMetadata(bucket, key);
if (metadata == null) {
throw new IOException("Key not found " + identifier.toString());
}
CloudDataRecord record = new CloudDataRecord(blobStore, bucket, identifier.toString(),
metadata.getLastModified().getTime(), metadata.getSize());
LOG.debug("Identifier [{}]'s getRecord = [{}] took [{}]ms.",
new Object[] {identifier, record, (System.currentTimeMillis() - start)});
return record;
} catch (Exception e) {
LOG.info("getRecord:Identifier [{}] not found. Took [{}] ms.", identifier,
(System.currentTimeMillis() - start));
throw new DataStoreException(e);
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
/**
* Returns an iterator over the Cloud blobstore objects
*
* @param <T>
*/
class RecordsIterator<T> extends AbstractIterator<T> {
PageSet<? extends StorageMetadata> prevObjectListing;
Queue<StorageMetadata> queue;
long size;
Function<StorageMetadata, T> transformer;
boolean meta;
String prefix;
ListContainerOptions options;
public RecordsIterator(Function<StorageMetadata, T> transformer) {
queue = Lists.newLinkedList();
this.transformer = transformer;
options = NONE;
}
public RecordsIterator(Function<StorageMetadata, T> transformer, boolean meta,
String prefix) {
queue = Lists.newLinkedList();
this.transformer = transformer;
this.meta = meta;
this.prefix = prefix;
options = ListContainerOptions.Builder.inDirectory(META_KEY_PREFIX).maxResults(5);
}
@Override protected T computeNext() {
if (queue.isEmpty()) {
loadBatch();
}
if (!queue.isEmpty()) {
return transformer.apply(queue.remove());
}
return endOfData();
}
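/**
* Loads the next page of blobs from the container (using the page marker after the
* first call) and queues the entries that match the meta/non-meta filter.
*/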
private boolean loadBatch() {
ClassLoader contextClassLoader = currentThread().getContextClassLoader();
long start = System.currentTimeMillis();
try {
currentThread().setContextClassLoader(getClass().getClassLoader());
// initialize the listing the first time
if (prevObjectListing == null) {
prevObjectListing = blobStore.list(bucket, options);
} else if (prevObjectListing.getNextMarker() != null) {
// already initialized; more objects available
prevObjectListing = blobStore
.list(bucket, options.afterMarker(prevObjectListing.getNextMarker()));
} else { // no more available
return false;
}
List<StorageMetadata> listing = Lists.newArrayList(Iterators
.filter(prevObjectListing.iterator(), new Predicate<StorageMetadata>() {
@Override public boolean apply(StorageMetadata input) {
if (!meta) {
return !input.getName().startsWith(META_KEY_PREFIX) && !input
.getName().startsWith(META_KEY);
} else {
return input.getName().startsWith(META_KEY_PREFIX + prefix);
}
}
}));
// After filtering no elements
if (listing.isEmpty()) {
return false;
}
size += listing.size();
queue.addAll(listing);
LOG.info("Loaded batch of size [{}] in [{}] ms.", listing.size(),
(System.currentTimeMillis() - start));
return true;
} catch (Exception e) {
LOG.warn("Could not list objects", e);
} finally {
if (contextClassLoader != null) {
currentThread().setContextClassLoader(contextClassLoader);
}
}
return false;
}
}
private static String addMetaKeyPrefix(String key) {
return META_KEY_PREFIX + key;
}
private static String stripMetaKeyPrefix(String name) {
if (name.startsWith(META_KEY_PREFIX)) {
return name.substring(META_KEY_PREFIX.length());
}
return name;
}
/**
* CloudDataRecord which lazily retrieves the input stream of the record.
*/
static class CloudDataRecord implements DataRecord {
private BlobStore blobStore;
private DataIdentifier identifier;
private long length;
private long lastModified;
private String bucket;
private boolean isMeta;
public CloudDataRecord(BlobStore blobStore, String bucket, String key, long lastModified,
long length) {
this(blobStore, bucket, key, lastModified, length, false);
}
public CloudDataRecord(BlobStore blobStore, String bucket, String key, long lastModified,
long length, boolean isMeta) {
this.blobStore = blobStore;
this.identifier = new DataIdentifier(key);
this.lastModified = lastModified;
this.length = length;
this.bucket = bucket;
this.isMeta = isMeta;
}
@Override public DataIdentifier getIdentifier() {
return identifier;
}
@Override public String getReference() {
return identifier.toString();
}
@Override public long getLength() throws DataStoreException {
return length;
}
@Override public InputStream getStream() throws DataStoreException {
String id = getKeyName(identifier);
if (isMeta) {
id = addMetaKeyPrefix(identifier.toString());
}
try {
return blobStore.getBlob(bucket, id).getPayload().openStream();
} catch (IOException e) {
throw new DataStoreException(e);
}
}
@Override public long getLastModified() {
return lastModified;
}
@Override public String toString() {
return "CloudDataRecord{" + "identifier=" + identifier + ", length=" + length
+ ", lastModified=" + lastModified + ", bucket='" + bucket + '\'' + '}';
}
}
/**
* Renames object keys in the cloud blob store concurrently. The number of
* concurrent threads is defined by the 'maxConnections' property in
* aws.properties. As the cloud blob store has no "move" operation, this method
* simulates a move by copying the object to the new key and then deleting the old key.
*/
private void renameKeys() throws DataStoreException {
long startTime = System.currentTimeMillis();
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
long count = 0;
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
Iterator<String> iter =
new RecordsIterator<String>(new Function<StorageMetadata, String>() {
@Override public String apply(StorageMetadata input) {
return input.getName();
}
});
List<String> deleteList = new ArrayList<String>();
int nThreads = Integer.parseInt(properties.getProperty("maxConnections"));
ExecutorService executor = Executors
.newFixedThreadPool(nThreads, new NamedThreadFactory("cloud-object-rename-worker"));
boolean taskAdded = false;
while (iter.hasNext()) {
String key = iter.next();
executor.execute(new KeyRenameThread(key));
taskAdded = true;
count++;
// delete the object if it follows old key name format
if (key.startsWith(KEY_PREFIX)) {
deleteList.add(key);
}
}
// This will make the executor accept no new threads
// and finish all existing threads in the queue
executor.shutdown();
try {
// Wait until all rename tasks have finished
while (taskAdded && !executor.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.info("Rename cloud blob store keys tasks timed out. Waiting again");
}
} catch (InterruptedException ie) {
// restore the interrupt status and continue with cleanup
Thread.currentThread().interrupt();
}
LOG.info("Renamed [{}] keys, time taken [{}]sec", count,
((System.currentTimeMillis() - startTime) / 1000));
// Delete older keys.
if (deleteList.size() > 0) {
int batchSize = 500;
List<List<String>> partitions = Lists.partition(deleteList, batchSize);
for (List<String> partition : partitions) {
blobStore.removeBlobs(bucket, partition);
LOG.info("Records[{}] deleted in datastore from index", partition);
}
}
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
/**
* Converts a key from the old format to the new format. For example, it
* converts the old key dataStore_004cb70c8f87d78f04da41e7547cb434094089ea to
* 004c-b70c8f87d78f04da41e7547cb434094089ea.
*/
private static String convertKey(String oldKey) throws IllegalArgumentException {
if (!oldKey.startsWith(KEY_PREFIX)) {
return oldKey;
}
String key = oldKey.substring(KEY_PREFIX.length());
return key.substring(0, 4) + DASH + key.substring(4);
}
/**
* Get the key for a data identifier. Objects are stored in the cloud blob store under this key.
*/
private static String getKeyName(DataIdentifier identifier) {
String key = identifier.toString();
return key.substring(0, 4) + DASH + key.substring(4);
}
/**
* Get data identifier from key.
*/
private static String getIdentifierName(String key) {
if (!key.contains(DASH)) {
return null;
} else if (key.contains(META_KEY_PREFIX)) {
return key;
}
return key.substring(0, 4) + key.substring(5);
}
/**
* Runnable that renames a single object key in the cloud blob store.
*/
private class KeyRenameThread implements Runnable {
private String oldKey;
public void run() {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
String newKey = null;
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
newKey = convertKey(oldKey);
blobStore.copyBlob(bucket, oldKey, bucket, newKey, CopyOptions.NONE);
LOG.debug("[{}] renamed to [{}] ", oldKey, newKey);
} catch (Exception ie) {
LOG.error(" Exception in renaming [{}] to [{}] ",
new Object[] {ie, oldKey, newKey});
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
public KeyRenameThread(String oldKey) {
this.oldKey = oldKey;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.blob.cloud;
import java.util.Properties;
import com.google.common.base.Strings;
import org.apache.jackrabbit.core.data.DataIdentifier;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.oak.blob.cloud.s3.S3Backend;
import org.apache.jackrabbit.oak.plugins.blob.AbstractSharedCachingDataStore;
import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Cloud data store extending from {@link AbstractSharedCachingDataStore}.
*/
public class CloudDataStore extends AbstractSharedCachingDataStore {
/**
* Logger instance.
*/
private static final Logger LOG = LoggerFactory.getLogger(CloudDataStore.class);
protected Properties properties;
/**
* The minimum size of an object that should be stored in this data store.
*/
private int minRecordLength = 16 * 1024;
private String secret;
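/**
* Creates the {@link CloudBackend} and passes on any explicitly configured properties.
*/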
@Override
protected SharedBackend createBackend() {
CloudBackend backend = new CloudBackend();
if(properties != null){
backend.setProperties(properties);
}
return backend;
}
@Override
protected byte[] getOrCreateReferenceKey() throws DataStoreException {
try {
return secret.getBytes("UTF-8");
} catch (Exception e) {
LOG.info("Error in creating reference key", e);
throw new DataStoreException(e);
}
}
/**
* Look in the backend for a record matching the given identifier. Returns true
* if such a record exists.
*
* @param identifier - An identifier for the record.
* @return true if a record for the provided identifier can be found.
*/
public boolean haveRecordForIdentifier(final String identifier) {
try {
if (!Strings.isNullOrEmpty(identifier)) {
return backend.exists(new DataIdentifier(identifier));
}
}
catch (DataStoreException e) {
LOG.warn(String.format("Data Store Exception caught checking for %s in pending uploads",
identifier), e);
}
return false;
}
/**------------------------------------------- Getters & Setters-----------------------------**/
/**
* Properties required to configure the CloudBackend
*/
public void setProperties(Properties properties) {
this.properties = properties;
}
public SharedBackend getBackend() {
return backend;
}
@Override
public int getMinRecordLength() {
return minRecordLength;
}
public void setMinRecordLength(int minRecordLength) {
this.minRecordLength = minRecordLength;
}
public void setSecret(String secret) {
this.secret = secret;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.blob.cloud;
import java.util.Properties;
import org.apache.jackrabbit.oak.blob.cloud.s3.S3Constants;
import org.jclouds.Constants;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.domain.Location;
import org.jclouds.domain.LocationBuilder;
import org.jclouds.domain.LocationScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.jclouds.Constants.PROPERTY_CREDENTIAL;
import static org.jclouds.Constants.PROPERTY_IDENTITY;
import static org.jclouds.Constants.PROPERTY_PROVIDER;
import static org.jclouds.ContextBuilder.newBuilder;
import static org.jclouds.location.reference.LocationConstants.PROPERTY_REGIONS;
/**
* Cloud BlobStore utilities.
*/
public final class CloudUtils {
private static final Logger LOG = LoggerFactory.getLogger(CloudUtils.class);
/**
* Private constructor so that the class cannot be instantiated from outside.
*/
private CloudUtils() {
}
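/**
* Translates the S3-style configuration keys (see {@code S3Constants}) into the
* generic jclouds property names, plus the backend's own 'bucket' and 'renameKeys' keys.
*/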
public static Properties getProperties(Properties props) {
Properties properties = new Properties();
set(properties, Constants.PROPERTY_IDENTITY, props, S3Constants.ACCESS_KEY);
set(properties, Constants.PROPERTY_CREDENTIAL, props, S3Constants.SECRET_KEY);
set(properties, Constants.PROPERTY_PROVIDER, props, "provider");
set(properties, "bucket", props, S3Constants.S3_BUCKET);
set(properties, PROPERTY_REGIONS, props, S3Constants.S3_REGION);
set(properties, "renameKeys", props, S3Constants.S3_RENAME_KEYS);
set(properties, Constants.PROPERTY_MAX_CONNECTIONS_PER_CONTEXT, props, S3Constants.S3_MAX_CONNS);
set(properties, Constants.PROPERTY_MAX_RETRIES, props, S3Constants.S3_MAX_ERR_RETRY);
set(properties, Constants.PROPERTY_SO_TIMEOUT, props, S3Constants.S3_SOCK_TIMEOUT);
set(properties, Constants.PROPERTY_CONNECTION_TIMEOUT, props, S3Constants.S3_CONN_TIMEOUT);
set(properties, Constants.PROPERTY_PROXY_HOST, props, S3Constants.PROXY_HOST);
set(properties, Constants.PROPERTY_PROXY_PORT, props, S3Constants.PROXY_PORT);
set(properties, Constants.PROPERTY_ENDPOINT, props, S3Constants.S3_END_POINT);
return properties;
}
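/**
* Builds a jclouds {@link BlobStore} view for the configured provider using the
* identity/credential pair and the remaining properties as overrides.
*/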
public static BlobStore getBlobStore(Properties properties) {
String accessKey = (String) properties.get(PROPERTY_IDENTITY);
String secretKey = (String) properties.get(PROPERTY_CREDENTIAL);
String cloudProvider = (String) properties.get(PROPERTY_PROVIDER);
return newBuilder(cloudProvider)
.credentials(accessKey, secretKey)
.overrides(properties)
.buildView(BlobStoreContext.class).getBlobStore();
}
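/**
* Builds a jclouds {@link Location} for the configured region, parented by a
* provider-scoped location.
*/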
public static Location configuredLocation(Properties properties) {
String region = properties.getProperty(PROPERTY_REGIONS);
Location parent = new LocationBuilder().scope(LocationScope.PROVIDER)
.id(properties.getProperty(Constants.PROPERTY_PROVIDER))
.description(properties.getProperty(Constants.PROPERTY_PROVIDER)).build();
return new LocationBuilder().scope(LocationScope.REGION).id(region).description(region)
.parent(parent).build();
}
private static void set(Properties newProps, String newProp, Properties props, String prop) {
if (props.containsKey(prop)) {
newProps.setProperty(newProp, props.getProperty(prop));
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.io.File;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.apache.jackrabbit.oak.blob.cloud.s3.S3Constants;
import org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.plugins.document.blob.ds.MongoDataStoreBlobGCTest;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.junit.Assume.assumeTrue;
/**
* Tests DataStoreGC with Mongo and S3
*/
@RunWith(Parameterized.class)
public class MongoS3DataStoreBlobGCTest extends MongoDataStoreBlobGCTest {
@BeforeClass
public static void assumptions() {
assumeTrue(S3DataStoreUtils.isS3Configured());
}
@Parameterized.Parameter
public String s3Class;
@Parameterized.Parameters(name = "{index}: ({0})")
public static List<String> fixtures() {
return S3DataStoreUtils.getFixtures();
}
protected String bucket;
@Before
@Override
public void setUpConnection() throws Exception {
Properties props = S3DataStoreUtils.getS3Config();
startDate = new Date();
mongoConnection = connectionFactory.getConnection();
MongoUtils.dropCollections(mongoConnection.getDB());
File root = folder.newFolder();
bucket = root.getName();
props.setProperty(S3Constants.S3_BUCKET, bucket);
blobStore = new DataStoreBlobStore(
S3DataStoreUtils.getS3DataStore(s3Class, props, root.getAbsolutePath()));
mk = new DocumentMK.Builder().clock(getTestClock()).setMongoDB(mongoConnection.getDB())
.setBlobStore(blobStore).open();
}
@After
@Override
public void tearDownConnection() throws Exception {
S3DataStoreUtils.deleteBucket(s3Class, bucket, startDate);
super.tearDownConnection();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oak-parent</artifactId>
<groupId>org.apache.jackrabbit</groupId>
<version>1.6-SNAPSHOT</version>
<relativePath>../oak-parent/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>oak-blob-cloud</artifactId>
<name>Oak Cloud Blob Store</name>
<packaging>bundle</packaging>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<configuration>
<instructions>
<Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3,org.apache.jackrabbit.oak.blob.cloud.s3.stats</Export-Package>
<DynamicImport-Package>sun.io</DynamicImport-Package>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
<!-- ====================================================================== -->
<!-- D E P E N D E N C I E S -->
<!-- ====================================================================== -->
<dependencies>
<!-- Optional OSGi dependencies, used only when running within OSGi -->
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>biz.aQute.bnd</groupId>
<artifactId>bndlib</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
<scope>provided</scope>
</dependency>
<!-- JCR and Jackrabbit dependencies -->
<dependency>
<groupId>javax.jcr</groupId>
<artifactId>jcr</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>jackrabbit-jcr-commons</artifactId>
<version>${jackrabbit.version}</version>
</dependency>
<dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>jackrabbit-data</artifactId>
<version>${jackrabbit.version}</version>
</dependency>
<!-- Dependencies to other Oak components -->
<dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-blob</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-core</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Amazon AWS dependency -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.10.76</version>
</dependency>
<!-- Optional dependency for jclouds s3 to enable s3 cloud store -->
<dependency>
<groupId>org.apache.jclouds.provider</groupId>
<artifactId>aws-s3</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>jackrabbit-data</artifactId>
<version>${jackrabbit.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-core</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.testing.osgi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.10.19</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.blob.cloud.s3;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.core.data.DataStore;
import org.apache.jackrabbit.oak.blob.cloud.CloudDataStore;
import org.apache.jackrabbit.oak.blob.cloud.CloudUtils;
import org.apache.jackrabbit.oak.blob.cloud.aws.s3.SharedS3DataStore;
import org.apache.jackrabbit.oak.commons.PropertiesUtil;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
import org.jclouds.blobstore.BlobStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.jackrabbit.oak.blob.cloud.CloudUtils.getProperties;
/**
* Extension to {@link DataStoreUtils} to enable S3 extensions for cleaning and initialization.
*/
public class S3DataStoreUtils extends DataStoreUtils {
private static final Logger log = LoggerFactory.getLogger(S3DataStoreUtils.class);
private static final String DEFAULT_CONFIG_PATH = "./src/test/resources/aws.properties";
private static Class JR2_S3 = SharedS3DataStore.class;
private static Class S3 = S3DataStore.class;
private static Class CLOUD = CloudDataStore.class;
public static List<String> getFixtures() {
return ImmutableList.of(
//S3.getName(),
//JR2_S3.getName(),
CLOUD.getName());
}
public static boolean isS3DataStore() {
String dsName = System.getProperty(DS_CLASS_NAME);
boolean s3Class =
(dsName != null) && (dsName.equals(S3.getName()) || dsName.equals(JR2_S3.getName())
|| dsName.equals(CLOUD.getName()));
if (!isS3Configured()) {
return false;
}
return s3Class;
}
/**
* Check for presence of mandatory properties.
*
* @return true if mandatory props configured.
*/
public static boolean isS3Configured() {
Properties props = getS3Config();
if (!props.containsKey(S3Constants.ACCESS_KEY) || !props.containsKey(S3Constants.SECRET_KEY)
|| !(props.containsKey(S3Constants.S3_REGION) || props.containsKey(S3Constants.S3_END_POINT))) {
return false;
}
return true;
}
/**
* Reads the configured properties file and merges in any properties
* available as system properties. System properties take precedence.
*
* @return Properties instance
*/
public static Properties getS3Config() {
String config = System.getProperty("config");
if (Strings.isNullOrEmpty(config)) {
config = DEFAULT_CONFIG_PATH;
}
Properties props = new Properties();
InputStream is = null;
try {
is = new FileInputStream(config);
props.load(is);
} catch (Exception e) {
log.warn("Could not load config from [{}]", config, e);
} finally {
IOUtils.closeQuietly(is);
}
props.putAll(getConfig());
Map filtered = Maps.filterEntries(Maps.fromProperties(props),
new Predicate<Map.Entry<? extends Object, ? extends Object>>() {
@Override
public boolean apply(Map.Entry<? extends Object, ? extends Object> input) {
return !Strings.isNullOrEmpty((String) input.getValue());
}
});
props = new Properties();
props.putAll(filtered);
return props;
}
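/**
* Instantiates the requested DataStore implementation by class name, populates its bean
* properties from the given config and initializes it with the supplied home directory.
*/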
public static DataStore getS3DataStore(String className, Properties props, String homeDir) throws Exception {
DataStore ds = Class.forName(className).asSubclass(DataStore.class).newInstance();
PropertiesUtil.populate(ds, Maps.fromProperties(props), false);
// Set the props object
if (S3.getName().equals(className)) {
((S3DataStore) ds).setProperties(props);
} else if (JR2_S3.getName().equals(className)) {
((org.apache.jackrabbit.oak.blob.cloud.aws.s3.SharedS3DataStore) ds).setProperties(props);
} else if (CLOUD.getName().equals(className)) {
((CloudDataStore) ds).setProperties(props);
}
ds.init(homeDir);
return ds;
}
public static DataStore getS3DataStore(String className, String homeDir) throws Exception {
return getS3DataStore(className, getS3Config(), homeDir);
}
public static void deleteBucket(String className, String bucket, Date date) throws Exception {
if (!Strings.isNullOrEmpty(className) && !Strings.isNullOrEmpty(bucket)) {
if (className.equals(CLOUD.getName())) {
log.info("cleaning bucket [" + bucket + "]");
BlobStore blobStore = CloudUtils.getBlobStore(getProperties(getS3Config()));
if (blobStore.containerExists(bucket)) {
blobStore.deleteContainer(bucket);
log.info("bucket [ " + bucket + "] cleaned");
}
} else {
deleteBucket(bucket, date);
}
}
}
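/**
* Deletes an S3 bucket directly via the AWS SDK: aborts multipart uploads started before
* the given date, removes all objects in batches and finally deletes the bucket itself.
*/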
public static void deleteBucket(String bucket, Date date) throws Exception {
log.info("cleaning bucket [" + bucket + "]");
Properties props = getS3Config();
AmazonS3Client s3service = Utils.openService(props);
TransferManager tmx = new TransferManager(s3service);
if (s3service.doesBucketExist(bucket)) {
for (int i = 0; i < 4; i++) {
tmx.abortMultipartUploads(bucket, date);
ObjectListing prevObjectListing = s3service.listObjects(bucket);
while (prevObjectListing != null ) {
List<DeleteObjectsRequest.KeyVersion>
deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
deleteList.add(new DeleteObjectsRequest.KeyVersion(
s3ObjSumm.getKey()));
}
if (deleteList.size() > 0) {
DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
bucket);
delObjsReq.setKeys(deleteList);
s3service.deleteObjects(delObjsReq);
}
if (!prevObjectListing.isTruncated()) break;
prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
}
}
s3service.deleteBucket(bucket);
log.info("bucket [ " + bucket + "] cleaned");
} else {
log.info("bucket [" + bucket + "] doesn't exists");
}
tmx.shutdownNow();
s3service.shutdown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.io.File;
import java.util.List;
import java.util.Properties;
import org.apache.jackrabbit.oak.blob.cloud.s3.S3Constants;
import org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.junit.Assume.assumeTrue;
/**
* Shared BlobStoreGCTest for S3.
*/
@RunWith(Parameterized.class)
public class S3SharedBlobStoreGCTest extends SharedBlobStoreGCTest {
@Parameterized.Parameter
public String s3Class;
protected String bucket;
@Parameterized.Parameters(name = "{index}: ({0})")
public static List<String> fixtures() {
return S3DataStoreUtils.getFixtures();
}
@BeforeClass
public static void assumptions() {
assumeTrue(S3DataStoreUtils.isS3Configured());
}
@After
public void tearDown() throws Exception {
S3DataStoreUtils.deleteBucket(s3Class, bucket, cluster1.getDate());
super.tearDown();
}
@Override
protected DataStoreBlobStore getBlobStore(File rootFolder) throws Exception {
Properties props = S3DataStoreUtils.getS3Config();
bucket = rootFolder.getName();
props.setProperty(S3Constants.S3_BUCKET, bucket);
return new DataStoreBlobStore(
S3DataStoreUtils.getS3DataStore(s3Class, props, rootFolder.getAbsolutePath()));
}
@Override
protected void sleep() throws InterruptedException {
if (S3DataStoreUtils.isS3Configured()) {
Thread.sleep(1000);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.segment;
import java.io.File;
import java.util.List;
import java.util.Properties;
import com.google.common.collect.Lists;
import org.apache.jackrabbit.oak.blob.cloud.s3.S3Constants;
import org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.runners.Parameterized;
import static org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture.SEGMENT_MK;
import static org.apache.jackrabbit.oak.commons.FixturesHelper.getFixtures;
import static org.junit.Assume.assumeTrue;
/**
* Tests for SegmentNodeStore on S3DataStore GC
*/
public class SegmentS3DataStoreBlobGCIT extends SegmentDataStoreBlobGCIT {
@Parameterized.Parameter(0)
public boolean usePersistedMap;
@Parameterized.Parameter(1)
public String s3Class;
protected String bucket;
@Parameterized.Parameters(name = "{index}: ({0}, {1})")
public static List<Object[]> fixtures1() {
List<String> classes = S3DataStoreUtils.getFixtures();
List<Object[]> params = Lists.newArrayList();
for (String cl : classes) {
params.add(new Object[] {true, cl});
params.add(new Object[] {false, cl});
}
return params;
}
@BeforeClass
public static void assumptions() {
assumeTrue(getFixtures().contains(SEGMENT_MK));
assumeTrue(S3DataStoreUtils.isS3Configured());
}
protected DataStoreBlobStore getBlobStore(File rootFolder) throws Exception {
Properties props = S3DataStoreUtils.getS3Config();
bucket = rootFolder.getName();
props.setProperty(S3Constants.S3_BUCKET, bucket);
return new DataStoreBlobStore(
S3DataStoreUtils.getS3DataStore(s3Class, props, rootFolder.getAbsolutePath()));
}
@After
public void close() throws Exception {
super.close();
S3DataStoreUtils.deleteBucket(s3Class, bucket, startDate);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.apache.jackrabbit.oak.blob.cloud.s3.S3Constants;
import org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStoreUtilsTest;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils.deleteBucket;
import static org.junit.Assume.assumeTrue;
/**
* SharedDataStoreUtilsTest for S3.
*/
@RunWith(Parameterized.class)
public class SharedS3DataStoreUtilsTest extends SharedDataStoreUtilsTest {
private static final Logger log = LoggerFactory.getLogger(SharedS3DataStoreUtilsTest.class);
@Parameterized.Parameter
public String s3Class;
protected String bucket;
@Parameterized.Parameters(name = "{index}: ({0})")
public static List<String> fixtures() {
return S3DataStoreUtils.getFixtures();
}
@BeforeClass
public static void assumptions() {
assumeTrue(S3DataStoreUtils.isS3Configured());
}
@Override
protected DataStoreBlobStore getBlobStore(File rootFolder) throws Exception {
Properties props = S3DataStoreUtils.getS3Config();
bucket = rootFolder.getName();
props.setProperty(S3Constants.S3_BUCKET, bucket);
return new DataStoreBlobStore(
S3DataStoreUtils.getS3DataStore(s3Class, props, rootFolder.getAbsolutePath()));
}
@After
public void close() throws IOException {
try {
deleteBucket(s3Class, bucket, new Date());
} catch (Exception e) {
log.error("Error closing data store", e);
}
}
}