@timrwilliams
Created September 19, 2023 09:17
OpenAI process streaming JSON in Ruby

Summary

This example shows how to process a JSON response from the OpenAI Chat API in streaming mode.

  • We look for a root JSON key named questions, which contains an array of JSON objects with the keys question and answer.
  • Each question and answer pair is handled in the end_object block as soon as its object is complete, without waiting for the rest of the response.
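
To make the idea in the bullets concrete without any gems, here is a minimal stdlib-only sketch. It is not how json-stream works internally: it swaps in a simpler technique (counting brace depth) and assumes the items of interest are objects nested exactly one level below the root object. The class name ChunkedObjectExtractor is hypothetical.

```ruby
require 'json'

# Hypothetical helper, not part of json-stream or ruby-openai.
# Assumes the root is an object and the items of interest are objects
# nested exactly one level below it (as in {"questions": [{...}, {...}]}).
class ChunkedObjectExtractor
  def initialize(&on_object)
    @on_object = on_object
    @depth = 0         # how many `{` are currently open
    @buffer = nil      # text of the item being collected, if any
    @in_string = false # true while inside a JSON string literal
    @escaped = false   # true right after a backslash inside a string
  end

  # Accepts the next fragment of text, however it happens to be split.
  def <<(chunk)
    chunk.each_char { |char| consume(char) }
    self
  end

  private

  def consume(char)
    @buffer << char if @buffer
    return track_string(char) if @in_string

    case char
    when '"' then @in_string = true
    when '{'
      @depth += 1
      @buffer = +'{' if @depth == 2
    when '}'
      emit if @depth == 2
      @depth -= 1
    end
  end

  # Braces and quotes inside string values must not affect the depth count.
  def track_string(char)
    if @escaped
      @escaped = false
    elsif char == '\\'
      @escaped = true
    elsif char == '"'
      @in_string = false
    end
  end

  # Parses the completed item and hands it to the caller's block.
  def emit
    @on_object.call(JSON.parse(@buffer))
    @buffer = nil
  end
end

found = []
extractor = ChunkedObjectExtractor.new { |item| found << item }

response = '{"questions":[{"question":"What is the capital city of France?","answer":"Paris"},' \
           '{"question":"Which continent is Egypt located in?","answer":"Africa"}]}'

# Feed the text in 7-character slices to imitate streamed deltas.
response.scan(/.{1,7}/m).each { |chunk| extractor << chunk }

found.each { |item| puts "#{item['question']} -> #{item['answer']}" }
```

Each item reaches the block as soon as its closing brace arrives, which is the same property the end_object callback provides. An event-based parser such as json-stream is the more general tool, since it does not depend on a fixed nesting depth.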

Dependencies

  • json-stream: https://github.com/dgraham/json-stream
  • ruby-openai: https://github.com/alexrudall/ruby-openai
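
One way to pull in both dependencies is a Gemfile like the sketch below. It assumes Bundler is used and deliberately leaves out version constraints, since the original does not state which versions it was written against.

```ruby
# Gemfile (a sketch; version constraints are deliberately left out)
source 'https://rubygems.org'

gem 'json-stream' # loaded with: require 'json/stream'
gem 'ruby-openai' # loaded with: require 'openai'
```

After `bundle install`, the script can be run through `bundle exec` so that both gems are on the load path.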

Running the code

Add environment variables:

  • OPENAI_API_KEY - your OpenAI Access Token
  • OPENAI_API_ORG - your OpenAI Org ID

Then run the example:

StructuredStreamExample.new.call
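
The example reads both variables with ENV.fetch, which raises KeyError when a variable is unset. A small preflight check, sketched below, can report everything that is missing in one go. The helper name missing_env and the constant REQUIRED_ENV are hypothetical and not part of the original code.

```ruby
# Hypothetical preflight helper; the variable names come from the list above.
REQUIRED_ENV = %w[OPENAI_API_KEY OPENAI_API_ORG].freeze

# Returns the names of required variables that are unset or blank.
# Accepts any hash-like object so it can be exercised without touching ENV.
def missing_env(env = ENV)
  REQUIRED_ENV.select { |name| env[name].to_s.strip.empty? }
end

missing = missing_env
if missing.empty?
  puts 'Environment looks complete.'
else
  warn "Missing environment variables: #{missing.join(', ')}"
end
```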

An example JSON response is shown below for reference; it is not needed to run the code.

{
  "questions": [
    {
      "question": "What is the capital city of France?",
      "answer": "Paris"
    },
    {
      "question": "Which continent is Egypt located in?",
      "answer": "Africa"
    },
    {
      "question": "What is the longest river in the world?",
      "answer": "Nile River"
    },
    {
      "question": "Which country is known as the Land of the Rising Sun?",
      "answer": "Japan"
    },
    {
      "question": "What is the tallest mountain in the world?",
      "answer": "Mount Everest"
    }
  ]
}
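
For contrast, this is roughly what the non-streaming route looks like once the whole response body is available: a single JSON.parse call from Ruby's standard library. The snippet uses a trimmed copy of the example response (first two entries only) and is just a baseline sketch; nothing can be processed this way until the last byte has arrived.

```ruby
require 'json'

# Trimmed copy of the example response above (first two entries only).
body = <<~JSON
  {
    "questions": [
      { "question": "What is the capital city of France?", "answer": "Paris" },
      { "question": "Which continent is Egypt located in?", "answer": "Africa" }
    ]
  }
JSON

# fetch raises KeyError if the model left out an expected key.
pairs = JSON.parse(body).fetch('questions').map do |entry|
  [entry.fetch('question'), entry.fetch('answer')]
end

pairs.each { |question, answer| puts "Q: #{question} A: #{answer}" }
```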

require 'json/stream'
require 'openai'

class StructuredStreamExample
  def call
    initialize_json_stream
    prompt = 'Produce a list of 5 short questions about geography as a JSON array with JSON properties: "question", "answer". The root node should be named "questions"'
    # ENV.fetch raises KeyError if either variable is missing.
    client = OpenAI::Client.new(
      access_token: ENV.fetch('OPENAI_API_KEY'),
      organization_id: ENV.fetch('OPENAI_API_ORG')
    )
    client.chat(
      parameters: {
        model: 'gpt-3.5-turbo', # Required.
        messages: [{ role: 'user', content: prompt }], # Required.
        temperature: 0.7,
        stream: stream_proc # Called once per streamed chunk.
      }
    )
  end
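
  # Builds the proc that receives each streamed chunk.
  def stream_proc
    proc do |chunk, _bytesize|
      # Chunks that carry no text yield nil here, so fall back to an empty string.
      new_content = chunk.dig('choices', 0, 'delta', 'content') || ''
      receive_data(new_content)
    end
  end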
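
  # Feeds a text fragment to the streaming parser.
  def receive_data(data)
    @parser << data
  rescue JSON::Stream::ParserError => e
    # The model produced something that is not valid JSON; report it instead of raising mid-stream.
    warn "JSON parse error: #{e.message}"
  end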

  def initialize_json_stream
    current_key = nil
    current_question = nil
    current_answer = nil
    processing = false # true only while inside the "questions" array

    # The block runs in the parser's context, so end_object, key, value, etc.
    # register callbacks. The locals above are shared with them via closure.
    @parser = JSON::Stream::Parser.new do
      end_object do
        puts "Found: Question: #{current_question}. answer: #{current_answer}" if processing
        # In real world e.g. Question.new(question: current_question, answer: current_answer)
      end
      start_array { processing = true if current_key == 'questions' }
      end_array { processing = false }
      key { |key| current_key = key }
      value do |value|
        case current_key
        when 'question'
          current_question = value
        when 'answer'
          current_answer = value
        end
        current_key = nil # Reset the current key
      end
    end
  end
end
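
The dig call in stream_proc can be tried in isolation. The sketch below runs it over a few hand-written chunk hashes; these are illustrative shapes only, and the exact fields in a real streamed response may differ. It shows why the `|| ''` fallback matters: chunks without a content field would otherwise pass nil to the parser.

```ruby
# Illustrative chunk hashes; the exact fields a real response carries may differ.
chunks = [
  { 'choices' => [{ 'index' => 0, 'delta' => { 'role' => 'assistant', 'content' => '' } }] },
  { 'choices' => [{ 'index' => 0, 'delta' => { 'content' => '{"quest' } }] },
  { 'choices' => [{ 'index' => 0, 'delta' => { 'content' => 'ions": [' } }] },
  { 'choices' => [{ 'index' => 0, 'delta' => {}, 'finish_reason' => 'stop' }] },
  { 'choices' => [] }
]

# Same expression as in stream_proc: nil at any level collapses to ''.
fragments = chunks.map { |chunk| chunk.dig('choices', 0, 'delta', 'content') || '' }

text = fragments.join
puts text
```

Joining the fragments reproduces the text the parser would have seen, split at arbitrary points, which is why a streaming parser is needed rather than parsing each fragment on its own.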