@anandtripathi5
Created March 3, 2022 23:39
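# Assumption: no Celery app instance is importable here, so shared_task is used;
# swap in your own app's task decorator if you have one.
from celery import shared_task

from extensions import socketio  # project-local Flask-SocketIO instance


@shared_task(name="some_task", bind=True)
def some_task(self, room_name=None, namespace=None):
    # Do some task

    # Send an event to the room, skipping the listed session IDs
    socketio.emit(
        "some_event", dict(random_key="random_value"),
        namespace=namespace, room=room_name,
        include_self=False, skip_sid=["testing"]
    )
    return "succeeded"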