Skip to content

Instantly share code, notes, and snippets.

@naporin0624
Last active February 24, 2024 03:14
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save naporin0624/41ed65a82bb21d5cf330604b6aa156b4 to your computer and use it in GitHub Desktop.
Save naporin0624/41ed65a82bb21d5cf330604b6aa156b4 to your computer and use it in GitHub Desktop.
cloudfire D1 と DurableObjects で firestore の夢を見る

cloudflare d1 と DurableObject で firestore の夢を見る

完成形

https://github.com/napolab/cloudflare-workers-playground

firestore のメリット・デメリット

メリデメはこんな感じ。firestore を多用する理由はデメリットが大きい反面、圧倒的なスピードを得られるから。(refetch を websocket にすべて任せられるのは強いがよい

メリット

  • firestore の realtime 性はとても強力
  • 大体のケースにおいて web api server を書く必要がない
    • 逆に書くと遅くなるので client side sdk で完結させるように書くことが多い

デメリット

  • NoSQL なので変更に弱い
  • rules がマジできつい
    • client side sdk で DB を変更するのでセキュリティを rules という独自言語で担保するのだが、これが型がないうえに複雑な状態が持てないので roles による可視性制御がめちゃくちゃ大変(できることにはできる)
  • web api server を書こうと思ったときに firebase cloudfunction という手段をとることになるが、めちゃくちゃデプロイが遅くて気がくるってしまう

cloudflare workers で firestore 相当のものを作る

  • ほしい機能
    • 永続化層がある
    • DB にスキーマがある
      • RDBMS が使える
    • web api server が書ける
    • websocket による変更のリアルタイム通知ができる
    • デプロイが高速である

それぞれ workers にある機能に割り当てる

  • 永続化層 --> D1
  • DB にスキーマがある --> drizzle によるスキーマファースト開発
  • web api server が書ける --> cloudflare workers そのもの
  • websocket による変更のリアルタイム通知 --> DurableObjects + WebSocketPiar, Response で達成可能
  • デプロイが高速である --> cloudflare workers は v8 エンジンを積んでるので js を直でデプロイできることから高速である(コンテナ技術を使用していない)

いけそう

cloudfalre workers ってサーバーレスだから global に状態持てなくない?

websocket で client side とコネクションを張りっぱなしにしておくためには WebSocketPair を接続が破棄される or 接続を破棄するまでは保持しておかないといけない。 しかし、普通にサーバーレスでは in-memory で状態を持っていても次々に別のプロセスでサーバーが起動するので同じ in-memory を参照できないので connection を保持することができない。

そこで登場するのが DurableObjects という機能。

workers が stateless な function だとすると DurableObjects はa singleton の class instance とみることができる。(実際に class で記述するし....

https://blog.cloudflare.com/ja-jp/durable-objects-ga-ja-jp/

アプリケーションがそのクラスの名前付きインスタンス(Cloudflareネットワーク全体を通して必ずユニークなものになります)を作成できます。そのインスタンスが1つのDurable Objectであり、Workers(と他のDurable Objects)はもとのDurable Objectに対し、ID経由でメッセージを送ることができます。Durable Objectは送られてきたメッセージを順番にシングルスレッドで処理し、メッセージ間の調整を行います。

DurableObjects の強みは強整合であること、トランザクション系に強そうだなと思っている。DB を使わずに状態を表現できるので counter を簡単に作ることができる。 singleton とみることができるといったがブラウザの window みたいなところに生えるわけではなく、fetch を返して network 越しに singleton instance の method をたたくという I/F になっている。

https://zenn.dev/mizchi/articles/5130b02c5b490e4f871a

つまり Workers KV は結果整合、 Durable Objects は強整合とのこと。地理的に離れてる場合はどうなるんだろう?と思ったんですが、次のような記述があります。

適当な Counter を作るならこう(下にコードを用意した)。動かしたかったらこちらで

firestore 相当のものを作る

  1. Hono と drizzle で CRUD を作る
  • drizzle-orm + d1 で table と client を作る
  • midlleware で認証入れたり cache する
  1. subscription api を作って websocket で変更を通知する
  2. 変更通知対象は DurableObjects で保持する

1. Hono で CRUD を作る

import { zValidator } from "@hono/zod-validator";
import { drizzle } from "drizzle-orm/d1";
import { Hono } from "hono";
import { cors } from "hono/cors";

type Bindings = {
  readonly DB: D1Database;
};
type Environment = {
  readonly Bindings: Bindings;
};

const app = new Hono<Environment>();
app.use("/api/*", cors());

app.get("/api/posts", async (c) => {
  const db = drizzle(c.env.DB);
  const data = await db.select().from(posts).all();

  return c.json(data);
});

app.post(
  "/api/post",
  zValidator("json", z.object({ title: z.string(), body: z.string() })),
  async (c) => {
    const res = c.req.valid("json");
    const db = drizzle(c.env.DB);

    await db.insert(posts).values({
      title: res.title,
      body: res.body,
      createdAt: new Date(),
    }).run();
    const result = await db.select().from(posts).all();

    return c.json(result);
  }
);

export default app

2. subscription api を作って websocket で変更を通知する

type Bindings = {
  readonly DB: D1Database;
+ readonly SHARED_EVENT: DurableObjectNamespace;
};
const sharedEvent = (c: Context<Environment>) => (type: string) => {
  const doId = c.env.SHARED_EVENT.idFromName(type);

  return c.env.SHARED_EVENT.get(doId);
};

app.get("/subscribe/posts", async (c) => {
  const obj = sharedEvent(c)("posts");
  const response = await obj.fetch(new URL("/events", c.req.url), {
    headers: c.req.headers,
  });

  return response;
});

3. 変更通知対象は DurableObjects で保持する

WebScoketPair を保持する DurableObjects を作る

import { Hono } from "hono";

import { wsupgrade } from "../middleware";

export class SharedEvent implements DurableObject {
  private readonly app = new Hono();
  private readonly sessions = new Set<WebSocket>();

  constructor(private readonly state: DurableObjectState) {
    this.app.get("/events", wsupgrade(), async (c) => {
      const pair = new WebSocketPair();
      this.handleSession(pair[1]);

      return new Response(null, { status: 101, webSocket: pair[0] });
    });
    this.app.post("/event", async (c) => {
      const data = await c.req.json();
      const json = JSON.stringify(data);
      for (const socket of this.sessions) {
        socket.send(json);
      }
    });
  }

  private handleSession(socket: WebSocket): void {
    socket.accept();
    this.sessions.add(socket);

    socket.addEventListener("close", () => {
      this.sessions.delete(socket);
      socket.close();
    });
  }

  fetch(request: Request) {
    return this.app.fetch(request);
  }
}

あとは workers の endpoint に post されたときに SHARED_EVENT に対して /event を post することで /subscribe/xxx から client side に websocket で通知が送られる。

Image from Gyazo

余談

  • websocket の connection を何個同時接続できるのか実際にリクエストしてみた結果。7000個くらいが限界っぽい。詳しくは調べてはない。
/* eslint-disable import/no-extraneous-dependencies */
/* eslint-disable no-console */
import { client as WebSocket } from "websocket";

import type { connection as Connection } from "websocket";

const main = async () => {
  const waitlist = new Set<string>();
  const connections = new Set<Connection>()
  process.on("SIGINT", () => {
    for(const connection of connections) {
      connection.close()
      console.log(connection.state)
    }

    process.exit(0)
  })
  const receiveCountMap = new Map<string, number>();

  {
    const id = setInterval(() => {
      console.log("waitlist", waitlist.size);

      if (waitlist.size === 0) {
        clearInterval(id);
      }
    }, 5000);
  }
  {
    const id = setInterval(() => {
      if(receiveCountMap.size === 0) return;

      console.log("size", receiveCountMap.size);
    }, 5000);
  }

  new Array(10000).fill(null).forEach(async (_, idx) => {
    const key = `client[${idx}]`;
    waitlist.add(key);
    const client = new WebSocket();
    client.connect("wss://<worker-url>/subscribe/posts");

    // client が connected になるまで待機
    const connection = await new Promise<Connection>((resolve, reject) => {
      client.on("connect", (con) => resolve(con));
      // client.on("connectFailed", (error) => reject(error));
    });
    connections.add(connection);
    waitlist.delete(key);

    connection.on("message", (message) => {
      if (message.type === "utf8") {
        const count = receiveCountMap.get(key) ?? 0;
        receiveCountMap.set(key, count + 1);
      }
    });

    connection.on("error", (error) => {
      console.error(`error[${key}]`, error);
    });
    connection.on("close", (close) => {
      console.error(`close[${key}]`, close);
    });
  });
};

void main()
実装によるかもだけど同時接続 7000 くらいで durable object の fetch が止まってる? 
waitlist 9540
waitlist 6741
waitlist 3667
waitlist 2517
waitlist 2487
waitlist 2487
waitlist 2437
waitlist 2435
waitlist 2434
waitlist 2434
waitlist 2434
waitlist 2434
waitlist 2426
waitlist 2425
waitlist 2425
waitlist 2425
10000 件 websocket のコネクション飛ばして残り 2425 個以降進まない
size 7533 帰ってきた message の数は 7533 個だから結構落ちるなぁ
connected すらも行かないのかー
import { Hono } from 'hono'
export { Counter } from './counter'
type Bindings = {
COUNTER: DurableObjectNamespace
}
const app = new Hono<{ Bindings: Bindings }>()
app.get('*', async (c) => {
const id = c.env.COUNTER.idFromName('A')
const obj = c.env.COUNTER.get(id)
const resp = await obj.fetch(c.req.url)
if (resp.status === 404) {
return c.text('404 Not Found', 404)
}
const count = parseInt(await resp.text())
return c.text(`Count is ${count}`)
})
export default app
import { Hono } from 'hono'
export class Counter {
value: number = 0
state: DurableObjectState
app: Hono = new Hono()
constructor(state: DurableObjectState) {
this.state = state
this.state.blockConcurrencyWhile(async () => {
const stored = await this.state.storage?.get<number>('value')
this.value = stored || 0
})
this.app.get('/increment', async (c) => {
const currentValue = ++this.value
await this.state.storage?.put('value', this.value)
return c.text(currentValue.toString())
})
this.app.get('/decrement', async (c) => {
const currentValue = --this.value
await this.state.storage?.put('value', this.value)
return c.text(currentValue.toString())
})
this.app.get('/', async (c) => {
return c.text(this.value.toString())
})
}
async fetch(request: Request) {
return this.app.fetch(request)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment