Skip to content

Instantly share code, notes, and snippets.

@jeffjirsa
Created May 30, 2015 04:35
Show Gist options
  • Save jeffjirsa/553612b26630b335725c to your computer and use it in GitHub Desktop.
Save jeffjirsa/553612b26630b335725c to your computer and use it in GitHub Desktop.
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 15287bd..9d0b086 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -123,7 +123,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables, long now, int base)
{
- Iterable<SSTableReader> candidates = filterOldSSTables(Lists.newArrayList(candidateSSTables), options.maxSSTableAge, now);
+ Iterable<SSTableReader> candidates = filterOldSSTables(Lists.newArrayList(candidateSSTables), options.maxSSTableAge, now, options.useMinTimestamp);
List<List<SSTableReader>> buckets = getBuckets(createSSTableAndMinTimestampPairs(candidates), options.baseTime, base, now);
logger.debug("Compaction buckets are {}", buckets);
@@ -158,23 +158,27 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
- * Removes all sstables with max timestamp older than maxSSTableAge.
+ * Removes all sstables whose max timestamp (or min timestamp, when useMinTimestamp is set) is older than maxSSTableAge.
* @param sstables all sstables to consider
* @param maxSSTableAge the age in milliseconds when an SSTable stops participating in compactions
- * @param now current time. SSTables with max timestamp less than (now - maxSSTableAge) are filtered.
+ * @param now current time. SSTables with min/max timestamp less than (now - maxSSTableAge) are filtered.
+ * @param useMinTimestamp if true, compare each sstable's min timestamp against the cutoff instead of its max timestamp
* @return a list of sstables with the oldest sstables excluded
*/
@VisibleForTesting
- static Iterable<SSTableReader> filterOldSSTables(List<SSTableReader> sstables, long maxSSTableAge, long now)
+ static Iterable<SSTableReader> filterOldSSTables(List<SSTableReader> sstables, long maxSSTableAge, long now, final boolean useMinTimestamp)
{
if (maxSSTableAge == 0)
return sstables;
final long cutoff = now - maxSSTableAge;
+ logger.debug("Filtering old sstables using sstable {} timestamp greater or equal to {}", useMinTimestamp ? "minimum" : "maximum", cutoff);
return Iterables.filter(sstables, new Predicate<SSTableReader>()
{
@Override
public boolean apply(SSTableReader sstable)
{
- return sstable.getMaxTimestamp() >= cutoff;
+ // useMinTimestamp is declared final so this anonymous class can capture it (required before Java 8)
+ long timestamp = useMinTimestamp ? sstable.getMinTimestamp() : sstable.getMaxTimestamp();
+ return timestamp >= cutoff;
}
});
}
/**
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
index f54c020..5e13be8 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
@@ -27,12 +27,16 @@ public final class DateTieredCompactionStrategyOptions
protected static final TimeUnit DEFAULT_TIMESTAMP_RESOLUTION = TimeUnit.MICROSECONDS;
protected static final double DEFAULT_MAX_SSTABLE_AGE_DAYS = 365;
protected static final long DEFAULT_BASE_TIME_SECONDS = 60;
+ protected static final boolean DEFAULT_USE_MIN_TIMESTAMP = false;
protected static final String TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution";
protected static final String MAX_SSTABLE_AGE_KEY = "max_sstable_age_days";
protected static final String BASE_TIME_KEY = "base_time_seconds";
+ protected static final String USE_MIN_TIMESTAMP_KEY = "use_min_timestamp";
protected final long maxSSTableAge;
protected final long baseTime;
+ // When true, sstables are aged out of compaction by their min timestamp rather than their max timestamp.
+ protected final boolean useMinTimestamp;
public DateTieredCompactionStrategyOptions(Map<String, String> options)
{
@@ -43,12 +47,15 @@ public final class DateTieredCompactionStrategyOptions
maxSSTableAge = Math.round(fractionalDays * timestampResolution.convert(1, TimeUnit.DAYS));
optionValue = options.get(BASE_TIME_KEY);
baseTime = timestampResolution.convert(optionValue == null ? DEFAULT_BASE_TIME_SECONDS : Long.parseLong(optionValue), TimeUnit.SECONDS);
+ optionValue = options.get(USE_MIN_TIMESTAMP_KEY);
+ useMinTimestamp = optionValue == null ? DEFAULT_USE_MIN_TIMESTAMP : Boolean.parseBoolean(optionValue);
}
public DateTieredCompactionStrategyOptions()
{
maxSSTableAge = Math.round(DEFAULT_MAX_SSTABLE_AGE_DAYS * DEFAULT_TIMESTAMP_RESOLUTION.convert(1, TimeUnit.DAYS));
baseTime = DEFAULT_TIMESTAMP_RESOLUTION.convert(DEFAULT_BASE_TIME_SECONDS, TimeUnit.SECONDS);
+ useMinTimestamp = DEFAULT_USE_MIN_TIMESTAMP;
}
public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws ConfigurationException
@@ -92,9 +99,16 @@ public final class DateTieredCompactionStrategyOptions
throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, BASE_TIME_KEY), e);
}
+ optionValue = options.get(USE_MIN_TIMESTAMP_KEY);
+ if (optionValue != null && !optionValue.equalsIgnoreCase("true") && !optionValue.equalsIgnoreCase("false"))
+ {
+ throw new ConfigurationException(String.format("%s should either be 'true' or 'false', not %s", USE_MIN_TIMESTAMP_KEY, optionValue));
+ }
+
uncheckedOptions.remove(MAX_SSTABLE_AGE_KEY);
uncheckedOptions.remove(BASE_TIME_KEY);
uncheckedOptions.remove(TIMESTAMP_RESOLUTION_KEY);
+ uncheckedOptions.remove(USE_MIN_TIMESTAMP_KEY);
return uncheckedOptions;
}
diff --git a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
index 14e22f0..3e4d73b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
@@ -261,16 +261,59 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
Iterable<SSTableReader> filtered;
List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
- filtered = filterOldSSTables(sstrs, 0, 2);
+ filtered = filterOldSSTables(sstrs, 0, 2, false);
assertEquals("when maxSSTableAge is zero, no sstables should be filtered", sstrs.size(), Iterables.size(filtered));
- filtered = filterOldSSTables(sstrs, 1, 2);
+ filtered = filterOldSSTables(sstrs, 1, 2, false);
assertEquals("only the newest 2 sstables should remain", 2, Iterables.size(filtered));
- filtered = filterOldSSTables(sstrs, 1, 3);
+ filtered = filterOldSSTables(sstrs, 1, 3, false);
assertEquals("only the newest sstable should remain", 1, Iterables.size(filtered));
- filtered = filterOldSSTables(sstrs, 1, 4);
+ filtered = filterOldSSTables(sstrs, 1, 4, false);
+ assertEquals("no sstables should remain when all are too old", 0, Iterables.size(filtered));
+ }
+
+ @Test
+ public void testFilterOldSSTablesByMinTimestamp()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+ // create 3 sstables; sstable r holds one cell written at timestamp r and one at r + 10,
+ // so its min timestamp (r) differs from its max timestamp (r + 10)
+ int numSSTables = 3;
+ for (int r = 0; r < numSSTables; r++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(r));
+ Mutation rm = new Mutation(KEYSPACE1, key.getKey());
+ rm.add(CF_STANDARD1, Util.cellname("column"), value, r);
+ rm.add(CF_STANDARD1, Util.cellname("column2"), value, r + 10);
+ rm.apply();
+ cfs.forceBlockingFlush();
+ }
+
+ Iterable<SSTableReader> filtered;
+ List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
+
+ filtered = filterOldSSTables(sstrs, 0, 2, true);
+ assertEquals("when maxSSTableAge is zero, no sstables should be filtered", sstrs.size(), Iterables.size(filtered));
+
+ filtered = filterOldSSTables(sstrs, 1, 2, true);
+ assertEquals("only the newest 2 sstables should remain", 2, Iterables.size(filtered));
+
+ filtered = filterOldSSTables(sstrs, 1, 3, true);
+ assertEquals("only the newest sstable should remain", 1, Iterables.size(filtered));
+
+ filtered = filterOldSSTables(sstrs, 1, 4, true);
assertEquals("no sstables should remain when all are too old", 0, Iterables.size(filtered));
+
+ // the same cutoff applied to max timestamps (r + 10) must keep every sstable
+ filtered = filterOldSSTables(sstrs, 1, 4, false);
+ assertEquals("all sstables should remain when filtering on max timestamp", sstrs.size(), Iterables.size(filtered));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment