Skip to content

Instantly share code, notes, and snippets.

@PlugFox
Last active August 18, 2022 23:33
Show Gist options
  • Save PlugFox/a7a5fd8ef307ea5f717855aff377dc2b to your computer and use it in GitHub Desktop.
Save PlugFox/a7a5fd8ef307ea5f717855aff377dc2b to your computer and use it in GitHub Desktop.
dart:isolate helper class
Stream<String> _isolateHandler(dynamic data, Map<String, dynamic> context) async* {
if (data is! String) {
throw 'Получены неизвестные данные \'${data.toString()}\'!';
}
String telegramMessage = data as String;
final Telegram tg = context['tg'] as Telegram;
try {
final bool sendResult = await tg.send(telegramMessage);
if (!sendResult) throw UnsupportedError('Сообщение не доставлено');
yield 'Сообщение доставлено';
} on dynamic catch (error) {
throw 'Ошибка отправки в телеграм: ${error.toString()}';
}
}
FutureOr<void> _isolateOnCreate(Map<String, dynamic> context) {
final Telegram tg = Telegram(
reciever: context['reciever'] as String,
token: context['token'] as String);
context['tg'] = tg;
}
final IsolatedClient<String> _isolatedClient = IsolatedClient<String>(
_isolateHandler,
onCreate: _isolateOnCreate,
context: <String, dynamic>{'reciever': reciever, 'token': token});
await _isolatedClient.init();
_isolatedClient.stream.listen(l.d, onError: l.w);
/// TODO: Добавить трансформер и расширение для стримов
/// TODO: реализовать общение посредством TransferableTypedData
/// https://api.dart.dev/stable/dart-isolate/TransferableTypedData-class.html
/// TODO: игнорировать функционал в вебе
class IsolatedClient<T> {
final ReceivePort _responses = ReceivePort();
final Stream<T> Function(dynamic, Map<String, dynamic>) _handler;
final FutureOr<void> Function(Map<String, dynamic>) _onCreate;
final Map<String, dynamic> _context;
final StreamController<T> _controller = StreamController<T>.broadcast();
SendPort _sink;
Isolate _client;
bool _isInit = false;
Stream<T> get stream => _controller.stream;
Isolate get client => _client;
bool get isInit => _isInit;
IsolatedClient(
Stream<T> Function(dynamic, Map<String, dynamic>) handler,
{FutureOr<void> Function(Map<String, dynamic>) onCreate,
Map<String, dynamic> context,})
: assert(handler is Stream<dynamic> Function(dynamic, Map<String, dynamic>))
, _handler = handler
, _onCreate = onCreate
, _context = context;
Future<void> init() async {
final Stream<dynamic> _dataFromIsolate = _responses.asBroadcastStream();
try {
Future<dynamic> _futureSink = _dataFromIsolate.first.timeout(const Duration(seconds: 5));
_client = await Isolate.spawn<SendPort>(_handshake, _responses.sendPort);
await _futureSink.then((dynamic v) {
if (v is SendPort) {
_sink = v;
} else {
throw IsolateSpawnException('Handshake error');
}
});
} on IsolateSpawnException catch (_) {
rethrow;
} on TimeoutException catch (_) {
throw IsolateSpawnException('Handshake time is up');
}
try {
Future<bool> initializationResult = _dataFromIsolate.first.timeout(const Duration(seconds: 15)).then((dynamic v) => v as bool);
_sink.send(<String, dynamic>{'handler': _handler, 'context': _context, 'onCreate': _onCreate,});
if (!await initializationResult) throw UnsupportedError('Initialization failed');
} on TimeoutException catch (_, stackTrace) {
throw RemoteError('Timeout reached during initialization', stackTrace.toString());
}
_isInit = true;
_dataFromIsolate.listen((dynamic v) {
if (v is T) {
_controller.add(v);
} else {
_controller.addError(v);
}
},
onError: _controller.addError,
cancelOnError: false,
);
}
static void _handshake(SendPort sendPort) {
final ReceivePort receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
final _IsolatedServer server = _IsolatedServer(sink: sendPort, requests: receivePort.asBroadcastStream());
server.waitForInitialization().then(server.run);
}
void add(dynamic message) => _isInit ? _sink.send(message) : throw UnsupportedError('IsolatedClient not initialized');
Future<void> close() {
_responses.close();
_client.kill();
return _controller.close();
}
}
class _IsolatedServer {
final SendPort sink;
final Stream<dynamic> requests;
const _IsolatedServer({@required this.sink, @required this.requests});
Future<Map<String, dynamic>> waitForInitialization() =>
requests.first
.then((dynamic v) => v as Map<String, dynamic>)
.then((v) {
final Map<String, dynamic> context = (v['context'] ?? <String, dynamic>{}) as Map<String, dynamic>;
final FutureOr<void> Function(Map<String, dynamic>) onCreate =
v.remove('onCreate') as FutureOr<void> Function(Map<String, dynamic>);
if (onCreate != null) {
onCreate(context);
}
v['context'] = context;
sink.send(true);
return v;
});
void run(Map<String, dynamic> initializationData) {
final Stream<dynamic> Function(dynamic, Map<String, dynamic>) _handler = initializationData.remove('handler') as Stream<dynamic> Function(dynamic, Map<String, dynamic>);
final Map<String, dynamic> context = initializationData['context'] as Map<String, dynamic>;
StreamSubscription<dynamic> sub = requests.listen(
(dynamic message) =>
_handler(message, context)
.handleError(_onError)
.forEach(sink.send)
, onError: _onError
, cancelOnError: false);
sub.onDone(sub.cancel);
}
void _onError(dynamic error, StackTrace stackTrace) =>
sink.send(RemoteError(error.toString(), stackTrace.toString()));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment