From 8c5116bc1deab20582c2aa1b34db5c633b6fea86 Mon Sep 17 00:00:00 2001 From: Jason Rasmussen Date: Wed, 30 Apr 2025 10:40:32 -0400 Subject: [PATCH] refactor: stream search duplicates (#17991) --- server/src/queries/asset.job.repository.sql | 25 +++++++++++++++--- .../src/repositories/asset-job.repository.ts | 25 +++++++++++++----- server/src/repositories/asset.repository.ts | 9 ------- server/src/services/duplicate.service.spec.ts | 17 ++++-------- server/src/services/duplicate.service.ts | 26 ++++++++++--------- 5 files changed, 60 insertions(+), 42 deletions(-) diff --git a/server/src/queries/asset.job.repository.sql b/server/src/queries/asset.job.repository.sql index 67a3b8021db..48a233ffe47 100644 --- a/server/src/queries/asset.job.repository.sql +++ b/server/src/queries/asset.job.repository.sql @@ -194,15 +194,16 @@ where "asset_files"."assetId" = $1 and "asset_files"."type" = $2 --- AssetJobRepository.streamForEncodeClip +-- AssetJobRepository.streamForSearchDuplicates select "assets"."id" from "assets" inner join "asset_job_status" as "job_status" on "assetId" = "assets"."id" where - "job_status"."previewAt" is not null - and "assets"."isVisible" = $1 + "assets"."isVisible" = $1 + and "assets"."deletedAt" is null + and "job_status"."previewAt" is not null and not exists ( select from @@ -210,7 +211,25 @@ where where "assetId" = "assets"."id" ) + and "job_status"."duplicatesDetectedAt" is null + +-- AssetJobRepository.streamForEncodeClip +select + "assets"."id" +from + "assets" + inner join "asset_job_status" as "job_status" on "assetId" = "assets"."id" +where + "assets"."isVisible" = $1 and "assets"."deletedAt" is null + and "job_status"."previewAt" is not null + and not exists ( + select + from + "smart_search" + where + "assetId" = "assets"."id" + ) -- AssetJobRepository.getForClipEncoding select diff --git a/server/src/repositories/asset-job.repository.ts b/server/src/repositories/asset-job.repository.ts index 262cf6217ce..693cd193c1f 100644 --- a/server/src/repositories/asset-job.repository.ts +++ b/server/src/repositories/asset-job.repository.ts @@ -135,20 +135,33 @@ export class AssetJobRepository { .execute(); } - @GenerateSql({ params: [], stream: true }) - streamForEncodeClip(force?: boolean) { + private assetsWithPreviews() { return this.db .selectFrom('assets') - .select(['assets.id']) - .innerJoin('asset_job_status as job_status', 'assetId', 'assets.id') - .where('job_status.previewAt', 'is not', null) .where('assets.isVisible', '=', true) + .where('assets.deletedAt', 'is', null) + .innerJoin('asset_job_status as job_status', 'assetId', 'assets.id') + .where('job_status.previewAt', 'is not', null); + } + + @GenerateSql({ params: [], stream: true }) + streamForSearchDuplicates(force?: boolean) { + return this.assetsWithPreviews() + .where((eb) => eb.not((eb) => eb.exists(eb.selectFrom('smart_search').whereRef('assetId', '=', 'assets.id')))) + .$if(!force, (qb) => qb.where('job_status.duplicatesDetectedAt', 'is', null)) + .select(['assets.id']) + .stream(); + } + + @GenerateSql({ params: [], stream: true }) + streamForEncodeClip(force?: boolean) { + return this.assetsWithPreviews() + .select(['assets.id']) .$if(!force, (qb) => qb.where((eb) => eb.not((eb) => eb.exists(eb.selectFrom('smart_search').whereRef('assetId', '=', 'assets.id'))), ), ) - .where('assets.deletedAt', 'is', null) .stream(); } diff --git a/server/src/repositories/asset.repository.ts b/server/src/repositories/asset.repository.ts index ab7d9c80ea2..7b6fb862d02 100644 --- a/server/src/repositories/asset.repository.ts +++ b/server/src/repositories/asset.repository.ts @@ -49,7 +49,6 @@ export enum WithoutProperty { THUMBNAIL = 'thumbnail', ENCODED_VIDEO = 'encoded-video', EXIF = 'exif', - DUPLICATE = 'duplicate', FACES = 'faces', SIDECAR = 'sidecar', } @@ -539,14 +538,6 @@ export class AssetRepository { const items = await this.db .selectFrom('assets') .selectAll('assets') - .$if(property === WithoutProperty.DUPLICATE, (qb) => - qb - .innerJoin('asset_job_status as job_status', 'assets.id', 'job_status.assetId') - .where('job_status.duplicatesDetectedAt', 'is', null) - .where('job_status.previewAt', 'is not', null) - .where((eb) => eb.exists(eb.selectFrom('smart_search').where('assetId', '=', eb.ref('assets.id')))) - .where('assets.isVisible', '=', true), - ) .$if(property === WithoutProperty.ENCODED_VIDEO, (qb) => qb .where('assets.type', '=', AssetType.VIDEO) diff --git a/server/src/services/duplicate.service.spec.ts b/server/src/services/duplicate.service.spec.ts index 57ab955514e..ed8f2cf1779 100644 --- a/server/src/services/duplicate.service.spec.ts +++ b/server/src/services/duplicate.service.spec.ts @@ -1,10 +1,9 @@ import { AssetFileType, AssetType, JobName, JobStatus } from 'src/enum'; -import { WithoutProperty } from 'src/repositories/asset.repository'; import { DuplicateService } from 'src/services/duplicate.service'; import { SearchService } from 'src/services/search.service'; import { assetStub } from 'test/fixtures/asset.stub'; import { authStub } from 'test/fixtures/auth.stub'; -import { newTestService, ServiceMocks } from 'test/utils'; +import { makeStream, newTestService, ServiceMocks } from 'test/utils'; import { beforeEach, vitest } from 'vitest'; vitest.useFakeTimers(); @@ -113,14 +112,11 @@ describe(SearchService.name, () => { }); it('should queue missing assets', async () => { - mocks.asset.getWithout.mockResolvedValue({ - items: [assetStub.image], - hasNextPage: false, - }); + mocks.assetJob.streamForSearchDuplicates.mockReturnValue(makeStream([assetStub.image])); await sut.handleQueueSearchDuplicates({}); - expect(mocks.asset.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.DUPLICATE); + expect(mocks.assetJob.streamForSearchDuplicates).toHaveBeenCalledWith(undefined); expect(mocks.job.queueAll).toHaveBeenCalledWith([ { name: JobName.DUPLICATE_DETECTION, @@ -130,14 +126,11 @@ describe(SearchService.name, () => { }); it('should queue all assets', async () => { - mocks.asset.getAll.mockResolvedValue({ - items: [assetStub.image], - hasNextPage: false, - }); + mocks.assetJob.streamForSearchDuplicates.mockReturnValue(makeStream([assetStub.image])); await sut.handleQueueSearchDuplicates({ force: true }); - expect(mocks.asset.getAll).toHaveBeenCalled(); + expect(mocks.assetJob.streamForSearchDuplicates).toHaveBeenCalledWith(true); expect(mocks.job.queueAll).toHaveBeenCalledWith([ { name: JobName.DUPLICATE_DETECTION, diff --git a/server/src/services/duplicate.service.ts b/server/src/services/duplicate.service.ts index c504b1a3053..41e3f13c4dd 100644 --- a/server/src/services/duplicate.service.ts +++ b/server/src/services/duplicate.service.ts @@ -5,13 +5,11 @@ import { mapAsset } from 'src/dtos/asset-response.dto'; import { AuthDto } from 'src/dtos/auth.dto'; import { DuplicateResponseDto } from 'src/dtos/duplicate.dto'; import { AssetFileType, JobName, JobStatus, QueueName } from 'src/enum'; -import { WithoutProperty } from 'src/repositories/asset.repository'; import { AssetDuplicateResult } from 'src/repositories/search.repository'; import { BaseService } from 'src/services/base.service'; -import { JobOf } from 'src/types'; +import { JobItem, JobOf } from 'src/types'; import { getAssetFile } from 'src/utils/asset.util'; import { isDuplicateDetectionEnabled } from 'src/utils/misc'; -import { usePagination } from 'src/utils/pagination'; @Injectable() export class DuplicateService extends BaseService { @@ -30,18 +28,22 @@ export class DuplicateService extends BaseService { return JobStatus.SKIPPED; } - const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { - return force - ? this.assetRepository.getAll(pagination, { isVisible: true }) - : this.assetRepository.getWithout(pagination, WithoutProperty.DUPLICATE); - }); + let jobs: JobItem[] = []; + const queueAll = async () => { + await this.jobRepository.queueAll(jobs); + jobs = []; + }; - for await (const assets of assetPagination) { - await this.jobRepository.queueAll( - assets.map((asset) => ({ name: JobName.DUPLICATE_DETECTION, data: { id: asset.id } })), - ); + const assets = this.assetJobRepository.streamForSearchDuplicates(force); + for await (const asset of assets) { + jobs.push({ name: JobName.DUPLICATE_DETECTION, data: { id: asset.id } }); + if (jobs.length >= JOBS_ASSET_PAGINATION_SIZE) { + await queueAll(); + } } + await queueAll(); + return JobStatus.SUCCESS; }