Skip to content

Instantly share code, notes, and snippets.

@victoire-hergan
Created February 11, 2025 17:13
Stream response from a deployed flow
static const String _flowStreamDelimiter = '\n\n';
static StreamFlowResponse<O, S> streamFlow<O, S>({
required String url,
dynamic input,
Map<String, String>? headers,
}) {
// Create a StreamController to manage the stream of messages
final controller = StreamController<S>();
// Create a Completer to manage the final output
final completer = Completer<O>();
// Prepare headers
final requestHeaders = {
'Accept': 'text/event-stream',
'Content-Type': 'application/json',
if (headers != null) ...headers,
};
// Start the stream processing
_processStream(
url: url,
input: input,
headers: requestHeaders,
onMessage: (message) => controller.add(message as S),
onResult: (result) {
completer.complete(result as O);
controller.close();
},
onError: (error) {
controller.addError(error);
completer.completeError(error);
controller.close();
},
);
return StreamFlowResponse(
output: completer.future,
stream: controller.stream,
);
}
static Future<void> _processStream({
required String url,
required dynamic input,
required Map<String, String> headers,
required Function(dynamic) onMessage,
required Function(dynamic) onResult,
required Function(dynamic) onError,
}) async {
try {
final request = http.Request('POST', Uri.parse(url));
request.headers.addAll(headers);
request.body = jsonEncode({'data': input});
final streamedResponse = await http.Client().send(request);
if (streamedResponse.statusCode != 200) {
throw Exception('Server returned: ${streamedResponse.statusCode}');
}
String buffer = '';
await for (final chunk in streamedResponse.stream.transform(utf8.decoder)) {
buffer += chunk;
while (buffer.contains(_flowStreamDelimiter)) {
final messageEnd = buffer.indexOf(_flowStreamDelimiter);
final message = buffer.substring(0, messageEnd);
buffer = buffer.substring(messageEnd + _flowStreamDelimiter.length);
if (message.isEmpty) continue;
final data = message.startsWith('data: ') ? message.substring(6) : message;
try {
final chunk = jsonDecode(data);
//print(chunk);
if (chunk.containsKey('message')) {
//print(chunk['message']);
onMessage(chunk['message']);
} else if (chunk.containsKey('result')) {
//print(chunk['result']);
onResult(chunk['result']);
return;
} else if (chunk.containsKey('error')) {
final error = chunk['error'];
throw Exception(
'${error['status']}: ${error['message']}\n${error['details']}',
);
} else {
throw Exception('Unknown chunk format: $chunk');
}
} catch (e) {
onError(e);
return;
}
}
}
if (buffer.isNotEmpty) {
onError(Exception('Stream ended with partial data: $buffer'));
}
} catch (e) {
onError(e);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment