
@seifertm
Last active March 21, 2019 19:08
Faust hangs during recovery upon initial application start
import faust

app = faust.App('myapp', broker='kafka://localhost:9092', store='memory://')
topic1 = app.topic('topic1')
topic2 = app.topic('topic2')
topic1_table = app.Table('topic1')
topic2_table = app.Table('topic2')  # Comment this line to make the app start without hanging


@app.agent(topic1)
async def myagent(stream):
    async for item in stream:
        print(item)
version: '3'
services:
  zookeeper:
    image: digitalwonderland/zookeeper@sha256:3942ff1981d0ff57e8faf48b45814c3f0d604a5c4dcd75ecbe114c7279dbf4ce
  kafka:
    image: wurstmeister/kafka@sha256:d04dafd2b308f26dbeed8454f67c321579c2818c1eff5e8f695e14a19b1d599b
    depends_on:
      - 'zookeeper'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://:9094,EXTERNAL://:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://:9094,EXTERNAL://localhost:9092
      KAFKA_CREATE_TOPICS: topic1:1:1,topic2:1:1
    ports:
      - "127.0.0.1:9092:9092"
# Install faust
python3.6 -m venv venv
. venv/bin/activate
pip install faust==1.4.9
# Start Kafka
docker-compose up -d
# Wait a few seconds until Kafka has created the topics "topic1" and "topic2"
# Start Faust app
faust --loglevel info --app app.app worker --without-web
@seifertm
Author

KAFKA_CREATE_TOPICS is an environment variable provided by the Kafka image. It creates the specified topics when the container starts. Topics are separated by commas, and each one has the format <topic_name>:<partition_count>:<replica_count>.
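
To make the format concrete, here is a minimal Python sketch that splits such a value into its parts. parse_create_topics and TopicSpec are hypothetical names written for illustration, not part of the Kafka image, and the sketch only handles the three-field form used in this gist.

```python
from typing import List, NamedTuple


class TopicSpec(NamedTuple):
    """One entry of a KAFKA_CREATE_TOPICS value (hypothetical type)."""
    name: str
    partitions: int
    replicas: int


def parse_create_topics(value: str) -> List[TopicSpec]:
    """Split 'name:partitions:replicas,...' into TopicSpec entries."""
    specs = []
    for entry in value.split(","):
        # Each entry is expected to have exactly three colon-separated fields.
        name, partitions, replicas = entry.strip().split(":")
        specs.append(TopicSpec(name, int(partitions), int(replicas)))
    return specs


if __name__ == "__main__":
    # The value used in the docker-compose file of this gist.
    for spec in parse_create_topics("topic1:1:1,topic2:1:1"):
        print(spec)
```

For the compose file above, both topics come out with one partition and one replica.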
See https://github.com/wurstmeister/kafka-docker

@ask

ask commented Mar 21, 2019

I tried reproducing this exactly, this time using master, and the worker started without having to recover (though the changelogs are also empty?)

One thing I noticed is that the partition counts of the source topics and the changelog topics differ.
You set the partitions for the source topics to 1, but the default for topics created by Faust is 8.
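
The mismatch can be spelled out with a small sketch. The numbers come from this thread (1 partition per source topic, 8 as the Faust default). The changelog topic names assume Faust's <app id>-<table name>-changelog convention, and find_mismatches is a hypothetical helper, not a Faust API.

```python
from typing import Dict, List, Tuple

# Partition counts of the source topics, as set via KAFKA_CREATE_TOPICS.
SOURCE_PARTITIONS: Dict[str, int] = {"topic1": 1, "topic2": 1}

# Default partition count for topics created by Faust, as stated in this thread.
FAUST_DEFAULT_PARTITIONS = 8


def find_mismatches(
    app_id: str,
    sources: Dict[str, int],
    changelog_partitions: int,
) -> List[Tuple[str, str, int, int]]:
    """Return (source, changelog, source_count, changelog_count) for each mismatch.

    Assumes each table is named after its source topic, as in this gist.
    """
    mismatches = []
    for name, count in sources.items():
        # Assumed naming convention for changelog topics.
        changelog = f"{app_id}-{name}-changelog"
        if count != changelog_partitions:
            mismatches.append((name, changelog, count, changelog_partitions))
    return mismatches


if __name__ == "__main__":
    for row in find_mismatches("myapp", SOURCE_PARTITIONS, FAUST_DEFAULT_PARTITIONS):
        print(row)
```

With the default of 8, both tables are reported. Passing 1 as the changelog partition count yields an empty list.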

Change:

app = faust.App('myapp', broker='kafka://localhost:9092', store='memory://')

to:

app = faust.App('myapp', broker='kafka://localhost:9092', store='memory://',
                version=2, topic_partitions=1)

(also bumping the version to make sure new changelog topics are created with the new partition count)
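
Why the version bump leads to new changelog topics can be sketched as follows. This assumes that Faust prefixes the changelog topic with the app id and appends -v<version> to that id for versions above 1. Treat the exact naming as an assumption and check the topic list on your broker. effective_app_id and changelog_topic are hypothetical helpers, not Faust functions.

```python
def effective_app_id(app_id: str, version: int = 1) -> str:
    """App id as assumed to be used for topic names (suffix only for version > 1)."""
    return f"{app_id}-v{version}" if version > 1 else app_id


def changelog_topic(app_id: str, table_name: str, version: int = 1) -> str:
    """Assumed changelog topic name for a table."""
    return f"{effective_app_id(app_id, version)}-{table_name}-changelog"


if __name__ == "__main__":
    # Topic names before and after bumping the version to 2.
    for version in (1, 2):
        for table in ("topic1", "topic2"):
            print(version, changelog_topic("myapp", table, version))
```

Under these assumptions the old 8-partition changelog topics stay on the broker untouched, and the worker creates fresh ones under the new names with the partition count given by topic_partitions.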
