import 'dart:async';
import 'dart:convert';
import 'dart:math';
import 'dart:typed_data';

import 'package:dio/dio.dart';
import 'package:image_picker/image_picker.dart';
import 'package:social_app/core/api/i_api_client.dart';

import '../models/ag_ui_event.dart';

/// Signature of the listener invoked for every decoded [AgUiEvent].
typedef EventCallback = void Function(AgUiEvent event);

const _runIdPrefix = 'run_';

/// Client-side facade over the `/api/v1/agent/*` endpoints.
///
/// Starts agent runs, consumes the resulting server-sent event stream,
/// loads history snapshots, uploads image attachments and requests audio
/// transcription. All network access goes through the injected [IApiClient].
///
/// NOTE(review): the generic type arguments in this file (for example
/// `Map<String, dynamic>` for JSON bodies and `XFile` for picked images) were
/// inferred from how the values are used; confirm them against the
/// [IApiClient] declaration.
class AgUiService {
  final IApiClient _apiClient;

  /// Listener that receives each event decoded from the run event stream.
  ///
  /// Defaults to a no-op when none is passed to the constructor.
  EventCallback onEvent;

  /// Last SSE event id seen per thread, sent back as `Last-Event-ID`.
  final Map<String, String> _lastEventIdByThread = {};

  /// Token of the stream that is currently allowed to deliver events.
  ///
  /// Incremented by [sendMessage] and [cancelCurrentRun]; a streaming loop
  /// whose own token no longer matches stops consuming.
  int _activeStreamToken = 0;

  String? _threadId;
  bool _hasMoreHistory = false;

  AgUiService({EventCallback? onEvent, required IApiClient apiClient})
      : onEvent = onEvent ?? ((_) {}),
        _apiClient = apiClient;

  /// Posts a new run containing [content] and optional [images], then streams
  /// the run's events to [onEvent] until the run finishes, errors, or is
  /// superseded/cancelled.
  ///
  /// Throws a [StateError] when the `/agent/runs` response is not a JSON
  /// object or carries no non-empty string `threadId`.
  Future<void> sendMessage(String content, {List<XFile>? images}) async {
    final streamToken = ++_activeStreamToken;
    final runInput = await _buildRunInput(content: content, images: images);
    final response = await _apiClient.post<Map<String, dynamic>>(
      '/api/v1/agent/runs',
      data: runInput,
    );
    final payload = response.data;
    if (payload is! Map<String, dynamic>) {
      throw StateError('Invalid /agent/runs response');
    }
    // Validate with `is` instead of a cast so a non-string value surfaces as
    // the same StateError as a missing one, not as a TypeError.
    final threadId = payload['threadId'];
    if (threadId is! String || threadId.isEmpty) {
      throw StateError('Missing threadId in /agent/runs response');
    }
    _threadId = threadId;

    // The run was cancelled or superseded while the request was in flight:
    // the streaming loop would discard everything, so do not connect at all.
    if (streamToken != _activeStreamToken) {
      return;
    }
    await _streamEventsFromApi(threadId, streamToken: streamToken);
  }

  /// Fetches a history snapshot, optionally limited to entries before
  /// [beforeDate] (only the calendar date is sent).
  ///
  /// Remembers the snapshot's thread id (when non-empty) and its `hasMore`
  /// flag for [hasEarlierHistory].
  ///
  /// Throws a [StateError] when the response is not a JSON object.
  Future<HistorySnapshot> loadHistory({DateTime? beforeDate}) async {
    final path = _buildHistoryPath(beforeDate: beforeDate);
    final response = await _apiClient.get<Map<String, dynamic>>(path);
    final payload = response.data;
    if (payload is! Map<String, dynamic>) {
      throw StateError('Invalid /agent/history response');
    }
    final snapshot = HistorySnapshot.fromJson(payload);
    final snapshotThreadId = snapshot.threadId;
    if (snapshotThreadId != null && snapshotThreadId.isNotEmpty) {
      _threadId = snapshotThreadId;
    }
    _hasMoreHistory = snapshot.hasMore;
    return snapshot;
  }

  /// Downloads the bytes served at [previewPath].
  ///
  /// Throws a [StateError] when the response body is not a byte list.
  Future<Uint8List> fetchAttachmentPreview(String previewPath) async {
    final response = await _apiClient.get<List<int>>(
      previewPath,
      options: Options(responseType: ResponseType.bytes),
    );
    final payload = response.data;
    if (payload is List<int>) {
      return Uint8List.fromList(payload);
    }
    throw StateError('Invalid attachment payload');
  }

  /// Uploads the audio file at [filePath] and returns its transcript.
  ///
  /// The file is always sent as `recording.wav` with content type
  /// `audio/wav`.
  ///
  /// Throws a [StateError] when the response is not a JSON object or has no
  /// string `transcript` field.
  Future<String> transcribeAudio(String filePath) async {
    final formData = FormData.fromMap({
      'audio': await MultipartFile.fromFile(
        filePath,
        filename: 'recording.wav',
        contentType: DioMediaType('audio', 'wav'),
      ),
    });
    final response = await _apiClient.post<Map<String, dynamic>>(
      '/api/v1/agent/transcribe',
      data: formData,
    );
    final payload = response.data;
    if (payload is! Map<String, dynamic>) {
      throw StateError('Invalid /agent/transcribe response');
    }
    final transcript = payload['transcript'];
    if (transcript is! String) {
      throw StateError('Missing transcript in /agent/transcribe response');
    }
    return transcript;
  }

  /// Whether the most recently loaded history snapshot reported more entries.
  ///
  /// Whether more history exists is driven by the `hasMore` flag of the
  /// backend history snapshot. [fromDate] is ignored; the parameter is kept
  /// only for compatibility with ChatBloc's existing call signature.
  bool hasEarlierHistory(DateTime fromDate) => _hasMoreHistory;

  /// Invalidates the active stream token so the current streaming loop stops
  /// the next time it receives a line.
  Future<void> cancelCurrentRun() async {
    _activeStreamToken += 1;
  }

  /// Consumes the SSE stream of [threadId] and forwards each decoded event to
  /// [onEvent].
  ///
  /// Stops when [streamToken] is no longer the active token, when an event of
  /// type `runFinished`/`runError` has been dispatched, or when the stream
  /// ends. The last seen event id is stored per thread and sent as
  /// `Last-Event-ID` on the next connection.
  Future<void> _streamEventsFromApi(
    String threadId, {
    required int streamToken,
  }) async {
    final lastEventId = _lastEventIdByThread[threadId];
    final headers = <String, String>{'Accept': 'text/event-stream'};
    if (lastEventId != null && lastEventId.isNotEmpty) {
      headers['Last-Event-ID'] = lastEventId;
    }

    final sseLines = await _apiClient.getSseLines(
      '/api/v1/agent/runs/$threadId/events',
      headers: headers,
    );

    String? eventType;
    String? eventId;
    final dataBuffer = StringBuffer();

    await for (final line in sseLines) {
      // A newer run or a cancellation took over; stop delivering events.
      if (streamToken != _activeStreamToken) {
        break;
      }

      // A blank line terminates the current event.
      if (line.isEmpty) {
        if (dataBuffer.isNotEmpty) {
          final raw = dataBuffer.toString();
          dataBuffer.clear();
          try {
            final decoded = jsonDecode(raw);
            if (decoded is Map<String, dynamic>) {
              final event = AgUiEvent.fromJson(decoded);
              onEvent(event);
            }
          } catch (_) {
            // Ignore malformed SSE payload and keep stream alive.
          }

          final currentEventId = eventId;
          if (currentEventId != null && currentEventId.isNotEmpty) {
            _lastEventIdByThread[threadId] = currentEventId;
          }

          if (eventType == AgUiEventTypeWire.runFinished ||
              eventType == AgUiEventTypeWire.runError) {
            break;
          }
        }
        eventType = null;
        eventId = null;
        continue;
      }

      // Lines starting with ':' are SSE comments.
      if (line.startsWith(':')) {
        continue;
      }
      if (line.startsWith('id:')) {
        eventId = line.substring(3).trim();
        continue;
      }
      if (line.startsWith('event:')) {
        eventType = line.substring(6).trim();
        continue;
      }
      if (line.startsWith('data:')) {
        // Multiple data lines of one event are joined with '\n'.
        final fragment = line.substring(5).trim();
        if (dataBuffer.isNotEmpty) {
          dataBuffer.write('\n');
        }
        dataBuffer.write(fragment);
      }
    }
  }

  /// Builds the JSON body for `POST /api/v1/agent/runs`.
  ///
  /// Uses the current thread id or a freshly generated UUID. [images] are
  /// uploaded first and referenced as `binary` content blocks. The message
  /// `content` is an empty string when there is nothing to send, the plain
  /// text when the text is the only block, and the list of blocks otherwise.
  Future<Map<String, dynamic>> _buildRunInput({
    required String content,
    List<XFile>? images,
  }) async {
    final threadId = _threadId ?? _newUuid();
    final runId = _nextId(_runIdPrefix);

    final contentBlocks = <Map<String, dynamic>>[];
    if (content.isNotEmpty) {
      contentBlocks.add({'type': 'text', 'text': content});
    }

    if (images != null && images.isNotEmpty) {
      final uploadedAttachments = await _uploadAttachments(
        threadId: threadId,
        images: images,
      );
      for (final attachment in uploadedAttachments) {
        contentBlocks.add({
          'type': 'binary',
          'mimeType': attachment['mimeType'],
          'url': attachment['url'],
        });
      }
    }

    final Object messageContent;
    if (contentBlocks.isEmpty) {
      messageContent = '';
    } else if (contentBlocks.length == 1 &&
        contentBlocks.first['type'] == 'text') {
      // The single text block holds exactly `content`.
      messageContent = content;
    } else {
      messageContent = contentBlocks;
    }

    return <String, dynamic>{
      'threadId': threadId,
      'runId': runId,
      'state': <String, dynamic>{},
      'messages': <Map<String, dynamic>>[
        {'id': _nextId('user_'), 'role': 'user', 'content': messageContent},
      ],
      'tools': <Map<String, dynamic>>[],
      'context': <Map<String, dynamic>>[],
      'forwardedProps': <String, dynamic>{},
    };
  }

  /// Uploads [images] one after another to `/api/v1/agent/attachments` and
  /// returns the validated reference (`bucket`, `path`, `mimeType`, `url`)
  /// of each upload, in input order.
  ///
  /// Images without a MIME type are sent as `image/jpeg`.
  ///
  /// Throws a [StateError] when a response is not a JSON object, has no
  /// `attachment` object, or the reference has a missing/empty field.
  Future<List<Map<String, String>>> _uploadAttachments({
    required String threadId,
    required List<XFile> images,
  }) async {
    final attachments = <Map<String, String>>[];

    for (final image in images) {
      final mimeType = image.mimeType ?? 'image/jpeg';
      final fileBytes = await image.readAsBytes();
      final formData = FormData.fromMap({
        'threadId': threadId,
        'file': MultipartFile.fromBytes(
          fileBytes,
          filename: image.name,
          contentType: DioMediaType.parse(mimeType),
        ),
      });

      final response = await _apiClient.post<Map<String, dynamic>>(
        '/api/v1/agent/attachments',
        data: formData,
      );
      final payload = response.data;
      if (payload is! Map<String, dynamic>) {
        throw StateError('Invalid /agent/attachments response');
      }
      final attachment = payload['attachment'];
      if (attachment is! Map<String, dynamic>) {
        throw StateError('Missing attachment in /agent/attachments response');
      }

      final bucket = attachment['bucket'];
      final path = attachment['path'];
      final uploadedMime = attachment['mimeType'];
      final url = attachment['url'];
      if (bucket is! String ||
          path is! String ||
          uploadedMime is! String ||
          url is! String ||
          bucket.isEmpty ||
          path.isEmpty ||
          uploadedMime.isEmpty ||
          url.isEmpty) {
        throw StateError('Invalid attachment reference');
      }

      attachments.add({
        'bucket': bucket,
        'path': path,
        'mimeType': uploadedMime,
        'url': url,
      });
    }

    return attachments;
  }

  /// Returns the history endpoint path with optional `threadId` and `before`
  /// (`yyyy-MM-dd`) query parameters.
  String _buildHistoryPath({DateTime? beforeDate}) {
    final query = <String>[];

    final threadId = _threadId;
    if (threadId != null && threadId.isNotEmpty) {
      // Encode the value so reserved characters cannot corrupt the query.
      query.add('threadId=${Uri.encodeQueryComponent(threadId)}');
    }

    if (beforeDate != null) {
      final day = DateTime(beforeDate.year, beforeDate.month, beforeDate.day);
      query.add('before=${day.toIso8601String().substring(0, 10)}');
    }

    if (query.isEmpty) {
      return '/api/v1/agent/history';
    }
    return '/api/v1/agent/history?${query.join('&')}';
  }

  /// Returns [prefix] followed by the current epoch time in milliseconds.
  ///
  /// NOTE(review): two calls with the same prefix inside one millisecond
  /// yield the same id.
  String _nextId(String prefix) =>
      '$prefix${DateTime.now().millisecondsSinceEpoch}';

  /// Generates a random version-4 style UUID string.
  String _newUuid() {
    final random = _newRandom();
    String hex(int len) => List.generate(
          len,
          (_) => random.nextInt(16).toRadixString(16),
        ).join();
    const variant = ['8', '9', 'a', 'b'];
    return '${hex(8)}-${hex(4)}-4${hex(3)}-'
        '${variant[random.nextInt(4)]}${hex(3)}-${hex(12)}';
  }

  /// Returns a cryptographically secure generator when the platform offers
  /// one, otherwise the default pseudo-random generator.
  static Random _newRandom() {
    try {
      return Random.secure();
    } on UnsupportedError {
      return Random();
    }
  }
}