Skip to content

Instantly share code, notes, and snippets.

@last-ent
Created August 28, 2019 07:56
Show Gist options
  • Save last-ent/f5cfd2680312b39f120cccdaa1ac8fb8 to your computer and use it in GitHub Desktop.
Save last-ent/f5cfd2680312b39f120cccdaa1ac8fb8 to your computer and use it in GitHub Desktop.

Navika Behaviour

Consuming Nakadi Subscription

  • Url: GET /subscriptions/{subscription_id}/events

  • Query Parameters:

    • batch_limit:
      • Maximum number of Events in each chunk (and therefore per partition) of the stream.

      • Default: 1
    • max_uncommitted_events:
      • The maximum number of uncommitted events that Nakadi will stream before pausing the stream. When in paused state and commit comes - the stream will resume.

      • Default: 10
  • Header: X-Flow-Id

  • Response:

    • 200 OK.

      • Header: X-Nakadi-StreamId - The id of this stream generated by Nakadi. Must be used for committing events that were read by client from this stream.
      • Payload:
      {
          "cursor": {
              "partition": "5",
              "offset": "543",
              "event_type": "order.ORDER_RECEIVED",
              "cursor_token": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
          },
          "events": [{
              "metadata": {
                  "occurred_at": "1996-10-15T16:39:57+07:00",
                  "eid": "1f5a76d8-db49-4144-ace7-e683e8ff4ba4",
                  "event_type": "aruha-test-hila",
                  "partition": "5",
                  "received_at": "2016-09-30T09:19:00.525Z",
                  "flow_id": "blahbloh"
              },
              "data_op": "C",
              "data": {
                  "order_number": "abc",
                  "id": "111"
              },
              "data_type": "blah"
          }],
          "info": {
            "debug": "Stream started"
          }
      }
    • 400 Bad Request

      • Payload:
      {
        "type": "http://httpstatus.es/503", // uri
        "title": "Service Unavailable",
        "status": 503,
        "detail": "Connection to database timed out",
        "instance": "<link>" // uri
      }
    • 403 Access forbidden

      • Payload:

      Same as 400 Bad Request

    • 404 Subscription Not Found

      • Payload:

      Same as 400 Bad Request

    • 409 Conflict

      • There are several possible reasons for receiving this status code:
      • There are no empty slots for this subscriptions. The amount of consumers for this subscription already equals the maximum value - the total number of partitions in this subscription.
      • Request to reset subscription cursors is still in progress.

Committing Nakadi Stream Cursor

  • Url: POST /subscriptions/{subscription_id}/cursors
  • Header:
    • X-Nakadi-StreamId:
      • Id of stream which client uses to read events. It is not possible to make a commit for a terminated or none-existing stream.

  • Request Payload:
{
    "items": [{
            "partition": "0",
            "offset": "543",
            "event_type": "order.ORDER_RECEIVED",
            "cursor_token": "b75c3102-98a4-4385-a5fd-b96f1d7872f2"
        },
        {
            "partition": "1",
            "offset": "923",
            "event_type": "order.ORDER_RECEIVED",
            "cursor_token": "a28568a9-1ca0-4d9f-b519-dd6dd4b7a610"
        }
    ]
}
  • Response:
    • 204 Offsets were committed

    • 200 Ok.

      • List of items which describe commit result for each cursor At least one cursor which was tried to be committed is older or equal to already committed one. Array of commit results is returned for this status code.

      • Payload: List[SubscriptionCursor] https://nakadi.io/manual.html#definition_CursorCommitResult
    • 403 Access forbidden

      • Payload:

      Same as 400 Bad Request

    • 404 Subscription Not Found

      • Payload:

      Same as 400 Bad Request

    • 422 Unprocesable Entity

      • Payload:

      Same as 400 Bad Request

Behaviour Matrix

Logic Type 200 204 400 403 404 409 422
Read Subscription N x F F F R (1) x
Commit Cursor L N x F F x F
N: Normal Execution
R: Retry on Error
F: Fatal Error
L: Ignore or log on error
x: Not Applicable
  1. Q: What is the max retry limit?
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment