@appcypher
Created December 2, 2021 10:18
Solving HTTP Request / Response Streaming with NATS

REQUEST RESPONSE STREAMING VIA NATS

TODO

  • Check error for no subscriber connected
  • code /Users/appcypher/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.15/src/body/to_bytes.rs

Subjects

  1. workspaces - v1.run_surl.workspaces.9ccec027-68a3-47a2-bd3d-85a9c6faebfb
  2. directives - ovsJHJmrsxbs6GaQw1JtRe
  3. request_body_channel - ovsJHJmrsxbs6GaQw1Jtof
  4. response_body_channel - ovsJHJmrsxbs6GaQw1Jtof
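
A minimal sketch of how the four subjects for one request could be grouped. The `RequestSubjects` type and the `new_inbox` generator are hypothetical names for illustration; a real NATS client would supply its own unique inbox subjects. The list above shows the same ID for both body channels, while this sketch assumes a separate subject for each.

```rust
/// Subjects used by one proxied request (field names mirror the list above).
#[derive(Debug, Clone, PartialEq)]
pub struct RequestSubjects {
    pub workspaces: String,
    pub directives: String,
    pub request_body_channel: String,
    pub response_body_channel: String,
}

impl RequestSubjects {
    /// `workspace_id` is the workspace UUID. `new_inbox` is a hypothetical
    /// generator of unique subject tokens, called once per private subject.
    pub fn new(workspace_id: &str, mut new_inbox: impl FnMut() -> String) -> Self {
        Self {
            // Shared, well-known subject the backend subscribes to.
            workspaces: format!("v1.run_surl.workspaces.{}", workspace_id),
            // Per-request subjects (assumption: one unique subject each).
            directives: new_inbox(),
            request_body_channel: new_inbox(),
            response_body_channel: new_inbox(),
        }
    }
}
```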

Engine Router

  • Must enforce a maximum body stream length.
  • publish {connection_info, request_parts} to [workspaces] & handle the no-connection error
  • subscribe to [directives]
  • subscribe to [response_body_channel]
  • [directives.next]
    • if header:"need_request_body"

      • loop
        • if max data length reached

          • publish header:"stop_request_body" [directives]
          • break
        • select

          • if header:"stop_request_body" [directives.next]

            • break
          • publish data [request_body_channel]

          • set timeout on waiting for next data

            • publish header:"stop_request_body" [directives]
            • break
    • if header:"sending_response_body"

      • loop
        • if max data length reached

          • publish header:"stop_response_body" [directives]
          • unsubscribe from [response_body_channel]
          • break
        • select

          • if header:"stop_response_body" [directives.next]

            • unsubscribe from [response_body_channel]
            • break
          • [response_body_channel.next]

            • save data
          • set timeout on waiting for next data

            • publish header:"stop_response_body" [directives]
            • unsubscribe from [response_body_channel]
            • break
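
The router's response-body loop above can be sketched as follows. This is only an approximation under stated assumptions: `std::sync::mpsc` channels stand in for NATS subscriptions, the `select` is approximated by polling directives before waiting on the body channel, and `collect_response_body` and `StopReason` are hypothetical names. Unsubscribing corresponds to the caller dropping the body receiver.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::time::Duration;

pub const STOP_RESPONSE_BODY: &str = "stop_response_body";

/// Why the receive loop ended.
#[derive(Debug, PartialEq)]
pub enum StopReason {
    PeerStopped,  // the backend published "stop_response_body"
    MaxLength,    // the body limit was reached
    TimedOut,     // no chunk arrived within `timeout`
    Disconnected, // the body channel was closed
}

pub fn collect_response_body(
    directives_in: &Receiver<String>, // stand-in for [directives.next]
    directives_out: &Sender<String>,  // stand-in for publishing to [directives]
    body_in: &Receiver<Vec<u8>>,      // stand-in for [response_body_channel.next]
    max_len: usize,
    timeout: Duration,
) -> (Vec<u8>, StopReason) {
    let mut body = Vec::new();
    loop {
        // The limit is checked per iteration, so the last chunk may overshoot it.
        if body.len() >= max_len {
            let _ = directives_out.send(STOP_RESPONSE_BODY.to_string());
            return (body, StopReason::MaxLength);
        }
        // Poll for a stop directive without blocking; other headers are ignored.
        if let Ok(header) = directives_in.try_recv() {
            if header == STOP_RESPONSE_BODY {
                return (body, StopReason::PeerStopped);
            }
        }
        // Wait for the next chunk, giving up after `timeout`.
        match body_in.recv_timeout(timeout) {
            Ok(chunk) => body.extend_from_slice(&chunk), // "save data"
            Err(RecvTimeoutError::Timeout) => {
                let _ = directives_out.send(STOP_RESPONSE_BODY.to_string());
                return (body, StopReason::TimedOut);
            }
            Err(RecvTimeoutError::Disconnected) => {
                return (body, StopReason::Disconnected);
            }
        }
    }
}
```

Because directives are polled rather than selected on, a stop directive is only noticed between chunks; a real implementation would likely select over both subscriptions.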

Engine Backend

  • subscribe to [workspaces]
  • subscribe to [directives]
  • inside handler
    • subscribe to [request_body_channel]

    • publish header:"need_request_body" [directives]

      • loop
        • if max data length reached

          • publish header:"stop_request_body" [directives]
          • unsubscribe from [request_body_channel]
          • break
        • select

          • if header:"stop_request_body" [directives.next]

            • unsubscribe from [request_body_channel]
            • break
          • [request_body_channel.next]

            • save data
          • set timeout on waiting for next data

            • publish header:"stop_request_body" [directives]
            • unsubscribe from [request_body_channel]
            • break
    • publish header:"sending_response_body" [directives]

      • loop
        • if max data length reached

          • publish header:"stop_response_body" [directives]
          • break
        • select

          • if header:"stop_response_body" [directives.next]

            • break
          • publish data [response_body_channel]

          • set timeout on waiting for next data

            • publish header:"stop_response_body" [directives]
            • break
    • publish header:"need_request_body" via [from_subscriber] & handle the no-connection error

    • subscribe to [request_body_channel] // REUSABLE? for Response

    • loop

      • subscription.next
        • if header:"stop_body"
          • unsubscribe from [to_subscriber]
        • if max data length reached
          • publish header:"stop_body" [from_subscriber]
          • unsubscribe from [to_subscriber]
        • save data
      • set timeout
        • publish header:"stop_body" [from_subscriber]
        • unsubscribe from [to_subscriber]
    • publish header:"stop_body" [from_subscriber]

    • unsubscribe from [to_subscriber]

    • publish header:"sending_response_body"

    • loop

      • if max data length reached
        • publish header:"stop_response_body" [from_subscriber]
      • set timeout on waiting for next data
        • publish header:"stop_response_body" [to_subscriber]
      • if header:"stop_response_body"
        • break
      • publish data [from_subscriber]
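
The response-sending half of the backend handler can be sketched like this. As a simplification, `std::sync::mpsc` channels stand in for NATS subjects, and `source` is a hypothetical channel on which the handler produces body chunks. The function name `send_response_body` is also an assumption. A closed `source` is treated the same as a timeout, since either way no more data is coming.

```rust
use std::sync::mpsc::{Receiver, Sender};
use std::time::Duration;

/// Forwards chunks from `source` to `body_out` and returns the bytes sent.
pub fn send_response_body(
    source: &Receiver<Vec<u8>>,       // hypothetical: chunks produced by the handler
    directives_in: &Receiver<String>, // stand-in for [directives.next]
    directives_out: &Sender<String>,  // stand-in for publishing to [directives]
    body_out: &Sender<Vec<u8>>,       // stand-in for [response_body_channel]
    max_len: usize,
    timeout: Duration,
) -> usize {
    // Announce that the response body is about to be streamed.
    let _ = directives_out.send("sending_response_body".to_string());
    let mut sent = 0usize;
    loop {
        if sent >= max_len {
            let _ = directives_out.send("stop_response_body".to_string());
            break;
        }
        // Stop early if the router asked us to (polled, not selected).
        if let Ok(header) = directives_in.try_recv() {
            if header == "stop_response_body" {
                break;
            }
        }
        match source.recv_timeout(timeout) {
            Ok(chunk) => {
                let len = chunk.len();
                if body_out.send(chunk).is_err() {
                    break; // receiver is gone, nothing left to do
                }
                sent += len;
            }
            // Timeout or closed source: tell the router the body has ended.
            Err(_) => {
                let _ = directives_out.send("stop_response_body".to_string());
                break;
            }
        }
    }
    sent
}
```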