refactor: 修复预热编排器竞态条件添加 runToken 机制

This commit is contained in:
zl-q
2026-03-30 09:06:38 +08:00
parent 41f35d6e22
commit bd7ac6285c
3 changed files with 44 additions and 6 deletions
+23 -4
View File
@@ -9,6 +9,8 @@ import '../l10n/app_localizations.dart';
import '../features/auth/presentation/bloc/auth_bloc.dart';
import '../features/auth/presentation/bloc/auth_event.dart';
import '../features/auth/presentation/bloc/auth_state.dart';
import '../features/chat/presentation/bloc/chat_bloc.dart';
import '../data/cache/cache_scope.dart';
import 'services/app_prewarm_orchestrator.dart';
import 'router/app_router.dart';
import '../core/theme/app_theme.dart';
@@ -23,11 +25,30 @@ class LinksyApp extends StatefulWidget {
class _LinksyAppState extends State<LinksyApp> {
late final AuthBloc _authBloc;
late final GoRouter _router;
int _cacheScopeVersion = 0;
Future<void> _onAuthenticated(String userId) async {
_cacheScopeVersion += 1;
final scopeKey = 'user:$userId:v$_cacheScopeVersion';
CacheScope.configureProvider(() => scopeKey);
await sl<ChatBloc>().switchUser(userId);
await sl<AppPrewarmOrchestrator>().ensureStartedFor(userId);
}
Future<void> _onUnauthenticated() async {
_cacheScopeVersion += 1;
final scopeKey = 'anonymous:v$_cacheScopeVersion';
CacheScope.configureProvider(() => scopeKey);
await sl<ChatBloc>().switchUser(null);
sl<AppPrewarmOrchestrator>().reset();
}
@override
void initState() {
super.initState();
_authBloc = sl<AuthBloc>();
const initialScopeKey = 'anonymous:v0';
CacheScope.configureProvider(() => initialScopeKey);
_authBloc.add(AuthStarted());
_router = createAppRouter(_authBloc);
}
@@ -45,12 +66,10 @@ class _LinksyAppState extends State<LinksyApp> {
child: BlocListener<AuthBloc, AuthState>(
listener: (context, state) {
if (state is AuthAuthenticated) {
unawaited(
sl<AppPrewarmOrchestrator>().ensureStartedFor(state.user.id),
);
unawaited(_onAuthenticated(state.user.id));
}
if (state is AuthUnauthenticated) {
sl<AppPrewarmOrchestrator>().reset();
unawaited(_onUnauthenticated());
}
},
child: MaterialApp.router(
@@ -37,6 +37,7 @@ class AppPrewarmOrchestrator extends ChangeNotifier {
String? _userId;
Future<void>? _running;
int _runToken = 0;
bool get isBootBlocking => _status == AppPrewarmStatus.running;
@@ -51,6 +52,7 @@ class AppPrewarmOrchestrator extends ChangeNotifier {
}
_userId = userId;
final runToken = ++_runToken;
_status = AppPrewarmStatus.running;
notifyListeners();
@@ -60,7 +62,7 @@ class AppPrewarmOrchestrator extends ChangeNotifier {
_runPrewarmUnreadInbox(),
]);
final running = _runWithBudget(tasks);
final running = _runWithBudget(tasks, userId: userId, runToken: runToken);
_running = running;
return running.whenComplete(() {
if (identical(_running, running)) {
@@ -93,15 +95,30 @@ class AppPrewarmOrchestrator extends ChangeNotifier {
return _inboxRepository.getMessages(isRead: false);
}
Future<void> _runWithBudget(Future<void> tasks) async {
Future<void> _runWithBudget(
Future<void> tasks, {
required String userId,
required int runToken,
}) async {
bool isLatestRun() => _runToken == runToken && _userId == userId;
try {
await tasks.timeout(bootBudget);
if (!isLatestRun()) {
return;
}
_status = AppPrewarmStatus.completed;
notifyListeners();
} on TimeoutException {
if (!isLatestRun()) {
return;
}
_status = AppPrewarmStatus.timedOut;
notifyListeners();
} catch (_) {
if (!isLatestRun()) {
return;
}
_status = AppPrewarmStatus.failed;
notifyListeners();
}
@@ -110,6 +127,7 @@ class AppPrewarmOrchestrator extends ChangeNotifier {
void reset() {
_userId = null;
_running = null;
_runToken += 1;
if (_status != AppPrewarmStatus.idle) {
_status = AppPrewarmStatus.idle;
notifyListeners();
@@ -73,6 +73,7 @@ class _FakeChatApi implements ChatApi {
@override
Future<Stream<String>> streamRunEvents(
String threadId, {
required String runId,
String? lastEventId,
}) {
throw UnimplementedError();