refactor: 重构聊天数据层至core并简化首页UI
This commit is contained in:
@@ -0,0 +1,390 @@
|
||||
class AgUiEventTypeWire {
|
||||
static const runStarted = 'RUN_STARTED';
|
||||
static const runFinished = 'RUN_FINISHED';
|
||||
static const runError = 'RUN_ERROR';
|
||||
static const stepStarted = 'STEP_STARTED';
|
||||
static const stepFinished = 'STEP_FINISHED';
|
||||
static const textMessageEnd = 'TEXT_MESSAGE_END';
|
||||
static const toolCallStart = 'TOOL_CALL_START';
|
||||
static const toolCallArgs = 'TOOL_CALL_ARGS';
|
||||
static const toolCallEnd = 'TOOL_CALL_END';
|
||||
static const toolCallResult = 'TOOL_CALL_RESULT';
|
||||
static const toolCallError = 'TOOL_CALL_ERROR';
|
||||
}
|
||||
|
||||
enum AgUiEventType {
|
||||
runStarted,
|
||||
runFinished,
|
||||
runError,
|
||||
stepStarted,
|
||||
stepFinished,
|
||||
textMessageEnd,
|
||||
toolCallStart,
|
||||
toolCallArgs,
|
||||
toolCallEnd,
|
||||
toolCallResult,
|
||||
toolCallError,
|
||||
unknown,
|
||||
}
|
||||
|
||||
const _wireToTypeMap = {
|
||||
AgUiEventTypeWire.runStarted: AgUiEventType.runStarted,
|
||||
AgUiEventTypeWire.runFinished: AgUiEventType.runFinished,
|
||||
AgUiEventTypeWire.runError: AgUiEventType.runError,
|
||||
AgUiEventTypeWire.stepStarted: AgUiEventType.stepStarted,
|
||||
AgUiEventTypeWire.stepFinished: AgUiEventType.stepFinished,
|
||||
AgUiEventTypeWire.textMessageEnd: AgUiEventType.textMessageEnd,
|
||||
AgUiEventTypeWire.toolCallStart: AgUiEventType.toolCallStart,
|
||||
AgUiEventTypeWire.toolCallArgs: AgUiEventType.toolCallArgs,
|
||||
AgUiEventTypeWire.toolCallEnd: AgUiEventType.toolCallEnd,
|
||||
AgUiEventTypeWire.toolCallResult: AgUiEventType.toolCallResult,
|
||||
AgUiEventTypeWire.toolCallError: AgUiEventType.toolCallError,
|
||||
};
|
||||
|
||||
const _typeToWireMap = {
|
||||
AgUiEventType.runStarted: AgUiEventTypeWire.runStarted,
|
||||
AgUiEventType.runFinished: AgUiEventTypeWire.runFinished,
|
||||
AgUiEventType.runError: AgUiEventTypeWire.runError,
|
||||
AgUiEventType.stepStarted: AgUiEventTypeWire.stepStarted,
|
||||
AgUiEventType.stepFinished: AgUiEventTypeWire.stepFinished,
|
||||
AgUiEventType.textMessageEnd: AgUiEventTypeWire.textMessageEnd,
|
||||
AgUiEventType.toolCallStart: AgUiEventTypeWire.toolCallStart,
|
||||
AgUiEventType.toolCallArgs: AgUiEventTypeWire.toolCallArgs,
|
||||
AgUiEventType.toolCallEnd: AgUiEventTypeWire.toolCallEnd,
|
||||
AgUiEventType.toolCallResult: AgUiEventTypeWire.toolCallResult,
|
||||
AgUiEventType.toolCallError: AgUiEventTypeWire.toolCallError,
|
||||
AgUiEventType.unknown: '',
|
||||
};
|
||||
|
||||
AgUiEventType agUiEventTypeFromWire(String wire) =>
|
||||
_wireToTypeMap[wire] ?? AgUiEventType.unknown;
|
||||
|
||||
String agUiEventTypeToWire(AgUiEventType type) => _typeToWireMap[type] ?? '';
|
||||
|
||||
abstract class AgUiEvent {
|
||||
const AgUiEvent({required this.type});
|
||||
|
||||
final AgUiEventType type;
|
||||
|
||||
factory AgUiEvent.fromJson(Map<String, dynamic> json) {
|
||||
final wireType = json['type'];
|
||||
final type = wireType is String
|
||||
? agUiEventTypeFromWire(wireType)
|
||||
: AgUiEventType.unknown;
|
||||
return switch (type) {
|
||||
AgUiEventType.runStarted => RunStartedEvent.fromJson(json),
|
||||
AgUiEventType.runFinished => RunFinishedEvent.fromJson(json),
|
||||
AgUiEventType.runError => RunErrorEvent.fromJson(json),
|
||||
AgUiEventType.stepStarted => StepStartedEvent.fromJson(json),
|
||||
AgUiEventType.stepFinished => StepFinishedEvent.fromJson(json),
|
||||
AgUiEventType.textMessageEnd => TextMessageEndEvent.fromJson(json),
|
||||
AgUiEventType.toolCallStart => ToolCallStartEvent.fromJson(json),
|
||||
AgUiEventType.toolCallArgs => ToolCallArgsEvent.fromJson(json),
|
||||
AgUiEventType.toolCallEnd => ToolCallEndEvent.fromJson(json),
|
||||
AgUiEventType.toolCallResult => ToolCallResultEvent.fromJson(json),
|
||||
AgUiEventType.toolCallError => ToolCallErrorEvent.fromJson(json),
|
||||
AgUiEventType.unknown => UnknownAgUiEvent(rawJson: json),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
class UnknownAgUiEvent extends AgUiEvent {
|
||||
const UnknownAgUiEvent({required this.rawJson})
|
||||
: super(type: AgUiEventType.unknown);
|
||||
|
||||
final Map<String, dynamic> rawJson;
|
||||
}
|
||||
|
||||
class RunStartedEvent extends AgUiEvent {
|
||||
RunStartedEvent({required this.threadId, required this.runId})
|
||||
: super(type: AgUiEventType.runStarted);
|
||||
|
||||
final String threadId;
|
||||
final String runId;
|
||||
|
||||
factory RunStartedEvent.fromJson(Map<String, dynamic> json) =>
|
||||
RunStartedEvent(
|
||||
threadId: _asString(json['threadId']),
|
||||
runId: _asString(json['runId']),
|
||||
);
|
||||
}
|
||||
|
||||
class RunFinishedEvent extends AgUiEvent {
|
||||
RunFinishedEvent({required this.threadId, required this.runId})
|
||||
: super(type: AgUiEventType.runFinished);
|
||||
|
||||
final String threadId;
|
||||
final String runId;
|
||||
|
||||
factory RunFinishedEvent.fromJson(Map<String, dynamic> json) =>
|
||||
RunFinishedEvent(
|
||||
threadId: _asString(json['threadId']),
|
||||
runId: _asString(json['runId']),
|
||||
);
|
||||
}
|
||||
|
||||
class RunErrorEvent extends AgUiEvent {
|
||||
RunErrorEvent({required this.message, this.code})
|
||||
: super(type: AgUiEventType.runError);
|
||||
|
||||
final String message;
|
||||
final String? code;
|
||||
|
||||
factory RunErrorEvent.fromJson(Map<String, dynamic> json) => RunErrorEvent(
|
||||
message: _asString(json['message'], fallback: 'Unknown error'),
|
||||
code: json['code'] as String?,
|
||||
);
|
||||
}
|
||||
|
||||
class StepStartedEvent extends AgUiEvent {
|
||||
StepStartedEvent({required this.stepName})
|
||||
: super(type: AgUiEventType.stepStarted);
|
||||
|
||||
final String stepName;
|
||||
|
||||
factory StepStartedEvent.fromJson(Map<String, dynamic> json) =>
|
||||
StepStartedEvent(stepName: _asString(json['stepName']));
|
||||
}
|
||||
|
||||
class StepFinishedEvent extends AgUiEvent {
|
||||
StepFinishedEvent({required this.stepName})
|
||||
: super(type: AgUiEventType.stepFinished);
|
||||
|
||||
final String stepName;
|
||||
|
||||
factory StepFinishedEvent.fromJson(Map<String, dynamic> json) =>
|
||||
StepFinishedEvent(stepName: _asString(json['stepName']));
|
||||
}
|
||||
|
||||
class TextMessageEndEvent extends AgUiEvent {
|
||||
TextMessageEndEvent({
|
||||
required this.messageId,
|
||||
required this.answer,
|
||||
required this.role,
|
||||
required this.status,
|
||||
required this.uiSchema,
|
||||
}) : super(type: AgUiEventType.textMessageEnd);
|
||||
|
||||
final String messageId;
|
||||
final String answer;
|
||||
final String role;
|
||||
final String status;
|
||||
final Map<String, dynamic>? uiSchema;
|
||||
|
||||
factory TextMessageEndEvent.fromJson(Map<String, dynamic> json) =>
|
||||
TextMessageEndEvent(
|
||||
messageId: _asString(json['messageId']),
|
||||
answer: _asString(json['answer']),
|
||||
role: _asString(json['role'], fallback: 'assistant'),
|
||||
status: _asString(json['status'], fallback: 'success'),
|
||||
uiSchema: _asMap(json['ui_schema']),
|
||||
);
|
||||
}
|
||||
|
||||
class ToolCallStartEvent extends AgUiEvent {
|
||||
ToolCallStartEvent({required this.toolCallId, required this.toolCallName})
|
||||
: super(type: AgUiEventType.toolCallStart);
|
||||
|
||||
final String toolCallId;
|
||||
final String toolCallName;
|
||||
|
||||
factory ToolCallStartEvent.fromJson(Map<String, dynamic> json) =>
|
||||
ToolCallStartEvent(
|
||||
toolCallId: _asString(json['toolCallId']),
|
||||
toolCallName: _asString(json['toolCallName']),
|
||||
);
|
||||
}
|
||||
|
||||
class ToolCallArgsEvent extends AgUiEvent {
|
||||
ToolCallArgsEvent({required this.toolCallId, required this.args})
|
||||
: super(type: AgUiEventType.toolCallArgs);
|
||||
|
||||
final String toolCallId;
|
||||
final Map<String, dynamic> args;
|
||||
|
||||
factory ToolCallArgsEvent.fromJson(Map<String, dynamic> json) =>
|
||||
ToolCallArgsEvent(
|
||||
toolCallId: _asString(json['toolCallId']),
|
||||
args: _asMap(json['args']) ?? const {},
|
||||
);
|
||||
}
|
||||
|
||||
class ToolCallEndEvent extends AgUiEvent {
|
||||
ToolCallEndEvent({required this.toolCallId})
|
||||
: super(type: AgUiEventType.toolCallEnd);
|
||||
|
||||
final String toolCallId;
|
||||
|
||||
factory ToolCallEndEvent.fromJson(Map<String, dynamic> json) =>
|
||||
ToolCallEndEvent(toolCallId: _asString(json['toolCallId']));
|
||||
}
|
||||
|
||||
class ToolCallResultEvent extends AgUiEvent {
|
||||
ToolCallResultEvent({
|
||||
required this.messageId,
|
||||
required this.toolCallId,
|
||||
required this.toolName,
|
||||
required this.resultSummary,
|
||||
required this.status,
|
||||
}) : super(type: AgUiEventType.toolCallResult);
|
||||
|
||||
final String messageId;
|
||||
final String toolCallId;
|
||||
final String toolName;
|
||||
final String resultSummary;
|
||||
final String status;
|
||||
|
||||
factory ToolCallResultEvent.fromJson(Map<String, dynamic> json) =>
|
||||
ToolCallResultEvent(
|
||||
messageId: _asString(json['messageId']),
|
||||
toolCallId: _asString(json['tool_call_id']),
|
||||
toolName: _asString(json['tool_name']),
|
||||
resultSummary: _asString(json['result']),
|
||||
status: _asString(json['status'], fallback: 'success'),
|
||||
);
|
||||
}
|
||||
|
||||
class ToolCallErrorEvent extends AgUiEvent {
|
||||
ToolCallErrorEvent({required this.toolCallId, required this.error, this.code})
|
||||
: super(type: AgUiEventType.toolCallError);
|
||||
|
||||
final String toolCallId;
|
||||
final String error;
|
||||
final String? code;
|
||||
|
||||
factory ToolCallErrorEvent.fromJson(Map<String, dynamic> json) =>
|
||||
ToolCallErrorEvent(
|
||||
toolCallId: _asString(json['toolCallId']),
|
||||
error: _asString(json['error'], fallback: 'Tool call failed'),
|
||||
code: json['code'] as String?,
|
||||
);
|
||||
}
|
||||
|
||||
class HistorySnapshot {
|
||||
const HistorySnapshot({
|
||||
required this.scope,
|
||||
required this.threadId,
|
||||
required this.day,
|
||||
required this.hasMore,
|
||||
required this.messages,
|
||||
});
|
||||
|
||||
final String scope;
|
||||
final String? threadId;
|
||||
final String? day;
|
||||
final bool hasMore;
|
||||
final List<HistoryMessage> messages;
|
||||
|
||||
factory HistorySnapshot.fromJson(Map<String, dynamic> json) {
|
||||
final rawMessages = json['messages'];
|
||||
final messages = rawMessages is List
|
||||
? rawMessages
|
||||
.whereType<Map<String, dynamic>>()
|
||||
.map(HistoryMessage.fromJson)
|
||||
.toList()
|
||||
: const <HistoryMessage>[];
|
||||
return HistorySnapshot(
|
||||
scope: _asString(json['scope'], fallback: 'history_day'),
|
||||
threadId: json['threadId'] as String?,
|
||||
day: json['day'] as String?,
|
||||
hasMore: json['hasMore'] == true,
|
||||
messages: messages,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
class HistoryMessage {
|
||||
const HistoryMessage({
|
||||
required this.id,
|
||||
required this.seq,
|
||||
required this.role,
|
||||
required this.content,
|
||||
required this.timestamp,
|
||||
this.attachments = const <HistoryAttachment>[],
|
||||
this.uiSchema,
|
||||
});
|
||||
|
||||
final String id;
|
||||
final int seq;
|
||||
final String role;
|
||||
final String content;
|
||||
final DateTime timestamp;
|
||||
final List<HistoryAttachment> attachments;
|
||||
final Map<String, dynamic>? uiSchema;
|
||||
|
||||
factory HistoryMessage.fromJson(Map<String, dynamic> json) => HistoryMessage(
|
||||
id: _asString(json['id']),
|
||||
seq: _asInt(json['seq']),
|
||||
role: _asString(json['role']),
|
||||
content: _asString(json['content']),
|
||||
timestamp:
|
||||
DateTime.tryParse(_asString(json['timestamp'])) ?? DateTime.now(),
|
||||
attachments: _parseHistoryAttachments(json['attachments']),
|
||||
uiSchema: _asMap(json['ui_schema']),
|
||||
);
|
||||
}
|
||||
|
||||
class HistoryAttachment {
|
||||
const HistoryAttachment({required this.url, required this.mimeType});
|
||||
|
||||
final String url;
|
||||
final String mimeType;
|
||||
|
||||
factory HistoryAttachment.fromJson(Map<String, dynamic> json) {
|
||||
return HistoryAttachment(
|
||||
url: _asString(json['url']),
|
||||
mimeType: _asString(json['mimeType']),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
String _asString(Object? value, {String fallback = ''}) {
|
||||
if (value is String) {
|
||||
return value;
|
||||
}
|
||||
return fallback;
|
||||
}
|
||||
|
||||
int _asInt(Object? value) {
|
||||
if (value is int) {
|
||||
return value;
|
||||
}
|
||||
if (value is double) {
|
||||
return value.toInt();
|
||||
}
|
||||
if (value is String) {
|
||||
return int.tryParse(value) ?? 0;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
Map<String, dynamic>? _asMap(Object? value) {
|
||||
if (value is Map<String, dynamic>) {
|
||||
return value;
|
||||
}
|
||||
if (value is Map) {
|
||||
final result = <String, dynamic>{};
|
||||
for (final entry in value.entries) {
|
||||
final key = entry.key;
|
||||
if (key is String) {
|
||||
result[key] = entry.value;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
List<HistoryAttachment> _parseHistoryAttachments(Object? value) {
|
||||
if (value is! List) {
|
||||
return const <HistoryAttachment>[];
|
||||
}
|
||||
return value
|
||||
.whereType<Map<String, dynamic>>()
|
||||
.map(HistoryAttachment.fromJson)
|
||||
.where(
|
||||
(attachment) =>
|
||||
attachment.url.isNotEmpty && attachment.mimeType.isNotEmpty,
|
||||
)
|
||||
.toList();
|
||||
}
|
||||
@@ -0,0 +1,442 @@
|
||||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'dart:math';
|
||||
import 'dart:typed_data';
|
||||
|
||||
import 'chat_api.dart';
|
||||
import 'ag_ui_event.dart';
|
||||
import 'chat_history_repository.dart';
|
||||
|
||||
typedef EventCallback = void Function(AgUiEvent event);
|
||||
|
||||
const _runIdPrefix = 'run_';
|
||||
|
||||
class AttachmentUploadInput {
|
||||
const AttachmentUploadInput({
|
||||
required this.name,
|
||||
required this.mimeType,
|
||||
required this.bytes,
|
||||
required this.localPath,
|
||||
});
|
||||
|
||||
final String name;
|
||||
final String mimeType;
|
||||
final Uint8List bytes;
|
||||
final String localPath;
|
||||
}
|
||||
|
||||
class UploadedAttachment {
|
||||
const UploadedAttachment({
|
||||
required this.localPath,
|
||||
required this.url,
|
||||
required this.mimeType,
|
||||
});
|
||||
|
||||
final String localPath;
|
||||
final String url;
|
||||
final String mimeType;
|
||||
}
|
||||
|
||||
class SendMessageResult {
|
||||
const SendMessageResult({required this.uploadedAttachments});
|
||||
|
||||
final List<UploadedAttachment> uploadedAttachments;
|
||||
}
|
||||
|
||||
class _RunInputPayload {
|
||||
const _RunInputPayload({
|
||||
required this.input,
|
||||
required this.uploadedAttachments,
|
||||
});
|
||||
|
||||
final Map<String, dynamic> input;
|
||||
final List<UploadedAttachment> uploadedAttachments;
|
||||
}
|
||||
|
||||
class AgUiService {
|
||||
final ChatApi _chatApi;
|
||||
final ChatHistoryRepository? _historyRepository;
|
||||
EventCallback onEvent;
|
||||
final Map<String, String> _lastEventIdByThread = {};
|
||||
int _activeStreamToken = 0;
|
||||
StreamSubscription<String>? _activeSseSubscription;
|
||||
Completer<void>? _activeSseDoneCompleter;
|
||||
|
||||
String? _threadId;
|
||||
String? _activeThreadIdForRun;
|
||||
String? _activeRunId;
|
||||
bool _hasMoreHistory = false;
|
||||
|
||||
AgUiService({
|
||||
EventCallback? onEvent,
|
||||
required ChatApi chatApi,
|
||||
ChatHistoryRepository? historyRepository,
|
||||
}) : onEvent = onEvent ?? ((_) {}),
|
||||
_chatApi = chatApi,
|
||||
_historyRepository = historyRepository;
|
||||
|
||||
Future<SendMessageResult> sendMessage(
|
||||
String content, {
|
||||
List<AttachmentUploadInput>? attachments,
|
||||
}) async {
|
||||
await _cancelActiveSseSubscription();
|
||||
final streamToken = ++_activeStreamToken;
|
||||
final runInputPayload = await _buildRunInput(
|
||||
content: content,
|
||||
attachments: attachments,
|
||||
);
|
||||
final payload = await _chatApi.createRun(runInputPayload.input);
|
||||
final threadId = payload['threadId'] as String?;
|
||||
final runId = payload['runId'] as String?;
|
||||
if (threadId == null || threadId.isEmpty) {
|
||||
throw StateError('Missing threadId in /agent/runs response');
|
||||
}
|
||||
if (runId == null || runId.isEmpty) {
|
||||
throw StateError('Missing runId in /agent/runs response');
|
||||
}
|
||||
_threadId = threadId;
|
||||
_activeThreadIdForRun = threadId;
|
||||
_activeRunId = runId;
|
||||
try {
|
||||
await _streamEventsFromApi(
|
||||
threadId,
|
||||
expectedRunId: runId,
|
||||
streamToken: streamToken,
|
||||
);
|
||||
} finally {
|
||||
if (_activeThreadIdForRun == threadId && _activeRunId == runId) {
|
||||
_activeThreadIdForRun = null;
|
||||
_activeRunId = null;
|
||||
}
|
||||
}
|
||||
return SendMessageResult(
|
||||
uploadedAttachments: runInputPayload.uploadedAttachments,
|
||||
);
|
||||
}
|
||||
|
||||
Future<HistorySnapshot> loadHistory({DateTime? beforeDate}) async {
|
||||
final repository = _historyRepository;
|
||||
final snapshot = repository != null
|
||||
? await repository.loadHistory(
|
||||
threadId: _threadId,
|
||||
beforeDate: beforeDate,
|
||||
)
|
||||
: await _loadHistoryFromApi(beforeDate: beforeDate);
|
||||
if (snapshot.threadId != null && snapshot.threadId!.isNotEmpty) {
|
||||
_threadId = snapshot.threadId;
|
||||
}
|
||||
_hasMoreHistory = snapshot.hasMore;
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
Future<HistorySnapshot> _loadHistoryFromApi({DateTime? beforeDate}) async {
|
||||
final payload = await _chatApi.fetchHistory(
|
||||
threadId: _threadId,
|
||||
beforeDate: beforeDate,
|
||||
);
|
||||
return HistorySnapshot.fromJson(payload);
|
||||
}
|
||||
|
||||
Future<Uint8List> fetchAttachmentPreview(String previewPath) =>
|
||||
_chatApi.fetchAttachmentPreview(previewPath);
|
||||
|
||||
Future<String> transcribeAudio(String filePath) =>
|
||||
_chatApi.transcribeAudio(filePath);
|
||||
|
||||
bool hasEarlierHistory(DateTime fromDate) {
|
||||
// Whether earlier history exists is driven by backend snapshot.hasMore.
|
||||
// Keep the parameter for compatibility with the current ChatBloc signature.
|
||||
final _ = fromDate;
|
||||
return _hasMoreHistory;
|
||||
}
|
||||
|
||||
Future<void> cancelCurrentRun() async {
|
||||
final activeThreadId = _activeThreadIdForRun;
|
||||
final activeRunId = _activeRunId;
|
||||
if (activeThreadId != null && activeRunId != null) {
|
||||
await _chatApi.cancelRun(threadId: activeThreadId, runId: activeRunId);
|
||||
_activeThreadIdForRun = null;
|
||||
_activeRunId = null;
|
||||
_activeStreamToken += 1;
|
||||
await _cancelActiveSseSubscription();
|
||||
return;
|
||||
}
|
||||
_activeStreamToken += 1;
|
||||
await _cancelActiveSseSubscription();
|
||||
}
|
||||
|
||||
Future<void> _cancelActiveSseSubscription() async {
|
||||
final doneCompleter = _activeSseDoneCompleter;
|
||||
if (doneCompleter != null && !doneCompleter.isCompleted) {
|
||||
doneCompleter.complete();
|
||||
}
|
||||
_activeSseDoneCompleter = null;
|
||||
final subscription = _activeSseSubscription;
|
||||
_activeSseSubscription = null;
|
||||
if (subscription == null) {
|
||||
return;
|
||||
}
|
||||
await subscription.cancel();
|
||||
}
|
||||
|
||||
Future<void> _streamEventsFromApi(
|
||||
String threadId, {
|
||||
required String expectedRunId,
|
||||
required int streamToken,
|
||||
}) async {
|
||||
final sseLines = await _chatApi.streamRunEvents(
|
||||
threadId,
|
||||
lastEventId: _lastEventIdByThread[threadId],
|
||||
);
|
||||
|
||||
String? eventType;
|
||||
String? eventId;
|
||||
var hasBoundExpectedRun = false;
|
||||
var hasSeenTerminalForRun = false;
|
||||
final dataBuffer = StringBuffer();
|
||||
final done = Completer<void>();
|
||||
late final StreamSubscription<String> subscription;
|
||||
|
||||
void stopStream({Object? error, StackTrace? stackTrace}) {
|
||||
if (!done.isCompleted) {
|
||||
if (error == null) {
|
||||
done.complete();
|
||||
} else {
|
||||
done.completeError(error, stackTrace);
|
||||
}
|
||||
}
|
||||
unawaited(subscription.cancel());
|
||||
}
|
||||
|
||||
subscription = sseLines.listen(
|
||||
(line) {
|
||||
try {
|
||||
if (streamToken != _activeStreamToken) {
|
||||
stopStream();
|
||||
return;
|
||||
}
|
||||
if (line.isEmpty) {
|
||||
if (dataBuffer.isNotEmpty) {
|
||||
final raw = dataBuffer.toString();
|
||||
dataBuffer.clear();
|
||||
String? eventRunId;
|
||||
String? eventThreadId;
|
||||
Map<String, dynamic>? parsedMap;
|
||||
try {
|
||||
final parsed = jsonDecode(raw);
|
||||
if (parsed is Map<String, dynamic>) {
|
||||
parsedMap = parsed;
|
||||
}
|
||||
} catch (_) {
|
||||
// Ignore malformed SSE payload and keep stream alive.
|
||||
}
|
||||
if (parsedMap != null) {
|
||||
final runId = parsedMap['runId'];
|
||||
final thread = parsedMap['threadId'];
|
||||
eventRunId = runId is String ? runId : null;
|
||||
eventThreadId = thread is String ? thread : null;
|
||||
|
||||
final isRunStarted = eventType == AgUiEventTypeWire.runStarted;
|
||||
final isTargetRun = eventRunId == expectedRunId;
|
||||
if (isRunStarted && isTargetRun) {
|
||||
hasBoundExpectedRun = true;
|
||||
}
|
||||
|
||||
final isThreadMatched =
|
||||
eventThreadId == null || eventThreadId == threadId;
|
||||
final shouldDispatch =
|
||||
isTargetRun || (hasBoundExpectedRun && isThreadMatched);
|
||||
if (shouldDispatch) {
|
||||
final event = AgUiEvent.fromJson(parsedMap);
|
||||
onEvent(event);
|
||||
}
|
||||
}
|
||||
final currentEventId = eventId;
|
||||
if (currentEventId != null && currentEventId.isNotEmpty) {
|
||||
_lastEventIdByThread[threadId] = currentEventId;
|
||||
}
|
||||
final isTerminalEvent =
|
||||
eventType == AgUiEventTypeWire.runFinished ||
|
||||
eventType == AgUiEventTypeWire.runError;
|
||||
final isTargetRun = eventRunId == expectedRunId;
|
||||
final isThreadMatched =
|
||||
eventThreadId == null || eventThreadId == threadId;
|
||||
if (isTerminalEvent &&
|
||||
(isTargetRun || (hasBoundExpectedRun && isThreadMatched))) {
|
||||
hasSeenTerminalForRun = true;
|
||||
stopStream();
|
||||
return;
|
||||
}
|
||||
}
|
||||
eventType = null;
|
||||
eventId = null;
|
||||
return;
|
||||
}
|
||||
if (line.startsWith(':')) {
|
||||
return;
|
||||
}
|
||||
if (line.startsWith('id:')) {
|
||||
eventId = line.substring(3).trim();
|
||||
return;
|
||||
}
|
||||
if (line.startsWith('event:')) {
|
||||
eventType = line.substring(6).trim();
|
||||
return;
|
||||
}
|
||||
if (line.startsWith('data:')) {
|
||||
final fragment = line.substring(5).trim();
|
||||
if (dataBuffer.isNotEmpty) {
|
||||
dataBuffer.write('\n');
|
||||
}
|
||||
dataBuffer.write(fragment);
|
||||
}
|
||||
} catch (error, stackTrace) {
|
||||
stopStream(error: error, stackTrace: stackTrace);
|
||||
}
|
||||
},
|
||||
onError: (Object error, StackTrace stackTrace) {
|
||||
stopStream(error: error, stackTrace: stackTrace);
|
||||
},
|
||||
onDone: () {
|
||||
if (streamToken != _activeStreamToken) {
|
||||
stopStream();
|
||||
return;
|
||||
}
|
||||
if (!hasSeenTerminalForRun) {
|
||||
stopStream(
|
||||
error: StateError('SSE closed before terminal event for run'),
|
||||
);
|
||||
return;
|
||||
}
|
||||
stopStream();
|
||||
},
|
||||
cancelOnError: false,
|
||||
);
|
||||
|
||||
if (streamToken != _activeStreamToken) {
|
||||
await subscription.cancel();
|
||||
return;
|
||||
}
|
||||
|
||||
_activeSseSubscription = subscription;
|
||||
_activeSseDoneCompleter = done;
|
||||
try {
|
||||
await done.future;
|
||||
} finally {
|
||||
if (identical(_activeSseSubscription, subscription)) {
|
||||
_activeSseSubscription = null;
|
||||
}
|
||||
if (identical(_activeSseDoneCompleter, done)) {
|
||||
_activeSseDoneCompleter = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Future<_RunInputPayload> _buildRunInput({
|
||||
required String content,
|
||||
List<AttachmentUploadInput>? attachments,
|
||||
}) async {
|
||||
final threadId = _threadId ?? _newUuid();
|
||||
final runId = _nextId(_runIdPrefix);
|
||||
|
||||
final contentBlocks = <Map<String, dynamic>>[];
|
||||
|
||||
if (content.isNotEmpty) {
|
||||
contentBlocks.add({'type': 'text', 'text': content});
|
||||
}
|
||||
|
||||
var uploadedAttachments = const <UploadedAttachment>[];
|
||||
if (attachments != null && attachments.isNotEmpty) {
|
||||
uploadedAttachments = await _uploadAttachments(
|
||||
threadId: threadId,
|
||||
attachments: attachments,
|
||||
);
|
||||
for (final attachment in uploadedAttachments) {
|
||||
contentBlocks.add({
|
||||
'type': 'binary',
|
||||
'mimeType': attachment.mimeType,
|
||||
'url': attachment.url,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
final dynamic messageContent;
|
||||
if (contentBlocks.isEmpty) {
|
||||
messageContent = '';
|
||||
} else if (contentBlocks.length == 1 &&
|
||||
contentBlocks[0]['type'] == 'text') {
|
||||
messageContent = contentBlocks[0]['text'];
|
||||
} else {
|
||||
messageContent = contentBlocks;
|
||||
}
|
||||
|
||||
return _RunInputPayload(
|
||||
input: {
|
||||
'threadId': threadId,
|
||||
'runId': runId,
|
||||
'state': <String, dynamic>{},
|
||||
'messages': [
|
||||
{'id': _nextId('user_'), 'role': 'user', 'content': messageContent},
|
||||
],
|
||||
'tools': <Map<String, dynamic>>[],
|
||||
'context': <Map<String, dynamic>>[],
|
||||
'forwardedProps': <String, dynamic>{'runtime_mode': 'chat'},
|
||||
},
|
||||
uploadedAttachments: uploadedAttachments,
|
||||
);
|
||||
}
|
||||
|
||||
Future<List<UploadedAttachment>> _uploadAttachments({
|
||||
required String threadId,
|
||||
required List<AttachmentUploadInput> attachments,
|
||||
}) async {
|
||||
final uploaded = <UploadedAttachment>[];
|
||||
for (final attachment in attachments) {
|
||||
final payload = await _chatApi.uploadAttachment(
|
||||
threadId: threadId,
|
||||
filename: attachment.name,
|
||||
mimeType: attachment.mimeType,
|
||||
bytes: attachment.bytes,
|
||||
);
|
||||
final payloadAttachment = payload['attachment'];
|
||||
if (payloadAttachment is! Map<String, dynamic>) {
|
||||
throw StateError('Missing attachment in /agent/attachments response');
|
||||
}
|
||||
final bucket = payloadAttachment['bucket'];
|
||||
final path = payloadAttachment['path'];
|
||||
final uploadedMime = payloadAttachment['mimeType'];
|
||||
final url = payloadAttachment['url'];
|
||||
if (bucket is! String ||
|
||||
path is! String ||
|
||||
uploadedMime is! String ||
|
||||
url is! String ||
|
||||
bucket.isEmpty ||
|
||||
path.isEmpty ||
|
||||
uploadedMime.isEmpty ||
|
||||
url.isEmpty) {
|
||||
throw StateError('Invalid attachment reference');
|
||||
}
|
||||
uploaded.add(
|
||||
UploadedAttachment(
|
||||
localPath: attachment.localPath,
|
||||
url: url,
|
||||
mimeType: uploadedMime,
|
||||
),
|
||||
);
|
||||
}
|
||||
return uploaded;
|
||||
}
|
||||
|
||||
String _nextId(String prefix) =>
|
||||
'$prefix${DateTime.now().millisecondsSinceEpoch}';
|
||||
|
||||
String _newUuid() {
|
||||
final random = Random();
|
||||
String hex(int len) => List<String>.generate(
|
||||
len,
|
||||
(_) => random.nextInt(16).toRadixString(16),
|
||||
).join();
|
||||
const variant = ['8', '9', 'a', 'b'];
|
||||
return '${hex(8)}-${hex(4)}-4${hex(3)}-${variant[random.nextInt(4)]}${hex(3)}-${hex(12)}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
import 'dart:typed_data';
|
||||
|
||||
abstract class ChatApi {
|
||||
Future<Map<String, dynamic>> createRun(Map<String, dynamic> runInput);
|
||||
|
||||
Future<Stream<String>> streamRunEvents(
|
||||
String threadId, {
|
||||
String? lastEventId,
|
||||
});
|
||||
|
||||
Future<Map<String, dynamic>> fetchHistory({
|
||||
String? threadId,
|
||||
DateTime? beforeDate,
|
||||
});
|
||||
|
||||
Future<Map<String, dynamic>> uploadAttachment({
|
||||
required String threadId,
|
||||
required String filename,
|
||||
required String mimeType,
|
||||
required Uint8List bytes,
|
||||
});
|
||||
|
||||
Future<Uint8List> fetchAttachmentPreview(String previewPath);
|
||||
|
||||
Future<String> transcribeAudio(String filePath);
|
||||
|
||||
Future<void> cancelRun({required String threadId, required String runId});
|
||||
}
|
||||
@@ -0,0 +1,97 @@
|
||||
import 'chat_api.dart';
|
||||
import 'package:social_app/data/cache/cache_policy.dart';
|
||||
import 'package:social_app/data/cache/cached_repository.dart';
|
||||
|
||||
import 'ag_ui_event.dart';
|
||||
|
||||
class ChatHistoryRepository extends CachedRepository<HistorySnapshot> {
|
||||
ChatHistoryRepository({required ChatApi chatApi, required super.store})
|
||||
: _chatApi = chatApi,
|
||||
super(
|
||||
policy: const CachePolicy(
|
||||
softTtl: Duration(seconds: 30),
|
||||
hardTtl: Duration(minutes: 5),
|
||||
minRefreshInterval: Duration(seconds: 15),
|
||||
),
|
||||
encodeValue: _encodeSnapshot,
|
||||
decodeValue: _decodeSnapshot,
|
||||
);
|
||||
|
||||
final ChatApi _chatApi;
|
||||
|
||||
Future<HistorySnapshot> loadHistory({
|
||||
String? threadId,
|
||||
DateTime? beforeDate,
|
||||
bool forceRefresh = false,
|
||||
}) {
|
||||
final key = _keyFor(threadId: threadId, beforeDate: beforeDate);
|
||||
return getOrLoad(
|
||||
key: key,
|
||||
forceRefresh: forceRefresh,
|
||||
loadFromRemote: () =>
|
||||
_loadHistoryRemote(threadId: threadId, beforeDate: beforeDate),
|
||||
);
|
||||
}
|
||||
|
||||
Future<HistorySnapshot> _loadHistoryRemote({
|
||||
String? threadId,
|
||||
DateTime? beforeDate,
|
||||
}) async {
|
||||
final payload = await _chatApi.fetchHistory(
|
||||
threadId: threadId,
|
||||
beforeDate: beforeDate,
|
||||
);
|
||||
return HistorySnapshot.fromJson(payload);
|
||||
}
|
||||
|
||||
static String _keyFor({String? threadId, DateTime? beforeDate}) {
|
||||
final threadPart = (threadId == null || threadId.isEmpty)
|
||||
? 'default'
|
||||
: threadId;
|
||||
if (beforeDate == null) {
|
||||
return 'chat:history:first:$threadPart';
|
||||
}
|
||||
final day = DateTime(
|
||||
beforeDate.year,
|
||||
beforeDate.month,
|
||||
beforeDate.day,
|
||||
).toIso8601String().substring(0, 10);
|
||||
return 'chat:history:before:$threadPart:$day';
|
||||
}
|
||||
|
||||
static Object? _encodeSnapshot(HistorySnapshot snapshot) {
|
||||
return <String, Object?>{
|
||||
'scope': snapshot.scope,
|
||||
'threadId': snapshot.threadId,
|
||||
'day': snapshot.day,
|
||||
'hasMore': snapshot.hasMore,
|
||||
'messages': snapshot.messages.map(_encodeMessage).toList(growable: false),
|
||||
};
|
||||
}
|
||||
|
||||
static HistorySnapshot _decodeSnapshot(Object? raw) {
|
||||
if (raw is! Map) {
|
||||
throw const FormatException('Invalid cached history snapshot payload');
|
||||
}
|
||||
return HistorySnapshot.fromJson(Map<String, dynamic>.from(raw));
|
||||
}
|
||||
|
||||
static Map<String, Object?> _encodeMessage(HistoryMessage message) {
|
||||
return <String, Object?>{
|
||||
'id': message.id,
|
||||
'seq': message.seq,
|
||||
'role': message.role,
|
||||
'content': message.content,
|
||||
'timestamp': message.timestamp.toIso8601String(),
|
||||
'attachments': message.attachments
|
||||
.map(
|
||||
(attachment) => <String, Object?>{
|
||||
'url': attachment.url,
|
||||
'mimeType': attachment.mimeType,
|
||||
},
|
||||
)
|
||||
.toList(growable: false),
|
||||
'ui_schema': message.uiSchema,
|
||||
};
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user