Skip to content

Instantly share code, notes, and snippets.

@TheDIM47
Created October 19, 2021 09:47
Show Gist options
  • Save TheDIM47/7156b2b5c53e6d692fdfd286520ba9fb to your computer and use it in GitHub Desktop.
Save TheDIM47/7156b2b5c53e6d692fdfd286520ba9fb to your computer and use it in GitHub Desktop.
MySqlTablePath - Flink MySQL catalog implementation
import org.apache.flink.util.StringUtils;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
 * Immutable value object holding a schema name and a table name, rendered as
 * {@code schema.table}.
 *
 * <p>Instances compare equal when both the schema name and the table name are equal.
 */
public class MySqlTablePath {

    // NOTE(review): "public" looks like a carry-over from a Postgres implementation;
    // confirm this is the intended default schema for MySQL.
    private static final String DEFAULT_SCHEMA_NAME = "public";

    private final String schemaName;
    private final String tableName;

    /**
     * Creates a table path from its two components.
     *
     * @param schemaName schema name; must not be null or whitespace-only
     * @param tableName table name; must not be null or whitespace-only
     * @throws IllegalArgumentException if either argument is null or whitespace-only
     */
    public MySqlTablePath(String schemaName, String tableName) {
        checkArgument(
                !StringUtils.isNullOrWhitespaceOnly(schemaName),
                "Schema name must not be null or blank");
        checkArgument(
                !StringUtils.isNullOrWhitespaceOnly(tableName),
                "Table name must not be null or blank");

        this.schemaName = schemaName;
        this.tableName = tableName;
    }

    /**
     * Parses a table name of the form {@code table} or {@code schema.table}.
     *
     * <p>A name without a dot is placed in the default schema.
     *
     * @param flinkTableName the name to parse; must not be null
     * @return the parsed table path
     * @throws NullPointerException if {@code flinkTableName} is null
     * @throws IllegalArgumentException if the name has more than two dot-separated
     *     segments, or if any segment is empty or whitespace-only
     */
    public static MySqlTablePath fromFlinkTableName(String flinkTableName) {
        Objects.requireNonNull(flinkTableName, "flinkTableName must not be null");

        // Limit -1 keeps trailing empty segments, so malformed names such as "db.tbl."
        // are rejected below instead of being silently parsed as "db.tbl".
        String[] path = flinkTableName.split("\\.", -1);

        if (path.length == 1) {
            // No dot in the name: fall back to the default schema.
            return new MySqlTablePath(DEFAULT_SCHEMA_NAME, flinkTableName);
        }

        checkArgument(path.length == 2,
                String.format("Table name '%s' is not valid. The parsed length is %d", flinkTableName, path.length));
        return new MySqlTablePath(path[0], path[1]);
    }

    /**
     * Returns the path formatted as {@code schemaName.tableName}.
     *
     * @return the dot-joined schema and table name
     */
    public String getFullPath() {
        return String.format("%s.%s", schemaName, tableName);
    }

    /**
     * Returns the table name component.
     *
     * @return the table name
     */
    public String getTableName() {
        return tableName;
    }

    /**
     * Returns the schema name component.
     *
     * @return the schema name
     */
    public String getSchemaName() {
        return schemaName;
    }

    /** Returns the same text as {@link #getFullPath()}. */
    @Override
    public String toString() {
        return getFullPath();
    }

    /** Two paths are equal when they are of the same class and both components are equal. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        MySqlTablePath that = (MySqlTablePath) o;
        return Objects.equals(schemaName, that.schemaName) && Objects.equals(tableName, that.tableName);
    }

    /** Hash code derived from the schema name and the table name, consistent with {@code equals}. */
    @Override
    public int hashCode() {
        return Objects.hash(schemaName, tableName);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment