Last active
June 14, 2019 17:52
-
-
Save asereda-gs/cf0e1c0b8fa84a4cf3ceb6a21c9b11a8 to your computer and use it in GitHub Desktop.
Using calcite connection concurrently
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.calcite.test; | |
import org.apache.calcite.adapter.jdbc.JdbcSchema; | |
import org.apache.calcite.jdbc.CalciteConnection; | |
import org.apache.calcite.schema.SchemaPlus; | |
import org.apache.commons.dbcp2.BasicDataSource; | |
import com.google.common.util.concurrent.MoreExecutors; | |
import javax.sql.DataSource; | |
import org.junit.After; | |
import org.junit.Assert; | |
import org.junit.Before; | |
import org.junit.Test; | |
import java.sql.Connection; | |
import java.sql.DriverManager; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import java.sql.Statement; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.TimeUnit; | |
import static org.junit.Assert.assertTrue; | |
/** | |
* Query calcite connection in parallel with a simple SQL {@code select * from table}. | |
* | |
* <p>Getting the following exception | |
* <pre> | |
* {@code | |
* Caused by: org.apache.calcite.avatica.NoSuchStatementException | |
* at org.apache.calcite.jdbc.CalciteConnectionImpl$CalciteServerImpl.getStatement(CalciteConnectionImpl.java:371) | |
* at org.apache.calcite.jdbc.CalciteConnectionImpl.getCancelFlag(CalciteConnectionImpl.java:238) | |
* at org.apache.calcite.avatica.AvaticaStatement.<init>(AvaticaStatement.java:121) | |
* } | |
* </pre> | |
* | |
* <p>Issue seems to be in {@code org.apache.calcite.avatica.MetaImpl.java:213} | |
* | |
* <pre> | |
* {@code | |
* // not atomic increment of id | |
* return new StatementHandle(ch.id, connection.statementCount++, null); | |
* } | |
* </pre> | |
*/ | |
public class CalciteConcurrentTest { | |
private final ExecutorService executor = Executors.newFixedThreadPool(10); | |
/** | |
* Connection pool to underlying database (hsqldb) | |
*/ | |
private DataSource dataSource; | |
/** | |
* Calcite connection ("singleton") | |
*/ | |
private Connection connection; | |
@Before | |
public void setUp() throws Exception { | |
this.dataSource = createDataSource(); | |
try (Connection connection = dataSource.getConnection()) { | |
connection.createStatement().execute("DROP TABLE IF EXISTS dummy;"); | |
try (Statement stm = connection.createStatement()) { | |
stm.execute("create table dummy (id INTEGER IDENTITY PRIMARY KEY);"); | |
for (int i = 0; i < 10; i++) { | |
stm.execute(String.format("INSERT INTO dummy (id) VALUES (%d);", i)); | |
} | |
} | |
} | |
Class.forName("org.apache.calcite.jdbc.Driver"); | |
final Connection connection = DriverManager.getConnection("jdbc:calcite:"); | |
final SchemaPlus rootSchema = connection.unwrap(CalciteConnection.class).getRootSchema(); | |
final JdbcSchema schema = JdbcSchema.create(rootSchema, "test", dataSource, null, null); | |
rootSchema.add("test", schema); | |
this.connection = connection; // calcite connection | |
} | |
/** | |
* Close all connections | |
*/ | |
@After | |
public void tearDown() throws Exception { | |
// shutdown executor | |
MoreExecutors.shutdownAndAwaitTermination(executor, 1, TimeUnit.SECONDS); | |
if (connection != null) { | |
connection.close(); | |
} | |
if (dataSource instanceof AutoCloseable) { | |
((AutoCloseable) dataSource).close(); | |
} | |
} | |
private static DataSource createDataSource() throws SQLException { | |
final BasicDataSource ds = new BasicDataSource(); | |
ds.setDriverClassName("org.hsqldb.jdbcDriver"); | |
ds.setUrl("jdbc:hsqldb:mem:db:leak"); | |
ds.setMaxTotal(15); | |
return ds; | |
} | |
@Test public void test() throws Exception { | |
final Connection connection = this.connection; | |
final List<Future<?>> futures = new ArrayList<>(); | |
for (int i = 0; i < 2000; i++) { | |
futures.add(executor.submit(() -> execute(connection))); | |
} | |
final List<Exception> errors = new ArrayList<>(); | |
for (Future future : futures) { | |
try { | |
future.get(); | |
} catch (Exception e) { | |
errors.add(e); | |
} | |
} | |
if (!errors.isEmpty()) { | |
errors.forEach(Throwable::printStackTrace); | |
Assert.fail(String.format("Failed with %d exceptions (see stack trace for details)", errors.size())); | |
} | |
} | |
/** | |
* Executes simple query {@code select * from table} and checks that result is non-empty. | |
*/ | |
private static void execute(Connection connection) { | |
try (Statement stm = connection.createStatement(); | |
ResultSet resultSet = stm.executeQuery("select * from \"test\".\"DUMMY\"")) { | |
int rows = 0; | |
while (resultSet.next()) { | |
rows++; | |
} | |
assertTrue("no rows", rows > 0); | |
} catch (SQLException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
// End CalciteConcurrentTest.java |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment