Skip to content

Instantly share code, notes, and snippets.

@GavinRay97
Last active December 25, 2023 19:57
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save GavinRay97/55ae3493d8079aa72ea44b8087d98413 to your computer and use it in GitHub Desktop.
Save GavinRay97/55ae3493d8079aa72ea44b8087d98413 to your computer and use it in GitHub Desktop.
Apache Calcite - Schema that consumes a datasource connected to a JDBC DB and generates a schema map for all sub-schemas
package com.example;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.avatica.AvaticaUtils;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.AbstractSchema;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Objects.requireNonNull;
/**
 * Calcite schema that wraps a whole JDBC database and exposes each of its
 * database schemas as a {@link JdbcSchema} sub-schema.
 *
 * <p>The list of sub-schemas is read from {@code DatabaseMetaData.getSchemas()}
 * every time {@link #getSubSchemaMap()} is called. When the driver reports no
 * schemas at all, a single sub-schema named {@code "root"} is exposed instead.
 */
public final class JdbcDatabaseWrappingSchema extends AbstractSchema {

    /** Name of the only sub-schema exposed when the driver reports no schemas. */
    private static final String DEFAULT_SCHEMA_NAME = "root";

    private final SchemaPlus rootSchema;
    private final DataSource dataSource;
    private final String databaseName;

    /**
     * Creates a wrapping schema.
     *
     * @param rootSchema   parent schema handed to {@code JdbcSchema.create}; must not be null
     * @param dataSource   source of JDBC connections used both for schema discovery and by
     *                     the generated {@link JdbcSchema}s; must not be null
     * @param databaseName value passed to {@code JdbcSchema.create} as the catalog and used
     *                     as a prefix of each generated schema name; not validated here
     * @throws NullPointerException if {@code rootSchema} or {@code dataSource} is null
     */
    public JdbcDatabaseWrappingSchema(SchemaPlus rootSchema, DataSource dataSource, String databaseName) {
        // Fail fast: a null here would otherwise only surface later, during sub-schema discovery.
        this.rootSchema = requireNonNull(rootSchema, "rootSchema");
        this.dataSource = requireNonNull(dataSource, "dataSource");
        this.databaseName = databaseName;
    }

    /** Returns the parent schema given at construction time. */
    public SchemaPlus rootSchema() {
        return rootSchema;
    }

    /** Returns the data source given at construction time. */
    public DataSource dataSource() {
        return dataSource;
    }

    /** Returns the database name given at construction time. */
    public String databaseName() {
        return databaseName;
    }

    /**
     * Builds a map from schema name to a {@link JdbcSchema} for every schema the
     * database reports, or a single {@code "root"} entry when it reports none.
     *
     * @return a freshly built, mutable map of sub-schemas
     * @throws RuntimeException wrapping the {@link SQLException} if the schema list cannot be read
     */
    @Override
    public Map<String, Schema> getSubSchemaMap() {
        final List<String> schemaNames;
        try {
            schemaNames = getSubSchemas();
        } catch (SQLException e) {
            throw new RuntimeException("Failed to list schemas of database '" + databaseName + "'", e);
        }
        if (schemaNames.isEmpty()) {
            // No schemas reported: expose the database under one default sub-schema.
            schemaNames.add(DEFAULT_SCHEMA_NAME);
        }
        final Map<String, Schema> schemaMap = new HashMap<>();
        for (final String schemaName : schemaNames) {
            schemaMap.put(schemaName, getJdbcSchema(schemaName));
        }
        return schemaMap;
    }

    /**
     * Creates the {@link JdbcSchema} for one database schema. The Calcite-side
     * name is {@code databaseName + "_" + schemaName}.
     */
    private JdbcSchema getJdbcSchema(String schemaName) {
        return JdbcSchema.create(
                rootSchema, databaseName + "_" + schemaName,
                dataSource, databaseName, schemaName);
    }

    /**
     * Reads the {@code TABLE_SCHEM} column of {@code DatabaseMetaData.getSchemas()}.
     *
     * @return a new mutable list of schema names, possibly empty
     * @throws SQLException if a connection cannot be obtained or the metadata cannot be read
     */
    private List<String> getSubSchemas() throws SQLException {
        // Both the connection and the result set are closed on every exit path.
        try (Connection connection = dataSource.getConnection();
             ResultSet resultSet = connection.getMetaData().getSchemas()) {
            final List<String> schemas = new ArrayList<>();
            while (resultSet.next()) {
                schemas.add(resultSet.getString("TABLE_SCHEM"));
            }
            return schemas;
        }
    }

    /**
     * {@link SchemaFactory} that builds a {@link JdbcDatabaseWrappingSchema} from operands.
     *
     * <p>Operands read: {@code dataSource} (plugin class name) or, when that is absent,
     * {@code jdbcUrl} (required), {@code jdbcDriver}, {@code jdbcUser}, {@code jdbcPassword};
     * plus {@code databaseName}.
     */
    public static class Factory implements SchemaFactory {

        /** Shared instance; the constructor is private. */
        public static final JdbcDatabaseWrappingSchema.Factory INSTANCE = new JdbcDatabaseWrappingSchema.Factory();

        private Factory() {
        }

        /**
         * Creates the wrapping schema.
         *
         * @param parentSchema schema passed on as the wrapping schema's root schema
         * @param name         schema name supplied by the caller; not used by this factory
         * @param operand      configuration map, see the class description
         * @return a new {@link JdbcDatabaseWrappingSchema}
         * @throws RuntimeException if the data source cannot be created from the operands
         */
        @Override
        public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
            final DataSource dataSource;
            try {
                final String dataSourceName = (String) operand.get("dataSource");
                if (dataSourceName != null) {
                    dataSource = AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);
                } else {
                    final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");
                    final String jdbcDriver = (String) operand.get("jdbcDriver");
                    final String jdbcUser = (String) operand.get("jdbcUser");
                    final String jdbcPassword = (String) operand.get("jdbcPassword");
                    dataSource = JdbcSchema.dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
                }
            } catch (Exception e) {
                throw new RuntimeException("Error while reading dataSource", e);
            }
            final String databaseName = (String) operand.get("databaseName");
            return new JdbcDatabaseWrappingSchema(parentSchema, dataSource, databaseName);
        }
    }
}
package com.example;
import org.apache.calcite.schema.SchemaPlus;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import javax.sql.DataSource;
import java.sql.SQLException;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Integration test that registers a PostgreSQL and a MySQL database as
 * {@link JdbcDatabaseWrappingSchema}s under one Calcite root schema and checks
 * the sub-schemas and tables each of them exposes.
 */
@Testcontainers
class CalciteSchemaManagerTest extends AbstractContainerDatabaseTest {

    @Container
    private static final PostgreSQLContainer<?> postgresContainer =
            new PostgreSQLContainer<>("postgres:14").withDatabaseName("test-postgres");

    @Container
    private static final MySQLContainer<?> mysqlContainer =
            new MySQLContainer<>("mysql:8").withDatabaseName("test-mysql");

    private final DataSource postgresDatasource = getDataSource(postgresContainer);
    private final DataSource mysqlDatasource = getDataSource(mysqlContainer);

    /**
     * Runs one DDL script on a connection from the given data source, closing the
     * statement and the connection afterwards.
     */
    private static void executeDdl(DataSource dataSource, String ddl) throws SQLException {
        try (var connection = dataSource.getConnection();
             var statement = connection.createStatement()) {
            statement.execute(ddl);
        }
    }

    /**
     * Creates the fixture objects: a schema with one table in PostgreSQL and one
     * table in MySQL. Both scripts use IF NOT EXISTS so they can be re-run.
     */
    private void createTestTables() throws SQLException {
        executeDdl(postgresDatasource, """
                CREATE SCHEMA IF NOT EXISTS test_pg_schema_1;
                CREATE TABLE IF NOT EXISTS test_pg_schema_1.test_pg_table_1 (
                id INTEGER NOT NULL,
                name VARCHAR(255) NOT NULL,
                PRIMARY KEY (id)
                );
                """);
        executeDdl(mysqlDatasource, """
                CREATE TABLE IF NOT EXISTS test_mysql_table_1 (id INTEGER, name VARCHAR(255));
                """);
    }

    @Test
    void test() throws SQLException {
        // given
        createTestTables();
        SchemaPlus rootSchema = CalciteSchemaManager.rootSchema;
        JdbcDatabaseWrappingSchema postgres = new JdbcDatabaseWrappingSchema(rootSchema, postgresDatasource, "test-postgres");
        JdbcDatabaseWrappingSchema mysql = new JdbcDatabaseWrappingSchema(rootSchema, mysqlDatasource, "test-mysql");

        // when
        rootSchema.add("test-postgres", postgres);
        rootSchema.add("test-mysql", mysql);

        // then
        assertThat(rootSchema.getSubSchemaNames()).contains("test-postgres", "test-mysql");
        assertThat(postgres.getSubSchemaNames()).contains("test_pg_schema_1");
        assertThat(postgres.getSubSchema("test_pg_schema_1").getTableNames()).contains("test_pg_table_1");
        // The MySQL database is expected to surface under the fallback "root" sub-schema.
        assertThat(mysql.getSubSchemaNames()).contains("root");
        assertThat(mysql.getSubSchema("root").getTableNames()).contains("test_mysql_table_1");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment