import 'dart:async'; import 'dart:convert'; import 'dart:math'; import 'dart:typed_data'; import 'package:dio/dio.dart'; import 'package:image_picker/image_picker.dart'; import 'package:social_app/core/api/i_api_client.dart'; import '../models/ag_ui_event.dart'; typedef EventCallback = void Function(AgUiEvent event); const _runIdPrefix = 'run_'; class UploadedAttachment { const UploadedAttachment({ required this.localPath, required this.url, required this.mimeType, }); final String localPath; final String url; final String mimeType; } class SendMessageResult { const SendMessageResult({required this.uploadedAttachments}); final List uploadedAttachments; } class _RunInputPayload { const _RunInputPayload({ required this.input, required this.uploadedAttachments, }); final Map input; final List uploadedAttachments; } class AgUiService { final IApiClient _apiClient; EventCallback onEvent; final Map _lastEventIdByThread = {}; int _activeStreamToken = 0; String? _threadId; bool _hasMoreHistory = false; AgUiService({EventCallback? onEvent, required IApiClient apiClient}) : onEvent = onEvent ?? ((_) {}), _apiClient = apiClient; Future sendMessage( String content, { List? images, }) async { final streamToken = ++_activeStreamToken; final runInputPayload = await _buildRunInput( content: content, images: images, ); final response = await _apiClient.post>( '/api/v1/agent/runs', data: runInputPayload.input, ); final payload = response.data; if (payload is! Map) { throw StateError('Invalid /agent/runs response'); } final threadId = payload['threadId'] as String?; final runId = payload['runId'] as String?; if (threadId == null || threadId.isEmpty) { throw StateError('Missing threadId in /agent/runs response'); } if (runId == null || runId.isEmpty) { throw StateError('Missing runId in /agent/runs response'); } _threadId = threadId; await _streamEventsFromApi( threadId, expectedRunId: runId, streamToken: streamToken, ); return SendMessageResult( uploadedAttachments: runInputPayload.uploadedAttachments, ); } Future loadHistory({DateTime? beforeDate}) async { final path = _buildHistoryPath(beforeDate: beforeDate); final response = await _apiClient.get>(path); final payload = response.data; if (payload is! Map) { throw StateError('Invalid /agent/history response'); } final snapshot = HistorySnapshot.fromJson(payload); if (snapshot.threadId != null && snapshot.threadId!.isNotEmpty) { _threadId = snapshot.threadId; } _hasMoreHistory = snapshot.hasMore; return snapshot; } Future fetchAttachmentPreview(String previewPath) async { final response = await _apiClient.get>( previewPath, options: Options(responseType: ResponseType.bytes), ); final payload = response.data; if (payload is List) { return Uint8List.fromList(payload); } throw StateError('Invalid attachment payload'); } Future transcribeAudio(String filePath) async { final formData = FormData.fromMap({ 'audio': await MultipartFile.fromFile( filePath, filename: 'recording.wav', contentType: DioMediaType('audio', 'wav'), ), }); final response = await _apiClient.post>( '/api/v1/agent/transcribe', data: formData, ); final payload = response.data; if (payload is! Map) { throw StateError('Invalid /agent/transcribe response'); } final transcript = payload['transcript']; if (transcript is! String) { throw StateError('Missing transcript in /agent/transcribe response'); } return transcript; } bool hasEarlierHistory(DateTime fromDate) { // 历史是否还有更多由后端 history snapshot 的 hasMore 驱动。 // 参数保留是为了兼容 ChatBloc 现有调用签名。 final _ = fromDate; return _hasMoreHistory; } Future cancelCurrentRun() async { _activeStreamToken += 1; } Future _streamEventsFromApi( String threadId, { required String expectedRunId, required int streamToken, }) async { final lastEventId = _lastEventIdByThread[threadId]; final headers = {'Accept': 'text/event-stream'}; if (lastEventId != null && lastEventId.isNotEmpty) { headers['Last-Event-ID'] = lastEventId; } final sseLines = await _apiClient.getSseLines( '/api/v1/agent/runs/$threadId/events', headers: headers, ); String? eventType; String? eventId; var hasBoundExpectedRun = false; final dataBuffer = StringBuffer(); await for (final line in sseLines) { if (streamToken != _activeStreamToken) { break; } if (line.isEmpty) { if (dataBuffer.isNotEmpty) { final raw = dataBuffer.toString(); dataBuffer.clear(); Map? decoded; String? eventRunId; String? eventThreadId; try { final parsed = jsonDecode(raw); if (parsed is Map) { decoded = parsed; final runId = parsed['runId']; final thread = parsed['threadId']; eventRunId = runId is String ? runId : null; eventThreadId = thread is String ? thread : null; final isRunStarted = eventType == AgUiEventTypeWire.runStarted; final isTargetRun = eventRunId == expectedRunId; if (isRunStarted && isTargetRun) { hasBoundExpectedRun = true; } final isThreadMatched = eventThreadId == null || eventThreadId == threadId; final shouldDispatch = isTargetRun || (hasBoundExpectedRun && isThreadMatched); if (shouldDispatch) { final event = AgUiEvent.fromJson(parsed); onEvent(event); } } } catch (_) { // Ignore malformed SSE payload and keep stream alive. } final currentEventId = eventId; if (currentEventId != null && currentEventId.isNotEmpty) { _lastEventIdByThread[threadId] = currentEventId; } final isTerminalEvent = eventType == AgUiEventTypeWire.runFinished || eventType == AgUiEventTypeWire.runError; final isTargetRun = eventRunId == expectedRunId; final isThreadMatched = eventThreadId == null || eventThreadId == threadId; if (isTerminalEvent && (isTargetRun || (hasBoundExpectedRun && isThreadMatched))) { break; } } eventType = null; eventId = null; continue; } if (line.startsWith(':')) { continue; } if (line.startsWith('id:')) { eventId = line.substring(3).trim(); continue; } if (line.startsWith('event:')) { eventType = line.substring(6).trim(); continue; } if (line.startsWith('data:')) { final fragment = line.substring(5).trim(); if (dataBuffer.isNotEmpty) { dataBuffer.write('\n'); } dataBuffer.write(fragment); } } } Future<_RunInputPayload> _buildRunInput({ required String content, List? images, }) async { final threadId = _threadId ?? _newUuid(); final runId = _nextId(_runIdPrefix); final contentBlocks = >[]; if (content.isNotEmpty) { contentBlocks.add({'type': 'text', 'text': content}); } var uploadedAttachments = const []; if (images != null && images.isNotEmpty) { uploadedAttachments = await _uploadAttachments( threadId: threadId, images: images, ); for (final attachment in uploadedAttachments) { contentBlocks.add({ 'type': 'binary', 'mimeType': attachment.mimeType, 'url': attachment.url, }); } } final dynamic messageContent; if (contentBlocks.isEmpty) { messageContent = ''; } else if (contentBlocks.length == 1 && contentBlocks[0]['type'] == 'text') { messageContent = contentBlocks[0]['text']; } else { messageContent = contentBlocks; } return _RunInputPayload( input: { 'threadId': threadId, 'runId': runId, 'state': {}, 'messages': [ {'id': _nextId('user_'), 'role': 'user', 'content': messageContent}, ], 'tools': >[], 'context': >[], 'forwardedProps': {}, }, uploadedAttachments: uploadedAttachments, ); } Future> _uploadAttachments({ required String threadId, required List images, }) async { final attachments = []; for (final image in images) { final mimeType = image.mimeType ?? 'image/jpeg'; final fileBytes = await image.readAsBytes(); final formData = FormData.fromMap({ 'threadId': threadId, 'file': MultipartFile.fromBytes( fileBytes, filename: image.name, contentType: DioMediaType.parse(mimeType), ), }); final response = await _apiClient.post>( '/api/v1/agent/attachments', data: formData, ); final payload = response.data; if (payload is! Map) { throw StateError('Invalid /agent/attachments response'); } final attachment = payload['attachment']; if (attachment is! Map) { throw StateError('Missing attachment in /agent/attachments response'); } final bucket = attachment['bucket']; final path = attachment['path']; final uploadedMime = attachment['mimeType']; final url = attachment['url']; if (bucket is! String || path is! String || uploadedMime is! String || url is! String || bucket.isEmpty || path.isEmpty || uploadedMime.isEmpty || url.isEmpty) { throw StateError('Invalid attachment reference'); } attachments.add( UploadedAttachment( localPath: image.path, url: url, mimeType: uploadedMime, ), ); } return attachments; } String _buildHistoryPath({DateTime? beforeDate}) { final query = []; if (_threadId != null && _threadId!.isNotEmpty) { query.add('threadId=$_threadId'); } if (beforeDate != null) { final day = DateTime(beforeDate.year, beforeDate.month, beforeDate.day); query.add('before=${day.toIso8601String().substring(0, 10)}'); } if (query.isEmpty) { return '/api/v1/agent/history'; } return '/api/v1/agent/history?${query.join('&')}'; } String _nextId(String prefix) => '$prefix${DateTime.now().millisecondsSinceEpoch}'; String _newUuid() { final random = Random(); String hex(int len) => List.generate( len, (_) => random.nextInt(16).toRadixString(16), ).join(); const variant = ['8', '9', 'a', 'b']; return '${hex(8)}-${hex(4)}-4${hex(3)}-${variant[random.nextInt(4)]}${hex(3)}-${hex(12)}'; } }