Skip to content

Instantly share code, notes, and snippets.

@mgmarino
Last active October 30, 2023 13:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mgmarino/19a4a26a40dfbc7f4249e3c567d32afa to your computer and use it in GitHub Desktop.
Save mgmarino/19a4a26a40dfbc7f4249e3c567d32afa to your computer and use it in GitHub Desktop.
Example to use Iceberg on AWS KDA (Managed Flink)
from typing import Dict
from pyflink.java_gateway import get_gateway
from pyflink.table.catalog import Catalog
class IcebergCatalog(Catalog):
_properties: Dict[str, str]
def __init__(self, name: str, properties: Dict[str, str]):
self._properties = properties
gateway = get_gateway()
j_properties = gateway.jvm.java.util.Properties()
for key, value in properties.items():
j_properties.setProperty(key, value)
j_catalog_factory = gateway.jvm.org.apache.iceberg.flink.FlinkCatalogFactory()
super().__init__(j_catalog_factory.createCatalog(name, j_properties))
@property
def properties(self) -> Dict[str, str]:
return self._properties
from typing import Literal, get_args
from flink_applications.catalogs import IcebergCatalog
from flink_applications.utils import ApplicationProperties
from pyflink.table import (
EnvironmentSettings,
ExplainDetail,
Table,
TableEnvironment,
TableResult,
)
def main():
table_environment = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
catalog = "iceberg"
iceberg_catalog = IcebergCatalog(
catalog,
{
"catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"warehouse": ApplicationProperties.get_property("S3Output", "warehouse"),
},
)
table_environment.register_catalog(catalog_name=catalog, catalog=iceberg_catalog)
#... Then make tables referencing the above catalog.
if __name__ == "__main__":
main()
package org.apache.flink.runtime.util;
import org.apache.hadoop.conf.Configuration;
public class HadoopUtils {
public static Configuration getHadoopConfiguration(
org.apache.flink.configuration.Configuration flinkConfiguration) {
return new Configuration(false);
}
}
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>group</groupId>
<artifactId>java_app</artifactId>
<version>0.0</version>
<packaging>jar</packaging>
<name>My App</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.2</flink.version>
<mysql_cdc.version>2.4.1</mysql_cdc.version>
<protobuf.version>3.17.3</protobuf.version>
<target.java.version>11</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
<hadoop.version>2.8.5</hadoop.version>
<iceberg.version>1.3.1</iceberg.version>
<awssdk.version>2.20.118</awssdk.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.1</log4j.version>
<janino.version>3.0.11</janino.version>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kinesis</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>glue</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kms</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime-1.15</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.reload4j</groupId>
<artifactId>reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.reload4j</groupId>
<artifactId>reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${mysql_cdc.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<!-- this should be the same version of flink-table module -->
<version>${janino.version}</version>
<scope>provided</scope>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.0</version>
</extension>
</extensions>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<plugin>
<!-- a hint for IDE's to add the java sources to the classpath -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.4.0</version>
<executions>
<execution>
<id>01-add-proto-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-sources/protobuf</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<executions>
<execution>
<id>java</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<clearOutputDirectory>false</clearOutputDirectory>
<outputDirectory>
${project.build.directory}/generated-sources/java
</outputDirectory>
</configuration>
</execution>
<execution>
<id>python</id>
<goals>
<goal>compile-python</goal>
</goals>
<configuration>
<clearOutputDirectory>false</clearOutputDirectory>
<outputDirectory>
${basedir}/src/python/proto
</outputDirectory>
</configuration>
</execution>
</executions>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<protoSourceRoot>
${basedir}/src/proto/linex
</protoSourceRoot>
<attachProtoSources>false</attachProtoSources>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary
dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>commons-cli:*</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>org.apache.hadoop.conf</pattern>
<shadedPattern>shaded.org.apache.hadoop.conf</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.flink.runtime.util.HadoopUtils</pattern>
<shadedPattern>shadow.org.apache.flink.runtime.util.HadoopUtils</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment