Skip to content

Instantly share code, notes, and snippets.

View zzstoatzz's full-sized avatar

nate nowack zzstoatzz

View GitHub Profile
import functools
import os
import threading
import time
import uuid
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from prefect import flow
import asyncio
import functools
import asyncpg as apg
from prefect import flow, task
async def create_db_pool() -> apg.Pool:
pool = await apg.create_pool(
from functools import lru_cache
import marvin # pip install marvin
import praw # pip install praw
from marvin.utilities.logging import get_logger
import raggy # pip install raggy
from raggy.documents import Document, document_to_excerpts
from raggy.vectorstores.tpuf import TurboPuffer, query_namespace
package main
import (
"fmt"
"math/rand"
"sync"
"gonum.org/v1/gonum/stat"
)
@zzstoatzz
zzstoatzz / last_n_lines.py
Last active March 2, 2024 17:00
find the last n lines of a text file assumed to end with a newline character
from pathlib import Path
def reverse_readline(filepath, N):
with filepath.open('rb') as f:
f.seek(0, 2)
filesize = f.tell()
lines_found = []
buffer = bytearray()
chunk_size = 8192 # Read in chunks of 8KB
position = filesize
import inspect
from contextlib import contextmanager
import marvin
from devtools import debug # pip install devtools
from marvin.ai.text import ChatRequest, EjectRequest
from marvin.client.openai import AsyncMarvinClient
from marvin.utilities.asyncio import run_sync
from marvin.utilities.context import ctx
from marvin.utilities.strings import count_tokens
In [1]: from pydantic import BaseModel
In [2]: import marvin
In [3]: class Book(BaseModel):
...: title: str
...: author: str
...:
In [4]: @marvin.fn
from enum import Enum
import marvin
class Fruit(str, Enum):
apple = "apple"
cherry = "cherry"
orange = "orange"
banana = "banana"
import marvin
@marvin.fn
def rank_answers(question: str, answers: list[str], context: str) -> list[str]:
"""Rank answers to a question based on context"""
if __name__ == "__main__":
ranked_answers = rank_answers(
question="What is the best programming language?",
@zzstoatzz
zzstoatzz / prefect_and_generics.py
Last active February 6, 2024 10:41
generics in prefect
from typing import Sequence, TypeVar, Generic
from pydantic import BaseModel, TypeAdapter
from prefect import flow, task
ResultType = TypeVar("ResultType")
class Result(BaseModel, Generic[ResultType]):
data: Sequence[ResultType]