Last active
December 25, 2023 19:57
-
-
Save GavinRay97/55ae3493d8079aa72ea44b8087d98413 to your computer and use it in GitHub Desktop.
Apache Calcite - Schema that consumes a datasource connected to a JDBC DB and generates a schema map for all sub-schemas
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example; | |
import org.apache.calcite.adapter.jdbc.JdbcSchema; | |
import org.apache.calcite.avatica.AvaticaUtils; | |
import org.apache.calcite.schema.Schema; | |
import org.apache.calcite.schema.SchemaFactory; | |
import org.apache.calcite.schema.SchemaPlus; | |
import org.apache.calcite.schema.impl.AbstractSchema; | |
import javax.sql.DataSource; | |
import java.sql.Connection; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import static java.util.Objects.requireNonNull; | |
public final class JdbcDatabaseWrappingSchema extends AbstractSchema { | |
private final SchemaPlus rootSchema; | |
private final DataSource dataSource; | |
private final String databaseName; | |
public JdbcDatabaseWrappingSchema(SchemaPlus rootSchema, DataSource dataSource, String databaseName) { | |
this.rootSchema = rootSchema; | |
this.dataSource = dataSource; | |
this.databaseName = databaseName; | |
} | |
public SchemaPlus rootSchema() { | |
return rootSchema; | |
} | |
public DataSource dataSource() { | |
return dataSource; | |
} | |
public String databaseName() { | |
return databaseName; | |
} | |
@Override | |
public Map<String, org.apache.calcite.schema.Schema> getSubSchemaMap() { | |
try { | |
Map<String, org.apache.calcite.schema.Schema> schemaMap = new HashMap<>(); | |
final List<String> subSchemas = getSubSchemas(); | |
if (subSchemas.isEmpty()) { | |
final String schemaName = "root"; | |
final JdbcSchema jdbcSchema = getJdbcSchema(schemaName); | |
schemaMap.put(schemaName, jdbcSchema); | |
} else { | |
for (final String schemaName : subSchemas) { | |
final JdbcSchema jdbcSchema = getJdbcSchema(schemaName); | |
schemaMap.put(schemaName, jdbcSchema); | |
} | |
} | |
return schemaMap; | |
} catch (SQLException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
private JdbcSchema getJdbcSchema(String schemaName) { | |
return JdbcSchema.create( | |
rootSchema, databaseName + "_" + schemaName, | |
dataSource, databaseName, schemaName); | |
} | |
private List<String> getSubSchemas() throws SQLException { | |
try (Connection connection = dataSource.getConnection()) { | |
final ResultSet resultSet = connection.getMetaData().getSchemas(); | |
List<String> schemas = new ArrayList<>(); | |
while (resultSet.next()) { | |
schemas.add(resultSet.getString("TABLE_SCHEM")); | |
} | |
return schemas; | |
} | |
} | |
public static class Factory implements SchemaFactory { | |
public static final JdbcDatabaseWrappingSchema.Factory INSTANCE = new JdbcDatabaseWrappingSchema.Factory(); | |
private Factory() { | |
} | |
@Override | |
public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) { | |
DataSource dataSource; | |
try { | |
final String dataSourceName = (String) operand.get("dataSource"); | |
if (dataSourceName != null) { | |
dataSource = AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName); | |
} else { | |
final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl"); | |
final String jdbcDriver = (String) operand.get("jdbcDriver"); | |
final String jdbcUser = (String) operand.get("jdbcUser"); | |
final String jdbcPassword = (String) operand.get("jdbcPassword"); | |
dataSource = JdbcSchema.dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword); | |
} | |
} catch (Exception e) { | |
throw new RuntimeException("Error while reading dataSource", e); | |
} | |
String databaseName = (String) operand.get("databaseName"); | |
return new JdbcDatabaseWrappingSchema(parentSchema, dataSource, databaseName); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example; | |
import org.apache.calcite.schema.SchemaPlus; | |
import org.junit.jupiter.api.Test; | |
import org.testcontainers.containers.MySQLContainer; | |
import org.testcontainers.containers.PostgreSQLContainer; | |
import org.testcontainers.junit.jupiter.Container; | |
import org.testcontainers.junit.jupiter.Testcontainers; | |
import javax.sql.DataSource; | |
import java.sql.SQLException; | |
import static org.assertj.core.api.Assertions.assertThat; | |
@Testcontainers | |
class CalciteSchemaManagerTest extends AbstractContainerDatabaseTest { | |
@Container | |
private static final PostgreSQLContainer postgresContainer = new PostgreSQLContainer<>("postgres:14") | |
.withDatabaseName("test-postgres"); | |
@Container | |
private static final MySQLContainer mysqlContainer = new MySQLContainer<>("mysql:8") | |
.withDatabaseName("test-mysql"); | |
private final DataSource postgresDatasource = getDataSource(postgresContainer); | |
private final DataSource mysqlDatasource = getDataSource(mysqlContainer); | |
private void createTestTables() throws SQLException { | |
postgresDatasource.getConnection().createStatement() | |
.execute(""" | |
CREATE SCHEMA IF NOT EXISTS test_pg_schema_1; | |
CREATE TABLE IF NOT EXISTS test_pg_schema_1.test_pg_table_1 ( | |
id INTEGER NOT NULL, | |
name VARCHAR(255) NOT NULL, | |
PRIMARY KEY (id) | |
); | |
"""); | |
mysqlDatasource.getConnection().createStatement() | |
.execute(""" | |
CREATE TABLE test_mysql_table_1 (id INTEGER, name VARCHAR(255)); | |
"""); | |
} | |
@Test | |
void test() throws SQLException { | |
// given | |
createTestTables(); | |
SchemaPlus rootSchema = CalciteSchemaManager.rootSchema; | |
JdbcDatabaseWrappingSchema postgres = new JdbcDatabaseWrappingSchema(rootSchema, postgresDatasource, "test-postgres"); | |
JdbcDatabaseWrappingSchema mysql = new JdbcDatabaseWrappingSchema(rootSchema, mysqlDatasource, "test-mysql"); | |
// when | |
rootSchema.add("test-postgres", postgres); | |
rootSchema.add("test-mysql", mysql); | |
// then | |
assertThat(rootSchema.getSubSchemaNames()).contains("test-postgres", "test-mysql"); | |
assertThat(postgres.getSubSchemaNames()).contains("test_pg_schema_1"); | |
assertThat(postgres.getSubSchema("test_pg_schema_1").getTableNames()).contains("test_pg_table_1"); | |
assertThat(mysql.getSubSchemaNames()).contains("root"); | |
assertThat(mysql.getSubSchema("root").getTableNames()).contains("test_mysql_table_1"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment