Skip to content

Instantly share code, notes, and snippets.

@junwen12221
Created October 14, 2021 09:12
Show Gist options
  • Save junwen12221/2b554de59d72b83238e15343db226696 to your computer and use it in GitHub Desktop.
Save junwen12221/2b554de59d72b83238e15343db226696 to your computer and use it in GitHub Desktop.
/**
* Copyright (C) <2021> <chen junwen>
* <p>
* This program is free software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
* <p>
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
* even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
* <p>
* You should have received a copy of the GNU General Public License along with this program. If
* not, see <http://www.gnu.org/licenses/>.
*/
package io.mycat.config;
import cn.mycat.vertx.xa.MySQLManager;
import cn.mycat.vertx.xa.XaLog;
import cn.mycat.vertx.xa.impl.LocalXaMemoryRepositoryImpl;
import cn.mycat.vertx.xa.impl.XaLogImpl;
import com.alibaba.druid.sql.MycatSQLUtils;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLIndexDefinition;
import com.alibaba.druid.sql.ast.SQLName;
import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
import com.alibaba.druid.sql.ast.statement.SQLSelectOrderByItem;
import com.alibaba.druid.sql.ast.statement.SQLTableElement;
import com.alibaba.druid.sql.dialect.mysql.ast.MySqlPrimaryKey;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlTableIndex;
import io.mycat.*;
import io.mycat.beans.mycat.TransactionType;
import io.mycat.calcite.spm.DbPlanManagerPersistorImpl;
import io.mycat.calcite.spm.MemPlanCache;
import io.mycat.calcite.spm.QueryPlanner;
import io.mycat.calcite.spm.UpdatePlanCache;
import io.mycat.calcite.table.DualCustomTableHandler;
import io.mycat.calcite.table.SchemaHandler;
import io.mycat.commands.MycatMySQLManagerImpl;
import io.mycat.commands.SqlResultSetService;
import io.mycat.datasource.jdbc.DruidDatasourceProvider;
import io.mycat.datasource.jdbc.datasource.JdbcConnectionManager;
import io.mycat.monitor.MonitorReplicaSelectorManager;
import io.mycat.plug.loadBalance.LoadBalanceManager;
import io.mycat.plug.sequence.SequenceGenerator;
import io.mycat.proxy.session.AuthenticatorImpl;
import io.mycat.replica.ReplicaSelectorManager;
import io.mycat.replica.ReplicaSelectorRuntime;
import io.mycat.replica.ReplicaSwitchType;
import io.mycat.replica.ReplicaType;
import io.mycat.sqlhandler.ConfigUpdater;
import io.mycat.sqlhandler.config.KV;
import io.mycat.sqlhandler.config.StorageManager;
import io.mycat.sqlhandler.config.UpdateSet;
import io.mycat.statistic.StatisticCenter;
import io.mycat.util.JsonUtil;
import io.mycat.util.NameMap;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.json.Json;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
public class MycatRouterConfigOps implements AutoCloseable, ConfigOps {
private static final Logger LOGGER = LoggerFactory.getLogger(MycatRouterConfigOps.class);
// Immutable snapshot of the configuration as it was when this ops session began.
final MycatRouterConfig original;
// Backing store used by commit(persistence=true) to persist changed sections.
final StorageManager storageManager;
// Mutable working copy; commit() diffs it against `original`.
MycatRouterConfig newConfig;
// One-shot flag consumed by isInit(); true only until the first isInit() call.
static boolean init = true;
// Starts an ops session over a deep copy of `original` (JSON round-trip),
// so mutations on newConfig never leak into the snapshot.
public MycatRouterConfigOps(MycatRouterConfig original, StorageManager metadataManager) {
this.original = original;
this.storageManager = metadataManager;
this.newConfig = Json.decodeValue(Json.encode(original), original.getClass());
}
// Starts an ops session over a caller-supplied working copy (no cloning).
public MycatRouterConfigOps(MycatRouterConfig original, MycatRouterConfig newConfig, StorageManager storageManager) {
this.original = original;
this.storageManager = storageManager;
this.newConfig = newConfig;
}
/**
 * Returns {@code true} exactly once — on the first call — and {@code false}
 * on every call after that. Used by commit() to trigger one-time startup
 * work such as reading the XA recovery log.
 */
public static boolean isInit() {
    if (!MycatRouterConfigOps.init) {
        return false;
    }
    MycatRouterConfigOps.init = false;
    return true;
}
@Override
public void addSchema(String schemaName, String targetName) {
    // Update the target of an existing schema in place; otherwise append a
    // fresh LogicSchemaConfig with the given name and target.
    List<LogicSchemaConfig> schemas = newConfig.getSchemas();
    for (LogicSchemaConfig existing : schemas) {
        if (schemaName.equals(existing.getSchemaName())) {
            existing.setTargetName(targetName);
            return;
        }
    }
    LogicSchemaConfig created = new LogicSchemaConfig();
    created.setSchemaName(schemaName);
    created.setTargetName(targetName);
    schemas.add(created);
}
@Override
public void putSchema(LogicSchemaConfig schemaConfig) {
    // Replace-by-name semantics: drop the first schema with the same name
    // (the name null-check is evaluated lazily, per candidate, exactly as
    // before), then append the new config.
    List<LogicSchemaConfig> schemas = newConfig.getSchemas();
    Iterator<LogicSchemaConfig> it = schemas.iterator();
    while (it.hasNext()) {
        LogicSchemaConfig candidate = it.next();
        String name = Objects.requireNonNull(schemaConfig.getSchemaName(), "schema name is null");
        if (name.equals(candidate.getSchemaName())) {
            it.remove();
            break;
        }
    }
    schemas.add(schemaConfig);
}
@Override
public void dropSchema(String schemaName) {
    // Remove the first schema whose name matches; no-op when absent.
    List<LogicSchemaConfig> schemas = newConfig.getSchemas();
    Iterator<LogicSchemaConfig> it = schemas.iterator();
    while (it.hasNext()) {
        if (it.next().getSchemaName().equals(schemaName)) {
            it.remove();
            break;
        }
    }
}
@Override
public void putNormalTable(String schemaName, String tableName, MySqlCreateTableStatement sqlString) {
    // Resolve the schema's target (falling back to the prototype target when
    // unset) and delegate to the 4-arg overload. Silently no-ops when the
    // schema does not exist, matching previous behavior.
    for (LogicSchemaConfig schema : newConfig.getSchemas()) {
        if (schema.getSchemaName().equals(schemaName)) {
            String target = schema.getTargetName();
            if (target == null) {
                target = MetadataManager.getPrototype();
            }
            putNormalTable(schemaName, tableName, sqlString, target);
            return;
        }
    }
}
@Override
public NormalTableConfig putNormalTable(String schemaName, String tableName, MySqlCreateTableStatement sqlString, String targetName) {
    // Wrap the CREATE TABLE statement and its physical location into a
    // NormalTableConfig and register it on the schema.
    NormalBackEndTableInfoConfig locality = NormalBackEndTableInfoConfig.builder()
            .targetName(targetName)
            .schemaName(schemaName)
            .tableName(tableName)
            .build();
    NormalTableConfig tableConfig = new NormalTableConfig();
    tableConfig.setCreateTableSQL(sqlString.toString());
    tableConfig.setLocality(locality);
    return putNormalTable(schemaName, tableName, tableConfig);
}
@Override
public NormalTableConfig putNormalTable(String schemaName, String tableName, NormalTableConfig normalTableConfig) {
    // Register the table on its schema; unknown schema names are an error.
    LogicSchemaConfig schema = null;
    for (LogicSchemaConfig candidate : newConfig.getSchemas()) {
        if (candidate.getSchemaName().equals(schemaName)) {
            schema = candidate;
            break;
        }
    }
    if (schema == null) {
        throw new IllegalArgumentException("unknown:" + schemaName);
    }
    schema.getNormalTables().put(tableName, normalTableConfig);
    return normalTableConfig;
}
@Override
public void putTable(CreateTableConfig createTableConfig) {
    // Exactly one of the three table variants is expected to be set;
    // precedence (normal > global > sharding) matches previous behavior.
    String schemaName = createTableConfig.getSchemaName();
    String tableName = createTableConfig.getTableName();
    NormalTableConfig normal = createTableConfig.getNormalTable();
    GlobalTableConfig global = createTableConfig.getGlobalTable();
    ShardingTableConfig sharding = createTableConfig.getShardingTable();
    if (normal != null) {
        putNormalTable(schemaName, tableName, normal);
    } else if (global != null) {
        putGlobalTableConfig(schemaName, tableName, global);
    } else if (sharding != null) {
        putShardingTable(schemaName, tableName, sharding);
    }
}
@Override
public GlobalTableConfig putGlobalTable(String schemaName, String tableName, MySqlCreateTableStatement sqlString) {
    // Derive the broadcast config from the statement, then register it.
    return putGlobalTableConfig(schemaName, tableName, getGlobalTableConfig(sqlString));
}
@Override
public GlobalTableConfig putGlobalTableConfig(String schemaName, String tableName, GlobalTableConfig globalTableConfig) {
    // Register the global table on its schema; unknown schema names are an error.
    LogicSchemaConfig schema = null;
    for (LogicSchemaConfig candidate : newConfig.getSchemas()) {
        if (candidate.getSchemaName().equals(schemaName)) {
            schema = candidate;
            break;
        }
    }
    if (schema == null) {
        throw new IllegalArgumentException("unknown:" + schemaName);
    }
    schema.getGlobalTables().put(tableName, globalTableConfig);
    return globalTableConfig;
}
@NotNull
private GlobalTableConfig getGlobalTableConfig(MySqlCreateTableStatement sqlString) {
    // NOTE(review): only clusters whose name starts with "c" become broadcast
    // targets — presumably a naming convention (c0, c1, ...); confirm.
    List<GlobalBackEndTableInfoConfig> broadcast = new ArrayList<>();
    for (ClusterConfig cluster : newConfig.getClusters()) {
        String clusterName = cluster.getName();
        if (clusterName.startsWith("c")) {
            GlobalBackEndTableInfoConfig backEnd = new GlobalBackEndTableInfoConfig();
            backEnd.setTargetName(clusterName);
            broadcast.add(backEnd);
        }
    }
    GlobalTableConfig globalTableConfig = new GlobalTableConfig();
    globalTableConfig.setCreateTableSQL(sqlString.toString());
    globalTableConfig.setBroadcast(broadcast);
    return globalTableConfig;
}
@Override
public void removeTable(String schemaNameArg, String tableNameArg) {
    // Names arrive possibly quoted (e.g. `db`.`t`); normalize before lookup.
    String schemaName = SQLUtils.normalize(schemaNameArg);
    String tableName = SQLUtils.normalize(tableNameArg);
    // Drop the table from every table category of the first matching schema.
    for (LogicSchemaConfig schema : newConfig.getSchemas()) {
        if (schema.getSchemaName().equals(schemaName)) {
            schema.getNormalTables().remove(tableName);
            schema.getGlobalTables().remove(tableName);
            schema.getShardingTables().remove(tableName);
            schema.getCustomTables().remove(tableName);
            break;
        }
    }
}
@Override
public ShardingTableConfig putRangeTable(String schemaName, String tableName, MySqlCreateTableStatement tableStatement, Map<String, Object> infos) {
    // Sharding metadata arrives as loosely-typed maps inside `infos`.
    Map<String, String> ranges = (Map) infos.get("ranges");
    Map<String, String> properties = (Map) infos.get("properties");
    String clazz = (String) infos.get("class");
    String functionName = (String) infos.get("name");
    // "dataNodes" is the preferred key; "partition" is the fallback.
    Object nodes = infos.get("dataNodes");
    if (nodes == null) {
        nodes = infos.get("partition");
    }
    Map<String, String> dataNodes = (Map) nodes;
    ShardingBackEndTableInfoConfig partition = null;
    if (dataNodes != null) {
        partition = ShardingBackEndTableInfoConfig.builder()
                .schemaNames(dataNodes.get("schemaNames"))
                .tableNames(dataNodes.get("tableNames"))
                .targetNames(dataNodes.get("targetNames"))
                .build();
    }
    ShardingTableConfig config = ShardingTableConfig.builder()
            .createTableSQL(tableStatement.toString())
            .function(ShardingFunction.builder().name(functionName).clazz(clazz).properties((Map) properties).ranges((Map) ranges).build())
            .partition(partition)
            .build();
    return putShardingTable(schemaName, tableName, config);
}
@Override
public ShardingTableConfig putShardingTable(String schemaName, String tableName, ShardingTableConfig config) {
    // Clear any previous registration of this table and of every index table
    // it carries, then (re-)register on the first matching schema.
    removeTable(schemaName, tableName);
    Map<String, ShardingTableConfig> indexTables = config.getShardingIndexTables();
    if (indexTables != null) {
        for (String indexTableName : indexTables.keySet()) {
            removeTable(schemaName, indexTableName);
        }
    }
    for (LogicSchemaConfig schema : newConfig.getSchemas()) {
        if (schema.getSchemaName().equals(schemaName)) {
            schema.getShardingTables().put(tableName, config);
            break;
        }
    }
    return config;
}
@Override
public ShardingTableConfig putHashTable(String schemaName, String tableName, MySqlCreateTableStatement tableStatement, Map<String, Object> infos) {
    // Column name -> definition lookup, used to copy columns into index tables.
    NameMap<SQLColumnDefinition> columnMap = NameMap.immutableCopyOf(tableStatement.getColumnDefinitions().stream()
            .collect(Collectors.toMap(k -> SQLUtils.normalize(k.getColumnName()), v -> v)));
    Map<String, ShardingTableConfig> indexTableConfigs = new HashMap<>();
    MySqlPrimaryKey primaryKey = (MySqlPrimaryKey) tableStatement.getTableElementList().stream()
            .filter(i -> i instanceof MySqlPrimaryKey).findFirst().orElse(null);
    // Build one sharded index table per GLOBAL index declared on the table.
    for (SQLTableElement sqlTableElement : tableStatement.getTableElementList()) {
        if (!(sqlTableElement instanceof MySqlTableIndex)) {
            continue;
        }
        MySqlTableIndex element = (MySqlTableIndex) sqlTableElement;
        if (!element.isGlobal()) {
            continue;
        }
        SQLIndexDefinition indexDefinition = element.getIndexDefinition();
        MySqlCreateTableStatement indexCreateTableStatement = new MySqlCreateTableStatement();
        indexCreateTableStatement.setIfNotExiists(true); // (sic: Druid API spelling)
        String indexTableName = tableName + "_" + SQLUtils.normalize(indexDefinition.getName().getSimpleName());
        indexCreateTableStatement.setTableName(indexTableName);
        indexCreateTableStatement.setSchema(schemaName);
        // Index table columns = indexed columns + covering columns.
        for (SQLSelectOrderByItem indexColumn : indexDefinition.getColumns()) {
            indexCreateTableStatement.addColumn(columnMap.get(SQLUtils.normalize(indexColumn.getExpr().toString())));
        }
        for (SQLName sqlName : indexDefinition.getCovering()) {
            indexCreateTableStatement.addColumn(columnMap.get(SQLUtils.normalize(sqlName.toString())));
        }
        if (primaryKey != null) {
            indexCreateTableStatement.getTableElementList().add(primaryKey);
        }
        indexCreateTableStatement.setDbPartitionBy(indexDefinition.getDbPartitionBy());
        indexCreateTableStatement.setTablePartitionBy(indexDefinition.getTbPartitionBy());
        // BUGFIX: was setDbPartitions(indexCreateTableStatement.getDbPartitions()),
        // a self-read that copied the still-unset value; copy from the index
        // definition like the other three partition properties above/below.
        indexCreateTableStatement.setDbPartitions(indexDefinition.getDbPartitions());
        indexCreateTableStatement.setTablePartitions(indexDefinition.getTbPartitions());
        Map<String, Object> autoHashProperties = getAutoHashProperties(indexCreateTableStatement);
        ShardingTableConfig indexConfig = ShardingTableConfig.builder()
                .createTableSQL(MycatSQLUtils.toString(indexCreateTableStatement))
                .function(ShardingFunction.builder().properties(autoHashProperties).build())
                .build();
        indexTableConfigs.put(indexTableName, indexConfig);
    }
    ShardingTableConfig config = ShardingTableConfig.builder()
            .createTableSQL(MycatSQLUtils.toString(tableStatement))
            .function(ShardingFunction.builder().properties((Map) infos).build())
            .shardingIndexTables(indexTableConfigs)
            .build();
    return putShardingTable(schemaName, tableName, config);
}
@Override
public void putUser(String username, String password, String ip, String transactionType) {
    // Assemble the config and delegate to the single-argument overload.
    putUser(UserConfig.builder()
            .username(username)
            .password(password)
            .ip(ip)
            .transactionType(transactionType)
            .build());
}
@Override
public void putUser(UserConfig userConfig) {
    // Replace-by-username: drop the first existing entry, then append.
    List<UserConfig> users = newConfig.getUsers();
    Iterator<UserConfig> it = users.iterator();
    while (it.hasNext()) {
        if (it.next().getUsername().equals(userConfig.getUsername())) {
            it.remove();
            break;
        }
    }
    users.add(userConfig);
}
@Override
public void deleteUser(String username) {
    // Remove the first user with this name; no-op when absent.
    List<UserConfig> users = newConfig.getUsers();
    Iterator<UserConfig> it = users.iterator();
    while (it.hasNext()) {
        if (username.equals(it.next().getUsername())) {
            it.remove();
            break;
        }
    }
}
@Override
public void putSequence(SequenceConfig sequenceConfig) {
    // Replace-by-name: drop the first existing sequence, then append.
    List<SequenceConfig> sequences = newConfig.getSequences();
    Iterator<SequenceConfig> it = sequences.iterator();
    while (it.hasNext()) {
        if (it.next().getName().equals(sequenceConfig.getName())) {
            it.remove();
            break;
        }
    }
    sequences.add(sequenceConfig);
}
@Override
public void removeSequenceByName(String name) {
    // Remove the first sequence with this name; no-op when absent.
    List<SequenceConfig> sequences = newConfig.getSequences();
    Iterator<SequenceConfig> it = sequences.iterator();
    while (it.hasNext()) {
        if (name.equals(it.next().getName())) {
            it.remove();
            break;
        }
    }
}
@Override
public void putDatasource(DatasourceConfig datasourceConfig) {
    // Replace-by-name: drop the first existing datasource, then append.
    List<DatasourceConfig> datasources = newConfig.getDatasources();
    Iterator<DatasourceConfig> it = datasources.iterator();
    while (it.hasNext()) {
        if (datasourceConfig.getName().equals(it.next().getName())) {
            it.remove();
            break;
        }
    }
    datasources.add(datasourceConfig);
}
@Override
public void removeDatasource(String datasourceName) {
    // Remove the first datasource with this name; no-op when absent.
    List<DatasourceConfig> datasources = newConfig.getDatasources();
    datasources.stream()
            .filter(candidate -> datasourceName.equals(candidate.getName()))
            .findFirst()
            .ifPresent(datasources::remove);
}
@Override
public void putReplica(ClusterConfig clusterConfig) {
    // Replace-by-name: drop the first existing cluster, then append.
    List<ClusterConfig> clusters = newConfig.getClusters();
    Iterator<ClusterConfig> it = clusters.iterator();
    while (it.hasNext()) {
        if (clusterConfig.getName().equals(it.next().getName())) {
            it.remove();
            break;
        }
    }
    clusters.add(clusterConfig);
}
@Override
public void removeReplica(String replicaName) {
    // Remove the first cluster with this name; no-op when absent.
    List<ClusterConfig> clusters = newConfig.getClusters();
    Iterator<ClusterConfig> it = clusters.iterator();
    while (it.hasNext()) {
        if (replicaName.equals(it.next().getName())) {
            it.remove();
            break;
        }
    }
}
@Override
public void putSqlCache(SqlCacheConfig currentSqlCacheConfig) {
    // Replace-by-name: drop the first existing cache entry, then append.
    List<SqlCacheConfig> sqlCaches = newConfig.getSqlCacheConfigs();
    Iterator<SqlCacheConfig> it = sqlCaches.iterator();
    while (it.hasNext()) {
        if (currentSqlCacheConfig.getName().equals(it.next().getName())) {
            it.remove();
            break;
        }
    }
    sqlCaches.add(currentSqlCacheConfig);
}
@Override
public void removeSqlCache(String cacheName) {
    // Remove the first cache entry with this name; no-op when absent.
    // (Previous version looked the entry up twice — an early-return guard
    // followed by a second stream over a re-fetched list; one lookup
    // with ifPresent is behaviorally identical.)
    List<SqlCacheConfig> sqlCaches = newConfig.getSqlCacheConfigs();
    sqlCaches.stream()
            .filter(candidate -> cacheName.equals(candidate.getName()))
            .findFirst()
            .ifPresent(sqlCaches::remove);
}
/**
 * Applies the working configuration copy: diffs every config section against
 * the original snapshot, rebuilds the affected runtime services registered in
 * MetaClusterCurrent, and — when {@code persistence} is true — writes each
 * changed section through the StorageManager.
 *
 * The statement order here is significant (services are registered in
 * dependency order); do not reorder without tracing the dependencies.
 */
@Override
public void commit(boolean persistence) throws Exception {
// Shut down the previous replica selector before a new one is built below.
if (MetaClusterCurrent.exist(ReplicaSelectorManager.class)) {
ReplicaSelectorManager replicaSelectorManager = MetaClusterCurrent.wrapper(ReplicaSelectorManager.class);
replicaSelectorManager.close();
}
MycatRouterConfig newConfig = this.newConfig;
// Fill in missing defaults (root user, prototype datasource/cluster).
defaultConfig(newConfig);
newConfig.fixPrototypeTargetName();
if(LOGGER.isDebugEnabled()){
LOGGER.debug(JsonUtil.toJson(this.original));
LOGGER.debug("===========================================change to ===========================================");
LOGGER.debug(JsonUtil.toJson(newConfig));
}
// Per-section diffs; an empty UpdateSet means that section is unchanged.
UpdateSet<LogicSchemaConfig> schemaConfigUpdateSet = UpdateSet.create(newConfig.getSchemas(), original.getSchemas());
UpdateSet<ClusterConfig> clusterConfigUpdateSet = UpdateSet.create(newConfig.getClusters(), original.getClusters());
UpdateSet<DatasourceConfig> datasourceConfigUpdateSet = UpdateSet.create(newConfig.getDatasources(), original.getDatasources());
UpdateSet<SequenceConfig> sequenceConfigUpdateSet = UpdateSet.create(newConfig.getSequences(), original.getSequences());
UpdateSet<SqlCacheConfig> sqlCacheConfigUpdateSet = UpdateSet.create(newConfig.getSqlCacheConfigs(), original.getSqlCacheConfigs());
UpdateSet<UserConfig> userConfigUpdateSet = UpdateSet.create(newConfig.getUsers(), original.getUsers());
// Rebuild the authenticator when users changed or none is registered yet.
if (!userConfigUpdateSet.isEmpty() || !MetaClusterCurrent.exist(Authenticator.class)) {
Collection<UserConfig> target = userConfigUpdateSet.getTarget();
AuthenticatorImpl authenticator = new AuthenticatorImpl(target);
MetaClusterCurrent.register(Authenticator.class, authenticator);
}
// Rebuild the SQL result-set cache service, clearing the old one first.
if (!sqlCacheConfigUpdateSet.isEmpty() || !MetaClusterCurrent.exist(SqlResultSetService.class)) {
Collection<SqlCacheConfig> target = sqlCacheConfigUpdateSet.getTarget();
if (MetaClusterCurrent.exist(SqlResultSetService.class)) {
MetaClusterCurrent.wrapper(SqlResultSetService.class).clear();
}
SqlResultSetService sqlResultSetService = new SqlResultSetService();
for (SqlCacheConfig sqlCacheConfig : target) {
sqlResultSetService.addIfNotPresent(sqlCacheConfig);
}
MetaClusterCurrent.register(SqlResultSetService.class, sqlResultSetService);
}
// Rebuild the sequence generator (seeded with this node's mycatId).
if (!sequenceConfigUpdateSet.isEmpty() || !MetaClusterCurrent.exist(SequenceGenerator.class)) {
Collection<SequenceConfig> target = sequenceConfigUpdateSet.getTarget();
ServerConfig serverConfig = MetaClusterCurrent.wrapper(ServerConfig.class);
SequenceGenerator sequenceGenerator = new SequenceGenerator(serverConfig.getMycatId(), target);
MetaClusterCurrent.register(SequenceGenerator.class, sequenceGenerator);
}
JdbcConnectionManager jdbcConnectionManager = null;
// Apply datasource changes: incrementally on an existing connection manager,
// or by creating a fresh one on first run.
if (!datasourceConfigUpdateSet.isEmpty() || !MetaClusterCurrent.exist(JdbcConnectionManager.class)) {
Map<String, DatasourceConfig> targetAsMap = datasourceConfigUpdateSet.getTargetAsMap();
if (MetaClusterCurrent.exist(JdbcConnectionManager.class)) {
jdbcConnectionManager = MetaClusterCurrent.wrapper(JdbcConnectionManager.class);
for (DatasourceConfig datasourceConfig : datasourceConfigUpdateSet.getDelete()) {
jdbcConnectionManager.removeDatasource(datasourceConfig.getName());
}
for (DatasourceConfig datasourceConfig : datasourceConfigUpdateSet.getCreate()) {
jdbcConnectionManager.addDatasource(datasourceConfig);
}
} else {
jdbcConnectionManager = new JdbcConnectionManager(
DruidDatasourceProvider.class.getCanonicalName(),
targetAsMap);
}
MetaClusterCurrent.register(JdbcConnectionManager.class, jdbcConnectionManager);
MetaClusterCurrent.register(ConnectionManager.class, jdbcConnectionManager);
// Replace the native MySQL connection manager as well.
if (MetaClusterCurrent.exist(MySQLManager.class)) {
MetaClusterCurrent.wrapper(MySQLManager.class).close();
}
DatasourceConfigProvider datasourceConfigProvider = new DatasourceConfigProvider() {
@Override
public Map<String, DatasourceConfig> get() {
return targetAsMap;
}
};
MetaClusterCurrent.register(DatasourceConfigProvider.class, datasourceConfigProvider);
MetaClusterCurrent.register(MySQLManager.class, new MycatMySQLManagerImpl(newConfig));
}
// Always rebuild the replica selector (the old one was closed at the top).
{
LoadBalanceManager loadBalanceManager = MetaClusterCurrent.wrapper(LoadBalanceManager.class);
Map<String, DatasourceConfig> datasourceConfigMap =
datasourceConfigUpdateSet.getTarget().stream().collect(Collectors.toMap(k -> k.getName(), v -> v));
ArrayList<ClusterConfig> clusterConfigs = new ArrayList<>(clusterConfigUpdateSet.getTarget());
ReplicaSelectorManager replicaSelector = new ReplicaSelectorRuntime(clusterConfigs, datasourceConfigMap, loadBalanceManager,
name -> {
// Session-count probe; best-effort, 0 when the manager is unavailable.
try {
MySQLManager manager = MetaClusterCurrent.wrapper(MySQLManager.class);
return manager.getSessionCount(name);
} catch (Exception e) {
LOGGER.error("", e);
return 0;
}
}, (command, initialDelay, period, unit) -> {
ScheduledFuture<?> scheduled = ScheduleUtil.getTimer().scheduleAtFixedRate(command, initialDelay, period, unit);
return () -> {
try {
// NOTE(review): `!isDone() || !isCancelled()` is true for any task not
// both done and cancelled — possibly intended as && or a plain
// !isDone(); cancel(true) on a finished task is harmless, so confirm.
if (scheduled != null && (!scheduled.isDone() || !scheduled.isCancelled())) {
scheduled.cancel(true);
}
} catch (Throwable throwable) {
LOGGER.error("", throwable);
}
};
});
replicaSelector = new MonitorReplicaSelectorManager(replicaSelector);
MetaClusterCurrent.register(ReplicaSelectorManager.class, replicaSelector);
MetaClusterCurrent.wrapper(JdbcConnectionManager.class).register(replicaSelector);
}
// Rebuild schema metadata and the SQL compiler when schemas changed.
if (!schemaConfigUpdateSet.isEmpty()|| !MetaClusterCurrent.exist(MetadataManager.class)) {
MetadataManager metadataManager;
if (MetaClusterCurrent.exist(MetadataManager.class)) {
metadataManager = MetaClusterCurrent.wrapper(MetadataManager.class);
for (LogicSchemaConfig logicSchemaConfig : schemaConfigUpdateSet.getDelete()) {
metadataManager.removeSchema(logicSchemaConfig.getSchemaName());
}
for (LogicSchemaConfig logicSchemaConfig : schemaConfigUpdateSet.getCreate()) {
metadataManager.addSchema(logicSchemaConfig);
}
} else {
Map<String, LogicSchemaConfig> targetAsMap = schemaConfigUpdateSet.getTargetAsMap();
PrototypeService prototypeService = new PrototypeService();
metadataManager = MetadataManager.createMetadataManager(targetAsMap, prototypeService);
}
MetaClusterCurrent.register(MetadataManager.class, metadataManager);
MetaClusterCurrent.register(DrdsSqlCompiler.class, new DrdsSqlCompiler(new DrdsConfig() {
@Override
public NameMap<SchemaHandler> schemas() {
return metadataManager.getSchemaMap();
}
}));
// NOTE(review): both sides of this || test the same class (MemPlanCache);
// one side was probably meant to check another service (e.g. QueryPlanner).
// As written it reduces to a single !exist(MemPlanCache) check — confirm.
if (!MetaClusterCurrent.exist(MemPlanCache.class)|| !MetaClusterCurrent.exist(MemPlanCache.class)) {
DbPlanManagerPersistorImpl newDbPlanManagerPersistor = new DbPlanManagerPersistorImpl();
IOExecutor vertx = MetaClusterCurrent.wrapper(IOExecutor.class);
vertx.executeBlocking((Handler<Promise<Void>>) promise -> {
try {
newDbPlanManagerPersistor.checkStore();
promise.tryComplete();
} catch (Throwable throwable) {
LOGGER.error("", throwable);
promise.fail(throwable);
}
});
MemPlanCache memPlanCache = new MemPlanCache((newDbPlanManagerPersistor));
MetaClusterCurrent.register(MemPlanCache.class, memPlanCache);
MetaClusterCurrent.register(QueryPlanner.class, new QueryPlanner(memPlanCache));
memPlanCache.init();
} else {
MemPlanCache memPlanCache = MetaClusterCurrent.wrapper(MemPlanCache.class);
memPlanCache.clearCache();
}
}
if (!MetaClusterCurrent.exist(StatisticCenter.class)) {
StatisticCenter statisticCenter = new StatisticCenter();
MetaClusterCurrent.register(StatisticCenter.class, statisticCenter);
}
if (!MetaClusterCurrent.exist(MysqlVariableService.class) && jdbcConnectionManager != null) {
MetaClusterCurrent.register(MysqlVariableService.class, new MysqlVariableServiceImpl(jdbcConnectionManager));
}
// XA transaction log backed by the (possibly just rebuilt) MySQL manager.
MySQLManager mySQLManager = MetaClusterCurrent.wrapper(MySQLManager.class);
ServerConfig serverConfig = (ServerConfig) MetaClusterCurrent.wrapper(ServerConfig.class);
LocalXaMemoryRepositoryImpl localXaMemoryRepository = LocalXaMemoryRepositoryImpl.createLocalXaMemoryRepository(() -> mySQLManager);
MetaClusterCurrent.register(XaLog.class, new XaLogImpl(localXaMemoryRepository, serverConfig.getMycatId(), Objects.requireNonNull(mySQLManager)));
MetaClusterCurrent.register(UpdatePlanCache.class, new UpdatePlanCache());
MetaClusterCurrent.register(MycatRouterConfig.class, newConfig);
MycatRouterConfig curConfig = MetaClusterCurrent.wrapper(MycatRouterConfig.class);
boolean allMatchMySQL = curConfig.getDatasources().stream().allMatch(s -> "mysql".equalsIgnoreCase(s.getDbType()));
XaLog xaLog = MetaClusterCurrent.wrapper(XaLog.class);
// On the very first commit of an all-MySQL deployment with XA users,
// replay the XA recovery log once.
if (isInit() && allMatchMySQL) {
Authenticator authenticator = MetaClusterCurrent.wrapper(Authenticator.class);
boolean hasXA = authenticator.getConfigAsList().stream().anyMatch(u -> TransactionType.parse(u.getTransactionType()) == TransactionType.JDBC_TRANSACTION_TYPE);
if (hasXA) {
LOGGER.info("readXARecoveryLog start");
xaLog.readXARecoveryLog();
}
}
MycatRouterConfig runtimeConfigSnapshot = ConfigUpdater.getRuntimeConfigSnapshot();
// NOTE(review): debug leftover — an empty println when the committed config
// does not match the runtime snapshot; should be a real log warning.
if (!newConfig.baseMatch(runtimeConfigSnapshot)){
System.out.println();
}
// Persist each changed section through every matching KV store.
if (persistence){
List<KV<LogicSchemaConfig>> schemaKvs = Arrays.asList(storageManager.get(LogicSchemaConfig.class));
List<KV<ClusterConfig>> clusterKvs = Arrays.asList(storageManager.get(ClusterConfig.class));
List<KV<DatasourceConfig>> datasourceKvs = Arrays.asList(storageManager.get(DatasourceConfig.class));
List<KV<SequenceConfig>> sequenceKvs = Arrays.asList(storageManager.get(SequenceConfig.class));
List<KV<SqlCacheConfig>> sqlCacheKvs = Arrays.asList(storageManager.get(SqlCacheConfig.class));
List<KV<UserConfig>> userKvs = Arrays.asList(storageManager.get(UserConfig.class));
schemaKvs.forEach(kv1 -> schemaConfigUpdateSet.execute(kv1));
clusterKvs.forEach(kv1 -> clusterConfigUpdateSet.execute(kv1));
datasourceKvs.forEach(kv1 -> datasourceConfigUpdateSet.execute(kv1));
sequenceKvs.forEach(kv1 -> sequenceConfigUpdateSet.execute(kv1));
sqlCacheKvs.forEach(kv1 -> sqlCacheConfigUpdateSet.execute(kv1));
userKvs.forEach(kv1 -> userConfigUpdateSet.execute(kv1));
}
}
// Discards the working copy and replaces it with a freshly-defaulted,
// empty configuration (root user, prototype datasource/cluster).
@Override
public void reset() {
MycatRouterConfig newMycatRouterConfig = new MycatRouterConfig();
defaultConfig(newMycatRouterConfig);
this.newConfig = newMycatRouterConfig;
}
/**
 * Fills missing defaults into {@code routerConfig}: a root/123456 user when
 * no users exist, a local prototype MySQL datasource when no datasources
 * exist, and a "prototype" cluster over it. Finally normalizes the prototype
 * target name and injects the built-in system schemas via {@link #fix}.
 */
public static void defaultConfig(MycatRouterConfig routerConfig) {
if (routerConfig.getUsers().isEmpty()) {
UserConfig userConfig = new UserConfig();
userConfig.setPassword("123456");
userConfig.setUsername("root");
routerConfig.getUsers().add(userConfig);
}
if (routerConfig.getDatasources().isEmpty()) {
DatasourceConfig datasourceConfig = new DatasourceConfig();
datasourceConfig.setDbType("mysql");
datasourceConfig.setUser("root");
datasourceConfig.setPassword("123456");
datasourceConfig.setName("prototypeDs");
datasourceConfig.setUrl("jdbc:mysql://localhost:3306/mysql");
routerConfig.getDatasources().add(datasourceConfig);
// NOTE(review): this cluster default is only applied when the datasource
// list was ALSO empty (nested if); a config with datasources but no
// clusters gets no default cluster — confirm the nesting is intentional.
if (routerConfig.getClusters().isEmpty()) {
ClusterConfig clusterConfig = new ClusterConfig();
clusterConfig.setName("prototype");
clusterConfig.setMasters(Collections.singletonList("prototypeDs"));
clusterConfig.setMaxCon(200);
clusterConfig.setClusterType(ReplicaType.MASTER_SLAVE.name());
clusterConfig.setSwitchType(ReplicaSwitchType.SWITCH.name());
routerConfig.getClusters().add(clusterConfig);
}
}
routerConfig.fixPrototypeTargetName();
// Re-key schemas by name, add built-in system schemas, and store them back.
routerConfig.setSchemas(
new ArrayList<>(
fix(routerConfig.getSchemas().stream().collect(Collectors.toMap(k -> k.getSchemaName(), v -> v))).values()
)
);
}
// AutoCloseable hook; this ops object holds no resources of its own,
// so there is nothing to release.
@Override
public void close() {
}
/**
 * Derives auto-hash sharding-function properties from the DBPARTITION /
 * TBPARTITION clauses of a CREATE TABLE statement. Always includes
 * "storeNum" (the default store-node count from the MetadataManager);
 * db/table method and count entries are added only when the corresponding
 * partition clause is present.
 */
public static Map<String, Object> getAutoHashProperties(MySqlCreateTableStatement createTableSql) {
SQLExpr dbPartitionBy = createTableSql.getDbPartitionBy();
HashMap<String, Object> properties = new HashMap<>();
MetadataManager metadataManager = MetaClusterCurrent.wrapper(MetadataManager.class);
int defaultStoreNodeNum = metadataManager.getDefaultStoreNodeNum();
properties.put("storeNum", defaultStoreNodeNum);
if (dbPartitionBy != null) {
// Partition count defaults to the store-node count when unspecified.
// Note: parseInt throws NumberFormatException on a non-numeric clause.
int dbPartitions = (Optional.ofNullable(createTableSql.getDbPartitions())
.map(i -> i.toString()).map(i -> Integer.parseInt(SQLUtils.normalize(i))).orElse(defaultStoreNodeNum));
properties.put("dbNum", Objects.toString(dbPartitions));
properties.put("dbMethod", Objects.toString(dbPartitionBy));
}
SQLExpr tablePartitionBy = createTableSql.getTablePartitionBy();
if (tablePartitionBy != null) {
// Unlike dbNum there is no fallback: getTablePartitions() must be present
// (NPE otherwise) — callers are expected to supply TBPARTITIONS here.
int tablePartitions = Integer.parseInt(SQLUtils.normalize(createTableSql.getTablePartitions().toString()));
properties.put("tableNum", Objects.toString(tablePartitions));
properties.put("tableMethod", Objects.toString(tablePartitionBy));
}
return properties;
}
/**
 * Returns a copy of the schema map guaranteed to contain the MySQL system
 * schemas (information_schema, mysql, performance_schema) mapped to the
 * prototype target, plus the built-in inner-table schemas produced by
 * {@link #addInnerTable}. Existing entries are never overwritten.
 */
private static Map<String, LogicSchemaConfig> fix(Map<String, LogicSchemaConfig> orginal) {
    Map<String, LogicSchemaConfig> result = new HashMap<>(orginal);
    for (String database : Arrays.asList("information_schema", "mysql", "performance_schema")) {
        if (!result.containsKey(database)) {
            LogicSchemaConfig schemaConfig = new LogicSchemaConfig();
            schemaConfig.setSchemaName(database);
            schemaConfig.setTargetName(MetadataManager.getPrototype());
            result.put(database, schemaConfig);
        }
    }
    List<LogicSchemaConfig> innerSchemas = new ArrayList<>();
    addInnerTable(innerSchemas);
    for (LogicSchemaConfig inner : innerSchemas) {
        result.putIfAbsent(inner.getSchemaName(), inner);
    }
    return result;
}
/**
 * Ensures the built-in tables exist in {@code schemaConfigs}: the
 * `mysql`.`proc` normal table (stored-routine catalog, served from the
 * prototype target) and the `mycat`.`dual` custom table. Schemas and tables
 * already present are left untouched.
 */
private static void addInnerTable(List<LogicSchemaConfig> schemaConfigs) {
String schemaName = "mysql";
String targetName = "prototype";
String tableName = "proc";
// Find-or-create the `mysql` schema, pinned to the prototype target.
LogicSchemaConfig logicSchemaConfig = schemaConfigs.stream()
.filter(i -> schemaName.equals(i.getSchemaName()))
.findFirst()
.orElseGet(() -> {
LogicSchemaConfig config = new LogicSchemaConfig();
config.setSchemaName(schemaName);
config.setTargetName(MetadataManager.prototype);
schemaConfigs.add(config);
return config;
});
Map<String, NormalTableConfig> normalTables = logicSchemaConfig.getNormalTables();
// DDL mirrors the MariaDB mysql.proc layout; registered only when absent.
normalTables.putIfAbsent(tableName, NormalTableConfig.create(schemaName, tableName,
"CREATE TABLE `mysql`.`proc` (\n" +
"  `db` varchar(64) DEFAULT NULL,\n" +
"  `name` varchar(64) DEFAULT NULL,\n" +
"  `type` enum('FUNCTION','PROCEDURE','PACKAGE', 'PACKAGE BODY'),\n" +
"  `specific_name` varchar(64) DEFAULT NULL,\n" +
"  `language` enum('SQL'),\n" +
"  `sql_data_access` enum('CONTAINS_SQL', 'NO_SQL', 'READS_SQL_DATA', 'MODIFIES_SQL_DATA'),\n" +
"  `is_deterministic` enum('YES','NO'),\n" +
"  `security_type` enum('INVOKER','DEFINER'),\n" +
"  `param_list` blob,\n" +
"  `returns` longblob,\n" +
"  `body` longblob,\n" +
"  `definer` varchar(141),\n" +
"  `created` timestamp,\n" +
"  `modified` timestamp,\n" +
"  `sql_mode` \tset('REAL_AS_FLOAT', 'PIPES_AS_CONCAT', 'ANSI_QUOTES', 'IGNORE_SPACE', 'IGNORE_BAD_TABLE_OPTIONS', 'ONLY_FULL_GROUP_BY', 'NO_UNSIGNED_SUBTRACTION', 'NO_DIR_IN_CREATE', 'POSTGRESQL', 'ORACLE', 'MSSQL', 'DB2', 'MAXDB', 'NO_KEY_OPTIONS', 'NO_TABLE_OPTIONS', 'NO_FIELD_OPTIONS', 'MYSQL323', 'MYSQL40', 'ANSI', 'NO_AUTO_VALUE_ON_ZERO', 'NO_BACKSLASH_ESCAPES', 'STRICT_TRANS_TABLES', 'STRICT_ALL_TABLES', 'NO_ZERO_IN_DATE', 'NO_ZERO_DATE', 'INVALID_DATES', 'ERROR_FOR_DIVISION_BY_ZERO', 'TRADITIONAL', 'NO_AUTO_CREATE_USER', 'HIGH_NOT_PRECEDENCE', 'NO_ENGINE_SUBSTITUTION', 'PAD_CHAR_TO_FULL_LENGTH', 'EMPTY_STRING_IS_NULL', 'SIMULTANEOUS_ASSIGNMENT'),\n" +
"  `comment` text,\n" +
"  `character_set_client` char(32),\n" +
"  `collation_connection` \tchar(32),\n" +
"  `db_collation` \tchar(32),\n" +
"  `body_utf8` \tlongblob,\n" +
"  `aggregate` \tenum('NONE', 'GROUP')\n" +
") ", targetName));
// Find-or-create the `mycat` schema (no target; holds custom tables only).
LogicSchemaConfig mycat = schemaConfigs.stream().filter(i ->
"mycat".equalsIgnoreCase(i.getSchemaName()))
.findFirst().orElseGet(() -> {
LogicSchemaConfig schemaConfig = new LogicSchemaConfig();
schemaConfig.setSchemaName("mycat");
schemaConfigs.add(schemaConfig);
return schemaConfig;
});
Map<String, CustomTableConfig> customTables = mycat.getCustomTables();
// `dual` is served by DualCustomTableHandler rather than a backend.
customTables.computeIfAbsent("dual", (n) -> {
CustomTableConfig tableConfig = CustomTableConfig.builder().build();
tableConfig.setClazz(DualCustomTableHandler.class.getCanonicalName());
tableConfig.setCreateTableSQL("create table mycat.dual(id int)");
return tableConfig;
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment