Skip to content

Instantly share code, notes, and snippets.

@fr-ser
Created October 11, 2020 08:12
Show Gist options
  • Save fr-ser/347c00e0026cd7941e1d73fd36e54887 to your computer and use it in GitHub Desktop.
Save fr-ser/347c00e0026cd7941e1d73fd36e54887 to your computer and use it in GitHub Desktop.
Example code that lets faust's `take` yield messages which are acknowledged manually by the consumer
import faust
from faust import Stream
from noack_take import noack_take
# Monkeypatch noack_take onto faust's Stream class. Because it is assigned
# on the class, instances get it as a bound method:
# ``stream.noack_take(max_, within)``.
Stream.noack_take = noack_take
import asyncio
from asyncio import CancelledError
from mode import Seconds, want_seconds
from mode.utils.aiter import aiter
from mode.utils.futures import notify
from typing import (
AsyncIterable,
List,
Optional,
Sequence,
cast,
)
from faust.types import ChannelT, EventT
from faust.types.streams import (
T,
T_co,
)
# mainly copied from faust
# faust/streams.py
# changes from the original faust code are marked with a "code change" comment
async def noack_take(self, max_: int,
                     within: Seconds) -> AsyncIterable[Sequence[EventT]]:
    """Buffer up to ``max_`` events at a time and yield them as a list.

    Variant of faust's ``Stream.take`` with two differences:

    * it yields the buffered *events* (``self.current_event`` at the time
      each value passed through the stream), not their values;
    * it never acks those events itself. The consumer is responsible for
      acking every event it receives, e.g. ``await stream.ack(event)``.

    ``self.enable_acks`` is set to False while the generator runs and the
    previous setting is restored when it finishes.

    Arguments:
        max_: Maximum number of events per yielded batch. When the buffer
            reaches this size it is handed to the consumer immediately,
            and no further events are buffered until it has been consumed.
        within: Timeout for when we give up waiting for another value,
            and process the values we have.

    Warning: If there's no timeout (i.e. ``within=None``),
        the agent is likely to stall and block buffered events for
        an unreasonable length of time(!).
    """
    # code change: the buffer holds events instead of values, and no
    # separate ``events`` list is kept because nothing is acked here.
    buffer: List[EventT] = []
    buffer_add = buffer.append
    buffer_size = buffer.__len__
    # No ``loop=`` argument: it was removed from asyncio.Event in Python
    # 3.10. These events are created while this coroutine is running, so
    # they use the running loop (expected to be ``self.loop``).
    buffer_full = asyncio.Event()
    buffer_consumed = asyncio.Event()
    timeout = want_seconds(within) if within else None
    stream_enable_acks: bool = self.enable_acks

    buffer_consuming: Optional[asyncio.Future] = None

    channel_it = aiter(self.channel)

    # We add this processor to populate the buffer, and the stream
    # is passively consumed in the background (enable_passive below).
    async def add_to_buffer(value: T) -> T:
        try:
            # buffer_consuming is set when consuming buffer
            # after timeout.
            nonlocal buffer_consuming
            if buffer_consuming is not None:
                try:
                    await buffer_consuming
                finally:
                    buffer_consuming = None
            # code change: buffer the event itself (not ``value``) so the
            # consumer can ack it after processing.
            event = self.current_event
            # NOTE(review): the event is appended before the None check
            # below (mirroring faust's take), so on this error path a None
            # entry can end up in a yielded batch.
            buffer_add(cast(EventT, event))
            if event is None:
                raise RuntimeError(
                    'Take buffer found current_event is None')
            if buffer_size() >= max_:
                # signal that the buffer is full and should be emptied.
                buffer_full.set()
                # strict wait for buffer to be consumed after buffer
                # full.
                # If max is 1000, we are not allowed to return 1001
                # values.
                buffer_consumed.clear()
                await self.wait(buffer_consumed)
        except CancelledError:  # pragma: no cover
            raise
        except Exception as exc:
            self.log.exception('Error adding to take buffer: %r', exc)
            await self.crash(exc)
        return value

    # Disable the stream's own acking: events are only acked when the
    # consumer of the yielded batches acks them.
    self.enable_acks = False

    self.add_processor(add_to_buffer)
    self._enable_passive(cast(ChannelT, channel_it))
    try:
        while not self.should_stop:
            # wait until buffer full, or timeout
            await self.wait_for_stopped(buffer_full, timeout=timeout)
            if buffer:
                # make sure the background task does not add new items to
                # the buffer while we read.
                buffer_consuming = self.loop.create_future()
                try:
                    yield list(buffer)
                finally:
                    # code change: unlike faust's take, the events are not
                    # acked here; acking is left to the consumer.
                    buffer.clear()
                    # allow writing to buffer again
                    notify(buffer_consuming)
                    buffer_full.clear()
                    buffer_consumed.set()
    finally:
        # Restore last behaviour of "enable_acks"
        self.enable_acks = stream_enable_acks
        self._processors.remove(add_to_buffer)
import asyncio
import os
from typing import NamedTuple
import faust
from faust.utils.tracing import set_current_span
from mode.utils.mocks import Mock
import pytest
from noack_take import noack_take
# mainly copied from faust tests
# faust/t/functional/test_streams.py
# changes from the faust tests are marked with a "code change" comment
@pytest.mark.asyncio
async def test_take(app):
    """noack_take yields a one-event batch and never acks that event."""
    async with new_stream(app) as s:
        # code change: change assert to False
        assert s.enable_acks is False
        await s.channel.send(value=1)
        event = None
        # code change: using noack_take instead of take
        # code change: noack_take returns events instead of values
        async for noack_values in s.noack_take(s, 1, within=1):
            # Compare the whole batch, like the other take tests do.
            assert [nv.value for nv in noack_values] == [1]
            assert s.enable_acks is False
            event = mock_stream_event_ack(s)
            break

        assert event
        # need one sleep on Python 3.6.0-3.6.6 + 3.7.0
        # need two sleeps on Python 3.6.7 + 3.7.1 :-/
        await asyncio.sleep(0)  # needed for some reason
        await asyncio.sleep(0)  # needed for some reason
        # code change: nothing may ack the event. Assert this
        # unconditionally; the former ``if not event.ack.called:`` guard
        # silently skipped these checks whenever an ack did happen.
        assert not event.ack.called
        assert not event.message.acked
        assert not event.message.refcount
        # code change: change assert to False
        assert s.enable_acks is False
@pytest.mark.asyncio
async def test_take__10(app, loop):
    """noack_take yields two full batches of 10 and acks none of them."""
    s = new_stream(app)
    async with s:
        # code change: change assert to False
        assert s.enable_acks is False
        for i in range(9):
            await s.channel.send(value=i)

        async def in_one_second_finalize():
            await s.sleep(1.0)
            await s.channel.send(value=9)
            for i in range(10):
                await s.channel.send(value=i + 10)

        # Keep a reference to the task: the event loop only holds a weak
        # reference, so an unreferenced task may be garbage collected
        # before it finishes.
        finalizer = asyncio.ensure_future(in_one_second_finalize())

        event = None
        # code change: using noack_take instead of take
        buffer_processor = s.noack_take(s, 10, within=10.0)
        async for noack_values in buffer_processor:
            assert [nv.value for nv in noack_values] == list(range(10))
            assert s.enable_acks is False
            event = mock_stream_event_ack(s)
            break

        async for noack_values in buffer_processor:
            assert [nv.value for nv in noack_values] == list(range(10, 20))
            assert s.enable_acks is False
            break

        try:
            await buffer_processor.athrow(asyncio.CancelledError())
        except asyncio.CancelledError:
            pass
        # Every value the task sends has been consumed by now; awaiting it
        # surfaces any exception it raised.
        await finalizer

        assert event
        # need one sleep on Python 3.6.0-3.6.6 + 3.7.0
        # need two sleeps on Python 3.6.7 + 3.7.1 :-/
        await asyncio.sleep(0)  # needed for some reason
        await asyncio.sleep(0)  # needed for some reason
        await asyncio.sleep(0)  # needed for some reason
        # code change: nothing may ack the event. Assert this
        # unconditionally; the former ``if not event.ack.called:`` guard
        # silently skipped these checks whenever an ack did happen.
        assert not event.ack.called
        assert not event.message.acked
        assert not event.message.refcount
        # code change: change assert to False
        assert s.enable_acks is False
# code change: my own test to manually acknowledge
@pytest.mark.asyncio
async def test_take__10_and_ack(app, loop):
    """Events yielded by noack_take can be acked manually by the consumer."""
    s = new_stream(app)
    async with s:
        # code change: change assert to False
        assert s.enable_acks is False
        for i in range(9):
            await s.channel.send(value=i)

        async def in_one_second_finalize():
            await s.sleep(1.0)
            await s.channel.send(value=9)
            for i in range(10):
                await s.channel.send(value=i + 10)

        # Keep a reference to the task: the event loop only holds a weak
        # reference, so an unreferenced task may be garbage collected
        # before it finishes.
        finalizer = asyncio.ensure_future(in_one_second_finalize())

        event = None
        # code change: using noack_take instead of take
        buffer_processor = s.noack_take(s, 10, within=10.0)
        async for noack_values in buffer_processor:
            assert [nv.value for nv in noack_values] == list(range(10))
            assert s.enable_acks is False
            # Mock the ack of the stream's current event. This is presumably
            # the event whose processing filled the buffer, i.e. the last
            # event of this batch.
            event = mock_stream_event_ack(s)
            # code change: acknowledge
            for noack_event in noack_values:
                await s.ack(noack_event)
            break

        async for noack_values in buffer_processor:
            assert [nv.value for nv in noack_values] == list(range(10, 20))
            assert s.enable_acks is False
            # code change: acknowledge
            for noack_event in noack_values:
                await s.ack(noack_event)
            break

        try:
            await buffer_processor.athrow(asyncio.CancelledError())
        except asyncio.CancelledError:
            pass
        # Every value the task sends has been consumed by now; awaiting it
        # surfaces any exception it raised.
        await finalizer

        assert event
        # need one sleep on Python 3.6.0-3.6.6 + 3.7.0
        # need two sleeps on Python 3.6.7 + 3.7.1 :-/
        await asyncio.sleep(0)  # needed for some reason
        await asyncio.sleep(0)  # needed for some reason
        await asyncio.sleep(0)  # needed for some reason
        # code change: the mocked event was acked manually above, so its
        # (mocked) ack must have been called. The former
        # ``if not event.ack.called:`` guard silently skipped its checks
        # whenever the ack did happen.
        assert event.ack.called
        # code change: change assert to False
        assert s.enable_acks is False
@pytest.mark.asyncio
async def test_take__no_event_crashes(app, loop):
    """noack_take crashes the stream when ``current_event`` is None."""

    class NoCurrentEventStream(faust.Stream):
        # A stream whose current_event always reads as None and silently
        # ignores assignments.
        @property
        def current_event(self):
            return None

        @current_event.setter
        def current_event(self, event):
            pass

    app.conf.Stream = NoCurrentEventStream
    stream = new_stream(app)
    assert isinstance(stream, NoCurrentEventStream)
    async with stream:
        # code change: change assert to False
        assert stream.enable_acks is False
        await stream.channel.send(value=1)
        # code change: using noack_take instead of take
        batches = stream.noack_take(stream, 10, within=10.0)
        print('STARTING STREAM ITERATION')
        async for batch in batches:
            print(f'RECEIVED VALUE: {batch!r}')
            break
        print('ENDING STREAM ITERATION')
        try:
            await batches.athrow(asyncio.CancelledError())
        except asyncio.CancelledError:
            pass
        # need one sleep on Python 3.6.0-3.6.6 + 3.7.0
        # need two sleeps on Python 3.6.7 + 3.7.1 :-/
        # (three zero-sleeps are needed here for some reason)
        for _ in range(3):
            await asyncio.sleep(0)
        assert isinstance(stream._crash_reason, RuntimeError)
        print('RETURNING')
        # code change: change assert to False
        assert stream.enable_acks is False
def mock_stream_event_ack(stream, return_value=False):
    """Mock ``ack`` on *stream*'s current event and return that event."""
    current = stream.current_event
    return mock_event_ack(current, return_value=return_value)
def mock_event_ack(event, return_value=False):
    """Replace ``event.ack`` with a Mock returning *return_value*.

    The event is modified in place and also returned for convenience.
    """
    ack_mock = Mock(name='ack', return_value=return_value)
    event.ack = ack_mock
    return event
def new_stream(app, *args, **kwargs):
    """Create a no-ack stream over a fresh channel of a prepared *app*.

    ``noack_take`` is attached as a plain instance attribute. Functions
    stored on an instance are not bound, so callers pass the stream
    explicitly: ``stream.noack_take(stream, max_, within=...)``.
    Positional ``*args`` are accepted but ignored.
    """
    prepared = _prepare_app(app)
    # code change: Changed stream to noack stream and added noack_take
    channel = prepared.channel(loop=prepared.loop, maxsize=1000)
    noack_stream = _new_stream(prepared, channel, **kwargs).noack()
    noack_stream.noack_take = noack_take
    return noack_stream
def new_topic_stream(app, *args, name: str = 'test', **kwargs):
    """Create a stream over the topic *name* of a prepared *app*.

    Positional ``*args`` are accepted but ignored.
    """
    prepared = _prepare_app(app)
    topic = prepared.topic(name, loop=prepared.loop)
    return _new_stream(prepared, topic, **kwargs)
def _new_stream(app, channel, *args, **kwargs):
    """Call ``channel.stream`` with *app*'s loop plus any extra arguments."""
    app_loop = app.loop
    return channel.stream(*args, loop=app_loop, **kwargs)
# faust/t/functional/conftest.py
class AppMarks(NamedTuple):
    """Options for the ``app`` fixture, overridable via ``@pytest.mark.app``."""

    # passed as the first (positional) argument to faust.App
    name: str = 'funtest'
    # passed to faust.App as ``store=``
    store: str = 'memory://'
    # passed to faust.App as ``cache=``
    cache: str = 'memory://'
def create_appmarks(name='funtest',
                    store='memory://',
                    cache='memory://',
                    **rest):
    """Split ``app`` marker kwargs into AppMarks options and leftovers.

    Returns an ``(AppMarks, rest)`` pair, where ``rest`` holds every keyword
    argument other than ``name``, ``store`` and ``cache``.
    """
    marks = AppMarks(name, store, cache)
    return marks, rest
@pytest.fixture()
def app(event_loop, request):
    """Yield a finalized faust App configured from the ``app`` marker.

    Keyword arguments of ``@pytest.mark.app(...)`` override the AppMarks
    defaults (``name``/``store``/``cache``); any other keywords are passed
    straight to ``faust.App``. On teardown the app must have no tracer.
    """
    # ``pytest.fixture`` supports yield-style teardown directly;
    # ``pytest.yield_fixture`` is only a deprecated alias for it.
    # Remove any data/work directory overrides from the environment first.
    for env_var in ('F_DATADIR', 'FAUST_DATADIR', 'F_WORKDIR', 'FAUST_WORKDIR'):
        os.environ.pop(env_var, None)
    marks = request.node.get_closest_marker('app')
    options, rest = create_appmarks(
        **((marks.kwargs or {}) if marks else {}))
    app = faust.App(
        options.name,
        store=options.store,
        cache=options.cache,
        **rest,
    )
    app.finalize()
    set_current_span(None)
    try:
        yield app
    finally:
        assert app.tracer is None
def _prepare_app(app):
    """Bind *app* to the current event loop and resume its flow control.

    The same app object is returned, mutated in place.
    """
    app.loop = asyncio.get_event_loop()
    flow_control = app.flow_control
    flow_control.resume()  # flow control is initially suspended
    return app
# faust/t/conftest.py
@pytest.fixture()
def loop(event_loop):
    """Expose the ``event_loop`` fixture under the name ``loop``."""
    return event_loop
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment