Last active
August 18, 2022 23:33
-
-
Save PlugFox/a7a5fd8ef307ea5f717855aff377dc2b to your computer and use it in GitHub Desktop.
dart:isolate helper class
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Isolate-side message handler that forwards a text message to Telegram.
///
/// [data] must be a [String]; any other value makes the returned stream emit
/// an error. The `Telegram` client is read from `context['tg']`. On a
/// successful delivery the stream yields a single confirmation string; a
/// failed or rejected delivery is reported as a stream error.
Stream<String> _isolateHandler(dynamic data, Map<String, dynamic> context) async* {
  if (data is String) {
    // The client is looked up outside the try block, so a failure here is
    // not wrapped in the "send error" message below.
    final Telegram telegram = context['tg'] as Telegram;
    try {
      final bool delivered = await telegram.send(data);
      if (!delivered) {
        throw UnsupportedError('Сообщение не доставлено');
      }
      yield 'Сообщение доставлено';
    } catch (failure) {
      // Any failure, including the UnsupportedError above, is re-reported
      // as a plain string.
      throw 'Ошибка отправки в телеграм: $failure';
    }
  } else {
    throw 'Получены неизвестные данные \'$data\'!';
  }
}
/// One-time setup callback for the isolate handler.
///
/// Builds a `Telegram` client from the `'reciever'` and `'token'` entries of
/// [context] and stores it back into [context] under the `'tg'` key.
FutureOr<void> _isolateOnCreate(Map<String, dynamic> context) {
  final String reciever = context['reciever'] as String;
  final String token = context['token'] as String;
  context['tg'] = Telegram(reciever: reciever, token: token);
}
// Usage example: create the client with the handler, the setup callback and
// the initial context that will be handed to the isolate.
final IsolatedClient<String> _isolatedClient = IsolatedClient<String>(
  _isolateHandler,
  onCreate: _isolateOnCreate,
  context: <String, dynamic>{
    'reciever': reciever,
    'token': token,
  },
);

// Spawn the isolate and wait until it reports successful initialization.
await _isolatedClient.init();

// Route results and errors to the logger.
// NOTE(review): `l.d` / `l.w` are presumably debug / warning log methods
// defined elsewhere — confirm.
_isolatedClient.stream.listen(l.d, onError: l.w);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Client-side endpoint of a two-way channel to a worker isolate.
///
/// [init] spawns the isolate, performs a handshake to obtain its [SendPort]
/// and then sends it the handler, the optional `onCreate` callback and the
/// context map. Afterwards [add] delivers messages to the isolate and
/// [stream] emits whatever comes back: values of type [T] as data events,
/// everything else as error events.
///
/// NOTE(review): the handler and `onCreate` travel through a [SendPort], so
/// they presumably have to be top-level or static functions — confirm for
/// the SDK version in use.
///
/// TODO: Add a transformer and an extension for streams.
/// TODO: Implement communication via TransferableTypedData, see
/// https://api.dart.dev/stable/dart-isolate/TransferableTypedData-class.html
/// TODO: Ignore this functionality on the web.
class IsolatedClient<T> {
  /// Port on which everything sent by the isolate arrives.
  final ReceivePort _responses = ReceivePort();
  final Stream<T> Function(dynamic, Map<String, dynamic>) _handler;
  final FutureOr<void> Function(Map<String, dynamic>) _onCreate;
  final Map<String, dynamic> _context;
  final StreamController<T> _controller = StreamController<T>.broadcast();

  /// Port of the isolate; set once the handshake has succeeded.
  SendPort _sink;
  Isolate _client;
  bool _isInit = false;
  bool _initStarted = false;
  bool _isClosed = false;

  /// Broadcast stream of results (data) and failures (errors) reported by
  /// the isolate.
  Stream<T> get stream => _controller.stream;

  /// The spawned isolate, or `null` if none has been spawned yet.
  Isolate get client => _client;

  /// Whether [init] has completed successfully and [close] has not been
  /// called since.
  bool get isInit => _isInit;

  /// Creates a client for [handler].
  ///
  /// [onCreate], when given, is run once inside the isolate with [context]
  /// before any message is handled.
  IsolatedClient(
    Stream<T> Function(dynamic, Map<String, dynamic>) handler, {
    FutureOr<void> Function(Map<String, dynamic>) onCreate,
    Map<String, dynamic> context,
  })  : assert(handler is Stream<dynamic> Function(dynamic, Map<String, dynamic>)),
        _handler = handler,
        _onCreate = onCreate,
        _context = context;

  /// Spawns the isolate and completes once it has reported a successful
  /// initialization.
  ///
  /// Throws an [IsolateSpawnException] if the handshake fails or takes
  /// longer than 5 seconds, an [UnsupportedError] if the isolate reports a
  /// failed initialization, a [RemoteError] if initialization takes longer
  /// than 15 seconds, and a [StateError] if the client is closed or [init]
  /// has already been called. On any failure the spawned isolate is killed.
  Future<void> init() async {
    if (_isClosed) {
      throw StateError('IsolatedClient is closed');
    }
    if (_initStarted) {
      // The receive port can only be listened to once, so a second attempt
      // could never succeed.
      throw StateError('IsolatedClient.init() has already been called');
    }
    _initStarted = true;

    final Stream<dynamic> dataFromIsolate = _responses.asBroadcastStream();
    try {
      await _performHandshake(dataFromIsolate);
      await _initializeIsolate(dataFromIsolate);
    } catch (_) {
      // Do not leave a half-initialized isolate running.
      _client?.kill();
      rethrow;
    }

    _isInit = true;
    dataFromIsolate.listen(
      (dynamic event) {
        if (event is T) {
          _controller.add(event);
        } else if (event is RemoteError) {
          // Keep the remote stack trace attached to the error event.
          _controller.addError(event, event.stackTrace);
        } else {
          _controller.addError(event);
        }
      },
      onError: _controller.addError,
      cancelOnError: false,
    );
  }

  /// Spawns the isolate and stores the [SendPort] it answers with.
  Future<void> _performHandshake(Stream<dynamic> dataFromIsolate) async {
    try {
      // Subscribe before spawning so the answer cannot be missed.
      final Future<dynamic> handshake =
          _nextResponse(dataFromIsolate, const Duration(seconds: 5));
      _client = await Isolate.spawn<SendPort>(_handshake, _responses.sendPort);
      final dynamic port = await handshake;
      if (port is SendPort) {
        _sink = port;
      } else {
        throw IsolateSpawnException('Handshake error');
      }
    } on TimeoutException catch (_) {
      throw IsolateSpawnException('Handshake time is up');
    }
  }

  /// Sends the initialization data and waits for the isolate's verdict.
  Future<void> _initializeIsolate(Stream<dynamic> dataFromIsolate) async {
    try {
      // Subscribe before sending so the answer cannot be missed.
      final Future<dynamic> verdict =
          _nextResponse(dataFromIsolate, const Duration(seconds: 15));
      _sink.send(<String, dynamic>{
        'handler': _handler,
        'context': _context,
        'onCreate': _onCreate,
      });
      final dynamic succeeded = await verdict;
      if (!(succeeded as bool)) {
        throw UnsupportedError('Initialization failed');
      }
    } on TimeoutException catch (_, stackTrace) {
      throw RemoteError(
        'Timeout reached during initialization',
        stackTrace.toString(),
      );
    }
  }

  /// Returns the next event of [source], failing with a [TimeoutException]
  /// after [limit].
  static Future<dynamic> _nextResponse(Stream<dynamic> source, Duration limit) {
    final Future<dynamic> response = source.first.timeout(limit);
    // Pre-attach a no-op error listener: if the caller bails out before
    // awaiting (e.g. because spawning or sending threw), the eventual
    // timeout must not surface as an unhandled asynchronous error. Callers
    // that do await `response` still receive the error.
    response.catchError((dynamic _) {});
    return response;
  }

  /// Entry point of the spawned isolate: sends back its own port, then
  /// waits for the initialization data and starts serving requests.
  static void _handshake(SendPort sendPort) {
    final ReceivePort receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);
    final _IsolatedServer server = _IsolatedServer(
      sink: sendPort,
      requests: receivePort.asBroadcastStream(),
    );
    server.waitForInitialization().then(server.run);
  }

  /// Sends [message] to the isolate.
  ///
  /// Throws an [UnsupportedError] if the client has not been initialized or
  /// has already been closed.
  void add(dynamic message) {
    if (!_isInit) {
      throw UnsupportedError('IsolatedClient not initialized');
    }
    _sink.send(message);
  }

  /// Closes the receive port, kills the isolate (if one was spawned) and
  /// closes [stream].
  ///
  /// Safe to call before [init], after a failed [init], and repeatedly.
  Future<void> close() {
    _isClosed = true;
    _isInit = false;
    _responses.close();
    _client?.kill();
    return _controller.close();
  }
}
/// Isolate-side counterpart of [IsolatedClient].
///
/// Takes the first message on [requests] as the initialization map, runs the
/// optional `onCreate` callback, and afterwards passes every further message
/// to the handler, forwarding whatever the handler emits to [sink].
class _IsolatedServer {
  /// Port used to send results and errors back to the spawning isolate.
  final SendPort sink;

  /// Broadcast stream of messages coming from the spawning isolate.
  final Stream<dynamic> requests;

  const _IsolatedServer({@required this.sink, @required this.requests});

  /// Waits for the initialization map, runs `onCreate` with the context and
  /// reports the outcome through [sink].
  ///
  /// Sends `true` once `onCreate` (if any) has completed, including any
  /// future it returns. If anything fails, sends `false` so the other side
  /// does not have to wait for its timeout, and completes with the original
  /// error.
  ///
  /// Returns the initialization map with `'onCreate'` removed and
  /// `'context'` guaranteed to be a map.
  Future<Map<String, dynamic>> waitForInitialization() async {
    final dynamic firstMessage = await requests.first;
    try {
      final Map<String, dynamic> data = firstMessage as Map<String, dynamic>;
      final Map<String, dynamic> context =
          (data['context'] ?? <String, dynamic>{}) as Map<String, dynamic>;
      final FutureOr<void> Function(Map<String, dynamic>) onCreate =
          data.remove('onCreate') as FutureOr<void> Function(Map<String, dynamic>);
      if (onCreate != null) {
        // onCreate may be asynchronous; readiness must only be reported
        // after its work has actually finished.
        await onCreate(context);
      }
      data['context'] = context;
      sink.send(true);
      return data;
    } catch (_) {
      sink.send(false);
      rethrow;
    }
  }

  /// Starts serving: every incoming message is passed to the handler taken
  /// from [initializationData], and each value the handler emits is sent to
  /// [sink]. Errors are sent to [sink] as [RemoteError]s.
  void run(Map<String, dynamic> initializationData) {
    final Stream<dynamic> Function(dynamic, Map<String, dynamic>) handler =
        initializationData.remove('handler')
            as Stream<dynamic> Function(dynamic, Map<String, dynamic>);
    final Map<String, dynamic> context =
        initializationData['context'] as Map<String, dynamic>;
    final StreamSubscription<dynamic> sub = requests.listen(
      (dynamic message) {
        try {
          handler(message, context).handleError(_onError).forEach(sink.send);
        } catch (error, stackTrace) {
          // A handler that is not an `async*` generator may throw before
          // returning its stream; report that like any other failure.
          _onError(error, stackTrace);
        }
      },
      onError: _onError,
      cancelOnError: false,
    );
    sub.onDone(sub.cancel);
  }

  /// Reports [error] to the spawning isolate as a [RemoteError].
  void _onError(dynamic error, StackTrace stackTrace) =>
      sink.send(RemoteError(error.toString(), stackTrace.toString()));
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment