Skip to content

Instantly share code, notes, and snippets.

@rerorero
Last active February 7, 2020 16:51
Show Gist options
  • Save rerorero/825d57ffc394f5dc55cc9fb2eb27b21b to your computer and use it in GitHub Desktop.
Save rerorero/825d57ffc394f5dc55cc9fb2eb27b21b to your computer and use it in GitHub Desktop.
envoy-doc

https://github.com/envoyproxy/envoy/blob/master/source/docs/stats.md

  • Counter, Gauge, Histogram
  • hot restartをサポートするためRPCで親から子へ(以前は共有メモリに置いてて色々面倒だった)
  • ThreadLocalで実装するためのThreadLocalStoreで実装

ThreadLocalStore

  • スレッドの初期化前にも使えるようになってる
  • Scopeはどのスレッドからでも作れるがmainからのみつくってる
  • Scopeはどんなスレッドからでも参照、削除できる
  • 各スレッドキャッシュを持ち、キャッシュがない場合はcentral cacheニトリに行く
  • Scopeが破棄されると、キャッシュフラッシュ操作がすべてのスレッドにポストされ、破棄されたScopeが所有するキャッシュデータをフラッシュします。
  • Scopeはキャッシュキーに一意のIDを使用し、古い削除されたScopeのキャッシュを参照しないようにする
  • counters(), gauges() が呼ばれる時、Scopeが重複する可能性があるのでde-dupする (?)

Histogram threading

  • main thread: ParentHistogram
  • per-thread collector: ThreadLocalHistogram
  • mainスレッドが全てのworkerにflushメッセージを通知, workerはactiveヒストグラムとbackupヒストグラムをswapする
  • 各TLSヒストグラムはbackとforthスワッpする
  • workerの処理が終わったらmainはmergeする
  • 各ThreadLocalstoreはstat mapを持つ
  • cacheのルックアップを安全にするため [] オペレーターではなく .findを使え

name

  • CLUSTER.upstream_cx_connect_attempts_exceeded みたいな名前でクラスターごとフラットなstringで識別すると、文字列の組合せ爆発が起きる
  • 代わりにシンボル(symbolized)array。これと文字列との変換には、共有マップ内でのパースや解析をmutexで排他する必要がある
  • ランタイムでのcpu負荷を減らすため、Symbolizationはスタートアップ時と、クラスター,ホストなどが変更された時にのみ行う
  • StatNamePool, StatNameSetはこれらへのアクセスを便利にする君
  • ランタイムで生成される名前もあるのでそれらはStatNameDynamicStorageにいれられる

Symbol Table Memory Layout

  • 図参照

API

  • systemのapiを抽象化したもの
  • thread, process, time, file syste, stats store
  • PlatformImpl() thread_factory と fiely_systemの実装
    • posixはThread::ThreadFactoryImplPosix Thread::ThreadFactoryImplPosix に実装されてる

Scheduler

  • Dispatcher, Scheduler, TypeSystem
  • LibEventScheduler: Schedulerをlibeventで実装したもの
    • libenevt - epollやkqueue, select等沢山あるイベント監視系のAPIをラップし、シンプルに使えるようにしたイベント監視ライブラリ。マルチプラットフォーム対応
      • libevent: event_base_loop()でブロックしてイベントループ実行 (LibEventScheduler::run())
      • event_add()でタイマー登録、event_active()で即イベント発火 (TimerImpl)
      • event_base が context的な扱いで、各スレッド(workerやmain)に一つずつ
  • DispatcherImpl
    • イベント登録
      • post(callback) -> キューに追加, runPostCallbacks()で実行される。スレッド間の通信に利用
      • createTimer(cb, time) -> libeventのevent_add/active
    • run()
      • runPostCallbacks() でキューを空に
      • LibEventScheduler::run() event_loop

tls = ThreaLocalImpl

Main: MainCommonBase::run() -> server::run()

  • RunHelper: signal listen
  • watch dog timer, guard_dogが複数のwatch dogを管理する
  • Dispather::run()
  • notifyCallbacksForStage()ステージごとにコールバック呼ぶ。
    • PostInit: init/ManagerImplからコールバックで呼ばれる。ManagerImplは登録されたfuncを実行してその間のstateを管理するだけ。PostInitの後startworkers()が呼ばれる
    • StartUp: run()内でDispatcher::run()の前に呼ばれる
  • startWorkers()
    • ListnerManagerImpl::start_workers()
      • ProdWorkerFactory, workerImpl: workerを作る
      • ProdListenerComponentFactory: protoからxDSのネットワークフィルター系を作る, createListenSocket()でTCP/UDPソケット作成
      • LibEventScheduler::run() event_loop

ListnereManager::start_workers()

  • addListenerToWorker()でworkerにListenerを登録
    • WorkerImpl::addListener -> ConnectionHandlerImpl::addListener -> ActiveTcpListner new -> tcpハンドラ登録
    • udpはquickとraw
  • worker->start()を呼ぶ
    • workerImpl::start -> createThread()

ActiveTcpListener

  • コンストラクタでdispatcher.createListener() -> ListenImpl::setupServerSocket -> evconnlistener_new でtcpのlistenハンドラ登録
  • IoSocketHandlerImplはreadv/writev/sendmsg/recvmsgをラップしたもの。byte arrayの代わりにBuffer::RawSliceを使う
  • 接続コールバックではActiveTcpListener::onAccept(AcceptedSocketImpl)へ->onAcceptWorker
    • ActiveTcpSocketをnewして(Serverの)createListenerChainしてcontinueFilterChain()
    • Network::ListenerFilter::onAccept()をイテレーションして呼ぶ
    • 全て成功したらActiveTcpSocket::newConnection() 他のリスナーにリダイレクトする場合(findActiveTcpListenerByAddressで見つかった)はnew_listener.onAccept()にコールバック。それ以外はActiveTcpListener::newConnection()へコールバック
    • ActiveTcpListener::newConnection()ではActiveTcpConnectionをactive_connectionsに追加する ここでTCPの初期化だいたいやる
      • ActiveTcpConnection = TransportSocket + TcpSocket
        • なんかこのコンストラクタでtcpの場合TCP_NODELAYオプションを設定してる。
      • TransportSocketを新規に作る
        • commonにRawBufferSocketFactory
        • extentionsにquicやsslというったものがある
      • ConnectionImplに新規のTransportSocketと、先ほど接続されたTcpソケットを渡す。
        • dispatcher.createFileEventでReadyのコールバック登録。(file_event_implでlibeventを使いread/writeのイベントハンドラ登録) -> ConnectionImpl::onWriteReady() -> transport_socket->doWrite() -> 各TransportSocket実装へ -> onReadReady() -> transport_socket->doRead() -> 各transport socket実装へ -> onRead() -> filter_manager->onRead
        • 同時にtransport_socketにもコールバックを登録。主にfdを渡すだけ(ioHandler()) RawBufferSocket(TransportSocket)::doRead(),doWrite() -> CoonectionImpl::IoHandler()でsocket_からfdを取得しread/write
        • fileter_managerは onRead() onWrite()を持つ
        • ConncectionImpl::write(), rawWrite(), -> バッファに書いて filter_manager->onWrite()
      • config->filterChainFactory->createNetworkFilterChaion() -> ConnectionImplはFilterManagerを実装している。各FilterFactoryがFilterManager::add(Read/Write)Filterしていく

FilterManagerImpl

  • ActiveReadFilter, ActiveWriteFilterというinner struct
    • ReadFilter -> upstream_filters
    • WriteFilter -> downstream_filters
  • add時に filter実装の setTransportSocketCallbacks をコールバックする
  • FilterStatusというものを持っている (Continue, Stopだけ) onWriteでstatusを返す
  • onRead
    • upstream_filtersをiterateして
    • 初期化されてなければ filter->onNewConnectionをコールバック
    • read bufferをfilter->onData()にコールバック
  • onWrite
    • downstream_filtersをiterateして
    • write bufferをfilter->onWrite()にコールバック

Buffer::RawSlice

  • 何が嬉しい?

ListenerManager

  • active_listenersでListnerImpl(ActiveTCPListenerとか)のListを管理
  • addOrUpdateListener()でadtive_listneresを登録
    • configuration_impl MainImpl::initialize()から呼ばれる
    • LDSApiImpl::onConfigUpdate()から呼ばれる
      • こちらでは ListnerManager::beginListnerUpdate() -> remove/addOrUpdateListner() -> endListenerUpdate() と操作する(内部のエラー状態をLDS側で取得するため)
  • addOrUpdateListener()
    • active_listeners, warm_listenersから現在のlistenerインスタンスを取得(existing_warm_listeners, existing_active_listner)
      • Configからとったhash値を持ってて、排他に使ってる
    • ListnerImplをnewする
    • 名前が同じでaddressが違うやつを弾いたりする
    • existing_warmにある場合: new listnereで上書きする
    • existing_listnerにある場合: warming listenerに追加する(workerがまだ開始してない場合はそのままactive_listner上書き)
    • 新規の場合: なんか削除して同じ名前で違うのが追加されたケースとか考慮されてて大変そう。drain_listneresというやつの中に同じアドレスが無いかチェック。あればそのlistnerのsocketを使う。なければアドレス名からSocketFactoryを生成してlitener->SetSockerFactory。こちらも同じくworkerがスタートしてたらwarmingに追加する
    • 最後にListnerImplのinitialize()
      • InitManager(ManagerImpl)のinitializeを実行する
  • removeListener()
    • TODO
  • ListnerImplのInitManagerとInitWatcher
    • parent(ListnerManager)のonListenerWarmedをWatcherHandlerとして登録
    • InitManagerは1つのready()と複数のtarget_handlersを管理できるようだ
  • onListenerWarmed()
    • addListenerToWorker()->worker.addListener()でワーカーのイベントハンドラを更新、あとはActiveTcpListener newとかへ
    • warmedからactive listneresへ移動

WorkerImpl

  • start()前にaddListener()->ConnectinoHandlerImpl::addListener()でイベントハンドラが設定される
  • start()でスレッドを作成。
  • threadRoutine()
    • Dispather->run() -> event_base_loop

LDSAPIImpl

  • Server InstanceImpl::initialize() 内でListenManager::createLdsAPI()で作成されListenerManagerにそのまま保存
    • ListenerComponentFactory::createLDSApi() -> LdsApiImpl
  • LdsApiImplコンストラクタでInitManagerのtargetハンドラにsubsription_->start()
  • subscriptionとはClusterManagerのイベントをサブスクライブするくんらしい。subscriptionFromConfigSource()の中でConfigに応じてGrpcSubscriptionImplとかのインスタンスを返す。
  • onConfigurUPdate: LDS resourceが更新された時のハンドラ listenermangerのaddOrUpdateListnere()へ

GrpcSubscriptionImpl (xDSのgRPC実装)

  • GrpcSubscriptionImpl::start() type_urlでどのxDSかを指定
  • GrpcMuxImpl::subscribe() api_stateにtype_urlごとにコールバック(watches_)を登録する
    • ApiState: type_urlごとにDiscoverRequestの状態を管理している
  • GrpcMuxImpl: GrpcStream<DiscoveryRequest, DiscoveryResponse>を持ってる
    • start() -> establishNewStream -> onEstablish() -> DiscoverRequestを送信する
    • onDisvoeryResponse()
      • messageの内容からリソースの差分を見つける
      • 初回(今の状態がempty())はcallback_.onConfigUpdate(resource)で全部渡す、存在する奴はMerge()してonConfigUpdate()リソースごとに
      • レスポンスの処理が終わったらrequest_.set_version_infoでmessageのバージョンをセーブする、さらに処理に失敗成功関わらず、nonceは保存する。
      • またdiscovery request飛ばす

ClusterManager

  • TODO

LDS登録からfilter_managerへの追加まで

  • MainImpl::initialize(): boostrap.static_resources().listneres()、あるいはLDSAPIから..
  • ListenerImplにそのままprotoのconfigが渡る。コンストラクタ内でListnerManager::factory_.createListenereFactoryList()が呼ばれる
    • これはServer::createListenerFilterFactoryList -> ProdListenerComponentFactory::createListenerFilterFactoryList_()
    • Config::Utility::getAndCheckFactoryConfiguration::NamedListenerFilterConfigFactory(proto_config);
      • staticなactoryRegisteryからとってくる。REGISTER_FACTORYマクロで登録
      • 例えばHTTPInspectorとか https://www.envoyproxy.io/docs/envoy/latest/configuration/listeners/listener_filters/http_inspector
      • createFilterFactoryFromProto内でListnerFilterManagerのaddAcceptFilter()を読んでいる
      • HTTPInspectorFilter::Filter().onAccept()が登録される
        • ここではsyscall.recvでPEEKしてバッファを少し読み、parseHTTPHeaderでヘッダの中身を解析、statsを更新。

ClusterManager

  • RunHelper -> ProdClusterManagerFactory::clusterManagerFromProt()で生成
    • ClusterManagerImplコンストラクタ内で
    • static_resourcesのcluster -> EDSだったら loadCluster()
      • clusterFromProto()->ClusterFactoryImplBase::create() -> ClusterFactoryRegisterにクラスタータイプごとに登録されている
      • 例: EDSClusterFactoryImpl -> new EDSClusterImpl
        • clusterManager->subscriptionFactory->subscriptionFromConfigSourceで生成したsubscription(LDS APIt同じgRPCのイベントハンドラ)を保持
        • EDSClusterImpl::startPreInit()でsubscription->start()する
        • onConfigUpdate()
    • dynamic_resources.CDS configがあったら -> init_helper_.setCds(cds api)

Envoy threading model

  • Main
  • Worker
    • Listening
    • Each worker has each connectin pool. If you have 4 threads, you would have at leaset 4 upstream connections.
  • File flusher
    • primiary accessl logs

When it locks? (from 'What non-blocking means')

  • To put logs into memory.
  • To merge the thread local stats to central store.
  • Message passing (from Main to the workers)
  • To log to stderr
  • etc

TLS: Thread Local Storage

  • WorkerからThreadへのmessage passingにつかう
  • RCU 的な考え
  • 格納するデータは各スレッドがロックなしで触れる用と、ReadOnlyのグローバルなポインタ(RCで管理)とがある
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment