@geoHeil
Last active July 23, 2020 08:44
failing
// setup in bash
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.11.1/flink-connector-kafka_2.11-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka-base_2.11/1.11.1/flink-connector-kafka-base_2.11-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.11.1/flink-avro-confluent-registry-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.11.1/flink-avro-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/force-shading/1.11.1/force-shading-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.11.1/jackson-core-2.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.11.1/jackson-databind-2.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.11.1/jackson-annotations-2.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.0/avro-1.10.0.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/commons/commons-compress/1.20/commons-compress-1.20.jar -P lib/
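The classpath is assembled by hand in lib/. It can therefore be worth confirming, from inside the shell, which jar a given class was actually loaded from. This matters most when more than one version of a library might be present. Below is a minimal sketch that uses only JDK reflection (`getProtectionDomain`, `getCodeSource`, `getLocation`). The helper name `loadedFrom` is hypothetical.

```scala
// Hypothetical helper: returns the location (usually a jar path) a class was
// loaded from, or None for classes without a code source, such as JDK
// bootstrap classes like java.lang.String.
def loadedFrom(cls: Class[_]): Option[String] =
  Option(cls.getProtectionDomain)
    .flatMap(pd => Option(pd.getCodeSource))
    .flatMap(cs => Option(cs.getLocation))
    .map(_.toString)

println(loadedFrom(classOf[String]))          // JDK class: no code source
println(loadedFrom(classOf[scala.Option[_]])) // typically the scala-library jar
```

In the Flink Scala shell, `loadedFrom(classOf[org.apache.avro.Schema])` should show which Avro jar from lib/ the shell picked up.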
// upload the schema to the registry
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"nifiRecord\",\"namespace\":\"org.apache.nifi\",\"fields\":[{\"name\":\"tweet_id\",\"type\":[\"null\",\"string\"]},{\"name\":\"text\",\"type\":[\"null\",\"string\"]},{\"name\":\"source\",\"type\":[\"null\",\"string\"]},{\"name\":\"geo\",\"type\":[\"null\",\"string\"]},{\"name\":\"place\",\"type\":[\"null\",\"string\"]},{\"name\":\"lang\",\"type\":[\"null\",\"string\"]},{\"name\":\"created_at\",\"type\":[\"null\",\"string\"]},{\"name\":\"timestamp_ms\",\"type\":[\"null\",\"string\"]},{\"name\":\"coordinates\",\"type\":[\"null\",\"string\"]},{\"name\":\"user_id\",\"type\":[\"null\",\"long\"]},{\"name\":\"user_name\",\"type\":[\"null\",\"string\"]},{\"name\":\"screen_name\",\"type\":[\"null\",\"string\"]},{\"name\":\"user_created_at\",\"type\":[\"null\",\"string\"]},{\"name\":\"followers_count\",\"type\":[\"null\",\"long\"]},{\"name\":\"friends_count\",\"type\":[\"null\",\"long\"]},{\"name\":\"user_lang\",\"type\":[\"null\",\"string\"]},{\"name\":\"user_location\",\"type\":[\"null\",\"string\"]},{\"name\":\"hashtags\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}]}]}"}' http://localhost:8081/subjects/tweets-raw-value/versions
curl --silent -X GET http://localhost:8081/subjects/ | jq .
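The registry expects the schema itself as an escaped JSON string inside a `{"schema": "..."}` body, which is why the payload above is so heavily escaped. Instead of escaping by hand, the body could be generated from the schema text. Below is a minimal sketch in plain Scala, with hypothetical helper names `jsonEscape` and `registryPayload`. It covers the escapes JSON strings require, but it is not a full JSON library.

```scala
// Hypothetical helper: escapes a string so it can sit inside a JSON string literal.
def jsonEscape(s: String): String = {
  val sb = new StringBuilder
  s.foreach {
    case '"'  => sb.append("\\\"")
    case '\\' => sb.append("\\\\")
    case '\n' => sb.append("\\n")
    case '\r' => sb.append("\\r")
    case '\t' => sb.append("\\t")
    // remaining control characters become \u00XX escapes
    case c if c < ' ' => sb.append('\\').append('u').append("%04x".format(c.toInt))
    case c => sb.append(c)
  }
  sb.toString
}

// Hypothetical helper: wraps an Avro schema (given as JSON text) in the
// {"schema": "..."} body that is POSTed to /subjects/<subject>/versions.
def registryPayload(schemaJson: String): String =
  "{\"schema\": \"" + jsonEscape(schemaJson) + "\"}"

println(registryPayload("{\"type\":\"string\"}"))
```

For example, `registryPayload` applied to the contents of a schema file could be written to a file and sent with `curl --data @<file>`.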
// start the shell
export TERM=xterm-color
./bin/start-scala-shell.sh local
// try to execute
import org.apache.flink.streaming.connectors.kafka.{
FlinkKafkaConsumer,
FlinkKafkaProducer
}
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import java.util.Properties
senv.enableCheckpointing(5000)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val schemaRegistryUrl = "http://localhost:8081"
// **************************************************************
// have the Tweet class generated by avrohugger
// (the generated class is posted in the comments below)
// **************************************************************
val deserializer = ConfluentRegistryAvroDeserializationSchema.forSpecific[Tweet](classOf[Tweet], schemaRegistryUrl)
val stream = senv.addSource(
  new FlinkKafkaConsumer(
    "tweets-raw",
    deserializer,
    properties
  ).setStartFromEarliest() // TODO experiment with different start values
)
stream.print()
senv.execute("Kafka Consumer Test")

geoHeil commented Jul 10, 2020

The Avro Schema definition:

Tweets.avsc
{
    "type": "record",
    "name": "Tweet",
    "namespace": "com.github.geoheil.streamingreference",
    "doc": "Twitter tweet record limited to basic information",
    "fields": [
        {
            "name": "tweet_id",
            "type": [
                "null",
                "string"
            ],
            "doc": "System-assigned numeric tweet ID. Cannot be changed by the user."
        },
        {
            "name": "text",
            "type": [
                "null",
                "string"
            ],
            "doc": "the main text of a tweet"
        },
        {
            "name": "source",
            "type": [
                "null",
                "string"
            ],
            "doc": "user agent of tweet submitting device"
        },
        {
            "name": "geo",
            "type": [
                "null",
                "string"
            ],
            "doc": "geo if available"
        },
        {
            "name": "place",
            "type": [
                "null",
                "string"
            ],
            "doc": "place if available"
        },
        {
            "name": "lang",
            "type": [
                "null",
                "string"
            ],
            "doc": "language of the tweet"
        },
        {
            "name": "created_at",
            "type": [
                "null",
                "string"
            ],
            "doc": "created at timestamp string formatted"
        },
        {
            "name": "timestamp_ms",
            "type": [
                "null",
                "string"
            ],
            "doc": "created at timestamp epoch long formatted"
        },
        {
            "name": "coordinates",
            "type": [
                "null",
                "string"
            ],
            "doc": "coordinates if available"
        },
        {
            "name": "user_id",
            "type": [
                "null",
                "long"
            ],
            "doc": "System-assigned numeric tweet ID. Cannot be changed by the user."
        },
        {
            "name": "user_name",
            "type": [
                "null",
                "string"
            ],
            "doc": "speaking user name, can be changed"
        },
        {
            "name": "screen_name",
            "type": [
                "null",
                "string"
            ],
            "doc": "screen name, can be changed"
        },
        {
            "name": "user_created_at",
            "type": [
                "null",
                "string"
            ],
            "doc": "Timestamp of user creation"
        },
        {
            "name": "followers_count",
            "type": [
                "null",
                "long"
            ],
            "doc": "follower count"
        },
        {
            "name": "friends_count",
            "type": [
                "null",
                "long"
            ],
            "doc": "friends count"
        },
        {
            "name": "user_lang",
            "type": [
                "null",
                "string"
            ],
            "doc": "language of user profile"
        },
        {
            "name": "user_location",
            "type": [
                "null",
                "string"
            ],
            "doc": "location if available"
        },
        {
            "name": "hashtags",
            "type": [
                "null",
                {
                    "type": "array",
                    "items": "string"
                }
            ],
            "doc": "hashtags as list of strings if available"
        }
    ]
}

geoHeil commented Jul 10, 2020

The generated SpecificRecord class:

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */

import scala.annotation.switch

/**
 * Twitter tweet record limited to basic information
 * @param tweet_id System-assigned numeric tweet ID. Cannot be changed by the user.
 * @param text the main text of a tweet
 * @param source user agent of tweet submitting device
 * @param geo geo if available
 * @param place place if available
 * @param lang language of the tweet
 * @param created_at created at timestamp string formatted
 * @param timestamp_ms created at timestamp epoch long formatted
 * @param coordinates coordinates if available
 * @param user_id System-assigned numeric tweet ID. Cannot be changed by the user.
 * @param user_name speaking user name, can be changed
 * @param screen_name screen name, can be changed
 * @param user_created_at Timestamp of user creation
 * @param followers_count follower count
 * @param friends_count friends count
 * @param user_lang language of user profile
 * @param user_location location if available
 * @param hashtags hashtags as list of strings if available
 */
final case class Tweet(var tweet_id: Option[String], var text: Option[String], var source: Option[String], var geo: Option[String], var place: Option[String], var lang: Option[String], var created_at: Option[String], var timestamp_ms: Option[String], var coordinates: Option[String], var user_id: Option[Long], var user_name: Option[String], var screen_name: Option[String], var user_created_at: Option[String], var followers_count: Option[Long], var friends_count: Option[Long], var user_lang: Option[String], var user_location: Option[String], var hashtags: Option[Seq[String]]) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None)
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case 0 => {
        tweet_id match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 1 => {
        text match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 2 => {
        source match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 3 => {
        geo match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 4 => {
        place match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 5 => {
        lang match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 6 => {
        created_at match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 7 => {
        timestamp_ms match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 8 => {
        coordinates match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 9 => {
        user_id match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 10 => {
        user_name match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 11 => {
        screen_name match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 12 => {
        user_created_at match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 13 => {
        followers_count match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 14 => {
        friends_count match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 15 => {
        user_lang match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 16 => {
        user_location match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case 17 => {
        hashtags match {
          case Some(x) => scala.collection.JavaConverters.bufferAsJavaListConverter({
            x map { x =>
              x
            }
          }.toBuffer).asJava
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case _ => throw new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }
  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case 0 => this.tweet_id = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case 1 => this.text = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case 2 => this.source = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case 3 => this.geo = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case 4 => this.place = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case 5 => this.lang = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case 6 => this.created_at = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case 7 => this.timestamp_ms = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case 8 => this.coordinates = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case 9 => this.user_id = {
        value match {
          case null => None
          case _ => Some(value)
        }
      }.asInstanceOf[Option[Long]]
      case 10 => this.user_name = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case 11 => this.screen_name = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case 12 => this.user_created_at = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case 13 => this.followers_count = {
        value match {
          case null => None
          case _ => Some(value)
        }
      }.asInstanceOf[Option[Long]]
      case 14 => this.friends_count = {
        value match {
          case null => None
          case _ => Some(value)
        }
      }.asInstanceOf[Option[Long]]
      case 15 => this.user_lang = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case 16 => this.user_location = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case 17 => this.hashtags = {
        value match {
          case null => None
          case _ => Some(value match {
            case (array: java.util.List[_]) => {
              Seq((scala.collection.JavaConverters.asScalaIteratorConverter(array.iterator).asScala.toSeq map { x =>
                x.toString
              }: _*))
            }
          })
        }
      }.asInstanceOf[Option[Seq[String]]]
      case _ => throw new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }
  def getSchema: org.apache.avro.Schema = Tweet.SCHEMA$
}

object Tweet {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Tweet\",\"namespace\":\"com.github.geoheil.streamingreference\",\"doc\":\"Twitter tweet record limited to basic information\",\"fields\":[{\"name\":\"tweet_id\",\"type\":[\"null\",\"string\"],\"doc\":\"System-assigned numeric tweet ID. Cannot be changed by the user.\"},{\"name\":\"text\",\"type\":[\"null\",\"string\"],\"doc\":\"the main text of a tweet\"},{\"name\":\"source\",\"type\":[\"null\",\"string\"],\"doc\":\"user agent of tweet submitting device\"},{\"name\":\"geo\",\"type\":[\"null\",\"string\"],\"doc\":\"geo if available\"},{\"name\":\"place\",\"type\":[\"null\",\"string\"],\"doc\":\"place if available\"},{\"name\":\"lang\",\"type\":[\"null\",\"string\"],\"doc\":\"language of the tweet\"},{\"name\":\"created_at\",\"type\":[\"null\",\"string\"],\"doc\":\"created at timestamp string formatted\"},{\"name\":\"timestamp_ms\",\"type\":[\"null\",\"string\"],\"doc\":\"created at timestamp epoch long formatted\"},{\"name\":\"coordinates\",\"type\":[\"null\",\"string\"],\"doc\":\"coordinates if available\"},{\"name\":\"user_id\",\"type\":[\"null\",\"long\"],\"doc\":\"System-assigned numeric tweet ID. Cannot be changed by the user.\"},{\"name\":\"user_name\",\"type\":[\"null\",\"string\"],\"doc\":\"speaking user name, can be changed\"},{\"name\":\"screen_name\",\"type\":[\"null\",\"string\"],\"doc\":\"screen name, can be changed\"},{\"name\":\"user_created_at\",\"type\":[\"null\",\"string\"],\"doc\":\"Timestamp of user creation\"},{\"name\":\"followers_count\",\"type\":[\"null\",\"long\"],\"doc\":\"follower count\"},{\"name\":\"friends_count\",\"type\":[\"null\",\"long\"],\"doc\":\"friends count\"},{\"name\":\"user_lang\",\"type\":[\"null\",\"string\"],\"doc\":\"language of user profile\"},{\"name\":\"user_location\",\"type\":[\"null\",\"string\"],\"doc\":\"location if available\"},{\"name\":\"hashtags\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"doc\":\"hashtags as list of strings if available\"}]}")
}
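The generated `get`/`put` methods repeat the same conversion for every nullable field. Avro represents the absent branch of a `["null", T]` union as `null`, while the case class uses `Option`. Strings may also arrive from Avro as other `CharSequence` types, hence the `toString`. Below is a condensed sketch of that mapping using only the Scala standard library. It is not avrohugger output, and the `NullableUnion` object and its method names are hypothetical.

```scala
import scala.collection.JavaConverters._

// Hypothetical condensed version of the per-field conversions in Tweet.get/put.
object NullableUnion {
  // Scala value -> what an Avro writer expects: null for an absent value.
  def toAvro[A](value: Option[A]): AnyRef =
    value.map(_.asInstanceOf[AnyRef]).orNull

  // Avro value -> Option[String]; toString also handles non-String CharSequences.
  def stringFromAvro(value: Any): Option[String] =
    Option(value).map(_.toString)

  // ["null", {"type": "array", "items": "string"}] is read as a java.util.List.
  def stringsFromAvro(value: Any): Option[Seq[String]] =
    Option(value).map {
      case list: java.util.List[_] => list.asScala.map(_.toString).toList
      case other =>
        throw new IllegalArgumentException(s"expected a java.util.List, got ${other.getClass.getName}")
    }

  // Option[Seq[String]] -> java.util.List (or null) for the Avro writer.
  def stringsToAvro(value: Option[Seq[String]]): java.util.List[String] =
    value.map(_.asJava).orNull
}

println(NullableUnion.stringsFromAvro(java.util.Arrays.asList("flink", "avro")))
```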

geoHeil commented Jul 10, 2020

The remaining exception in the TaskManager logs:

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to scala.Product
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) ~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) ~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
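The trace shows Flink's `CaseClassSerializer` receiving a `GenericData$Record` rather than a `Tweet`. The Scala API presumably derived a case-class `TypeInformation` for `Tweet`. So the deserializer apparently fell back to generic records.

One plausible explanation, not confirmed in this gist, runs as follows:

- Avro's specific reader looks up the record class by the reader schema's full name, `com.github.geoheil.streamingreference.Tweet`.
- The pasted class has no `package` clause, and the Scala shell compiles typed-in definitions inside wrapper objects.
- So no class with exactly that name exists, and Avro returns a generic record instead.

The sketch below only mimics that name mismatch with the standard library. `ReplWrapper` and `lookupByName` are hypothetical stand-ins, not Avro or Flink APIs.

```scala
// Hypothetical stand-in for a class typed into the Scala shell, which
// compiles such definitions inside synthetic wrapper objects.
object ReplWrapper {
  final case class Tweet(tweet_id: Option[String])
}

// The class name derived from the schema: namespace + "." + name.
val schemaFullName = "com.github.geoheil.streamingreference.Tweet"

// Hypothetical name-based lookup: None when no class has exactly that name.
def lookupByName(name: String, loader: ClassLoader): Option[Class[_]] =
  try Some(Class.forName(name, false, loader))
  catch { case _: ClassNotFoundException => None }

val loader = classOf[ReplWrapper.Tweet].getClassLoader
val actualName = classOf[ReplWrapper.Tweet].getName

println(s"schema expects:        $schemaFullName")
println(s"class is named:        $actualName")
println(s"found by schema name:  ${lookupByName(schemaFullName, loader).isDefined}")
println(s"found by its own name: ${lookupByName(actualName, loader).isDefined}")
```

If this is indeed the cause, one thing to try is the following. Compile the generated class together with a `package com.github.geoheil.streamingreference` declaration into a jar and put it in lib/, rather than pasting it into the shell. Its runtime name would then match the schema.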
