Skip to content

Instantly share code, notes, and snippets.

@UtsavChokshiCNU
Last active August 9, 2023 09:26
Show Gist options
  • Save UtsavChokshiCNU/d8c1660b5cf3b4b58bce0877c64f4a91 to your computer and use it in GitHub Desktop.
Save UtsavChokshiCNU/d8c1660b5cf3b4b58bce0877c64f4a91 to your computer and use it in GitHub Desktop.
Sample Websocket Server that follows Query Pipeline contract and mocks it
"""
### How to use it?
### Run the following commands:
pip install simple-websocket-server==0.4.4 pydantic==2.1.1
python3 qa_websocket_server.py
### Send the following request from Postman over a WebSocket connection:
ws://localhost:8000
{
"id": "test123",
"service": "qa",
"attributes": {
"stream": true,
"question": "Hello, how are you ?",
"top_k": 10
}
}
"""
import json
import logging
import secrets
import string
import threading
from datetime import datetime
from enum import Enum
from time import sleep
from typing import Dict, List
from pydantic import BaseModel
from simple_websocket_server import WebSocket, WebSocketServer
import simple_websocket_server
# Console logging for the mock server: a single DEBUG-level stream handler
# with a timestamped format, attached to the "server_logger" logger.
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")

# The handler decides where records go (a console stream) and carries its own
# level threshold and output format.
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(formatter)

# The logger used by every function in this module.
logger = logging.getLogger("server_logger")
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
# Replace the library's module-level handshake response template with one that
# also sends "Access-Control-Allow-Origin: *" in the 101 response.
# NOTE(review): presumably the stock template lacks this header and the
# %(acceptstr)s placeholder is filled in by the library during the handshake —
# confirm against simple-websocket-server 0.4.4.
simple_websocket_server.HANDSHAKE_STR = (
'HTTP/1.1 101 Switching Protocols\r\n'
'Upgrade: WebSocket\r\n'
'Connection: Upgrade\r\n'
'Access-Control-Allow-Origin: *\r\n'
'Sec-WebSocket-Accept: %(acceptstr)s\r\n\r\n'
)
class ServiceEnum(str, Enum):
    """Service names accepted in the ``service`` field of a request.

    The ``str`` mix-in makes every member compare equal to its plain string
    value, e.g. ``ServiceEnum.QA == "qa"``.
    """

    Summary = "summary"
    QA = "qa"
class WebsocketRequestAttributes(BaseModel):
    """Schema of the ``attributes`` object inside an incoming request."""

    # Validated on input; the mock's sending logic in this file does not read it.
    stream: bool
    # The user's question; echoed back in every outgoing message.
    question: str
    # Validated on input; the mock's sending logic in this file does not read it.
    top_k: int
class WebsocketRequest(BaseModel):
    """Schema of one incoming websocket request message."""

    # Client-chosen request id; copied into the "id" of every reply.
    id: str
    # Which service is being requested; must be a ServiceEnum value.
    service: ServiceEnum
    # Nested request parameters (stream / question / top_k).
    attributes: WebsocketRequestAttributes
def get_start_notification_message(request_id: str, llm_corpus_id: str, question: str) -> Dict:
    """Build the START notification sent first for a request.

    Args:
        request_id: Id of the request being answered; returned as ``"id"``.
        llm_corpus_id: Identifier placed in ``response.llm_corpus_id``.
        question: The asked question; echoed in the payload and its message.

    Returns:
        A JSON-serialisable dict with ``id``, ``service`` (always ``"qa"``)
        and a ``response`` envelope of type ``NOTIFICATION``.
    """
    # Local wall-clock time, formatted with microseconds.
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    data = {
        "notification_type": "START",
        "message": f"Searching for: {question}",
        "question": question,
    }
    response = {
        "llm_corpus_id": llm_corpus_id,
        "timestamp": timestamp,
        "type": "NOTIFICATION",
        "data": data,
    }
    return {"id": request_id, "service": "qa", "response": response}
def get_intermediate_notification_message(request_id: str, llm_corpus_id: str, question: str) -> Dict:
    """Build the INTERMEDIATE notification sent while the answer is prepared.

    Args:
        request_id: Id of the request being answered; returned as ``"id"``.
        llm_corpus_id: Identifier placed in ``response.llm_corpus_id``.
        question: The asked question; echoed in the payload.

    Returns:
        A JSON-serialisable dict with ``id``, ``service`` (always ``"qa"``)
        and a ``response`` envelope of type ``NOTIFICATION``.
    """
    # Local wall-clock time, formatted with microseconds.
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    data = {
        "notification_type": "INTERMEDIATE",
        "message": "Generating answer for you....",
        "question": question,
    }
    response = {
        "llm_corpus_id": llm_corpus_id,
        "timestamp": timestamp,
        "type": "NOTIFICATION",
        "data": data,
    }
    return {"id": request_id, "service": "qa", "response": response}
def split_into_chunks(input_string: str, chunk_size: int = 3) -> List[str]:
    """Split ``input_string`` into consecutive pieces of ``chunk_size`` characters.

    The last piece may be shorter when the length is not a multiple of
    ``chunk_size``. An empty input yields an empty list.

    Args:
        input_string: Text to split.
        chunk_size: Maximum number of characters per piece; must be >= 1.

    Returns:
        The pieces in order; joining them reproduces ``input_string``.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    # A negative step would silently produce no chunks (dropping the whole
    # input) and a zero step fails with an opaque range() error, so reject
    # both up front with a clear message.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
    return [
        input_string[start:start + chunk_size]
        for start in range(0, len(input_string), chunk_size)
    ]
def get_streaming_answer_chunk(request_id: str, llm_corpus_id: str, question: str, chunk: str) -> Dict:
    """Build one STREAMING_ANSWER message carrying a single piece of the answer.

    Args:
        request_id: Id of the request being answered; returned as ``"id"``.
        llm_corpus_id: Identifier placed in ``response.llm_corpus_id``.
        question: The asked question; echoed in the payload.
        chunk: The piece of answer text carried by this message.

    Returns:
        A JSON-serialisable dict whose ``response.data`` holds the question,
        the chunk and a new, empty ``references`` list.
    """
    # Local wall-clock time, formatted with microseconds.
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    data = {
        "question": question,
        "chunk": chunk,
        "references": [],
    }
    response = {
        "llm_corpus_id": llm_corpus_id,
        "timestamp": timestamp,
        "type": "STREAMING_ANSWER",
        "data": data,
    }
    return {"id": request_id, "service": "qa", "response": response}
def get_streaming_answer_messages(request_id: str, llm_corpus_id: str, question: str) -> List[Dict]:
    """Build the full list of STREAMING_ANSWER messages for a canned answer.

    The hard-coded sample answer is cut into pieces with ``split_into_chunks``
    (default chunk size) and bracketed by a literal ``"START"`` chunk and a
    literal ``"END"`` chunk. The final (``"END"``) message additionally
    carries the sample references plus fixed ``latency`` and ``cost`` values.

    Args:
        request_id: Id of the request being answered.
        llm_corpus_id: Identifier placed in every message's response.
        question: The asked question; echoed in every message.

    Returns:
        The messages in the order they should be sent.
    """
    sample_answer = """
Yes, you can add more direct numbers to an existing user extension.
Login to your account portal with an admin user's credentials, select a user and see the screenshot below to add a direct number.
https://community.ringcentral.com/storage/attachments/3290-1650565501457.png
No, you cannot send SMS on behalf of another person within a user group and using the RC app. However, you can implement your own app using the /sms or /batches (for high volume SMS) API to send SMS messages. Your app can be a password flow authentication which always uses the user name and password (or JWT token) of the user who owns the SMS phone number for sending and receiving SMS messages. Now, you can control your app's SMS feature access by requiring users to login the app with their own credentials, then detect their identity (e.g. extension id) to allow the user continue using the app or not.
Think about a fax machine in a locked room and you allow only certain employees to enter the room to send faxes.
"""
    # Mock reference documents attached only to the closing message.
    sample_references = [
        {
            "id": "1",
            "title": "Sample Doc 1",
            "link": "https://google.co.in",
            "date": "2022-02-28T15:03:19.600+0000",
        },
        {
            "id": "2",
            "title": "Sample Doc 2",
            "link": "https://google.co.in",
            "date": "2023-02-28T15:03:19.600+0000",
        },
        {
            "id": "3",
            "title": "Sample Doc 3",
            "link": "https://google.co.in",
            "date": "2021-02-28T15:03:19.600+0000",
        },
    ]

    # Sentinel chunks mark the beginning and the end of the stream.
    pieces = ["START"]
    pieces.extend(split_into_chunks(sample_answer))
    pieces.append("END")

    messages = []
    for piece in pieces:
        messages.append(
            get_streaming_answer_chunk(request_id, llm_corpus_id, question, piece)
        )

    # Enrich the closing message in place.
    closing_data = messages[-1]["response"]["data"]
    closing_data["references"] = sample_references
    closing_data["latency"] = 19
    closing_data["cost"] = 0.003
    return messages
def generate_random_string(length: int = 22):
    """Return a random string of ``length`` ASCII letters and digits.

    Characters are drawn independently with ``secrets.choice``.

    Args:
        length: Number of characters to generate (default 22).

    Returns:
        The generated string.
    """
    alphabet = string.ascii_letters + string.digits
    picked = [secrets.choice(alphabet) for _ in range(length)]
    return "".join(picked)
def send_messages(client, ws_request: WebsocketRequest):
    """Send the whole mocked reply sequence for one request to ``client``.

    Sequence: START notification, 3 s pause, INTERMEDIATE notification,
    3 s pause, then every streaming-answer message with a 0.2 s pause after
    each one. Each stage is best-effort: an exception is logged and the
    function moves on to the next stage.

    Args:
        client: Object exposing ``send_message(str)``; in this file it is the
            ``QAWebSocket`` connection that received the request.
        ws_request: The validated incoming request.
    """
    # One corpus id is generated per request and shared by all its messages.
    corpus_id = generate_random_string()

    # Stage 1: START notification.
    try:
        start_payload = get_start_notification_message(
            request_id=ws_request.id,
            llm_corpus_id=corpus_id,
            question=ws_request.attributes.question,
        )
        logger.debug(start_payload)
        client.send_message(json.dumps(start_payload))
    except Exception:
        logger.exception("Error in sending start notification")

    sleep(3)

    # Stage 2: INTERMEDIATE notification.
    try:
        intermediate_payload = get_intermediate_notification_message(
            request_id=ws_request.id,
            llm_corpus_id=corpus_id,
            question=ws_request.attributes.question,
        )
        logger.debug(intermediate_payload)
        client.send_message(json.dumps(intermediate_payload))
    except Exception:
        logger.exception("Error in sending intermediate notification")

    sleep(3)

    # Stage 3: the answer itself, streamed chunk by chunk.
    try:
        answer_payloads = get_streaming_answer_messages(
            request_id=ws_request.id,
            llm_corpus_id=corpus_id,
            question=ws_request.attributes.question,
        )
        logger.debug(
            f"Sending {len(answer_payloads)} messages for streaming answer ..")
        for payload in answer_payloads:
            client.send_message(json.dumps(payload))
            sleep(0.2)
        logger.info(f"Sent {len(answer_payloads)} messages for streaming answer")
    except Exception:
        logger.exception("Error in sending streaming answers")

    logger.info(f"Total active threads: {threading.active_count()}")
class QAWebSocket(WebSocket):
    """Websocket connection handler that answers QA requests with mock data.

    One instance is used per client connection (it is the handler class
    passed to ``WebSocketServer`` in this file).
    """

    def handle(self):
        """Parse the received message and start streaming the mocked reply.

        ``self.data`` is decoded as JSON and validated as a
        ``WebsocketRequest``. On success the reply sequence is sent from a
        new thread running ``send_messages``, so this method returns without
        waiting for the multi-second sequence to finish.

        If the message cannot be parsed or validated, the error is logged and
        the message is dropped; no reply is sent.
        """
        try:
            ws_request = WebsocketRequest(**json.loads(self.data))
        except Exception:
            # Broad on purpose: invalid JSON, a non-object payload and schema
            # violations are all "bad request" here. Return early — without
            # it, the code below would reference the unbound ``ws_request``
            # and raise UnboundLocalError.
            logger.exception("Error in handling request message")
            return

        thread = threading.Thread(
            target=send_messages, args=(self, ws_request))
        logger.info("Offloading sending messages to new thread !")
        thread.start()

    def connected(self):
        """Log the address of a newly connected client."""
        logger.info(f"{self.address} connected")

    def handle_close(self):
        """Log the address of a client whose connection was closed."""
        logger.info(f"{self.address} closed")
# Create the server on port 8000 with QAWebSocket as the per-connection
# handler class, then run it.
# NOTE(review): the empty host string presumably binds on all interfaces and
# serve_forever() presumably blocks until the process is stopped — confirm
# against the simple-websocket-server documentation.
# These statements run at import time; the usage notes start the file as a script.
server = WebSocketServer("", 8000, QAWebSocket)
logger.info("Starting server at : ws://localhost:8000")
server.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment