Created
October 19, 2021 09:47
-
-
Save TheDIM47/7156b2b5c53e6d692fdfd286520ba9fb to your computer and use it in GitHub Desktop.
MySqlTablePath - Flink MySQL catalog implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.util.StringUtils; | |
import java.util.Objects; | |
import static org.apache.flink.util.Preconditions.checkArgument; | |
public class MySqlTablePath { | |
private static final String DEFAULT_SCHEMA_NAME = "public"; | |
private final String schemaName; | |
private final String tableName; | |
public MySqlTablePath(String schemaName, String tableName) { | |
checkArgument(!StringUtils.isNullOrWhitespaceOnly(schemaName)); | |
checkArgument(!StringUtils.isNullOrWhitespaceOnly(tableName)); | |
this.schemaName = schemaName; | |
this.tableName = tableName; | |
} | |
public static MySqlTablePath fromFlinkTableName(String flinkTableName) { | |
if (flinkTableName.contains(".")) { | |
String[] path = flinkTableName.split("\\."); | |
checkArgument(path.length == 2, | |
String.format("Table name '%s' is not valid. The parsed length is %d", flinkTableName, path.length)); | |
return new MySqlTablePath(path[0], path[1]); | |
} else { | |
return new MySqlTablePath(DEFAULT_SCHEMA_NAME, flinkTableName); | |
} | |
} | |
public String getFullPath() { | |
return String.format("%s.%s", schemaName, tableName); | |
} | |
public String getTableName() { | |
return tableName; | |
} | |
public String getSchemaName() { | |
return schemaName; | |
} | |
@Override | |
public String toString() { | |
return getFullPath(); | |
} | |
@Override | |
public boolean equals(Object o) { | |
if (this == o) return true; | |
if (o == null || getClass() != o.getClass()) return false; | |
MySqlTablePath that = (MySqlTablePath) o; | |
return Objects.equals(schemaName, that.schemaName) && Objects.equals(tableName, that.tableName); | |
} | |
@Override | |
public int hashCode() { | |
return Objects.hash(schemaName, tableName); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment