diff --git a/server/src/domain/asset/asset.service.spec.ts b/server/src/domain/asset/asset.service.spec.ts index 3826ad969a1..fb0bdc60b32 100644 --- a/server/src/domain/asset/asset.service.spec.ts +++ b/server/src/domain/asset/asset.service.spec.ts @@ -784,9 +784,9 @@ describe(AssetService.name, () => { await sut.deleteAll(authStub.user1, { ids: ['asset1', 'asset2'], force: true }); - expect(jobMock.queue.mock.calls).toEqual([ - [{ name: JobName.ASSET_DELETION, data: { id: 'asset1' } }], - [{ name: JobName.ASSET_DELETION, data: { id: 'asset2' } }], + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { name: JobName.ASSET_DELETION, data: { id: 'asset1' } }, + { name: JobName.ASSET_DELETION, data: { id: 'asset2' } }, ]); }); @@ -895,6 +895,7 @@ describe(AssetService.name, () => { await sut.handleAssetDeletion({ id: assetStub.external.id }); expect(jobMock.queue).not.toBeCalled(); + expect(jobMock.queueAll).not.toBeCalled(); expect(assetMock.remove).not.toBeCalled(); }); @@ -952,19 +953,21 @@ describe(AssetService.name, () => { it('should run the refresh metadata job', async () => { accessMock.asset.checkOwnerAccess.mockResolvedValue(new Set(['asset-1'])); await sut.run(authStub.admin, { assetIds: ['asset-1'], name: AssetJobName.REFRESH_METADATA }), - expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.METADATA_EXTRACTION, data: { id: 'asset-1' } }); + expect(jobMock.queueAll).toHaveBeenCalledWith([{ name: JobName.METADATA_EXTRACTION, data: { id: 'asset-1' } }]); }); it('should run the refresh thumbnails job', async () => { accessMock.asset.checkOwnerAccess.mockResolvedValue(new Set(['asset-1'])); await sut.run(authStub.admin, { assetIds: ['asset-1'], name: AssetJobName.REGENERATE_THUMBNAIL }), - expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: 'asset-1' } }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: 'asset-1' } }, + ]); }); it('should run the transcode video', async () => { accessMock.asset.checkOwnerAccess.mockResolvedValue(new Set(['asset-1'])); await sut.run(authStub.admin, { assetIds: ['asset-1'], name: AssetJobName.TRANSCODE_VIDEO }), - expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.VIDEO_CONVERSION, data: { id: 'asset-1' } }); + expect(jobMock.queueAll).toHaveBeenCalledWith([{ name: JobName.VIDEO_CONVERSION, data: { id: 'asset-1' } }]); }); }); diff --git a/server/src/domain/asset/asset.service.ts b/server/src/domain/asset/asset.service.ts index 84438f19d67..7ab77a5fcb3 100644 --- a/server/src/domain/asset/asset.service.ts +++ b/server/src/domain/asset/asset.service.ts @@ -21,6 +21,7 @@ import { IStorageRepository, ISystemConfigRepository, ImmichReadStream, + JobItem, TimeBucketOptions, } from '../repositories'; import { StorageCore, StorageFolder } from '../storage'; @@ -449,9 +450,9 @@ export class AssetService { ); for await (const assets of assetPagination) { - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.ASSET_DELETION, data: { id: asset.id } }); - } + await this.jobRepository.queueAll( + assets.map((asset) => ({ name: JobName.ASSET_DELETION, data: { id: asset.id } })), + ); } return true; @@ -504,9 +505,7 @@ export class AssetService { await this.access.requirePermission(auth, Permission.ASSET_DELETE, ids); if (force) { - for (const id of ids) { - await this.jobRepository.queue({ name: JobName.ASSET_DELETION, data: { id } }); - } + await this.jobRepository.queueAll(ids.map((id) => ({ name: JobName.ASSET_DELETION, data: { id } }))); } else { await this.assetRepository.softDeleteAll(ids); this.communicationRepository.send(ClientEvent.ASSET_TRASH, auth.user.id, ids); @@ -529,9 +528,9 @@ export class AssetService { if (action == TrashAction.EMPTY_ALL) { for await (const assets of assetPagination) { - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.ASSET_DELETION, data: { id: asset.id } }); - } + await this.jobRepository.queueAll( + assets.map((asset) => ({ name: JobName.ASSET_DELETION, data: { id: asset.id } })), + ); } return; } @@ -566,21 +565,25 @@ export class AssetService { async run(auth: AuthDto, dto: AssetJobsDto) { await this.access.requirePermission(auth, Permission.ASSET_UPDATE, dto.assetIds); + const jobs: JobItem[] = []; + for (const id of dto.assetIds) { switch (dto.name) { case AssetJobName.REFRESH_METADATA: - await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: { id } }); + jobs.push({ name: JobName.METADATA_EXTRACTION, data: { id } }); break; case AssetJobName.REGENERATE_THUMBNAIL: - await this.jobRepository.queue({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id } }); + jobs.push({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id } }); break; case AssetJobName.TRANSCODE_VIDEO: - await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { id } }); + jobs.push({ name: JobName.VIDEO_CONVERSION, data: { id } }); break; } } + + await this.jobRepository.queueAll(jobs); } private async updateMetadata(dto: ISidecarWriteJob) { diff --git a/server/src/domain/job/job.service.spec.ts b/server/src/domain/job/job.service.spec.ts index 1e5da85729d..6dbbd9b6003 100644 --- a/server/src/domain/job/job.service.spec.ts +++ b/server/src/domain/job/job.service.spec.ts @@ -55,12 +55,12 @@ describe(JobService.name, () => { it('should run the scheduled jobs', async () => { await sut.handleNightlyJobs(); - expect(jobMock.queue.mock.calls).toEqual([ - [{ name: JobName.ASSET_DELETION_CHECK }], - [{ name: JobName.USER_DELETE_CHECK }], - [{ name: JobName.PERSON_CLEANUP }], - [{ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } }], - [{ name: JobName.CLEAN_OLD_AUDIT_LOGS }], + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { name: JobName.ASSET_DELETION_CHECK }, + { name: JobName.USER_DELETE_CHECK }, + { name: JobName.PERSON_CLEANUP }, + { name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } }, + { name: JobName.CLEAN_OLD_AUDIT_LOGS }, ]); }); }); @@ -138,6 +138,7 @@ describe(JobService.name, () => { ).rejects.toBeInstanceOf(BadRequestException); expect(jobMock.queue).not.toHaveBeenCalled(); + expect(jobMock.queueAll).not.toHaveBeenCalled(); }); it('should handle a start video conversion command', async () => { @@ -204,6 +205,7 @@ describe(JobService.name, () => { ).rejects.toBeInstanceOf(BadRequestException); expect(jobMock.queue).not.toHaveBeenCalled(); + expect(jobMock.queueAll).not.toHaveBeenCalled(); }); }); @@ -276,18 +278,18 @@ describe(JobService.name, () => { item: { name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: 'asset-1' } }, jobs: [ JobName.GENERATE_WEBP_THUMBNAIL, + JobName.GENERATE_THUMBHASH_THUMBNAIL, JobName.ENCODE_CLIP, JobName.RECOGNIZE_FACES, - JobName.GENERATE_THUMBHASH_THUMBNAIL, ], }, { item: { name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: 'asset-1', source: 'upload' } }, jobs: [ JobName.GENERATE_WEBP_THUMBNAIL, + JobName.GENERATE_THUMBHASH_THUMBNAIL, JobName.ENCODE_CLIP, JobName.RECOGNIZE_FACES, - JobName.GENERATE_THUMBHASH_THUMBNAIL, JobName.VIDEO_CONVERSION, ], }, @@ -295,9 +297,9 @@ describe(JobService.name, () => { item: { name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: 'asset-live-image', source: 'upload' } }, jobs: [ JobName.GENERATE_WEBP_THUMBNAIL, - JobName.RECOGNIZE_FACES, JobName.GENERATE_THUMBHASH_THUMBNAIL, JobName.ENCODE_CLIP, + JobName.RECOGNIZE_FACES, JobName.VIDEO_CONVERSION, ], }, @@ -327,9 +329,15 @@ describe(JobService.name, () => { await jobMock.addHandler.mock.calls[0][2](item); await asyncTick(3); - expect(jobMock.queue).toHaveBeenCalledTimes(jobs.length); - for (const jobName of jobs) { - expect(jobMock.queue).toHaveBeenCalledWith({ name: jobName, data: expect.anything() }); + if (jobs.length > 1) { + expect(jobMock.queueAll).toHaveBeenCalledWith( + jobs.map((jobName) => ({ name: jobName, data: expect.anything() })), + ); + } else { + expect(jobMock.queue).toHaveBeenCalledTimes(jobs.length); + for (const jobName of jobs) { + expect(jobMock.queue).toHaveBeenCalledWith({ name: jobName, data: expect.anything() }); + } } }); @@ -338,7 +346,7 @@ describe(JobService.name, () => { await jobMock.addHandler.mock.calls[0][2](item); await asyncTick(3); - expect(jobMock.queue).not.toHaveBeenCalled(); + expect(jobMock.queueAll).not.toHaveBeenCalled(); }); } diff --git a/server/src/domain/job/job.service.ts b/server/src/domain/job/job.service.ts index f622dbdf201..3f418cf335f 100644 --- a/server/src/domain/job/job.service.ts +++ b/server/src/domain/job/job.service.ts @@ -158,11 +158,13 @@ export class JobService { } async handleNightlyJobs() { - await this.jobRepository.queue({ name: JobName.ASSET_DELETION_CHECK }); - await this.jobRepository.queue({ name: JobName.USER_DELETE_CHECK }); - await this.jobRepository.queue({ name: JobName.PERSON_CLEANUP }); - await this.jobRepository.queue({ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } }); - await this.jobRepository.queue({ name: JobName.CLEAN_OLD_AUDIT_LOGS }); + await this.jobRepository.queueAll([ + { name: JobName.ASSET_DELETION_CHECK }, + { name: JobName.USER_DELETE_CHECK }, + { name: JobName.PERSON_CLEANUP }, + { name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } }, + { name: JobName.CLEAN_OLD_AUDIT_LOGS }, + ]); } /** @@ -210,19 +212,23 @@ export class JobService { break; case JobName.GENERATE_JPEG_THUMBNAIL: { - await this.jobRepository.queue({ name: JobName.GENERATE_WEBP_THUMBNAIL, data: item.data }); - await this.jobRepository.queue({ name: JobName.GENERATE_THUMBHASH_THUMBNAIL, data: item.data }); - await this.jobRepository.queue({ name: JobName.ENCODE_CLIP, data: item.data }); - await this.jobRepository.queue({ name: JobName.RECOGNIZE_FACES, data: item.data }); + const jobs: JobItem[] = [ + { name: JobName.GENERATE_WEBP_THUMBNAIL, data: item.data }, + { name: JobName.GENERATE_THUMBHASH_THUMBNAIL, data: item.data }, + { name: JobName.ENCODE_CLIP, data: item.data }, + { name: JobName.RECOGNIZE_FACES, data: item.data }, + ]; const [asset] = await this.assetRepository.getByIds([item.data.id]); if (asset) { if (asset.type === AssetType.VIDEO) { - await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: item.data }); + jobs.push({ name: JobName.VIDEO_CONVERSION, data: item.data }); } else if (asset.livePhotoVideoId) { - await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { id: asset.livePhotoVideoId } }); + jobs.push({ name: JobName.VIDEO_CONVERSION, data: { id: asset.livePhotoVideoId } }); } } + + await this.jobRepository.queueAll(jobs); break; } diff --git a/server/src/domain/library/library.service.spec.ts b/server/src/domain/library/library.service.spec.ts index d8fac9bf7e6..e52d4cdc4bf 100644 --- a/server/src/domain/library/library.service.spec.ts +++ b/server/src/domain/library/library.service.spec.ts @@ -135,18 +135,16 @@ describe(LibraryService.name, () => { await sut.handleQueueAssetRefresh(mockLibraryJob); - expect(jobMock.queue.mock.calls).toEqual([ - [ - { - name: JobName.LIBRARY_SCAN_ASSET, - data: { - id: libraryStub.externalLibrary1.id, - ownerId: libraryStub.externalLibrary1.owner.id, - assetPath: '/data/user1/photo.jpg', - force: false, - }, + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.LIBRARY_SCAN_ASSET, + data: { + id: libraryStub.externalLibrary1.id, + ownerId: libraryStub.externalLibrary1.owner.id, + assetPath: '/data/user1/photo.jpg', + force: false, }, - ], + }, ]); }); @@ -420,6 +418,7 @@ describe(LibraryService.name, () => { await expect(sut.handleAssetRefresh(mockLibraryJob)).resolves.toBe(true); expect(jobMock.queue).not.toHaveBeenCalled(); + expect(jobMock.queueAll).not.toHaveBeenCalled(); }); it('should import an asset when mtime differs from db asset', async () => { @@ -468,6 +467,7 @@ describe(LibraryService.name, () => { expect(assetMock.save).toHaveBeenCalledWith({ id: assetStub.image.id, isOffline: true }); expect(jobMock.queue).not.toHaveBeenCalled(); + expect(jobMock.queueAll).not.toHaveBeenCalled(); }); it('should online a previously-offline asset', async () => { @@ -607,6 +607,7 @@ describe(LibraryService.name, () => { ); expect(jobMock.queue).not.toHaveBeenCalled(); + expect(jobMock.queueAll).not.toHaveBeenCalled(); expect(libraryMock.softDelete).not.toHaveBeenCalled(); }); @@ -953,9 +954,9 @@ describe(LibraryService.name, () => { libraryMock.getAllDeleted.mockResolvedValue([libraryStub.uploadLibrary1, libraryStub.externalLibrary1]); await expect(sut.handleQueueCleanup()).resolves.toBe(true); - expect(jobMock.queue.mock.calls).toEqual([ - [{ name: JobName.LIBRARY_DELETE, data: { id: libraryStub.uploadLibrary1.id } }], - [{ name: JobName.LIBRARY_DELETE, data: { id: libraryStub.externalLibrary1.id } }], + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { name: JobName.LIBRARY_DELETE, data: { id: libraryStub.uploadLibrary1.id } }, + { name: JobName.LIBRARY_DELETE, data: { id: libraryStub.externalLibrary1.id } }, ]); }); }); @@ -1101,16 +1102,16 @@ describe(LibraryService.name, () => { data: {}, }, ], - [ - { - name: JobName.LIBRARY_SCAN, - data: { - id: libraryStub.externalLibrary1.id, - refreshModifiedFiles: true, - refreshAllFiles: false, - }, + ]); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.LIBRARY_SCAN, + data: { + id: libraryStub.externalLibrary1.id, + refreshModifiedFiles: true, + refreshAllFiles: false, }, - ], + }, ]); }); @@ -1126,16 +1127,16 @@ describe(LibraryService.name, () => { data: {}, }, ], - [ - { - name: JobName.LIBRARY_SCAN, - data: { - id: libraryStub.externalLibrary1.id, - refreshModifiedFiles: false, - refreshAllFiles: true, - }, + ]); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.LIBRARY_SCAN, + data: { + id: libraryStub.externalLibrary1.id, + refreshModifiedFiles: false, + refreshAllFiles: true, }, - ], + }, ]); }); }); @@ -1147,13 +1148,11 @@ describe(LibraryService.name, () => { await expect(sut.handleOfflineRemoval({ id: libraryStub.externalLibrary1.id })).resolves.toBe(true); - expect(jobMock.queue.mock.calls).toEqual([ - [ - { - name: JobName.ASSET_DELETION, - data: { id: assetStub.image1.id, fromExternal: true }, - }, - ], + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.ASSET_DELETION, + data: { id: assetStub.image1.id, fromExternal: true }, + }, ]); }); }); diff --git a/server/src/domain/library/library.service.ts b/server/src/domain/library/library.service.ts index ac4bd065d9b..cdd5140f1bd 100644 --- a/server/src/domain/library/library.service.ts +++ b/server/src/domain/library/library.service.ts @@ -94,9 +94,9 @@ export class LibraryService { async handleQueueCleanup(): Promise { this.logger.debug('Cleaning up any pending library deletions'); const pendingDeletion = await this.repository.getAllDeleted(); - for (const libraryToDelete of pendingDeletion) { - await this.jobRepository.queue({ name: JobName.LIBRARY_DELETE, data: { id: libraryToDelete.id } }); - } + await this.jobRepository.queueAll( + pendingDeletion.map((libraryToDelete) => ({ name: JobName.LIBRARY_DELETE, data: { id: libraryToDelete.id } })), + ); return true; } @@ -160,9 +160,9 @@ export class LibraryService { // TODO use pagination const assetIds = await this.repository.getAssetIds(job.id, true); this.logger.debug(`Will delete ${assetIds.length} asset(s) in library ${job.id}`); - for (const assetId of assetIds) { - await this.jobRepository.queue({ name: JobName.ASSET_DELETION, data: { id: assetId, fromExternal: true } }); - } + await this.jobRepository.queueAll( + assetIds.map((assetId) => ({ name: JobName.ASSET_DELETION, data: { id: assetId, fromExternal: true } })), + ); if (assetIds.length === 0) { this.logger.log(`Deleting library ${job.id}`); @@ -333,16 +333,16 @@ export class LibraryService { // Queue all library refresh const libraries = await this.repository.getAll(true, LibraryType.EXTERNAL); - for (const library of libraries) { - await this.jobRepository.queue({ + await this.jobRepository.queueAll( + libraries.map((library) => ({ name: JobName.LIBRARY_SCAN, data: { id: library.id, refreshModifiedFiles: !job.force, refreshAllFiles: job.force ?? false, }, - }); - } + })), + ); return true; } @@ -353,9 +353,9 @@ export class LibraryService { for await (const assets of assetPagination) { this.logger.debug(`Removing ${assets.length} offline assets`); - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.ASSET_DELETION, data: { id: asset.id, fromExternal: true } }); - } + await this.jobRepository.queueAll( + assets.map((asset) => ({ name: JobName.ASSET_DELETION, data: { id: asset.id, fromExternal: true } })), + ); } return true; @@ -411,16 +411,17 @@ export class LibraryService { this.logger.debug(`Will import ${filteredPaths.length} new asset(s)`); } - for (const assetPath of filteredPaths) { - const libraryJobData: ILibraryFileJob = { - id: job.id, - assetPath: path.normalize(assetPath), - ownerId: library.ownerId, - force: job.refreshAllFiles ?? false, - }; - - await this.jobRepository.queue({ name: JobName.LIBRARY_SCAN_ASSET, data: libraryJobData }); - } + await this.jobRepository.queueAll( + filteredPaths.map((assetPath) => ({ + name: JobName.LIBRARY_SCAN_ASSET, + data: { + id: job.id, + assetPath: path.normalize(assetPath), + ownerId: library.ownerId, + force: job.refreshAllFiles ?? false, + }, + })), + ); } await this.repository.update({ id: job.id, refreshedAt: new Date() }); diff --git a/server/src/domain/media/media.service.spec.ts b/server/src/domain/media/media.service.spec.ts index 2ab3def4f16..abc4fab583b 100644 --- a/server/src/domain/media/media.service.spec.ts +++ b/server/src/domain/media/media.service.spec.ts @@ -77,17 +77,21 @@ describe(MediaService.name, () => { expect(assetMock.getAll).toHaveBeenCalled(); expect(assetMock.getWithout).not.toHaveBeenCalled(); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.GENERATE_JPEG_THUMBNAIL, - data: { id: assetStub.image.id }, - }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.GENERATE_JPEG_THUMBNAIL, + data: { id: assetStub.image.id }, + }, + ]); expect(personMock.getAll).toHaveBeenCalled(); expect(personMock.getAllWithoutThumbnail).not.toHaveBeenCalled(); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.GENERATE_PERSON_THUMBNAIL, - data: { id: personStub.newThumbnail.id }, - }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.GENERATE_PERSON_THUMBNAIL, + data: { id: personStub.newThumbnail.id }, + }, + ]); }); it('should queue all people with missing thumbnail path', async () => { @@ -106,12 +110,14 @@ describe(MediaService.name, () => { expect(personMock.getAll).not.toHaveBeenCalled(); expect(personMock.getAllWithoutThumbnail).toHaveBeenCalled(); expect(personMock.getRandomFace).toHaveBeenCalled(); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.GENERATE_PERSON_THUMBNAIL, - data: { - id: personStub.newThumbnail.id, + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.GENERATE_PERSON_THUMBNAIL, + data: { + id: personStub.newThumbnail.id, + }, }, - }); + ]); }); it('should queue all assets with missing resize path', async () => { @@ -125,10 +131,12 @@ describe(MediaService.name, () => { expect(assetMock.getAll).not.toHaveBeenCalled(); expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.THUMBNAIL); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.GENERATE_JPEG_THUMBNAIL, - data: { id: assetStub.image.id }, - }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.GENERATE_JPEG_THUMBNAIL, + data: { id: assetStub.image.id }, + }, + ]); expect(personMock.getAll).not.toHaveBeenCalled(); expect(personMock.getAllWithoutThumbnail).toHaveBeenCalled(); @@ -145,10 +153,12 @@ describe(MediaService.name, () => { expect(assetMock.getAll).not.toHaveBeenCalled(); expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.THUMBNAIL); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.GENERATE_WEBP_THUMBNAIL, - data: { id: assetStub.image.id }, - }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.GENERATE_WEBP_THUMBNAIL, + data: { id: assetStub.image.id }, + }, + ]); expect(personMock.getAll).not.toHaveBeenCalled(); expect(personMock.getAllWithoutThumbnail).toHaveBeenCalled(); @@ -165,10 +175,12 @@ describe(MediaService.name, () => { expect(assetMock.getAll).not.toHaveBeenCalled(); expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.THUMBNAIL); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.GENERATE_THUMBHASH_THUMBNAIL, - data: { id: assetStub.image.id }, - }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.GENERATE_THUMBHASH_THUMBNAIL, + data: { id: assetStub.image.id }, + }, + ]); expect(personMock.getAll).not.toHaveBeenCalled(); expect(personMock.getAllWithoutThumbnail).toHaveBeenCalled(); @@ -388,10 +400,12 @@ describe(MediaService.name, () => { expect(assetMock.getAll).toHaveBeenCalledWith({ skip: 0, take: 1000 }, { type: AssetType.VIDEO }); expect(assetMock.getWithout).not.toHaveBeenCalled(); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.VIDEO_CONVERSION, - data: { id: assetStub.video.id }, - }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.VIDEO_CONVERSION, + data: { id: assetStub.video.id }, + }, + ]); }); it('should queue all video assets without encoded videos', async () => { @@ -404,10 +418,12 @@ describe(MediaService.name, () => { expect(assetMock.getAll).not.toHaveBeenCalled(); expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.ENCODED_VIDEO); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.VIDEO_CONVERSION, - data: { id: assetStub.video.id }, - }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.VIDEO_CONVERSION, + data: { id: assetStub.video.id }, + }, + ]); }); }); diff --git a/server/src/domain/media/media.service.ts b/server/src/domain/media/media.service.ts index 6f16a7b66e2..f3f0ccd1d39 100644 --- a/server/src/domain/media/media.service.ts +++ b/server/src/domain/media/media.service.ts @@ -21,6 +21,7 @@ import { IPersonRepository, IStorageRepository, ISystemConfigRepository, + JobItem, VideoCodecHWConfig, VideoStreamInfo, WithoutProperty, @@ -74,22 +75,27 @@ export class MediaService { }); for await (const assets of assetPagination) { + const jobs: JobItem[] = []; + for (const asset of assets) { if (!asset.resizePath || force) { - await this.jobRepository.queue({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: asset.id } }); + jobs.push({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: asset.id } }); continue; } if (!asset.webpPath) { - await this.jobRepository.queue({ name: JobName.GENERATE_WEBP_THUMBNAIL, data: { id: asset.id } }); + jobs.push({ name: JobName.GENERATE_WEBP_THUMBNAIL, data: { id: asset.id } }); } if (!asset.thumbhash) { - await this.jobRepository.queue({ name: JobName.GENERATE_THUMBHASH_THUMBNAIL, data: { id: asset.id } }); + jobs.push({ name: JobName.GENERATE_THUMBHASH_THUMBNAIL, data: { id: asset.id } }); } } + + await this.jobRepository.queueAll(jobs); } const people = force ? await this.personRepository.getAll() : await this.personRepository.getAllWithoutThumbnail(); + const jobs: JobItem[] = []; for (const person of people) { if (!person.faceAssetId) { const face = await this.personRepository.getRandomFace(person.id); @@ -100,9 +106,11 @@ export class MediaService { await this.personRepository.update({ id: person.id, faceAssetId: face.assetId }); } - await this.jobRepository.queue({ name: JobName.GENERATE_PERSON_THUMBNAIL, data: { id: person.id } }); + jobs.push({ name: JobName.GENERATE_PERSON_THUMBNAIL, data: { id: person.id } }); } + await this.jobRepository.queueAll(jobs); + return true; } @@ -118,15 +126,15 @@ export class MediaService { } for await (const assets of assetPagination) { - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.MIGRATE_ASSET, data: { id: asset.id } }); - } + await this.jobRepository.queueAll( + assets.map((asset) => ({ name: JobName.MIGRATE_ASSET, data: { id: asset.id } })), + ); } const people = await this.personRepository.getAll(); - for (const person of people) { - await this.jobRepository.queue({ name: JobName.MIGRATE_PERSON, data: { id: person.id } }); - } + await this.jobRepository.queueAll( + people.map((person) => ({ name: JobName.MIGRATE_PERSON, data: { id: person.id } })), + ); return true; } @@ -224,9 +232,9 @@ export class MediaService { }); for await (const assets of assetPagination) { - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { id: asset.id } }); - } + await this.jobRepository.queueAll( + assets.map((asset) => ({ name: JobName.VIDEO_CONVERSION, data: { id: asset.id } })), + ); } return true; diff --git a/server/src/domain/metadata/metadata.service.spec.ts b/server/src/domain/metadata/metadata.service.spec.ts index fd2a29d45c3..94bdce25627 100644 --- a/server/src/domain/metadata/metadata.service.spec.ts +++ b/server/src/domain/metadata/metadata.service.spec.ts @@ -208,10 +208,12 @@ describe(MetadataService.name, () => { await expect(sut.handleQueueMetadataExtraction({ force: false })).resolves.toBe(true); expect(assetMock.getWithout).toHaveBeenCalled(); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.METADATA_EXTRACTION, - data: { id: assetStub.image.id }, - }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.METADATA_EXTRACTION, + data: { id: assetStub.image.id }, + }, + ]); }); it('should queue metadata extraction for all assets', async () => { @@ -219,10 +221,12 @@ describe(MetadataService.name, () => { await expect(sut.handleQueueMetadataExtraction({ force: true })).resolves.toBe(true); expect(assetMock.getAll).toHaveBeenCalled(); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.METADATA_EXTRACTION, - data: { id: assetStub.image.id }, - }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.METADATA_EXTRACTION, + data: { id: assetStub.image.id }, + }, + ]); }); }); @@ -320,6 +324,7 @@ describe(MetadataService.name, () => { expect(assetMock.getByIds).toHaveBeenCalledWith([assetStub.livePhotoMotionAsset.id]); expect(storageMock.writeFile).not.toHaveBeenCalled(); expect(jobMock.queue).not.toHaveBeenCalled(); + expect(jobMock.queueAll).not.toHaveBeenCalled(); expect(assetMock.save).not.toHaveBeenCalledWith( expect.objectContaining({ assetType: AssetType.VIDEO, isVisible: false }), ); @@ -512,10 +517,12 @@ describe(MetadataService.name, () => { expect(assetMock.getWith).toHaveBeenCalledWith({ take: 1000, skip: 0 }, WithProperty.SIDECAR); expect(assetMock.getWithout).not.toHaveBeenCalled(); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.SIDECAR_SYNC, - data: { id: assetStub.sidecar.id }, - }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.SIDECAR_SYNC, + data: { id: assetStub.sidecar.id }, + }, + ]); }); it('should queue assets without sidecar files', async () => { @@ -525,10 +532,12 @@ describe(MetadataService.name, () => { expect(assetMock.getWithout).toHaveBeenCalledWith({ take: 1000, skip: 0 }, WithoutProperty.SIDECAR); expect(assetMock.getWith).not.toHaveBeenCalled(); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.SIDECAR_DISCOVERY, - data: { id: assetStub.image.id }, - }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.SIDECAR_DISCOVERY, + data: { id: assetStub.image.id }, + }, + ]); }); }); diff --git a/server/src/domain/metadata/metadata.service.ts b/server/src/domain/metadata/metadata.service.ts index 544813d5081..111b201ac1d 100644 --- a/server/src/domain/metadata/metadata.service.ts +++ b/server/src/domain/metadata/metadata.service.ts @@ -196,9 +196,9 @@ export class MetadataService { }); for await (const assets of assetPagination) { - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: { id: asset.id } }); - } + await this.jobRepository.queueAll( + assets.map((asset) => ({ name: JobName.METADATA_EXTRACTION, data: { id: asset.id } })), + ); } return true; @@ -264,10 +264,12 @@ export class MetadataService { }); for await (const assets of assetPagination) { - for (const asset of assets) { - const name = force ? JobName.SIDECAR_SYNC : JobName.SIDECAR_DISCOVERY; - await this.jobRepository.queue({ name, data: { id: asset.id } }); - } + await this.jobRepository.queueAll( + assets.map((asset) => ({ + name: force ? JobName.SIDECAR_SYNC : JobName.SIDECAR_DISCOVERY, + data: { id: asset.id }, + })), + ); } return true; diff --git a/server/src/domain/person/person.service.spec.ts b/server/src/domain/person/person.service.spec.ts index 4ca718e417c..ba576bc454a 100644 --- a/server/src/domain/person/person.service.spec.ts +++ b/server/src/domain/person/person.service.spec.ts @@ -286,6 +286,7 @@ describe(PersonService.name, () => { expect(personMock.getById).toHaveBeenCalledWith('person-1'); expect(personMock.update).toHaveBeenCalledWith({ id: 'person-1', birthDate: new Date('1976-06-30') }); expect(jobMock.queue).not.toHaveBeenCalled(); + expect(jobMock.queueAll).not.toHaveBeenCalled(); expect(accessMock.person.checkOwnerAccess).toHaveBeenCalledWith(authStub.admin.user.id, new Set(['person-1'])); }); @@ -403,6 +404,7 @@ describe(PersonService.name, () => { }), ).rejects.toBeInstanceOf(BadRequestException); expect(jobMock.queue).not.toHaveBeenCalledWith(); + expect(jobMock.queueAll).not.toHaveBeenCalledWith(); }); it('should reassign a face', async () => { accessMock.person.checkOwnerAccess.mockResolvedValue(new Set([personStub.withName.id])); @@ -417,10 +419,12 @@ describe(PersonService.name, () => { }), ).resolves.toEqual([personStub.noName]); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.GENERATE_PERSON_THUMBNAIL, - data: { id: personStub.newThumbnail.id }, - }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.GENERATE_PERSON_THUMBNAIL, + data: { id: personStub.newThumbnail.id }, + }, + ]); }); }); @@ -452,10 +456,12 @@ describe(PersonService.name, () => { it('should change person feature photo', async () => { personMock.getRandomFace.mockResolvedValue(faceStub.primaryFace1); await sut.createNewFeaturePhoto([personStub.newThumbnail.id]); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.GENERATE_PERSON_THUMBNAIL, - data: { id: personStub.newThumbnail.id }, - }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.GENERATE_PERSON_THUMBNAIL, + data: { id: personStub.newThumbnail.id }, + }, + ]); }); }); @@ -480,6 +486,7 @@ describe(PersonService.name, () => { }); expect(jobMock.queue).not.toHaveBeenCalledWith(); + expect(jobMock.queueAll).not.toHaveBeenCalledWith(); }); it('should fail if user has not the correct permissions on the asset', async () => { @@ -495,6 +502,7 @@ describe(PersonService.name, () => { ).rejects.toBeInstanceOf(BadRequestException); expect(jobMock.queue).not.toHaveBeenCalledWith(); + expect(jobMock.queueAll).not.toHaveBeenCalledWith(); }); }); @@ -542,7 +550,9 @@ describe(PersonService.name, () => { await sut.handlePersonCleanup(); - expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.PERSON_DELETE, data: { id: personStub.noName.id } }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { name: JobName.PERSON_DELETE, data: { id: personStub.noName.id } }, + ]); }); }); @@ -552,6 +562,7 @@ describe(PersonService.name, () => { await expect(sut.handleQueueRecognizeFaces({})).resolves.toBe(true); expect(jobMock.queue).not.toHaveBeenCalled(); + expect(jobMock.queueAll).not.toHaveBeenCalled(); expect(configMock.load).toHaveBeenCalled(); }); @@ -563,10 +574,12 @@ describe(PersonService.name, () => { await sut.handleQueueRecognizeFaces({}); expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.FACES); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.RECOGNIZE_FACES, - data: { id: assetStub.image.id }, - }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.RECOGNIZE_FACES, + data: { id: assetStub.image.id }, + }, + ]); }); it('should queue all assets', async () => { @@ -580,14 +593,18 @@ describe(PersonService.name, () => { await sut.handleQueueRecognizeFaces({ force: true }); expect(assetMock.getAll).toHaveBeenCalled(); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.RECOGNIZE_FACES, - data: { id: assetStub.image.id }, - }); - expect(jobMock.queue).toHaveBeenCalledWith({ - name: JobName.PERSON_DELETE, - data: { id: personStub.withName.id }, - }); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.RECOGNIZE_FACES, + data: { id: assetStub.image.id }, + }, + ]); + expect(jobMock.queueAll).toHaveBeenCalledWith([ + { + name: JobName.PERSON_DELETE, + data: { id: personStub.withName.id }, + }, + ]); }); }); @@ -644,6 +661,7 @@ describe(PersonService.name, () => { ); expect(personMock.createFace).not.toHaveBeenCalled(); expect(jobMock.queue).not.toHaveBeenCalled(); + expect(jobMock.queueAll).not.toHaveBeenCalled(); expect(assetMock.upsertJobStatus).toHaveBeenCalledWith({ assetId: assetStub.image.id, diff --git a/server/src/domain/person/person.service.ts b/server/src/domain/person/person.service.ts index d63eda3cea9..73fb37489b0 100644 --- a/server/src/domain/person/person.service.ts +++ b/server/src/domain/person/person.service.ts @@ -22,6 +22,7 @@ import { ISmartInfoRepository, IStorageRepository, ISystemConfigRepository, + JobItem, UpdateFacesData, WithoutProperty, } from '../repositories'; @@ -153,6 +154,8 @@ export class PersonService { this.logger.debug( `Changing feature photos for ${changeFeaturePhoto.length} ${changeFeaturePhoto.length > 1 ? 'people' : 'person'}`, ); + + const jobs: JobItem[] = []; for (const personId of changeFeaturePhoto) { const assetFace = await this.repository.getRandomFace(personId); @@ -161,15 +164,11 @@ export class PersonService { id: personId, faceAssetId: assetFace.id, }); - - await this.jobRepository.queue({ - name: JobName.GENERATE_PERSON_THUMBNAIL, - data: { - id: personId, - }, - }); + jobs.push({ name: JobName.GENERATE_PERSON_THUMBNAIL, data: { id: personId } }); } } + + await this.jobRepository.queueAll(jobs); } async getById(auth: AuthDto, id: string): Promise { @@ -270,8 +269,10 @@ export class PersonService { const people = await this.repository.getAllWithoutFaces(); for (const person of people) { this.logger.debug(`Person ${person.name || person.id} no longer has any faces, deleting.`); - await this.jobRepository.queue({ name: JobName.PERSON_DELETE, data: { id: person.id } }); } + await this.jobRepository.queueAll( + people.map((person) => ({ name: JobName.PERSON_DELETE, data: { id: person.id } })), + ); return true; } @@ -290,16 +291,16 @@ export class PersonService { if (force) { const people = await this.repository.getAll(); - for (const person of people) { - await this.jobRepository.queue({ name: JobName.PERSON_DELETE, data: { id: person.id } }); - } + await this.jobRepository.queueAll( + people.map((person) => ({ name: JobName.PERSON_DELETE, data: { id: person.id } })), + ); this.logger.debug(`Deleted ${people.length} people`); } for await (const assets of assetPagination) { - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.RECOGNIZE_FACES, data: { id: asset.id } }); - } + await this.jobRepository.queueAll( + assets.map((asset) => ({ name: JobName.RECOGNIZE_FACES, data: { id: asset.id } })), + ); } return true; diff --git a/server/src/domain/repositories/job.repository.ts b/server/src/domain/repositories/job.repository.ts index bd6982ca19e..faa78adb06c 100644 --- a/server/src/domain/repositories/job.repository.ts +++ b/server/src/domain/repositories/job.repository.ts @@ -103,6 +103,7 @@ export interface IJobRepository { deleteCronJob(name: string): void; setConcurrency(queueName: QueueName, concurrency: number): void; queue(item: JobItem): Promise; + queueAll(items: JobItem[]): Promise; pause(name: QueueName): Promise; resume(name: QueueName): Promise; empty(name: QueueName): Promise; diff --git a/server/src/domain/smart-info/smart-info.service.spec.ts b/server/src/domain/smart-info/smart-info.service.spec.ts index 9686e6ac630..7dd21d7a53a 100644 --- a/server/src/domain/smart-info/smart-info.service.spec.ts +++ b/server/src/domain/smart-info/smart-info.service.spec.ts @@ -69,7 +69,7 @@ describe(SmartInfoService.name, () => { await sut.handleQueueEncodeClip({ force: false }); - expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.ENCODE_CLIP, data: { id: assetStub.image.id } }); + expect(jobMock.queueAll).toHaveBeenCalledWith([{ name: JobName.ENCODE_CLIP, data: { id: assetStub.image.id } }]); expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.CLIP_ENCODING); }); @@ -81,7 +81,7 @@ describe(SmartInfoService.name, () => { await sut.handleQueueEncodeClip({ force: true }); - expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.ENCODE_CLIP, data: { id: assetStub.image.id } }); + expect(jobMock.queueAll).toHaveBeenCalledWith([{ name: JobName.ENCODE_CLIP, data: { id: assetStub.image.id } }]); expect(assetMock.getAll).toHaveBeenCalled(); }); }); diff --git a/server/src/domain/smart-info/smart-info.service.ts b/server/src/domain/smart-info/smart-info.service.ts index 5f6cf9b40f6..0a7876cd52a 100644 --- a/server/src/domain/smart-info/smart-info.service.ts +++ b/server/src/domain/smart-info/smart-info.service.ts @@ -64,9 +64,7 @@ export class SmartInfoService { }); for await (const assets of assetPagination) { - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.ENCODE_CLIP, data: { id: asset.id } }); - } + await this.jobRepository.queueAll(assets.map((asset) => ({ name: JobName.ENCODE_CLIP, data: { id: asset.id } }))); } return true; diff --git a/server/src/domain/user/user.service.spec.ts b/server/src/domain/user/user.service.spec.ts index 5410beb10f3..45fe4a90250 100644 --- a/server/src/domain/user/user.service.spec.ts +++ b/server/src/domain/user/user.service.spec.ts @@ -342,7 +342,7 @@ describe(UserService.name, () => { userMock.update.mockResolvedValue({ ...userStub.admin, profileImagePath: file.path }); await sut.createProfileImage(authStub.admin, file); - await expect(jobMock.queue.mock.calls).toEqual([[{ name: JobName.DELETE_FILES, data: { files } }]]); + expect(jobMock.queue.mock.calls).toEqual([[{ name: JobName.DELETE_FILES, data: { files } }]]); }); it('should not delete the profile image if it has not been set', async () => { @@ -352,6 +352,7 @@ describe(UserService.name, () => { await sut.createProfileImage(authStub.admin, file); expect(jobMock.queue).not.toHaveBeenCalled(); + expect(jobMock.queueAll).not.toHaveBeenCalled(); }); }); @@ -361,6 +362,7 @@ describe(UserService.name, () => { await expect(sut.deleteProfileImage(authStub.admin)).rejects.toBeInstanceOf(BadRequestException); expect(jobMock.queue).not.toHaveBeenCalled(); + expect(jobMock.queueAll).not.toHaveBeenCalled(); }); it('should delete the profile image if user has one', async () => { @@ -368,7 +370,7 @@ describe(UserService.name, () => { const files = [userStub.profilePath.profileImagePath]; await sut.deleteProfileImage(authStub.admin); - await expect(jobMock.queue.mock.calls).toEqual([[{ name: JobName.DELETE_FILES, data: { files } }]]); + expect(jobMock.queue.mock.calls).toEqual([[{ name: JobName.DELETE_FILES, data: { files } }]]); }); }); @@ -456,6 +458,7 @@ describe(UserService.name, () => { expect(userMock.getDeletedUsers).toHaveBeenCalled(); expect(jobMock.queue).not.toHaveBeenCalled(); + expect(jobMock.queueAll).toHaveBeenCalledWith([]); }); it('should queue user ready for deletion', async () => { @@ -465,7 +468,7 @@ describe(UserService.name, () => { await sut.handleUserDeleteCheck(); expect(userMock.getDeletedUsers).toHaveBeenCalled(); - expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.USER_DELETION, data: { id: user.id } }); + expect(jobMock.queueAll).toHaveBeenCalledWith([{ name: JobName.USER_DELETION, data: { id: user.id } }]); }); }); diff --git a/server/src/domain/user/user.service.ts b/server/src/domain/user/user.service.ts index 85380ca2a12..8185a83c644 100644 --- a/server/src/domain/user/user.service.ts +++ b/server/src/domain/user/user.service.ts @@ -129,12 +129,11 @@ export class UserService { async handleUserDeleteCheck() { const users = await this.userRepository.getDeletedUsers(); - for (const user of users) { - if (this.isReadyForDeletion(user)) { - await this.jobRepository.queue({ name: JobName.USER_DELETION, data: { id: user.id } }); - } - } - + await this.jobRepository.queueAll( + users.flatMap((user) => + this.isReadyForDeletion(user) ? [{ name: JobName.USER_DELETION, data: { id: user.id } }] : [], + ), + ); return true; } diff --git a/server/src/infra/repositories/job.repository.ts b/server/src/infra/repositories/job.repository.ts index 61238fac78b..01c938e0059 100644 --- a/server/src/infra/repositories/job.repository.ts +++ b/server/src/infra/repositories/job.repository.ts @@ -116,12 +116,31 @@ export class JobRepository implements IJobRepository { ) as unknown as Promise; } - async queue(item: JobItem): Promise { - const jobName = item.name; - const jobData = (item as { data?: any })?.data || {}; - const jobOptions = this.getJobOptions(item) || undefined; + async queueAll(items: JobItem[]): Promise { + if (!items.length) { + return; + } - await this.getQueue(JOBS_TO_QUEUE[jobName]).add(jobName, jobData, jobOptions); + const itemsByQueue = items.reduce>((acc, item) => { + const queueName = JOBS_TO_QUEUE[item.name]; + acc[queueName] = acc[queueName] || []; + acc[queueName].push(item); + return acc; + }, {}); + + for (const [queueName, items] of Object.entries(itemsByQueue)) { + const queue = this.getQueue(queueName as QueueName); + const jobs = items.map((item) => ({ + name: item.name, + data: (item as { data?: any })?.data || {}, + options: this.getJobOptions(item) || undefined, + })); + await queue.addBulk(jobs); + } + } + + async queue(item: JobItem): Promise { + await this.queueAll([item]); } private getJobOptions(item: JobItem): JobsOptions | null { diff --git a/server/test/repositories/job.repository.mock.ts b/server/test/repositories/job.repository.mock.ts index 967c6a8040c..977e73f837b 100644 --- a/server/test/repositories/job.repository.mock.ts +++ b/server/test/repositories/job.repository.mock.ts @@ -11,6 +11,7 @@ export const newJobRepositoryMock = (): jest.Mocked => { pause: jest.fn(), resume: jest.fn(), queue: jest.fn().mockImplementation(() => Promise.resolve()), + queueAll: jest.fn().mockImplementation(() => Promise.resolve()), getQueueStatus: jest.fn(), getJobCounts: jest.fn(), clear: jest.fn(), diff --git a/server/test/test-utils.ts b/server/test/test-utils.ts index 0a64132e414..5356d83bff0 100644 --- a/server/test/test-utils.ts +++ b/server/test/test-utils.ts @@ -77,6 +77,7 @@ export const testApp = { deleteCronJob: jest.fn(), validateCronExpression: jest.fn(), queue: (item: JobItem) => jobs && _handler(item), + queueAll: (items: JobItem[]) => jobs && Promise.all(items.map(_handler)).then(() => Promise.resolve()), resume: jest.fn(), empty: jest.fn(), setConcurrency: jest.fn(),