Skip to content

Instantly share code, notes, and snippets.

View edoakes's full-sized avatar

Edward Oakes edoakes

  • Anyscale
  • Minneapolis, MN
View GitHub Profile
@edoakes
edoakes / message_consumer.py
Created June 9, 2023 21:43
Example of how to use a message-passing architecture with Ray Serve.
import asyncio
from ray import serve
@serve.deployment
class MessageConsumer:
    """Ray Serve deployment that consumes messages from a single topic.

    NOTE(review): this gist excerpt is truncated — ``poll_for_messages``
    is scheduled here but its definition is not visible in this view.
    """

    def __init__(self, topic: str):
        # Schedule the background polling loop on the event loop that is
        # already running inside the Serve replica.
        loop = asyncio.get_running_loop()
        loop.create_task(self.poll_for_messages(topic))
@edoakes
edoakes / monte_carlo_pi.py
Last active May 10, 2022 11:46
Monte Carlo Pi estimation
import argparse
import time
import random
import math
# Command-line flags for the Monte Carlo Pi estimation script.
parser = argparse.ArgumentParser(
    description="Approximate digits of Pi using Monte Carlo simulation."
)
parser.add_argument("--num-samples", type=int, default=1000000)
parser.add_argument("--parallel", action="store_true", default=False)
parser.add_argument("--distributed", action="store_true", default=False)
@pipeline.step
def preprocess(_input: str) -> PreprocessOutput:
    # Stub: preprocessing logic is omitted in this gist excerpt.
    # NOTE(review): `pipeline` and `PreprocessOutput` are defined elsewhere
    # (not visible in this view).
    pass
# First model stage of the pipeline: 10 replicas, 1 GPU each.
@pipeline.step(num_replicas=10, num_gpus=1)
class Model1:
    def __call__(self, _input: PreprocessOutput) -> Model1Output:
        # Stub: inference logic is omitted in this gist excerpt.
        pass
@pipeline.step(num_replicas=5, num_cpus=1)
@edoakes
edoakes / serve_plotly.py
Created September 21, 2021 20:22
Ray Serve plotly wrapper (working but hacky)
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import pandas as pd
import plotly.graph_objs as obj
import uvicorn as uvicorn
from fastapi import FastAPI
from starlette.middleware.wsgi import WSGIMiddleware
import ray
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import pandas as pd
import plotly.graph_objs as obj
import uvicorn as uvicorn
from fastapi import FastAPI
from starlette.middleware.wsgi import WSGIMiddleware
# Dash application; requests_pathname_prefix makes its asset URLs resolve
# when the app is mounted under the "/dash/" path (presumably behind the
# WSGI/FastAPI wrapper imported above — not visible in this excerpt).
app = dash.Dash(__name__, requests_pathname_prefix="/dash/")
from transformers import pipeline
@serve.deployment(route_prefix="/sentiment", name="sentiment")
class SentimentDeployment:
    """Serve deployment wrapping a Hugging Face sentiment-analysis pipeline."""

    def __init__(self):
        # Loads the default sentiment-analysis model once per replica.
        self.classifier = pipeline("sentiment-analysis")

    async def __call__(self, request):
        # The raw HTTP request body is classified as a single input string.
        data = await request.body()
        # The pipeline returns a one-element list; unpack it.
        [result] = self.classifier(str(data))
        # NOTE(review): this gist excerpt appears truncated — the method
        # ends here without returning `result`.
import ray
from ray import serve
# Connect to the running Ray Serve instance.
# ignore_reinit_error lets this script be re-run in the same process.
ray.init(address='auto', namespace="serve-example", ignore_reinit_error=True)
# detached=True: the Serve instance outlives this driver script — TODO confirm
# against the Ray Serve docs for the version in use.
serve.start(detached=True)
# Deploy the model.
SentimentDeployment.deploy()
import joblib
import s3fs
import sklearn
@serve.deployment(route_prefix="/sentiment", name="sentiment-deployment")
class SentimentDeployment:
    """Serve deployment that loads a scikit-learn vectorizer from S3."""

    def __init__(self):
        # anon=True: unauthenticated access to a public bucket.
        fs = s3fs.S3FileSystem(anon=True)
        with fs.open('ray-serve-blog/unigram_vectorizer.joblib', 'rb') as f:
            self.vectorizer = joblib.load(f)
        # NOTE(review): excerpt truncated — classifier loading and __call__
        # are not visible in this view.
import ray
from ray import serve
# Connect to the running cluster and (re)deploy the sentiment deployment.
ray.init(address='auto', namespace="serve-example", ignore_reinit_error=True)
serve.start(detached=True)
SentimentDeployment.deploy()
import ray
from ray import serve
# Serve bootstrap (duplicated from the snippet above in this scrape).
ray.init(address='auto', namespace="serve-example", ignore_reinit_error=True) # Connect to the local running Ray cluster.
serve.start(detached=True) # Start the Ray Serve processes within the Ray cluster.