Last active
March 21, 2019 19:08
-
-
Save seifertm/10cf65a58b08d9301382d5014fdc9121 to your computer and use it in GitHub Desktop.
Faust hangs during recovery upon initial application start
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import faust | |
app = faust.App('myapp', broker='kafka://localhost:9092', store='memory://') | |
topic1 = app.topic('topic1') | |
topic2 = app.topic('topic2') | |
topic1_table = app.Table('topic1') | |
topic2_table = app.Table('topic2') # Comment this line to make the app start without hanging | |
@app.agent(topic1) | |
async def myagent(stream): | |
async for item in stream: | |
print(item) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
version: '3' | |
services: | |
zookeeper: | |
image: digitalwonderland/zookeeper@sha256:3942ff1981d0ff57e8faf48b45814c3f0d604a5c4dcd75ecbe114c7279dbf4ce | |
kafka: | |
image: wurstmeister/kafka@sha256:d04dafd2b308f26dbeed8454f67c321579c2818c1eff5e8f695e14a19b1d599b | |
depends_on: | |
- 'zookeeper' | |
environment: | |
KAFKA_BROKER_ID: 1 | |
KAFKA_ZOOKEEPER_CONNECT: zookeeper | |
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT | |
KAFKA_LISTENERS: INTERNAL://:9094,EXTERNAL://:9092 | |
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL | |
KAFKA_ADVERTISED_LISTENERS: INTERNAL://:9094,EXTERNAL://localhost:9092 | |
KAFKA_CREATE_TOPICS: topic1:1:1,topic2:1:1 | |
ports: | |
- "127.0.0.1:9092:9092" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Install faust | |
python3.6 -m venv venv | |
. venv/bin/activate | |
pip install faust==1.4.9 | |
# Start Kafka | |
docker-compose up -d | |
# wait a few seconds until the topics "topic1" and "topic2" are created by Kafka | |
# Start Faust app | |
faust --loglevel info --app app.app worker --without-web |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I tried doing this exactly using master this time, and the worker did start without having to recover (but the changelogs are also empty?)
One thing I noticed is that the partition count between the source topic and the changelog topics differ.
You set the partitions for the source topics to 1, but the default for topics created by faust is 8.
Change:
to:
(also bumping the version to make sure new changelog topics are created with the new partition count)