Skip to content

Instantly share code, notes, and snippets.

@feymartynov
Last active December 20, 2019 13:52
Show Gist options
  • Save feymartynov/5f38c76ad9ab8625856313c2644d0166 to your computer and use it in GitHub Desktop.
Save feymartynov/5f38c76ad9ab8625856313c2644d0166 to your computer and use it in GitHub Desktop.
Как пользоваться MQTT Gateway

Подключение

Хост/порт

Для сервисов:

  • Testing: tcp://mqtt.testing.svc.netology-group.services:51883
  • Staging: tcp://mqtt.staging.svc.netology-group.services:51883
  • Production: tcp://mqtt.svc.netology-group.services:51883

Для браузеров подключение через WebSocket:

  • Testing: wss://mqtt.testing.svc.netology-group.services/mqtt
  • Staging: wss://mqtt.staging.svc.netology-group.services/mqtt
  • Production: wss://mqtt.svc.netology-group.services/mqtt

Версия протокола

Предпочтительно использование MQTT 5. Дальше по тексту предполагается, что используется эта версия.

Если всё-таки нужно использовать MQTT 3, отличия описаны в конце документа.

Юзернейм и пароль

Username не указывается.

В качестве пароля для аккаунтов тенанта указывается токен тенанта.

Для сервисов в качестве пароля указывается сервисный jws-токен. Этот токен нужно генерить для конкретного agent ID, с которым подключаемся (см. Client ID ниже). Для этого используется либа, которая реализует JOSE. В subject указывается agent ID. В issuer указывается audience этого agent ID, он же audience платформы: (testing.|staging.).svc.netology-group.services. Также нужно указать ключ подписи. Файл с ключом подкладывается при деплое. Алгоритм подписи – ES256.

Client ID

Каждый агент при подключении к брокеру указывает в опции client_id свой agent ID.

Agent ID выглядит так:

web.user123.testing.beta.netology.ru

где

  • webagent label. Он произвольный, но должен быть уникален, чтобы не было одновременно двух агентов с одинаковыми agent id, иначе брокер отключит первого. Для фронта это может быть идентификатор устройства пользователя, для сервиса – идентификатор инстанса.
  • user123.testing.svc.beta.netology.ru – account id. Состоит из:
    • user123 – account label идентификатор сервиса или пользователя в тенанте.
    • testing.beta.netology.ruaudience, т.е. кому принадлежит аккаунт. В данном случае – это юзера тестинга нетологии. Для прода по аналогии с k8s поддомен testing/staging не указывается. Сервисы принадлежат платформе, поэтому для них указывается audience платформы: svc.netology-group.services, staging.svc.netology-group.services, testing.svc.netology-group.services.

User properties

Нужно обязательно указать следующие свойства:

  • connection_versionv2 (это версия API MQTT-Gateway / соглашений по топикам, свойствам и прочему).
  • connection_modedefault для фронта, service для сервисов.

Публикация и получение сообщений

Отправка запроса в сервис с фронта (multicast)

Фронт отправляет запрос, его получает любой инстанс сервиса.

Топик для публикации: agents/:client_agent_id/api/:service_api_version/out/:service_account_id.

  • client_agent_id – agent ID клиента, который использовался при подключении.
  • service_api_version – Версия API сервиса.
  • :service_account_id – account ID сервиса. Именно account ID, а не agent id, потому что запрос может быть обработан любым инстансом сервиса и клиенту неважно каким.

Пример: agents/web.user123.testing.beta.netology.ru/api/v1/out/conference.testing.svc.netology-group.services.

Вообще все фронтовые клиенты подключены в режиме default, поэтому имеют право паблишить сообщения только в топики, начинающиеся с agents/:client_agent_id/api/:version/out/.

В payload сообщения указывается то, что требует сервис в интересующей нас тычке.

Также нужно указать обязательные свойства:

  • typerequest.
  • method – название метода сервиса в формате :ресурс.:действие. Например, room.create. По этому свойству сервис определяет, какую тычку дёрнули.
  • response_topicagents/:client_agent_id/api/:service_api_version/in/:service_account_id (in, не out).
  • correlation_data – любая строка. С ней же вернётся ответ. Таким образом можно понять от какого запроса этот ответ.
  • local_timestamp – время по UNIX в мс.

response_topic и correlation_data – стандартные свойства MQTT 5. Остальные задаются в user properties.

Чтобы получить это сообщение сервис должен предварительно подписаться на топик: $share/:service_name.loadbalancer/agents/+/api/:service_api_version/out/:service_account_id.

  • $share/:service_name.loadbalancer – указание брокеру на использование механизма shared subscription. По дефолту MQTT бродкастит сообщения на всех подписичков, но нам нужно, чтобы запрос балансировался на какой-нибудь один инстанс. Это то, что делает shared subscription. :service_name.loadbalancer – имя группы. :service_name подставляется, чтобы не путалась балансировка от разных сервисов.
  • + – это вайлдкард в MQTT. Т.е. сервис подписывается на запросы от всех агентов.

Отправка ответа фронту из сервиса (unicast)

Сервис обработал запрос и теперь отправляет ответ тому клиенту, который дёрнул запрос.

Топик для публикации указан клиентом в свойстве response_topic запроса. Подписавшись на этот же топик перед отправкой запроса клиент получит ответ.

В payload сообщения указывается специфичный для тычки ответ.

Свойства:

  • typeresponse.
  • status – HTTP статус ответа.
  • correlation_data – скопировать из запроса.
  • timestamp – время по UNIX в мс.
  • local_initial_timediff – скопировать из запроса, если есть.
  • initial_timestamp – скопировать из запроса, если есть.
  • broker_timestamp – скопировать из запроса.
  • broker_processing_timestamp – скопировать из запроса.
  • broker_initial_processing_timestamp – скопировать из запроса.
  • tracking_id – скопировать из запроса.
  • session_tracking_label – скопировать из запроса.
  • local_tracking_label – скопировать из запроса, если есть.
  • processing_time – (опционально) сколько мс заняла обработка запроса.
  • cumulative_processing_time – если есть в запросе, увеличить на processing_time или скопировать, если processing_time не указывается.
  • authorization_time – (опционально) если делался запрос авторизации в тенанта, сколько он занял из processing_time.
  • cumulative_authorization_time – если есть в запросе, увеличить на authorization_time или скопировать, если authorization_time не указывается.

Все свойства, кроме correlation_data задаются в user properties.

Отправка уведомлений о событиях (broadcast):

В сервисе что-то происходит и он уведомляет об этом подписчиков.

Топик для публикации и подписки: apps/:service_account_id/api/:service_api_version/:resource_path.

  • service_account_id – события может публиковать любой инстанс сервиса, неважно какой именно, поэтому account ID.
  • resource_path – произвольный путь до ресурса в логике сервиса. Например, rooms/123/events.

В payload указывается JSON, описывающий событие.

Обязательные свойства:

  • typeevent
  • label – тип события в формате :resource.:action. Например, room.enter.

Динамические подписки

Так называется подписка фронта на топик через сервис. Например, фронт заходит в комнату и сервис подписывает его на broadcast-события в этой комнате, чтобы он узнавал о том, что делают другие юзеры.

Сценарий следующий:

  1. Фронт отправляет запрос на вход в комнату.
  2. Сервис уточняет у тенанта, можно ли пользователю с этим аккаунтом входить в эту комнату, и если ответ положительный, отправляет запрос брокеру subscription.create, чтобы он подписал агента на топик комнаты. Запрос делается в топик клиента: agents/:client_agent_id/api/:service_api_version/in/:service_account_id.
  3. Брокер подписывает агента на топик и отправляет unicast-ответ фронту на запрос из п.1.
  4. Также брокер отправляет multicast-событие subscription.create в сервис в топик agents/:mqtt_gateway_agent_id/api/:service_api_version/in/:service_account_id. Сервис запоминает у себя в БД, что агент фронта в комнате.
  5. Фронт тем временем начинает получать сообщения из топика комнаты.

Отписка работает аналогичным образом.

  1. Фронт отправляет запрос на выход из команты.
  2. Сервис проверяет по своей БД, что агент в комнате и отправляет запрос брокеру subscription.delete, чтобы он отписал агента от комнаты в тот же топик, что на subscription.create.
  3. Брокер отписывает агента от топика и отправляет unciast-ответ фронту на запрос из п.1.
  4. Также брокер отправляет multicast-событие subscription.delete в сервис. Сервис удаляет у себя из БД активного агента фронта.
  5. Фронт тем временем перестаёт получать сообщения из топика комнаты.

Если клиент отскочил от брокера из-за плохой сети или пользователь просто закрыл вкладку, брокер видит, что соединение порвалось и отправляет событие в сервис, подписавший агента на комнату, такое же, как в п.4 в предыдущем случае и всё идёт по тому же сценарию дальше.

Особенности MQTT 3

MQTT 3 не поддерживает свойства, поэтому connection_version и connection_mode указываются в username через :: при подключении. Например, так: v2::default.

При отправке сообщений свойства отправляются в теле сообщения вместе с payload в конверте следующего вида:

{
  "payload": "{\"foo\": \"bar\"}",
  "properties": {
    "some_property": "123"
  }
}

Пара важных моментов:

  • Payload сериализуется в строку, а не шлётся просто объектом.
  • Все значения в properties должны быть строками, даже числа, т.к. в MQTT 5 все свойства – UTF-8 строки. null указывать также нельзя, нужно в этом случае вообще не указывать ключ.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment