Skip to content

Instantly share code, notes, and snippets.

@vovahost
Created March 2, 2020 11:56
Show Gist options
  • Save vovahost/7828f525240b75b4db501da172e73f06 to your computer and use it in GitHub Desktop.
Save vovahost/7828f525240b75b4db501da172e73f06 to your computer and use it in GitHub Desktop.
tcp-vertx-eventbus dart port [Work in progress]
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';
import 'package:meta/meta.dart';
import 'package:web_socket_channel/io.dart';
import 'package:uuid/uuid.dart';
final uuid = Uuid();
//var net = require('net');
//var tls = require('tls');
var tls;
var net;
class EventBus {
Duration _pingInterval;
final void Function() _onOpen;
final void Function() _onClose;
final void Function(Exception) _onError;
// var _transport;
IOWebSocketChannel _channel;
StreamSubscription _streamSubscription;
EventBusState _state = EventBusState.CONNECTING;
final Map<String, List<Function>> _handlers = {};
final Map _replyHandlers = {};
final Map<String, dynamic> _defaultHeaders;
/// message buffer
Uint8List _buffer = Uint8List(0);
int _length = 0;
Timer _pingTimer;
EventBus({
@required String host,
@required String port,
Map options,
Map<String, dynamic> defaultHeaders,
void Function() onOpen,
void Function() onClose,
void Function(Exception) onError,
bool cancelOnError = false,
}) : _defaultHeaders = defaultHeaders,
_onOpen = onOpen,
_onClose = onClose,
_onError = onError {
options ??= {};
_pingInterval = options['vertxbus_ping_interval'] ?? Duration(seconds: 5);
// if user use certificate need use tls module
// final connectionModule = options.containsKey('pfx') || options.containsKey('cert') ? tls : net;
_channel = IOWebSocketChannel.connect(host, headers: defaultHeaders);
_streamSubscription = _channel.stream.listen(
onConnectionData,
onError: onConnectionError,
onDone: onConnectionClosed,
cancelOnError: cancelOnError,
);
// _transport = connectionModule.connect(port, host, options, _callback);
}
void _callback(err) {
if (err) {
_onError(err);
}
// Send the first ping then send a ping every pingInterval
_sendPing();
_pingTimer = Timer.periodic(_pingInterval, (Timer t) => _sendPing());
_state = EventBusState.OPEN;
_onOpen?.call();
}
void _sendPing() {
_send(json.encode({'type': 'ping'}));
}
// This function is called by the connection
// transport.on('close', () {})
void onConnectionClosed() {
_state = EventBusState.CLOSED;
_pingTimer?.cancel();
_onClose?.call();
}
// This function is called by the connection
// transport.on('error', self.onerror);
void onConnectionError(error) {
// TODO check that the error is the only field passed
_onError?.call(Exception(error.toString()));
}
// This was a callback set on the transport
// transport.on('data', (chunk) {})
void onConnectionData(Uint8List chunk) {
_buffer = _buffer + chunk;
// we need to loop since there can be several messages in a chunk
do {
// read message length (first 4 bytes)
_length ??= ByteData.view(_buffer.buffer).getInt32(0, Endian.big);
if (_length != null && _buffer.lengthInBytes >= _length + 4) {
// we have a full message
final messageBytes = _buffer.sublist(4, _length + 4);
// slice the buffer to consume the next message
_buffer = _buffer.sublist(_length + 4);
_length = 0;
Map json;
try {
json = jsonDecode(utf8.decode(messageBytes));
} catch (e) {
_onError(e);
return;
}
// define a reply function on the message itself
if (json['replyAddress'] != null) {
json['reply'] = (message, headers, callback) {
send(json['replyAddress'], message, headers, callback);
};
}
final deliver = (Function handler, Map json) {
if (json['type'] == 'message' && json.containsKey('failureCode')) {
handler({
'failureCode': json['failureCode'],
'failureType': json['failureType'],
'message': json['message']
});
} else {
handler(null, json);
}
};
final address = json['address'];
if (_handlers[address] != null) {
// Iterate all handlers registered with the address
final handlers = _handlers[address];
// send only goes to one handler
if (json['send'] != null && handlers.isNotEmpty) {
deliver(handlers.first, json);
} else {
for (var i = 0; i < handlers.length; i++) {
deliver(handlers[i], json);
}
}
} else if (_replyHandlers[address]) {
// Might be a reply message
var handler = _replyHandlers[address];
_replyHandlers.remove(address);
deliver(handler, json);
} else {
if (json['type'] == 'err') {
_onError(Exception(json));
} else {
print('No handler found for message: $json');
}
}
} // if data chunked into few frames need concatenate into buffer
} while (_buffer.lengthInBytes > 4 && _length == null);
}
/// Send a message
///
/// @param address address on the event bus
/// @param {Object} message
/// @param {Object} [headers]
/// @param {Function} [callback]
void send(String address, String message, headers, callback) {
// are we ready?
if (_state != EventBusState.OPEN) {
throw Exception('INVALID_STATE_ERR');
}
if (headers is Function) {
callback = headers;
headers = {};
}
var envelope = {
'type': 'send',
'address': address,
'headers': _mergeHeaders(_defaultHeaders, headers),
'body': message
};
if (callback) {
var replyAddress = uuid.v4();
envelope['replyAddress'] = replyAddress;
_replyHandlers[replyAddress] = callback;
}
_send(jsonEncode(envelope));
}
/// Publish a message
///
/// @param {String} address
/// @param {Object} message
/// @param {Object} [headers]
void publish(String address, String message, Map<String, dynamic> headers) {
// are we ready?
if (_state != EventBusState.OPEN) {
throw Exception('INVALID_STATE_ERR');
}
_send(jsonEncode({
'type': 'publish',
'address': address,
'headers': _mergeHeaders(_defaultHeaders, headers),
'body': message
}));
}
/// Register a new handler
///
/// @param {String} address
/// @param {Object} [headers]
/// @param {Function} callback
void registerHandler({
@required String address,
Map<String, dynamic> headers,
@required Function handler,
}) {
if (_state != EventBusState.OPEN) {
throw Exception('INVALID_STATE_ERR');
}
var handlers = _handlers[address];
// Ensure the handler list for this address is initialized
if (handlers == null) {
handlers = [];
// First handler for this address so we should register the connection
_send(jsonEncode({
'type': 'register',
'address': address,
'headers': _mergeHeaders(_defaultHeaders, headers)
}));
}
// Add the handler to the list of all handlers associated with this address
handlers.add(handler);
}
/// Unregister a handler
///
/// @param {String} address
/// @param {Object} [headers]
/// @param {Function} callback
void unregisterHandler({
@required String address,
Map<String, dynamic> headers,
@required Function callback,
}) {
if (_state != EventBusState.OPEN) {
throw Exception('INVALID_STATE_ERR');
}
final handlers = _handlers[address];
if (handlers != null) {
handlers.remove(callback);
if (handlers.isEmpty) {
// No more local handlers so we should unregister the connection
_send(jsonEncode({
'type': 'unregister',
'address': address,
'headers': _mergeHeaders(_defaultHeaders, headers)
}));
_handlers.remove(address);
}
}
}
void _send(String message) {
final msgData = utf8.encode(message) as Uint8List;
final msgLen = message.length;
final data = ByteData(4);
data.setInt32(0, msgLen, Endian.big);
final output = data.buffer.asUint8List() + msgData;
// final outputLength = 4 + msgLen;
// _transport.write(output, outputLength);
_channel.sink.add(output);
}
/// Closes the connection to the EvenBus Bridge.
void close() {
_state = EventBusState.CLOSING;
// _transport.close();
_streamSubscription.cancel();
}
/// Merges the two header maps. If both the maps contain the same (key:value) pair then the pair
/// from [headers] will override the pair from [defaultHeaders].
Map<String, dynamic> _mergeHeaders(
Map<String, dynamic> defaultHeaders, Map<String, dynamic> headers) =>
{...?defaultHeaders, ...?headers};
}
enum EventBusState { CONNECTING, OPEN, CLOSING, CLOSED }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment