diff --git a/apps/lib/core/chat/ag_ui_event.dart b/apps/lib/core/chat/ag_ui_event.dart index 318f020..cfbebf7 100644 --- a/apps/lib/core/chat/ag_ui_event.dart +++ b/apps/lib/core/chat/ag_ui_event.dart @@ -317,13 +317,20 @@ class HistoryMessage { seq: _asInt(json['seq']), role: _asString(json['role']), content: _asString(json['content']), - timestamp: - DateTime.tryParse(_asString(json['timestamp'])) ?? DateTime.now(), + timestamp: _parseTimestamp(_asString(json['timestamp'])), attachments: _parseHistoryAttachments(json['attachments']), uiSchema: _asMap(json['ui_schema']), ); } +DateTime _parseTimestamp(String value) { + final parsed = DateTime.tryParse(value); + if (parsed == null) { + return DateTime.now(); + } + return parsed.isUtc ? parsed.toLocal() : parsed; +} + class HistoryAttachment { const HistoryAttachment({required this.url, required this.mimeType}); diff --git a/apps/lib/core/chat/ag_ui_service.dart b/apps/lib/core/chat/ag_ui_service.dart index 2e3ee78..1d58098 100644 --- a/apps/lib/core/chat/ag_ui_service.dart +++ b/apps/lib/core/chat/ag_ui_service.dart @@ -54,6 +54,8 @@ class _RunInputPayload { } class AgUiService { + static const int _maxSseResumeAttempts = 2; + final ChatApi _chatApi; final ChatHistoryRepository? _historyRepository; EventCallback onEvent; @@ -63,6 +65,7 @@ class AgUiService { Completer? _activeSseDoneCompleter; String? _threadId; + String? _userId; String? _activeThreadIdForRun; String? _activeRunId; bool _hasMoreHistory = false; @@ -98,7 +101,7 @@ class AgUiService { _activeThreadIdForRun = threadId; _activeRunId = runId; try { - await _streamEventsFromApi( + await _streamEventsWithResume( threadId, expectedRunId: runId, streamToken: streamToken, @@ -114,12 +117,16 @@ class AgUiService { ); } - Future loadHistory({DateTime? beforeDate}) async { + Future loadHistory({ + DateTime? beforeDate, + bool forceRefresh = false, + }) async { final repository = _historyRepository; final snapshot = repository != null ? await repository.loadHistory( threadId: _threadId, beforeDate: beforeDate, + forceRefresh: forceRefresh, ) : await _loadHistoryFromApi(beforeDate: beforeDate); if (snapshot.threadId != null && snapshot.threadId!.isNotEmpty) { @@ -129,6 +136,21 @@ class AgUiService { return snapshot; } + Future setUserContext(String? userId) async { + final normalizedUserId = userId?.trim(); + if (_userId == normalizedUserId) { + return; + } + _userId = normalizedUserId; + _threadId = null; + _activeThreadIdForRun = null; + _activeRunId = null; + _hasMoreHistory = false; + _lastEventIdByThread.clear(); + _activeStreamToken += 1; + await _cancelActiveSseSubscription(); + } + Future _loadHistoryFromApi({DateTime? beforeDate}) async { final payload = await _chatApi.fetchHistory( threadId: _threadId, @@ -186,6 +208,7 @@ class AgUiService { }) async { final sseLines = await _chatApi.streamRunEvents( threadId, + runId: expectedRunId, lastEventId: _lastEventIdByThread[threadId], ); @@ -332,6 +355,42 @@ class AgUiService { } } + Future _streamEventsWithResume( + String threadId, { + required String expectedRunId, + required int streamToken, + }) async { + var attempt = 0; + while (true) { + try { + await _streamEventsFromApi( + threadId, + expectedRunId: expectedRunId, + streamToken: streamToken, + ); + return; + } catch (error) { + final canResume = + _isPrematureSseClose(error) && + attempt < _maxSseResumeAttempts && + streamToken == _activeStreamToken && + _activeThreadIdForRun == threadId && + _activeRunId == expectedRunId; + if (!canResume) { + rethrow; + } + attempt += 1; + await Future.delayed(Duration(milliseconds: 250 * attempt)); + } + } + } + + bool _isPrematureSseClose(Object error) { + return error.toString().toLowerCase().contains( + 'sse closed before terminal event', + ); + } + Future<_RunInputPayload> _buildRunInput({ required String content, List? attachments, diff --git a/apps/lib/core/chat/agent_stage.dart b/apps/lib/core/chat/agent_stage.dart index 0f65f6d..0372ea9 100644 --- a/apps/lib/core/chat/agent_stage.dart +++ b/apps/lib/core/chat/agent_stage.dart @@ -3,7 +3,8 @@ import '../l10n/l10n.dart'; enum AgentStage { routing, execution, memory } AgentStage? stageFromStepName(String value) { - switch (value) { + final normalized = value.trim().toLowerCase(); + switch (normalized) { case 'router': return AgentStage.routing; case 'worker': diff --git a/apps/lib/core/chat/chat_api.dart b/apps/lib/core/chat/chat_api.dart index 5d77de7..616481d 100644 --- a/apps/lib/core/chat/chat_api.dart +++ b/apps/lib/core/chat/chat_api.dart @@ -5,6 +5,7 @@ abstract class ChatApi { Future> streamRunEvents( String threadId, { + required String runId, String? lastEventId, }); diff --git a/apps/lib/core/chat/chat_list_item.dart b/apps/lib/core/chat/chat_list_item.dart index 189a0b1..9f4147b 100644 --- a/apps/lib/core/chat/chat_list_item.dart +++ b/apps/lib/core/chat/chat_list_item.dart @@ -20,6 +20,7 @@ class TextMessageItem extends ChatListItem { @override final MessageSender sender; final bool isStreaming; + final bool isLocalEcho; final List> attachments; TextMessageItem({ @@ -28,6 +29,7 @@ class TextMessageItem extends ChatListItem { required this.timestamp, required this.sender, this.isStreaming = false, + this.isLocalEcho = false, this.attachments = const [], }); @@ -40,6 +42,7 @@ class TextMessageItem extends ChatListItem { DateTime? timestamp, MessageSender? sender, bool? isStreaming, + bool? isLocalEcho, List>? attachments, }) => TextMessageItem( id: id ?? this.id, @@ -47,6 +50,7 @@ class TextMessageItem extends ChatListItem { timestamp: timestamp ?? this.timestamp, sender: sender ?? this.sender, isStreaming: isStreaming ?? this.isStreaming, + isLocalEcho: isLocalEcho ?? this.isLocalEcho, attachments: attachments ?? this.attachments, ); } diff --git a/apps/lib/core/chat/chat_timeline_reconciler.dart b/apps/lib/core/chat/chat_timeline_reconciler.dart new file mode 100644 index 0000000..b454687 --- /dev/null +++ b/apps/lib/core/chat/chat_timeline_reconciler.dart @@ -0,0 +1,169 @@ +import 'package:social_app/core/chat/chat_list_item.dart'; + +class ChatTimelineReconciler { + const ChatTimelineReconciler._(); + + static List merge({ + required List localItems, + required List remoteItems, + }) { + final merged = List.from(localItems); + + for (final remote in remoteItems) { + final sameIdIndex = merged.indexWhere((item) => item.id == remote.id); + if (sameIdIndex >= 0) { + merged[sameIdIndex] = remote; + continue; + } + + _reconcileOptimisticUserEcho(merged, remote); + merged.add(remote); + } + + final byId = {}; + for (final item in merged) { + byId[item.id] = item; + } + final collapsed = byId.values.toList(); + collapsed.sort(_compareItems); + return collapsed; + } + + static void _reconcileOptimisticUserEcho( + List merged, + ChatListItem remote, + ) { + if (remote is! TextMessageItem || remote.sender != MessageSender.user) { + return; + } + + var optimisticIndex = -1; + Duration? nearestGap; + for (var i = 0; i < merged.length; i++) { + final item = merged[i]; + if (item is! TextMessageItem) { + continue; + } + if (!item.isLocalEcho || item.sender != MessageSender.user) { + continue; + } + if (!isLikelySameUserMessage(item, remote)) { + continue; + } + final gap = item.timestamp.difference(remote.timestamp).abs(); + if (nearestGap == null || gap < nearestGap) { + nearestGap = gap; + optimisticIndex = i; + } + } + + if (optimisticIndex >= 0) { + merged.removeAt(optimisticIndex); + } + } + + static bool isLikelySameUserMessage( + TextMessageItem local, + TextMessageItem remote, + ) { + if (_normalizeText(local.content) != _normalizeText(remote.content)) { + return false; + } + + final diff = local.timestamp.difference(remote.timestamp).abs(); + if (diff > const Duration(minutes: 1)) { + return false; + } + + return _attachmentsLikelySame( + local.attachments, + remote.attachments, + timeGap: diff, + ); + } + + static String _normalizeText(String text) { + return text.trim().replaceAll(RegExp(r'\s+'), ' '); + } + + static bool _attachmentsLikelySame( + List> local, + List> remote, { + required Duration timeGap, + }) { + if (local.length != remote.length) { + return false; + } + if (_attachmentMimeSignature(local) != _attachmentMimeSignature(remote)) { + return false; + } + + final localIdentity = _attachmentIdentitySignature(local); + final remoteIdentity = _attachmentIdentitySignature(remote); + if (localIdentity.isNotEmpty && remoteIdentity.isNotEmpty) { + return localIdentity == remoteIdentity; + } + + return timeGap <= const Duration(seconds: 20); + } + + static String _attachmentMimeSignature( + List> attachments, + ) { + if (attachments.isEmpty) { + return ''; + } + final parts = attachments.map((entry) { + final mimeType = entry['mimeType'] as String? ?? ''; + return mimeType; + }).toList()..sort(); + return parts.join('||'); + } + + static String _attachmentIdentitySignature( + List> attachments, + ) { + final parts = + attachments + .map(_attachmentIdentity) + .where((value) => value.isNotEmpty) + .toList() + ..sort(); + if (parts.isEmpty) { + return ''; + } + return parts.join('||'); + } + + static String _attachmentIdentity(Map entry) { + final url = entry['url'] as String?; + if (url != null && url.isNotEmpty) { + final uri = Uri.tryParse(url); + if (uri != null && uri.pathSegments.isNotEmpty) { + return uri.pathSegments.last.toLowerCase(); + } + return url.toLowerCase(); + } + + final path = entry['path'] as String?; + if (path != null && path.isNotEmpty) { + final normalized = path.replaceAll('\\', '/'); + final slashIndex = normalized.lastIndexOf('/'); + return slashIndex >= 0 + ? normalized.substring(slashIndex + 1).toLowerCase() + : normalized.toLowerCase(); + } + return ''; + } + + static int _compareItems(ChatListItem a, ChatListItem b) { + final tsCompare = a.timestamp.compareTo(b.timestamp); + if (tsCompare != 0) { + return tsCompare; + } + if (a.sender != b.sender) { + return a.sender == MessageSender.user ? -1 : 1; + } + return a.id.compareTo(b.id); + } +} diff --git a/apps/lib/features/chat/data/apis/chat_api_impl.dart b/apps/lib/features/chat/data/apis/chat_api_impl.dart index 9894832..8389340 100644 --- a/apps/lib/features/chat/data/apis/chat_api_impl.dart +++ b/apps/lib/features/chat/data/apis/chat_api_impl.dart @@ -25,16 +25,16 @@ class ChatApiImpl implements ChatApi { @override Future> streamRunEvents( String threadId, { + required String runId, String? lastEventId, }) { final headers = {'Accept': 'text/event-stream'}; if (lastEventId != null && lastEventId.isNotEmpty) { headers['Last-Event-ID'] = lastEventId; } - return _apiClient.getSseLines( - '/api/v1/agent/runs/$threadId/events', - headers: headers, - ); + final encodedRunId = Uri.encodeQueryComponent(runId); + final path = '/api/v1/agent/runs/$threadId/events?runId=$encodedRunId'; + return _apiClient.getSseLines(path, headers: headers); } @override diff --git a/apps/lib/features/chat/presentation/bloc/chat_bloc.dart b/apps/lib/features/chat/presentation/bloc/chat_bloc.dart index 6504fed..dd79dce 100644 --- a/apps/lib/features/chat/presentation/bloc/chat_bloc.dart +++ b/apps/lib/features/chat/presentation/bloc/chat_bloc.dart @@ -9,7 +9,14 @@ import 'package:social_app/core/chat/ag_ui_service.dart'; import 'package:social_app/core/chat/chat_list_item.dart'; import 'package:social_app/core/chat/chat_orchestrator.dart'; import 'package:social_app/core/chat/chat_history_repository.dart'; +import 'package:social_app/core/chat/chat_timeline_reconciler.dart'; import 'package:social_app/core/l10n/l10n.dart'; +import 'chat_bloc_recovery_utils.dart'; + +part 'chat_bloc_events.dart'; +part 'chat_bloc_send.dart'; +part 'chat_bloc_history.dart'; +part 'chat_bloc_attachments.dart'; class ChatState implements ChatOrchestratorState { @override @@ -34,6 +41,7 @@ class ChatState implements ChatOrchestratorState { final bool hasEarlierHistory; @override final AgentStage? currentStage; + final bool hasSeenStep; const ChatState({ this.items = const [], @@ -47,6 +55,7 @@ class ChatState implements ChatOrchestratorState { this.oldestLoadedDate, this.hasEarlierHistory = false, this.currentStage, + this.hasSeenStep = false, }); @override @@ -71,6 +80,7 @@ class ChatState implements ChatOrchestratorState { Object? oldestLoadedDate = _unset, bool? hasEarlierHistory, Object? currentStage = _unset, + bool? hasSeenStep, }) { return ChatState( items: items ?? this.items, @@ -90,6 +100,7 @@ class ChatState implements ChatOrchestratorState { currentStage: currentStage == _unset ? this.currentStage : currentStage as AgentStage?, + hasSeenStep: hasSeenStep ?? this.hasSeenStep, ); } } @@ -99,14 +110,22 @@ class ChatBloc extends Cubit implements ChatOrchestrator { AgUiService? service, required ChatApi chatApi, ChatHistoryRepository? historyRepository, + Duration recoveryPollInterval = const Duration(milliseconds: 700), + Duration recoveryTimeout = const Duration(seconds: 20), }) : _service = service ?? AgUiService(chatApi: chatApi, historyRepository: historyRepository), + _recoveryPollInterval = recoveryPollInterval, + _recoveryTimeout = recoveryTimeout, super(const ChatState()) { _service.onEvent = _handleEvent; } final AgUiService _service; + final Duration _recoveryPollInterval; + final Duration _recoveryTimeout; + String? _activeUserId; + int _sessionEpoch = 0; final Map _attachmentPreviewCache = {}; final Map> _attachmentPreviewInflight = >{}; @@ -121,501 +140,23 @@ class ChatBloc extends Cubit implements ChatOrchestrator { currentMessageId: currentMessageId, error: error, currentStage: null, + hasSeenStep: false, ); } - void _handleEvent(AgUiEvent event) { - switch (event.type) { - case AgUiEventType.runStarted: - emit( - state.copyWith( - isSending: false, - isWaitingFirstToken: true, - isCancelling: false, - error: null, - currentStage: null, - ), - ); - case AgUiEventType.runFinished: - emit( - _resetRunState().copyWith(items: _removeToolCallItems(state.items)), - ); - case AgUiEventType.runError: - final errorEvent = event as RunErrorEvent; - final isCanceledByUser = errorEvent.code == 'RUN_CANCELED'; - emit( - _resetRunState( - error: isCanceledByUser ? null : errorEvent.message, - ).copyWith( - items: _markActiveToolCallsFailed( - state.items, - reason: isCanceledByUser - ? L10n.current.chatRunCanceled - : L10n.current.chatRunFailed, - ), - ), - ); - case AgUiEventType.stepStarted: - _handleStepStarted(event as StepStartedEvent); - case AgUiEventType.stepFinished: - _handleStepFinished(event as StepFinishedEvent); - case AgUiEventType.textMessageEnd: - _handleTextMessageEnd(event as TextMessageEndEvent); - case AgUiEventType.toolCallStart: - _handleToolCallStart(event as ToolCallStartEvent); - case AgUiEventType.toolCallArgs: - _handleToolCallArgs(event as ToolCallArgsEvent); - case AgUiEventType.toolCallEnd: - _handleToolCallEnd(event as ToolCallEndEvent); - case AgUiEventType.toolCallResult: - _handleToolCallResult(event as ToolCallResultEvent); - case AgUiEventType.toolCallError: - _handleToolCallError(event as ToolCallErrorEvent); - case AgUiEventType.unknown: - break; - } - } - - void _handleStepStarted(StepStartedEvent event) { - emit(state.copyWith(currentStage: stageFromStepName(event.stepName))); - } - - void _handleStepFinished(StepFinishedEvent event) { - if (state.currentStage == stageFromStepName(event.stepName)) { - emit(state.copyWith(currentStage: null)); - } - } - - void _handleTextMessageEnd(TextMessageEndEvent event) { - final timestamp = DateTime.now(); - final items = _updateOrAddMessage( - state.items, - event.messageId, - event.answer, - timestamp, - ); - - final uiSchema = event.uiSchema; - if (uiSchema != null) { - _upsertUiSchema(items, event.messageId, uiSchema, timestamp); - } - - final withoutToolCalls = _removeToolCallItems(items); - - emit( - state.copyWith( - items: withoutToolCalls, - currentMessageId: null, - isWaitingFirstToken: false, - isStreaming: false, - ), - ); - } - - List _updateOrAddMessage( - List items, - String messageId, - String content, - DateTime timestamp, - ) { - final result = List.from(items); - final index = result.indexWhere( - (item) => item.id == messageId && item is TextMessageItem, - ); - - if (index >= 0) { - final existing = result[index] as TextMessageItem; - result[index] = existing.copyWith(content: content, isStreaming: false); - } else { - result.add( - TextMessageItem( - id: messageId, - content: content, - timestamp: timestamp, - sender: MessageSender.ai, - isStreaming: false, - ), - ); - } - return result; - } - - void _upsertUiSchema( - List items, - String messageId, - Map uiSchema, - DateTime timestamp, - ) { - final uiItemId = '$messageId-ui'; - final existingIndex = items.indexWhere((item) => item.id == uiItemId); - final uiItem = ToolResultItem( - id: uiItemId, - callId: messageId, - uiSchema: uiSchema, - timestamp: timestamp, - sender: MessageSender.ai, - ); - if (existingIndex >= 0) { - items[existingIndex] = uiItem; - } else { - items.add(uiItem); - } - } - - void _handleToolCallStart(ToolCallStartEvent event) { - final items = List.from(state.items) - ..add( - ToolCallItem( - id: event.toolCallId, - callId: event.toolCallId, - toolName: event.toolCallName, - args: const {}, - status: ToolCallStatus.pending, - timestamp: DateTime.now(), - sender: MessageSender.ai, - ), - ); - emit(state.copyWith(items: items)); - } - - void _handleToolCallArgs(ToolCallArgsEvent event) { - final items = state.items.map((item) { - if (item is ToolCallItem && item.id == event.toolCallId) { - return item.copyWith(args: event.args); - } - return item; - }).toList(); - emit(state.copyWith(items: items)); - } - - void _handleToolCallEnd(ToolCallEndEvent event) { - final items = state.items.map((item) { - if (item is ToolCallItem && item.id == event.toolCallId) { - return item.copyWith(status: ToolCallStatus.executing); - } - return item; - }).toList(); - emit(state.copyWith(items: items)); - } - - void _handleToolCallResult(ToolCallResultEvent event) { - final items = state.items.map((item) { - if (item is ToolCallItem && item.id == event.toolCallId) { - return item.copyWith(status: ToolCallStatus.completed); - } - return item; - }).toList(); - - emit(state.copyWith(items: items)); - } - - List _removeToolCallItems(List items) { - return items.where((item) => item is! ToolCallItem).toList(); - } - - List _markActiveToolCallsFailed( - List items, { - required String reason, - }) { - return items.map((item) { - if (item is! ToolCallItem) { - return item; - } - if (item.status == ToolCallStatus.error) { - return item; - } - if (item.status == ToolCallStatus.completed) { - return item; - } - return item.copyWith(status: ToolCallStatus.error, errorMessage: reason); - }).toList(); - } - - void _handleToolCallError(ToolCallErrorEvent event) { - final items = state.items.map((item) { - if (item is ToolCallItem && item.id == event.toolCallId) { - return item.copyWith( - status: ToolCallStatus.error, - errorMessage: event.error, - ); - } - return item; - }).toList(); - emit(state.copyWith(items: items)); - } - - List _convertHistoryMessages(List messages) { - final converted = []; - for (final msg in messages) { - final normalizedRole = msg.role.toLowerCase(); - final isUser = normalizedRole == 'user'; - final isTool = normalizedRole == 'tool' || normalizedRole == 'tools'; - final sender = isUser ? MessageSender.user : MessageSender.ai; - final attachments = msg.attachments - .map( - (attachment) => { - 'url': attachment.url, - 'mimeType': attachment.mimeType, - }, - ) - .toList(); - - if (!isTool && (msg.content.isNotEmpty || isUser)) { - converted.add( - TextMessageItem( - id: msg.id, - content: msg.content, - timestamp: msg.timestamp, - sender: sender, - attachments: attachments, - ), - ); - } - - if (!isTool && msg.uiSchema != null) { - converted.add( - ToolResultItem( - id: '${msg.id}-ui', - callId: msg.id, - uiSchema: msg.uiSchema!, - timestamp: msg.timestamp, - sender: MessageSender.ai, - ), - ); - } - } - return converted; - } - - DateTime? _extractDateFromItems(List items) { - if (items.isEmpty) return null; - return items - .map( - (item) => DateTime( - item.timestamp.year, - item.timestamp.month, - item.timestamp.day, - ), - ) - .reduce((a, b) => a.isBefore(b) ? a : b); - } - @override - Future sendMessage(String content, {List? images}) async { - final messageId = 'user-${DateTime.now().millisecondsSinceEpoch}'; - final attachments = (images ?? const []) - .map( - (image) => { - 'path': image.path, - 'mimeType': image.mimeType ?? 'image/jpeg', - 'uploading': true, - }, - ) - .toList(); - final userMessage = TextMessageItem( - id: messageId, - content: content, - timestamp: DateTime.now(), - sender: MessageSender.user, - attachments: attachments, - ); - emit( - state.copyWith( - items: [...state.items, userMessage], - isSending: true, - isWaitingFirstToken: true, - isStreaming: false, - isCancelling: false, - error: null, - ), - ); - try { - final uploadInputs = await Future.wait( - (images ?? const []).map( - (image) async => AttachmentUploadInput( - name: image.name, - mimeType: image.mimeType ?? 'image/jpeg', - bytes: await image.readAsBytes(), - localPath: image.path, - ), - ), - ); - final sendResult = await _service.sendMessage( - content, - attachments: uploadInputs, - ); - _syncUploadedAttachments( - messageId: messageId, - uploadedAttachments: sendResult.uploadedAttachments, - ); - } catch (error) { - final sseClosedBeforeTerminal = _isSseClosedBeforeTerminalError(error); - var recoveredFromHistory = false; - if (sseClosedBeforeTerminal) { - recoveredFromHistory = await _recoverFromAbnormalSseClose(); - } - _markAttachmentUploadDone(messageId); - emit( - state.copyWith( - isSending: false, - isWaitingFirstToken: false, - isStreaming: false, - isCancelling: false, - currentStage: null, - error: sseClosedBeforeTerminal - ? (recoveredFromHistory - ? null - : L10n.current.chatSseInterruptedRetry) - : error.toString(), - ), - ); - } - } - - bool _isSseClosedBeforeTerminalError(Object error) { - final text = error.toString().toLowerCase(); - return text.contains('sse closed before terminal event'); - } - - Future _recoverFromAbnormalSseClose() async { - try { - final snapshot = await _service.loadHistory(); - final historyItems = _convertHistoryMessages(snapshot.messages); - final mergedById = { - for (final item in historyItems) item.id: item, - }; - for (final item in state.items) { - mergedById[item.id] = item; - } - final merged = mergedById.values.toList() - ..sort((a, b) => a.timestamp.compareTo(b.timestamp)); - emit( - state.copyWith( - items: merged, - oldestLoadedDate: _extractDateFromItems(merged), - hasEarlierHistory: snapshot.hasMore, - ), - ); - return true; - } catch (_) { - return false; - } - } - - void _syncUploadedAttachments({ - required String messageId, - required List uploadedAttachments, - }) { - if (uploadedAttachments.isEmpty) { - _markAttachmentUploadDone(messageId); - return; - } - final items = state.items.map((item) { - if (item is! TextMessageItem || item.id != messageId) { - return item; - } - final synced = item.attachments.map((attachment) { - final localPath = attachment['path']; - if (localPath is! String || localPath.isEmpty) { - return {...attachment, 'uploading': false}; - } - UploadedAttachment? matched; - for (final candidate in uploadedAttachments) { - if (candidate.localPath == localPath) { - matched = candidate; - break; - } - } - if (matched == null) { - return {...attachment, 'uploading': false}; - } - return { - ...attachment, - 'url': matched.url, - 'mimeType': matched.mimeType, - 'uploading': false, - }; - }).toList(); - return item.copyWith(attachments: synced); - }).toList(); - emit(state.copyWith(items: items)); - } - - void _markAttachmentUploadDone(String messageId) { - final items = state.items.map((item) { - if (item is! TextMessageItem || item.id != messageId) { - return item; - } - final done = item.attachments - .map( - (attachment) => { - ...attachment, - 'uploading': false, - }, - ) - .toList(); - return item.copyWith(attachments: done); - }).toList(); - emit(state.copyWith(items: items)); + Future sendMessage(String content, {List? images}) { + return _sendMessage(content, images: images); } @override - Future loadHistory() async { - if (state.isLoadingHistory) return; - emit(state.copyWith(isLoadingHistory: true)); - try { - final snapshot = await _service.loadHistory(); - final newItems = _convertHistoryMessages(snapshot.messages); - final mergedById = { - for (final item in newItems) item.id: item, - }; - for (final item in state.items) { - mergedById[item.id] = item; - } - final merged = mergedById.values.toList() - ..sort((a, b) => a.timestamp.compareTo(b.timestamp)); - final oldestDate = _extractDateFromItems(merged); - emit( - state.copyWith( - items: merged, - oldestLoadedDate: oldestDate, - hasEarlierHistory: snapshot.hasMore, - ), - ); - } finally { - emit(state.copyWith(isLoadingHistory: false)); - } + Future loadHistory() { + return _loadHistory(); } @override - Future loadMoreHistory() async { - if (state.isLoadingHistory || !state.hasEarlierHistory) return; - if (state.oldestLoadedDate == null) return; - emit(state.copyWith(isLoadingHistory: true)); - try { - final snapshot = await _service.loadHistory( - beforeDate: state.oldestLoadedDate, - ); - final newItems = _convertHistoryMessages(snapshot.messages); - final mergedById = { - for (final item in state.items) item.id: item, - }; - for (final item in newItems) { - mergedById[item.id] = item; - } - final merged = mergedById.values.toList() - ..sort((a, b) => a.timestamp.compareTo(b.timestamp)); - final oldestDate = _extractDateFromItems(merged); - emit( - state.copyWith( - items: merged, - oldestLoadedDate: oldestDate, - hasEarlierHistory: snapshot.hasMore, - ), - ); - } finally { - emit(state.copyWith(isLoadingHistory: false)); - } + Future loadMoreHistory() { + return _loadMoreHistory(); } @override @@ -649,32 +190,29 @@ class ChatBloc extends Cubit implements ChatOrchestrator { } } - Future loadAttachmentPreview(String previewPath) async { - final cached = _attachmentPreviewCache[previewPath]; - if (cached != null) { - return cached; - } - final pending = _attachmentPreviewInflight[previewPath]; - if (pending != null) { - return pending; - } - final future = (() async { - try { - final bytes = await _service.fetchAttachmentPreview(previewPath); - _attachmentPreviewCache[previewPath] = bytes; - return bytes; - } catch (_) { - return null; - } finally { - _attachmentPreviewInflight.remove(previewPath); - } - })(); - _attachmentPreviewInflight[previewPath] = future; - return future; - } - @override void clearError() { emit(state.copyWith(error: null)); } + + Future loadAttachmentPreview(String previewPath) { + return _loadAttachmentPreview(previewPath); + } + + Future switchUser(String? userId) async { + final normalizedUserId = userId?.trim(); + if (_activeUserId == normalizedUserId) { + return; + } + + final epoch = ++_sessionEpoch; + _activeUserId = normalizedUserId; + await _service.setUserContext(normalizedUserId); + if (epoch != _sessionEpoch) { + return; + } + _attachmentPreviewCache.clear(); + _attachmentPreviewInflight.clear(); + emit(const ChatState()); + } } diff --git a/apps/lib/features/chat/presentation/bloc/chat_bloc_attachments.dart b/apps/lib/features/chat/presentation/bloc/chat_bloc_attachments.dart new file mode 100644 index 0000000..59fee76 --- /dev/null +++ b/apps/lib/features/chat/presentation/bloc/chat_bloc_attachments.dart @@ -0,0 +1,88 @@ +// ignore_for_file: invalid_use_of_protected_member, invalid_use_of_visible_for_testing_member + +part of 'chat_bloc.dart'; + +extension _ChatBlocAttachments on ChatBloc { + void _syncUploadedAttachments({ + required String messageId, + required List uploadedAttachments, + }) { + if (uploadedAttachments.isEmpty) { + _markAttachmentUploadDone(messageId); + return; + } + final items = state.items.map((item) { + if (item is! TextMessageItem || item.id != messageId) { + return item; + } + final synced = item.attachments.map((attachment) { + final localPath = attachment['path']; + if (localPath is! String || localPath.isEmpty) { + return {...attachment, 'uploading': false}; + } + UploadedAttachment? matched; + for (final candidate in uploadedAttachments) { + if (candidate.localPath == localPath) { + matched = candidate; + break; + } + } + if (matched == null) { + return {...attachment, 'uploading': false}; + } + return { + ...attachment, + 'url': matched.url, + 'mimeType': matched.mimeType, + 'uploading': false, + }; + }).toList(); + return item.copyWith(attachments: synced); + }).toList(); + emit(state.copyWith(items: items)); + } + + void _markAttachmentUploadDone(String messageId) { + final items = state.items.map((item) { + if (item is! TextMessageItem || item.id != messageId) { + return item; + } + final done = item.attachments + .map( + (attachment) => { + ...attachment, + 'uploading': false, + }, + ) + .toList(); + return item.copyWith(attachments: done); + }).toList(); + emit(state.copyWith(items: items)); + } + + Future _loadAttachmentPreview(String previewPath) async { + final cached = _attachmentPreviewCache[previewPath]; + if (cached != null) { + return cached; + } + final pending = _attachmentPreviewInflight[previewPath]; + if (pending != null) { + return pending; + } + + final future = (() async { + try { + final bytes = await _service.fetchAttachmentPreview(previewPath); + _attachmentPreviewCache[previewPath] = bytes; + return bytes; + } catch (_) { + return null; + } finally { + _attachmentPreviewInflight.remove(previewPath); + } + })(); + + _attachmentPreviewInflight[previewPath] = future; + return future; + } +} diff --git a/apps/lib/features/chat/presentation/bloc/chat_bloc_events.dart b/apps/lib/features/chat/presentation/bloc/chat_bloc_events.dart new file mode 100644 index 0000000..bb6c8da --- /dev/null +++ b/apps/lib/features/chat/presentation/bloc/chat_bloc_events.dart @@ -0,0 +1,305 @@ +// ignore_for_file: invalid_use_of_protected_member, invalid_use_of_visible_for_testing_member + +part of 'chat_bloc.dart'; + +extension _ChatBlocEvents on ChatBloc { + void _handleEvent(AgUiEvent event) { + switch (event.type) { + case AgUiEventType.runStarted: + emit( + state.copyWith( + isSending: false, + isWaitingFirstToken: true, + isCancelling: false, + error: null, + currentStage: null, + hasSeenStep: false, + ), + ); + case AgUiEventType.runFinished: + emit( + _resetRunState().copyWith(items: _removeToolCallItems(state.items)), + ); + case AgUiEventType.runError: + final errorEvent = event as RunErrorEvent; + final isCanceledByUser = errorEvent.code == 'RUN_CANCELED'; + emit( + _resetRunState( + error: isCanceledByUser ? null : errorEvent.message, + ).copyWith( + items: _markActiveToolCallsFailed( + state.items, + reason: isCanceledByUser + ? L10n.current.chatRunCanceled + : L10n.current.chatRunFailed, + ), + ), + ); + case AgUiEventType.stepStarted: + _handleStepStarted(event as StepStartedEvent); + case AgUiEventType.stepFinished: + _handleStepFinished(event as StepFinishedEvent); + case AgUiEventType.textMessageEnd: + _handleTextMessageEnd(event as TextMessageEndEvent); + case AgUiEventType.toolCallStart: + _handleToolCallStart(event as ToolCallStartEvent); + case AgUiEventType.toolCallArgs: + _handleToolCallArgs(event as ToolCallArgsEvent); + case AgUiEventType.toolCallEnd: + _handleToolCallEnd(event as ToolCallEndEvent); + case AgUiEventType.toolCallResult: + _handleToolCallResult(event as ToolCallResultEvent); + case AgUiEventType.toolCallError: + _handleToolCallError(event as ToolCallErrorEvent); + case AgUiEventType.unknown: + break; + } + } + + void _handleStepStarted(StepStartedEvent event) { + emit( + state.copyWith( + currentStage: stageFromStepName(event.stepName), + hasSeenStep: true, + ), + ); + } + + void _handleStepFinished(StepFinishedEvent event) { + if (state.currentStage == stageFromStepName(event.stepName)) { + emit(state.copyWith(currentStage: null)); + } + } + + void _handleTextMessageEnd(TextMessageEndEvent event) { + final timestamp = DateTime.now(); + final items = _updateOrAddMessage( + state.items, + event.messageId, + event.answer, + timestamp, + ); + + final uiSchema = event.uiSchema; + if (uiSchema != null) { + _upsertUiSchema(items, event.messageId, uiSchema, timestamp); + } + + emit( + state.copyWith( + items: _removeToolCallItems(items), + currentMessageId: null, + isWaitingFirstToken: false, + isStreaming: false, + ), + ); + } + + List _updateOrAddMessage( + List items, + String messageId, + String content, + DateTime timestamp, + ) { + final result = List.from(items); + final index = result.indexWhere( + (item) => item.id == messageId && item is TextMessageItem, + ); + + if (index >= 0) { + final existing = result[index] as TextMessageItem; + result[index] = existing.copyWith(content: content, isStreaming: false); + return result; + } + + result.add( + TextMessageItem( + id: messageId, + content: content, + timestamp: timestamp, + sender: MessageSender.ai, + isStreaming: false, + ), + ); + return result; + } + + void _upsertUiSchema( + List items, + String messageId, + Map uiSchema, + DateTime timestamp, + ) { + final uiItemId = '$messageId-ui'; + final uiItem = ToolResultItem( + id: uiItemId, + callId: messageId, + uiSchema: uiSchema, + timestamp: timestamp, + sender: MessageSender.ai, + ); + final existingIndex = items.indexWhere((item) => item.id == uiItemId); + if (existingIndex >= 0) { + items[existingIndex] = uiItem; + return; + } + items.add(uiItem); + } + + void _handleToolCallStart(ToolCallStartEvent event) { + final exists = state.items.any( + (item) => item is ToolCallItem && item.id == event.toolCallId, + ); + if (exists) { + return; + } + emit( + state.copyWith( + items: [ + ...state.items, + ToolCallItem( + id: event.toolCallId, + callId: event.toolCallId, + toolName: event.toolCallName, + args: const {}, + status: ToolCallStatus.pending, + timestamp: DateTime.now(), + sender: MessageSender.ai, + ), + ], + ), + ); + } + + void _handleToolCallArgs(ToolCallArgsEvent event) { + emit( + state.copyWith( + items: state.items.map((item) { + if (item is ToolCallItem && item.id == event.toolCallId) { + return item.copyWith(args: event.args); + } + return item; + }).toList(), + ), + ); + } + + void _handleToolCallEnd(ToolCallEndEvent event) { + emit( + state.copyWith( + items: state.items.map((item) { + if (item is ToolCallItem && item.id == event.toolCallId) { + return item.copyWith(status: ToolCallStatus.executing); + } + return item; + }).toList(), + ), + ); + } + + void _handleToolCallResult(ToolCallResultEvent event) { + emit( + state.copyWith( + items: state.items.map((item) { + if (item is ToolCallItem && item.id == event.toolCallId) { + return item.copyWith(status: ToolCallStatus.completed); + } + return item; + }).toList(), + ), + ); + } + + void _handleToolCallError(ToolCallErrorEvent event) { + emit( + state.copyWith( + items: state.items.map((item) { + if (item is ToolCallItem && item.id == event.toolCallId) { + return item.copyWith( + status: ToolCallStatus.error, + errorMessage: event.error, + ); + } + return item; + }).toList(), + ), + ); + } + + List _removeToolCallItems(List items) { + return items.where((item) => item is! ToolCallItem).toList(); + } + + List _markActiveToolCallsFailed( + List items, { + required String reason, + }) { + return items.map((item) { + if (item is! ToolCallItem || + item.status == ToolCallStatus.error || + item.status == ToolCallStatus.completed) { + return item; + } + return item.copyWith(status: ToolCallStatus.error, errorMessage: reason); + }).toList(); + } + + List _convertHistoryMessages(List messages) { + final converted = []; + for (final msg in messages) { + final normalizedRole = msg.role.toLowerCase(); + final isUser = normalizedRole == 'user'; + final isTool = normalizedRole == 'tool' || normalizedRole == 'tools'; + final sender = isUser ? MessageSender.user : MessageSender.ai; + final attachments = msg.attachments + .map( + (attachment) => { + 'url': attachment.url, + 'mimeType': attachment.mimeType, + }, + ) + .toList(); + + if (!isTool && (msg.content.isNotEmpty || isUser)) { + converted.add( + TextMessageItem( + id: msg.id, + content: msg.content, + timestamp: msg.timestamp, + sender: sender, + isLocalEcho: false, + attachments: attachments, + ), + ); + } + + if (!isTool && msg.uiSchema != null) { + converted.add( + ToolResultItem( + id: '${msg.id}-ui', + callId: msg.id, + uiSchema: msg.uiSchema!, + timestamp: msg.timestamp, + sender: MessageSender.ai, + ), + ); + } + } + return converted; + } + + DateTime? _extractDateFromItems(List items) { + if (items.isEmpty) { + return null; + } + return items + .map( + (item) => DateTime( + item.timestamp.year, + item.timestamp.month, + item.timestamp.day, + ), + ) + .reduce((a, b) => a.isBefore(b) ? a : b); + } +} diff --git a/apps/lib/features/chat/presentation/bloc/chat_bloc_history.dart b/apps/lib/features/chat/presentation/bloc/chat_bloc_history.dart new file mode 100644 index 0000000..2d500f4 --- /dev/null +++ b/apps/lib/features/chat/presentation/bloc/chat_bloc_history.dart @@ -0,0 +1,62 @@ +// ignore_for_file: invalid_use_of_protected_member, invalid_use_of_visible_for_testing_member + +part of 'chat_bloc.dart'; + +extension _ChatBlocHistory on ChatBloc { + Future _loadHistory() async { + if (state.isLoadingHistory) { + return; + } + final epoch = _sessionEpoch; + emit(state.copyWith(isLoadingHistory: true)); + try { + final snapshot = await _service.loadHistory(); + if (epoch != _sessionEpoch) { + return; + } + final merged = _mergeWithHistory(state.items, snapshot.messages); + emit( + state.copyWith( + items: merged, + oldestLoadedDate: _extractDateFromItems(merged), + hasEarlierHistory: snapshot.hasMore, + ), + ); + } finally { + if (epoch == _sessionEpoch) { + emit(state.copyWith(isLoadingHistory: false)); + } + } + } + + Future _loadMoreHistory() async { + if (state.isLoadingHistory || !state.hasEarlierHistory) { + return; + } + if (state.oldestLoadedDate == null) { + return; + } + final epoch = _sessionEpoch; + emit(state.copyWith(isLoadingHistory: true)); + try { + final snapshot = await _service.loadHistory( + beforeDate: state.oldestLoadedDate, + ); + if (epoch != _sessionEpoch) { + return; + } + final merged = _mergeWithHistory(state.items, snapshot.messages); + emit( + state.copyWith( + items: merged, + oldestLoadedDate: _extractDateFromItems(merged), + hasEarlierHistory: snapshot.hasMore, + ), + ); + } finally { + if (epoch == _sessionEpoch) { + emit(state.copyWith(isLoadingHistory: false)); + } + } + } +} diff --git a/apps/lib/features/chat/presentation/bloc/chat_bloc_recovery_utils.dart b/apps/lib/features/chat/presentation/bloc/chat_bloc_recovery_utils.dart new file mode 100644 index 0000000..6a91d20 --- /dev/null +++ b/apps/lib/features/chat/presentation/bloc/chat_bloc_recovery_utils.dart @@ -0,0 +1,76 @@ +import 'package:social_app/core/chat/chat_list_item.dart'; +import 'package:social_app/core/chat/chat_timeline_reconciler.dart'; + +bool chatBlocIsSseClosedBeforeTerminalError(Object error) { + final text = error.toString().toLowerCase(); + return text.contains('sse closed before terminal event'); +} + +DateTime? chatBlocLatestAssistantTimestamp(List items) { + DateTime? latest; + for (final item in items) { + if (item is! TextMessageItem) { + continue; + } + if (item.sender != MessageSender.ai || item.content.trim().isEmpty) { + continue; + } + if (latest == null || item.timestamp.isAfter(latest)) { + latest = item.timestamp; + } + } + return latest; +} + +DateTime? chatBlocFindPersistedUserTimestamp( + List items, + TextMessageItem localEcho, + DateTime sendStartedAt, +) { + DateTime? matched; + final earliestAccepted = sendStartedAt.subtract(const Duration(seconds: 5)); + final latestAccepted = sendStartedAt.add(const Duration(minutes: 2)); + for (final item in items) { + if (item is! TextMessageItem) { + continue; + } + if (item.sender != MessageSender.user || item.isLocalEcho) { + continue; + } + if (!ChatTimelineReconciler.isLikelySameUserMessage(localEcho, item)) { + continue; + } + if (item.timestamp.isBefore(earliestAccepted)) { + continue; + } + if (item.timestamp.isAfter(latestAccepted)) { + continue; + } + if (matched == null || item.timestamp.isBefore(matched)) { + matched = item.timestamp; + } + } + return matched; +} + +bool chatBlocHasAssistantAfterBaseline( + List items, + DateTime? baseline, { + required DateTime notBefore, +}) { + for (final item in items) { + if (item is! TextMessageItem) { + continue; + } + if (item.sender != MessageSender.ai || item.content.trim().isEmpty) { + continue; + } + if (!item.timestamp.isAfter(notBefore)) { + continue; + } + if (baseline == null || item.timestamp.isAfter(baseline)) { + return true; + } + } + return false; +} diff --git a/apps/lib/features/chat/presentation/bloc/chat_bloc_send.dart b/apps/lib/features/chat/presentation/bloc/chat_bloc_send.dart new file mode 100644 index 0000000..1e632d0 --- /dev/null +++ b/apps/lib/features/chat/presentation/bloc/chat_bloc_send.dart @@ -0,0 +1,163 @@ +// ignore_for_file: invalid_use_of_protected_member, invalid_use_of_visible_for_testing_member + +part of 'chat_bloc.dart'; + +extension _ChatBlocSend on ChatBloc { + Future _sendMessage(String content, {List? images}) async { + final epoch = _sessionEpoch; + final assistantBaselineAtSend = chatBlocLatestAssistantTimestamp( + state.items, + ); + final sendStartedAt = DateTime.now(); + final messageId = 'user-${sendStartedAt.millisecondsSinceEpoch}'; + final localEchoAttachments = (images ?? const []) + .map( + (image) => { + 'path': image.path, + 'mimeType': image.mimeType ?? 'image/jpeg', + 'uploading': true, + }, + ) + .toList(); + final localEcho = TextMessageItem( + id: messageId, + content: content, + timestamp: sendStartedAt, + sender: MessageSender.user, + isLocalEcho: true, + attachments: localEchoAttachments, + ); + + emit( + state.copyWith( + items: [...state.items, localEcho], + isSending: true, + isWaitingFirstToken: true, + isStreaming: false, + isCancelling: false, + error: null, + ), + ); + + try { + final uploadInputs = await Future.wait( + (images ?? const []).map( + (image) async => AttachmentUploadInput( + name: image.name, + mimeType: image.mimeType ?? 'image/jpeg', + bytes: await image.readAsBytes(), + localPath: image.path, + ), + ), + ); + final sendResult = await _service.sendMessage( + content, + attachments: uploadInputs, + ); + if (epoch != _sessionEpoch) { + return; + } + _syncUploadedAttachments( + messageId: messageId, + uploadedAttachments: sendResult.uploadedAttachments, + ); + } catch (error) { + if (epoch != _sessionEpoch) { + return; + } + final sseClosedBeforeTerminal = chatBlocIsSseClosedBeforeTerminalError( + error, + ); + var recoveredFromHistory = false; + if (sseClosedBeforeTerminal) { + recoveredFromHistory = await _recoverFromAbnormalSseClose( + epoch: epoch, + localEchoMessage: localEcho, + sendStartedAt: sendStartedAt, + assistantBaselineAtSend: assistantBaselineAtSend, + ); + } + if (epoch != _sessionEpoch) { + return; + } + _markAttachmentUploadDone(messageId); + emit( + state.copyWith( + isSending: false, + isWaitingFirstToken: false, + isStreaming: false, + isCancelling: false, + currentStage: null, + error: sseClosedBeforeTerminal + ? (recoveredFromHistory + ? null + : L10n.current.chatSseInterruptedRetry) + : error.toString(), + ), + ); + } + } + + Future _recoverFromAbnormalSseClose({ + required int epoch, + required TextMessageItem localEchoMessage, + required DateTime sendStartedAt, + required DateTime? assistantBaselineAtSend, + }) async { + try { + final deadline = DateTime.now().add(_recoveryTimeout); + + while (DateTime.now().isBefore(deadline)) { + final snapshot = await _service.loadHistory(forceRefresh: true); + if (epoch != _sessionEpoch) { + return false; + } + final merged = _mergeWithHistory(state.items, snapshot.messages); + emit( + state.copyWith( + items: merged, + oldestLoadedDate: _extractDateFromItems(merged), + hasEarlierHistory: snapshot.hasMore, + ), + ); + + final persistedUserTimestamp = chatBlocFindPersistedUserTimestamp( + merged, + localEchoMessage, + sendStartedAt, + ); + final assistantCaughtUp = chatBlocHasAssistantAfterBaseline( + merged, + assistantBaselineAtSend, + notBefore: persistedUserTimestamp ?? sendStartedAt, + ); + if (persistedUserTimestamp != null && assistantCaughtUp) { + return true; + } + + final remaining = deadline.difference(DateTime.now()); + if (remaining <= Duration.zero) { + break; + } + await Future.delayed( + remaining < _recoveryPollInterval ? remaining : _recoveryPollInterval, + ); + } + + return false; + } catch (_) { + return false; + } + } + + List _mergeWithHistory( + List localItems, + List historyMessages, + ) { + final historyItems = _convertHistoryMessages(historyMessages); + return ChatTimelineReconciler.merge( + localItems: localItems, + remoteItems: historyItems, + ); + } +} diff --git a/apps/lib/features/home/presentation/widgets/home_chat_item_renderer.dart b/apps/lib/features/home/presentation/widgets/home_chat_item_renderer.dart index 7468d69..c3c5a68 100644 --- a/apps/lib/features/home/presentation/widgets/home_chat_item_renderer.dart +++ b/apps/lib/features/home/presentation/widgets/home_chat_item_renderer.dart @@ -10,14 +10,15 @@ import '../../../../core/utils/tool_name_localizer.dart'; import '../../../../shared/widgets/app_loading_indicator.dart'; import '../../../../shared/widgets/ui_schema/ui_schema_renderer.dart'; -const _messagePaddingH = 13.0; -const _messagePaddingV = 9.0; -const _cornerRadius = 12.0; -const _attachmentPreviewSize = 88.0; -const _attachmentPreviewRadius = 10.0; -const _attachmentPreviewGap = 8.0; -const _toolResultWidthFactor = 0.9; -const _iconSize = 24.0; +const _messageMaxWidthFactor = 0.82; +const _messagePaddingH = AppSpacing.md; +const _messagePaddingV = AppSpacing.sm; +const _cornerRadius = AppRadius.lg; +const _attachmentPreviewSize = AppSpacing.xxl * 3 + AppSpacing.xs; +const _attachmentPreviewRadius = AppRadius.md; +const _attachmentPreviewGap = AppSpacing.sm; +const _toolResultWidthFactor = 0.88; +const _iconSize = AppSpacing.xxl; class HomeChatItemRenderer { static Widget build(BuildContext context, ChatListItem item) { @@ -34,6 +35,8 @@ class HomeChatItemRenderer { static Widget _buildMessageItem(BuildContext context, TextMessageItem item) { final colorScheme = Theme.of(context).colorScheme; final isUser = item.sender == MessageSender.user; + final maxMessageWidth = + MediaQuery.sizeOf(context).width * _messageMaxWidthFactor; final imageAttachments = _collectRenderableImageAttachments( item.attachments, ); @@ -49,7 +52,8 @@ class HomeChatItemRenderer { : MainAxisAlignment.start, crossAxisAlignment: CrossAxisAlignment.start, children: [ - Flexible( + ConstrainedBox( + constraints: BoxConstraints(maxWidth: maxMessageWidth), child: Container( padding: const EdgeInsets.symmetric( horizontal: _messagePaddingH, @@ -58,21 +62,24 @@ class HomeChatItemRenderer { decoration: BoxDecoration( color: isUser ? colorScheme.primaryContainer - : colorScheme.surface, + : colorScheme.surfaceContainerLow, borderRadius: BorderRadius.only( topLeft: const Radius.circular(_cornerRadius), topRight: const Radius.circular(_cornerRadius), bottomLeft: Radius.circular(isUser ? _cornerRadius : 0), bottomRight: Radius.circular(isUser ? 0 : _cornerRadius), ), - border: isUser - ? null - : Border.all(color: colorScheme.outlineVariant), + border: Border.all( + color: isUser + ? colorScheme.primary.withValues(alpha: 0.12) + : colorScheme.outlineVariant, + ), ), child: Text( item.content, style: TextStyle( - fontSize: 14, + fontSize: AppSpacing.md, + height: 1.45, color: isUser ? colorScheme.onPrimaryContainer : colorScheme.onSurface, @@ -302,6 +309,7 @@ class HomeChatItemRenderer { : null; final needsOuterCard = appearance == null || appearance == 'plain'; final schemaContent = UiSchemaRenderer( + context, colorScheme, ).renderSchema(item.uiSchema); final wrappedContent = needsOuterCard @@ -309,9 +317,11 @@ class HomeChatItemRenderer { width: double.infinity, padding: const EdgeInsets.all(AppSpacing.md), decoration: BoxDecoration( - color: colorScheme.surface, + color: colorScheme.surfaceContainerLow.withValues(alpha: 0.65), borderRadius: BorderRadius.circular(AppRadius.lg), - border: Border.all(color: colorScheme.outlineVariant), + border: Border.all( + color: colorScheme.outlineVariant.withValues(alpha: 0.25), + ), ), child: schemaContent, ) diff --git a/apps/lib/shared/widgets/ui_schema/ui_schema_renderer.dart b/apps/lib/shared/widgets/ui_schema/ui_schema_renderer.dart index afde8dc..264cc5b 100644 --- a/apps/lib/shared/widgets/ui_schema/ui_schema_renderer.dart +++ b/apps/lib/shared/widgets/ui_schema/ui_schema_renderer.dart @@ -1,12 +1,26 @@ import 'package:flutter/material.dart'; +import 'package:flutter/services.dart'; +import 'package:go_router/go_router.dart'; +import 'package:url_launcher/url_launcher.dart'; import 'package:social_app/core/l10n/l10n.dart'; +import 'package:social_app/core/ui_schema/navigation/ui_schema_navigation.dart'; import 'package:social_app/core/theme/design_tokens.dart'; +import 'package:social_app/shared/widgets/toast/toast.dart'; +import 'package:social_app/shared/widgets/toast/toast_type.dart'; + +const _titleFontSize = AppSpacing.lg + 1; +const _bodyFontSize = AppSpacing.md; +const _captionFontSize = AppSpacing.sm + AppSpacing.xs / 2; +const _codeFontSize = AppSpacing.sm + AppSpacing.xs; +const _buttonFontSize = AppSpacing.sm + AppSpacing.xs; +const _statusLabelTokenPrefix = 'ui.status.'; class UiSchemaRenderer { + final BuildContext context; final ColorScheme colorScheme; - UiSchemaRenderer(this.colorScheme); + UiSchemaRenderer(this.context, this.colorScheme); Widget renderSchema(Map? schema) { if (schema == null || schema.isEmpty) { @@ -96,30 +110,30 @@ class UiSchemaRenderer { final status = _asString(node['status']); final style = switch (role) { 'title' => TextStyle( - fontSize: 18, - fontWeight: FontWeight.w700, + fontSize: _titleFontSize, + fontWeight: FontWeight.w600, color: colorScheme.onSurface, - height: 1.2, + height: 1.25, ), 'subtitle' => TextStyle( - fontSize: 14, + fontSize: AppSpacing.md, fontWeight: FontWeight.w600, color: colorScheme.onSurface, ), 'caption' => TextStyle( - fontSize: 11, + fontSize: _captionFontSize, color: colorScheme.onSurfaceVariant, height: 1.4, ), 'code' => TextStyle( - fontSize: 12, + fontSize: _codeFontSize, color: colorScheme.onSurfaceVariant, fontFamily: 'monospace', ), _ => TextStyle( - fontSize: 13, + fontSize: _bodyFontSize, color: colorScheme.onSurfaceVariant, - height: 1.35, + height: 1.4, ), }; return Text( @@ -140,23 +154,35 @@ class UiSchemaRenderer { Widget _renderBadge(Map node) { final status = _asString(node['status']); - final fg = - _statusTextColor(status, colorScheme.onSurfaceVariant) ?? - colorScheme.onSurfaceVariant; - final bg = _statusBackground(status); - return Container( - padding: const EdgeInsets.symmetric( - horizontal: AppSpacing.md, - vertical: AppSpacing.xs, - ), - decoration: BoxDecoration( - color: bg, - borderRadius: BorderRadius.circular(AppRadius.full), - border: Border.all(color: _statusBorder(status)), - ), - child: Text( - _asString(node['label']), - style: TextStyle(fontSize: 12, fontWeight: FontWeight.w700, color: fg), + final resolvedLabel = _resolveStatusLabel( + rawLabel: _asString(node['label']), + status: status, + ); + final statusDotColor = _statusBorder(status); + return Padding( + padding: const EdgeInsets.only(left: AppSpacing.xs / 2), + child: Row( + mainAxisSize: MainAxisSize.min, + crossAxisAlignment: CrossAxisAlignment.center, + children: [ + Container( + width: AppSpacing.xs, + height: AppSpacing.xs, + decoration: BoxDecoration( + color: statusDotColor, + shape: BoxShape.circle, + ), + ), + const SizedBox(width: AppSpacing.xs), + Text( + resolvedLabel, + style: TextStyle( + fontSize: _captionFontSize, + fontWeight: FontWeight.w600, + color: colorScheme.onSurfaceVariant, + ), + ), + ], ), ); } @@ -165,39 +191,269 @@ class UiSchemaRenderer { final style = _asString(node['style'], fallback: 'secondary'); final action = _asMap(node['action']); final disabled = node['disabled'] == true; + final isPrimary = style == 'primary'; + final isGhost = style == 'ghost'; + final isDanger = style == 'danger'; + final backgroundColor = isPrimary + ? colorScheme.primaryContainer + : isGhost + ? colorScheme.surface + : isDanger + ? colorScheme.errorContainer + : colorScheme.surfaceContainerLow; + final foregroundColor = isPrimary + ? colorScheme.onPrimaryContainer + : isDanger + ? colorScheme.onErrorContainer + : isGhost + ? colorScheme.onSurfaceVariant + : colorScheme.onSurfaceVariant; + final borderColor = isPrimary + ? colorScheme.primary.withValues(alpha: 0.3) + : isGhost + ? colorScheme.outlineVariant.withValues(alpha: 0.4) + : isDanger + ? colorScheme.error.withValues(alpha: 0.35) + : colorScheme.outlineVariant.withValues(alpha: 0.4); return ElevatedButton( onPressed: disabled ? null - : () { - _handleAction(action); + : () async { + await _handleAction(action); }, style: ElevatedButton.styleFrom( elevation: 0, padding: const EdgeInsets.symmetric( - horizontal: AppSpacing.lg, + horizontal: AppSpacing.md + AppSpacing.xs, vertical: AppSpacing.sm, ), - backgroundColor: style == 'primary' - ? colorScheme.primary - : colorScheme.surfaceContainerHighest, - foregroundColor: style == 'primary' - ? colorScheme.onPrimary - : colorScheme.onSurfaceVariant, + minimumSize: const Size(0, AppSpacing.xxl + AppSpacing.xs), + backgroundColor: backgroundColor, + foregroundColor: foregroundColor, shape: RoundedRectangleBorder( borderRadius: BorderRadius.circular(AppRadius.full), - side: style == 'primary' - ? BorderSide.none - : BorderSide(color: colorScheme.outlineVariant), + side: BorderSide(color: borderColor), ), ), child: Text( _asString(node['label'], fallback: L10n.current.uiSchemaActionFallback), - style: const TextStyle(fontSize: 12, fontWeight: FontWeight.w600), + style: TextStyle( + fontSize: _buttonFontSize, + fontWeight: isPrimary ? FontWeight.w700 : FontWeight.w600, + ), ), ); } - void _handleAction(Map? action) {} + Future _handleAction(Map? action) async { + if (action == null || action.isEmpty) { + Toast.show( + context, + L10n.current.uiSchemaActionNotImplemented, + type: ToastType.warning, + ); + return; + } + + final type = _asString(action['type']); + switch (type) { + case 'navigation': + _handleNavigationAction(action); + return; + case 'url': + await _handleUrlAction(action); + return; + case 'copy': + _handleCopyAction(action); + return; + case 'event': + case 'tool': + case 'payload': + default: + Toast.show( + context, + L10n.current.uiSchemaActionNotImplemented, + type: ToastType.info, + ); + } + } + + Future _handleUrlAction(Map action) async { + final rawUrl = _asString(action['url']).trim(); + if (!_isValidExternalUrl(rawUrl)) { + Toast.show( + context, + L10n.current.uiSchemaUrlInvalid, + type: ToastType.error, + ); + return; + } + + final uri = Uri.tryParse(rawUrl); + if (uri == null) { + Toast.show( + context, + L10n.current.uiSchemaUrlInvalid, + type: ToastType.error, + ); + return; + } + + final target = _asString(action['target'], fallback: '_blank'); + final mode = target == '_self' + ? LaunchMode.platformDefault + : LaunchMode.externalApplication; + + try { + final launched = await launchUrl(uri, mode: mode); + if (!context.mounted) { + return; + } + if (!launched) { + debugPrint('UiSchemaRenderer: failed to launch URL: $rawUrl'); + Toast.show( + context, + L10n.current.uiSchemaUrlOpenFailed, + type: ToastType.error, + ); + } + } catch (error) { + debugPrint('UiSchemaRenderer: URL launch error for $rawUrl: $error'); + if (!context.mounted) { + return; + } + Toast.show( + context, + L10n.current.uiSchemaUrlOpenFailed, + type: ToastType.error, + ); + } + } + + void _handleNavigationAction(Map action) { + final path = _asString(action['path']).trim(); + if (!isValidInternalNavigationPath(path)) { + Toast.show( + context, + L10n.current.uiSchemaNavigationInvalidPath, + type: ToastType.error, + ); + return; + } + + final params = _asMap(action['params']); + if (params != null && !_areScalarNavigationParams(params)) { + Toast.show( + context, + L10n.current.uiSchemaNavigationInvalidParams, + type: ToastType.error, + ); + return; + } + + final target = buildUiSchemaNavigationTarget(path: path, params: params); + + try { + context.push(target); + } catch (error) { + debugPrint('UiSchemaRenderer: navigation failed for $target: $error'); + Toast.show( + context, + L10n.current.uiSchemaNavigationInvalidPath, + type: ToastType.error, + ); + } + } + + void _handleCopyAction(Map action) { + final content = _asString(action['content']); + if (content.isEmpty) { + Toast.show( + context, + L10n.current.uiSchemaActionNotImplemented, + type: ToastType.warning, + ); + return; + } + + Clipboard.setData(ClipboardData(text: content)); + final successMessage = _asString( + action['successMessage'], + fallback: L10n.current.commonCopySuccess, + ); + Toast.show(context, successMessage, type: ToastType.success); + } + + static bool _isValidExternalUrl(String url) { + if (url.isEmpty) return false; + final uri = Uri.tryParse(url); + if (uri == null || !uri.hasScheme) return false; + final scheme = uri.scheme.toLowerCase(); + if (scheme == 'http' || scheme == 'https') { + if (uri.host.isEmpty || uri.userInfo.isNotEmpty) { + return false; + } + if (_isLocalOrPrivateHost(uri.host)) { + return false; + } + return true; + } + return scheme == 'mailto' || scheme == 'tel' || scheme == 'sms'; + } + + static bool _areScalarNavigationParams(Map params) { + for (final value in params.values) { + if (value is String || value is num || value is bool) { + continue; + } + return false; + } + return true; + } + + static bool _isLocalOrPrivateHost(String host) { + final normalizedHost = host.toLowerCase(); + if (normalizedHost.contains(':')) { + return true; + } + if (int.tryParse(normalizedHost) != null) { + return true; + } + if (normalizedHost == 'localhost' || + normalizedHost == '127.0.0.1' || + normalizedHost == '::1') { + return true; + } + + final ipv4 = normalizedHost.split('.'); + if (ipv4.length != 4) { + return false; + } + final octets = []; + for (final part in ipv4) { + final parsed = int.tryParse(part); + if (parsed == null || parsed < 0 || parsed > 255) { + return false; + } + octets.add(parsed); + } + + final first = octets[0]; + final second = octets[1]; + if (first == 10 || first == 127 || first == 0) { + return true; + } + if (first == 169 && second == 254) { + return true; + } + if (first == 192 && second == 168) { + return true; + } + if (first == 172 && second >= 16 && second <= 31) { + return true; + } + return false; + } Widget _renderKv(Map node) { final items = _asList( @@ -217,13 +473,13 @@ class UiSchemaRenderer { final value = item['value']?.toString() ?? '-'; return Container( width: double.infinity, - padding: const EdgeInsets.symmetric( - horizontal: AppSpacing.md, - vertical: AppSpacing.xs, - ), + padding: const EdgeInsets.symmetric(vertical: AppSpacing.xs), decoration: BoxDecoration( - color: colorScheme.surfaceContainerHighest, - borderRadius: BorderRadius.circular(AppRadius.md), + border: Border( + bottom: BorderSide( + color: colorScheme.outlineVariant.withValues(alpha: 0.2), + ), + ), ), child: Row( crossAxisAlignment: CrossAxisAlignment.start, @@ -233,8 +489,10 @@ class UiSchemaRenderer { child: Text( label, style: TextStyle( - fontSize: 11, - color: colorScheme.onSurfaceVariant, + fontSize: _captionFontSize, + color: colorScheme.onSurfaceVariant.withValues( + alpha: 0.72, + ), ), ), ), @@ -244,9 +502,9 @@ class UiSchemaRenderer { child: Text( value, style: TextStyle( - fontSize: 12, + fontSize: _bodyFontSize, color: colorScheme.onSurface, - fontWeight: FontWeight.w600, + fontWeight: FontWeight.w400, ), ), ), @@ -274,14 +532,17 @@ class UiSchemaRenderer { return child; } final bg = switch (appearance) { - 'section' => colorScheme.surfaceContainerHighest, - 'card' => colorScheme.surface, + 'section' => colorScheme.surfaceContainerLow, + 'card' => + status == 'success' + ? colorScheme.primaryContainer.withValues(alpha: 0.05) + : colorScheme.surfaceContainerLow.withValues(alpha: 0.5), _ => _statusBackground(status), }; final borderColor = switch (status) { - 'success' => colorScheme.tertiary, - 'warning' => colorScheme.secondary, - 'error' => colorScheme.error, + 'success' => colorScheme.primary.withValues(alpha: 0.1), + 'warning' => colorScheme.outlineVariant, + 'error' => colorScheme.error.withValues(alpha: 0.22), _ => colorScheme.outlineVariant, }; return Container( @@ -289,15 +550,8 @@ class UiSchemaRenderer { padding: const EdgeInsets.all(AppSpacing.md), decoration: BoxDecoration( color: bg, - borderRadius: BorderRadius.circular(AppRadius.lg), - border: Border.all(color: borderColor), - boxShadow: [ - BoxShadow( - color: colorScheme.shadow.withValues(alpha: 0.08), - blurRadius: 12, - offset: const Offset(0, 6), - ), - ], + borderRadius: BorderRadius.circular(AppRadius.xl), + border: Border.all(color: borderColor.withValues(alpha: 0.4)), ), child: child, ); @@ -313,7 +567,10 @@ class UiSchemaRenderer { ), child: Text( text, - style: TextStyle(fontSize: 12, color: colorScheme.onSecondaryContainer), + style: TextStyle( + fontSize: _buttonFontSize, + color: colorScheme.onSecondaryContainer, + ), ), ); } @@ -331,7 +588,7 @@ class UiSchemaRenderer { Color _statusBackground(String status) { return switch (status) { - 'success' => colorScheme.tertiaryContainer, + 'success' => colorScheme.primaryContainer.withValues(alpha: 0.3), 'warning' => colorScheme.secondaryContainer, 'error' => colorScheme.errorContainer, 'pending' => colorScheme.primaryContainer, @@ -341,7 +598,7 @@ class UiSchemaRenderer { Color _statusBorder(String status) { return switch (status) { - 'success' => colorScheme.tertiary, + 'success' => colorScheme.primary, 'warning' => colorScheme.secondary, 'error' => colorScheme.error, 'pending' => colorScheme.primary, @@ -351,7 +608,7 @@ class UiSchemaRenderer { Color? _statusTextColor(String status, Color? fallback) { return switch (status) { - 'success' => colorScheme.onTertiaryContainer, + 'success' => colorScheme.onSurfaceVariant, 'warning' => colorScheme.onSecondaryContainer, 'error' => colorScheme.onErrorContainer, 'pending' => colorScheme.onPrimaryContainer, @@ -359,6 +616,37 @@ class UiSchemaRenderer { }; } + String _resolveStatusLabel({ + required String rawLabel, + required String status, + }) { + final normalizedStatus = status.trim().toLowerCase(); + final normalizedLabel = rawLabel.trim().toLowerCase(); + final bareLabel = normalizedLabel.startsWith(_statusLabelTokenPrefix) + ? normalizedLabel.substring(_statusLabelTokenPrefix.length) + : normalizedLabel; + + final isTokenLabel = normalizedLabel.startsWith(_statusLabelTokenPrefix); + final isLegacyStatusLabel = bareLabel == normalizedStatus; + + if (isTokenLabel || isLegacyStatusLabel) { + return _tryLocalizedStatusLabel(bareLabel) ?? rawLabel; + } + return rawLabel; + } + + String? _tryLocalizedStatusLabel(String status) { + final l10n = L10n.current; + return switch (status) { + 'success' => l10n.uiSchemaStatusSuccess, + 'warning' => l10n.uiSchemaStatusWarning, + 'error' => l10n.uiSchemaStatusError, + 'pending' => l10n.uiSchemaStatusPending, + 'info' => l10n.uiSchemaStatusInfo, + _ => null, + }; + } + static Map? _asMap(Object? value) { if (value is Map) { return value; diff --git a/backend/src/core/agentscope/prompts/agent_prompt.py b/backend/src/core/agentscope/prompts/agent_prompt.py index cd4b541..2bfa0e4 100644 --- a/backend/src/core/agentscope/prompts/agent_prompt.py +++ b/backend/src/core/agentscope/prompts/agent_prompt.py @@ -70,7 +70,6 @@ def _router_rules(llm_config: SystemAgentLLMConfig | None) -> list[str]: "- Return key_entities and constraints that are execution-relevant; low confidence -> omit rather than guess.", "- Set execution_mode by complexity: onestep / tool_assisted / multistep.", "- Set result_typing.primary to the most suitable response shape; use clarification_request only when required info is missing.", - "- Set ui.ui_mode and ui.ui_decision_reason based on whether structured UI improves actionability.", f"- task_typing.primary must use one TaskType enum: {_enum_values(TaskType)}.", f"- task_typing.secondary max 3 enums: {_enum_values(TaskType)}.", f"- result_typing.primary must use one ResultType enum: {_enum_values(ResultType)}.", diff --git a/backend/src/core/agentscope/runtime/runner.py b/backend/src/core/agentscope/runtime/runner.py index 576e49e..4bddc9a 100644 --- a/backend/src/core/agentscope/runtime/runner.py +++ b/backend/src/core/agentscope/runtime/runner.py @@ -279,7 +279,7 @@ class AgentScopeRunner: runtime_mode: RuntimeMode, work_memory: WorkProfileContent | None, ) -> WorkerAgentOutputLite: - worker_output_model = resolve_worker_output_model(router_output.ui.ui_mode) + worker_output_model = resolve_worker_output_model(router_output.execution_mode) await self._emit_step_event( pipeline=pipeline, run_input=run_input, diff --git a/backend/src/core/agentscope/runtime/ui_compiler.py b/backend/src/core/agentscope/runtime/ui_compiler.py index 9ecb0d0..59e4c1a 100644 --- a/backend/src/core/agentscope/runtime/ui_compiler.py +++ b/backend/src/core/agentscope/runtime/ui_compiler.py @@ -51,7 +51,10 @@ def _status_badge_needed(intent: UiHintIntent, status: UiHintStatus) -> bool: def _status_label(status: str) -> str: - return status.upper() + normalized = status.strip().lower() + if not normalized: + return "ui.status.info" + return f"ui.status.{normalized}" # ============================================================ diff --git a/backend/src/schemas/agent/__init__.py b/backend/src/schemas/agent/__init__.py index 8ac7a85..183634c 100644 --- a/backend/src/schemas/agent/__init__.py +++ b/backend/src/schemas/agent/__init__.py @@ -14,13 +14,11 @@ from schemas.agent.runtime_models import ( ResultTyping, ResultType, RouterAgentOutput, - RouterUiDecision, RunStatus, TaskType, TaskTyping, ToolAgentOutput, ToolStatus, - UiMode, WorkerAgentOutputLite, WorkerAgentOutputRich, resolve_worker_output_model, @@ -47,7 +45,6 @@ __all__ = [ "ClientTimeContext", "ResultType", "RouterAgentOutput", - "RouterUiDecision", "RunStatus", "RuntimeMode", "TaskType", @@ -56,7 +53,6 @@ __all__ = [ "SystemVisibilityBit", "ToolAgentOutput", "ToolStatus", - "UiMode", "UiHintAction", "UiHintIntent", "UiHintSection", diff --git a/backend/src/schemas/agent/runtime_models.py b/backend/src/schemas/agent/runtime_models.py index c5317d4..8d47c06 100644 --- a/backend/src/schemas/agent/runtime_models.py +++ b/backend/src/schemas/agent/runtime_models.py @@ -62,11 +62,6 @@ class ExecutionMode(str, Enum): MULTISTEP = "multistep" -class UiMode(str, Enum): - NONE = "none" - RICH = "rich" - - class RunStatus(str, Enum): SUCCESS = "success" PARTIAL_SUCCESS = "partial_success" @@ -114,13 +109,6 @@ class NormalizedTaskInput(BaseModel): context_summary: str = Field(default="", max_length=2000) -class RouterUiDecision(BaseModel): - model_config = ConfigDict(extra="forbid") - - ui_mode: UiMode - ui_decision_reason: str - - class RouterAgentOutput(BaseModel): model_config = ConfigDict(extra="forbid") @@ -130,7 +118,6 @@ class RouterAgentOutput(BaseModel): task_typing: TaskTyping execution_mode: ExecutionMode result_typing: ResultTyping - ui: RouterUiDecision class ErrorInfo(BaseModel): @@ -175,7 +162,9 @@ class AgentOutput(WorkerAgentOutputRich): WorkerAgentOutput = WorkerAgentOutputLite | WorkerAgentOutputRich -def resolve_worker_output_model(ui_mode: UiMode) -> type[WorkerAgentOutputLite]: - if ui_mode == UiMode.RICH: - return WorkerAgentOutputRich - return WorkerAgentOutputLite +def resolve_worker_output_model( + execution_mode: ExecutionMode, +) -> type[WorkerAgentOutputLite]: + if execution_mode == ExecutionMode.ONESTEP: + return WorkerAgentOutputLite + return WorkerAgentOutputRich diff --git a/backend/src/schemas/agent/ui_schema.py b/backend/src/schemas/agent/ui_schema.py index a0b682c..279db4d 100644 --- a/backend/src/schemas/agent/ui_schema.py +++ b/backend/src/schemas/agent/ui_schema.py @@ -595,11 +595,12 @@ def build_status_panel( secondary_button: UiButtonNode | None = None, node_id: str | None = None, ) -> UiStackNode: + status_label = f"ui.status.{status.value}" children: list[UiNode] = [ build_stack( [ build_text(title, role=TextRole.TITLE), - build_badge(label=status.value.upper(), status=status), + build_badge(label=status_label, status=status), ], direction=LayoutDirection.HORIZONTAL, gap=8, diff --git a/backend/src/v1/agent/router.py b/backend/src/v1/agent/router.py index 12eec4e..2e460e8 100644 --- a/backend/src/v1/agent/router.py +++ b/backend/src/v1/agent/router.py @@ -48,6 +48,7 @@ from v1.users.dependencies import get_current_user router = APIRouter(prefix="/agent", tags=["agent"]) logger = get_logger("v1.agent.router") _LAST_EVENT_ID_RE = re.compile(r"^\d+-\d+$") +_RUN_ID_RE = re.compile(r"^[A-Za-z0-9_-]{1,128}$") _MAX_SSE_CONNECTIONS_PER_USER = 3 _SSE_SLOT_TTL_SECONDS = 15 * 60 _TERMINAL_RUN_EVENT_TYPES = {"RUN_FINISHED", "RUN_ERROR"} @@ -120,6 +121,11 @@ def _is_terminal_run_event(event: dict[str, object]) -> bool: ) +def _is_target_run_event(event: dict[str, object], *, target_run_id: str) -> bool: + run_id = event.get("runId") + return isinstance(run_id, str) and run_id == target_run_id + + @router.post( "/runs", response_model=TaskAcceptedResponse, status_code=status.HTTP_202_ACCEPTED ) @@ -188,9 +194,19 @@ async def stream_events( thread_id: str, service: Annotated[AgentService, Depends(get_agent_service)], current_user: Annotated[CurrentUser, Depends(get_current_user)], + run_id: str | None = Query(default=None, alias="runId"), last_event_id: str | None = Header(default=None, alias="Last-Event-ID"), idle_limit: int = Query(default=300, ge=1, le=3600), ) -> StreamingResponse: + if run_id is None or _RUN_ID_RE.fullmatch(run_id) is None: + raise ApiProblemError( + status_code=422, + detail=problem_payload( + code="AGENT_INVALID_RUN_ID", + detail="Invalid runId", + ), + ) + if last_event_id is not None and ( len(last_event_id) > 32 or _LAST_EVENT_ID_RE.fullmatch(last_event_id) is None ): @@ -255,6 +271,8 @@ async def stream_events( if not row_id or not isinstance(event, dict): continue cursor = row_id + if not _is_target_run_event(event, target_run_id=run_id): + continue yield to_sse_event(row_id, event) if _is_terminal_run_event(event): terminal_event_reached = True diff --git a/backend/tests/integration/test_schedule_items_routes.py b/backend/tests/integration/test_schedule_items_routes.py index 0afb411..57d24f0 100644 --- a/backend/tests/integration/test_schedule_items_routes.py +++ b/backend/tests/integration/test_schedule_items_routes.py @@ -77,7 +77,7 @@ def test_create_schedule_item_returns_201() -> None: source_type=ScheduleItemSourceType.MANUAL, created_at=datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc), updated_at=datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc), - permission=7, + permission=15, is_owner=True, ) @@ -110,7 +110,7 @@ def test_list_schedule_items_returns_200() -> None: source_type=ScheduleItemSourceType.MANUAL, created_at=datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc), updated_at=datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc), - permission=7, + permission=15, is_owner=True, ) @@ -145,7 +145,7 @@ def test_get_schedule_item_returns_200() -> None: source_type=ScheduleItemSourceType.MANUAL, created_at=datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc), updated_at=datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc), - permission=7, + permission=15, is_owner=True, ) @@ -173,7 +173,7 @@ def test_update_schedule_item_returns_200() -> None: source_type=ScheduleItemSourceType.MANUAL, created_at=datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc), updated_at=datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc), - permission=7, + permission=15, is_owner=True, ) @@ -204,7 +204,7 @@ def test_delete_schedule_item_returns_204() -> None: source_type=ScheduleItemSourceType.MANUAL, created_at=datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc), updated_at=datetime(2026, 2, 27, 10, 0, 0, tzinfo=timezone.utc), - permission=7, + permission=15, is_owner=True, ) diff --git a/backend/tests/integration/v1/agent/test_routes.py b/backend/tests/integration/v1/agent/test_routes.py index cf63492..2b16c22 100644 --- a/backend/tests/integration/v1/agent/test_routes.py +++ b/backend/tests/integration/v1/agent/test_routes.py @@ -154,6 +154,65 @@ class _TerminalStreamAgentService(_FakeAgentService): return [] +class _MixedRunStreamAgentService(_FakeAgentService): + def __init__(self) -> None: + super().__init__() + self.stream_calls = 0 + + async def stream_events( + self, + *, + thread_id: str, + last_event_id: str | None, + current_user: CurrentUser, + ) -> list[dict[str, object]]: + del thread_id, last_event_id, current_user + self.stream_calls += 1 + if self.stream_calls == 1: + return [ + { + "id": "11-0", + "event": { + "type": "RUN_FINISHED", + "threadId": "00000000-0000-0000-0000-000000000001", + "runId": "run-old", + }, + }, + { + "id": "12-0", + "event": { + "type": "RUN_STARTED", + "threadId": "00000000-0000-0000-0000-000000000001", + "runId": "run-1", + }, + }, + ] + if self.stream_calls == 2: + return [ + { + "id": "13-0", + "event": { + "type": "STEP_STARTED", + "threadId": "00000000-0000-0000-0000-000000000001", + "runId": "run-1", + "stepName": "router", + }, + } + ] + if self.stream_calls == 3: + return [ + { + "id": "14-0", + "event": { + "type": "RUN_FINISHED", + "threadId": "00000000-0000-0000-0000-000000000001", + "runId": "run-1", + }, + } + ] + return [] + + def test_run_requires_auth_and_returns_202_task_id() -> None: app.dependency_overrides[get_agent_service] = lambda: _FakeAgentService() client = TestClient(app) @@ -168,7 +227,7 @@ def test_run_requires_auth_and_returns_202_task_id() -> None: "messages": [{"id": "u1", "role": "user", "content": "hello"}], "tools": [], "context": [], - "forwardedProps": {"agent_type": "worker"}, + "forwardedProps": {"runtime_mode": "chat"}, }, ) assert unauthorized.status_code == 401 @@ -185,7 +244,7 @@ def test_run_requires_auth_and_returns_202_task_id() -> None: "messages": [{"id": "u1", "role": "user", "content": "hello"}], "tools": [], "context": [], - "forwardedProps": {"agent_type": "worker"}, + "forwardedProps": {"runtime_mode": "chat"}, }, ) assert authorized.status_code == 202 @@ -219,7 +278,7 @@ def test_stream_reads_from_last_event_id() -> None: try: response = client.get( - "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/events?idle_limit=1", + "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/events?runId=run-1&idle_limit=1", headers={"Last-Event-ID": "1-0"}, ) assert response.status_code == 200 @@ -255,7 +314,7 @@ def test_stream_handles_stream_backend_errors_without_connection_crash() -> None try: response = client.get( - "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/events?idle_limit=1" + "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/events?runId=run-1&idle_limit=1" ) assert response.status_code == 200 assert response.headers["content-type"].startswith("text/event-stream") @@ -288,7 +347,7 @@ def test_stream_stops_after_terminal_run_event() -> None: try: response = client.get( - "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/events?idle_limit=3" + "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/events?runId=run-1&idle_limit=3" ) assert response.status_code == 200 assert response.headers["content-type"].startswith("text/event-stream") @@ -309,7 +368,7 @@ def test_stream_rejects_invalid_last_event_id() -> None: try: response = client.get( - "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/events", + "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/events?runId=run-1", headers={"Last-Event-ID": "bad-id"}, ) assert response.status_code == 422 @@ -320,6 +379,68 @@ def test_stream_rejects_invalid_last_event_id() -> None: app.dependency_overrides = {} +def test_stream_filters_non_target_run_and_waits_target_terminal() -> None: + service = _MixedRunStreamAgentService() + app.dependency_overrides[get_agent_service] = lambda: service + app.dependency_overrides[get_current_user] = lambda: CurrentUser( + id=uuid4(), phone="+8613812345678" + ) + client = TestClient(app) + original_acquire = agent_router._acquire_sse_slot + original_release = agent_router._release_sse_slot + + async def _allow_slot(*, user_id: str) -> bool: + del user_id + return True + + async def _noop_release(*, user_id: str) -> None: + del user_id + return None + + agent_router._acquire_sse_slot = _allow_slot # type: ignore[assignment] + agent_router._release_sse_slot = _noop_release # type: ignore[assignment] + + try: + response = client.get( + "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/events?runId=run-1&idle_limit=3" + ) + assert response.status_code == 200 + assert response.headers["content-type"].startswith("text/event-stream") + assert '"runId":"run-old"' not in response.text + assert '"runId":"run-1"' in response.text + assert "event: RUN_STARTED" in response.text + assert "event: STEP_STARTED" in response.text + assert "event: RUN_FINISHED" in response.text + assert service.stream_calls == 3 + finally: + agent_router._acquire_sse_slot = original_acquire # type: ignore[assignment] + agent_router._release_sse_slot = original_release # type: ignore[assignment] + app.dependency_overrides = {} + + +def test_stream_rejects_invalid_or_missing_run_id() -> None: + app.dependency_overrides[get_agent_service] = lambda: _FakeAgentService() + app.dependency_overrides[get_current_user] = lambda: CurrentUser( + id=uuid4(), phone="+8613812345678" + ) + client = TestClient(app) + + try: + invalid = client.get( + "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/events?runId=bad%20id" + ) + assert invalid.status_code == 422 + assert invalid.json()["code"] == "AGENT_INVALID_RUN_ID" + + missing = client.get( + "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/events" + ) + assert missing.status_code == 422 + assert missing.json()["code"] == "AGENT_INVALID_RUN_ID" + finally: + app.dependency_overrides = {} + + def test_stream_rejects_when_sse_connection_limit_exceeded() -> None: app.dependency_overrides[get_agent_service] = lambda: _FakeAgentService() app.dependency_overrides[get_current_user] = lambda: CurrentUser( @@ -336,7 +457,7 @@ def test_stream_rejects_when_sse_connection_limit_exceeded() -> None: try: response = client.get( - "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/events" + "/api/v1/agent/runs/00000000-0000-0000-0000-000000000001/events?runId=run-1" ) assert response.status_code == 429 payload = response.json() @@ -587,7 +708,7 @@ def test_asr_transcribe_returns_sync_transcript(monkeypatch) -> None: return "这是测试转写结果" monkeypatch.setattr( - "v1.agent.service.asr_service.transcribe_file", + "v1.agent.router.asr_service.transcribe_file", mock_transcribe_file, ) diff --git a/backend/tests/unit/core/agentscope/runtime/test_runner.py b/backend/tests/unit/core/agentscope/runtime/test_runner.py index 2528aed..971f06a 100644 --- a/backend/tests/unit/core/agentscope/runtime/test_runner.py +++ b/backend/tests/unit/core/agentscope/runtime/test_runner.py @@ -12,10 +12,8 @@ from schemas.agent.runtime_models import ( ResultType, ResultTyping, RouterAgentOutput, - RouterUiDecision, TaskType, TaskTyping, - UiMode, WorkerAgentOutputLite, ) from schemas.agent.system_agent import AgentType @@ -65,10 +63,6 @@ def test_build_worker_input_messages_only_contains_router_contract() -> None: task_typing=TaskTyping(primary=TaskType.SCHEDULING), execution_mode=ExecutionMode.TOOL_ASSISTED, result_typing=ResultTyping(primary=ResultType.EXECUTION_REPORT), - ui=RouterUiDecision( - ui_mode=UiMode.NONE, - ui_decision_reason="单一执行任务,文本输出足够", - ), ) input_messages = runner._build_worker_input_messages(router_output=router_output) @@ -239,10 +233,6 @@ async def test_execute_runs_router_then_worker( task_typing=TaskTyping(primary=TaskType.SCHEDULING), execution_mode=ExecutionMode.TOOL_ASSISTED, result_typing=ResultTyping(primary=ResultType.EXECUTION_REPORT), - ui=RouterUiDecision( - ui_mode=UiMode.NONE, - ui_decision_reason="单任务", - ), ) async def _fake_execute_worker_step(**kwargs: object) -> WorkerAgentOutputLite: @@ -308,10 +298,6 @@ async def test_execute_raises_cancelled_error_before_worker_when_cancel_requeste task_typing=TaskTyping(primary=TaskType.SCHEDULING), execution_mode=ExecutionMode.TOOL_ASSISTED, result_typing=ResultTyping(primary=ResultType.EXECUTION_REPORT), - ui=RouterUiDecision( - ui_mode=UiMode.NONE, - ui_decision_reason="单任务", - ), ) async def _fake_execute_worker_step(**kwargs: object) -> WorkerAgentOutputLite: diff --git a/backend/tests/unit/core/agentscope/runtime/test_ui_compiler.py b/backend/tests/unit/core/agentscope/runtime/test_ui_compiler.py new file mode 100644 index 0000000..25fc2f2 --- /dev/null +++ b/backend/tests/unit/core/agentscope/runtime/test_ui_compiler.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from typing import Any + +from core.agentscope.runtime.ui_compiler import compile as compile_ui +from schemas.agent.ui_schema import UiStatus, build_status_panel +from schemas.agent.ui_hints import UiHintsPayload + + +def _collect_badge_labels(node: Any) -> list[str]: + labels: list[str] = [] + if isinstance(node, dict): + if node.get("type") == "badge" and isinstance(node.get("label"), str): + labels.append(node["label"]) + for value in node.values(): + labels.extend(_collect_badge_labels(value)) + elif isinstance(node, list): + for item in node: + labels.extend(_collect_badge_labels(item)) + return labels + + +def test_compile_status_badge_uses_stable_status_token() -> None: + hints = UiHintsPayload.model_validate( + { + "intent": "status", + "status": "success", + "title": "日程已创建", + "body": "已为您创建提醒。", + } + ) + + schema = compile_ui(hints) + badge_labels = _collect_badge_labels(schema) + + assert "ui.status.success" in badge_labels + + +def test_compile_status_badge_does_not_emit_uppercase_legacy_label() -> None: + hints = UiHintsPayload.model_validate( + { + "intent": "status", + "status": "error", + "title": "创建失败", + "body": "请稍后重试。", + } + ) + + schema = compile_ui(hints) + badge_labels = _collect_badge_labels(schema) + + assert "ERROR" not in badge_labels + assert "ui.status.error" in badge_labels + + +def test_build_status_panel_uses_stable_status_token_label() -> None: + panel = build_status_panel( + title="已创建", + message="提醒创建成功", + status=UiStatus.SUCCESS, + ) + + badge_labels = _collect_badge_labels(panel) + assert "ui.status.success" in badge_labels diff --git a/backend/tests/unit/schemas/agent/test_runtime_models.py b/backend/tests/unit/schemas/agent/test_runtime_models.py index 6da1dd2..ca1fb8e 100644 --- a/backend/tests/unit/schemas/agent/test_runtime_models.py +++ b/backend/tests/unit/schemas/agent/test_runtime_models.py @@ -27,10 +27,6 @@ def test_router_agent_output_coerces_key_entity_value_to_string() -> None: "primary": "summary", "secondary": [], }, - "ui": { - "ui_mode": "none", - "ui_decision_reason": "test", - }, } model = RouterAgentOutput.model_validate(payload) diff --git a/backend/tests/unit/v1/agent/test_router.py b/backend/tests/unit/v1/agent/test_router.py new file mode 100644 index 0000000..5f9cd6e --- /dev/null +++ b/backend/tests/unit/v1/agent/test_router.py @@ -0,0 +1,22 @@ +from v1.agent.router import _is_target_run_event, _is_terminal_run_event + + +def test_is_target_run_event_matches_expected_run_id() -> None: + event: dict[str, object] = {"type": "STEP_STARTED", "runId": "run_123"} + assert _is_target_run_event(event, target_run_id="run_123") is True + + +def test_is_target_run_event_rejects_other_run_id() -> None: + event: dict[str, object] = {"type": "STEP_STARTED", "runId": "run_999"} + assert _is_target_run_event(event, target_run_id="run_123") is False + + +def test_is_target_run_event_rejects_missing_run_id() -> None: + event: dict[str, object] = {"type": "STEP_STARTED"} + assert _is_target_run_event(event, target_run_id="run_123") is False + + +def test_is_terminal_run_event_only_accepts_terminal_types() -> None: + assert _is_terminal_run_event({"type": "RUN_FINISHED"}) is True + assert _is_terminal_run_event({"type": "RUN_ERROR"}) is True + assert _is_terminal_run_event({"type": "STEP_STARTED"}) is False