Files
social-app/apps/lib/features/chat/data/services/ag_ui_service.dart
T

465 lines
14 KiB
Dart

import 'dart:async';
import 'dart:convert';
import 'dart:math';
import 'dart:typed_data';
import 'package:dio/dio.dart';
import 'package:image_picker/image_picker.dart';
import 'package:social_app/core/api/i_api_client.dart';
import '../models/ag_ui_event.dart';
typedef EventCallback = void Function(AgUiEvent event);
const _runIdPrefix = 'run_';
class UploadedAttachment {
const UploadedAttachment({
required this.localPath,
required this.url,
required this.mimeType,
});
final String localPath;
final String url;
final String mimeType;
}
class SendMessageResult {
const SendMessageResult({required this.uploadedAttachments});
final List<UploadedAttachment> uploadedAttachments;
}
class _RunInputPayload {
const _RunInputPayload({
required this.input,
required this.uploadedAttachments,
});
final Map<String, dynamic> input;
final List<UploadedAttachment> uploadedAttachments;
}
class AgUiService {
final IApiClient _apiClient;
EventCallback onEvent;
final Map<String, String> _lastEventIdByThread = {};
int _activeStreamToken = 0;
StreamSubscription<String>? _activeSseSubscription;
Completer<void>? _activeSseDoneCompleter;
String? _threadId;
bool _hasMoreHistory = false;
AgUiService({EventCallback? onEvent, required IApiClient apiClient})
: onEvent = onEvent ?? ((_) {}),
_apiClient = apiClient;
Future<SendMessageResult> sendMessage(
String content, {
List<XFile>? images,
}) async {
await _cancelActiveSseSubscription();
final streamToken = ++_activeStreamToken;
final runInputPayload = await _buildRunInput(
content: content,
images: images,
);
final response = await _apiClient.post<Map<String, dynamic>>(
'/api/v1/agent/runs',
data: runInputPayload.input,
);
final payload = response.data;
if (payload is! Map<String, dynamic>) {
throw StateError('Invalid /agent/runs response');
}
final threadId = payload['threadId'] as String?;
final runId = payload['runId'] as String?;
if (threadId == null || threadId.isEmpty) {
throw StateError('Missing threadId in /agent/runs response');
}
if (runId == null || runId.isEmpty) {
throw StateError('Missing runId in /agent/runs response');
}
_threadId = threadId;
await _streamEventsFromApi(
threadId,
expectedRunId: runId,
streamToken: streamToken,
);
return SendMessageResult(
uploadedAttachments: runInputPayload.uploadedAttachments,
);
}
Future<HistorySnapshot> loadHistory({DateTime? beforeDate}) async {
final path = _buildHistoryPath(beforeDate: beforeDate);
final response = await _apiClient.get<Map<String, dynamic>>(path);
final payload = response.data;
if (payload is! Map<String, dynamic>) {
throw StateError('Invalid /agent/history response');
}
final snapshot = HistorySnapshot.fromJson(payload);
if (snapshot.threadId != null && snapshot.threadId!.isNotEmpty) {
_threadId = snapshot.threadId;
}
_hasMoreHistory = snapshot.hasMore;
return snapshot;
}
Future<Uint8List> fetchAttachmentPreview(String previewPath) async {
final response = await _apiClient.get<List<int>>(
previewPath,
options: Options(responseType: ResponseType.bytes),
);
final payload = response.data;
if (payload is List<int>) {
return Uint8List.fromList(payload);
}
throw StateError('Invalid attachment payload');
}
Future<String> transcribeAudio(String filePath) async {
final formData = FormData.fromMap({
'audio': await MultipartFile.fromFile(
filePath,
filename: 'recording.wav',
contentType: DioMediaType('audio', 'wav'),
),
});
final response = await _apiClient.post<Map<String, dynamic>>(
'/api/v1/agent/transcribe',
data: formData,
);
final payload = response.data;
if (payload is! Map<String, dynamic>) {
throw StateError('Invalid /agent/transcribe response');
}
final transcript = payload['transcript'];
if (transcript is! String) {
throw StateError('Missing transcript in /agent/transcribe response');
}
return transcript;
}
bool hasEarlierHistory(DateTime fromDate) {
// 历史是否还有更多由后端 history snapshot 的 hasMore 驱动。
// 参数保留是为了兼容 ChatBloc 现有调用签名。
final _ = fromDate;
return _hasMoreHistory;
}
Future<void> cancelCurrentRun() async {
_activeStreamToken += 1;
await _cancelActiveSseSubscription();
}
Future<void> _cancelActiveSseSubscription() async {
final doneCompleter = _activeSseDoneCompleter;
if (doneCompleter != null && !doneCompleter.isCompleted) {
doneCompleter.complete();
}
_activeSseDoneCompleter = null;
final subscription = _activeSseSubscription;
_activeSseSubscription = null;
if (subscription == null) {
return;
}
await subscription.cancel();
}
Future<void> _streamEventsFromApi(
String threadId, {
required String expectedRunId,
required int streamToken,
}) async {
final lastEventId = _lastEventIdByThread[threadId];
final headers = <String, String>{'Accept': 'text/event-stream'};
if (lastEventId != null && lastEventId.isNotEmpty) {
headers['Last-Event-ID'] = lastEventId;
}
final sseLines = await _apiClient.getSseLines(
'/api/v1/agent/runs/$threadId/events',
headers: headers,
);
String? eventType;
String? eventId;
var hasBoundExpectedRun = false;
var hasSeenTerminalForRun = false;
final dataBuffer = StringBuffer();
final done = Completer<void>();
late final StreamSubscription<String> subscription;
void stopStream({Object? error, StackTrace? stackTrace}) {
if (!done.isCompleted) {
if (error == null) {
done.complete();
} else {
done.completeError(error, stackTrace);
}
}
unawaited(subscription.cancel());
}
subscription = sseLines.listen(
(line) {
try {
if (streamToken != _activeStreamToken) {
stopStream();
return;
}
if (line.isEmpty) {
if (dataBuffer.isNotEmpty) {
final raw = dataBuffer.toString();
dataBuffer.clear();
String? eventRunId;
String? eventThreadId;
Map<String, dynamic>? parsedMap;
try {
final parsed = jsonDecode(raw);
if (parsed is Map<String, dynamic>) {
parsedMap = parsed;
}
} catch (_) {
// Ignore malformed SSE payload and keep stream alive.
}
if (parsedMap != null) {
final runId = parsedMap['runId'];
final thread = parsedMap['threadId'];
eventRunId = runId is String ? runId : null;
eventThreadId = thread is String ? thread : null;
final isRunStarted = eventType == AgUiEventTypeWire.runStarted;
final isTargetRun = eventRunId == expectedRunId;
if (isRunStarted && isTargetRun) {
hasBoundExpectedRun = true;
}
final isThreadMatched =
eventThreadId == null || eventThreadId == threadId;
final shouldDispatch =
isTargetRun || (hasBoundExpectedRun && isThreadMatched);
if (shouldDispatch) {
final event = AgUiEvent.fromJson(parsedMap);
onEvent(event);
}
}
final currentEventId = eventId;
if (currentEventId != null && currentEventId.isNotEmpty) {
_lastEventIdByThread[threadId] = currentEventId;
}
final isTerminalEvent =
eventType == AgUiEventTypeWire.runFinished ||
eventType == AgUiEventTypeWire.runError;
final isTargetRun = eventRunId == expectedRunId;
final isThreadMatched =
eventThreadId == null || eventThreadId == threadId;
if (isTerminalEvent &&
(isTargetRun || (hasBoundExpectedRun && isThreadMatched))) {
hasSeenTerminalForRun = true;
stopStream();
return;
}
}
eventType = null;
eventId = null;
return;
}
if (line.startsWith(':')) {
return;
}
if (line.startsWith('id:')) {
eventId = line.substring(3).trim();
return;
}
if (line.startsWith('event:')) {
eventType = line.substring(6).trim();
return;
}
if (line.startsWith('data:')) {
final fragment = line.substring(5).trim();
if (dataBuffer.isNotEmpty) {
dataBuffer.write('\n');
}
dataBuffer.write(fragment);
}
} catch (error, stackTrace) {
stopStream(error: error, stackTrace: stackTrace);
}
},
onError: (Object error, StackTrace stackTrace) {
stopStream(error: error, stackTrace: stackTrace);
},
onDone: () {
if (streamToken != _activeStreamToken) {
stopStream();
return;
}
if (!hasSeenTerminalForRun) {
stopStream(
error: StateError('SSE closed before terminal event for run'),
);
return;
}
stopStream();
},
cancelOnError: false,
);
if (streamToken != _activeStreamToken) {
await subscription.cancel();
return;
}
_activeSseSubscription = subscription;
_activeSseDoneCompleter = done;
try {
await done.future;
} finally {
if (identical(_activeSseSubscription, subscription)) {
_activeSseSubscription = null;
}
if (identical(_activeSseDoneCompleter, done)) {
_activeSseDoneCompleter = null;
}
}
}
Future<_RunInputPayload> _buildRunInput({
required String content,
List<XFile>? images,
}) async {
final threadId = _threadId ?? _newUuid();
final runId = _nextId(_runIdPrefix);
final contentBlocks = <Map<String, dynamic>>[];
if (content.isNotEmpty) {
contentBlocks.add({'type': 'text', 'text': content});
}
var uploadedAttachments = const <UploadedAttachment>[];
if (images != null && images.isNotEmpty) {
uploadedAttachments = await _uploadAttachments(
threadId: threadId,
images: images,
);
for (final attachment in uploadedAttachments) {
contentBlocks.add({
'type': 'binary',
'mimeType': attachment.mimeType,
'url': attachment.url,
});
}
}
final dynamic messageContent;
if (contentBlocks.isEmpty) {
messageContent = '';
} else if (contentBlocks.length == 1 &&
contentBlocks[0]['type'] == 'text') {
messageContent = contentBlocks[0]['text'];
} else {
messageContent = contentBlocks;
}
return _RunInputPayload(
input: {
'threadId': threadId,
'runId': runId,
'state': <String, dynamic>{},
'messages': [
{'id': _nextId('user_'), 'role': 'user', 'content': messageContent},
],
'tools': <Map<String, dynamic>>[],
'context': <Map<String, dynamic>>[],
'forwardedProps': <String, dynamic>{'runtime_mode': 'chat'},
},
uploadedAttachments: uploadedAttachments,
);
}
Future<List<UploadedAttachment>> _uploadAttachments({
required String threadId,
required List<XFile> images,
}) async {
final attachments = <UploadedAttachment>[];
for (final image in images) {
final mimeType = image.mimeType ?? 'image/jpeg';
final fileBytes = await image.readAsBytes();
final formData = FormData.fromMap({
'threadId': threadId,
'file': MultipartFile.fromBytes(
fileBytes,
filename: image.name,
contentType: DioMediaType.parse(mimeType),
),
});
final response = await _apiClient.post<Map<String, dynamic>>(
'/api/v1/agent/attachments',
data: formData,
);
final payload = response.data;
if (payload is! Map<String, dynamic>) {
throw StateError('Invalid /agent/attachments response');
}
final attachment = payload['attachment'];
if (attachment is! Map<String, dynamic>) {
throw StateError('Missing attachment in /agent/attachments response');
}
final bucket = attachment['bucket'];
final path = attachment['path'];
final uploadedMime = attachment['mimeType'];
final url = attachment['url'];
if (bucket is! String ||
path is! String ||
uploadedMime is! String ||
url is! String ||
bucket.isEmpty ||
path.isEmpty ||
uploadedMime.isEmpty ||
url.isEmpty) {
throw StateError('Invalid attachment reference');
}
attachments.add(
UploadedAttachment(
localPath: image.path,
url: url,
mimeType: uploadedMime,
),
);
}
return attachments;
}
String _buildHistoryPath({DateTime? beforeDate}) {
final query = <String>[];
if (_threadId != null && _threadId!.isNotEmpty) {
query.add('threadId=$_threadId');
}
if (beforeDate != null) {
final day = DateTime(beforeDate.year, beforeDate.month, beforeDate.day);
query.add('before=${day.toIso8601String().substring(0, 10)}');
}
if (query.isEmpty) {
return '/api/v1/agent/history';
}
return '/api/v1/agent/history?${query.join('&')}';
}
String _nextId(String prefix) =>
'$prefix${DateTime.now().millisecondsSinceEpoch}';
String _newUuid() {
final random = Random();
String hex(int len) => List<String>.generate(
len,
(_) => random.nextInt(16).toRadixString(16),
).join();
const variant = ['8', '9', 'a', 'b'];
return '${hex(8)}-${hex(4)}-4${hex(3)}-${variant[random.nextInt(4)]}${hex(3)}-${hex(12)}';
}
}