Skip to content

Instantly share code, notes, and snippets.

@KyMidd
Created May 2, 2025 17:16
Show Gist options
  • Select an option

  • Save KyMidd/21337379cd98711ac0a1da58688c2769 to your computer and use it in GitHub Desktop.

Select an option

Save KyMidd/21337379cd98711ac0a1da58688c2769 to your computer and use it in GitHub Desktop.
# Receives the streaming response and updates the slack message, chunk by chunk
def streaming_response_on_slack(client, streaming_response, initial_response, channel_id, thread_ts):
    """Relay a streamed response into a Slack message, chunk by chunk.

    Iterates ``streaming_response['stream']``, appends every text delta to the
    accumulated response, and calls ``client.chat_update`` with the text so
    far once ``slack_buffer_token_size`` text deltas have been received since
    the previous update.

    Args:
        client: Object exposing ``chat_update(text=..., channel=..., ts=...)``.
        streaming_response: Mapping whose ``'stream'`` entry yields event dicts.
        initial_response: Passed as ``ts`` to ``chat_update``; presumably the
            timestamp of the Slack message being overwritten -- confirm with
            the caller.
        channel_id: Passed as ``channel`` to ``chat_update``.
        thread_ts: Not referenced in the visible portion of this function.
    """
    # NOTE(review): the published excerpt elides setup code at this point.
    # `response`, `buffer`, `token_counter` and `full_event_payload` are
    # presumably initialised in the omitted part, and `slack_buffer_token_size`
    # is presumably a module-level setting -- confirm against the full file.
    ...

    # Not read in the visible portion; presumably filled in by code that
    # follows this excerpt -- TODO confirm.
    guardrail_type = None
    guardrail_confidence = None
    guardrail_filter_strength = None
    guardrail_action = None

    for chunk in streaming_response['stream']:
        full_event_payload.append(chunk)  # accumulate full payload

        # Handle streamed text for Slack updates
        if "contentBlockDelta" not in chunk:
            continue

        # A delta is not guaranteed to carry "text" (it may hold tool-use or
        # reasoning content instead); skip those rather than raise KeyError.
        text = chunk["contentBlockDelta"]["delta"].get("text")
        if text is None:
            continue

        response += text
        buffer += text
        token_counter += 1

        # Only push to Slack every `slack_buffer_token_size` deltas so the
        # message is not rewritten on every single chunk.
        if token_counter >= slack_buffer_token_size:
            client.chat_update(
                text=response,
                channel=channel_id,
                ts=initial_response
            )
            token_counter = 0
            buffer = ""

    # Final Slack update
    if buffer:
        # Check for blocked message
        if "input has been blocked by the content filter" in response:
            # Dump every raw event only when debugging is explicitly enabled.
            if os.environ.get("VERA_DEBUG", "False") == "True":
                print("🚀 Full event payload:", full_event_payload)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment