Created
July 6, 2015 16:36
-
-
Save yssharma/2581ae8a97c559b2677f to your computer and use it in GitHub Desktop.
Apache Drill Cassandra storage patch - Rebased on Drill 1.2.0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
commit e1f17a09b6d402e78268753897cbdbd4f8bed169 | |
Author: Yash Sharma <yash360@gmail.com> | |
Date: Mon Jul 6 09:51:22 2015 +0530 | |
DRILL-92 : Cassandra storage plugin - rebased on Drill-1.2.0 | |
diff --git a/contrib/pom.xml b/contrib/pom.xml | |
index 8c00e76..0269efb 100644 | |
--- a/contrib/pom.xml | |
+++ b/contrib/pom.xml | |
@@ -37,5 +37,6 @@ | |
<module>storage-mongo</module> | |
<module>sqlline</module> | |
<module>data</module> | |
+ <module>storage-cassandra</module> | |
</modules> | |
</project> | |
diff --git a/contrib/storage-cassandra/pom.xml b/contrib/storage-cassandra/pom.xml | |
new file mode 100644 | |
index 0000000..08184fc | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/pom.xml | |
@@ -0,0 +1,106 @@ | |
+<?xml version="1.0" encoding="UTF-8"?> | |
+<!-- | |
+Licensed to the Apache Software Foundation (ASF) under one or more | |
+contributor license agreements. See the NOTICE file distributed with | |
+this work for additional information regarding copyright ownership. | |
+The ASF licenses this file to You under the Apache License, Version 2.0 | |
+(the "License"); you may not use this file except in compliance with | |
+the License. You may obtain a copy of the License at | |
+ | |
+http://www.apache.org/licenses/LICENSE-2.0 | |
+ | |
+Unless required by applicable law or agreed to in writing, software | |
+distributed under the License is distributed on an "AS IS" BASIS, | |
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+See the License for the specific language governing permissions and | |
+limitations under the License. | |
+--> | |
+<project xmlns="http://maven.apache.org/POM/4.0.0" | |
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
+ <modelVersion>4.0.0</modelVersion> | |
+ <parent> | |
+ <artifactId>drill-contrib-parent</artifactId> | |
+ <groupId>org.apache.drill.contrib</groupId> | |
+ <version>1.2.0-SNAPSHOT</version> | |
+ </parent> | |
+ | |
+ | |
+ <artifactId>drill-storage-cassandra</artifactId> | |
+ <name>contrib/cassandra-storage-plugin</name> | |
+ | |
+ <properties> | |
+ <cassandra.TestSuite>**/CassandraTestSuite.class</cassandra.TestSuite> | |
+ </properties> | |
+ | |
+ <dependencies> | |
+ <dependency> | |
+ <groupId>com.datastax.cassandra</groupId> | |
+ <artifactId>cassandra-driver-core</artifactId> | |
+ <version>2.0.7</version> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>org.apache.hadoop</groupId> | |
+ <artifactId>hadoop-common</artifactId> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>org.apache.drill.exec</groupId> | |
+ <artifactId>drill-java-exec</artifactId> | |
+ <version>${project.version}</version> | |
+ <exclusions> | |
+ <exclusion> | |
+ <artifactId>hadoop-common</artifactId> | |
+ <groupId>org.apache.hadoop</groupId> | |
+ </exclusion> | |
+ <exclusion> | |
+ <artifactId>hadoop-client</artifactId> | |
+ <groupId>org.apache.hadoop</groupId> | |
+ </exclusion> | |
+ </exclusions> | |
+ </dependency> | |
+ | |
+ | |
+ <!-- Test dependencies --> | |
+ <dependency> | |
+ <groupId>org.apache.drill.exec</groupId> | |
+ <artifactId>drill-java-exec</artifactId> | |
+ <classifier>tests</classifier> | |
+ <version>${project.version}</version> | |
+ <scope>test</scope> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>org.apache.drill</groupId> | |
+ <artifactId>drill-common</artifactId> | |
+ <classifier>tests</classifier> | |
+ <version>${project.version}</version> | |
+ <scope>test</scope> | |
+ </dependency> | |
+ <dependency> | |
+ <groupId>com.yammer.metrics</groupId> | |
+ <artifactId>metrics-core</artifactId> | |
+ <version>2.1.1</version> | |
+ <scope>test</scope> | |
+ </dependency> | |
+ </dependencies> | |
+ | |
+ <build> | |
+ <plugins> | |
+ <plugin> | |
+ <groupId>org.apache.maven.plugins</groupId> | |
+ <artifactId>maven-surefire-plugin</artifactId> | |
+ <configuration> | |
+ <includes> | |
+ <include>${cassandra.TestSuite}</include> | |
+ </includes> | |
+ <systemProperties> | |
+ <property> | |
+ <name>logback.log.dir</name> | |
+ <value>${project.build.directory}/surefire-reports</value> | |
+ </property> | |
+ </systemProperties> | |
+ </configuration> | |
+ </plugin> | |
+ </plugins> | |
+ </build> | |
+ | |
+</project> | |
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraCompareFunctionsProcessor.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraCompareFunctionsProcessor.java | |
new file mode 100644 | |
index 0000000..761ae5d | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraCompareFunctionsProcessor.java | |
@@ -0,0 +1,251 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+ | |
+package org.apache.drill.exec.store.cassandra; | |
+ | |
+import com.google.common.base.Charsets; | |
+import com.google.common.collect.ImmutableMap; | |
+import com.google.common.collect.ImmutableSet; | |
+import io.netty.buffer.ByteBuf; | |
+import io.netty.buffer.Unpooled; | |
+import org.apache.drill.common.expression.CastExpression; | |
+import org.apache.drill.common.expression.ConvertExpression; | |
+import org.apache.drill.common.expression.FunctionCall; | |
+import org.apache.drill.common.expression.LogicalExpression; | |
+import org.apache.drill.common.expression.SchemaPath; | |
+import org.apache.drill.common.expression.ValueExpressions; | |
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor; | |
+ | |
+import java.nio.ByteOrder; | |
+ | |
+ | |
+import io.netty.buffer.ByteBuf; | |
+import io.netty.buffer.Unpooled; | |
+ | |
+import java.nio.ByteOrder; | |
+ | |
+import org.apache.drill.common.expression.CastExpression; | |
+import org.apache.drill.common.expression.ConvertExpression; | |
+import org.apache.drill.common.expression.FunctionCall; | |
+import org.apache.drill.common.expression.LogicalExpression; | |
+import org.apache.drill.common.expression.SchemaPath; | |
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression; | |
+import org.apache.drill.common.expression.ValueExpressions.DateExpression; | |
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression; | |
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression; | |
+import org.apache.drill.common.expression.ValueExpressions.IntExpression; | |
+import org.apache.drill.common.expression.ValueExpressions.LongExpression; | |
+import org.apache.drill.common.expression.ValueExpressions.QuotedString; | |
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression; | |
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor; | |
+ | |
+import com.google.common.base.Charsets; | |
+import com.google.common.collect.ImmutableMap; | |
+import com.google.common.collect.ImmutableSet; | |
+ | |
+public class CassandraCompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> { | |
+ private String value; | |
+ private boolean success; | |
+ private boolean isEqualityFn; | |
+ private SchemaPath path; | |
+ private String functionName; | |
+ | |
+ public static boolean isCompareFunction(String functionName) { | |
+ return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName); | |
+ } | |
+ | |
+ public static CassandraCompareFunctionsProcessor process(FunctionCall call) { | |
+ String functionName = call.getName(); | |
+ LogicalExpression nameArg = call.args.get(0); | |
+ LogicalExpression valueArg = call.args.size() >= 2 ? call.args.get(1) : null; | |
+ CassandraCompareFunctionsProcessor evaluator = new CassandraCompareFunctionsProcessor(functionName); | |
+ | |
+ if (valueArg != null) { // binary function | |
+ if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) { | |
+ LogicalExpression swapArg = valueArg; | |
+ valueArg = nameArg; | |
+ nameArg = swapArg; | |
+ evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName); | |
+ } | |
+ evaluator.success = nameArg.accept(evaluator, valueArg); | |
+ } else if (call.args.get(0) instanceof SchemaPath) { | |
+ evaluator.success = true; | |
+ evaluator.path = (SchemaPath) nameArg; | |
+ } | |
+ | |
+ return evaluator; | |
+ } | |
+ | |
+ public CassandraCompareFunctionsProcessor(String functionName) { | |
+ this.success = false; | |
+ this.functionName = functionName; | |
+ this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName) | |
+ && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(functionName); | |
+ } | |
+ | |
+ public String getValue() { | |
+ return value; | |
+ } | |
+ | |
+ public boolean isSuccess() { | |
+ return success; | |
+ } | |
+ | |
+ public SchemaPath getPath() { | |
+ return path; | |
+ } | |
+ | |
+ public String getFunctionName() { | |
+ return functionName; | |
+ } | |
+ | |
+ @Override | |
+ public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException { | |
+ if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) { | |
+ return e.getInput().accept(this, valueArg); | |
+ } | |
+ return false; | |
+ } | |
+ | |
+ @Override | |
+ public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression valueArg) throws RuntimeException { | |
+ if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM && e.getInput() instanceof SchemaPath) { | |
+ ByteBuf bb = null; | |
+ String encodingType = e.getEncodingType(); | |
+ switch (encodingType) { | |
+ case "INT_BE": | |
+ case "INT": | |
+ case "UINT_BE": | |
+ case "UINT": | |
+ case "UINT4_BE": | |
+ case "UINT4": | |
+ if (valueArg instanceof ValueExpressions.IntExpression | |
+ && (isEqualityFn || encodingType.startsWith("U"))) { | |
+ bb = Unpooled.wrappedBuffer(new byte[4]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN); | |
+ bb.writeInt(((ValueExpressions.IntExpression)valueArg).getInt()); | |
+ } | |
+ break; | |
+ case "BIGINT_BE": | |
+ case "BIGINT": | |
+ case "UINT8_BE": | |
+ case "UINT8": | |
+ if (valueArg instanceof ValueExpressions.LongExpression | |
+ && (isEqualityFn || encodingType.startsWith("U"))) { | |
+ bb = Unpooled.wrappedBuffer(new byte[8]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN); | |
+ bb.writeLong(((ValueExpressions.LongExpression)valueArg).getLong()); | |
+ } | |
+ break; | |
+ case "FLOAT": | |
+ if (valueArg instanceof ValueExpressions.FloatExpression && isEqualityFn) { | |
+ bb = Unpooled.wrappedBuffer(new byte[4]).order(ByteOrder.BIG_ENDIAN); | |
+ bb.writeFloat(((ValueExpressions.FloatExpression)valueArg).getFloat()); | |
+ } | |
+ break; | |
+ case "DOUBLE": | |
+ if (valueArg instanceof ValueExpressions.DoubleExpression && isEqualityFn) { | |
+ bb = Unpooled.wrappedBuffer(new byte[8]).order(ByteOrder.BIG_ENDIAN);; | |
+ bb.writeDouble(((ValueExpressions.DoubleExpression)valueArg).getDouble()); | |
+ } | |
+ break; | |
+ case "TIME_EPOCH": | |
+ case "TIME_EPOCH_BE": | |
+ if (valueArg instanceof ValueExpressions.TimeExpression) { | |
+ bb = Unpooled.wrappedBuffer(new byte[8]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN); | |
+ bb.writeLong(((ValueExpressions.TimeExpression)valueArg).getTime()); | |
+ } | |
+ break; | |
+ case "DATE_EPOCH": | |
+ case "DATE_EPOCH_BE": | |
+ if (valueArg instanceof ValueExpressions.DateExpression) { | |
+ bb = Unpooled.wrappedBuffer(new byte[8]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN); | |
+ bb.writeLong(((ValueExpressions.DateExpression)valueArg).getDate()); | |
+ } | |
+ break; | |
+ case "BOOLEAN_BYTE": | |
+ if (valueArg instanceof ValueExpressions.BooleanExpression) { | |
+ bb = Unpooled.wrappedBuffer(new byte[1]); | |
+ bb.writeByte(((ValueExpressions.BooleanExpression)valueArg).getBoolean() ? 1 : 0); | |
+ } | |
+ break; | |
+ case "UTF8": | |
+ // let visitSchemaPath() handle this. | |
+ return e.getInput().accept(this, valueArg); | |
+ } | |
+ | |
+ if (bb != null) { | |
+ this.value = bb.toString(); | |
+ this.path = (SchemaPath)e.getInput(); | |
+ return true; | |
+ } | |
+ } | |
+ return false; | |
+ } | |
+ | |
+ @Override | |
+ public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException { | |
+ return false; | |
+ } | |
+ | |
+ @Override | |
+ public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException { | |
+ if (valueArg instanceof ValueExpressions.QuotedString) { | |
+ this.value = new String(((ValueExpressions.QuotedString) valueArg).value.getBytes(Charsets.UTF_8)); | |
+ this.path = path; | |
+ return true; | |
+ } | |
+ return false; | |
+ } | |
+ | |
+ private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES; | |
+ static { | |
+ ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder(); | |
+ VALUE_EXPRESSION_CLASSES = builder | |
+ .add(ValueExpressions.BooleanExpression.class) | |
+ .add(ValueExpressions.DateExpression.class) | |
+ .add(ValueExpressions.DoubleExpression.class) | |
+ .add(ValueExpressions.FloatExpression.class) | |
+ .add(ValueExpressions.IntExpression.class) | |
+ .add(ValueExpressions.LongExpression.class) | |
+ .add(ValueExpressions.QuotedString.class) | |
+ .add(ValueExpressions.TimeExpression.class) | |
+ .build(); | |
+ } | |
+ | |
+ private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP; | |
+ static { | |
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder(); | |
+ COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder | |
+ // unary functions | |
+ .put("isnotnull", "isnotnull") | |
+ .put("isNotNull", "isNotNull") | |
+ .put("is not null", "is not null") | |
+ .put("isnull", "isnull") | |
+ .put("isNull", "isNull") | |
+ .put("is null", "is null") | |
+ // binary functions | |
+ .put("like", "like") | |
+ .put("equal", "equal") | |
+ .put("not_equal", "not_equal") | |
+ .put("greater_than_or_equal_to", "less_than_or_equal_to") | |
+ .put("greater_than", "less_than") | |
+ .put("less_than_or_equal_to", "greater_than_or_equal_to") | |
+ .put("less_than", "greater_than") | |
+ .build(); | |
+ } | |
+ | |
+} | |
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraDatabaseSchema.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraDatabaseSchema.java | |
new file mode 100644 | |
index 0000000..4287ae4 | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraDatabaseSchema.java | |
@@ -0,0 +1,57 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.exec.store.cassandra; | |
+ | |
+import java.util.List; | |
+import java.util.Set; | |
+ | |
+import org.apache.calcite.schema.Table; | |
+ | |
+import org.apache.drill.exec.store.AbstractSchema; | |
+import org.apache.drill.exec.store.cassandra.CassandraSchemaFactory.CassandraSchema; | |
+ | |
+import com.google.common.collect.Sets; | |
+ | |
+public class CassandraDatabaseSchema extends AbstractSchema { | |
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CassandraDatabaseSchema.class); | |
+ private final CassandraSchema cassandraSchema; | |
+ private final Set<String> tables; | |
+ | |
+ public CassandraDatabaseSchema(List<String> tableList, CassandraSchema cassandraSchema, | |
+ String name) { | |
+ super(cassandraSchema.getSchemaPath(), name); | |
+ this.cassandraSchema = cassandraSchema; | |
+ this.tables = Sets.newHashSet(tableList); | |
+ } | |
+ | |
+ @Override | |
+ public Table getTable(String tableName) { | |
+ return cassandraSchema.getDrillTable(this.name, tableName); | |
+ } | |
+ | |
+ @Override | |
+ public Set<String> getTableNames() { | |
+ return tables; | |
+ } | |
+ | |
+ @Override | |
+ public String getTypeName() { | |
+ return CassandraStoragePluginConfig.NAME; | |
+ } | |
+ | |
+} | |
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraFilterBuilder.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraFilterBuilder.java | |
new file mode 100644 | |
index 0000000..ffb6c6b | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraFilterBuilder.java | |
@@ -0,0 +1,201 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.exec.store.cassandra; | |
+ | |
+import java.util.Arrays; | |
+import java.util.List; | |
+ | |
+import com.datastax.driver.core.querybuilder.Clause; | |
+import com.datastax.driver.core.querybuilder.QueryBuilder; | |
+import com.google.common.base.Charsets; | |
+import com.google.common.collect.ImmutableList; | |
+import com.google.common.collect.Lists; | |
+import org.apache.drill.common.expression.BooleanOperator; | |
+import org.apache.drill.common.expression.FunctionCall; | |
+import org.apache.drill.common.expression.LogicalExpression; | |
+import org.apache.drill.common.expression.SchemaPath; | |
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor; | |
+import org.slf4j.Logger; | |
+import org.slf4j.LoggerFactory; | |
+ | |
+ | |
+public class CassandraFilterBuilder extends | |
+ AbstractExprVisitor<CassandraScanSpec, Void, RuntimeException> implements | |
+ DrillCassandraConstants { | |
+ static final Logger logger = LoggerFactory.getLogger(CassandraFilterBuilder.class); | |
+ final CassandraGroupScan groupScan; | |
+ final LogicalExpression le; | |
+ private boolean allExpressionsConverted = true; | |
+ | |
+ public CassandraFilterBuilder(CassandraGroupScan groupScan, | |
+ LogicalExpression conditionExp) { | |
+ this.groupScan = groupScan; | |
+ this.le = conditionExp; | |
+ | |
+ } | |
+ | |
+ /* Called by CassandraPushDownFilterForScan */ | |
+ public CassandraScanSpec parseTree() { | |
+ CassandraScanSpec parsedSpec = le.accept(this, null); | |
+ | |
+ if (parsedSpec != null) { | |
+ parsedSpec = mergeScanSpecs("booleanAnd", this.groupScan.getCassandraScanSpec(), parsedSpec); | |
+ } | |
+ return parsedSpec; | |
+ } | |
+ | |
+ private CassandraScanSpec mergeScanSpecs(String functionName, | |
+ CassandraScanSpec leftScanSpec, CassandraScanSpec rightScanSpec) { | |
+ | |
+ List<Clause> newFilters = Lists.newArrayList(); | |
+ CassandraScanSpec scanSpec = null; | |
+ switch (functionName) { | |
+ case "booleanAnd": | |
+ if (leftScanSpec != null && leftScanSpec.getFilters() != null) { | |
+ newFilters.addAll(leftScanSpec.getFilters()); | |
+ } | |
+ if (rightScanSpec != null && rightScanSpec.getFilters() != null) { | |
+ newFilters.addAll(rightScanSpec.getFilters()); | |
+ } | |
+ | |
+ scanSpec = new CassandraScanSpec(groupScan.getCassandraScanSpec().getKeyspace(), | |
+ groupScan.getCassandraScanSpec().getTable(), newFilters); | |
+ break; | |
+ } | |
+ | |
+ return scanSpec; | |
+ } | |
+ | |
+ public boolean isAllExpressionsConverted() { | |
+ return allExpressionsConverted; | |
+ } | |
+ | |
+ @Override | |
+ public CassandraScanSpec visitUnknown(LogicalExpression e, Void value) | |
+ throws RuntimeException { | |
+ allExpressionsConverted = false; | |
+ return null; | |
+ } | |
+ | |
+ @Override | |
+ public CassandraScanSpec visitBooleanOperator(BooleanOperator op, Void value) { | |
+ | |
+ List<LogicalExpression> args = op.args; | |
+ CassandraScanSpec nodeScanSpec = null; | |
+ String functionName = op.getName(); | |
+ for (int i = 0; i < args.size(); ++i) { | |
+ switch (functionName) { | |
+ case "booleanAnd": | |
+ if (nodeScanSpec == null) { | |
+ nodeScanSpec = args.get(i).accept(this, null); | |
+ } else { | |
+ CassandraScanSpec scanSpec = args.get(i).accept(this, null); | |
+ if (scanSpec != null) { | |
+ nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec); | |
+ } else { | |
+ allExpressionsConverted = false; | |
+ } | |
+ } | |
+ break; | |
+ } | |
+ } | |
+ return nodeScanSpec; | |
+ } | |
+ | |
+ @Override | |
+ public CassandraScanSpec visitFunctionCall(FunctionCall call, Void value) | |
+ throws RuntimeException { | |
+ | |
+ CassandraScanSpec nodeScanSpec = null; | |
+ String functionName = call.getName(); | |
+ ImmutableList<LogicalExpression> args = call.args; | |
+ | |
+ if (CassandraCompareFunctionsProcessor.isCompareFunction(functionName)) { | |
+ CassandraCompareFunctionsProcessor processor = CassandraCompareFunctionsProcessor.process(call); | |
+ if (processor.isSuccess()) { | |
+ nodeScanSpec = createCassandraScanSpec(call, processor); | |
+ } | |
+ } else { | |
+ switch (functionName) { | |
+ case "booleanAnd": | |
+ CassandraScanSpec firstScanSpec = args.get(0).accept(this, null); | |
+ for (int i = 1; i < args.size(); ++i) { | |
+ CassandraScanSpec nextScanSpec = args.get(i).accept(this, null); | |
+ if (firstScanSpec != null && nextScanSpec != null) { | |
+ nodeScanSpec = mergeScanSpecs(functionName, firstScanSpec, nextScanSpec); | |
+ } else { | |
+ allExpressionsConverted = false; | |
+ if ("booleanAnd".equals(functionName)) { | |
+ nodeScanSpec = firstScanSpec == null ? nextScanSpec : firstScanSpec; | |
+ } | |
+ } | |
+ firstScanSpec = nodeScanSpec; | |
+ } | |
+ break; | |
+ } | |
+ } | |
+ | |
+ if (nodeScanSpec == null) { | |
+ allExpressionsConverted = false; | |
+ } | |
+ | |
+ return nodeScanSpec; | |
+ } | |
+ | |
+ | |
+ | |
+ private CassandraScanSpec createCassandraScanSpec(FunctionCall call, CassandraCompareFunctionsProcessor processor) { | |
+ String functionName = processor.getFunctionName(); | |
+ SchemaPath field = processor.getPath(); | |
+ String fieldValue = processor.getValue(); | |
+ | |
+ Clause clause = null; | |
+ boolean isNullTest = false; | |
+ switch (functionName) { | |
+ case "equal": | |
+ clause = QueryBuilder.eq(field.getAsNamePart().getName(), fieldValue); | |
+ break; | |
+ case "greater_than_or_equal_to": | |
+ clause = QueryBuilder.gte(field.getAsNamePart().getName(), fieldValue); | |
+ break; | |
+ case "greater_than": | |
+ clause = QueryBuilder.gt(field.getAsNamePart().getName(), fieldValue); | |
+ break; | |
+ case "less_than_or_equal_to": | |
+ clause = QueryBuilder.lte(field.getAsNamePart().getName(), fieldValue); | |
+ break; | |
+ case "less_than": | |
+ clause = QueryBuilder.lt(field.getAsNamePart().getName(), fieldValue); | |
+ break; | |
+ case "isnull": | |
+ case "isNull": | |
+ case "is null": | |
+ clause = QueryBuilder.eq(field.getAsNamePart().getName(), null); | |
+ break; | |
+ } | |
+ | |
+ if (clause != null) { | |
+ List<Clause> filters = Lists.newArrayList(); | |
+ filters.add(clause); | |
+ return new CassandraScanSpec(groupScan.getCassandraScanSpec().getKeyspace(), | |
+ groupScan.getCassandraScanSpec().getTable(), filters); | |
+ } | |
+ // else | |
+ return null; | |
+ } | |
+} | |
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraGroupScan.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraGroupScan.java | |
new file mode 100644 | |
index 0000000..10b3362 | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraGroupScan.java | |
@@ -0,0 +1,478 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.exec.store.cassandra; | |
+ | |
+import java.io.IOException; | |
+import java.nio.CharBuffer; | |
+import java.nio.charset.Charset; | |
+import java.nio.charset.CharsetEncoder; | |
+import java.util.ArrayList; | |
+import java.util.Collections; | |
+import java.util.Comparator; | |
+import java.util.HashMap; | |
+import java.util.Iterator; | |
+import java.util.List; | |
+import java.util.Map; | |
+import java.util.PriorityQueue; | |
+import java.util.Queue; | |
+import java.util.Set; | |
+import java.util.concurrent.TimeUnit; | |
+ | |
+import com.datastax.driver.core.Cluster; | |
+import com.datastax.driver.core.Host; | |
+import com.datastax.driver.core.Metadata; | |
+import com.datastax.driver.core.ResultSet; | |
+import com.datastax.driver.core.Session; | |
+import com.datastax.driver.core.Statement; | |
+import com.datastax.driver.core.querybuilder.QueryBuilder; | |
+import org.apache.drill.common.exceptions.DrillRuntimeException; | |
+import org.apache.drill.common.exceptions.ExecutionSetupException; | |
+import org.apache.drill.common.expression.SchemaPath; | |
+import org.apache.drill.exec.physical.EndpointAffinity; | |
+import org.apache.drill.exec.physical.base.AbstractGroupScan; | |
+import org.apache.drill.exec.physical.base.GroupScan; | |
+import org.apache.drill.exec.physical.base.PhysicalOperator; | |
+import org.apache.drill.exec.physical.base.ScanStats; | |
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; | |
+import org.apache.drill.exec.store.StoragePluginRegistry; | |
+import org.apache.drill.exec.store.cassandra.CassandraSubScan.CassandraSubScanSpec; | |
+ | |
+import org.apache.drill.exec.store.cassandra.connection.CassandraConnectionManager; | |
+import parquet.org.codehaus.jackson.annotate.JsonCreator; | |
+import com.fasterxml.jackson.annotation.JacksonInject; | |
+import com.fasterxml.jackson.annotation.JsonIgnore; | |
+import com.fasterxml.jackson.annotation.JsonProperty; | |
+import com.fasterxml.jackson.annotation.JsonTypeName; | |
+import com.google.common.annotations.VisibleForTesting; | |
+import com.google.common.base.Preconditions; | |
+import com.google.common.base.Stopwatch; | |
+import com.google.common.collect.Lists; | |
+import com.google.common.collect.Maps; | |
+import com.google.common.collect.Sets; | |
+ | |
+@JsonTypeName("cassandra-scan") | |
+public class CassandraGroupScan extends AbstractGroupScan implements DrillCassandraConstants { | |
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CassandraGroupScan.class); | |
+ | |
+  /** Orders lists of sub-scan specs by ascending size; used to pick the least-loaded slot. */ | |
+  private static final Comparator<List<CassandraSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<CassandraSubScanSpec>>() { | |
+    @Override | |
+    public int compare(List<CassandraSubScanSpec> list1, List<CassandraSubScanSpec> list2) { | |
+      // Sizes are non-negative, so plain subtraction cannot overflow here. | |
+      return list1.size() - list2.size(); | |
+    } | |
+  }; | |
+ | |
+  /** Descending-size counterpart of LIST_SIZE_COMPARATOR; used to pick the most-loaded slot. */ | |
+  private static final Comparator<List<CassandraSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR); | |
+ | |
+  private String userName; // NOTE(review): never assigned in this class — the user name lives in the AbstractGroupScan parent; likely dead | |
+ | |
+  private CassandraStoragePluginConfig storagePluginConfig; // storage plugin config (hosts, port) this scan was created with | |
+ | |
+  private List<SchemaPath> columns; // projected columns; ALL_COLUMNS when the query projects everything | |
+ | |
+  private CassandraScanSpec cassandraScanSpec; // keyspace, table and pushed-down filters for this scan | |
+ | |
+  private CassandraStoragePlugin storagePlugin; // owning plugin instance, used for context/endpoints | |
+ | |
+  private Stopwatch watch = new Stopwatch(); // times affinity/assignment computation for debug logging | |
+ | |
+  private Map<Integer, List<CassandraSubScanSpec>> endpointFragmentMapping; // minor fragment id -> sub-scans assigned to it | |
+ | |
+  private Set<Host> keyspaceHosts; // all Cassandra hosts discovered from cluster metadata in init() | |
+ | |
+  private int totalAssignmentsTobeDone; // number of work units, estimated in init() from a full-table select | |
+ | |
+  private boolean filterPushedDown = false; // true once the planner has pushed a filter into this scan | |
+ | |
+  private Map<String, CassandraPartitionToken> hostTokenMapping = new HashMap<>(); // host name -> token range it serves | |
+ | |
+  //private TableStatsCalculator statsCalculator; | |
+ | |
+  private long scanSizeInBytes = 0; // NOTE(review): copied in the copy constructor but never updated elsewhere in view | |
+ | |
+  private Metadata metadata; // cluster metadata snapshot taken in init() | |
+ | |
+  private Cluster cluster; // driver cluster handle; NOTE(review): never closed in this class — potential resource leak | |
+ | |
+  private Session session; // driver session; NOTE(review): never closed in this class — potential resource leak | |
+ | |
+  private ResultSet rs; // result of the sizing query run in init() | |
+ | |
+  /** | |
+   * Deserialization constructor used by Jackson when a physical plan is read back. | |
+   * Resolves the storage plugin instance from the injected registry and delegates | |
+   * to the main constructor. | |
+   */ | |
+  @JsonCreator | |
+  public CassandraGroupScan(@JsonProperty("userName") String userName, | |
+                            @JsonProperty("cassandraScanSpec") CassandraScanSpec cassandraScanSpec, | |
+                            @JsonProperty("storage") CassandraStoragePluginConfig storagePluginConfig, | |
+                            @JsonProperty("columns") List<SchemaPath> columns, | |
+                            @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException { | |
+    this (userName, (CassandraStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), cassandraScanSpec, columns); | |
+  } | |
+ | |
+  /** | |
+   * Main constructor. Normalizes a null/empty projection to ALL_COLUMNS and | |
+   * eagerly connects to Cassandra via init() to discover hosts and size the scan. | |
+   */ | |
+  public CassandraGroupScan(String userName, CassandraStoragePlugin storagePlugin, | |
+                            CassandraScanSpec scanSpec, List<SchemaPath> columns) { | |
+    super(userName); | |
+    this.storagePlugin = storagePlugin; | |
+    this.storagePluginConfig = storagePlugin.getConfig(); | |
+    this.cassandraScanSpec = scanSpec; | |
+    this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns; | |
+    init(); | |
+  } | |
+ | |
+  /** | |
+   * Private constructor, used for cloning. Copies all scan state by reference | |
+   * (specs, mappings, metadata) without reconnecting to Cassandra — init() is | |
+   * deliberately not called again. | |
+   * @param that The CassandraGroupScan to clone | |
+   */ | |
+  private CassandraGroupScan(CassandraGroupScan that) { | |
+    super(that); | |
+    this.columns = that.columns; | |
+    this.cassandraScanSpec = that.cassandraScanSpec; | |
+    this.endpointFragmentMapping = that.endpointFragmentMapping; | |
+    this.keyspaceHosts = that.keyspaceHosts; | |
+    this.metadata = that.metadata; | |
+    this.storagePlugin = that.storagePlugin; | |
+    this.storagePluginConfig = that.storagePluginConfig; | |
+    this.filterPushedDown = that.filterPushedDown; | |
+    this.totalAssignmentsTobeDone = that.totalAssignmentsTobeDone; | |
+    this.hostTokenMapping = that.hostTokenMapping; | |
+    //this.statsCalculator = that.statsCalculator; | |
+    this.scanSizeInBytes = that.scanSizeInBytes; | |
+  } | |
+ | |
+  /** Returns a copy of this scan with the projection replaced by {@code columns}. */ | |
+  @Override | |
+  public GroupScan clone(List<SchemaPath> columns) { | |
+    CassandraGroupScan newScan = new CassandraGroupScan(this); | |
+    newScan.columns = columns; | |
+    return newScan; | |
+  } | |
+ | |
+ private void init() { | |
+ try { | |
+ logger.info(String.format("Getting cassandra session from host %s, port: %s.", | |
+ storagePluginConfig.getHosts(), storagePluginConfig.getPort())); | |
+ | |
+ cluster = CassandraConnectionManager.getCluster(storagePluginConfig.getHosts(), | |
+ storagePluginConfig.getPort()); | |
+ | |
+ session = cluster.connect(); | |
+ | |
+ metadata = session.getCluster().getMetadata(); | |
+ | |
+ Charset charset = Charset.forName("UTF-8"); | |
+ CharsetEncoder encoder = charset.newEncoder(); | |
+ | |
+ keyspaceHosts = session.getCluster().getMetadata().getAllHosts(); | |
+ | |
+ logger.info("KeySpace hosts for Cassandra : {}", keyspaceHosts); | |
+ | |
+ if(null == keyspaceHosts){ | |
+ logger.error(String.format("No Keyspace Hosts Found for Cassandra %s:%s .", | |
+ storagePluginConfig.getHosts(), storagePluginConfig.getPort())); | |
+ throw new DrillRuntimeException(String.format("No Keyspace Hosts Found for Cassandra %s:%s .", | |
+ storagePluginConfig.getHosts(), storagePluginConfig.getPort())); | |
+ } | |
+ | |
+ String[] tokens = CassandraUtil.getPartitionTokens(metadata.getPartitioner(), keyspaceHosts.size()); | |
+ int index = 0; | |
+ for(Host h : keyspaceHosts){ | |
+ CassandraPartitionToken token = new CassandraPartitionToken(); | |
+ token.setLow(tokens[index]); | |
+ if(index+1 < tokens.length){ | |
+ token.setHigh(tokens[index+1]); | |
+ } | |
+ hostTokenMapping.put(h.getAddress().getHostName(), token); | |
+ index++; | |
+ } | |
+ logger.info("Host token mapping: {}", hostTokenMapping); | |
+ | |
+ Statement q = QueryBuilder.select().all().from(cassandraScanSpec.getKeyspace(), | |
+ cassandraScanSpec.getTable()); | |
+ | |
+ if(session.isClosed()){ | |
+ logger.error("Error in initializing CasandraGroupScan. Session Closed."); | |
+ throw new DrillRuntimeException("Error in initializing CasandraGroupScan. Session Closed."); | |
+ } | |
+ | |
+ rs = session.execute(q); | |
+ this.totalAssignmentsTobeDone = rs.getAvailableWithoutFetching(); | |
+ | |
+ } | |
+ catch(Exception e){ | |
+ logger.error("Error in initializing CasandraGroupScan, Error: "+e.getMessage()); | |
+ throw new DrillRuntimeException(e); | |
+ } | |
+ } | |
+ | |
+  /** | |
+   * Computes endpoint affinity by matching Cassandra host names against the | |
+   * addresses of the running Drillbits: each Cassandra host that is co-located | |
+   * with a Drillbit adds one unit of affinity for that endpoint. Hosts with no | |
+   * matching Drillbit contribute nothing, so the map may legitimately be empty. | |
+   */ | |
+  @Override | |
+  public List<EndpointAffinity> getOperatorAffinity() { | |
+    watch.reset(); | |
+    watch.start(); | |
+    Map<String, DrillbitEndpoint> endpointMap = new HashMap<String, DrillbitEndpoint>(); | |
+    for (DrillbitEndpoint ep : storagePlugin.getContext().getBits()) { | |
+      endpointMap.put(ep.getAddress(), ep); | |
+    } | |
+ | |
+    logger.info("Building affinity map. Endpoints: {}, KeyspaceHosts: {}", endpointMap, keyspaceHosts); | |
+ | |
+    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>(); | |
+    for (Host host : keyspaceHosts) { | |
+      DrillbitEndpoint ep = endpointMap.get(host.getAddress().getHostName()); | |
+      if (ep != null) { | |
+        EndpointAffinity affinity = affinityMap.get(ep); | |
+        if (affinity == null) { | |
+          affinityMap.put(ep, new EndpointAffinity(ep, 1)); | |
+        } else { | |
+          affinity.addAffinity(1); | |
+        } | |
+      } | |
+    } | |
+ | |
+    logger.debug("Took {} µs to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS) / 1000); | |
+ | |
+    if(null == affinityMap || affinityMap.size()==0){ | |
+      logger.info("Affinity map is empty for CassandraGroupScan {}:{}", storagePluginConfig.getHosts(), storagePluginConfig.getPort()); | |
+    } | |
+    return Lists.newArrayList(affinityMap.values()); | |
+  } | |
+ | |
+  /** | |
+   * Distributes the Cassandra hosts across the incoming minor-fragment endpoints. | |
+   * | |
+   * Strategy: (1) assign each host to an endpoint running on the same machine, | |
+   * round-robin over that machine's slots; (2) hand any remaining hosts to the | |
+   * least-loaded slots; (3) rebalance so no slot holds fewer than the floor of | |
+   * the average while another holds more. Populates endpointFragmentMapping, | |
+   * which getSpecificScan() later reads per minor fragment. | |
+   * | |
+   * @param incomingEndpoints endpoints chosen by the parallelizer, one per slot | |
+   */ | |
+  @Override | |
+  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) { | |
+    watch.reset(); | |
+    watch.start(); | |
+ | |
+    final int numSlots = incomingEndpoints.size(); | |
+ | |
+    Preconditions.checkArgument(numSlots <= totalAssignmentsTobeDone, | |
+        String.format("Incoming endpoints %d is greater than number of chunks %d", | |
+            numSlots, totalAssignmentsTobeDone)); | |
+ | |
+    final int minPerEndpointSlot = (int) Math.floor((double) totalAssignmentsTobeDone / numSlots); | |
+    final int maxPerEndpointSlot = (int) Math.ceil((double) totalAssignmentsTobeDone / numSlots); | |
+ | |
+    /* Map for (index,endpoint)'s */ | |
+    endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots); | |
+    /* Reverse mapping for above indexes */ | |
+    Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap(); | |
+ | |
+    /* | |
+     * Initialize these two maps: one empty sub-scan list per slot, and a queue | |
+     * of slot indices per endpoint host name (a host may run several slots). | |
+     */ | |
+    for (int i = 0; i < numSlots; ++i) { | |
+      endpointFragmentMapping.put(i, new ArrayList<CassandraSubScanSpec>(maxPerEndpointSlot)); | |
+      String hostname = incomingEndpoints.get(i).getAddress(); | |
+      Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname); | |
+      if (hostIndexQueue == null) { | |
+        hostIndexQueue = Lists.newLinkedList(); | |
+        endpointHostIndexListMap.put(hostname, hostIndexQueue); | |
+      } | |
+      hostIndexQueue.add(i); | |
+    } | |
+ | |
+    Set<Host> hostsToAssignSet = Sets.newHashSet(keyspaceHosts); | |
+ | |
+    /* First pass: locality — give each host to a slot on the same machine. */ | |
+    for (Iterator<Host> hostIterator = hostsToAssignSet.iterator(); hostIterator.hasNext(); /*nothing*/) { | |
+      Host hostEntry = hostIterator.next(); | |
+ | |
+      Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(hostEntry.getAddress().getHostName()); | |
+      if (endpointIndexlist != null) { | |
+        Integer slotIndex = endpointIndexlist.poll(); | |
+        List<CassandraSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex); | |
+        endpointSlotScanList.add(hostToSubScanSpec(hostEntry, storagePluginConfig.getHosts())); | |
+        // add to the tail of the slot list, to add more later in round robin fashion | |
+        endpointIndexlist.offer(slotIndex); | |
+        // this region has been assigned | |
+        hostIterator.remove(); | |
+      } | |
+    } | |
+ | |
+    /* | |
+     * Build priority queues of slots, with ones which has tasks lesser than 'minPerEndpointSlot' and another which have more. | |
+     */ | |
+    PriorityQueue<List<CassandraSubScanSpec>> minHeap = new PriorityQueue<List<CassandraSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR); | |
+    PriorityQueue<List<CassandraSubScanSpec>> maxHeap = new PriorityQueue<List<CassandraSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV); | |
+    for(List<CassandraSubScanSpec> listOfScan : endpointFragmentMapping.values()) { | |
+      if (listOfScan.size() < minPerEndpointSlot) { | |
+        minHeap.offer(listOfScan); | |
+      } else if (listOfScan.size() > minPerEndpointSlot) { | |
+        maxHeap.offer(listOfScan); | |
+      } | |
+    } | |
+ | |
+    /* | |
+     * Now, let's process any regions which remain unassigned and assign them to slots with minimum number of assignments. | |
+     */ | |
+    if (hostsToAssignSet.size() > 0) { | |
+      for (Host hostEntry : hostsToAssignSet) { | |
+        List<CassandraSubScanSpec> smallestList = minHeap.poll(); | |
+        smallestList.add(hostToSubScanSpec(hostEntry, storagePluginConfig.getHosts())); | |
+        if (smallestList.size() < maxPerEndpointSlot) { | |
+          minHeap.offer(smallestList); | |
+        } | |
+      } | |
+    } | |
+ | |
+    /* | |
+     * While there are slots with lesser than 'minPerEndpointSlot' unit work, balance from those with more. | |
+     */ | |
+    try { | |
+      // If there is more work left | |
+      if(maxHeap.peek() != null && maxHeap.peek().size() > 0) { | |
+        while (minHeap.peek() != null && minHeap.peek().size() <= minPerEndpointSlot) { | |
+          List<CassandraSubScanSpec> smallestList = minHeap.poll(); | |
+          List<CassandraSubScanSpec> largestList = maxHeap.poll(); | |
+ | |
+          smallestList.add(largestList.remove(largestList.size() - 1)); | |
+          if (largestList.size() > minPerEndpointSlot) { | |
+            maxHeap.offer(largestList); | |
+          } | |
+          if (smallestList.size() <= minPerEndpointSlot) { | |
+            minHeap.offer(smallestList); | |
+          } | |
+        } | |
+      } | |
+    } catch (Exception e) { | |
+      // Fixed: previously swallowed via e.printStackTrace(); log through SLF4J instead. | |
+      logger.error("Error while rebalancing sub-scan assignments across endpoints", e); | |
+    } | |
+ | |
+ | |
+    /* no slot should be empty at this point */ | |
+    assert (minHeap.peek() == null || minHeap.peek().size() > 0) : String.format( | |
+        "Unable to assign tasks to some endpoints.\nEndpoints: {}.\nAssignment Map: {}.", | |
+        incomingEndpoints, endpointFragmentMapping.toString()); | |
+ | |
+    logger.debug("Built assignment map in {} µs.\nEndpoints: {}.\nAssignment Map: {}", | |
+        watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, endpointFragmentMapping.toString()); | |
+ | |
+  } | |
+ | |
+  /** | |
+   * Builds the sub-scan spec for one Cassandra host: same keyspace/table/filters | |
+   * as the group scan, plus the token range assigned to that host in init(). | |
+   * Start/end tokens are null when the host has no mapping (or an open-ended range). | |
+   */ | |
+  private CassandraSubScanSpec hostToSubScanSpec(Host host, List<String> contactPoints) { | |
+    CassandraScanSpec spec = cassandraScanSpec; | |
+    CassandraPartitionToken token = hostTokenMapping.get(host.getAddress().getHostName()); | |
+ | |
+    return new CassandraSubScanSpec() | |
+        .setTable(spec.getTable()) | |
+        .setKeyspace(spec.getKeyspace()) | |
+        .setFilter(spec.getFilters()) | |
+        .setHosts(contactPoints) | |
+        .setPort(storagePluginConfig.getPort()) | |
+        .setStartToken(token != null ? token.getLow() : null) | |
+        .setEndToken(token != null ? token.getHigh() : null); | |
+ | |
+  } | |
+ | |
+  /** True if the key is null or zero-length. NOTE(review): unused in this class as visible here. */ | |
+  private boolean isNullOrEmpty(byte[] key) { | |
+    return key == null || key.length == 0; | |
+  } | |
+ | |
+  /** | |
+   * Returns the sub-scan for one minor fragment, using the mapping that | |
+   * applyAssignments() must have populated beforehand. | |
+   */ | |
+  @Override | |
+  public CassandraSubScan getSpecificScan(int minorFragmentId) { | |
+    assert minorFragmentId < endpointFragmentMapping.size() : String.format( | |
+        "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(), | |
+        minorFragmentId); | |
+    return new CassandraSubScan(storagePlugin, storagePluginConfig, endpointFragmentMapping.get(minorFragmentId), columns); | |
+  } | |
+ | |
+  /** Returns -1; NOTE(review): confirm against GroupScan contract — other plugins return a positive width. */ | |
+  @Override | |
+  public int getMaxParallelizationWidth() { | |
+    return -1; | |
+  } | |
+ | |
+  /** Placeholder stats: reports a trivial table until real sizing is implemented. */ | |
+  @Override | |
+  public ScanStats getScanStats() { | |
+    //TODO | |
+    return ScanStats.TRIVIAL_TABLE; | |
+  } | |
+ | |
+  /** Leaf operator: accepts no children; returns a copy of this scan. */ | |
+  @Override | |
+  @JsonIgnore | |
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { | |
+    Preconditions.checkArgument(children.isEmpty()); | |
+    return new CassandraGroupScan(this); | |
+  } | |
+ | |
+  /** Owning storage plugin; not serialized (resolved from the registry on read). */ | |
+  @JsonIgnore | |
+  public CassandraStoragePlugin getStoragePlugin() { | |
+    return storagePlugin; | |
+  } | |
+ | |
+  /** Convenience accessor for the scanned table name. */ | |
+  @JsonIgnore | |
+  public String getTableName() { | |
+    return getCassandraScanSpec().getTable(); | |
+  } | |
+ | |
+  /** Digest used by the planner to compare scans; delegates to toString(). */ | |
+  @Override | |
+  public String getDigest() { | |
+    return toString(); | |
+  } | |
+ | |
+  @Override | |
+  public String toString() { | |
+    return "CassandraGroupScan [CassandraScanSpec=" | |
+        + cassandraScanSpec + ", columns=" | |
+        + columns + "]"; | |
+  } | |
+ | |
+  /** Serialized as "storage" to match the @JsonCreator constructor property. */ | |
+  @JsonProperty("storage") | |
+  public CassandraStoragePluginConfig getStorageConfig() { | |
+    return this.storagePluginConfig; | |
+  } | |
+ | |
+  @JsonProperty | |
+  public List<SchemaPath> getColumns() { | |
+    return columns; | |
+  } | |
+ | |
+  @JsonProperty | |
+  public CassandraScanSpec getCassandraScanSpec() { | |
+    return cassandraScanSpec; | |
+  } | |
+ | |
+  /** Projection pushdown is always supported by this scan. */ | |
+  @Override | |
+  @JsonIgnore | |
+  public boolean canPushdownProjects(List<SchemaPath> columns) { | |
+    return true; | |
+  } | |
+ | |
+  /** | |
+   * Records whether the planner has pushed a filter into this scan. | |
+   * | |
+   * @param b true if a filter was pushed down | |
+   */ | |
+  @JsonIgnore | |
+  public void setFilterPushedDown(boolean b) { | |
+    // Fixed: the parameter was previously ignored and the flag hard-coded to true. | |
+    this.filterPushedDown = b; | |
+  } | |
+ | |
+  /** True once a filter has been pushed into this scan; checked by the pushdown rule to avoid re-processing. */ | |
+  @JsonIgnore | |
+  public boolean isFilterPushedDown() { | |
+    return filterPushedDown; | |
+  } | |
+ | |
+  /** | |
+   * Empty constructor, do not use, only for testing. | |
+   * Skips init(), so no Cassandra connection is made. | |
+   */ | |
+  @VisibleForTesting | |
+  public CassandraGroupScan() { | |
+    super((String)null); | |
+  } | |
+ | |
+  /** | |
+   * Do not use, only for testing. | |
+   */ | |
+  @VisibleForTesting | |
+  public void setCassandraScanSpec(CassandraScanSpec cassandraScanSpec) { | |
+    this.cassandraScanSpec = cassandraScanSpec; | |
+  } | |
+ | |
+} | |
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraPartitionToken.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraPartitionToken.java | |
new file mode 100644 | |
index 0000000..816324e | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraPartitionToken.java | |
@@ -0,0 +1,48 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.exec.store.cassandra; | |
+ | |
+/** | |
+ * Mutable holder for the partition-token range assigned to a single Cassandra | |
+ * host: [low, high). The high bound may be null for the last (open-ended) | |
+ * range on the token ring — see CassandraGroupScan.init(). | |
+ */ | |
+public class CassandraPartitionToken { | |
+ | |
+  private String low;  // inclusive start token of the range | |
+  private String high; // exclusive end token; null means open-ended | |
+ | |
+  public String getLow() { | |
+    return low; | |
+  } | |
+ | |
+  public void setLow(String low) { | |
+    this.low = low; | |
+  } | |
+ | |
+  public String getHigh() { | |
+    return high; | |
+  } | |
+ | |
+  public void setHigh(String high) { | |
+    this.high = high; | |
+  } | |
+ | |
+  @Override | |
+  public String toString() { | |
+    return "CassandraPartitionToken{" + | |
+        "low='" + low + '\'' + | |
+        ", high='" + high + '\'' + | |
+        '}'; | |
+  } | |
+} | |
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraPushDownFilterForScan.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraPushDownFilterForScan.java | |
new file mode 100644 | |
index 0000000..4b5a295 | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraPushDownFilterForScan.java | |
@@ -0,0 +1,110 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.exec.store.cassandra; | |
+ | |
+import java.io.IOException; | |
+ | |
+import org.apache.drill.common.exceptions.DrillRuntimeException; | |
+import org.apache.drill.common.expression.LogicalExpression; | |
+import org.apache.drill.exec.planner.logical.DrillOptiq; | |
+import org.apache.drill.exec.planner.logical.DrillParseContext; | |
+import org.apache.drill.exec.planner.logical.RelOptHelper; | |
+import org.apache.drill.exec.planner.physical.FilterPrel; | |
+import org.apache.drill.exec.planner.physical.PrelUtil; | |
+import org.apache.drill.exec.planner.physical.ScanPrel; | |
+import org.apache.drill.exec.store.StoragePluginOptimizerRule; | |
+import org.apache.calcite.rel.RelNode; | |
+import org.apache.calcite.plan.RelOptRuleCall; | |
+import org.apache.calcite.plan.RelOptRuleOperand; | |
+import org.apache.calcite.rex.RexNode; | |
+import org.slf4j.Logger; | |
+import org.slf4j.LoggerFactory; | |
+ | |
+import com.google.common.collect.ImmutableList; | |
+ | |
+/** | |
+ * Planner rule that pushes a Filter sitting on top of a Cassandra ScanPrel | |
+ * down into the CassandraGroupScan as CQL clauses, eliminating the Filter | |
+ * operator when the whole condition converts. | |
+ * | |
+ * Currently neutered: BYPASS_CASSANDRA_FILTER_PUSHDOWN short-circuits onMatch, | |
+ * so the rule fires but never transforms the plan. | |
+ */ | |
+public class CassandraPushDownFilterForScan extends StoragePluginOptimizerRule { | |
+ | |
+  /* Flag to bypass filter pushdown */ | |
+  static final boolean BYPASS_CASSANDRA_FILTER_PUSHDOWN = true; | |
+ | |
+  public static final StoragePluginOptimizerRule INSTANCE = new CassandraPushDownFilterForScan(); | |
+ | |
+  /** Matches Filter(Scan); the Cassandra-specific check lives in matches(). */ | |
+  private CassandraPushDownFilterForScan() { | |
+    super(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "CassandraPushDownFilterForScan"); | |
+  } | |
+ | |
+  public CassandraPushDownFilterForScan(RelOptRuleOperand operand, String description) { | |
+    super(operand, description); | |
+  } | |
+ | |
+  @Override | |
+  public void onMatch(RelOptRuleCall call) { | |
+    final ScanPrel scan = (ScanPrel) call.rel(1); | |
+    final FilterPrel filter = (FilterPrel) call.rel(0); | |
+    final RexNode condition = filter.getCondition(); | |
+ | |
+    CassandraGroupScan groupScan = (CassandraGroupScan)scan.getGroupScan(); | |
+    if (groupScan.isFilterPushedDown()) { | |
+ | |
+      /* | |
+       * The rule can get triggered again due to the transformed "scan => filter" sequence | |
+       * created by the earlier execution of this rule when we could not do a complete | |
+       * conversion of Optiq Filter's condition to Cassandra Filter. In such cases, we rely upon | |
+       * this flag to not do a re-processing of the rule on the already transformed call. | |
+       */ | |
+      return; | |
+    } | |
+ | |
+    // Convert the Calcite condition to a Drill logical expression and attempt | |
+    // to build an equivalent Cassandra scan spec from it. | |
+    LogicalExpression conditionExp = DrillOptiq.toDrill( | |
+        new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition); | |
+    CassandraFilterBuilder cassandraFilterBuilder = new CassandraFilterBuilder(groupScan, conditionExp); | |
+    CassandraScanSpec newScanSpec = cassandraFilterBuilder.parseTree(); | |
+ | |
+    // TODO : Fix Pushdown | |
+    // NOTE(review): this bypass sits AFTER the conversion above, so the builder | |
+    // work is computed and then discarded while the flag is true. | |
+    if(BYPASS_CASSANDRA_FILTER_PUSHDOWN){ | |
+      return; | |
+    } | |
+ | |
+    if (newScanSpec == null) { | |
+      return; //no filter pushdown ==> No transformation. | |
+    } | |
+ | |
+    final CassandraGroupScan newGroupsScan = new CassandraGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(), newScanSpec, groupScan.getColumns()); | |
+    newGroupsScan.setFilterPushedDown(true); | |
+ | |
+    final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType()); | |
+    if (cassandraFilterBuilder.isAllExpressionsConverted()) { | |
+      /* | |
+       * Since we could convert the entire filter condition expression into an cassandra filter, | |
+       * we can eliminate the filter operator altogether. | |
+       */ | |
+      call.transformTo(newScanPrel); | |
+    } else { | |
+      // Partial conversion: keep the Filter on top of the new (filtered) scan. | |
+      call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of((RelNode)newScanPrel))); | |
+    } | |
+  } | |
+ | |
+  /** Only fire on scans whose group scan is a CassandraGroupScan. */ | |
+  @Override | |
+  public boolean matches(RelOptRuleCall call) { | |
+    final ScanPrel scan = (ScanPrel) call.rel(1); | |
+    if (scan.getGroupScan() instanceof CassandraGroupScan) { | |
+      return super.matches(call); | |
+    } | |
+    return false; | |
+  } | |
+} | |
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraRecordReader.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraRecordReader.java | |
new file mode 100644 | |
index 0000000..51fa6ce | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraRecordReader.java | |
@@ -0,0 +1,359 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+ | |
+package org.apache.drill.exec.store.cassandra; | |
+ | |
+import java.io.IOException; | |
+import java.math.BigDecimal; | |
+import java.math.BigInteger; | |
+import java.net.UnknownHostException; | |
+import java.util.ArrayList; | |
+import java.util.Collection; | |
+import java.util.Date; | |
+import java.util.HashMap; | |
+import java.util.Iterator; | |
+import java.util.List; | |
+import java.util.Map; | |
+import java.util.Map.Entry; | |
+import java.util.Set; | |
+import java.util.concurrent.TimeUnit; | |
+ | |
+import com.datastax.driver.core.Cluster; | |
+import com.datastax.driver.core.ColumnDefinitions; | |
+import com.datastax.driver.core.ColumnMetadata; | |
+import com.datastax.driver.core.Host; | |
+import com.datastax.driver.core.ResultSet; | |
+import com.datastax.driver.core.Row; | |
+import com.datastax.driver.core.Session; | |
+import com.datastax.driver.core.Statement; | |
+import com.datastax.driver.core.querybuilder.Clause; | |
+import com.datastax.driver.core.querybuilder.QueryBuilder; | |
+import com.datastax.driver.core.querybuilder.Select; | |
+import com.google.common.base.Preconditions; | |
+import org.apache.drill.common.exceptions.DrillRuntimeException; | |
+import org.apache.drill.common.exceptions.ExecutionSetupException; | |
+import org.apache.drill.common.expression.PathSegment; | |
+import org.apache.drill.common.expression.PathSegment.NameSegment; | |
+import org.apache.drill.common.expression.SchemaPath; | |
+import org.apache.drill.common.types.TypeProtos; | |
+import org.apache.drill.common.types.TypeProtos.MinorType; | |
+import org.apache.drill.common.types.Types; | |
+import org.apache.drill.exec.ExecConstants; | |
+import org.apache.drill.exec.exception.SchemaChangeException; | |
+import org.apache.drill.exec.ops.FragmentContext; | |
+import org.apache.drill.exec.ops.OperatorContext; | |
+import org.apache.drill.exec.physical.impl.OutputMutator; | |
+import org.apache.drill.exec.record.MaterializedField; | |
+import org.apache.drill.exec.store.AbstractRecordReader; | |
+import org.apache.drill.exec.store.cassandra.connection.CassandraConnectionManager; | |
+import org.apache.drill.exec.store.ischema.Records; | |
+import org.apache.drill.exec.vector.NullableVarBinaryVector; | |
+import org.apache.drill.exec.vector.NullableVarCharVector; | |
+import org.apache.drill.exec.vector.RepeatedVarCharVector; | |
+import org.apache.drill.exec.vector.ValueVector; | |
+import org.apache.drill.exec.vector.VarBinaryVector; | |
+import org.apache.drill.exec.vector.complex.MapVector; | |
+import org.apache.drill.exec.vector.complex.fn.JsonReader; | |
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; | |
+import org.apache.drill.exec.vector.complex.writer.BaseWriter; | |
+import org.apache.hadoop.conf.Configuration; | |
+ | |
+import org.slf4j.Logger; | |
+import org.slf4j.LoggerFactory; | |
+ | |
+import com.google.common.base.Charsets; | |
+import com.google.common.base.Stopwatch; | |
+import com.google.common.collect.Lists; | |
+import com.google.common.collect.Sets; | |
+ | |
+public class CassandraRecordReader extends AbstractRecordReader implements DrillCassandraConstants{ | |
+ static final Logger logger = LoggerFactory.getLogger(CassandraRecordReader.class); | |
+ | |
+ private static final int TARGET_RECORD_COUNT = 3000; | |
+ | |
+ private Cluster cluster; | |
+ private Session session; | |
+ private ResultSet rs; | |
+ private Iterator<Row> it; | |
+ | |
+ private NullableVarCharVector valueVector; | |
+ private OutputMutator outputMutator; | |
+ private Map<String, MapVector> familyVectorMap; | |
+ | |
+ | |
+ private String cassandraTableName; | |
+ | |
+ private CassandraSubScan.CassandraSubScanSpec subScanSpec; | |
+ private String cassandraKeySpace; | |
+ private CassandraStoragePluginConfig cassandraConf; | |
+ private List<SchemaPath> projectedColumns; | |
+ private boolean allColumnsProjected; | |
+ | |
+ private NullableVarCharVector vector; | |
+ private List<ValueVector> vectors = Lists.newArrayList(); | |
+ | |
+ private VarBinaryVector rowKeyVector; | |
+ | |
+ | |
+ private FragmentContext fragmentContext; | |
+ private OperatorContext operatorContext; | |
+ | |
+ public CassandraRecordReader(CassandraStoragePluginConfig conf, CassandraSubScan.CassandraSubScanSpec subScanSpec, | |
+ List<SchemaPath> projectedColumns, FragmentContext context) { | |
+ this.cassandraTableName = Preconditions.checkNotNull(subScanSpec, "Cassandra reader needs a sub-scan spec").getTable(); | |
+ this.cassandraKeySpace = Preconditions.checkNotNull(subScanSpec, "Cassandra reader needs a sub-scan spec").getKeyspace(); | |
+ this.subScanSpec = subScanSpec; | |
+ this.projectedColumns = projectedColumns; | |
+ this.cassandraConf = conf; | |
+ | |
+ setColumns(projectedColumns); | |
+ } | |
+ | |
+ public OperatorContext getOperatorContext() { | |
+ return operatorContext; | |
+ } | |
+ | |
+ public void setOperatorContext(OperatorContext operatorContext) { | |
+ this.operatorContext = operatorContext; | |
+ } | |
+ | |
+ @Override | |
+ protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) { | |
+ //TODO: | |
+ return columns; | |
+ } | |
+ | |
+ | |
+ @Override | |
+ public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { | |
+ this.outputMutator = output; | |
+ this.operatorContext = context; | |
+ | |
+ try{ | |
+ logger.debug("Opening scanner for Cassandra table '{}'.", cassandraTableName); | |
+ | |
+ List<String> host = subScanSpec.getHosts(); | |
+ int port = subScanSpec.getPort(); | |
+ | |
+ cluster = CassandraConnectionManager.getCluster(host, port); | |
+ session = cluster.connect(); | |
+ | |
+ List<ColumnMetadata> partitioncols = session.getCluster().getMetadata() | |
+ .getKeyspace(subScanSpec.getKeyspace()).getTable(subScanSpec.getTable()).getPartitionKey(); | |
+ | |
+ String[] partitionkeys = new String[partitioncols.size()]; | |
+ for(int index = 0; index < partitioncols.size(); index++){ | |
+ partitionkeys[index] = partitioncols.get(index).getName(); | |
+ } | |
+ | |
+ Statement q = null; | |
+ | |
+ /* Check projected columns */ | |
+ for(SchemaPath path : getColumns()){ | |
+ if(path.getAsNamePart().getName().equals("*")){ | |
+ allColumnsProjected = true; | |
+ break; | |
+ } | |
+ } | |
+ | |
+ /* Project only required columns */ | |
+ Select.Where where; | |
+ Select.Selection select = QueryBuilder.select(); | |
+ if(allColumnsProjected){ | |
+ where = select.all().from(subScanSpec.getKeyspace(), subScanSpec.getTable()).where(); | |
+ } | |
+ else{ | |
+ for(SchemaPath path : getColumns()){ | |
+ if(path.getAsNamePart().getName().equals("*")){ | |
+ continue; | |
+ } | |
+ else{ | |
+ select = select.column(path.getAsNamePart().getName()); | |
+ } | |
+ } | |
+ where = select.from(subScanSpec.getKeyspace(), subScanSpec.getTable()).where(); | |
+ } | |
+ | |
+ if(subScanSpec.getStartToken() != null){ | |
+ where = where.and(QueryBuilder.gte(QueryBuilder.token(partitionkeys), new Long(subScanSpec.getStartToken()))); | |
+ } | |
+ if(subScanSpec.getEndToken() != null){ | |
+ where = where.and(QueryBuilder.lt(QueryBuilder.token(partitionkeys), new Long(subScanSpec.getEndToken()))); | |
+ } | |
+ | |
+ q = where; | |
+ rs = session.execute(q); | |
+ | |
+ for(SchemaPath column : getColumns()){ | |
+ if(column.getAsNamePart().getName().equals("*") ){ | |
+ Iterator<ColumnDefinitions.Definition> iter = rs.getColumnDefinitions().iterator(); | |
+ | |
+ /* Add all columns to ValueVector */ | |
+ while(iter.hasNext()) { | |
+ ColumnDefinitions.Definition def = iter.next(); | |
+ MaterializedField field = MaterializedField.create(def.getName(), COLUMN_TYPE); | |
+ vector = this.outputMutator.addField(field, NullableVarCharVector.class); | |
+ } | |
+ } | |
+ else { | |
+ MaterializedField field = MaterializedField.create(column.getRootSegment().getPath(), COLUMN_TYPE); | |
+ vector = this.outputMutator.addField(field, NullableVarCharVector.class); | |
+ } | |
+ } | |
+ | |
+ } catch (SchemaChangeException | IOException e) { | |
+ throw new ExecutionSetupException("Failure in Cassandra Record Reader setup. Cause: ",e); | |
+ } | |
+ } | |
+ | |
+ @Override | |
+ public int next() { | |
+ Stopwatch watch = new Stopwatch(); | |
+ watch.start(); | |
+ int rowCount = 0; | |
+ Row row=null; | |
+ int start, end, batchsize=0; | |
+ start = end = -1; | |
+ try{ | |
+ vectors = Lists.newArrayList(); | |
+ // TODO: Use Batch Size - TARGET_RECORD_COUNT(3000) | |
+ for (; rs.iterator().hasNext(); rowCount++) { | |
+ | |
+ if (operatorContext != null) { | |
+ operatorContext.getStats().startWait(); | |
+ } | |
+ try { | |
+ if (rs.iterator().hasNext()) { | |
+ row = rs.iterator().next(); | |
+ } | |
+ }finally { | |
+ if (operatorContext != null) { | |
+ operatorContext.getStats().stopWait(); | |
+ } | |
+ } | |
+ if (row == null) { | |
+ break; | |
+ } | |
+ | |
+ start = end = -1; | |
+ for (SchemaPath col : getColumns()){ | |
+ | |
+ if(col.getAsNamePart().getName().equals("*") ){ | |
+ /* Add all columns to ValueVector */ | |
+ for(ColumnDefinitions.Definition def : row.getColumnDefinitions()){ | |
+ updateValueVector(row, def.getName(), rowCount); | |
+ } | |
+ } | |
+ else { | |
+ updateValueVector(row, col.getAsNamePart().getName(), rowCount); | |
+ } | |
+ } | |
+ logger.debug("text scan batch size {}", batchsize); | |
+ } | |
+ | |
+ for (ValueVector v : vectors) { | |
+ v.getMutator().setValueCount(rowCount); | |
+ } | |
+ logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), rowCount); | |
+ return rowCount; | |
+ } catch (Exception e) { | |
+ if (operatorContext != null) { | |
+ operatorContext.getStats().stopWait(); | |
+ } | |
+ throw new DrillRuntimeException(e); | |
+ } | |
+ } | |
+ | |
+ private void updateValueVector(Row row, String colname, int rowCount){ | |
+ try { | |
+ String val = getAsString(row, colname); | |
+ int start = 0; | |
+ int end = val.length(); | |
+ | |
+ MaterializedField field = MaterializedField.create(colname, COLUMN_TYPE); | |
+ vector = outputMutator.addField(field, NullableVarCharVector.class); | |
+ | |
+ vector.getMutator().setSafe(rowCount, val.getBytes(), start, end - start); | |
+ vectors.add(vector); | |
+ } catch(Exception e){ | |
+ e.printStackTrace(); | |
+ | |
+ throw new DrillRuntimeException(e); | |
+ } | |
+ } | |
+ | |
+ | |
+ @Override | |
+ public void cleanup() { | |
+ try { | |
+ if (session != null) { | |
+ session.close(); | |
+ } | |
+ } catch (Exception e) { | |
+ logger.error("Failure while closing Cassandra table. Error: {}", e.getMessage()); | |
+ throw new DrillRuntimeException(String.format("Failure while closing Cassandra table. Error: %s",e.getMessage())); | |
+ } | |
+ } | |
+ | |
+ | |
+ | |
+ /** | |
+ * Utility function to get the type of the column and return its String value. | |
+ * TODO: Convert to appropriate Drill Type. | |
+ * | |
+ * @param r | |
+ * @param colname | |
+ * @return | |
+ */ | |
+ public String getAsString(Row r, String colname){ | |
+ String value = null; | |
+ try { | |
+ Class clazz = r.getColumnDefinitions().getType(colname).asJavaClass(); | |
+ | |
+ if (clazz.isInstance(Long.MIN_VALUE)) { | |
+ value = String.valueOf(r.getLong(colname)); | |
+ } else if (clazz.isInstance(Boolean.FALSE)) { | |
+ value = String.valueOf(r.getBool(colname)); | |
+ } else if (clazz.isInstance(Byte.MIN_VALUE)) { | |
+ value = String.valueOf(r.getBytes(colname)); | |
+ } else if (clazz.isInstance(new Date())) { | |
+ value = String.valueOf(r.getDate(colname)); | |
+ } else if (clazz.isInstance(BigDecimal.ZERO)) { | |
+ value = String.valueOf(r.getDecimal(colname)); | |
+ } else if (clazz.isInstance(Double.MIN_VALUE)) { | |
+ value = String.valueOf(r.getDouble(colname)); | |
+ } else if (clazz.isInstance(Float.MIN_VALUE)) { | |
+ value = String.valueOf(r.getFloat(colname)); | |
+ } else if (clazz.isInstance(Integer.MIN_VALUE)) { | |
+ value = String.valueOf(r.getInt(colname)); | |
+ } else if (clazz.isInstance(new String())) { | |
+ value = r.getString(colname); | |
+ } else if (clazz.isInstance(BigInteger.ZERO)) { | |
+ value = String.valueOf(r.getVarint(colname)); | |
+ } else { | |
+ value = null; | |
+ } | |
+ } | |
+ catch(Exception e){ | |
+ throw new DrillRuntimeException(String.format("Unable to get Cassandra column: %s, of type: %s." | |
+ , colname, r.getColumnDefinitions().getType(colname).asJavaClass().getCanonicalName())); | |
+ } | |
+ return value; | |
+ } | |
+ | |
+} | |
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraScanBatchCreator.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraScanBatchCreator.java | |
new file mode 100644 | |
index 0000000..3f9914c | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraScanBatchCreator.java | |
@@ -0,0 +1,55 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.exec.store.cassandra; | |
+ | |
+import java.util.List; | |
+ | |
+import org.apache.drill.common.exceptions.ExecutionSetupException; | |
+import org.apache.drill.common.expression.SchemaPath; | |
+import org.apache.drill.exec.ops.FragmentContext; | |
+import org.apache.drill.exec.physical.base.GroupScan; | |
+import org.apache.drill.exec.physical.impl.BatchCreator; | |
+import org.apache.drill.exec.physical.impl.ScanBatch; | |
+import org.apache.drill.exec.record.RecordBatch; | |
+import org.apache.drill.exec.store.RecordReader; | |
+ | |
+import com.google.common.base.Preconditions; | |
+import com.google.common.collect.Lists; | |
+ | |
+public class CassandraScanBatchCreator implements BatchCreator<CassandraSubScan>{ | |
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CassandraScanBatchCreator.class); | |
+ | |
+ @Override | |
+ public ScanBatch getBatch(FragmentContext context, CassandraSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException { | |
+ Preconditions.checkArgument(children.isEmpty()); | |
+ List<RecordReader> readers = Lists.newArrayList(); | |
+ List<SchemaPath> columns = null; | |
+ for(CassandraSubScan.CassandraSubScanSpec scanSpec : subScan.getChunkScanSpecList()){ | |
+ try { | |
+ if ((columns = subScan.getColumns())==null) { | |
+ columns = GroupScan.ALL_COLUMNS; | |
+ } | |
+ readers.add(new CassandraRecordReader(subScan.getCassandraStoragePlugin().getConfig(), scanSpec, columns, context)); | |
+ } catch (Exception e1) { | |
+ throw new ExecutionSetupException(e1); | |
+ } | |
+ } | |
+ return new ScanBatch(subScan, context, readers.iterator()); | |
+ } | |
+ | |
+} | |
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraScanSpec.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraScanSpec.java | |
new file mode 100644 | |
index 0000000..428a12e | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraScanSpec.java | |
@@ -0,0 +1,66 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.exec.store.cassandra; | |
+ | |
+import com.datastax.driver.core.querybuilder.Clause; | |
+import com.fasterxml.jackson.annotation.JsonCreator; | |
+import com.fasterxml.jackson.annotation.JsonIgnore; | |
+import com.fasterxml.jackson.annotation.JsonProperty; | |
+ | |
+import java.util.List; | |
+ | |
+public class CassandraScanSpec { | |
+ private String keyspace; | |
+ private String table; | |
+ | |
+ @JsonIgnore | |
+ private List<Clause> filters; | |
+ | |
+ @JsonCreator | |
+ public CassandraScanSpec(@JsonProperty("keyspace") String keyspace, | |
+ @JsonProperty("table") String table) { | |
+ this.keyspace = keyspace; | |
+ this.table = table; | |
+ } | |
+ | |
+ public CassandraScanSpec(String keyspace, String table, List<Clause> filters) { | |
+ this.keyspace = keyspace; | |
+ this.table = table; | |
+ this.filters = filters; | |
+ } | |
+ | |
+ public String getKeyspace() { | |
+ return keyspace; | |
+ } | |
+ | |
+ public String getTable() { | |
+ return table; | |
+ } | |
+ | |
+ public List<Clause> getFilters() { | |
+ return filters; | |
+ } | |
+ | |
+ | |
+ @Override | |
+ public String toString() { | |
+ return "CassandraScanSpec [keyspace=" + keyspace + ", table=" | |
+ + table + ", filters=" + filters + "]"; | |
+ } | |
+ | |
+} | |
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraSchemaFactory.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraSchemaFactory.java | |
new file mode 100644 | |
index 0000000..48d5ce5 | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraSchemaFactory.java | |
@@ -0,0 +1,198 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.exec.store.cassandra; | |
+ | |
+import java.io.IOException; | |
+import java.net.UnknownHostException; | |
+import java.util.Collection; | |
+import java.util.Collections; | |
+import java.util.List; | |
+import java.util.Set; | |
+import java.util.concurrent.ExecutionException; | |
+import java.util.concurrent.TimeUnit; | |
+ | |
+import com.datastax.driver.core.Cluster; | |
+import com.datastax.driver.core.KeyspaceMetadata; | |
+import com.datastax.driver.core.TableMetadata; | |
+import org.apache.calcite.schema.Schema; | |
+import org.apache.calcite.schema.SchemaPlus; | |
+ | |
+import org.apache.drill.common.exceptions.ExecutionSetupException; | |
+import org.apache.drill.exec.planner.logical.DrillTable; | |
+import org.apache.drill.exec.planner.logical.DynamicDrillTable; | |
+import org.apache.drill.exec.rpc.user.UserSession; | |
+import org.apache.drill.exec.store.AbstractSchema; | |
+import org.apache.drill.exec.store.SchemaFactory; | |
+import org.apache.drill.exec.store.SchemaConfig; | |
+ | |
+ | |
+import org.slf4j.Logger; | |
+import org.slf4j.LoggerFactory; | |
+ | |
+import com.google.common.cache.CacheBuilder; | |
+import com.google.common.cache.CacheLoader; | |
+import com.google.common.cache.LoadingCache; | |
+import com.google.common.collect.ImmutableList; | |
+import com.google.common.collect.Lists; | |
+import com.google.common.collect.Sets; | |
+ | |
+ | |
+public class CassandraSchemaFactory implements SchemaFactory { | |
+ | |
+ static final Logger logger = LoggerFactory.getLogger(CassandraSchemaFactory.class); | |
+ | |
+ private static final String DATABASES = "keyspaces"; | |
+ | |
+ private LoadingCache<String, List<String>> keyspaceCache; | |
+ private LoadingCache<String, List<String>> tableCache; | |
+ private final String schemaName; | |
+ private final CassandraStoragePlugin plugin; | |
+ | |
+ private final Cluster cluster; | |
+ | |
+ public CassandraSchemaFactory(CassandraStoragePlugin schema, String schemaName) | |
+ throws ExecutionSetupException, UnknownHostException { | |
+ List<String> hosts = schema.getConfig().getHosts(); | |
+ int port = schema.getConfig().getPort(); | |
+ | |
+ this.plugin = schema; | |
+ this.schemaName = schemaName; | |
+ | |
+ Cluster.Builder builder = Cluster.builder(); | |
+ for(String host : hosts){ | |
+ builder = builder.addContactPoint(host); | |
+ } | |
+ builder = builder.withPort(port); | |
+ cluster = builder.build(); | |
+ | |
+ keyspaceCache = CacheBuilder // | |
+ .newBuilder() // | |
+ .expireAfterAccess(1, TimeUnit.MINUTES) // | |
+ .build(new KeyspaceLoader()); | |
+ | |
+ tableCache = CacheBuilder // | |
+ .newBuilder() // | |
+ .expireAfterAccess(1, TimeUnit.MINUTES) // | |
+ .build(new TableNameLoader()); | |
+ } | |
+ | |
+ | |
+ /** | |
+ * Utility class for fetching all the key spaces in cluster. | |
+ */ | |
+ private class KeyspaceLoader extends CacheLoader<String, List<String>> { | |
+ | |
+ @Override | |
+ public List<String> load(String key) throws Exception { | |
+ if (!DATABASES.equals(key)) { | |
+ throw new UnsupportedOperationException(); | |
+ } | |
+ List<KeyspaceMetadata> keyspaces = cluster.getMetadata().getKeyspaces(); | |
+ List<String> keys = Lists.newArrayList(); | |
+ for(KeyspaceMetadata k : keyspaces){ | |
+ keys.add(k.getName()); | |
+ } | |
+ return keys; | |
+ } | |
+ } | |
+ | |
+ /** | |
+ * Utility class for populating all tables in a provided key space. | |
+ */ | |
+ private class TableNameLoader extends CacheLoader<String, List<String>> { | |
+ | |
+ @Override | |
+ public List<String> load(String keyspace) throws Exception { | |
+ Collection<TableMetadata> tables = cluster.getMetadata().getKeyspace(keyspace).getTables(); | |
+ List<String> tabs = Lists.newArrayList(); | |
+ for(TableMetadata t : tables){ | |
+ tabs.add(t.getName()); | |
+ } | |
+ return tabs; | |
+ } | |
+ } | |
+ | |
+ @Override | |
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException { | |
+ CassandraSchema schema = new CassandraSchema(schemaName); | |
+ SchemaPlus schemaPlus = parent.add(schemaName, schema); | |
+ schema.setHolder(schemaPlus); | |
+ } | |
+ | |
+ class CassandraSchema extends AbstractSchema { | |
+ | |
+ public CassandraSchema(String name) { | |
+ super(ImmutableList.<String> of(), name); | |
+ } | |
+ | |
+ @Override | |
+ public AbstractSchema getSubSchema(String name) { | |
+ List<String> tables; | |
+ try { | |
+ tables = tableCache.get(name); | |
+ return new CassandraDatabaseSchema(tables, this, name); | |
+ } catch (ExecutionException e) { | |
+ logger.warn("Failure while attempting to access Cassandra DataBase '{}'.", | |
+ name, e.getCause()); | |
+ return null; | |
+ } | |
+ } | |
+ | |
+ void setHolder(SchemaPlus plusOfThis) { | |
+ for (String s : getSubSchemaNames()) { | |
+ plusOfThis.add(s, getSubSchema(s)); | |
+ } | |
+ } | |
+ | |
+ @Override | |
+ public boolean showInInformationSchema() { | |
+ return false; | |
+ } | |
+ | |
+ @Override | |
+ public Set<String> getSubSchemaNames() { | |
+ try { | |
+ List<String> dbs = keyspaceCache.get(DATABASES); | |
+ return Sets.newHashSet(dbs); | |
+ } catch (ExecutionException e) { | |
+ logger.warn("Failure while getting Cassandra keyspace list.", e); | |
+ return Collections.emptySet(); | |
+ } | |
+ } | |
+ | |
+ List<String> getTableNames(String dbName) { | |
+ try { | |
+ return tableCache.get(dbName); | |
+ } catch (ExecutionException e) { | |
+ logger.warn("Failure while loading table names for keyspace '{}'.", | |
+ dbName, e.getCause()); | |
+ return Collections.emptyList(); | |
+ } | |
+ } | |
+ | |
+ DrillTable getDrillTable(String dbName, String tableName) { | |
+ CassandraScanSpec cassandraScanSpec = new CassandraScanSpec(dbName, tableName); | |
+ return new DynamicDrillTable(plugin, schemaName, cassandraScanSpec); | |
+ } | |
+ | |
+ @Override | |
+ public String getTypeName() { | |
+ return CassandraStoragePluginConfig.NAME; | |
+ } | |
+ } | |
+} | |
\ No newline at end of file | |
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java | |
new file mode 100644 | |
index 0000000..2f89d85 | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java | |
@@ -0,0 +1,86 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.exec.store.cassandra; | |
+ | |
+import java.io.IOException; | |
+import java.util.Set; | |
+ | |
+import org.apache.calcite.schema.SchemaPlus; | |
+import org.apache.drill.exec.store.SchemaConfig; | |
+ | |
+import org.apache.drill.common.JSONOptions; | |
+import org.apache.drill.common.exceptions.ExecutionSetupException; | |
+import org.apache.drill.exec.physical.base.AbstractGroupScan; | |
+import org.apache.drill.exec.server.DrillbitContext; | |
+import org.apache.drill.exec.store.AbstractStoragePlugin; | |
+import org.apache.drill.exec.store.StoragePluginOptimizerRule; | |
+import org.slf4j.Logger; | |
+import org.slf4j.LoggerFactory; | |
+ | |
+import com.fasterxml.jackson.core.type.TypeReference; | |
+import com.fasterxml.jackson.databind.ObjectMapper; | |
+import com.google.common.collect.ImmutableSet; | |
+ | |
+public class CassandraStoragePlugin extends AbstractStoragePlugin { | |
+ static final Logger logger = LoggerFactory.getLogger(CassandraStoragePlugin.class); | |
+ | |
+ private DrillbitContext context; | |
+ private CassandraStoragePluginConfig cassandraConfig; | |
+ private CassandraSchemaFactory schemaFactory; | |
+ | |
+ public CassandraStoragePlugin(CassandraStoragePluginConfig cassandraConfig, | |
+ DrillbitContext context, String name) throws IOException, | |
+ ExecutionSetupException { | |
+ this.context = context; | |
+ this.cassandraConfig = cassandraConfig; | |
+ this.schemaFactory = new CassandraSchemaFactory(this, name); | |
+ } | |
+ | |
+ public DrillbitContext getContext() { | |
+ return this.context; | |
+ } | |
+ | |
+ @Override | |
+ public CassandraStoragePluginConfig getConfig() { | |
+ return cassandraConfig; | |
+ } | |
+ | |
+ @Override | |
+ public boolean supportsRead() { | |
+ return true; | |
+ } | |
+ | |
+ | |
+ @Override | |
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) | |
+ throws IOException { | |
+ CassandraScanSpec cassandraScanSpec = selection.getListWith(new ObjectMapper(), | |
+ new TypeReference<CassandraScanSpec>() {}); | |
+ | |
+ return new CassandraGroupScan(userName, this, cassandraScanSpec, null); | |
+ } | |
+ | |
+ public Set<StoragePluginOptimizerRule> getOptimizerRules() { | |
+ return ImmutableSet.of(CassandraPushDownFilterForScan.INSTANCE); | |
+ } | |
+ | |
+ @Override | |
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException { | |
+ schemaFactory.registerSchemas(schemaConfig, parent); | |
+ } | |
+} | |
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePluginConfig.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePluginConfig.java | |
new file mode 100644 | |
index 0000000..4dcd160 | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePluginConfig.java | |
@@ -0,0 +1,86 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.exec.store.cassandra; | |
+ | |
+ | |
+import com.fasterxml.jackson.annotation.JsonIgnore; | |
+import com.google.common.collect.Lists; | |
+import com.google.common.collect.Maps; | |
+import org.apache.drill.common.logical.StoragePluginConfig; | |
+ | |
+import com.fasterxml.jackson.annotation.JsonCreator; | |
+import com.fasterxml.jackson.annotation.JsonProperty; | |
+import com.fasterxml.jackson.annotation.JsonTypeName; | |
+ | |
+import java.util.ArrayList; | |
+import java.util.Arrays; | |
+import java.util.List; | |
+import java.util.Map; | |
+ | |
+@JsonTypeName(CassandraStoragePluginConfig.NAME) | |
+public class CassandraStoragePluginConfig extends StoragePluginConfig { | |
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CassandraStoragePluginConfig.class); | |
+ | |
+ public static final String NAME = "cassandra"; | |
+ public Map<String, Object> config; | |
+ | |
+ @JsonIgnore | |
+ private List<String> hosts; | |
+ | |
+ @JsonIgnore | |
+ private int port; | |
+ | |
+ @JsonCreator | |
+ public CassandraStoragePluginConfig(@JsonProperty("config") Map<String, Object> config) { | |
+ this.config = config; | |
+ if(config==null){ | |
+ config = Maps.newHashMap(); | |
+ return; | |
+ } | |
+ | |
+ this.hosts = (ArrayList<String>)this.config.get(DrillCassandraConstants.CASSANDRA_CONFIG_HOSTS); | |
+ this.port = (Integer)this.config.get(DrillCassandraConstants.CASSANDRA_CONFIG_PORT); | |
+ } | |
+ | |
+ @Override | |
+ public boolean equals(Object that) { | |
+ if (this == that) { | |
+ return true; | |
+ } else if (that == null || getClass() != that.getClass()) { | |
+ return false; | |
+ } | |
+ CassandraStoragePluginConfig thatConfig = (CassandraStoragePluginConfig) that; | |
+ return (this.hosts.equals(thatConfig.hosts)) && (this.port==thatConfig.port); | |
+ | |
+ } | |
+ | |
+ @Override | |
+ public int hashCode() { | |
+ return this.hosts != null ? this.hosts.hashCode() : 0; | |
+ } | |
+ | |
+ public Map<String, Object> getConfig() { | |
+ return config; | |
+ } | |
+ | |
+ public List<String> getHosts() { | |
+ return hosts; | |
+ } | |
+ | |
+ public int getPort(){ return port; } | |
+} | |
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraSubScan.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraSubScan.java | |
new file mode 100644 | |
index 0000000..699516e | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraSubScan.java | |
@@ -0,0 +1,235 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.exec.store.cassandra; | |
+ | |
+import java.util.ArrayList; | |
+import java.util.Iterator; | |
+import java.util.LinkedList; | |
+import java.util.List; | |
+ | |
+import com.datastax.driver.core.querybuilder.Clause; | |
+import org.apache.drill.common.exceptions.ExecutionSetupException; | |
+import org.apache.drill.common.expression.SchemaPath; | |
+import org.apache.drill.common.logical.StoragePluginConfig; | |
+import org.apache.drill.exec.physical.base.AbstractBase; | |
+import org.apache.drill.exec.physical.base.PhysicalOperator; | |
+import org.apache.drill.exec.physical.base.PhysicalVisitor; | |
+import org.apache.drill.exec.physical.base.SubScan; | |
+import org.apache.drill.exec.store.StoragePluginRegistry; | |
+import org.slf4j.Logger; | |
+import org.slf4j.LoggerFactory; | |
+ | |
+import com.fasterxml.jackson.annotation.JacksonInject; | |
+import com.fasterxml.jackson.annotation.JsonCreator; | |
+import com.fasterxml.jackson.annotation.JsonIgnore; | |
+import com.fasterxml.jackson.annotation.JsonProperty; | |
+import com.fasterxml.jackson.annotation.JsonTypeName; | |
+import com.google.common.base.Preconditions; | |
+import com.google.common.collect.Iterators; | |
+ | |
+@JsonTypeName("cassandra-subscan") | |
+public class CassandraSubScan extends AbstractBase implements SubScan { | |
+ static final Logger logger = LoggerFactory.getLogger(CassandraSubScan.class); | |
+ | |
+ @JsonProperty | |
+ private final CassandraStoragePluginConfig cassandraPluginConfig; | |
+ @JsonProperty | |
+ private final List<SchemaPath> columns; | |
+ @JsonProperty | |
+ private final List<CassandraSubScanSpec> chunkScanSpecList; | |
+ | |
+ @JsonIgnore | |
+ private final CassandraStoragePlugin cassandraStoragePlugin; | |
+ | |
+ | |
+ @JsonCreator | |
+ public CassandraSubScan( | |
+ @JacksonInject("registry") StoragePluginRegistry registry, | |
+ @JsonProperty("cassandraPluginConfig") StoragePluginConfig cassandraPluginConfig, | |
+ @JsonProperty("chunkScanSpecList") LinkedList<CassandraSubScanSpec> chunkScanSpecList, | |
+ @JsonProperty("columns") List<SchemaPath> columns) | |
+ throws ExecutionSetupException { | |
+ | |
+ System.out.println("CassandraSubScan constructor"); | |
+ this.columns = columns; | |
+ this.cassandraPluginConfig = (CassandraStoragePluginConfig) cassandraPluginConfig; | |
+ this.cassandraStoragePlugin = (CassandraStoragePlugin) registry | |
+ .getPlugin(cassandraPluginConfig); | |
+ this.chunkScanSpecList = chunkScanSpecList; | |
+ } | |
+ | |
+ public CassandraSubScan(CassandraStoragePlugin storagePlugin, | |
+ CassandraStoragePluginConfig storagePluginConfig, | |
+ List<CassandraSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) { | |
+ this.cassandraStoragePlugin = storagePlugin; | |
+ this.cassandraPluginConfig = storagePluginConfig; | |
+ this.columns = columns; | |
+ this.chunkScanSpecList = chunkScanSpecList; | |
+ } | |
+ | |
+ | |
+ @Override | |
+ public <T, X, E extends Throwable> T accept( | |
+ PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { | |
+ return physicalVisitor.visitSubScan(this, value); | |
+ } | |
+ | |
+ @JsonIgnore | |
+ public CassandraStoragePluginConfig getCassandraPluginConfig() { | |
+ return cassandraPluginConfig; | |
+ } | |
+ | |
+ @JsonIgnore | |
+ public CassandraStoragePlugin getCassandraStoragePlugin() { | |
+ return cassandraStoragePlugin; | |
+ } | |
+ | |
+ public List<SchemaPath> getColumns() { | |
+ return columns; | |
+ } | |
+ | |
+ public List<CassandraSubScanSpec> getChunkScanSpecList() { | |
+ return chunkScanSpecList; | |
+ } | |
+ | |
+ @Override | |
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) | |
+ throws ExecutionSetupException { | |
+ Preconditions.checkArgument(children.isEmpty()); | |
+ return new CassandraSubScan(cassandraStoragePlugin, cassandraPluginConfig, | |
+ chunkScanSpecList, columns); | |
+ } | |
+ | |
+ @Override | |
+ public int getOperatorType() { | |
+ /* Copied shamelessly from HBase SubScan */ | |
+ return 1009; | |
+ } | |
+ | |
+ @Override | |
+ public Iterator<PhysicalOperator> iterator() { | |
+ return Iterators.emptyIterator(); | |
+ } | |
+ | |
+ public static class CassandraSubScanSpec { | |
+ | |
+ protected String keyspace; | |
+ protected String table; | |
+ protected List<String> hosts; | |
+ protected int port; | |
+ | |
+ protected String startToken; | |
+ protected String endToken; | |
+ | |
+ @JsonIgnore | |
+ protected List<Clause> filter; | |
+ | |
+ @JsonCreator | |
+ public CassandraSubScanSpec(@JsonProperty("keyspace") String keyspace, | |
+ @JsonProperty("table") String table, | |
+ @JsonProperty("hosts") List<String> hosts, | |
+ @JsonProperty("port") int port, | |
+ @JsonProperty("startToken") String startToken, | |
+ @JsonProperty("endToken") String endToken | |
+ ) { | |
+ this.keyspace = keyspace; | |
+ this.table = table; | |
+ this.hosts = hosts; | |
+ this.port = port; | |
+ this.startToken = startToken; | |
+ this.endToken = endToken; | |
+ this.filter = filter; | |
+ } | |
+ | |
+ CassandraSubScanSpec() { | |
+ | |
+ } | |
+ | |
+ public String getKeyspace() { | |
+ return keyspace; | |
+ } | |
+ | |
+ public CassandraSubScanSpec setKeyspace(String keyspace) { | |
+ this.keyspace = keyspace; | |
+ return this; | |
+ } | |
+ | |
+ public String getTable() { | |
+ return table; | |
+ } | |
+ | |
+ public CassandraSubScanSpec setTable(String table) { | |
+ this.table = table; | |
+ return this; | |
+ } | |
+ | |
+ public List<String> getHosts() { | |
+ return hosts; | |
+ } | |
+ | |
+ public CassandraSubScanSpec setHosts(List<String> hosts) { | |
+ this.hosts = hosts; | |
+ return this; | |
+ } | |
+ | |
+ public int getPort() { | |
+ return port; | |
+ } | |
+ | |
+ public CassandraSubScanSpec setPort(int port) { | |
+ this.port = port; | |
+ return this; | |
+ } | |
+ | |
+ public List<Clause> getFilter() { | |
+ return filter; | |
+ } | |
+ | |
+ public CassandraSubScanSpec setFilter(List<Clause> filter) { | |
+ this.filter = filter; | |
+ return this; | |
+ } | |
+ | |
+ public String getStartToken() { | |
+ return startToken; | |
+ } | |
+ | |
+ public CassandraSubScanSpec setStartToken(String startToken) { | |
+ this.startToken = startToken; | |
+ return this; | |
+ } | |
+ | |
+ public String getEndToken() { | |
+ return endToken; | |
+ } | |
+ | |
+ public CassandraSubScanSpec setEndToken(String endToken) { | |
+ this.endToken = endToken; | |
+ return this; | |
+ } | |
+ | |
+ @Override | |
+ public String toString() { | |
+ return "CassandraSubScanSpec [keyspace=" + keyspace + ", table=" | |
+ + table + ", host=" + hosts +", port=" + port + ", startToken=" + startToken | |
+ + ", endToken=" + endToken + ", filter=" + filter + "]"; | |
+ } | |
+ | |
+ } | |
+ | |
+} | |
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraUtil.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraUtil.java | |
new file mode 100644 | |
index 0000000..2b2ac2c | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraUtil.java | |
@@ -0,0 +1,91 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.exec.store.cassandra; | |
+ | |
+ | |
+import java.math.BigDecimal; | |
+import java.math.RoundingMode; | |
+import java.text.DecimalFormat; | |
+ | |
+public class CassandraUtil { | |
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CassandraUtil.class); | |
+ | |
+ | |
+ public static String[] getPartitionTokens(String partitionTechnique, int numberOfNodes){ | |
+ | |
+ logger.info("Getting partition bounds for nodes. PartitionScheme: {}, Node Count: {}.", partitionTechnique, numberOfNodes); | |
+ | |
+ switch(partitionTechnique){ | |
+ case "org.apache.cassandra.dht.Murmur3Partitioner": | |
+ return getMurmur3PartitionTokens(numberOfNodes); | |
+ | |
+ case "org.apache.cassandra.dht.RandomPartitioner": | |
+ return getRandomPartitionTokens(numberOfNodes); | |
+ | |
+ default: | |
+ logger.error("Cassandra partition scheme {} not supported.", partitionTechnique); | |
+ return null; | |
+ } | |
+ } | |
+ | |
+ private static String[] getMurmur3PartitionTokens(int numberOfNodes){ | |
+ | |
+ String[] tokens = new String[numberOfNodes]; | |
+ | |
+ BigDecimal two = new BigDecimal(2); | |
+ BigDecimal nodes = new BigDecimal(numberOfNodes); | |
+ BigDecimal val = two.pow(64).divide(nodes, RoundingMode.DOWN); | |
+ BigDecimal twoPow63 = two.pow(63); | |
+ BigDecimal index; | |
+ | |
+ for(int i=0; i<numberOfNodes; i++){ | |
+ index = new BigDecimal(i); | |
+ tokens[i] = index.multiply(val).subtract(twoPow63).toPlainString(); | |
+ } | |
+ return tokens; | |
+ } | |
+ | |
+ private static String[] getRandomPartitionTokens(int numberOfNodes){ | |
+ | |
+ String[] tokens = new String[numberOfNodes]; | |
+ | |
+ BigDecimal two = new BigDecimal(2); | |
+ BigDecimal nodes = new BigDecimal(numberOfNodes); | |
+ BigDecimal val = two.pow(127).divide(nodes, RoundingMode.DOWN); | |
+ | |
+ BigDecimal index; | |
+ | |
+ for(int i=0; i<numberOfNodes; i++){ | |
+ index = new BigDecimal(i); | |
+ tokens[i] = index.multiply(val).toPlainString(); | |
+ } | |
+ return tokens; | |
+ } | |
+ | |
+ public static void main(String[] args) { | |
+ | |
+ String[] out = new CassandraUtil().getPartitionTokens("org.apache.cassandra.dht.Murmur3Partitioner", 5); | |
+ | |
+ for(String s : out){ | |
+ System.out.println(s); | |
+ } | |
+ } | |
+ | |
+ | |
+ | |
+} | |
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/DrillCassandraConstants.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/DrillCassandraConstants.java | |
new file mode 100644 | |
index 0000000..80777ff | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/DrillCassandraConstants.java | |
@@ -0,0 +1,35 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+ | |
+package org.apache.drill.exec.store.cassandra; | |
+ | |
+import org.apache.drill.common.expression.SchemaPath; | |
+import org.apache.drill.common.types.TypeProtos; | |
+import org.apache.drill.common.types.Types; | |
+import org.apache.drill.exec.vector.RepeatedVarCharVector; | |
+ | |
public interface DrillCassandraConstants {

  // Cassandra partition (row) keys surface in Drill as required VARCHAR.
  public static final TypeProtos.MajorType ROW_KEY_TYPE = Types.required(TypeProtos.MinorType.VARCHAR);

  // Regular columns surface as nullable VARCHAR.
  public static final TypeProtos.MajorType COLUMN_TYPE = Types.optional(TypeProtos.MinorType.VARCHAR);

  // Key in the plugin "config" map listing Cassandra contact points.
  public static final String CASSANDRA_CONFIG_HOSTS = "cassandra.hosts";

  // Key in the plugin "config" map giving the native-protocol port.
  public static final String CASSANDRA_CONFIG_PORT = "cassandra.port";
}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/connection/CassandraConnectionManager.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/connection/CassandraConnectionManager.java | |
new file mode 100644 | |
index 0000000..04c7e0b | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/connection/CassandraConnectionManager.java | |
@@ -0,0 +1,76 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+ | |
+package org.apache.drill.exec.store.cassandra.connection; | |
+ | |
+import com.datastax.driver.core.Cluster; | |
+import com.datastax.driver.core.Session; | |
+import com.google.common.cache.Cache; | |
+import com.google.common.cache.CacheBuilder; | |
+import com.google.common.cache.RemovalListener; | |
+import com.google.common.cache.RemovalNotification; | |
+import org.slf4j.Logger; | |
+import org.slf4j.LoggerFactory; | |
+ | |
+import java.net.UnknownHostException; | |
+import java.util.List; | |
+import java.util.concurrent.TimeUnit; | |
+ | |
+public class CassandraConnectionManager{ | |
+ | |
+ private static final Logger logger = LoggerFactory.getLogger(CassandraConnectionManager.class); | |
+ | |
+ private static Cache<String, Cluster> hostConnectionMap; | |
+ | |
+ static { | |
+ hostConnectionMap = CacheBuilder.newBuilder().maximumSize(5) | |
+ .expireAfterAccess(10, TimeUnit.MINUTES) | |
+ .removalListener(new AddressCloser()).build(); | |
+ } | |
+ | |
+ public synchronized static Cluster getCluster(List<String> hosts, int port) | |
+ throws UnknownHostException { | |
+ Cluster cluster = hostConnectionMap.getIfPresent(hosts); | |
+ if (cluster == null || cluster.isClosed()) { | |
+ Cluster.Builder builder = Cluster.builder(); | |
+ for(String host : hosts){ | |
+ builder = builder.addContactPoints(host); | |
+ } | |
+ builder = builder.withPort(port); | |
+ cluster = builder.build(); | |
+ | |
+ for(String host : hosts) { | |
+ hostConnectionMap.put(host, cluster); | |
+ } | |
+ | |
+ logger.debug("Created connection to {}.", hosts); | |
+ logger.debug("Number of sessions opened are {}.", hostConnectionMap.size()); | |
+ } | |
+ return cluster; | |
+ } | |
+ | |
+ private static class AddressCloser implements | |
+ RemovalListener<String, Cluster> { | |
+ @Override | |
+ public synchronized void onRemoval(RemovalNotification<String, Cluster> removal) { | |
+ removal.getValue().close(); | |
+ ; | |
+ logger.debug("Closed connection to {}.", removal.getKey().toString()); | |
+ } | |
+ } | |
+} | |
\ No newline at end of file | |
diff --git a/contrib/storage-cassandra/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-cassandra/src/main/resources/bootstrap-storage-plugins.json | |
new file mode 100644 | |
index 0000000..47966b6 | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/resources/bootstrap-storage-plugins.json | |
@@ -0,0 +1,12 @@ | |
+{ | |
+ "storage":{ | |
+ cassandra : { | |
+ type:"cassandra", | |
+ enabled: false, | |
+ config : { | |
+ "cassandra.hosts" : ["127.0.0.1","127.0.0.2"], | |
+ "cassandra.port" : 9042 | |
+ } | |
+ } | |
+ } | |
+} | |
\ No newline at end of file | |
diff --git a/contrib/storage-cassandra/src/main/resources/checkstyle-suppressions.xml b/contrib/storage-cassandra/src/main/resources/checkstyle-suppressions.xml | |
new file mode 100644 | |
index 0000000..9d4682b | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/resources/checkstyle-suppressions.xml | |
@@ -0,0 +1,19 @@ | |
+<?xml version="1.0" encoding="UTF-8"?> | |
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor | |
+ license agreements. See the NOTICE file distributed with this work for additional | |
+ information regarding copyright ownership. The ASF licenses this file to | |
+ You under the Apache License, Version 2.0 (the "License"); you may not use | |
+ this file except in compliance with the License. You may obtain a copy of | |
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required | |
+ by applicable law or agreed to in writing, software distributed under the | |
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS | |
+ OF ANY KIND, either express or implied. See the License for the specific | |
+ language governing permissions and limitations under the License. --> | |
+<!DOCTYPE suppressions PUBLIC | |
+ "-//Puppy Crawl//DTD Suppressions 1.1//EN" | |
+ "suppressions_1_1.dtd"> | |
+ | |
+<!-- Checkstyle Suppressions for Apache Drill --> | |
+<suppressions> | |
+ <suppress files="[\\/]generated-sources[\\/]" checks="AvoidStarImport,NeedBraces"/> | |
+</suppressions> | |
diff --git a/contrib/storage-cassandra/src/main/resources/drill-module.conf b/contrib/storage-cassandra/src/main/resources/drill-module.conf | |
new file mode 100644 | |
index 0000000..3ec4ada | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/main/resources/drill-module.conf | |
@@ -0,0 +1,32 @@ | |
+// Licensed to the Apache Software Foundation (ASF) under one or more | |
+// contributor license agreements. See the NOTICE file distributed with | |
+// this work for additional information regarding copyright ownership. | |
+// The ASF licenses this file to You under the Apache License, Version 2.0 | |
+// (the "License"); you may not use this file except in compliance with | |
+// the License. You may obtain a copy of the License at | |
+// | |
+// http://www.apache.org/licenses/LICENSE-2.0 | |
+// | |
+// Unless required by applicable law or agreed to in writing, software | |
+// distributed under the License is distributed on an "AS IS" BASIS, | |
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+// See the License for the specific language governing permissions and | |
+// limitations under the License. | |
+// | |
+// This file tells Drill to consider this module when class path scanning. | |
+// This file can also include any supplementary configuration information. | |
+// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information. | |
+ | |
+drill.exec: { | |
+ | |
+ sys.store.provider: { | |
+ cassandra : { | |
+ type:"cassandra", | |
+ enabled: false, | |
+ config : { | |
+ "cassandra.hosts" : ["127.0.0.1"], | |
+ "cassandra.port" : 9042 | |
+ } | |
+ } | |
+ } | |
+} | |
\ No newline at end of file | |
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/BaseCassandraTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/BaseCassandraTest.java | |
new file mode 100644 | |
index 0000000..943e83c | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/BaseCassandraTest.java | |
@@ -0,0 +1,146 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.cassandra; | |
+ | |
+import java.io.IOException; | |
+import java.util.List; | |
+ | |
+import com.datastax.driver.core.Cluster; | |
+import com.datastax.driver.core.Session; | |
+import org.apache.drill.BaseTestQuery; | |
+import org.apache.drill.common.exceptions.DrillException; | |
+import org.apache.drill.common.util.FileUtils; | |
+import org.apache.drill.exec.exception.SchemaChangeException; | |
+import org.apache.drill.exec.rpc.user.QueryDataBatch; | |
+import org.apache.drill.exec.store.StoragePluginRegistry; | |
+import org.apache.drill.exec.store.cassandra.CassandraStoragePlugin; | |
+import org.apache.drill.exec.store.cassandra.CassandraStoragePluginConfig; | |
+import org.apache.drill.exec.store.cassandra.connection.CassandraConnectionManager; | |
+import org.junit.AfterClass; | |
+import org.junit.Assert; | |
+import org.junit.BeforeClass; | |
+ | |
+import com.google.common.base.Charsets; | |
+import com.google.common.io.Files; | |
+ | |
+public class BaseCassandraTest extends BaseTestQuery implements CassandraTestConstants { | |
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseCassandraTest.class); | |
+ | |
+ | |
+ private static final String CASSANDRA_STORAGE_PLUGIN_NAME = "cassandra"; | |
+ | |
+ | |
+ protected static CassandraStoragePlugin storagePlugin; | |
+ protected static CassandraStoragePluginConfig storagePluginConfig; | |
+ private static boolean testTablesCreated; | |
+ | |
+ @BeforeClass | |
+ public static void setUpBeforeClass() throws Exception { | |
+ | |
+ final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage(); | |
+ storagePlugin = (CassandraStoragePlugin) pluginRegistry.getPlugin(CASSANDRA_STORAGE_PLUGIN_NAME); | |
+ storagePluginConfig = storagePlugin.getConfig(); | |
+ storagePluginConfig.setEnabled(true); | |
+ pluginRegistry.createOrUpdate(CASSANDRA_STORAGE_PLUGIN_NAME, storagePluginConfig, true); | |
+ | |
+ if(!testTablesCreated) { | |
+ createTestCassandraTableIfNotExists(storagePluginConfig.getHosts(), storagePluginConfig.getPort()); | |
+ testTablesCreated = true; | |
+ } | |
+ } | |
+ | |
+ private static boolean createTestCassandraTableIfNotExists(List<String> host, int port) throws DrillException { | |
+ try { | |
+ logger.info("Initiating Cassandra Keyspace/Table for Test cases. Host: {}, Port: {}", host, port); | |
+ Cluster cluster = CassandraConnectionManager.getCluster(host, port); | |
+ Session session = cluster.connect(); | |
+ | |
+ /* Create Schema: Keyspace */ | |
+ if(session.getCluster().getMetadata().getKeyspace(KEYSPACE_NAME) == null) { | |
+ logger.info("Creating Keyspace: {}",KEYSPACE_NAME); | |
+ session.execute("CREATE KEYSPACE " +KEYSPACE_NAME+ " WITH replication " + | |
+ "= {'class':'SimpleStrategy', 'replication_factor':3};"); | |
+ } | |
+ | |
+ /* Create Schema: Table */ | |
+ if(session.getCluster().getMetadata().getKeyspace(KEYSPACE_NAME).getTable(TABLE_NAME) == null) { | |
+ logger.info("Creating Table [{}] in Keyspace [{}]",TABLE_NAME, KEYSPACE_NAME); | |
+ session.execute( | |
+ "CREATE TABLE " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (\n" + | |
+ " id text,\n" + | |
+ " pog_rank int,\n" + | |
+ " pog_id bigint,\n" + | |
+ " PRIMARY KEY (id, pog_rank)\n" + | |
+ ");"); | |
+ | |
+ /* Load Data */ | |
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0001', 1, 10001);"); | |
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0005', 1, 10001);"); | |
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0002', 1, 10001);"); | |
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0002', 2, 10001);"); | |
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0002', 3, 10001);"); | |
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0006', 1, 10001);"); | |
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0006', 2, 10001);"); | |
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0004', 1, 10001);"); | |
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0004', 2, 10001);"); | |
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0004', 3, 10002);"); | |
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0004', 4, 10002);"); | |
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0004', 5, 10002);"); | |
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0004', 6, 10002);"); | |
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0003', 1, 10001);"); | |
+ } | |
+ return true; | |
+ } | |
+ catch (Exception e) { | |
+ throw new DrillException("Failure while Cassandra Test table creation", e); | |
+ } | |
+ } | |
+ | |
+ @AfterClass | |
+ public static void tearDownAfterClass() throws Exception { | |
+ | |
+ } | |
+ | |
+ protected String getPlanText(String planFile, String keyspaceName, String tableName) throws IOException { | |
+ return Files.toString(FileUtils.getResourceAsFile(planFile), Charsets.UTF_8).replace("[TABLE_NAME]", tableName).replace("[KEYSPACE_NAME]", keyspaceName); | |
+ } | |
+ | |
+ protected void runCassandraPhysicalVerifyCount(String planFile, String keyspaceName, String tableName, int expectedRowCount) throws Exception{ | |
+ String physicalPlan = getPlanText(planFile, keyspaceName, tableName); | |
+ List<QueryDataBatch> results = testPhysicalWithResults(physicalPlan); | |
+ printResultAndVerifyRowCount(results, expectedRowCount); | |
+ } | |
+ | |
+ protected List<QueryDataBatch> runCassandraSQLlWithResults(String sql) throws Exception { | |
+ System.out.println("Running query:\n" + sql); | |
+ return testSqlWithResults(sql); | |
+ } | |
+ | |
+ protected void runCassandraSQLVerifyCount(String sql, int expectedRowCount) throws Exception{ | |
+ List<QueryDataBatch> results = runCassandraSQLlWithResults(sql); | |
+ printResultAndVerifyRowCount(results, expectedRowCount); | |
+ } | |
+ | |
+ private void printResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount) throws SchemaChangeException { | |
+ int rowCount; | |
+ rowCount = printResult(results); | |
+ if (expectedRowCount != -1) { | |
+ Assert.assertEquals(expectedRowCount, rowCount); | |
+ } | |
+ } | |
+} | |
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraFilterPushdownTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraFilterPushdownTest.java | |
new file mode 100644 | |
index 0000000..71238b2 | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraFilterPushdownTest.java | |
@@ -0,0 +1,54 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.cassandra; | |
+ | |
+import org.junit.Test; | |
+ | |
+public class CassandraFilterPushdownTest extends BaseCassandraTest implements CassandraTestConstants { | |
+ | |
+ @Test | |
+ public void testSelectAll() throws Exception{ | |
+ runCassandraSQLVerifyCount(SELECT_ALL, 14); | |
+ } | |
+ | |
+ @Test | |
+ public void testFilter() throws Exception{ | |
+ runCassandraSQLVerifyCount(SELECT_QUERY_FILTER, 6); | |
+ } | |
+ | |
+ @Test | |
+ public void testFilter1() throws Exception{ | |
+ runCassandraSQLVerifyCount(SELECT_QUERY_FILTER_1, 4); | |
+ } | |
+ | |
+ @Test | |
+ public void testFilter2() throws Exception{ | |
+ runCassandraSQLVerifyCount(SELECT_QUERY_FILTER_2, 4); | |
+ } | |
+ | |
+ @Test | |
+ public void testFilterWithOrCondition() throws Exception{ | |
+ runCassandraSQLVerifyCount(SELECT_QUERY_FILTER_With_OR, 8); | |
+ } | |
+ | |
+ @Test | |
+ public void testFilterWithAndCondition() throws Exception{ | |
+ runCassandraSQLVerifyCount(SELECT_QUERY_FILTER_WITH_AND, 1); | |
+ } | |
+ | |
+} | |
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraProjectPushdownTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraProjectPushdownTest.java | |
new file mode 100644 | |
index 0000000..d7cdaa3 | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraProjectPushdownTest.java | |
@@ -0,0 +1,29 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.cassandra; | |
+ | |
+import org.junit.Test; | |
+ | |
+public class CassandraProjectPushdownTest extends BaseCassandraTest implements CassandraTestConstants{ | |
+ | |
+ @Test | |
+ public void testProjection() throws Exception{ | |
+ runCassandraSQLVerifyCount(SELECT_QUERY_PROJECT, 14); | |
+ } | |
+ | |
+} | |
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraRecordReaderTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraRecordReaderTest.java | |
new file mode 100644 | |
index 0000000..7ec1036 | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraRecordReaderTest.java | |
@@ -0,0 +1,33 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.cassandra; | |
+ | |
+import org.junit.Test; | |
+ | |
+public class CassandraRecordReaderTest extends BaseCassandraTest implements CassandraTestConstants { | |
+ | |
+ @Test | |
+ public void testPlanLocal() throws Exception { | |
+ runCassandraPhysicalVerifyCount(PHYSICAL_PLAN_SCAN, KEYSPACE_NAME, TABLE_NAME, 14); | |
+ } | |
+ | |
+ @Test | |
+ public void testPlanLocalWithColumns() throws Exception { | |
+ runCassandraPhysicalVerifyCount(PHYSICAL_PLAN_SCAN_WITH_COLS, KEYSPACE_NAME, TABLE_NAME, 14); | |
+ } | |
+} | |
\ No newline at end of file | |
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTest.java | |
new file mode 100644 | |
index 0000000..4f40eeb | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTest.java | |
@@ -0,0 +1,49 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.cassandra; | |
+ | |
+import com.datastax.driver.core.Cluster; | |
+import com.datastax.driver.core.ResultSet; | |
+import com.datastax.driver.core.Row; | |
+import com.datastax.driver.core.Session; | |
+import org.apache.drill.exec.store.cassandra.connection.CassandraConnectionManager; | |
+ | |
+/** | |
+ * Created by yash on 15/3/15. | |
+ */ | |
+public class CassandraTest { | |
+ | |
+ public static void main(String[] args) { | |
+ Cluster.Builder builder = Cluster.builder() | |
+ .addContactPoint("127.0.0.2") | |
+ .addContactPoint("127.0.0.3") | |
+ .withPort(9042); | |
+ | |
+ Cluster cluster = builder.build(); | |
+ Session session = cluster.connect(); | |
+ ResultSet rs = session.execute("SELECT * FROM drilltest.trending_now"); | |
+ | |
+ | |
+ while(!rs.isExhausted()){ | |
+ Row r = rs.one(); | |
+ System.out.println(r.getString(0)); | |
+ System.out.println(r.getInt(1)); | |
+ System.out.println(r.getLong(2)); | |
+ } | |
+ } | |
+} | |
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTestConstants.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTestConstants.java | |
new file mode 100644 | |
index 0000000..684034d | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTestConstants.java | |
@@ -0,0 +1,60 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.cassandra; | |
+ | |
/**
 * Shared literals for the Cassandra storage plugin tests: column/keyspace/table
 * names, physical-plan resource paths and the canned SQL statements executed
 * by the pushdown and record-reader tests.
 *
 * <p>Interface fields are implicitly {@code public static final}.
 */
public interface CassandraTestConstants {

  String COL_NAME_1 = "id";
  String COL_NAME_2 = "pog_id";

  String KEYSPACE_NAME = "drilltest";
  String TABLE_NAME = "trending_now";

  String PHYSICAL_PLAN_SCAN = "/cassandra-plans/cassandra_scan_screen_physical.json";
  String PHYSICAL_PLAN_SCAN_WITH_COLS = "/cassandra-plans/cassandra_scan_screen_with_columns_physical.json";

  /** Fully-qualified, back-quoted table reference shared by every query below. */
  String TABLE_REF = "cassandra." + KEYSPACE_NAME + ".`" + TABLE_NAME + "`";

  String SELECT_ALL = "SELECT * FROM " + TABLE_REF + " t";

  String SELECT_QUERY_PROJECT =
      "SELECT " + COL_NAME_1 + "," + COL_NAME_2 + " FROM " + TABLE_REF + " t";

  String SELECT_QUERY_FILTER =
      "SELECT * FROM " + TABLE_REF + " t WHERE " + COL_NAME_1 + " = 'id0004'";

  String SELECT_QUERY_FILTER_1 =
      "SELECT * FROM " + TABLE_REF + " t WHERE " + COL_NAME_2 + " = 10002";

  // NOTE(review): pog_id is compared as a bare number in SELECT_QUERY_FILTER_1
  // but as a quoted string here and in the OR query — confirm the column type
  // makes both forms equivalent.
  String SELECT_QUERY_FILTER_2 =
      "SELECT * FROM " + TABLE_REF + " t WHERE "
          + COL_NAME_1 + " = 'id0004' and " + COL_NAME_2 + " = '10002'";

  String SELECT_QUERY_FILTER_With_OR =
      "SELECT * FROM " + TABLE_REF + " t WHERE "
          + "(" + COL_NAME_1 + " = 'id0004' or " + COL_NAME_1 + " = 'id0002') and "
          + "(" + COL_NAME_2 + " = '10001' or " + COL_NAME_2 + " = '10002') "
          + "order by " + COL_NAME_2 + " asc, " + COL_NAME_1 + " desc limit 8";

  String SELECT_QUERY_FILTER_WITH_AND =
      "SELECT * FROM " + TABLE_REF + " t WHERE "
          + COL_NAME_1 + " = 'id0004' and pog_rank = 2";
}
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTestSuite.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTestSuite.java | |
new file mode 100644 | |
index 0000000..38afe97 | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTestSuite.java | |
@@ -0,0 +1,31 @@ | |
+/** | |
+ * Licensed to the Apache Software Foundation (ASF) under one | |
+ * or more contributor license agreements. See the NOTICE file | |
+ * distributed with this work for additional information | |
+ * regarding copyright ownership. The ASF licenses this file | |
+ * to you under the Apache License, Version 2.0 (the | |
+ * "License"); you may not use this file except in compliance | |
+ * with the License. You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.apache.drill.cassandra; | |
+ | |
+import org.junit.runner.RunWith; | |
+import org.junit.runners.Suite; | |
+ | |
+@RunWith(Suite.class) | |
+@Suite.SuiteClasses({ | |
+ CassandraRecordReaderTest.class, | |
+ CassandraProjectPushdownTest.class, | |
+ CassandraFilterPushdownTest.class | |
+}) | |
+public class CassandraTestSuite { | |
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CassandraTestSuite.class); | |
+} | |
diff --git a/contrib/storage-cassandra/src/test/resources/cassandra-plans/cassandra_scan_screen_physical.json b/contrib/storage-cassandra/src/test/resources/cassandra-plans/cassandra_scan_screen_physical.json | |
new file mode 100644 | |
index 0000000..9f4e0e3 | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/test/resources/cassandra-plans/cassandra_scan_screen_physical.json | |
@@ -0,0 +1,30 @@ | |
+{ | |
+ head : { | |
+ type : "APACHE_DRILL_PHYSICAL", | |
+ version : 1, | |
+ generator : { | |
+ type : "manual" | |
+ } | |
+ }, | |
+ graph : [ { | |
+ pop : "cassandra-scan", | |
+ @id : 1, | |
+ cassandraScanSpec : { | |
+ keyspace : "[KEYSPACE_NAME]", | |
+ table : "[TABLE_NAME]" | |
+ }, | |
+ storage: | |
+ { | |
+ "type":"cassandra", | |
+ config : { | |
+ "cassandra.hosts" : ["127.0.0.1","127.0.0.2"], | |
+ "cassandra.port" : 9042 | |
+ } | |
+ } | |
+ }, | |
+ { | |
+ pop : "screen", | |
+ @id : 2, | |
+ child : 1 | |
+ } ] | |
+} | |
\ No newline at end of file | |
diff --git a/contrib/storage-cassandra/src/test/resources/cassandra-plans/cassandra_scan_screen_with_columns_physical.json b/contrib/storage-cassandra/src/test/resources/cassandra-plans/cassandra_scan_screen_with_columns_physical.json | |
new file mode 100644 | |
index 0000000..31eac67 | |
--- /dev/null | |
+++ b/contrib/storage-cassandra/src/test/resources/cassandra-plans/cassandra_scan_screen_with_columns_physical.json | |
@@ -0,0 +1,33 @@ | |
+{ | |
+ head : { | |
+ type : "APACHE_DRILL_PHYSICAL", | |
+ version : 1, | |
+ generator : { | |
+ type : "manual" | |
+ } | |
+ }, | |
+ graph : [ { | |
+ pop : "cassandra-scan", | |
+ @id : 1, | |
+ cassandraScanSpec : { | |
+ keyspace : "[KEYSPACE_NAME]", | |
+ table : "[TABLE_NAME]" | |
+ }, | |
+ storage: | |
+ { | |
+ "type":"cassandra", | |
+ config : { | |
+ "cassandra.hosts" : ["127.0.0.1","127.0.0.2","127.0.0.3"], | |
+ "cassandra.port" : 9042 | |
+ } | |
+ }, | |
+ columns: [ | |
+ "id", "pog_id" | |
+ ] | |
+ }, | |
+ { | |
+ pop : "screen", | |
+ @id : 2, | |
+ child : 1 | |
+ } ] | |
+} | |
\ No newline at end of file | |
diff --git a/distribution/pom.xml b/distribution/pom.xml | |
index c08efc7..fe3a313 100644 | |
--- a/distribution/pom.xml | |
+++ b/distribution/pom.xml | |
@@ -157,6 +157,17 @@ | |
</exclusion> | |
</exclusions> | |
</dependency> | |
+ <dependency> | |
+ <groupId>org.apache.drill.contrib</groupId> | |
+ <artifactId>drill-storage-cassandra</artifactId> | |
+ <version>${project.version}</version> | |
+ <exclusions> | |
+ <exclusion> | |
+ <groupId>org.apache.hadoop</groupId> | |
+ <artifactId>hadoop-core</artifactId> | |
+ </exclusion> | |
+ </exclusions> | |
+ </dependency> | |
<dependency> | |
<groupId>org.apache.drill.contrib.storage-hive</groupId> | |
<artifactId>drill-storage-hive-core</artifactId> | |
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml | |
index 244230f..3a67020 100644 | |
--- a/distribution/src/assemble/bin.xml | |
+++ b/distribution/src/assemble/bin.xml | |
@@ -92,6 +92,7 @@ | |
<include>org.apache.drill.contrib.data:tpch-sample-data:jar</include> | |
<include>org.apache.drill.contrib:drill-mongo-storage</include> | |
<include>org.apache.drill.contrib:drill-storage-hbase</include> | |
+ <include>org.apache.drill.contrib:drill-storage-cassandra</include> | |
</includes> | |
<excludes> | |
<exclude>org.apache.drill.contrib.storage-hive:drill-storage-hive-core:jar:tests</exclude> | |
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java | |
index d428920..97849de 100644 | |
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java | |
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java | |
@@ -461,8 +461,10 @@ public class BaseTestQuery extends ExecTest { | |
int rowCount = 0; | |
RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); | |
for(QueryDataBatch result : results) { | |
- rowCount += result.getHeader().getRowCount(); | |
- loader.load(result.getHeader().getDef(), result.getData()); | |
+ if (result.hasData()) { | |
+ rowCount += result.getHeader().getRowCount(); | |
+ loader.load(result.getHeader().getDef(), result.getData()); | |
+ } | |
// TODO: Clean: DRILL-2933: That load(...) no longer throws | |
// SchemaChangeException, so check/clean throw clause above. | |
if (loader.getRecordCount() <= 0) { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
good job