name = "ai-worker"
version = "0.1.0"
description = "api server that posts capabilities, and accepts jobs"
authors = ["erik aronesty <>"]
license = "MIT"
readme = ""
packages = [{include = "ai_worker"}]
# very simple RPC server in python
import sys, json
from http.server import BaseHTTPRequestHandler, HTTPServer
import urllib.parse as urlparse
import threading
import logging
log = logging.getLogger(__name__)
class ApiError(Exception):
class TestUniqueQueue(unittest.TestCase):
def test_basic(self):
q = util.UniqueQueue()
for i in range(100):
res = set(q)
for i in range(100):
earonesty /
Last active January 31, 2023 10:50
NamedTemporaryFile drop-in replacement that deletes on gc, not on close(), and supports mode=None
import os, tempfile, gc
class TemporaryFile:
def __init__(self, name, io, delete): = name
self.__io = io
self.__delete = delete
def __getattr__(self, k):
class FutureWaitQueue:
"""Wait for futures without making an infinite list.
Any exceptions are ignored at put() time, except for timeout errors, which are raised.
The last exception raised is raised at wait() time
def __init__(self, maxsize, timeout, err_on_timeout=concurrent.futures.TimeoutError):
"""Construct a future wait queue.
#!/usr/bin/env python3
import sys
from tracemalloc import Snapshot
def main():
# todo: argparser
fil = sys.argv[1]
num = 10
if len(sys.argv) > 2:
num = int(sys.argv[2])
View testman.js
* Very simple test runner for nodejs:
* Supports:
* before, after, beforeAll, afterAll
* fixture object passed to each test, that before/after/beforeAll/afterAll can modify
* -[t]est option on command line to pick tests to run
* -[l]inear option on the command line to disable parallel execution
* built in fixture logger, captures log lines, adds line numbers/file names/timestamps
View loggy.h
#pragma once
#include <codecvt>
#include <condition_variable>
#include <fstream>
#include <iostream>
#include <locale>
#include <memory>
#include <mutex>
#include <ostream>
import os
import sys
import json
import math
from runstats import Statistics, Regression
__all__ = ["randtest", "randtestall", "buildstats"]
def primes(givenNumber):
# Initialize a list