Skip to content

Instantly share code, notes, and snippets.

@andreyvit
Last active March 3, 2023 07:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andreyvit/edc97c9b3769f1d7004b650d1875b0e0 to your computer and use it in GitHub Desktop.
Save andreyvit/edc97c9b3769f1d7004b650d1875b0e0 to your computer and use it in GitHub Desktop.
ChatGPT Chromium driver, written before OpenAI released the ChatGPT API
let puppeteer = require('puppeteer-core') // tested with puppeteer-core 19.7.1
let fs = require('fs')
// WebSocket URL of the browser's remote-debugging endpoint, taken from the
// environment. Without it there is nothing to connect to, so print setup
// instructions to stderr and exit with status 2.
const BROWSER_ENDPOINT = process.env.BROWSER_ENDPOINT
if (!BROWSER_ENDPOINT) {
  const setupHelp = [
    "** BROWSER_ENDPOINT not set (expect smt like ws://127.0.0.1:9222/devtools/browser/aaaaaaaa-bbbb-cccc-dddd-ffffffffffff)\n\n",
    "To obtain, launch your favorite Chromium-based browser with --remote-debugging-port=9222\n",
    "and look at the output. E.g.:\n\n",
    " '/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge' --remote-debugging-port=9222\n\n",
    "and it's gonna say:\n\n",
    " DevTools listening on ws://127.0.0.1:9222/devtools/browser/aaaaaaaa-bbbb-cccc-dddd-ffffffffffff",
  ].join('')
  console.error(setupHelp)
  process.exit(2)
}
/**
 * Main driver loop: connects to the ChatGPT tab and processes every
 * `---`-delimited section read from stdin.
 *
 * Section kinds, checked in this order:
 *   - `!!new`            starts a new chat
 *   - any other `!!...`  is rejected as an unknown command
 *   - leading `{`        is parsed as JSON metadata
 *   - anything else      is sent as a prompt; the trimmed reply is written
 *                        to stdout followed by the `---` delimiter
 */
async function run() {
  const newChatCommand = /^!!new$/
  const anyCommand = /^!!/
  const metadataSection = /^\{/

  const connection = new ChatGPTConnection()
  await connection.connect(BROWSER_ENDPOINT)

  const handleSection = async (section) => {
    if (newChatCommand.test(section)) {
      await connection.startNewChat()
      return
    }
    if (anyCommand.test(section)) {
      throw new Error(`unknown command: ${section}`)
    }
    if (metadataSection.test(section)) {
      await connection.setMetadata(JSON.parse(section))
      return
    }
    const reply = await connection.say(section)
    process.stdout.write(`${reply.trim()}\n---\n`)
  }

  await readDelimitedSections(handleSection)
}
/**
 * Reads stdin as UTF-8 and invokes `handler` once per section, where sections
 * are terminated by a line containing only `---`.
 *
 * Each section is trimmed before being handed over, and handlers are awaited
 * one at a time, in input order. Text left over after the final delimiter is
 * an error unless it is only whitespace.
 *
 * @param {(section: string) => Promise<void>} handler - called for each complete section
 * @throws {Error} if input ends with a non-empty, unterminated section
 */
async function readDelimitedSections(handler) {
  const delimiter = '\n---\n'
  process.stdin.setEncoding('utf8')

  let pending = ''
  for await (const chunk of process.stdin) {
    pending += chunk
    // Peel off every complete section currently in the buffer; whatever
    // follows the last delimiter stays pending until more input arrives.
    let cut = pending.indexOf(delimiter)
    while (cut !== -1) {
      const section = pending.slice(0, cut).trim()
      pending = pending.slice(cut + delimiter.length)
      await handler(section)
      cut = pending.indexOf(delimiter)
    }
  }

  const leftover = pending.trim()
  if (leftover.length > 0) {
    throw new Error(`unfinished chunk in input buffer:\n${leftover}`)
  }
}
/**
 * Drives a ChatGPT tab that is already open in a Chromium-based browser,
 * reached through the browser's remote-debugging WebSocket endpoint.
 *
 * Usage: call `connect()` once, then `say()` for each prompt; `startNewChat()`
 * and `setMetadata()` may be called in between.
 */
class ChatGPTConnection {
  constructor() {
    // Last metadata object passed to setMetadata().
    this.metadata = {}
    // Number of prompts sent so far.
    this.count = 0
    // Number of response blocks found when the previous reply was read; a new
    // reply must raise the count above this value.
    this.priorResponseCount = 0
  }

  /**
   * Attaches to the browser and locates the ChatGPT tab.
   *
   * @param {string} endpoint - browser WebSocket debugging URL
   * @throws {Error} if no open page has a URL starting with https://chat.openai.com/chat
   */
  async connect(endpoint) {
    const browser = await puppeteer.connect({
      browserWSEndpoint: endpoint,
    })
    const context = browser.defaultBrowserContext()
    const pages = await context.pages()
    const page = pages.find((candidate) => (candidate.url() || '').startsWith('https://chat.openai.com/chat'))
    if (!page) {
      throw new Error('ChatGPT page not found')
    }
    this.page = page
    this.frame = page.mainFrame()
  }

  /**
   * Stores the metadata and starts a new chat when `metadata.new_chat` is truthy.
   *
   * @param {object} metadata - parsed metadata section
   */
  async setMetadata(metadata) {
    this.metadata = metadata
    if (this.metadata.new_chat) {
      await this.startNewChat()
    }
  }

  /**
   * Switches the tab to an empty conversation, unless the model selector is
   * already showing (which is treated as "already on a new chat").
   *
   * @throws {Error} if the "New chat" link cannot be found
   */
  async startNewChat() {
    const modelSelectorXPath = '//button/label[text()="Model"]'
    if ((await this.frame.$x(modelSelectorXPath)).length > 0) {
      log('already on new chat')
      this.#resetResponseBaseline()
      return
    }
    const buttons = await this.frame.$x('//a[text()="New chat"]')
    if (buttons.length === 0) {
      throw new Error('cannot find New Chat button')
    }
    log('starting new chat')
    await buttons[0].click({})
    await this.frame.waitForXPath(modelSelectorXPath, {visible: true, timeout: 5000})
    this.#resetResponseBaseline()
  }

  // Restarts response counting for a fresh conversation. Without this, the
  // count carried over from the previous conversation makes readBotResponse()
  // reject the first reply of the new one with "no new response blocks".
  // NOTE(review): assumes a new chat shows no response blocks — confirm against the page.
  #resetResponseBaseline() {
    this.priorResponseCount = 0
  }

  /**
   * Sends one prompt and returns the text of the reply.
   *
   * @param {string} prompt
   * @returns {Promise<string>} reply paragraphs joined by blank lines
   */
  async say(prompt) {
    await this.sendOurMessage(prompt)
    await this.awaitBotResponse()
    return await this.readBotResponse()
  }

  /**
   * Fills the prompt textarea and submits it with Enter.
   *
   * @param {string} prompt
   * @throws {Error} if the page has no textarea
   */
  async sendOurMessage(prompt) {
    await sleep(3000) // extra time to ensure browser is ready to accept input
    log('sending prompt')
    const input = await this.frame.$('textarea')
    if (!input) {
      throw new Error('cannot find prompt textarea')
    }
    // The value is assigned directly rather than typed key by key.
    await input.evaluate((el, value) => { el.value = value }, prompt)
    await sleep(200)
    await input.press('Enter')
    this.count++
  }

  /**
   * Waits until the reply has finished: a fixed delay to let generation start,
   * then up to 180 s for the "Stop generating" button to be hidden.
   */
  async awaitBotResponse() {
    await sleep(5000) // give GPT time to start responding
    log('waiting for end')
    await this.frame.waitForXPath('//button[text()="Stop generating"]', {hidden: true, timeout: 180000})
  }

  /**
   * Reads the newest response block from the page.
   *
   * If the block contains an error element, its text is reported: the
   * placeholder "too long" message makes the process print `!!toolong` and
   * exit; any other message is thrown.
   *
   * @returns {Promise<string>} text of the block's <p> elements joined by blank lines
   * @throws {Error} if no response block exists, no new block appeared since
   *   the previous read, or ChatGPT displayed an error
   */
  async readBotResponse() {
    log('reading response')
    const responseBlocks = await this.frame.$$('div.w-full.dark\\:bg\\-\\[\\#444654\\]')
    const responseCount = responseBlocks.length
    if (responseCount === 0) {
      throw new Error('cannot find responseCount block')
    }
    if (responseCount <= this.priorResponseCount) {
      throw new Error('no new response blocks')
    }
    this.priorResponseCount = responseCount
    const lastResponseBlock = responseBlocks[responseCount - 1]
    const errorBlock = await lastResponseBlock.$('.text-red-500')
    if (errorBlock) {
      // evaluate() is asynchronous; without the await the message would be a
      // Promise and show up as "[object Promise]".
      const errorMsg = await errorBlock.evaluate((el) => el.textContent)
      log('got error: ' + errorMsg)
      // TODO: 'TODO too long TODO' is a placeholder; the real "input too long"
      // message text still has to be filled in.
      if (errorMsg === 'TODO too long TODO') {
        process.stdout.write("!!toolong\n---\n")
        process.exit(0)
      } else {
        throw new Error("ChatGPT said: " + errorMsg)
      }
    }
    const paragraphs = await lastResponseBlock.evaluate((el) => Array.from(el.querySelectorAll('p')).map((p) => p.textContent))
    return paragraphs.join('\n\n')
  }
}
/**
 * Writes a diagnostic message to stderr, prefixed with `chatgpt: `.
 * The message is trimmed, continuation lines are indented by one space, and a
 * trailing newline is appended. stdout is left alone because it carries the
 * driver protocol.
 *
 * @param {string} str - message to log
 */
function log(str) {
  const trimmed = str.trim()
  const indented = trimmed.replace(/\n/g, '\n ')
  process.stderr.write(`chatgpt: ${indented}\n`)
}
/**
 * Sets the `value` property of the DOM element behind an element handle.
 *
 * The previous body called `el.evaluate()` without a page function and never
 * used `newValue`, so it could not assign anything.
 *
 * @param {{evaluate: Function}} el - element handle exposing `evaluate(fn, ...args)`
 * @param {string} newValue - value to assign
 * @returns {Promise<*>} whatever `el.evaluate` resolves to
 */
function assignValue(el, newValue) {
  return el.evaluate((node, value) => {
    node.value = value
  }, newValue)
}
/**
 * Returns a promise that resolves (with no value) after `ms` milliseconds.
 *
 * @param {number} ms - delay in milliseconds
 * @returns {Promise<void>}
 */
function sleep(ms) {
  return new Promise((wake) => setTimeout(wake, ms))
}
// Entry point. Failures are reported in-band as an `!!error` section on
// stdout, and the process exits with status 0 in both cases.
run().then(
  () => process.exit(0),
  (err) => {
    process.stdout.write(`!!error\n${err.message}\n\n${err.stack}\n---\n`)
    process.exit(0)
  },
)
// Package chatgpt communicates with the chatgpt.js driver to talk to ChatGPT running in a Chromium-based browser.
package chatgpt
import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"os/exec"
	"strings"
)
var (
// ErrInputTooLong is returned by Run when the driver emits the "!!toolong" section.
ErrInputTooLong = errors.New("input too long")
)
// Framing constants of the text protocol spoken with the driver over stdin/stdout.
var (
// sep terminates every section written to the driver and splits its output.
sep = []byte("\n---\n")
// commandPrefix marks an output section as a driver command rather than a response.
commandPrefix = []byte("!!")
// errorPrefix starts an output section that carries a driver failure message.
errorPrefix = []byte("!!error\n")
// tooLong is the output section that Run maps to ErrInputTooLong.
tooLong = []byte("!!toolong")
)
// Metadata is the JSON header that buildStdin writes as the first section of
// the driver's stdin.
type Metadata struct {
// Title is serialized as "title". The driver code in this file stores it but does not otherwise read it.
Title string `json:"title"`
// NewChat is serialized as "new_chat"; when true, the driver starts a new chat on receiving the metadata.
NewChat bool `json:"new_chat"`
}
// Run sends prompts to ChatGPT through the chatgpt.js driver and returns one
// response per prompt, in order.
//
// It runs `node chatgpt.js` relative to the current working directory, feeds
// it the metadata and prompts on stdin, and parses the sep-delimited sections
// the driver prints on stdout. A copy of the exact stdin payload is saved to
// _work/prompt.txt for debugging; that directory must already exist.
//
// Driver stderr is streamed to os.Stderr as it is produced and also captured,
// so that an *exec.ExitError returned from here carries it in its Stderr field.
//
// Errors:
//   - ErrInputTooLong if the driver reports "!!toolong";
//   - a "ChatGPT communication failure" error if the driver reports "!!error";
//   - a "ChatGPT driver protocol failure" error for malformed driver output or
//     a response count that differs from len(prompts);
//   - any error from building the input, saving the dump, or running node.
func Run(meta *Metadata, prompts []string) ([]string, error) {
	stdin, err := buildStdin(meta, prompts)
	if err != nil {
		return nil, err
	}

	// Report a failed debug dump as an error instead of panicking.
	if err := os.WriteFile("_work/prompt.txt", stdin, 0644); err != nil {
		return nil, fmt.Errorf("ChatGPT driver: cannot save prompt dump: %w", err)
	}

	c := exec.Command("node", "chatgpt.js")
	c.Stdin = bytes.NewReader(stdin)
	var stdout, stderr bytes.Buffer
	c.Stdout = &stdout
	// Tee stderr: live progress for the user, plus a copy for ExitError.Stderr.
	c.Stderr = io.MultiWriter(os.Stderr, &stderr)

	if err := c.Run(); err != nil {
		var ee *exec.ExitError
		if errors.As(err, &ee) {
			ee.Stderr = stderr.Bytes()
		}
		return nil, err
	}

	// Every section ends with sep, so the piece after the last sep must be blank.
	outputs := bytes.Split(stdout.Bytes(), sep)
	trailer := bytes.TrimSpace(outputs[len(outputs)-1])
	if len(trailer) != 0 {
		return nil, fmt.Errorf("ChatGPT driver protocol failure, last chunk not empty: %s", trailer)
	}
	outputs = outputs[:len(outputs)-1]

	var responses []string
	for _, output := range outputs {
		output = bytes.TrimSpace(output)
		if !bytes.HasPrefix(output, commandPrefix) {
			responses = append(responses, string(output))
			continue
		}
		switch {
		case bytes.HasPrefix(output, errorPrefix):
			return nil, fmt.Errorf("ChatGPT communication failure: %s", strings.TrimSpace(string(output[len(errorPrefix):])))
		case bytes.Equal(output, tooLong):
			return nil, ErrInputTooLong
		default:
			return nil, fmt.Errorf("ChatGPT driver protocol failure, unknown command: %s", strings.TrimSpace(string(output)))
		}
	}

	if len(responses) != len(prompts) {
		return nil, fmt.Errorf("ChatGPT driver protocol failure, number of responses %d does not match number of prompts %d in stdout:\n%s", len(responses), len(prompts), strings.TrimSpace(stdout.String()))
	}
	return responses, nil
}
// buildStdin serializes the metadata and prompts into the stream the driver
// reads from stdin: the JSON-encoded metadata, then each whitespace-trimmed
// prompt, every section terminated by sep.
//
// A nil meta is encoded as a zero Metadata. It would otherwise marshal to
// "null", which the driver does not recognize as metadata (it only looks for a
// leading "{") and would send to ChatGPT as a prompt.
//
// Prompts the driver cannot transport are rejected up front instead of failing
// later, after the browser has already been driven:
//   - empty prompts;
//   - prompts containing sep, which the driver would split into two sections;
//   - prompts starting with commandPrefix or "{", which the driver interprets
//     as a command or as metadata rather than as text to send.
func buildStdin(meta *Metadata, prompts []string) ([]byte, error) {
	if meta == nil {
		meta = &Metadata{}
	}
	header, err := json.Marshal(meta)
	if err != nil {
		return nil, fmt.Errorf("encoding metadata: %w", err)
	}

	var buf bytes.Buffer
	buf.Write(header)
	buf.Write(sep)
	for i, p := range prompts {
		p = strings.TrimSpace(p)
		switch {
		case p == "":
			return nil, fmt.Errorf("prompt %d is empty", i+1)
		case strings.Contains(p, string(sep)):
			return nil, fmt.Errorf("prompt %d contains the section separator %q", i+1, sep)
		case strings.HasPrefix(p, string(commandPrefix)):
			return nil, fmt.Errorf("prompt %d starts with the reserved command prefix %q", i+1, commandPrefix)
		case strings.HasPrefix(p, "{"):
			return nil, fmt.Errorf("prompt %d starts with \"{\", which the driver would parse as metadata", i+1)
		}
		buf.WriteString(p)
		buf.Write(sep)
	}
	return buf.Bytes(), nil
}
// must unwraps a (value, error) pair: it returns v when err is nil and panics
// with err otherwise.
func must[T any](v T, err error) T {
	if err == nil {
		return v
	}
	panic(err)
}
// ensure panics with err when it is non-nil and does nothing otherwise.
func ensure(err error) {
	if err == nil {
		return
	}
	panic(err)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment