Skip to content

Instantly share code, notes, and snippets.

@nasir733
Created April 8, 2024 22:32
Show Gist options
  • Save nasir733/d72b88565a70d0ec10cd54bdc1beaf2f to your computer and use it in GitHub Desktop.
Save nasir733/d72b88565a70d0ec10cd54bdc1beaf2f to your computer and use it in GitHub Desktop.
from celery import shared_task
import os
from pprint import pprint
from dotenv import load_dotenv
import logging, verboselogs
from datetime import datetime, timedelta
import json
from django.conf import settings
from langchain_community.llms import OpenAI as LLMOpenAI
from langchain.prompts import PromptTemplate
from openai import OpenAI
from langchain.chains import LLMChain
from deepgram import (
DeepgramClient,
DeepgramClientOptions,
PrerecordedOptions,
FileSource,
)
from notes.models import Notes
from visits.models import Visits
from users.models import User
from Transcripts.models import AudioFile, Transcript
from celery.exceptions import MaxRetriesExceededError
from patients.models import Patients
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
load_dotenv()
@shared_task(bind=True, max_retries=300, default_retry_delay=5 * 60)
def create_soap_format_notes_for_a_visit(
self, audio_file_id, visit_id, user_id, note_id, patient_id
):
"""Create SOAP format notes for a visit
Args:
audio_file (string): path to the audio file
"""
try:
channel_layer = get_channel_layer()
task_id = self.request.id
print("task_id", task_id)
print("visit_id", visit_id)
user = User.objects.get(id=user_id)
Visit = Visits.objects.get(id=int(visit_id))
Note = Notes.objects.get(id=note_id)
Audio_File = AudioFile.objects.get(id=audio_file_id)
async_to_sync(channel_layer.group_send)(
task_id,
{
"type": "celery_task_update",
"message": {"progress": 5, "status": "Processing"},
},
)
audio_file_path = Audio_File.file.path
pprint(f"audio_file: {audio_file_path}")
transcript = transcribe_whisper_audio(
audio_file_path,
)
transcript = Transcript.objects.create(
transcript=transcript, audio_file=Audio_File, user=user
)
async_to_sync(channel_layer.group_send)(
task_id,
{
"type": "celery_task_update",
"message": {"progress": 20, "status": "Processing"},
},
)
pprint(f"transcript: {transcript}")
soap_note = generate_soap_format_notes(
transcript.transcript, channel_layer, task_id
)
pprint(f"soap_note: {soap_note}")
patient = None
Note.subjective = soap_note["Subjective"]
Note.objective = soap_note["Objective"]
Note.assessment = soap_note["Assessment"]
Note.plan = soap_note["Plan"]
if patient_id:
print("got the patient_id", patient_id)
patient = Patients.objects.get(id=patient_id)
else:
patient, created = Patients.objects.get_or_create(
user=user,
name__iexact=soap_note["Patient Name"].replace("\n", ""),
defaults={
"name": soap_note["Patient Name"].replace("\n", ""),
"user": user,
},
)
patient.notes.add(Note)
Note.patient = patient
Visit.patient = patient
Visit.transcript = transcript
Visit.notes_status = "completed"
Visit.save()
Note.save()
except Exception as e:
try:
raise self.retry(exc=e)
except MaxRetriesExceededError:
# Handle the case where all retries failed, e.g. by logging
logging.error(
f"Failed to create SOAP format notes for visit {visit_id} after maximum retries"
)
def generate_soap_format_notes(transcript, channel_layer, task_id):
try:
llm = LLMOpenAI(temperature=0.7)
loading_percentage = 0
# Objective
objective_template = PromptTemplate(
input_variables=["transcript"],
template="""You are a senior Doctor at a reputable hospital in the United States of America. Based on this transcript,
provide the Objective part of the SOAP note, focusing on the physical exam findings and vital signs: {transcript}""",
)
objective_chain = LLMChain(llm=llm, prompt=objective_template)
objective_response = objective_chain({"transcript": transcript})
loading_percentage += 20
async_to_sync(channel_layer.group_send)(
task_id,
{
"type": "celery_task_update",
"message": {"progress": 40, "status": "Processing"},
},
)
print(f"Loading... {loading_percentage}%")
# Subjective
subjective_template = PromptTemplate(
input_variables=["transcript"],
# template="""Considering OLD CARTS (Onset, Location, Duration, Character, Alleviating/Aggravating factors, Radiation, Timing, Severity),
# provide the Subjective part of the SOAP note based on the patient's verbal report: {transcript}""",
template ="""
You're a meticulous medical assistant who prioritizes accuracy and detail in documenting patient information. Your expertise lies in extracting relevant details from patient interviews to create comprehensive SOAP notes.
Your task is to provide the Subjective part of the SOAP note based on the patient's verbal report. Remember to consider OLD CARTS (Onset, Location, Duration, Character, Alleviating/Aggravating factors, Radiation, Timing, Severity) method while recording the subjective information.
Patient Details:
- Name: __________
- Age: ________
- Gender: ________
- Chief Complaint: ________
Begin by gathering information using the OLD CARTS method, ensuring that the subjective part of the SOAP note is thorough and accurately represents the patient's verbal report.
Examples:
- Onset: "The patient reported that the pain started two days ago..."
- Location: "The patient pointed to the lower back area as the primary location of the discomfort..."
{transcript}
"""
)
subjective_chain = LLMChain(llm=llm, prompt=subjective_template)
subjective_response = subjective_chain({"transcript": transcript})
loading_percentage += 20
async_to_sync(channel_layer.group_send)(
task_id,
{
"type": "celery_task_update",
"message": {"progress": 60, "status": "Processing"},
},
)
print(f"Loading... {loading_percentage}%")
# Assessment
assessment_template = PromptTemplate(
input_variables=["transcript"],
template="""Based on the Objective and Subjective information provided, list the possible diagnoses in a numerical format
for the Assessment part of the SOAP note: {transcript}""",
)
assessment_chain = LLMChain(llm=llm, prompt=assessment_template)
assessment_response = assessment_chain({"transcript": transcript})
loading_percentage += 20
async_to_sync(channel_layer.group_send)(
task_id,
{
"type": "celery_task_update",
"message": {"progress": 80, "status": "Processing"},
},
)
print(f"Loading... {loading_percentage}%")
# Plan
plan_template = PromptTemplate(
input_variables=["transcript"],
template="""Given the Assessment, outline a treatment Plan including any further testing, medications, or follow-up needed: {transcript}""",
)
plan_chain = LLMChain(llm=llm, prompt=plan_template)
plan_response = plan_chain({"transcript": transcript})
loading_percentage += 20
# Patient Name
patient_name_template = PromptTemplate(
input_variables=["transcript"],
template="""Figure out the patient's name from the transcript only give me the name back in the response nothing else: {transcript}""",
)
patient_name_chain = LLMChain(llm=llm, prompt=patient_name_template)
patient_name_response = patient_name_chain({"transcript": transcript})
loading_percentage += 20
async_to_sync(channel_layer.group_send)(
task_id,
{
"type": "celery_task_update",
"message": {"progress": 100, "status": "Processing"},
},
)
print(f"Loading... {loading_percentage}%")
# Combining responses into a single SOAP note object
soap_note = {
"Subjective": subjective_response["text"],
"Objective": objective_response["text"],
"Assessment": assessment_response["text"],
"Plan": plan_response["text"],
"Patient Name": patient_name_response["text"],
}
return soap_note
except Exception as e:
try:
raise create_soap_format_notes_for_a_visit.retry(exc=e)
except MaxRetriesExceededError:
# Handle the case where all retries failed, e.g. by logging
logging.error(
f"Failed to generate SOAP format notes for visit after maximum retries"
)
def transcribe_whisper_audio(audio_file):
try:
client = OpenAI(api_key=settings.OPENAI_API_KEY)
with open(audio_file, "rb") as file:
transcript = client.audio.transcriptions.create(
model="whisper-1", file=file, response_format="json"
)
return json.loads(transcript.json())["text"]
except Exception as e: # Replace with more specific exceptions if possible
try:
raise create_soap_format_notes_for_a_visit.retry(exc=e)
except MaxRetriesExceededError:
# Handle the case where all retries failed, e.g. by logging
logging.error(
f"Failed to transcribe audio {audio_file} after maximum retries"
)
def transcribe_audio(audio_file):
# STEP 1 Create a Deepgram client using the API key in the environment variables
deepgram = DeepgramClient(api_key=settings.DEEPGRAM_API_KEY)
print(f"Deepgram client: {deepgram}")
# STEP 2 Call the transcribe_file method on the prerecorded class
with open(audio_file, "rb") as file:
print(f"Transcribing {audio_file}")
buffer_data = file.read()
payload: FileSource = {
"buffer": buffer_data,
}
print("hello world")
options = PrerecordedOptions(
model="nova-2-medical",
smart_format=True,
diarize=True,
)
before = datetime.now()
response = deepgram.listen.prerecorded.v("1").transcribe_file(payload, options)
after = datetime.now()
difference = after - before
print(f"time: {difference.seconds}")
print(response.to_json(indent=4))
return response.to_json()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment