fix: stabilize chat run lifecycle rendering

This commit is contained in:
qzl
2026-03-17 15:58:29 +08:00
parent 3bf7640000
commit cf56b358ad
5 changed files with 673 additions and 75 deletions
@@ -13,6 +13,34 @@ typedef EventCallback = void Function(AgUiEvent event);
const _runIdPrefix = 'run_';
class UploadedAttachment {
const UploadedAttachment({
required this.localPath,
required this.url,
required this.mimeType,
});
final String localPath;
final String url;
final String mimeType;
}
class SendMessageResult {
const SendMessageResult({required this.uploadedAttachments});
final List<UploadedAttachment> uploadedAttachments;
}
class _RunInputPayload {
const _RunInputPayload({
required this.input,
required this.uploadedAttachments,
});
final Map<String, dynamic> input;
final List<UploadedAttachment> uploadedAttachments;
}
class AgUiService {
final IApiClient _apiClient;
EventCallback onEvent;
@@ -26,23 +54,40 @@ class AgUiService {
: onEvent = onEvent ?? ((_) {}),
_apiClient = apiClient;
Future<void> sendMessage(String content, {List<XFile>? images}) async {
Future<SendMessageResult> sendMessage(
String content, {
List<XFile>? images,
}) async {
final streamToken = ++_activeStreamToken;
final runInput = await _buildRunInput(content: content, images: images);
final runInputPayload = await _buildRunInput(
content: content,
images: images,
);
final response = await _apiClient.post<Map<String, dynamic>>(
'/api/v1/agent/runs',
data: runInput,
data: runInputPayload.input,
);
final payload = response.data;
if (payload is! Map<String, dynamic>) {
throw StateError('Invalid /agent/runs response');
}
final threadId = payload['threadId'] as String?;
final runId = payload['runId'] as String?;
if (threadId == null || threadId.isEmpty) {
throw StateError('Missing threadId in /agent/runs response');
}
if (runId == null || runId.isEmpty) {
throw StateError('Missing runId in /agent/runs response');
}
_threadId = threadId;
await _streamEventsFromApi(threadId, streamToken: streamToken);
await _streamEventsFromApi(
threadId,
expectedRunId: runId,
streamToken: streamToken,
);
return SendMessageResult(
uploadedAttachments: runInputPayload.uploadedAttachments,
);
}
Future<HistorySnapshot> loadHistory({DateTime? beforeDate}) async {
@@ -108,6 +153,7 @@ class AgUiService {
Future<void> _streamEventsFromApi(
String threadId, {
required String expectedRunId,
required int streamToken,
}) async {
final lastEventId = _lastEventIdByThread[threadId];
@@ -122,6 +168,7 @@ class AgUiService {
String? eventType;
String? eventId;
var hasBoundExpectedRun = false;
final dataBuffer = StringBuffer();
await for (final line in sseLines) {
if (streamToken != _activeStreamToken) {
@@ -131,11 +178,32 @@ class AgUiService {
if (dataBuffer.isNotEmpty) {
final raw = dataBuffer.toString();
dataBuffer.clear();
Map<String, dynamic>? decoded;
String? eventRunId;
String? eventThreadId;
try {
final decoded = jsonDecode(raw);
if (decoded is Map<String, dynamic>) {
final event = AgUiEvent.fromJson(decoded);
onEvent(event);
final parsed = jsonDecode(raw);
if (parsed is Map<String, dynamic>) {
decoded = parsed;
final runId = parsed['runId'];
final thread = parsed['threadId'];
eventRunId = runId is String ? runId : null;
eventThreadId = thread is String ? thread : null;
final isRunStarted = eventType == AgUiEventTypeWire.runStarted;
final isTargetRun = eventRunId == expectedRunId;
if (isRunStarted && isTargetRun) {
hasBoundExpectedRun = true;
}
final isThreadMatched =
eventThreadId == null || eventThreadId == threadId;
final shouldDispatch =
isTargetRun || (hasBoundExpectedRun && isThreadMatched);
if (shouldDispatch) {
final event = AgUiEvent.fromJson(parsed);
onEvent(event);
}
}
} catch (_) {
// Ignore malformed SSE payload and keep stream alive.
@@ -144,8 +212,14 @@ class AgUiService {
if (currentEventId != null && currentEventId.isNotEmpty) {
_lastEventIdByThread[threadId] = currentEventId;
}
if (eventType == AgUiEventTypeWire.runFinished ||
eventType == AgUiEventTypeWire.runError) {
final isTerminalEvent =
eventType == AgUiEventTypeWire.runFinished ||
eventType == AgUiEventTypeWire.runError;
final isTargetRun = eventRunId == expectedRunId;
final isThreadMatched =
eventThreadId == null || eventThreadId == threadId;
if (isTerminalEvent &&
(isTargetRun || (hasBoundExpectedRun && isThreadMatched))) {
break;
}
}
@@ -174,7 +248,7 @@ class AgUiService {
}
}
Future<Map<String, dynamic>> _buildRunInput({
Future<_RunInputPayload> _buildRunInput({
required String content,
List<XFile>? images,
}) async {
@@ -187,16 +261,17 @@ class AgUiService {
contentBlocks.add({'type': 'text', 'text': content});
}
var uploadedAttachments = const <UploadedAttachment>[];
if (images != null && images.isNotEmpty) {
final uploadedAttachments = await _uploadAttachments(
uploadedAttachments = await _uploadAttachments(
threadId: threadId,
images: images,
);
for (final attachment in uploadedAttachments) {
contentBlocks.add({
'type': 'binary',
'mimeType': attachment['mimeType'],
'url': attachment['url'],
'mimeType': attachment.mimeType,
'url': attachment.url,
});
}
}
@@ -211,24 +286,27 @@ class AgUiService {
messageContent = contentBlocks;
}
return {
'threadId': threadId,
'runId': runId,
'state': <String, dynamic>{},
'messages': [
{'id': _nextId('user_'), 'role': 'user', 'content': messageContent},
],
'tools': <Map<String, dynamic>>[],
'context': <Map<String, dynamic>>[],
'forwardedProps': <String, dynamic>{},
};
return _RunInputPayload(
input: {
'threadId': threadId,
'runId': runId,
'state': <String, dynamic>{},
'messages': [
{'id': _nextId('user_'), 'role': 'user', 'content': messageContent},
],
'tools': <Map<String, dynamic>>[],
'context': <Map<String, dynamic>>[],
'forwardedProps': <String, dynamic>{},
},
uploadedAttachments: uploadedAttachments,
);
}
Future<List<Map<String, dynamic>>> _uploadAttachments({
Future<List<UploadedAttachment>> _uploadAttachments({
required String threadId,
required List<XFile> images,
}) async {
final attachments = <Map<String, dynamic>>[];
final attachments = <UploadedAttachment>[];
for (final image in images) {
final mimeType = image.mimeType ?? 'image/jpeg';
final fileBytes = await image.readAsBytes();
@@ -266,12 +344,13 @@ class AgUiService {
url.isEmpty) {
throw StateError('Invalid attachment reference');
}
attachments.add({
'bucket': bucket,
'path': path,
'mimeType': uploadedMime,
'url': url,
});
attachments.add(
UploadedAttachment(
localPath: image.path,
url: url,
mimeType: uploadedMime,
),
);
}
return attachments;
}