Skip to content

Instantly share code, notes, and snippets.

View zzstoatzz's full-sized avatar

nate nowack zzstoatzz

View GitHub Profile
@zzstoatzz
zzstoatzz / prefect_and_generics.py
Last active February 6, 2024 10:41
generics in prefect
from typing import Sequence, TypeVar, Generic
from pydantic import BaseModel, TypeAdapter
from prefect import flow, task
ResultType = TypeVar("ResultType")
class Result(BaseModel, Generic[ResultType]):
    """Generic pydantic model wrapping a sequence of ``ResultType`` items.

    Parametrize it (e.g. ``Result[int]``) to fix the element type of ``data``.
    """

    # The wrapped items; the element type is the class's type parameter.
    data: Sequence[ResultType]
# pip install git+https://github.com/PrefectHQ/marvin.git@2.0 prefect
import os
import marvin
from prefect import flow
@marvin.fn
@zzstoatzz
zzstoatzz / rando.py
Last active December 17, 2023 07:24
async def learn_from_child_interactions(app: AIApplication, event_name: str | None = None):
if event_name is None:
event_name = "marvin.assistants.SubAssistantRunCompleted"
logger.debug_kv("👂 Listening for", event_name, "green")
while not sum(map(ord, "vogon poetry")) == 42:
try:
async with PrefectCloudEventSubscriber(
filter=EventFilter(event=dict(name=[event_name]))
) as subscriber:
""" SETUP
pip install git+https://github.com/PrefectHQ/marvin.git@slackbot-2.0 fastapi cachetools
│ File: /Users/nate/.marvin/.env
───────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
│ MARVIN_OPENAI_API_KEY=sk-xxx
│ MARVIN_SLACK_API_TOKEN=xoxb-xxx
│ MARVIN_OPENAI_ORGANIZATION=org-xx
│ MARVIN_LOG_LEVEL=DEBUG
@zzstoatzz
zzstoatzz / dynamic_tasks.py
Created September 11, 2023 16:36
allow dynamically invoking callables as prefect tasks
from typing import Any, Callable, Dict, List, Optional, TypeVar
from pydantic import BaseModel, Field, PrivateAttr, ValidationError
from prefect import flow, Flow, task, Task
from importlib import import_module
T = TypeVar('T', bound=Callable[..., Any])
class CallableReference(BaseModel):
    """Pydantic model that names a callable by module and function.

    NOTE(review): presumably resolved to the actual callable via
    ``importlib.import_module``; the resolution logic is not visible in this
    excerpt -- confirm against the full gist.
    """

    # Module name, stored as a string.
    module: str
    # Function name, stored as a string.
    function: str
from marvin import ai_fn
from pydantic import BaseModel
class WorkoutPlan(BaseModel):
    """Structured workout plan; the declared return type of ``workout_plan``."""

    # Free-form string describing the frequency of the plan.
    frequency: str
    # Exercises in the plan, one string per exercise.
    exercises: list[str]
@ai_fn
def workout_plan(height: str, weight: str, desired_outcome: str) -> WorkoutPlan:
In [8]: from marvin import ai_fn
...: from marvin.plugins.github import search_github_issues
...:
...: @ai_fn(plugins=[search_github_issues])
...: def find_prefect_github_issues(about: str) -> list[str]:
...: """find github issues in prefecthq/prefect related to `about`"""
...:
In [9]: find_prefect_github_issues(about="artifacts")
import gzip
from prefect import flow, task
from prefect_airbyte.configuration import export_configuration
@task
def zip_and_write_somewhere(
airbyte_config: bytearray,
somewhere: str,
):
from prefect import flow, task
from prefect_airbyte.connections import trigger_sync
@flow
def airbyte_orchestrator(
    connection_id: str = 'cc9cdc03-a804-4b04-9f77-f775c306842c',
):
    """Trigger a sync for an Airbyte connection via ``trigger_sync``.

    Args:
        connection_id: ID of the connection to sync. Defaults to the value
            that was previously hard-coded in this flow, so calling the flow
            with no arguments behaves as before.
    """
    trigger_sync(connection_id=connection_id)
import gzip
from prefect import flow, task
from prefect_airbyte.configuration import export_configuration
@task
def zip_and_write_somewhere(
airbyte_config: bytearray,
somewhere: str,
):