Created
May 24, 2025 17:50
-
-
Save OxideDall/b54ec1a7e9c0e92a83144ba281a0aef5 to your computer and use it in GitHub Desktop.
Paramless RPC client proxy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import http.server | |
| import socketserver | |
| import json | |
| import urllib.request | |
| import os | |
| from datetime import datetime | |
def load_env():
    """Load ``KEY=VALUE`` pairs from ``./.env`` into ``os.environ``.

    Does nothing when there is no ``.env`` file in the current working
    directory. Blank lines, lines starting with ``#`` and lines without an
    ``=`` are ignored, as are lines whose key is empty. Keys and values are
    stripped of surrounding whitespace, and one pair of matching single or
    double quotes around a value is removed. Values read from the file
    overwrite variables that are already set in the environment.
    """
    if not os.path.exists('.env'):
        return
    # Explicit encoding: the platform default is not UTF-8 everywhere.
    with open('.env', 'r', encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            # Split on the first '=' only, so values may contain '='.
            key, sep, value = line.partition('=')
            key = key.strip()
            # An empty variable name cannot be stored in the environment;
            # assigning it would raise, so such lines are skipped.
            if not sep or not key:
                continue
            value = value.strip()
            # KEY="value" / KEY='value': drop the wrapping quotes so the
            # stored value is usable as-is (e.g. as a URL).
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
                value = value[1:-1]
            os.environ[key] = value
# Populate os.environ from .env first so the lookup below can see RPC_URL.
load_env()
RPC_URL = os.environ.get('RPC_URL')
class ThreadedHTTPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that handles every connection in its own thread."""

    # Allow the listening address to be reused right after a restart.
    allow_reuse_address = True
    # Handler threads block on upstream I/O. Marking them as daemon threads
    # keeps a stuck request from stalling shutdown, because closing the
    # server only waits for non-daemon handler threads.
    daemon_threads = True
class AsyncRPCProxyHandler(http.server.BaseHTTPRequestHandler):
    """Forward JSON-RPC POST requests to ``RPC_URL``, defaulting ``params``.

    Every JSON-RPC call object that lacks a ``params`` member gets
    ``"params": []`` added before the request is forwarded. A body may be a
    single call object or a batch (JSON array) of them. Successful responses
    and ``OPTIONS`` preflight responses carry permissive CORS headers.
    """

    # Seconds to wait for the upstream endpoint before answering 502.
    # Without a limit a stalled upstream would block a handler thread
    # indefinitely.
    upstream_timeout = 30

    def _send_cors_headers(self):
        """Queue the CORS headers shared by POST and preflight responses."""
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')

    def do_POST(self):
        """Validate the request body, add missing ``params``, and proxy it.

        Responds with:
          * 400 for a missing/invalid ``Content-Length``, an empty body,
            malformed JSON, or JSON that is neither an object nor an array;
          * 500 when ``RPC_URL`` is not configured;
          * 502 when the upstream request fails or times out;
          * 200 with the upstream response body otherwise.
        """
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            self.send_error(400, "Invalid Content-Length")
            return
        # A negative length would make rfile.read() wait for end-of-stream.
        if content_length < 0:
            self.send_error(400, "Invalid Content-Length")
            return
        if content_length == 0:
            self.send_error(400, "Empty request body")
            return

        body = self.rfile.read(content_length)
        try:
            request_data = json.loads(body.decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError):
            self.send_error(400, "Invalid JSON")
            return

        # Collect the call objects to patch: the body itself, or the object
        # members of a batch. Non-object batch members are forwarded as-is
        # for the upstream to reject.
        if isinstance(request_data, dict):
            calls = [request_data]
        elif isinstance(request_data, list):
            calls = [call for call in request_data if isinstance(call, dict)]
        else:
            self.send_error(400, "Invalid JSON-RPC request")
            return

        methods = ', '.join(str(call.get('method', 'unknown')) for call in calls)
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{timestamp}]: {methods or 'unknown'} from {self.client_address[0]}")

        for call in calls:
            call.setdefault('params', [])

        if not RPC_URL:
            self.send_error(500, "RPC_URL not configured")
            return

        # Only the upstream round-trip is guarded here, so that a failure
        # while writing to the client is never answered with a second
        # (502) response on the same connection.
        try:
            req = urllib.request.Request(
                RPC_URL,
                data=json.dumps(request_data).encode('utf-8'),
                headers={'Content-Type': 'application/json'},
            )
            with urllib.request.urlopen(req, timeout=self.upstream_timeout) as response:
                response_data = response.read()
        except Exception as e:
            print(f"Error proxying request: {e}")
            self.send_error(502, "Bad Gateway")
            return

        try:
            self.send_response(200)
            self.send_header('Content-Type', 'application/json')
            self.send_header('Content-Length', str(len(response_data)))
            self._send_cors_headers()
            self.end_headers()
            self.wfile.write(response_data)
        except ConnectionError as e:
            # The client went away; there is nobody left to answer.
            print(f"Error proxying request: {e}")

    def do_OPTIONS(self):
        """Answer a CORS preflight request with the allowed origin/methods."""
        self.send_response(200)
        self._send_cors_headers()
        self.send_header('Content-Length', '0')
        self.end_headers()
def main(host="localhost", port=1234):
    """Run the proxy server until interrupted.

    Args:
        host: Interface to listen on.
        port: TCP port to listen on.

    Prints an error and returns without starting when ``RPC_URL`` is not
    set. A ``KeyboardInterrupt`` stops the server quietly; any other
    failure (for example the address being in use) is reported on stdout.
    """
    if not RPC_URL:
        print("Error: RPC_URL not found in .env file")
        return
    try:
        with ThreadedHTTPServer((host, port), AsyncRPCProxyHandler) as httpd:
            # Announce only once the socket is bound, so the banner is
            # never printed for a server that failed to start.
            print(f"Threaded RPC Proxy started on {host}:{port}")
            print(f"Proxying to: {RPC_URL}")
            httpd.serve_forever()
    except KeyboardInterrupt:
        print("\nServer stopped")
    except Exception as e:
        print(f"Server error: {e}")
# Start the proxy only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment