Skip to content

Instantly share code, notes, and snippets.

@marvs
Created May 14, 2026 13:55
Show Gist options
  • Select an option

  • Save marvs/0b39896c01975817d2edda2581caa730 to your computer and use it in GitHub Desktop.

Select an option

Save marvs/0b39896c01975817d2edda2581caa730 to your computer and use it in GitHub Desktop.
A Python script that uses an LLM to access the internet, using Gradio as a chat interface
import json
import logging
import re
import time
from datetime import date
from threading import Thread
from urllib.parse import parse_qs, urlparse
# Configure the root logger once at import: DEBUG verbosity, with each line
# rendered as "HH:MM:SS [LEVEL] message".
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
    level=logging.DEBUG,
)

# Shared logger used by every function in this file.
log = logging.getLogger("chat")
import gradio as gr
import requests
from bs4 import BeautifulSoup
from transformers import AutoTokenizer, TextIteratorStreamer
from optimum.intel import OVModelForCausalLM
# Directory handed to OVModelForCausalLM.from_pretrained below.
MODEL_DIR = "qwen3-4b-ov"
# Repository id handed to AutoTokenizer.from_pretrained below.
BASE_MODEL_ID = "Qwen/Qwen3-4B"
DEVICE = "GPU"  # change to "CPU" if needed
# Upper bound on generation passes (and therefore tool calls) per chat turn.
MAX_TOOL_ITERATIONS = 5
# Maximum number of page-text characters browse_url returns before truncating.
MAX_BROWSE_CHARS = 3000

# NOTE: the date is formatted once, when this module is imported, so a
# long-running process keeps announcing its start date to the model.
_PROMPT_DATE = date.today().strftime("%B %d, %Y")
SYSTEM_PROMPT = (
    "You are a concise and direct assistant. "
    f"Today's date is {_PROMPT_DATE}. "
    "Never reveal internal reasoning. "
    "Never output <think> tags. "
    "Only output the final answer. "
    "When you need current information from the web, use the available tools."
)
def _string_param_tool(name, description, param, param_description):
    """Build a function-tool schema dict that takes one required string parameter.

    Key order is kept stable because the schema is handed to the chat
    template, which may serialize it verbatim into the prompt.
    """
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": {
                    param: {
                        "type": "string",
                        "description": param_description,
                    },
                },
                "required": [param],
            },
        },
    }


# Tool schemas advertised to the model through apply_chat_template(tools=...).
TOOLS = [
    _string_param_tool(
        "web_search",
        (
            "Search the web for current information. "
            "Use this when you need up-to-date facts, news, or data."
        ),
        "query",
        "The search query",
    ),
    _string_param_tool(
        "browse_url",
        (
            "Fetch and read the text content of a web page. "
            "Use this to read a specific URL in detail."
        ),
        "url",
        "The full URL to fetch (must start with http:// or https://)",
    ),
]
# Both objects are created at import time, so importing this module already
# loads the tokenizer and the model.

# The tokenizer is resolved from BASE_MODEL_ID rather than from MODEL_DIR.
tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL_ID, trust_remote_code=True)

# The model is loaded from MODEL_DIR and placed on DEVICE.
model = OVModelForCausalLM.from_pretrained(MODEL_DIR, device=DEVICE)
# ---------------------------------------------------------------------------
# Tool execution
# ---------------------------------------------------------------------------
def _resolve_result_url(href, display_text):
    """Return an absolute http(s) URL for one search result, or 'N/A'.

    Preference order:
      1. the destination carried in a ``uddg`` query parameter of ``href``
         (presumably how DuckDuckGo wraps outbound links -- TODO confirm
         against live markup),
      2. ``href`` itself when it is already an absolute http(s) URL,
      3. the displayed URL text, with ``https://`` prepended if it has no
         scheme, so the value can be passed straight to ``browse_url``.
    """
    candidate = (href or "").strip()
    if candidate:
        if candidate.startswith("//"):
            # Protocol-relative link: give it a scheme so urlparse sees a host.
            candidate = "https:" + candidate
        parsed = urlparse(candidate)
        wrapped = parse_qs(parsed.query).get("uddg")
        if wrapped:
            return wrapped[0]
        if parsed.scheme in ("http", "https"):
            return candidate

    shown = (display_text or "").strip()
    if not shown:
        return "N/A"
    if not shown.lower().startswith(("http://", "https://")):
        shown = "https://" + shown
    return shown


def tool_web_search(query: str) -> str:
    """
    Scrape DuckDuckGo HTML search results (no API key required).
    For richer/more reliable results, swap with SerpAPI or Brave Search API.

    Args:
        query: Free-text search query.

    Returns:
        Up to five results formatted as "Title / Snippet / URL" paragraphs
        separated by blank lines, a "No results found" message, or a
        "Search error: ..." message. Errors are returned as text rather than
        raised so the caller can feed them back to the model.
    """
    log.info("[web_search] query=%r", query)
    if not isinstance(query, str) or not query.strip():
        # Nothing to search for; skip the network round trip entirely.
        return "Search error: empty query."
    try:
        resp = requests.get(
            "https://html.duckduckgo.com/html/",
            params={"q": query},
            timeout=10,
            headers={"User-Agent": "Mozilla/5.0"},
        )
        log.debug(
            "[web_search] HTTP %s | content-length=%d",
            resp.status_code,
            len(resp.text),
        )
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")

        results = []
        for result in soup.select(".result__body")[:5]:
            title = result.select_one(".result__title")
            snippet = result.select_one(".result__snippet")
            if not (title and snippet):
                continue
            shown_url = result.select_one(".result__url")
            # Prefer a real link target over the displayed URL text, which
            # may lack a scheme and would then be unusable by browse_url.
            anchor = title if title.name == "a" else title.find("a", href=True)
            href = anchor.get("href") if anchor else None
            if not href and shown_url is not None:
                href = shown_url.get("href")
            url = _resolve_result_url(
                href,
                shown_url.get_text(strip=True) if shown_url else "",
            )
            results.append(
                f"Title: {title.get_text(strip=True)}\n"
                f"Snippet: {snippet.get_text(strip=True)}\n"
                f"URL: {url}"
            )

        log.info("[web_search] parsed %d results", len(results))
        if not results:
            log.warning("[web_search] 0 results — raw HTML snippet:\n%s", resp.text[:500])
        return "\n\n".join(results) if results else f"No results found for: {query}"
    except Exception as e:
        # Tool boundary: report the failure to the model instead of crashing the chat.
        log.exception("[web_search] exception: %s", e)
        return f"Search error: {e}"
def tool_browse_url(url: str) -> str:
    """Fetch a URL and return stripped plain text, truncated.

    Args:
        url: Absolute URL to fetch; must start with http:// or https://.

    Returns:
        The page's visible text (script/style/navigation elements removed,
        runs of blank lines collapsed, cut to MAX_BROWSE_CHARS with a
        truncation marker), a "no readable content" message, or a
        "Browse error: ..." message. Errors are returned as text rather than
        raised so the caller can feed them back to the model.
    """
    log.info("[browse_url] url=%r", url)
    target = url.strip() if isinstance(url, str) else ""
    if not target.lower().startswith(("http://", "https://")):
        # Enforce the contract advertised in the tool schema and give the
        # model an actionable message instead of an opaque requests error.
        log.warning("[browse_url] rejected non-HTTP(S) url=%r", url)
        return f"Browse error: URL must start with http:// or https:// (got {url!r})"
    # NOTE(review): the URL is chosen by the model; internal/private hosts are
    # not blocked here. Add an allow/deny list if this runs on a sensitive network.
    try:
        resp = requests.get(
            target,
            timeout=15,
            headers={"User-Agent": "Mozilla/5.0"},
        )
        log.debug(
            "[browse_url] HTTP %s | content-length=%d",
            resp.status_code,
            len(resp.text),
        )
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        # Drop elements that carry no article text before extracting it.
        for tag in soup(["script", "style", "noscript", "header", "footer", "nav"]):
            tag.decompose()
        text = soup.get_text(separator="\n", strip=True)
        # Collapse excessive blank lines
        text = re.sub(r"\n{3,}", "\n\n", text)
        if len(text) > MAX_BROWSE_CHARS:
            text = text[:MAX_BROWSE_CHARS] + "\n\n[... truncated ...]"
        log.info("[browse_url] returning %d chars", len(text))
        return text or "Page returned no readable content."
    except Exception as e:
        # Tool boundary: report the failure to the model instead of crashing the chat.
        log.exception("[browse_url] exception: %s", e)
        return f"Browse error: {e}"
def _coerce_tool_arguments(arguments):
    """Return tool arguments as a dict, decoding a JSON-encoded string if needed."""
    if isinstance(arguments, dict):
        return arguments
    if isinstance(arguments, str):
        try:
            decoded = json.loads(arguments)
        except json.JSONDecodeError:
            return {}
        if isinstance(decoded, dict):
            return decoded
    return {}


def execute_tool(name: str, arguments: dict) -> str:
    """Run the named tool and return its textual result.

    Args:
        name: Tool name as emitted by the model ("web_search" or "browse_url").
        arguments: Tool arguments. Expected to be a dict, but model output is
            untrusted: ``None``, a JSON-encoded string, or any other type is
            tolerated and treated as missing arguments where it cannot be
            decoded.

    Returns:
        The tool's output, a "Tool error: ..." message when the required
        argument is missing or empty, or "Unknown tool: <name>".
    """
    args = _coerce_tool_arguments(arguments)

    if name == "web_search":
        query = str(args.get("query") or "").strip()
        if not query:
            # Tell the model what went wrong rather than running an empty search.
            return "Tool error: web_search requires a non-empty 'query' argument."
        return tool_web_search(query)

    if name == "browse_url":
        url = str(args.get("url") or "").strip()
        if not url:
            return "Tool error: browse_url requires a non-empty 'url' argument."
        return tool_browse_url(url)

    return f"Unknown tool: {name}"
# ---------------------------------------------------------------------------
# Tool call parsing
# ---------------------------------------------------------------------------
def _normalize_tool_call(payload, require_arguments):
    """Decode a JSON tool-call payload and return it as a well-formed dict, or None.

    A payload is accepted only if it decodes to an object with a non-empty
    string "name" (and an "arguments" key when ``require_arguments`` is set).
    The returned dict always has a dict under "arguments": a JSON-encoded
    string is decoded, and anything else that is not a dict becomes ``{}``.
    """
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None
    name = data.get("name")
    if not isinstance(name, str) or not name:
        return None
    if require_arguments and "arguments" not in data:
        return None

    arguments = data.get("arguments")
    if isinstance(arguments, str):
        # Some outputs double-encode the arguments object as a JSON string.
        try:
            arguments = json.loads(arguments)
        except json.JSONDecodeError:
            arguments = None
    if not isinstance(arguments, dict):
        arguments = {}
    return {**data, "arguments": arguments}


def parse_tool_call(text: str):
    """
    Returns a dict with 'name' and 'arguments' if a tool call is found,
    otherwise returns None.

    Handles two formats Qwen3 may emit:
    1. <tool_call>{"name": "...", "arguments": {...}}</tool_call>
    2. ```json\\n{"name": "...", "arguments": {...}}\\n```

    Only the first match of each format is considered. The result is
    validated: 'name' is a non-empty string and 'arguments' is always a dict,
    so callers can use ``.get`` on both without type checks. Malformed or
    non-object JSON is treated as "no tool call".
    """
    # Format 1: <tool_call> tags ("arguments" may be omitted for no-arg calls)
    match = re.search(r"<tool_call>\s*(.*?)\s*</tool_call>", text, re.DOTALL)
    if match:
        call = _normalize_tool_call(match.group(1), require_arguments=False)
        if call is not None:
            return call

    # Format 2: markdown JSON block. Both keys are required here so that an
    # ordinary JSON snippet in an answer is not mistaken for a tool call.
    match = re.search(r"```json\s*(\{.*?\})\s*```", text, re.DOTALL)
    if match:
        call = _normalize_tool_call(match.group(1), require_arguments=True)
        if call is not None:
            return call

    return None
# ---------------------------------------------------------------------------
# Think block filtering
# ---------------------------------------------------------------------------
def remove_think_blocks(text: str) -> str:
    """Return *text* with all <think> reasoning removed and whitespace trimmed.

    Closed ``<think>...</think>`` blocks are deleted first; any ``<think>``
    that is still left has no closing tag, so everything from it to the end
    of the string is dropped. Tag matching is case-insensitive.
    """
    cleaned = text
    # Order matters: closed blocks must go before the open-ended pattern runs.
    for pattern in (r"<think>.*?</think>", r"<think>.*$"):
        cleaned = re.compile(pattern, re.DOTALL | re.IGNORECASE).sub("", cleaned)
    return cleaned.strip()
def filter_streaming_text(text: str) -> str:
    """
    Return the displayable part of a partially streamed response.

    Applied in order:
      1. delete closed <think>...</think> blocks,
      2. delete everything from an unclosed <think> to the end,
      3. hold back a trailing fragment that could still grow into "<think"
         (e.g. "<", "<thi", "<think" without ">"), so it is not rendered
         before we know whether it opens a think block.
    The result is whitespace-stripped; tag matching is case-insensitive.
    """
    steps = (
        (r"<think>.*?</think>", re.DOTALL | re.IGNORECASE),
        (r"<think>.*$", re.DOTALL | re.IGNORECASE),
        (r"<(?:t(?:h(?:i(?:n(?:k)?)?)?)?)?$", re.IGNORECASE),
    )
    visible = text
    for pattern, flags in steps:
        visible = re.sub(pattern, "", visible, flags=flags)
    return visible.strip()
# ---------------------------------------------------------------------------
# Message builder
# ---------------------------------------------------------------------------
def build_messages(message, history):
    """Assemble the chat-template message list for one turn.

    The list starts with the system prompt, continues with the prior turns
    from *history*, and ends with the new user *message*.

    *history* may be None/empty and may hold either of two entry shapes:
      - dicts with "role"/"content" keys (only "user" and "assistant" roles
        are kept),
      - 2-item lists/tuples of (user_text, assistant_text).
    Entries of any other shape, and empty/falsy contents, are skipped.
    All kept contents are converted with ``str()``.
    """
    conversation = [{"role": "system", "content": SYSTEM_PROMPT}]

    for entry in history or ():
        # Normalize each entry to a list of (role, content) pairs.
        if isinstance(entry, dict):
            role = entry.get("role")
            pairs = [(role, entry.get("content"))] if role in ("user", "assistant") else []
        elif isinstance(entry, (list, tuple)) and len(entry) == 2:
            pairs = list(zip(("user", "assistant"), entry))
        else:
            pairs = []

        conversation.extend(
            {"role": role, "content": str(content)}
            for role, content in pairs
            if content
        )

    conversation.append({"role": "user", "content": str(message)})
    return conversation
# ---------------------------------------------------------------------------
# Main chat function (agentic tool loop)
# ---------------------------------------------------------------------------
def _tool_status_line(tool_name, tool_args):
    """Return the one-line status shown in the UI while a tool is running."""
    status = f"🔧 Using `{tool_name}`"
    if tool_name == "web_search":
        status += f": *{tool_args.get('query', '')}*"
    elif tool_name == "browse_url":
        status += f": *{tool_args.get('url', '')}*"
    return status


def chat_fn(message, history):
    """Answer one chat turn, running model-requested tools between generation passes.

    Each pass renders the conversation through the chat template, streams the
    model output, and then either finishes (plain answer) or executes the
    requested tool, appends the call and its result to the conversation, and
    loops. At most MAX_TOOL_ITERATIONS passes are made.

    Args:
        message: The new user message.
        history: Prior turns, in any shape accepted by ``build_messages``.

    Yields:
        str: the text to display. Each yield carries the whole filtered
        response so far (or a tool-status line), not an increment --
        presumably the UI replaces the shown message on every yield; confirm
        against the Gradio ChatInterface docs. At least one value is always
        yielded.
    """
    messages = build_messages(message, history)
    log.info("[chat_fn] user message=%r", str(message)[:80])
    start = time.time()
    last_display = ""
    yielded_anything = False
    iteration = 0  # keeps the summary line valid even if the loop never runs

    for iteration in range(MAX_TOOL_ITERATIONS):
        log.info("[chat_fn] --- iteration %d ---", iteration + 1)

        # --- Build prompt and stream this generation pass ---
        prompt = tokenizer.apply_chat_template(
            messages,
            tools=TOOLS,
            tokenize=False,
            add_generation_prompt=True,
        )
        inputs = tokenizer(prompt, return_tensors="pt")
        streamer = TextIteratorStreamer(
            tokenizer,
            skip_prompt=True,
            skip_special_tokens=True,
        )
        generation_kwargs = dict(
            **inputs,
            max_new_tokens=512,
            do_sample=False,
            repetition_penalty=1.05,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.eos_token_id,
            streamer=streamer,
        )
        # generate() runs on a worker thread while this generator drains the streamer.
        thread = Thread(target=model.generate, kwargs=generation_kwargs)
        thread.start()

        partial_text = ""
        for chunk in streamer:
            partial_text += chunk
            # Don't stream tool call JSON to the UI — wait for full output
            if "<tool_call>" in partial_text:
                continue
            cleaned = filter_streaming_text(partial_text)
            if cleaned and cleaned != last_display:
                last_display = cleaned
                yielded_anything = True
                yield cleaned
        thread.join()
        log.debug("[chat_fn] raw model output:\n%s", partial_text)

        # --- Check for tool call in full output ---
        tool_call = parse_tool_call(partial_text)
        if not isinstance(tool_call, dict):
            # Model output is untrusted: anything but a JSON object is not a call.
            tool_call = None
        log.info("[chat_fn] tool_call parsed=%s", tool_call)

        if not tool_call:
            # Plain response — finalize and exit loop
            final = remove_think_blocks(partial_text)
            if final and final != last_display:
                yielded_anything = True
                yield final
            break

        # --- Execute the tool ---
        tool_name = tool_call.get("name", "")
        tool_args = tool_call.get("arguments", {})
        if not isinstance(tool_args, dict):
            # e.g. "arguments": null or a bare string; treat as no arguments.
            tool_args = {}

        status = _tool_status_line(tool_name, tool_args)
        yield status
        last_display = status
        yielded_anything = True

        tool_result = execute_tool(tool_name, tool_args)
        log.info("[chat_fn] tool_result preview: %r", tool_result[:200])

        # Inject assistant tool call and tool result into message history
        messages.append({"role": "assistant", "content": partial_text})
        messages.append({
            "role": "tool",
            "name": tool_name,
            "content": tool_result,
        })
        # Loop back for next generation pass
    else:
        # Every pass ended in a tool call, so the last thing on screen is a
        # tool-status line. Always replace it: guarding this on
        # `yielded_anything` would make the message unreachable, because a
        # status has already been yielded by the time the limit is hit.
        yield "Sorry, I was unable to complete the request within the tool call limit."
        yielded_anything = True

    # Guarantee at least one yield for Gradio's async wrapper
    if not yielded_anything:
        yield "..."

    end = time.time()
    print(f"Device={DEVICE} | Iterations={iteration + 1} | Time={end - start:.2f}s")
# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------
# Chat UI wired to chat_fn; the description line shows the active model
# directory, device and tool names.
demo = gr.ChatInterface(
    fn=chat_fn,
    title="Qwen 3 4B Local Chat",
    description=f"Model: {MODEL_DIR} | Device: {DEVICE} | Tools: web_search, browse_url",
)


if __name__ == "__main__":
    # Launch only when run as a script; inbrowser=True asks Gradio to open a browser tab.
    demo.launch(inbrowser=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment