Created
May 14, 2026 13:55
-
-
Save marvs/0b39896c01975817d2edda2581caa730 to your computer and use it in GitHub Desktop.
A Python script that uses an LLM to access the internet, using Gradio as a chat interface
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import logging | |
| import time | |
| import re | |
| from datetime import date | |
| from threading import Thread | |
# Root logging is configured here, ahead of the third-party imports that
# follow in this file; the "chat" logger is the one used by the tool and
# chat functions.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
_LOG_DATE_FORMAT = "%H:%M:%S"

logging.basicConfig(level=logging.DEBUG, format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)
log = logging.getLogger("chat")
| import gradio as gr | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from transformers import AutoTokenizer, TextIteratorStreamer | |
| from optimum.intel import OVModelForCausalLM | |
# Model locations and runtime limits.
MODEL_DIR = "qwen3-4b-ov"        # directory passed to OVModelForCausalLM
BASE_MODEL_ID = "Qwen/Qwen3-4B"  # id used to load the tokenizer
DEVICE = "GPU"  # change to "CPU" if needed
MAX_TOOL_ITERATIONS = 5          # generation passes allowed per user message
MAX_BROWSE_CHARS = 3000          # page text is truncated to this many characters

# The date is formatted once, when this module is imported.
SYSTEM_PROMPT = " ".join(
    (
        "You are a concise and direct assistant.",
        f"Today's date is {date.today().strftime('%B %d, %Y')}.",
        "Never reveal internal reasoning.",
        "Never output <think> tags.",
        "Only output the final answer.",
        "When you need current information from the web, use the available tools.",
    )
)
def _function_tool(name, description, param, param_description):
    """Build one function-tool schema entry with a single required string parameter."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": {
                    param: {
                        "type": "string",
                        "description": param_description,
                    },
                },
                "required": [param],
            },
        },
    }


# Tool schemas handed to the tokenizer's chat template.
TOOLS = [
    _function_tool(
        "web_search",
        "Search the web for current information. "
        "Use this when you need up-to-date facts, news, or data.",
        "query",
        "The search query",
    ),
    _function_tool(
        "browse_url",
        "Fetch and read the text content of a web page. "
        "Use this to read a specific URL in detail.",
        "url",
        "The full URL to fetch (must start with http:// or https://)",
    ),
]
# The tokenizer comes from BASE_MODEL_ID while the model weights are loaded
# from MODEL_DIR onto DEVICE.
# NOTE(review): trust_remote_code=True lets the tokenizer repository run its
# own Python code at load time — confirm this is actually required.
tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL_ID, trust_remote_code=True)
model = OVModelForCausalLM.from_pretrained(MODEL_DIR, device=DEVICE)
| # --------------------------------------------------------------------------- | |
| # Tool execution | |
| # --------------------------------------------------------------------------- | |
def tool_web_search(query: str) -> str:
    """
    Search DuckDuckGo's HTML endpoint and summarise the first results.

    No API key is needed. For richer or more reliable results this could be
    swapped for SerpAPI or the Brave Search API.

    Args:
        query: The search query, sent as the ``q`` parameter.

    Returns:
        Up to five "Title / Snippet / URL" entries separated by blank lines,
        ``"No results found for: <query>"`` when nothing could be parsed, or
        ``"Search error: <exception>"`` when the request or parsing fails.
    """
    log.info(f"[web_search] query={query!r}")
    try:
        response = requests.get(
            "https://html.duckduckgo.com/html/",
            params={"q": query},
            timeout=10,
            headers={"User-Agent": "Mozilla/5.0"},
        )
        log.debug(
            f"[web_search] HTTP {response.status_code} | content-length={len(response.text)}"
        )
        response.raise_for_status()

        page = BeautifulSoup(response.text, "html.parser")
        entries = []
        # Only the first five result containers are considered.
        for node in page.select(".result__body")[:5]:
            title_el = node.select_one(".result__title")
            snippet_el = node.select_one(".result__snippet")
            # An entry needs both a title and a snippet; the URL is optional.
            if not (title_el and snippet_el):
                continue
            url_el = node.select_one(".result__url")
            shown_url = url_el.get_text(strip=True) if url_el else "N/A"
            entries.append(
                f"Title: {title_el.get_text(strip=True)}\n"
                f"Snippet: {snippet_el.get_text(strip=True)}\n"
                f"URL: {shown_url}"
            )

        log.info(f"[web_search] parsed {len(entries)} results")
        if entries:
            return "\n\n".join(entries)

        # Log the start of the raw page to help diagnose markup changes.
        log.warning("[web_search] 0 results — raw HTML snippet:\n" + response.text[:500])
        return f"No results found for: {query}"
    except Exception as e:
        # Best effort: the failure is reported back as the tool result.
        log.exception(f"[web_search] exception: {e}")
        return f"Search error: {e}"
def tool_browse_url(url: str) -> str:
    """
    Fetch a URL and return its visible text, truncated to MAX_BROWSE_CHARS.

    The URL is normalised before fetching: surrounding whitespace is removed,
    a protocol-relative URL ("//host/path") or a URL without any scheme
    ("host/path") is upgraded to https, and any scheme other than http/https
    is rejected. Without this, such inputs fail inside ``requests`` with an
    opaque schema error.
    NOTE(review): the search tool reports each result's URL as displayed
    text, which presumably can lack a scheme — confirm against live output.

    Args:
        url: Address of the page to read.

    Returns:
        The page text with script/style/noscript/header/footer/nav elements
        removed, or a message starting with ``"Browse error:"`` when the URL
        is unusable or the request/parsing fails.
    """
    log.info(f"[browse_url] url={repr(url)}")

    # --- Normalise / validate the URL before touching the network ---
    url = str(url or "").strip()
    if not url:
        log.warning("[browse_url] empty URL")
        return "Browse error: no URL provided."
    if not url.lower().startswith(("http://", "https://")):
        if url.startswith("//"):
            url = "https:" + url
        elif re.match(r"[A-Za-z][A-Za-z0-9+.\-]*://", url):
            # An explicit scheme that is not http(s), e.g. file:// or ftp://
            log.warning(f"[browse_url] unsupported scheme in {url!r}")
            return "Browse error: only http:// and https:// URLs are supported."
        else:
            url = "https://" + url
        log.debug(f"[browse_url] normalised url={url!r}")

    try:
        resp = requests.get(
            url,
            timeout=15,
            headers={"User-Agent": "Mozilla/5.0"},
        )
        log.debug(f"[browse_url] HTTP {resp.status_code} | content-length={len(resp.text)}")
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        # Drop elements that carry no readable page content.
        for tag in soup(["script", "style", "noscript", "header", "footer", "nav"]):
            tag.decompose()
        text = soup.get_text(separator="\n", strip=True)
        # Collapse excessive blank lines
        text = re.sub(r"\n{3,}", "\n\n", text)
        if len(text) > MAX_BROWSE_CHARS:
            text = text[:MAX_BROWSE_CHARS] + "\n\n[... truncated ...]"
        log.info(f"[browse_url] returning {len(text)} chars")
        return text or "Page returned no readable content."
    except Exception as e:
        # Best effort: the failure is reported back as the tool result.
        log.error(f"[browse_url] exception: {e}", exc_info=True)
        return f"Browse error: {e}"
def execute_tool(name: str, arguments: dict) -> str:
    """
    Run the named tool and return its textual result.

    Args:
        name: Tool name; "web_search" and "browse_url" are supported.
        arguments: Tool arguments. Anything that is not a dict (for example
            None or a raw string produced by the model) is treated as an
            empty argument set instead of raising AttributeError.

    Returns:
        The tool's output, or ``"Unknown tool: <name>"`` for any other name.
    """
    args = arguments if isinstance(arguments, dict) else {}
    if name == "web_search":
        return tool_web_search(args.get("query", ""))
    if name == "browse_url":
        return tool_browse_url(args.get("url", ""))
    return f"Unknown tool: {name}"
| # --------------------------------------------------------------------------- | |
| # Tool call parsing | |
| # --------------------------------------------------------------------------- | |
def _coerce_tool_call(data):
    """Return a well-formed {'name', 'arguments'} dict built from *data*, or None."""
    if not isinstance(data, dict):
        return None
    name = data.get("name")
    if not isinstance(name, str) or not name:
        return None
    arguments = data.get("arguments", {})
    if isinstance(arguments, str):
        # Arguments may arrive as a JSON-encoded string instead of an object.
        try:
            arguments = json.loads(arguments)
        except json.JSONDecodeError:
            arguments = {}
    if not isinstance(arguments, dict):
        arguments = {}
    return {**data, "name": name, "arguments": arguments}


def parse_tool_call(text: str):
    r"""
    Extract the first tool call from model output.

    Two formats are recognised:
      1. <tool_call>{"name": "...", "arguments": {...}}</tool_call>
         (the closing tag may be missing if generation stopped right after
         the JSON payload)
      2. A markdown ```json fenced block holding an object with both
         "name" and "arguments" keys.

    Args:
        text: Raw text produced by the model.

    Returns:
        A dict whose "name" is a non-empty string and whose "arguments" is a
        dict, or None when no valid tool call is present. Payloads that are
        valid JSON but not an object with a name (e.g. a list) are rejected
        so callers can rely on ``.get``.
    """
    # Format 1: <tool_call> tags
    tagged = re.search(r"<tool_call>\s*(.*?)\s*(?:</tool_call>|\Z)", text, re.DOTALL)
    if tagged:
        try:
            call = _coerce_tool_call(json.loads(tagged.group(1)))
        except json.JSONDecodeError:
            call = None
        if call is not None:
            return call

    # Format 2: markdown JSON block
    fenced = re.search(r"```json\s*(\{.*?\})\s*```", text, re.DOTALL)
    if fenced:
        try:
            data = json.loads(fenced.group(1))
        except json.JSONDecodeError:
            data = None
        # Both keys are required here so that an ordinary JSON snippet in an
        # answer is not mistaken for a tool call.
        if isinstance(data, dict) and "name" in data and "arguments" in data:
            return _coerce_tool_call(data)

    return None
| # --------------------------------------------------------------------------- | |
| # Think block filtering | |
| # --------------------------------------------------------------------------- | |
def remove_think_blocks(text: str) -> str:
    """Strip <think> sections (closed or left open) and surrounding whitespace."""
    flags = re.DOTALL | re.IGNORECASE
    patterns = (
        r"<think>.*?</think>",  # complete blocks
        r"<think>.*$",          # an unclosed block runs to the end of the text
    )
    for pattern in patterns:
        text = re.sub(pattern, "", text, flags=flags)
    return text.strip()
_THINK_FLAGS = re.DOTALL | re.IGNORECASE
_CLOSED_THINK_RE = re.compile(r"<think>.*?</think>", _THINK_FLAGS)
_OPEN_THINK_RE = re.compile(r"<think>.*$", _THINK_FLAGS)
_PARTIAL_THINK_TAG_RE = re.compile(r"<(?:t(?:h(?:i(?:n(?:k)?)?)?)?)?$", re.IGNORECASE)


def filter_streaming_text(text: str) -> str:
    """
    Clean partially generated text for display while it is still streaming.

    Complete <think>...</think> blocks are removed, an unclosed <think> drops
    everything after it, and a trailing fragment that could be the start of a
    <think> tag (e.g. "<", "<thi", "<think") is held back so it is not
    rendered before it is known whether a think block follows.
    """
    visible = _CLOSED_THINK_RE.sub("", text)
    visible = _OPEN_THINK_RE.sub("", visible)
    visible = _PARTIAL_THINK_TAG_RE.sub("", visible)
    return visible.strip()
| # --------------------------------------------------------------------------- | |
| # Message builder | |
| # --------------------------------------------------------------------------- | |
def build_messages(message, history):
    """
    Assemble the chat-template message list for one user turn.

    The list starts with the system prompt, continues with the prior turns
    from *history*, and ends with the new user *message*. History entries may
    be dicts with "role"/"content" keys or two-item (user, assistant)
    sequences; entries of any other shape, roles other than user/assistant,
    and empty contents are skipped. All contents are converted with ``str``.
    """

    def _turns(entry):
        # Yield (role, content) pairs for a single history entry.
        if isinstance(entry, dict):
            role, content = entry.get("role"), entry.get("content")
            if role in ("user", "assistant") and content:
                yield role, content
        elif isinstance(entry, (list, tuple)) and len(entry) == 2:
            for role, content in zip(("user", "assistant"), entry):
                if content:
                    yield role, content

    conversation = [{"role": "system", "content": SYSTEM_PROMPT}]
    for entry in history or []:
        conversation.extend(
            {"role": role, "content": str(content)} for role, content in _turns(entry)
        )
    conversation.append({"role": "user", "content": str(message)})
    return conversation
| # --------------------------------------------------------------------------- | |
| # Main chat function (agentic tool loop) | |
| # --------------------------------------------------------------------------- | |
def chat_fn(message, history):
    """
    Generator driving one chat turn, including the tool-calling loop.

    Each pass renders the conversation with the chat template, streams a
    generation from the model on a worker thread, and then checks the full
    output for a tool call. A tool call is executed and its result appended
    to the conversation before the next pass; a plain answer ends the loop.
    At most MAX_TOOL_ITERATIONS passes are made.

    Args:
        message: The new user message.
        history: Prior conversation, in any form build_messages accepts.

    Yields:
        Text for display: the cleaned partial answer as it streams, a status
        line when a tool is used, the final answer, or a fallback message.
        At least one value is always yielded.
    """
    messages = build_messages(message, history)
    # str() keeps the log line safe if message is not a plain string.
    log.info(f"[chat_fn] user message={repr(str(message)[:80])}")
    start = time.time()
    last_display = ""
    yielded_anything = False
    iteration = 0  # keeps the summary line valid if the loop body never runs

    for iteration in range(MAX_TOOL_ITERATIONS):
        log.info(f"[chat_fn] --- iteration {iteration + 1} ---")

        # --- Build prompt and stream this generation pass ---
        # NOTE(review): the Qwen3 model card describes an enable_thinking
        # switch for apply_chat_template; with thinking left on, a long
        # <think> block can use up max_new_tokens — confirm desired mode.
        prompt = tokenizer.apply_chat_template(
            messages,
            tools=TOOLS,
            tokenize=False,
            add_generation_prompt=True,
        )
        inputs = tokenizer(prompt, return_tensors="pt")
        streamer = TextIteratorStreamer(
            tokenizer,
            skip_prompt=True,
            skip_special_tokens=True,
        )
        generation_kwargs = dict(
            **inputs,
            max_new_tokens=512,
            do_sample=False,
            repetition_penalty=1.05,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.eos_token_id,
            streamer=streamer,
        )
        # Generation runs on a worker thread so the streamer can be consumed here.
        thread = Thread(target=model.generate, kwargs=generation_kwargs)
        thread.start()

        partial_text = ""
        for chunk in streamer:
            partial_text += chunk
            # Don't stream tool call JSON to the UI — wait for full output
            if "<tool_call>" in partial_text:
                continue
            cleaned = filter_streaming_text(partial_text)
            if cleaned and cleaned != last_display:
                last_display = cleaned
                yielded_anything = True
                yield cleaned
        thread.join()
        log.debug(f"[chat_fn] raw model output:\n{partial_text}")

        # --- Check for tool call in full output ---
        tool_call = parse_tool_call(partial_text)
        log.info(f"[chat_fn] tool_call parsed={tool_call}")
        if not isinstance(tool_call, dict):
            # Anything that is not a JSON object cannot be executed as a tool call.
            tool_call = None

        if not tool_call:
            # Plain response — finalize and exit loop
            final = remove_think_blocks(partial_text)
            if final and final != last_display:
                yielded_anything = True
                yield final
            break

        # --- Execute the tool ---
        tool_name = tool_call.get("name", "")
        tool_args = tool_call.get("arguments", {})
        if not isinstance(tool_args, dict):
            tool_args = {}

        status = f"🔧 Using `{tool_name}`"
        if tool_name == "web_search":
            status += f": *{tool_args.get('query', '')}*"
        elif tool_name == "browse_url":
            status += f": *{tool_args.get('url', '')}*"
        yield status
        last_display = status
        yielded_anything = True

        tool_result = execute_tool(tool_name, tool_args)
        log.info(f"[chat_fn] tool_result preview: {repr(tool_result[:200])}")

        # Inject assistant tool call and tool result into message history
        messages.append({"role": "assistant", "content": partial_text})
        messages.append({
            "role": "tool",
            "name": tool_name,
            "content": tool_result,
        })
        # Loop back for next generation pass
    else:
        # Exceeded MAX_TOOL_ITERATIONS without a plain response. Every pass
        # that reaches this point has already yielded a tool status line, so
        # the notice must be yielded unconditionally or it would never be
        # shown and the last status line would remain as the answer.
        yield "Sorry, I was unable to complete the request within the tool call limit."
        yielded_anything = True

    # Guarantee at least one yield for Gradio's async wrapper
    if not yielded_anything:
        yield "..."

    end = time.time()
    print(f"Device={DEVICE} | Iterations={iteration + 1} | Time={end - start:.2f}s")
| # --------------------------------------------------------------------------- | |
| # Gradio UI | |
| # --------------------------------------------------------------------------- | |
# Chat UI wired to chat_fn; the description line shows the active model
# directory, device and tool names.
_UI_DESCRIPTION = f"Model: {MODEL_DIR} | Device: {DEVICE} | Tools: web_search, browse_url"

demo = gr.ChatInterface(
    fn=chat_fn,
    title="Qwen 3 4B Local Chat",
    description=_UI_DESCRIPTION,
)

if __name__ == "__main__":
    # Start the app and open it in the default browser.
    demo.launch(inbrowser=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment