Last active July 23, 2020 08:44
// setup in bash
wget -P lib/
wget -P lib/
wget -P lib/
wget -P lib/
wget -P lib/
wget -P lib/
wget -P lib/
wget -P lib/
wget -P lib/
wget -P lib/
wget -P lib/
wget -P lib/
// upload the schema to the registry
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"nifiRecord\",\"namespace\":\"org.apache.nifi\",\"fields\":[{\"name\":\"tweet_id\",\"type\":[\"null\",\"string\"]},{\"name\":\"text\",\"type\":[\"null\",\"string\"]},{\"name\":\"source\",\"type\":[\"null\",\"string\"]},{\"name\":\"geo\",\"type\":[\"null\",\"string\"]},{\"name\":\"place\",\"type\":[\"null\",\"string\"]},{\"name\":\"lang\",\"type\":[\"null\",\"string\"]},{\"name\":\"created_at\",\"type\":[\"null\",\"string\"]},{\"name\":\"timestamp_ms\",\"type\":[\"null\",\"string\"]},{\"name\":\"coordinates\",\"type\":[\"null\",\"string\"]},{\"name\":\"user_id\",\"type\":[\"null\",\"long\"]},{\"name\":\"user_name\",\"type\":[\"null\",\"string\"]},{\"name\":\"screen_name\",\"type\":[\"null\",\"string\"]},{\"name\":\"user_created_at\",\"type\":[\"null\",\"string\"]},{\"name\":\"followers_count\",\"type\":[\"null\",\"long\"]},{\"name\":\"friends_count\",\"type\":[\"null\",\"long\"]},{\"name\":\"user_lang\",\"type\":[\"null\",\"string\"]},{\"name\":\"user_location\",\"type\":[\"null\",\"string\"]},{\"name\":\"hashtags\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}]}]}"}' http://localhost:8081/subjects/tweets-raw-value/versions
curl --silent -X GET http://localhost:8081/subjects/ | jq .
// start the shell
export TERM=xterm-color
./bin/ local
// try to execute
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import java.util.Properties
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val schemaRegistryUrl = "http://localhost:8081"
// **************************************************************
// have avro hugger generate the class
// class is defined below (comments)
// **************************************************************
val serializer = ConfluentRegistryAvroDeserializationSchema.forSpecific[Tweet](classOf[Tweet], schemaRegistryUrl)
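val stream = senv.addSource(
  new FlinkKafkaConsumer("tweets-raw", serializer, properties) // topic assumed from subject tweets-raw-value
    .setStartFromEarliest() // TODO experiment with different start values
)
stream.print() // assumed sink, so the job has an operator to run
senv.execute("Kafka Consumer Test")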
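
The curl call above sends the Avro schema as an escaped JSON string inside a `{"schema": "..."}` envelope, under a subject named after the topic. A minimal sketch of building that envelope programmatically instead of escaping by hand is shown below. `RegistryPayload`, `escapeJson`, `payload`, and `valueSubject` are hypothetical names for illustration, and the `<topic>-value` naming assumes the registry's default topic-name strategy.

```scala
// Hypothetical helper, not part of the gist: builds the request body that the
// curl call posts to /subjects/<subject>/versions.
object RegistryPayload {

  /** Escape backslashes, double quotes and common control characters so the
    * schema text can be embedded inside a JSON string literal. */
  def escapeJson(s: String): String = {
    val sb = new StringBuilder
    s.foreach {
      case '\\' => sb.append("\\\\")
      case '"'  => sb.append("\\\"")
      case '\n' => sb.append("\\n")
      case '\t' => sb.append("\\t")
      case c    => sb.append(c)
    }
    sb.toString
  }

  /** Wrap an Avro schema (JSON text) into the {"schema": "..."} envelope. */
  def payload(avroSchemaJson: String): String =
    "{\"schema\": \"" + escapeJson(avroSchemaJson) + "\"}"

  /** Subject for record values of a topic, assuming the default naming strategy. */
  def valueSubject(topic: String): String = s"$topic-value"
}
```

With this sketch, `RegistryPayload.valueSubject("tweets-raw")` yields the subject used in the curl URL, `tweets-raw-value`.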

geoHeil commented Jul 10, 2020

The Avro Schema definition:

{
    "type": "record",
    "name": "Tweet",
    "namespace": "com.github.geoheil.streamingreference",
    "doc": "Twitter tweet record limited to basic information",
    "fields": [
        {
            "name": "tweet_id",
            "type": ["null", "string"],
            "doc": "System-assigned numeric tweet ID. Cannot be changed by the user."
        },
        {
            "name": "text",
            "type": ["null", "string"],
            "doc": "the main text of a tweet"
        },
        {
            "name": "source",
            "type": ["null", "string"],
            "doc": "user agent of tweet submitting device"
        },
        {
            "name": "geo",
            "type": ["null", "string"],
            "doc": "geo if available"
        },
        {
            "name": "place",
            "type": ["null", "string"],
            "doc": "place if available"
        },
        {
            "name": "lang",
            "type": ["null", "string"],
            "doc": "language of the tweet"
        },
        {
            "name": "created_at",
            "type": ["null", "string"],
            "doc": "created at timestamp string formatted"
        },
        {
            "name": "timestamp_ms",
            "type": ["null", "string"],
            "doc": "created at timestamp epoch long formatted"
        },
        {
            "name": "coordinates",
            "type": ["null", "string"],
            "doc": "coordinates if available"
        },
        {
            "name": "user_id",
            "type": ["null", "long"],
            "doc": "System-assigned numeric tweet ID. Cannot be changed by the user."
        },
        {
            "name": "user_name",
            "type": ["null", "string"],
            "doc": "speaking user name, can be changed"
        },
        {
            "name": "screen_name",
            "type": ["null", "string"],
            "doc": "screen name, can be changed"
        },
        {
            "name": "user_created_at",
            "type": ["null", "string"],
            "doc": "Timestamp of user creation"
        },
        {
            "name": "followers_count",
            "type": ["null", "long"],
            "doc": "follower count"
        },
        {
            "name": "friends_count",
            "type": ["null", "long"],
            "doc": "friends count"
        },
        {
            "name": "user_lang",
            "type": ["null", "string"],
            "doc": "language of user profile"
        },
        {
            "name": "user_location",
            "type": ["null", "string"],
            "doc": "location if available"
        },
        {
            "name": "hashtags",
            "type": [
                "null",
                {
                    "type": "array",
                    "items": "string"
                }
            ],
            "doc": "hashtags as list of strings if available"
        }
    ]
}

geoHeil commented Jul 10, 2020

The generated specific record class:


import scala.annotation.switch

/**
 * Twitter tweet record limited to basic information
 * @param tweet_id System-assigned numeric tweet ID. Cannot be changed by the user.
 * @param text the main text of a tweet
 * @param source user agent of tweet submitting device
 * @param geo geo if available
 * @param place place if available
 * @param lang language of the tweet
 * @param created_at created at timestamp string formatted
 * @param timestamp_ms created at timestamp epoch long formatted
 * @param coordinates coordinates if available
 * @param user_id System-assigned numeric tweet ID. Cannot be changed by the user.
 * @param user_name speaking user name, can be changed
 * @param screen_name screen name, can be changed
 * @param user_created_at Timestamp of user creation
 * @param followers_count follower count
 * @param friends_count friends count
 * @param user_lang language of user profile
 * @param user_location location if available
 * @param hashtags hashtags as list of strings if available
 */
final case class Tweet(var tweet_id: Option[String], var text: Option[String], var source: Option[String], var geo: Option[String], var place: Option[String], var lang: Option[String], var created_at: Option[String], var timestamp_ms: Option[String], var coordinates: Option[String], var user_id: Option[Long], var user_name: Option[String], var screen_name: Option[String], var user_created_at: Option[String], var followers_count: Option[Long], var friends_count: Option[Long], var user_lang: Option[String], var user_location: Option[String], var hashtags: Option[Seq[String]]) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None)
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case 0 => {
        tweet_id match {
          case Some(x) => x
          case None => null
      case 1 => {
        text match {
          case Some(x) => x
          case None => null
      case 2 => {
        source match {
          case Some(x) => x
          case None => null
      case 3 => {
        geo match {
          case Some(x) => x
          case None => null
      case 4 => {
        place match {
          case Some(x) => x
          case None => null
      case 5 => {
        lang match {
          case Some(x) => x
          case None => null
      case 6 => {
        created_at match {
          case Some(x) => x
          case None => null
      case 7 => {
        timestamp_ms match {
          case Some(x) => x
          case None => null
      case 8 => {
        coordinates match {
          case Some(x) => x
          case None => null
      case 9 => {
        user_id match {
          case Some(x) => x
          case None => null
      case 10 => {
        user_name match {
          case Some(x) => x
          case None => null
      case 11 => {
        screen_name match {
          case Some(x) => x
          case None => null
      case 12 => {
        user_created_at match {
          case Some(x) => x
          case None => null
      case 13 => {
        followers_count match {
          case Some(x) => x
          case None => null
      case 14 => {
        friends_count match {
          case Some(x) => x
          case None => null
      case 15 => {
        user_lang match {
          case Some(x) => x
          case None => null
      case 16 => {
        user_location match {
          case Some(x) => x
          case None => null
      case 17 => {
        hashtags match {
          case Some(x) => scala.collection.JavaConverters.bufferAsJavaListConverter({
            x map { x =>
          case None => null
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case 0 => this.tweet_id = {
        value match {
          case null => None
          case _ => Some(value.toString)
      case 1 => this.text = {
        value match {
          case null => None
          case _ => Some(value.toString)
      case 2 => this.source = {
        value match {
          case null => None
          case _ => Some(value.toString)
      case 3 => this.geo = {
        value match {
          case null => None
          case _ => Some(value.toString)
      case 4 => this.place = {
        value match {
          case null => None
          case _ => Some(value.toString)
      case 5 => this.lang = {
        value match {
          case null => None
          case _ => Some(value.toString)
      case 6 => this.created_at = {
        value match {
          case null => None
          case _ => Some(value.toString)
      case 7 => this.timestamp_ms = {
        value match {
          case null => None
          case _ => Some(value.toString)
      case 8 => this.coordinates = {
        value match {
          case null => None
          case _ => Some(value.toString)
      case 9 => this.user_id = {
        value match {
          case null => None
          case _ => Some(value)
      case 10 => this.user_name = {
        value match {
          case null => None
          case _ => Some(value.toString)
      case 11 => this.screen_name = {
        value match {
          case null => None
          case _ => Some(value.toString)
      case 12 => this.user_created_at = {
        value match {
          case null => None
          case _ => Some(value.toString)
      case 13 => this.followers_count = {
        value match {
          case null => None
          case _ => Some(value)
      case 14 => this.friends_count = {
        value match {
          case null => None
          case _ => Some(value)
      case 15 => this.user_lang = {
        value match {
          case null => None
          case _ => Some(value.toString)
      case 16 => this.user_location = {
        value match {
          case null => None
          case _ => Some(value.toString)
      case 17 => this.hashtags = {
        value match {
          case null => None
          case _ => Some(value match {
            case (array: java.util.List[_]) => {
              Seq((scala.collection.JavaConverters.asScalaIteratorConverter(array.iterator).asScala.toSeq map { x =>
              }: _*))
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
  def getSchema: org.apache.avro.Schema = Tweet.SCHEMA$

object Tweet {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Tweet\",\"namespace\":\"com.github.geoheil.streamingreference\",\"doc\":\"Twitter tweet record limited to basic information\",\"fields\":[{\"name\":\"tweet_id\",\"type\":[\"null\",\"string\"],\"doc\":\"System-assigned numeric tweet ID. Cannot be changed by the user.\"},{\"name\":\"text\",\"type\":[\"null\",\"string\"],\"doc\":\"the main text of a tweet\"},{\"name\":\"source\",\"type\":[\"null\",\"string\"],\"doc\":\"user agent of tweet submitting device\"},{\"name\":\"geo\",\"type\":[\"null\",\"string\"],\"doc\":\"geo if available\"},{\"name\":\"place\",\"type\":[\"null\",\"string\"],\"doc\":\"place if available\"},{\"name\":\"lang\",\"type\":[\"null\",\"string\"],\"doc\":\"language of the tweet\"},{\"name\":\"created_at\",\"type\":[\"null\",\"string\"],\"doc\":\"created at timestamp string formatted\"},{\"name\":\"timestamp_ms\",\"type\":[\"null\",\"string\"],\"doc\":\"created at timestamp epoch long formatted\"},{\"name\":\"coordinates\",\"type\":[\"null\",\"string\"],\"doc\":\"coordinates if available\"},{\"name\":\"user_id\",\"type\":[\"null\",\"long\"],\"doc\":\"System-assigned numeric tweet ID. Cannot be changed by the user.\"},{\"name\":\"user_name\",\"type\":[\"null\",\"string\"],\"doc\":\"speaking user name, can be changed\"},{\"name\":\"screen_name\",\"type\":[\"null\",\"string\"],\"doc\":\"screen name, can be changed\"},{\"name\":\"user_created_at\",\"type\":[\"null\",\"string\"],\"doc\":\"Timestamp of user creation\"},{\"name\":\"followers_count\",\"type\":[\"null\",\"long\"],\"doc\":\"follower count\"},{\"name\":\"friends_count\",\"type\":[\"null\",\"long\"],\"doc\":\"friends count\"},{\"name\":\"user_lang\",\"type\":[\"null\",\"string\"],\"doc\":\"language of user profile\"},{\"name\":\"user_location\",\"type\":[\"null\",\"string\"],\"doc\":\"location if available\"},{\"name\":\"hashtags\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"doc\":\"hashtags as list of strings if available\"}]}")
}
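
The generated `get` and `put` methods bridge between Avro's view of a nullable union such as `["null","string"]` (either `null` or a value) and Scala's `Option`. The sketch below shows that bridging for a single field without any Avro dependency. `NullableField` is a hypothetical stand-in, not the generated class, and it throws on an unknown index as a choice made for this sketch. The `toString` call mirrors the generated code; it matters because Avro readers may deliver strings as a `CharSequence` implementation rather than `java.lang.String`.

```scala
// Hypothetical single-field record illustrating the null <-> Option bridging.
final class NullableField(var text: Option[String]) {

  // Read side: None becomes null, Some(x) becomes x.
  def get(index: Int): AnyRef = index match {
    case 0 => text.orNull
    case _ => throw new IndexOutOfBoundsException(s"Bad index: $index")
  }

  // Write side: null becomes None, any other value becomes Some(value.toString).
  def put(index: Int, value: Any): Unit = index match {
    case 0 => text = Option(value).map(_.toString)
    case _ => throw new IndexOutOfBoundsException(s"Bad index: $index")
  }
}
```

For example, after `put(0, null)` the field holds `None`, and after `put(0, new java.lang.StringBuilder("hi"))` it holds `Some("hi")`.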

geoHeil commented Jul 10, 2020

The remaining exception in the TaskManager logs:

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to scala.Product
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator( ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect( ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect( ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect( ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect( ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect( ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp( ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps( ~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler( ~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop( ~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
	at ~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
	at ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$ ~[flink-dist_2.11-1.11.1.jar:1.11.1]
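
The top frames of the trace say that `CaseClassSerializer.copy` expected a `scala.Product` but received an Avro `GenericData$Record`, so the object emitted by the source was not an instance of the case class the stream was typed with. The sketch below reproduces that kind of mismatch with stand-in types only, with no Flink or Avro on the classpath. `TweetLike`, `GenericRecordLike`, and `copyAsProduct` are hypothetical names, and the cast is a simplified assumption about what a case-class serializer does, not Flink's actual code.

```scala
import scala.util.Try

// Stand-in for the generated case class: every case class is a scala.Product.
final case class TweetLike(text: Option[String])

// Stand-in for a generic record: a plain class, not a Product.
final class GenericRecordLike(val fields: Map[String, Any])

object CastDemo {
  // Mimics a serializer that assumes every element it copies is a Product.
  def copyAsProduct(element: Any): Product = element.asInstanceOf[Product]

  def main(args: Array[String]): Unit = {
    // The case class instance passes the cast.
    println(Try(copyAsProduct(TweetLike(Some("hi")))).isSuccess)
    // The generic stand-in fails with a ClassCastException, as in the log above.
    println(Try(copyAsProduct(new GenericRecordLike(Map("text" -> "hi")))).isSuccess)
  }
}
```

Read this way, the trace suggests checking which runtime class the deserialization schema actually produces against the type information Flink derived for the stream.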
