@brennv
Last active May 19, 2017 19:25
Twitter -> Kafka -> RethinkDB: duplicate primary `id` trace

The fine folks at Datamountaineer have developed stream-reactor, making it easier to integrate data pipelines with Kafka.

As a demo, I'm streaming Twitter data to Kafka, and then from Kafka to RethinkDB.

Configs using Landoop's fast-data-dev environment:

Source: Twitter

name=TwitterSourceConnector
connector.class=com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector
tasks.max=1
topic=storm
track.terms=storm
twitter.secret=xxx
twitter.token=xxx
twitter.consumersecret=xxx
twitter.consumerkey=xxx

Sink: RethinkDB

name=ReThinkSinkConnector
connector.class=com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkConnector
tasks.max=1
topics=storm
connect.rethink.sink.db=twitter_data
connect.rethink.sink.host=172.17.0.4
connect.rethink.sink.kcql=INSERT INTO tweets SELECT * FROM storm
value.converter.schemas.enable=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:3030/api/schema-registry

Everything hums along fine for several hours until it looks like Kafka eats a duplicate tweet. Full trace below.

Side note: it sounds like RethinkDB is working on some error-handling improvements around duplicate primary IDs:

rethinkdb/rethinkdb#899
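In the meantime, writes can be made idempotent on the database side: RethinkDB's `insert` takes a `conflict` option (`"error"`, `"replace"`, or `"update"`), so a write like `r.table("tweets").insert(doc, conflict="replace")` overwrites on a duplicate key instead of erroring. A minimal sketch of those semantics, using a plain dict as a stand-in for the table (the `upsert` helper here is illustrative, not part of any driver):

```python
# Sketch: idempotent upsert semantics, simulated with a plain dict keyed
# on the primary `id`. The ReQL equivalent would be
# r.table("tweets").insert(doc, conflict="replace").

def upsert(table, doc):
    """Insert doc; on a duplicate primary key, replace the existing row
    instead of raising, mirroring conflict='replace'."""
    key = doc["id"]
    replaced = key in table
    table[key] = doc
    return {"inserted": 0 if replaced else 1, "replaced": 1 if replaced else 0}

table = {}
tweet = {"id": 865605134142722000, "text": "RT @callmiifexco: ..."}
print(upsert(table, tweet))  # first write inserts
print(upsert(table, tweet))  # duplicate write replaces rather than erroring
```

Note that the sink connector builds its own ReQL writes, so this is about the desired semantics rather than something to drop into the connector config.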

RethinkDB Kafka connector docs and relevant bits:

http://docs.datamountaineer.com/en/latest/rethink.html

https://github.com/datamountaineer/stream-reactor/blob/master/kafka-connect-rethink/src/main/scala/com/datamountaineer/streamreactor/connect/rethink/sink/ReThinkWriter.scala#L133

https://github.com/Landoop/kafka-connectors-tests/tree/master/kafka-connect-rethink

[2017-05-19 16:51:38,883] ERROR Encountered error Write error occurred. Duplicate primary key `id`:
{
	"created_at":	"2017-05-19T16:28:40.000+0000",
	"entities":	{
		"hashtags":	[],
		"media":	[],
		"urls":	[],
		"user_mentions":	[
			{}
		]
	},
	"id":	865605134142722000.0,
	"is_retweet":	true,
	"lang":	"en",
	"text":	"RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
	"user":	{
		"followers_count":	42577,
		"friends_count":	42548,
		"id":	2300088499,
		"location":	"St Louis, MO",
		"name":	"ℹL๑บ∂β๑γ🍃💨",
		"screen_name":	"iBleedLxyalty",
		"statuses_count":	13267,
		"verified":	false
	}
}
{
	"created_at":	"2017-05-19T16:28:40.000+0000",
	"entities":	{
		"hashtags":	[],
		"media":	[],
		"urls":	[],
		"user_mentions":	[
			{}
		]
	},
	"id":	865605134142722000.0,
	"is_retweet":	true,
	"lang":	"en",
	"text":	"RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
	"user":	{
		"followers_count":	42577,
		"friends_count":	42548,
		"id":	2300088499,
		"location":	"St Louis, MO",
		"name":	"ℹL๑บ∂β๑γ🍃💨",
		"screen_name":	"iBleedLxyalty",
		"statuses_count":	13267,
		"verified":	false
	}
} (com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter:62)
java.lang.Throwable: Write error occurred. Duplicate primary key `id`:
{
	"created_at":	"2017-05-19T16:28:40.000+0000",
	"entities":	{
		"hashtags":	[],
		"media":	[],
		"urls":	[],
		"user_mentions":	[
			{}
		]
	},
	"id":	865605134142722000.0,
	"is_retweet":	true,
	"lang":	"en",
	"text":	"RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
	"user":	{
		"followers_count":	42577,
		"friends_count":	42548,
		"id":	2300088499,
		"location":	"St Louis, MO",
		"name":	"ℹL๑บ∂β๑γ🍃💨",
		"screen_name":	"iBleedLxyalty",
		"statuses_count":	13267,
		"verified":	false
	}
}
{
	"created_at":	"2017-05-19T16:28:40.000+0000",
	"entities":	{
		"hashtags":	[],
		"media":	[],
		"urls":	[],
		"user_mentions":	[
			{}
		]
	},
	"id":	865605134142722000.0,
	"is_retweet":	true,
	"lang":	"en",
	"text":	"RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
	"user":	{
		"followers_count":	42577,
		"friends_count":	42548,
		"id":	2300088499,
		"location":	"St Louis, MO",
		"name":	"ℹL๑บ∂β๑γ🍃💨",
		"screen_name":	"iBleedLxyalty",
		"statuses_count":	13267,
		"verified":	false
	}
}
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.handleFailure(ReThinkWriter.scala:146)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.com$datamountaineer$streamreactor$connect$rethink$sink$ReThinkWriter$$writeRecords(ReThinkWriter.scala:100)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter$$anonfun$write$1.apply(ReThinkWriter.scala:75)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter$$anonfun$write$1.apply(ReThinkWriter.scala:75)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.write(ReThinkWriter.scala:75)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkTask$$anonfun$put$2.apply(ReThinkSinkTask.scala:53)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkTask$$anonfun$put$2.apply(ReThinkSinkTask.scala:53)
	at scala.Option.foreach(Option.scala:257)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkTask.put(ReThinkSinkTask.scala:53)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2017-05-19 16:51:38,884] ERROR Task ReThinkSinkConnector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:449)
java.lang.RuntimeException: java.lang.Throwable: Write error occurred. Duplicate primary key `id`:
{
	"created_at":	"2017-05-19T16:28:40.000+0000",
	"entities":	{
		"hashtags":	[],
		"media":	[],
		"urls":	[],
		"user_mentions":	[
			{}
		]
	},
	"id":	865605134142722000.0,
	"is_retweet":	true,
	"lang":	"en",
	"text":	"RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
	"user":	{
		"followers_count":	42577,
		"friends_count":	42548,
		"id":	2300088499,
		"location":	"St Louis, MO",
		"name":	"ℹL๑บ∂β๑γ🍃💨",
		"screen_name":	"iBleedLxyalty",
		"statuses_count":	13267,
		"verified":	false
	}
}
{
	"created_at":	"2017-05-19T16:28:40.000+0000",
	"entities":	{
		"hashtags":	[],
		"media":	[],
		"urls":	[],
		"user_mentions":	[
			{}
		]
	},
	"id":	865605134142722000.0,
	"is_retweet":	true,
	"lang":	"en",
	"text":	"RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
	"user":	{
		"followers_count":	42577,
		"friends_count":	42548,
		"id":	2300088499,
		"location":	"St Louis, MO",
		"name":	"ℹL๑บ∂β๑γ🍃💨",
		"screen_name":	"iBleedLxyalty",
		"statuses_count":	13267,
		"verified":	false
	}
}
	at com.datamountaineer.streamreactor.connect.errors.ThrowErrorPolicy.handle(ErrorPolicy.scala:58)
	at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleError(ErrorHandler.scala:83)
	at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleTry(ErrorHandler.scala:64)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.handleTry(ReThinkWriter.scala:54)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.handleFailure(ReThinkWriter.scala:148)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.com$datamountaineer$streamreactor$connect$rethink$sink$ReThinkWriter$$writeRecords(ReThinkWriter.scala:100)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter$$anonfun$write$1.apply(ReThinkWriter.scala:75)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter$$anonfun$write$1.apply(ReThinkWriter.scala:75)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.write(ReThinkWriter.scala:75)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkTask$$anonfun$put$2.apply(ReThinkSinkTask.scala:53)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkTask$$anonfun$put$2.apply(ReThinkSinkTask.scala:53)
	at scala.Option.foreach(Option.scala:257)
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkTask.put(ReThinkSinkTask.scala:53)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Throwable: Write error occurred. Duplicate primary key `id`:
{
	"created_at":	"2017-05-19T16:28:40.000+0000",
	"entities":	{
		"hashtags":	[],
		"media":	[],
		"urls":	[],
		"user_mentions":	[
			{}
		]
	},
	"id":	865605134142722000.0,
	"is_retweet":	true,
	"lang":	"en",
	"text":	"RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
	"user":	{
		"followers_count":	42577,
		"friends_count":	42548,
		"id":	2300088499,
		"location":	"St Louis, MO",
		"name":	"ℹL๑บ∂β๑γ🍃💨",
		"screen_name":	"iBleedLxyalty",
		"statuses_count":	13267,
		"verified":	false
	}
}
{
	"created_at":	"2017-05-19T16:28:40.000+0000",
	"entities":	{
		"hashtags":	[],
		"media":	[],
		"urls":	[],
		"user_mentions":	[
			{}
		]
	},
	"id":	865605134142722000.0,
	"is_retweet":	true,
	"lang":	"en",
	"text":	"RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
	"user":	{
		"followers_count":	42577,
		"friends_count":	42548,
		"id":	2300088499,
		"location":	"St Louis, MO",
		"name":	"ℹL๑บ∂β๑γ🍃💨",
		"screen_name":	"iBleedLxyalty",
		"statuses_count":	13267,
		"verified":	false
	}
}
	at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.handleFailure(ReThinkWriter.scala:146)
	... 20 more
[2017-05-19 16:51:38,885] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:450)
[2017-05-19 16:51:38,885] ERROR Task ReThinkSinkConnector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:451)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2017-05-19 16:51:38,886] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
[2017-05-19 16:51:38,886] INFO Stopping Rethink sink. (com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkTask:61)

Resolution

By default, the error policy is to throw the exception:

connect.rethink.sink.error.policy=THROW

connect.rethink.sink.error.policy

Specifies the action to be taken if an error occurs while inserting the data. There are three available options:

  • noop: the error is swallowed
  • throw: the error is allowed to propagate
  • retry: the Kafka message is redelivered up to a maximum number of times specified by the connect.rethink.sink.max.retries option; the connect.rethink.sink.retry.interval option specifies the interval between retries

By setting the config to:

connect.rethink.sink.error.policy=NOOP

RethinkDB is now running without complaint; we'll keep an eye on the logs to make sure we're not silently ignoring anything unintended.
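If silently dropping writes ever becomes a concern, the retry policy described above is an alternative. A sketch using the option names from the docs (the count and interval values here are illustrative, not recommendations):

```
connect.rethink.sink.error.policy=RETRY
connect.rethink.sink.max.retries=10
connect.rethink.sink.retry.interval=60000
```

Worth noting: a genuine duplicate primary key would still fail on every redelivery and eventually exhaust the retries, so this mainly helps with transient errors rather than this particular one.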

Thanks to Andrew at Datamountaineer for the help.
