diff --git a/mobile/lib/domain/interfaces/sync_api.interface.dart b/mobile/lib/domain/interfaces/sync_api.interface.dart index 44e22c5894a..57abed2e7fe 100644 --- a/mobile/lib/domain/interfaces/sync_api.interface.dart +++ b/mobile/lib/domain/interfaces/sync_api.interface.dart @@ -1,8 +1,12 @@ +import 'package:http/http.dart' as http; import 'package:immich_mobile/domain/models/sync_event.model.dart'; -import 'package:openapi/api.dart'; abstract interface class ISyncApiRepository { Future ack(List data); - Stream> getSyncEvents(List type); + Future streamChanges( + Function(List, Function() abort) onData, { + int batchSize, + http.Client? httpClient, + }); } diff --git a/mobile/lib/domain/interfaces/sync_stream.interface.dart b/mobile/lib/domain/interfaces/sync_stream.interface.dart index f9c52d7ee0c..5f61d6b52f3 100644 --- a/mobile/lib/domain/interfaces/sync_stream.interface.dart +++ b/mobile/lib/domain/interfaces/sync_stream.interface.dart @@ -2,9 +2,17 @@ import 'package:immich_mobile/domain/interfaces/db.interface.dart'; import 'package:openapi/api.dart'; abstract interface class ISyncStreamRepository implements IDatabaseRepository { - Future updateUsersV1(Iterable data); - Future deleteUsersV1(Iterable data); + Future updateUsersV1(Iterable data); + Future deleteUsersV1(Iterable data); - Future updatePartnerV1(Iterable data); - Future deletePartnerV1(Iterable data); + Future updatePartnerV1(Iterable data); + Future deletePartnerV1(Iterable data); + + Future updateAssetsV1(Iterable data); + Future deleteAssetsV1(Iterable data); + Future updateAssetsExifV1(Iterable data); + + Future updatePartnerAssetsV1(Iterable data); + Future deletePartnerAssetsV1(Iterable data); + Future updatePartnerAssetsExifV1(Iterable data); } diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart index 8d7d87e35e5..ac63734b07d 100644 --- a/mobile/lib/domain/services/sync_stream.service.dart +++ b/mobile/lib/domain/services/sync_stream.service.dart @@ -2,25 +2,11 @@ import 'dart:async'; -import 'package:collection/collection.dart'; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; +import 'package:immich_mobile/domain/models/sync_event.model.dart'; import 'package:logging/logging.dart'; import 'package:openapi/api.dart'; -import 'package:worker_manager/worker_manager.dart'; - -const _kSyncTypeOrder = [ - SyncEntityType.userDeleteV1, - SyncEntityType.userV1, - SyncEntityType.partnerDeleteV1, - SyncEntityType.partnerV1, - SyncEntityType.assetDeleteV1, - SyncEntityType.assetV1, - SyncEntityType.assetExifV1, - SyncEntityType.partnerAssetDeleteV1, - SyncEntityType.partnerAssetV1, - SyncEntityType.partnerAssetExifV1, -]; class SyncStreamService { final Logger _logger = Logger('SyncStreamService'); @@ -37,164 +23,70 @@ class SyncStreamService { _syncStreamRepository = syncStreamRepository, _cancelChecker = cancelChecker; - Future _handleSyncData( + bool get isCancelled => _cancelChecker?.call() ?? false; + + Future sync() => _syncApiRepository.streamChanges(_handleEvents); + + Future _handleEvents(List events, Function() abort) async { + List items = []; + for (final event in events) { + if (isCancelled) { + _logger.warning("Sync stream cancelled"); + abort(); + return; + } + + if (event.type != items.firstOrNull?.type) { + await _processBatch(items); + } + + items.add(event); + } + + await _processBatch(items); + } + + Future _processBatch(List batch) async { + if (batch.isEmpty) { + return; + } + + final type = batch.first.type; + await _handleSyncData(type, batch.map((e) => e.data)); + await _syncApiRepository.ack([batch.last.ack]); + batch.clear(); + } + + Future _handleSyncData( SyncEntityType type, // ignore: avoid-dynamic Iterable data, ) async { - if (data.isEmpty) { - _logger.warning("Received empty sync data for $type"); - return false; - } - _logger.fine("Processing sync data for $type of length ${data.length}"); - - try { - if (type == SyncEntityType.partnerV1) { - return await _syncStreamRepository.updatePartnerV1(data.cast()); - } - - if (type == SyncEntityType.partnerDeleteV1) { - return await _syncStreamRepository.deletePartnerV1(data.cast()); - } - - if (type == SyncEntityType.userV1) { - return await _syncStreamRepository.updateUsersV1(data.cast()); - } - - if (type == SyncEntityType.userDeleteV1) { - return await _syncStreamRepository.deleteUsersV1(data.cast()); - } - } catch (error, stack) { - _logger.severe("Error processing sync data for $type", error, stack); - return false; + // ignore: prefer-switch-expression + switch (type) { + case SyncEntityType.userV1: + return _syncStreamRepository.updateUsersV1(data.cast()); + case SyncEntityType.userDeleteV1: + return _syncStreamRepository.deleteUsersV1(data.cast()); + case SyncEntityType.partnerV1: + return _syncStreamRepository.updatePartnerV1(data.cast()); + case SyncEntityType.partnerDeleteV1: + return _syncStreamRepository.deletePartnerV1(data.cast()); + case SyncEntityType.assetV1: + return _syncStreamRepository.updateAssetsV1(data.cast()); + case SyncEntityType.assetDeleteV1: + return _syncStreamRepository.deleteAssetsV1(data.cast()); + case SyncEntityType.assetExifV1: + return _syncStreamRepository.updateAssetsExifV1(data.cast()); + case SyncEntityType.partnerAssetV1: + return _syncStreamRepository.updatePartnerAssetsV1(data.cast()); + case SyncEntityType.partnerAssetDeleteV1: + return _syncStreamRepository.deletePartnerAssetsV1(data.cast()); + case SyncEntityType.partnerAssetExifV1: + return _syncStreamRepository.updatePartnerAssetsExifV1(data.cast()); + default: + _logger.warning("Unknown sync data type: $type"); } - - _logger.warning("Unknown sync data type: $type"); - return false; } - - Future _syncEvent(List types) { - _logger.info("Syncing Events: $types"); - final streamCompleter = Completer(); - bool shouldComplete = false; - // the onDone callback might fire before the events are processed - // the following flag ensures that the onDone callback is not called - // before the events are processed and also that events are processed sequentially - Completer? mutex; - StreamSubscription? subscription; - try { - subscription = _syncApiRepository.getSyncEvents(types).listen( - (events) async { - if (events.isEmpty) { - _logger.warning("Received empty sync events"); - return; - } - - // If previous events are still being processed, wait for them to finish - if (mutex != null) { - await mutex!.future; - } - - if (_cancelChecker?.call() ?? false) { - _logger.info("Sync cancelled, stopping stream"); - subscription?.cancel(); - if (!streamCompleter.isCompleted) { - streamCompleter.completeError( - CanceledError(), - StackTrace.current, - ); - } - return; - } - - // Take control of the mutex and process the events - mutex = Completer(); - - try { - final eventsMap = events.groupListsBy((event) => event.type); - final Map acks = {}; - - for (final type in _kSyncTypeOrder) { - final data = eventsMap[type]; - if (data == null) { - continue; - } - - if (_cancelChecker?.call() ?? false) { - _logger.info("Sync cancelled, stopping stream"); - mutex?.complete(); - mutex = null; - if (!streamCompleter.isCompleted) { - streamCompleter.completeError( - CanceledError(), - StackTrace.current, - ); - } - - return; - } - - if (data.isEmpty) { - _logger.warning("Received empty sync events for $type"); - continue; - } - - if (await _handleSyncData(type, data.map((e) => e.data))) { - // ignore: avoid-unsafe-collection-methods - acks[type] = data.last.ack; - } else { - _logger.warning("Failed to handle sync events for $type"); - } - } - - if (acks.isNotEmpty) { - await _syncApiRepository.ack(acks.values.toList()); - } - _logger.info("$types events processed"); - } catch (error, stack) { - _logger.warning("Error handling sync events", error, stack); - } finally { - mutex?.complete(); - mutex = null; - } - - if (shouldComplete) { - _logger.info("Sync done, completing stream"); - if (!streamCompleter.isCompleted) streamCompleter.complete(); - } - }, - onError: (error, stack) { - _logger.warning("Error in sync stream for $types", error, stack); - // Do not proceed if the stream errors - if (!streamCompleter.isCompleted) { - // ignore: avoid-missing-completer-stack-trace - streamCompleter.completeError(error, stack); - } - }, - onDone: () { - _logger.info("$types stream done"); - if (mutex == null && !streamCompleter.isCompleted) { - streamCompleter.complete(); - } else { - // Marks the stream as done but does not complete the completer - // until the events are processed - shouldComplete = true; - } - }, - ); - } catch (error, stack) { - _logger.severe("Error starting sync stream", error, stack); - if (!streamCompleter.isCompleted) { - streamCompleter.completeError(error, stack); - } - } - return streamCompleter.future.whenComplete(() { - _logger.info("Sync stream completed"); - return subscription?.cancel(); - }); - } - - Future syncUsers() => - _syncEvent([SyncRequestType.usersV1, SyncRequestType.partnersV1]); } diff --git a/mobile/lib/domain/utils/background_sync.dart b/mobile/lib/domain/utils/background_sync.dart index 0bd456f0bb5..f63dc81ba99 100644 --- a/mobile/lib/domain/utils/background_sync.dart +++ b/mobile/lib/domain/utils/background_sync.dart @@ -7,31 +7,33 @@ import 'package:immich_mobile/utils/isolate.dart'; import 'package:worker_manager/worker_manager.dart'; class BackgroundSyncManager { - Cancelable? _userSyncTask; + Cancelable? _syncTask; BackgroundSyncManager(); Future cancel() { final futures = []; - if (_userSyncTask != null) { - futures.add(_userSyncTask!.future); + + if (_syncTask != null) { + futures.add(_syncTask!.future); } - _userSyncTask?.cancel(); - _userSyncTask = null; + _syncTask?.cancel(); + _syncTask = null; + return Future.wait(futures); } - Future syncUsers() { - if (_userSyncTask != null) { - return _userSyncTask!.future; + Future sync() { + if (_syncTask != null) { + return _syncTask!.future; } - _userSyncTask = runInIsolateGentle( - computation: (ref) => ref.read(syncStreamServiceProvider).syncUsers(), + _syncTask = runInIsolateGentle( + computation: (ref) => ref.read(syncStreamServiceProvider).sync(), ); - _userSyncTask!.whenComplete(() { - _userSyncTask = null; + _syncTask!.whenComplete(() { + _syncTask = null; }); - return _userSyncTask!.future; + return _syncTask!.future; } } diff --git a/mobile/lib/infrastructure/repositories/sync_api.repository.dart b/mobile/lib/infrastructure/repositories/sync_api.repository.dart index a26b867df6c..dd1ea208ba0 100644 --- a/mobile/lib/infrastructure/repositories/sync_api.repository.dart +++ b/mobile/lib/infrastructure/repositories/sync_api.repository.dart @@ -12,22 +12,22 @@ import 'package:openapi/api.dart'; class SyncApiRepository implements ISyncApiRepository { final Logger _logger = Logger('SyncApiRepository'); final ApiService _api; - final int _batchSize; - SyncApiRepository(this._api, {int batchSize = kSyncEventBatchSize}) - : _batchSize = batchSize; - - @override - Stream> getSyncEvents(List type) { - return _getSyncStream(SyncStreamDto(types: type)); - } + SyncApiRepository(this._api); @override Future ack(List data) { return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: data)); } - Stream> _getSyncStream(SyncStreamDto dto) async* { - final client = http.Client(); + @override + Future streamChanges( + Function(List, Function() abort) onData, { + int batchSize = kSyncEventBatchSize, + http.Client? httpClient, + }) async { + // ignore: avoid-unused-assignment + final stopwatch = Stopwatch()..start(); + final client = httpClient ?? http.Client(); final endpoint = "${_api.apiClient.basePath}/sync/stream"; final headers = { @@ -35,20 +35,38 @@ class SyncApiRepository implements ISyncApiRepository { 'Accept': 'application/jsonlines+json', }; - final queryParams = []; final headerParams = {}; - await _api.applyToParams(queryParams, headerParams); + await _api.applyToParams([], headerParams); headers.addAll(headerParams); final request = http.Request('POST', Uri.parse(endpoint)); request.headers.addAll(headers); - request.body = jsonEncode(dto.toJson()); + request.body = jsonEncode( + SyncStreamDto( + types: [ + SyncRequestType.usersV1, + SyncRequestType.partnersV1, + SyncRequestType.assetsV1, + SyncRequestType.partnerAssetsV1, + SyncRequestType.assetExifsV1, + SyncRequestType.partnerAssetExifsV1, + ], + ).toJson(), + ); String previousChunk = ''; List lines = []; + bool shouldAbort = false; + + void abort() { + _logger.warning("Abort requested, stopping sync stream"); + shouldAbort = true; + } + try { - final response = await client.send(request); + final response = + await client.send(request).timeout(const Duration(seconds: 20)); if (response.statusCode != 200) { final errorBody = await response.stream.bytesToString(); @@ -59,27 +77,38 @@ class SyncApiRepository implements ISyncApiRepository { } await for (final chunk in response.stream.transform(utf8.decoder)) { + if (shouldAbort) { + break; + } + previousChunk += chunk; final parts = previousChunk.toString().split('\n'); previousChunk = parts.removeLast(); lines.addAll(parts); - if (lines.length < _batchSize) { + if (lines.length < batchSize) { continue; } - yield _parseSyncResponse(lines); + await onData(_parseLines(lines), abort); lines.clear(); } - } finally { - if (lines.isNotEmpty) { - yield _parseSyncResponse(lines); + + if (lines.isNotEmpty && !shouldAbort) { + await onData(_parseLines(lines), abort); } + } catch (error, stack) { + _logger.severe("error processing stream", error, stack); + return Future.error(error, stack); + } finally { client.close(); } + stopwatch.stop(); + _logger + .info("Remote Sync completed in ${stopwatch.elapsed.inMilliseconds}ms"); } - List _parseSyncResponse(List lines) { + List _parseLines(List lines) { final List data = []; for (final line in lines) { @@ -110,4 +139,10 @@ const _kResponseMap = { SyncEntityType.userDeleteV1: SyncUserDeleteV1.fromJson, SyncEntityType.partnerV1: SyncPartnerV1.fromJson, SyncEntityType.partnerDeleteV1: SyncPartnerDeleteV1.fromJson, + SyncEntityType.assetV1: SyncAssetV1.fromJson, + SyncEntityType.assetDeleteV1: SyncAssetDeleteV1.fromJson, + SyncEntityType.assetExifV1: SyncAssetExifV1.fromJson, + SyncEntityType.partnerAssetV1: SyncAssetV1.fromJson, + SyncEntityType.partnerAssetDeleteV1: SyncAssetDeleteV1.fromJson, + SyncEntityType.partnerAssetExifV1: SyncAssetExifV1.fromJson, }; diff --git a/mobile/lib/infrastructure/repositories/sync_stream.repository.dart b/mobile/lib/infrastructure/repositories/sync_stream.repository.dart index a947a9a66b4..5ad9a369df6 100644 --- a/mobile/lib/infrastructure/repositories/sync_stream.repository.dart +++ b/mobile/lib/infrastructure/repositories/sync_stream.repository.dart @@ -1,4 +1,5 @@ import 'package:drift/drift.dart'; +import 'package:flutter/foundation.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; import 'package:immich_mobile/extensions/string_extensions.dart'; import 'package:immich_mobile/infrastructure/entities/partner.entity.drift.dart'; @@ -15,7 +16,7 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository DriftSyncStreamRepository(super.db) : _db = db; @override - Future deleteUsersV1(Iterable data) async { + Future deleteUsersV1(Iterable data) async { try { await _db.batch((batch) { for (final user in data) { @@ -25,15 +26,14 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository ); } }); - return true; - } catch (e, s) { - _logger.severe('Error while processing SyncUserDeleteV1', e, s); - return false; + } catch (error, stack) { + _logger.severe('Error while processing SyncUserDeleteV1', error, stack); + rethrow; } } @override - Future updateUsersV1(Iterable data) async { + Future updateUsersV1(Iterable data) async { try { await _db.batch((batch) { for (final user in data) { @@ -49,15 +49,14 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository ); } }); - return true; - } catch (e, s) { - _logger.severe('Error while processing SyncUserV1', e, s); - return false; + } catch (error, stack) { + _logger.severe('Error while processing SyncUserV1', error, stack); + rethrow; } } @override - Future deletePartnerV1(Iterable data) async { + Future deletePartnerV1(Iterable data) async { try { await _db.batch((batch) { for (final partner in data) { @@ -70,15 +69,14 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository ); } }); - return true; } catch (e, s) { _logger.severe('Error while processing SyncPartnerDeleteV1', e, s); - return false; + rethrow; } } @override - Future updatePartnerV1(Iterable data) async { + Future updatePartnerV1(Iterable data) async { try { await _db.batch((batch) { for (final partner in data) { @@ -95,10 +93,42 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository ); } }); - return true; } catch (e, s) { _logger.severe('Error while processing SyncPartnerV1', e, s); - return false; + rethrow; } } + + // Assets + @override + Future updateAssetsV1(Iterable data) async { + debugPrint("updateAssetsV1 - ${data.length}"); + } + + @override + Future deleteAssetsV1(Iterable data) async { + debugPrint("deleteAssetsV1 - ${data.length}"); + } + + // Partner Assets + @override + Future updatePartnerAssetsV1(Iterable data) async { + debugPrint("updatePartnerAssetsV1 - ${data.length}"); + } + + @override + Future deletePartnerAssetsV1(Iterable data) async { + debugPrint("deletePartnerAssetsV1 - ${data.length}"); + } + + // EXIF + @override + Future updateAssetsExifV1(Iterable data) async { + debugPrint("updateAssetsExifV1 - ${data.length}"); + } + + @override + Future updatePartnerAssetsExifV1(Iterable data) async { + debugPrint("updatePartnerAssetsExifV1 - ${data.length}"); + } } diff --git a/mobile/lib/widgets/common/immich_app_bar.dart b/mobile/lib/widgets/common/immich_app_bar.dart index 51b4faa0144..4f95e657d95 100644 --- a/mobile/lib/widgets/common/immich_app_bar.dart +++ b/mobile/lib/widgets/common/immich_app_bar.dart @@ -1,11 +1,13 @@ import 'package:auto_route/auto_route.dart'; import 'package:easy_localization/easy_localization.dart'; +import 'package:flutter/foundation.dart'; import 'package:flutter/material.dart'; import 'package:flutter_svg/svg.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/extensions/build_context_extensions.dart'; import 'package:immich_mobile/models/backup/backup_state.model.dart'; import 'package:immich_mobile/models/server_info/server_info.model.dart'; +import 'package:immich_mobile/providers/background_sync.provider.dart'; import 'package:immich_mobile/providers/backup/backup.provider.dart'; import 'package:immich_mobile/providers/server_info.provider.dart'; import 'package:immich_mobile/providers/user.provider.dart'; @@ -178,6 +180,11 @@ class ImmichAppBar extends ConsumerWidget implements PreferredSizeWidget { child: action, ), ), + if (kDebugMode) + IconButton( + onPressed: () => ref.read(backgroundSyncProvider).sync(), + icon: const Icon(Icons.sync), + ), if (showUploadButton) Padding( padding: const EdgeInsets.only(right: 20), diff --git a/mobile/test/api.mocks.dart b/mobile/test/api.mocks.dart index d502ea06756..b0a4e9b8fdb 100644 --- a/mobile/test/api.mocks.dart +++ b/mobile/test/api.mocks.dart @@ -2,3 +2,5 @@ import 'package:mocktail/mocktail.dart'; import 'package:openapi/api.dart'; class MockAssetsApi extends Mock implements AssetsApi {} + +class MockSyncApi extends Mock implements SyncApi {} diff --git a/mobile/test/domain/services/sync_stream_service_test.dart b/mobile/test/domain/services/sync_stream_service_test.dart index e1d8e6987f3..b78a44342be 100644 --- a/mobile/test/domain/services/sync_stream_service_test.dart +++ b/mobile/test/domain/services/sync_stream_service_test.dart @@ -1,4 +1,4 @@ -// ignore_for_file: avoid-unnecessary-futures, avoid-async-call-in-sync-function +// ignore_for_file: avoid-declaring-call-method, avoid-unnecessary-futures import 'dart:async'; @@ -8,16 +8,22 @@ import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; import 'package:immich_mobile/domain/models/sync_event.model.dart'; import 'package:immich_mobile/domain/services/sync_stream.service.dart'; import 'package:mocktail/mocktail.dart'; -import 'package:openapi/api.dart'; -import 'package:worker_manager/worker_manager.dart'; import '../../fixtures/sync_stream.stub.dart'; import '../../infrastructure/repository.mock.dart'; +class _AbortCallbackWrapper { + const _AbortCallbackWrapper(); + + bool call() => false; +} + +class _MockAbortCallbackWrapper extends Mock implements _AbortCallbackWrapper {} + class _CancellationWrapper { const _CancellationWrapper(); - bool isCancelled() => false; + bool call() => false; } class _MockCancellationWrapper extends Mock implements _CancellationWrapper {} @@ -26,35 +32,26 @@ void main() { late SyncStreamService sut; late ISyncStreamRepository mockSyncStreamRepo; late ISyncApiRepository mockSyncApiRepo; - late StreamController> streamController; + late Function(List, Function()) handleEventsCallback; + late _MockAbortCallbackWrapper mockAbortCallbackWrapper; successHandler(Invocation _) async => true; - failureHandler(Invocation _) async => false; setUp(() { mockSyncStreamRepo = MockSyncStreamRepository(); mockSyncApiRepo = MockSyncApiRepository(); - streamController = StreamController>.broadcast(); + mockAbortCallbackWrapper = _MockAbortCallbackWrapper(); - sut = SyncStreamService( - syncApiRepository: mockSyncApiRepo, - syncStreamRepository: mockSyncStreamRepo, - ); + when(() => mockAbortCallbackWrapper()).thenReturn(false); - // Default stream setup - emits one batch and closes - when(() => mockSyncApiRepo.getSyncEvents(any())) - .thenAnswer((_) => streamController.stream); + when(() => mockSyncApiRepo.streamChanges(any())) + .thenAnswer((invocation) async { + // ignore: avoid-unsafe-collection-methods + handleEventsCallback = invocation.positionalArguments.first; + }); - // Default ack setup when(() => mockSyncApiRepo.ack(any())).thenAnswer((_) async => {}); - // Register fallbacks for mocktail verification - registerFallbackValue([]); - registerFallbackValue([]); - registerFallbackValue([]); - registerFallbackValue([]); - - // Default successful repository calls when(() => mockSyncStreamRepo.updateUsersV1(any())) .thenAnswer(successHandler); when(() => mockSyncStreamRepo.deleteUsersV1(any())) @@ -63,381 +60,163 @@ void main() { .thenAnswer(successHandler); when(() => mockSyncStreamRepo.deletePartnerV1(any())) .thenAnswer(successHandler); + when(() => mockSyncStreamRepo.updateAssetsV1(any())) + .thenAnswer(successHandler); + when(() => mockSyncStreamRepo.deleteAssetsV1(any())) + .thenAnswer(successHandler); + when(() => mockSyncStreamRepo.updateAssetsExifV1(any())) + .thenAnswer(successHandler); + when(() => mockSyncStreamRepo.updatePartnerAssetsV1(any())) + .thenAnswer(successHandler); + when(() => mockSyncStreamRepo.deletePartnerAssetsV1(any())) + .thenAnswer(successHandler); + when(() => mockSyncStreamRepo.updatePartnerAssetsExifV1(any())) + .thenAnswer(successHandler); + + sut = SyncStreamService( + syncApiRepository: mockSyncApiRepo, + syncStreamRepository: mockSyncStreamRepo, + ); }); - tearDown(() async { - if (!streamController.isClosed) { - await streamController.close(); - } - }); - - // Helper to trigger sync and add events to the stream - Future triggerSyncAndEmit(List events) async { - final future = sut.syncUsers(); // Start listening - await Future.delayed(Duration.zero); // Allow listener to attach - if (!streamController.isClosed) { - streamController.add(events); - await streamController.close(); // Close after emitting - } - await future; // Wait for processing to complete + Future simulateEvents(List events) async { + await sut.sync(); + await handleEventsCallback(events, mockAbortCallbackWrapper.call); } - group("SyncStreamService", () { + group("SyncStreamService - _handleEvents", () { test( - "completes successfully when stream emits data and handlers succeed", + "processes events and acks successfully when handlers succeed", () async { final events = [ - ...SyncStreamStub.userEvents, - ...SyncStreamStub.partnerEvents, + SyncStreamStub.userDeleteV1, + SyncStreamStub.userV1Admin, + SyncStreamStub.userV1User, + SyncStreamStub.partnerDeleteV1, + SyncStreamStub.partnerV1, ]; - final future = triggerSyncAndEmit(events); - await expectLater(future, completes); - // Verify ack includes last ack from each successfully handled type - verify( - () => - mockSyncApiRepo.ack(any(that: containsAll(["5", "2", "4", "3"]))), - ).called(1); + + await simulateEvents(events); + + verifyInOrder([ + () => mockSyncStreamRepo.deleteUsersV1(any()), + () => mockSyncApiRepo.ack(["2"]), + () => mockSyncStreamRepo.updateUsersV1(any()), + () => mockSyncApiRepo.ack(["5"]), + () => mockSyncStreamRepo.deletePartnerV1(any()), + () => mockSyncApiRepo.ack(["4"]), + () => mockSyncStreamRepo.updatePartnerV1(any()), + () => mockSyncApiRepo.ack(["3"]), + ]); + verifyNever(() => mockAbortCallbackWrapper()); }, ); - test("completes successfully when stream emits an error", () async { - when(() => mockSyncApiRepo.getSyncEvents(any())) - .thenAnswer((_) => Stream.error(Exception("Stream Error"))); - // Should complete gracefully without throwing - await expectLater(sut.syncUsers(), throwsException); - verifyNever(() => mockSyncApiRepo.ack(any())); // No ack on stream error - }); - - test("throws when initial getSyncEvents call fails", () async { - final apiException = Exception("API Error"); - when(() => mockSyncApiRepo.getSyncEvents(any())).thenThrow(apiException); - // Should rethrow the exception from the initial call - await expectLater(sut.syncUsers(), throwsA(apiException)); - verifyNever(() => mockSyncApiRepo.ack(any())); - }); - - test( - "completes successfully when a repository handler throws an exception", - () async { - when(() => mockSyncStreamRepo.updateUsersV1(any())) - .thenThrow(Exception("Repo Error")); - final events = [ - ...SyncStreamStub.userEvents, - ...SyncStreamStub.partnerEvents, - ]; - // Should complete, but ack only for the successful types - await triggerSyncAndEmit(events); - // Only partner delete was successful by default setup - verify(() => mockSyncApiRepo.ack(["2", "4", "3"])).called(1); - }, - ); - - test( - "completes successfully but sends no ack when all handlers fail", - () async { - when(() => mockSyncStreamRepo.updateUsersV1(any())) - .thenAnswer(failureHandler); - when(() => mockSyncStreamRepo.deleteUsersV1(any())) - .thenAnswer(failureHandler); - when(() => mockSyncStreamRepo.updatePartnerV1(any())) - .thenAnswer(failureHandler); - when(() => mockSyncStreamRepo.deletePartnerV1(any())) - .thenAnswer(failureHandler); - - final events = [ - ...SyncStreamStub.userEvents, - ...SyncStreamStub.partnerEvents, - ]; - await triggerSyncAndEmit(events); - verifyNever(() => mockSyncApiRepo.ack(any())); - }, - ); - - test("sends ack only for types where handler returns true", () async { - // Mock specific handlers: user update fails, user delete succeeds - when(() => mockSyncStreamRepo.updateUsersV1(any())) - .thenAnswer(failureHandler); - when(() => mockSyncStreamRepo.deleteUsersV1(any())) - .thenAnswer(successHandler); - // partner update fails, partner delete succeeds - when(() => mockSyncStreamRepo.updatePartnerV1(any())) - .thenAnswer(failureHandler); - + test("processes final batch correctly", () async { final events = [ - ...SyncStreamStub.userEvents, - ...SyncStreamStub.partnerEvents, + SyncStreamStub.userDeleteV1, + SyncStreamStub.userV1Admin, ]; - await triggerSyncAndEmit(events); - // Expect ack only for userDeleteV1 (ack: "2") and partnerDeleteV1 (ack: "4") - verify(() => mockSyncApiRepo.ack(any(that: containsAll(["2", "4"])))) - .called(1); + await simulateEvents(events); + + verifyInOrder([ + () => mockSyncStreamRepo.deleteUsersV1(any()), + () => mockSyncApiRepo.ack(["2"]), + () => mockSyncStreamRepo.updateUsersV1(any()), + () => mockSyncApiRepo.ack(["1"]), + ]); + verifyNever(() => mockAbortCallbackWrapper()); }); - test("does not process or ack when stream emits an empty list", () async { - final future = sut.syncUsers(); - streamController.add([]); // Emit empty list - await streamController.close(); - await future; // Wait for completion + test("does not process or ack when event list is empty", () async { + await simulateEvents([]); verifyNever(() => mockSyncStreamRepo.updateUsersV1(any())); verifyNever(() => mockSyncStreamRepo.deleteUsersV1(any())); verifyNever(() => mockSyncStreamRepo.updatePartnerV1(any())); verifyNever(() => mockSyncStreamRepo.deletePartnerV1(any())); + verifyNever(() => mockAbortCallbackWrapper()); verifyNever(() => mockSyncApiRepo.ack(any())); }); - test("processes multiple batches sequentially using mutex", () async { - final completer1 = Completer(); - final completer2 = Completer(); - int callOrder = 0; - int handler1StartOrder = -1; - int handler2StartOrder = -1; - int handler1Calls = 0; - int handler2Calls = 0; + test("aborts and stops processing if cancelled during iteration", () async { + final cancellationChecker = _MockCancellationWrapper(); + when(() => cancellationChecker()).thenReturn(false); - when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer((_) async { - handler1Calls++; - handler1StartOrder = ++callOrder; - await completer1.future; - return true; - }); - when(() => mockSyncStreamRepo.updatePartnerV1(any())) - .thenAnswer((_) async { - handler2Calls++; - handler2StartOrder = ++callOrder; - await completer2.future; - return true; + sut = SyncStreamService( + syncApiRepository: mockSyncApiRepo, + syncStreamRepository: mockSyncStreamRepo, + cancelChecker: cancellationChecker.call, + ); + await sut.sync(); + + final events = [ + SyncStreamStub.userDeleteV1, + SyncStreamStub.userV1Admin, + SyncStreamStub.partnerDeleteV1, + ]; + + when(() => mockSyncStreamRepo.deleteUsersV1(any())).thenAnswer((_) async { + when(() => cancellationChecker()).thenReturn(true); }); - final batch1 = SyncStreamStub.userEvents; - final batch2 = SyncStreamStub.partnerEvents; + await handleEventsCallback(events, mockAbortCallbackWrapper.call); - final syncFuture = sut.syncUsers(); - await pumpEventQueue(); + verify(() => mockSyncStreamRepo.deleteUsersV1(any())).called(1); + verifyNever(() => mockSyncStreamRepo.updateUsersV1(any())); + verifyNever(() => mockSyncStreamRepo.deletePartnerV1(any())); - streamController.add(batch1); - await pumpEventQueue(); - // Small delay to ensure the first handler starts - await Future.delayed(const Duration(milliseconds: 20)); + verify(() => mockAbortCallbackWrapper()).called(1); - expect(handler1StartOrder, 1, reason: "Handler 1 should start first"); - expect(handler1Calls, 1); - - streamController.add(batch2); - await pumpEventQueue(); - // Small delay - await Future.delayed(const Duration(milliseconds: 20)); - - expect(handler2StartOrder, -1, reason: "Handler 2 should wait"); - expect(handler2Calls, 0); - - completer1.complete(); - await pumpEventQueue(times: 40); - // Small delay to ensure the second handler starts - await Future.delayed(const Duration(milliseconds: 20)); - - expect(handler2StartOrder, 2, reason: "Handler 2 should start after H1"); - expect(handler2Calls, 1); - - completer2.complete(); - await pumpEventQueue(times: 40); - // Small delay before closing the stream - await Future.delayed(const Duration(milliseconds: 20)); - - if (!streamController.isClosed) { - await streamController.close(); - } - await pumpEventQueue(times: 40); - // Small delay to ensure the sync completes - await Future.delayed(const Duration(milliseconds: 20)); - - await syncFuture; - - verify(() => mockSyncStreamRepo.updateUsersV1(any())).called(1); - verify(() => mockSyncStreamRepo.updatePartnerV1(any())).called(1); - verify(() => mockSyncApiRepo.ack(any())).called(2); + verify(() => mockSyncApiRepo.ack(["2"])).called(1); }); test( - "stops processing and ack when cancel checker is completed", + "aborts and stops processing if cancelled before processing batch", () async { final cancellationChecker = _MockCancellationWrapper(); - when(() => cancellationChecker.isCancelled()).thenAnswer((_) => false); + when(() => cancellationChecker()).thenReturn(false); + + final processingCompleter = Completer(); + bool handler1Started = false; + when(() => mockSyncStreamRepo.deleteUsersV1(any())) + .thenAnswer((_) async { + handler1Started = true; + return processingCompleter.future; + }); sut = SyncStreamService( syncApiRepository: mockSyncApiRepo, syncStreamRepository: mockSyncStreamRepo, - cancelChecker: cancellationChecker.isCancelled, + cancelChecker: cancellationChecker.call, ); - final processingCompleter = Completer(); - bool handlerStarted = false; + await sut.sync(); - // Make handler wait so we can cancel it mid-flight - when(() => mockSyncStreamRepo.deleteUsersV1(any())) - .thenAnswer((_) async { - handlerStarted = true; - await processingCompleter - .future; // Wait indefinitely until test completes it - return true; - }); - - final syncFuture = sut.syncUsers(); - await pumpEventQueue(times: 30); - - streamController.add(SyncStreamStub.userEvents); - // Ensure processing starts - await Future.delayed(const Duration(milliseconds: 10)); - - expect(handlerStarted, isTrue, reason: "Handler should have started"); - - when(() => cancellationChecker.isCancelled()).thenAnswer((_) => true); - - // Allow cancellation logic to propagate - await Future.delayed(const Duration(milliseconds: 10)); - - // Complete the handler's completer after cancellation signal - // to ensure the cancellation logic itself isn't blocked by the handler. - processingCompleter.complete(); - - await expectLater(syncFuture, throwsA(isA())); - - // Verify that ack was NOT called because processing was cancelled - verifyNever(() => mockSyncApiRepo.ack(any())); - }, - ); - - test("completes successfully when ack call throws an exception", () async { - when(() => mockSyncApiRepo.ack(any())).thenThrow(Exception("Ack Error")); - final events = [ - ...SyncStreamStub.userEvents, - ...SyncStreamStub.partnerEvents, - ]; - - // Should still complete even if ack fails - await triggerSyncAndEmit(events); - verify(() => mockSyncApiRepo.ack(any())) - .called(1); // Verify ack was attempted - }); - - test("waits for processing to finish if onDone called early", () async { - final processingCompleter = Completer(); - bool handlerFinished = false; - - when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer((_) async { - await processingCompleter.future; // Wait inside handler - handlerFinished = true; - return true; - }); - - final syncFuture = sut.syncUsers(); - // Allow listener to attach - // This is necessary to ensure the stream is ready to receive events - await Future.delayed(Duration.zero); - - streamController.add(SyncStreamStub.userEvents); // Emit batch - await Future.delayed( - const Duration(milliseconds: 10), - ); // Ensure processing starts - - await streamController - .close(); // Close stream (triggers onDone internally) - await Future.delayed( - const Duration(milliseconds: 10), - ); // Give onDone a chance to fire - - // At this point, onDone was called, but processing is blocked - expect(handlerFinished, isFalse); - - processingCompleter.complete(); // Allow processing to finish - await syncFuture; // Now the main future should complete - - expect(handlerFinished, isTrue); - verify(() => mockSyncApiRepo.ack(any())).called(1); - }); - - test("processes events in the defined _kSyncTypeOrder", () async { - final future = sut.syncUsers(); - await pumpEventQueue(); - if (!streamController.isClosed) { final events = [ - SyncEvent( - type: SyncEntityType.partnerV1, - data: SyncStreamStub.partnerV1, - ack: "1", - ), // Should be processed last - SyncEvent( - type: SyncEntityType.userV1, - data: SyncStreamStub.userV1Admin, - ack: "2", - ), // Should be processed second - SyncEvent( - type: SyncEntityType.partnerDeleteV1, - data: SyncStreamStub.partnerDeleteV1, - ack: "3", - ), // Should be processed third - SyncEvent( - type: SyncEntityType.userDeleteV1, - data: SyncStreamStub.userDeleteV1, - ack: "4", - ), // Should be processed first + SyncStreamStub.userDeleteV1, + SyncStreamStub.userV1Admin, + SyncStreamStub.partnerDeleteV1, ]; - streamController.add(events); - await streamController.close(); - } - await future; + final processingFuture = + handleEventsCallback(events, mockAbortCallbackWrapper.call); + await pumpEventQueue(); - verifyInOrder([ - () => mockSyncStreamRepo.deleteUsersV1(any()), - () => mockSyncStreamRepo.updateUsersV1(any()), - () => mockSyncStreamRepo.deletePartnerV1(any()), - () => mockSyncStreamRepo.updatePartnerV1(any()), - // Verify ack happens after all processing - () => mockSyncApiRepo.ack(any()), - ]); - }); - }); + expect(handler1Started, isTrue); - group("syncUsers", () { - test("calls getSyncEvents with correct types", () async { - // Need to close the stream for the future to complete - final future = sut.syncUsers(); - await streamController.close(); - await future; + // Signal cancellation while handler 1 is waiting + when(() => cancellationChecker()).thenReturn(true); + await pumpEventQueue(); - verify( - () => mockSyncApiRepo.getSyncEvents([ - SyncRequestType.usersV1, - SyncRequestType.partnersV1, - ]), - ).called(1); - }); + processingCompleter.complete(); + await processingFuture; - test("calls repository methods with correctly grouped data", () async { - final events = [ - ...SyncStreamStub.userEvents, - ...SyncStreamStub.partnerEvents, - ]; - await triggerSyncAndEmit(events); + verifyNever(() => mockSyncStreamRepo.updateUsersV1(any())); - // Verify each handler was called with the correct list of data payloads - verify( - () => mockSyncStreamRepo.updateUsersV1( - [SyncStreamStub.userV1Admin, SyncStreamStub.userV1User], - ), - ).called(1); - verify( - () => mockSyncStreamRepo.deleteUsersV1([SyncStreamStub.userDeleteV1]), - ).called(1); - verify( - () => mockSyncStreamRepo.updatePartnerV1([SyncStreamStub.partnerV1]), - ).called(1); - verify( - () => mockSyncStreamRepo - .deletePartnerV1([SyncStreamStub.partnerDeleteV1]), - ).called(1); - }); + verify(() => mockSyncApiRepo.ack(["2"])).called(1); + }, + ); }); } diff --git a/mobile/test/fixtures/sync_stream.stub.dart b/mobile/test/fixtures/sync_stream.stub.dart index 781e63a2bb3..ba97f1434ad 100644 --- a/mobile/test/fixtures/sync_stream.stub.dart +++ b/mobile/test/fixtures/sync_stream.stub.dart @@ -2,44 +2,44 @@ import 'package:immich_mobile/domain/models/sync_event.model.dart'; import 'package:openapi/api.dart'; abstract final class SyncStreamStub { - static final userV1Admin = SyncUserV1( - deletedAt: DateTime(2020), - email: "admin@admin", - id: "1", - name: "Admin", - ); - static final userV1User = SyncUserV1( - deletedAt: DateTime(2021), - email: "user@user", - id: "2", - name: "User", - ); - static final userDeleteV1 = SyncUserDeleteV1(userId: "2"); - static final userEvents = [ - SyncEvent(type: SyncEntityType.userV1, data: userV1Admin, ack: "1"), - SyncEvent( - type: SyncEntityType.userDeleteV1, - data: userDeleteV1, - ack: "2", + static final userV1Admin = SyncEvent( + type: SyncEntityType.userV1, + data: SyncUserV1( + deletedAt: DateTime(2020), + email: "admin@admin", + id: "1", + name: "Admin", ), - SyncEvent(type: SyncEntityType.userV1, data: userV1User, ack: "5"), - ]; + ack: "1", + ); + static final userV1User = SyncEvent( + type: SyncEntityType.userV1, + data: SyncUserV1( + deletedAt: DateTime(2021), + email: "user@user", + id: "5", + name: "User", + ), + ack: "5", + ); + static final userDeleteV1 = SyncEvent( + type: SyncEntityType.userDeleteV1, + data: SyncUserDeleteV1(userId: "2"), + ack: "2", + ); - static final partnerV1 = SyncPartnerV1( - inTimeline: true, - sharedById: "1", - sharedWithId: "2", - ); - static final partnerDeleteV1 = SyncPartnerDeleteV1( - sharedById: "3", - sharedWithId: "4", - ); - static final partnerEvents = [ - SyncEvent( - type: SyncEntityType.partnerDeleteV1, - data: partnerDeleteV1, - ack: "4", + static final partnerV1 = SyncEvent( + type: SyncEntityType.partnerV1, + data: SyncPartnerV1( + inTimeline: true, + sharedById: "1", + sharedWithId: "2", ), - SyncEvent(type: SyncEntityType.partnerV1, data: partnerV1, ack: "3"), - ]; + ack: "3", + ); + static final partnerDeleteV1 = SyncEvent( + type: SyncEntityType.partnerDeleteV1, + data: SyncPartnerDeleteV1(sharedById: "3", sharedWithId: "4"), + ack: "4", + ); } diff --git a/mobile/test/infrastructure/repositories/sync_api_repository_test.dart b/mobile/test/infrastructure/repositories/sync_api_repository_test.dart new file mode 100644 index 00000000000..55b03a81164 --- /dev/null +++ b/mobile/test/infrastructure/repositories/sync_api_repository_test.dart @@ -0,0 +1,299 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:flutter_test/flutter_test.dart'; +import 'package:http/http.dart' as http; +import 'package:immich_mobile/domain/models/sync_event.model.dart'; +import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart'; +import 'package:mocktail/mocktail.dart'; +import 'package:openapi/api.dart'; + +import '../../api.mocks.dart'; +import '../../service.mocks.dart'; + +class MockHttpClient extends Mock implements http.Client {} + +class MockApiClient extends Mock implements ApiClient {} + +class MockStreamedResponse extends Mock implements http.StreamedResponse {} + +class FakeBaseRequest extends Fake implements http.BaseRequest {} + +String _createJsonLine(String type, Map data, String ack) { + return '${jsonEncode({'type': type, 'data': data, 'ack': ack})}\n'; +} + +void main() { + late SyncApiRepository sut; + late MockApiService mockApiService; + late MockApiClient mockApiClient; + late MockSyncApi mockSyncApi; + late MockHttpClient mockHttpClient; + late MockStreamedResponse mockStreamedResponse; + late StreamController> responseStreamController; + late int testBatchSize = 3; + + setUp(() { + mockApiService = MockApiService(); + mockApiClient = MockApiClient(); + mockSyncApi = MockSyncApi(); + mockHttpClient = MockHttpClient(); + mockStreamedResponse = MockStreamedResponse(); + responseStreamController = + StreamController>.broadcast(sync: true); + + registerFallbackValue(FakeBaseRequest()); + + when(() => mockApiService.apiClient).thenReturn(mockApiClient); + when(() => mockApiService.syncApi).thenReturn(mockSyncApi); + when(() => mockApiClient.basePath).thenReturn('http://demo.immich.app/api'); + when(() => mockApiService.applyToParams(any(), any())) + .thenAnswer((_) async => {}); + + // Mock HTTP client behavior + when(() => mockHttpClient.send(any())) + .thenAnswer((_) async => mockStreamedResponse); + when(() => mockStreamedResponse.statusCode).thenReturn(200); + when(() => mockStreamedResponse.stream) + .thenAnswer((_) => http.ByteStream(responseStreamController.stream)); + when(() => mockHttpClient.close()).thenAnswer((_) => {}); + + sut = SyncApiRepository(mockApiService); + }); + + tearDown(() async { + if (!responseStreamController.isClosed) { + await responseStreamController.close(); + } + }); + + Future streamChanges( + Function(List, Function() abort) onDataCallback, + ) { + return sut.streamChanges( + onDataCallback, + batchSize: testBatchSize, + httpClient: mockHttpClient, + ); + } + + test('streamChanges stops processing stream when abort is called', () async { + int onDataCallCount = 0; + bool abortWasCalledInCallback = false; + List receivedEventsBatch1 = []; + + onDataCallback(List events, Function() abort) { + onDataCallCount++; + if (onDataCallCount == 1) { + receivedEventsBatch1 = events; + abort(); + abortWasCalledInCallback = true; + } else { + fail("onData called more than once after abort was invoked"); + } + } + + final streamChangesFuture = streamChanges(onDataCallback); + + await pumpEventQueue(); + + for (int i = 0; i < testBatchSize; i++) { + responseStreamController.add( + utf8.encode( + _createJsonLine( + SyncEntityType.userDeleteV1.toString(), + SyncUserDeleteV1(userId: "user$i").toJson(), + 'ack$i', + ), + ), + ); + } + + for (int i = testBatchSize; i < testBatchSize * 2; i++) { + responseStreamController.add( + utf8.encode( + _createJsonLine( + SyncEntityType.userDeleteV1.toString(), + SyncUserDeleteV1(userId: "user$i").toJson(), + 'ack$i', + ), + ), + ); + } + + await responseStreamController.close(); + await expectLater(streamChangesFuture, completes); + + expect(onDataCallCount, 1); + expect(abortWasCalledInCallback, isTrue); + expect(receivedEventsBatch1.length, testBatchSize); + verify(() => mockHttpClient.close()).called(1); + }); + + test( + 'streamChanges does not process remaining lines in finally block if aborted', + () async { + int onDataCallCount = 0; + bool abortWasCalledInCallback = false; + + onDataCallback(List events, Function() abort) { + onDataCallCount++; + if (onDataCallCount == 1) { + abort(); + abortWasCalledInCallback = true; + } else { + fail("onData called more than once after abort was invoked"); + } + } + + final streamChangesFuture = streamChanges(onDataCallback); + + await pumpEventQueue(); + + for (int i = 0; i < testBatchSize; i++) { + responseStreamController.add( + utf8.encode( + _createJsonLine( + SyncEntityType.userDeleteV1.toString(), + SyncUserDeleteV1(userId: "user$i").toJson(), + 'ack$i', + ), + ), + ); + } + + // emit a single event to skip batching and trigger finally + responseStreamController.add( + utf8.encode( + _createJsonLine( + SyncEntityType.userDeleteV1.toString(), + SyncUserDeleteV1(userId: "user100").toJson(), + 'ack100', + ), + ), + ); + + await responseStreamController.close(); + await expectLater(streamChangesFuture, completes); + + expect(onDataCallCount, 1); + expect(abortWasCalledInCallback, isTrue); + verify(() => mockHttpClient.close()).called(1); + }, + ); + + test( + 'streamChanges processes remaining lines in finally block if not aborted', + () async { + int onDataCallCount = 0; + List receivedEventsBatch1 = []; + List receivedEventsBatch2 = []; + + onDataCallback(List events, Function() _) { + onDataCallCount++; + if (onDataCallCount == 1) { + receivedEventsBatch1 = events; + } else if (onDataCallCount == 2) { + receivedEventsBatch2 = events; + } else { + fail("onData called more than expected"); + } + } + + final streamChangesFuture = streamChanges(onDataCallback); + + await pumpEventQueue(); + + // Batch 1 + for (int i = 0; i < testBatchSize; i++) { + responseStreamController.add( + utf8.encode( + _createJsonLine( + SyncEntityType.userDeleteV1.toString(), + SyncUserDeleteV1(userId: "user$i").toJson(), + 'ack$i', + ), + ), + ); + } + + // Partial Batch 2 + responseStreamController.add( + utf8.encode( + _createJsonLine( + SyncEntityType.userDeleteV1.toString(), + SyncUserDeleteV1(userId: "user100").toJson(), + 'ack100', + ), + ), + ); + + await responseStreamController.close(); + await expectLater(streamChangesFuture, completes); + + expect(onDataCallCount, 2); + expect(receivedEventsBatch1.length, testBatchSize); + expect(receivedEventsBatch2.length, 1); + verify(() => mockHttpClient.close()).called(1); + }, + ); + + test('streamChanges handles stream error gracefully', () async { + final streamError = Exception("Network Error"); + int onDataCallCount = 0; + + onDataCallback(List events, Function() _) { + onDataCallCount++; + } + + final streamChangesFuture = streamChanges(onDataCallback); + + await pumpEventQueue(); + + responseStreamController.add( + utf8.encode( + _createJsonLine( + SyncEntityType.userDeleteV1.toString(), + SyncUserDeleteV1(userId: "user1").toJson(), + 'ack1', + ), + ), + ); + + responseStreamController.addError(streamError); + await expectLater(streamChangesFuture, throwsA(streamError)); + + expect(onDataCallCount, 0); + verify(() => mockHttpClient.close()).called(1); + }); + + test('streamChanges throws ApiException on non-200 status code', () async { + when(() => mockStreamedResponse.statusCode).thenReturn(401); + final errorBodyController = StreamController>(sync: true); + when(() => mockStreamedResponse.stream) + .thenAnswer((_) => http.ByteStream(errorBodyController.stream)); + + int onDataCallCount = 0; + + onDataCallback(List events, Function() _) { + onDataCallCount++; + } + + final future = streamChanges(onDataCallback); + + errorBodyController.add(utf8.encode('{"error":"Unauthorized"}')); + await errorBodyController.close(); + + await expectLater( + future, + throwsA( + isA() + .having((e) => e.code, 'code', 401) + .having((e) => e.message, 'message', contains('Unauthorized')), + ), + ); + + expect(onDataCallCount, 0); + verify(() => mockHttpClient.close()).called(1); + }); +}