Skip to content

Instantly share code, notes, and snippets.

View vndee's full-sized avatar
🎯
Focusing

Duy Huynh vndee

🎯
Focusing
View GitHub Profile
from typing import Optional, List
from llm_sandbox import SandboxSession
from llama_index.llms.openai import OpenAI
from llama_index.core.tools import FunctionTool
from llama_index.core.agent import FunctionCallingAgentWorker
import nest_asyncio
from typing import Optional, List
from llm_sandbox import SandboxSession
from langchain import hub
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langchain.agents import AgentExecutor, create_tool_calling_agent
@tool
def run_code(lang: str, code: str, libraries: Optional[List] = None) -> str:
from llm_sandbox import SandboxSession
# Copy a host script into the sandbox and execute it there.
with SandboxSession(lang="python", keep_template=True) as session:
    # Copy a file from the host to the sandbox
    session.copy_to_runtime("test.py", "/sandbox/test.py")

    # Run the copied Python code in the sandbox. This is a shell command
    # line, not Python source, so it goes through execute_command();
    # session.run() expects source code in the session's language.
    result = session.execute_command("python /sandbox/test.py")
    print(result)
from llm_sandbox import SandboxSession
# Open a sandbox session on an explicitly chosen image, run a one-line
# Python program inside it, and print whatever the session hands back.
with SandboxSession(
    lang="python",
    image="python:3.9.19-bullseye",
    keep_template=True,
) as session:
    result = session.run("print('Hello, World!')")
    print(result)
from multiprocessing import Process
def cpu_bound_task(iterations: int = 10**7) -> None:
    """Keep the CPU busy by counting in a pure-Python loop.

    Args:
        iterations: How many times to increment the counter. Defaults to
            ``10**7``, the workload that used to be hard-coded, so calling
            the function with no arguments behaves as before.
    """
    count = 0
    for _ in range(iterations):
        # The increment itself is the workload; the total is never used.
        count += 1
    print("CPU-bound task complete")
processes = []
for _ in range(5):
import time
# Simulate an I/O-bound task
def read_file(delay: float = 2) -> str:
    """Stand in for a blocking file read by sleeping, then return fake content.

    Args:
        delay: Seconds to sleep before returning. Defaults to ``2``, the
            wait that used to be hard-coded.

    Returns:
        The fixed string ``"File content"``.

    Raises:
        ValueError: If ``delay`` is negative (raised by ``time.sleep``).
    """
    time.sleep(delay)  # Simulating waiting for I/O operation (e.g., reading a file)
    return "File content"
# Calling the I/O-bound task
start_time = time.time()
content = read_file()
import time
# Simulate a CPU-bound task
def compute_factorial(n):
    """Return ``n!`` for a non-negative integer ``n``.

    The product is accumulated in a loop rather than by recursion, so the
    result is not limited by the interpreter's recursion depth.

    Args:
        n: The non-negative integer whose factorial is wanted.

    Returns:
        The product ``n * (n - 1) * ... * 1``; ``1`` when ``n`` is ``0``.

    Raises:
        ValueError: If ``n`` is negative.
    """
    if n < 0:
        # The recursive form never reached its base case for negative input.
        raise ValueError("factorial is not defined for negative values")

    product = 1
    while n > 0:
        product *= n
        n -= 1
    return product
# Calling the CPU-bound task
import threading
import time
def io_bound_task(delay: float = 2) -> None:
    """Stand in for a blocking I/O call by sleeping, then report completion.

    Args:
        delay: Seconds to sleep before printing. Defaults to ``2``, the wait
            that used to be hard-coded, so the function still works as a
            zero-argument thread target.

    Raises:
        ValueError: If ``delay`` is negative (raised by ``time.sleep``).
    """
    time.sleep(delay)
    print("I/O operation complete")
threads = []
for _ in range(5):
thread = threading.Thread(target=io_bound_task)
import random
import time
class ChaosMonkey:
    """Pick one of a set of services at random and print a termination notice."""

    def __init__(self, services):
        """Remember the services that may be selected.

        Args:
            services: Sequence of service identifiers to choose from. The
                object is stored as given, not copied.
        """
        self.services = services

    def terminate_random_service(self):
        """Select one service at random and print ``Terminating <service>``.

        Returns:
            The selected service, or ``None`` when there are no services to
            choose from (nothing is printed in that case).
        """
        if not self.services:
            # random.choice raises IndexError on an empty sequence.
            return None
        service = random.choice(self.services)
        print(f"Terminating {service}")
        return service
import uuid
class PaymentProcessor:
def __init__(self):
self.processed_requests = set()
def process_payment(self, payment_id, amount):
if payment_id in self.processed_requests:
return {"status": "already_processed"}
else: