Created
February 11, 2025 17:13
Stream response from a deployed flow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Separator that marks the end of one message in the streamed response
/// body; the stream reader splits its text buffer on this sequence.
static const String _flowStreamDelimiter = '\n\n'; | |
/// Starts a streaming call to the flow at [url] and returns immediately.
///
/// [input] is handed to [_processStream] as the request payload. Entries in
/// [headers] are merged over the default `Accept` and `Content-Type`
/// headers, so a caller-supplied key replaces the default.
///
/// The returned [StreamFlowResponse] exposes two views of the same call:
/// `stream` emits each streamed message cast to [S], and `output` completes
/// with the final result cast to [O]. A failure is reported on both, after
/// which the stream is closed.
static StreamFlowResponse<O, S> streamFlow<O, S>({
  required String url,
  dynamic input,
  Map<String, String>? headers,
}) {
  final messages = StreamController<S>();
  final finalOutput = Completer<O>();

  // Forwards one streamed message to whoever listens on the stream.
  void forwardMessage(dynamic message) {
    messages.add(message as S);
  }

  // Resolves the final output first, then ends the message stream.
  void finish(dynamic result) {
    finalOutput.complete(result as O);
    messages.close();
  }

  // Surfaces a failure on the stream and on the output future, then ends
  // the stream.
  void fail(dynamic error) {
    messages.addError(error);
    finalOutput.completeError(error);
    messages.close();
  }

  // Caller headers are spread last so they override the defaults.
  final mergedHeaders = {
    'Accept': 'text/event-stream',
    'Content-Type': 'application/json',
    ...?headers,
  };

  // Not awaited: the request runs in the background and reports back only
  // through the three callbacks.
  _processStream(
    url: url,
    input: input,
    headers: mergedHeaders,
    onMessage: forwardMessage,
    onResult: finish,
    onError: fail,
  );

  return StreamFlowResponse(
    output: finalOutput.future,
    stream: messages.stream,
  );
}
/// Sends [input] to [url] and dispatches the streamed reply to the callbacks.
///
/// The request is a POST carrying [headers] and the JSON body
/// `{'data': input}`. The response body is decoded as UTF-8 and split on
/// [_flowStreamDelimiter]. Each non-empty message has a leading `data: `
/// prefix removed (when present) and is parsed as JSON, then routed by the
/// first matching key:
///
/// * `message`: [onMessage] receives the value and reading continues.
/// * `result`: [onResult] receives the value and processing stops.
/// * `error`: [onError] receives an [Exception] built from the error's
///   `status`, `message` and `details`, and processing stops.
///
/// A non-200 status, malformed JSON, an unrecognised chunk, or an exception
/// thrown while handling a chunk is reported through [onError]. A response
/// body that ends before any `result` chunk arrives is also reported through
/// [onError], so the caller is not left waiting on an outcome that will
/// never come.
///
/// The HTTP client created for the request is closed on every exit path.
static Future<void> _processStream({
  required String url,
  required dynamic input,
  required Map<String, String> headers,
  required Function(dynamic) onMessage,
  required Function(dynamic) onResult,
  required Function(dynamic) onError,
}) async {
  // Owned by this call so the connection can be released in `finally`.
  final client = http.Client();
  try {
    final request = http.Request('POST', Uri.parse(url));
    request.headers.addAll(headers);
    request.body = jsonEncode({'data': input});

    final streamedResponse = await client.send(request);
    if (streamedResponse.statusCode != 200) {
      throw Exception('Server returned: ${streamedResponse.statusCode}');
    }

    // Holds text received so far that has not yet formed a full message.
    String buffer = '';
    await for (final text
        in streamedResponse.stream.transform(utf8.decoder)) {
      buffer += text;

      // One network chunk may complete several messages; drain them all.
      var messageEnd = buffer.indexOf(_flowStreamDelimiter);
      while (messageEnd != -1) {
        final message = buffer.substring(0, messageEnd);
        buffer = buffer.substring(messageEnd + _flowStreamDelimiter.length);
        messageEnd = buffer.indexOf(_flowStreamDelimiter);
        if (message.isEmpty) continue;

        final data =
            message.startsWith('data: ') ? message.substring(6) : message;
        try {
          final payload = jsonDecode(data);
          if (payload.containsKey('message')) {
            onMessage(payload['message']);
          } else if (payload.containsKey('result')) {
            onResult(payload['result']);
            // The result is terminal; leaving the loop cancels the
            // response subscription.
            return;
          } else if (payload.containsKey('error')) {
            final error = payload['error'];
            throw Exception(
              '${error['status']}: ${error['message']}\n${error['details']}',
            );
          } else {
            throw Exception('Unknown chunk format: $payload');
          }
        } catch (e) {
          // Any failure while handling a chunk ends the whole call.
          onError(e);
          return;
        }
      }
    }

    // Reaching this point means the body ended and no `result` chunk was
    // seen (a result returns early above), so the call failed either way.
    if (buffer.isNotEmpty) {
      onError(Exception('Stream ended with partial data: $buffer'));
    } else {
      onError(Exception('Stream ended without a result'));
    }
  } catch (e) {
    onError(e);
  } finally {
    client.close();
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment