refactor: stream search duplicates (#17991)

This commit is contained in:
Jason Rasmussen 2025-04-30 10:40:32 -04:00 committed by GitHub
parent e3812a0e36
commit 8c5116bc1d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 60 additions and 42 deletions

View File

@ -194,15 +194,16 @@ where
"asset_files"."assetId" = $1 "asset_files"."assetId" = $1
and "asset_files"."type" = $2 and "asset_files"."type" = $2
-- AssetJobRepository.streamForEncodeClip -- AssetJobRepository.streamForSearchDuplicates
select select
"assets"."id" "assets"."id"
from from
"assets" "assets"
inner join "asset_job_status" as "job_status" on "assetId" = "assets"."id" inner join "asset_job_status" as "job_status" on "assetId" = "assets"."id"
where where
"job_status"."previewAt" is not null "assets"."isVisible" = $1
and "assets"."isVisible" = $1 and "assets"."deletedAt" is null
and "job_status"."previewAt" is not null
and not exists ( and not exists (
select select
from from
@ -210,7 +211,25 @@ where
where where
"assetId" = "assets"."id" "assetId" = "assets"."id"
) )
and "job_status"."duplicatesDetectedAt" is null
-- AssetJobRepository.streamForEncodeClip
select
"assets"."id"
from
"assets"
inner join "asset_job_status" as "job_status" on "assetId" = "assets"."id"
where
"assets"."isVisible" = $1
and "assets"."deletedAt" is null and "assets"."deletedAt" is null
and "job_status"."previewAt" is not null
and not exists (
select
from
"smart_search"
where
"assetId" = "assets"."id"
)
-- AssetJobRepository.getForClipEncoding -- AssetJobRepository.getForClipEncoding
select select

View File

@ -135,20 +135,33 @@ export class AssetJobRepository {
.execute(); .execute();
} }
@GenerateSql({ params: [], stream: true }) private assetsWithPreviews() {
streamForEncodeClip(force?: boolean) {
return this.db return this.db
.selectFrom('assets') .selectFrom('assets')
.select(['assets.id'])
.innerJoin('asset_job_status as job_status', 'assetId', 'assets.id')
.where('job_status.previewAt', 'is not', null)
.where('assets.isVisible', '=', true) .where('assets.isVisible', '=', true)
.where('assets.deletedAt', 'is', null)
.innerJoin('asset_job_status as job_status', 'assetId', 'assets.id')
.where('job_status.previewAt', 'is not', null);
}
@GenerateSql({ params: [], stream: true })
streamForSearchDuplicates(force?: boolean) {
return this.assetsWithPreviews()
.where((eb) => eb.not((eb) => eb.exists(eb.selectFrom('smart_search').whereRef('assetId', '=', 'assets.id'))))
.$if(!force, (qb) => qb.where('job_status.duplicatesDetectedAt', 'is', null))
.select(['assets.id'])
.stream();
}
@GenerateSql({ params: [], stream: true })
streamForEncodeClip(force?: boolean) {
return this.assetsWithPreviews()
.select(['assets.id'])
.$if(!force, (qb) => .$if(!force, (qb) =>
qb.where((eb) => qb.where((eb) =>
eb.not((eb) => eb.exists(eb.selectFrom('smart_search').whereRef('assetId', '=', 'assets.id'))), eb.not((eb) => eb.exists(eb.selectFrom('smart_search').whereRef('assetId', '=', 'assets.id'))),
), ),
) )
.where('assets.deletedAt', 'is', null)
.stream(); .stream();
} }

View File

@ -49,7 +49,6 @@ export enum WithoutProperty {
THUMBNAIL = 'thumbnail', THUMBNAIL = 'thumbnail',
ENCODED_VIDEO = 'encoded-video', ENCODED_VIDEO = 'encoded-video',
EXIF = 'exif', EXIF = 'exif',
DUPLICATE = 'duplicate',
FACES = 'faces', FACES = 'faces',
SIDECAR = 'sidecar', SIDECAR = 'sidecar',
} }
@ -539,14 +538,6 @@ export class AssetRepository {
const items = await this.db const items = await this.db
.selectFrom('assets') .selectFrom('assets')
.selectAll('assets') .selectAll('assets')
.$if(property === WithoutProperty.DUPLICATE, (qb) =>
qb
.innerJoin('asset_job_status as job_status', 'assets.id', 'job_status.assetId')
.where('job_status.duplicatesDetectedAt', 'is', null)
.where('job_status.previewAt', 'is not', null)
.where((eb) => eb.exists(eb.selectFrom('smart_search').where('assetId', '=', eb.ref('assets.id'))))
.where('assets.isVisible', '=', true),
)
.$if(property === WithoutProperty.ENCODED_VIDEO, (qb) => .$if(property === WithoutProperty.ENCODED_VIDEO, (qb) =>
qb qb
.where('assets.type', '=', AssetType.VIDEO) .where('assets.type', '=', AssetType.VIDEO)

View File

@ -1,10 +1,9 @@
import { AssetFileType, AssetType, JobName, JobStatus } from 'src/enum'; import { AssetFileType, AssetType, JobName, JobStatus } from 'src/enum';
import { WithoutProperty } from 'src/repositories/asset.repository';
import { DuplicateService } from 'src/services/duplicate.service'; import { DuplicateService } from 'src/services/duplicate.service';
import { SearchService } from 'src/services/search.service'; import { SearchService } from 'src/services/search.service';
import { assetStub } from 'test/fixtures/asset.stub'; import { assetStub } from 'test/fixtures/asset.stub';
import { authStub } from 'test/fixtures/auth.stub'; import { authStub } from 'test/fixtures/auth.stub';
import { newTestService, ServiceMocks } from 'test/utils'; import { makeStream, newTestService, ServiceMocks } from 'test/utils';
import { beforeEach, vitest } from 'vitest'; import { beforeEach, vitest } from 'vitest';
vitest.useFakeTimers(); vitest.useFakeTimers();
@ -113,14 +112,11 @@ describe(SearchService.name, () => {
}); });
it('should queue missing assets', async () => { it('should queue missing assets', async () => {
mocks.asset.getWithout.mockResolvedValue({ mocks.assetJob.streamForSearchDuplicates.mockReturnValue(makeStream([assetStub.image]));
items: [assetStub.image],
hasNextPage: false,
});
await sut.handleQueueSearchDuplicates({}); await sut.handleQueueSearchDuplicates({});
expect(mocks.asset.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.DUPLICATE); expect(mocks.assetJob.streamForSearchDuplicates).toHaveBeenCalledWith(undefined);
expect(mocks.job.queueAll).toHaveBeenCalledWith([ expect(mocks.job.queueAll).toHaveBeenCalledWith([
{ {
name: JobName.DUPLICATE_DETECTION, name: JobName.DUPLICATE_DETECTION,
@ -130,14 +126,11 @@ describe(SearchService.name, () => {
}); });
it('should queue all assets', async () => { it('should queue all assets', async () => {
mocks.asset.getAll.mockResolvedValue({ mocks.assetJob.streamForSearchDuplicates.mockReturnValue(makeStream([assetStub.image]));
items: [assetStub.image],
hasNextPage: false,
});
await sut.handleQueueSearchDuplicates({ force: true }); await sut.handleQueueSearchDuplicates({ force: true });
expect(mocks.asset.getAll).toHaveBeenCalled(); expect(mocks.assetJob.streamForSearchDuplicates).toHaveBeenCalledWith(true);
expect(mocks.job.queueAll).toHaveBeenCalledWith([ expect(mocks.job.queueAll).toHaveBeenCalledWith([
{ {
name: JobName.DUPLICATE_DETECTION, name: JobName.DUPLICATE_DETECTION,

View File

@ -5,13 +5,11 @@ import { mapAsset } from 'src/dtos/asset-response.dto';
import { AuthDto } from 'src/dtos/auth.dto'; import { AuthDto } from 'src/dtos/auth.dto';
import { DuplicateResponseDto } from 'src/dtos/duplicate.dto'; import { DuplicateResponseDto } from 'src/dtos/duplicate.dto';
import { AssetFileType, JobName, JobStatus, QueueName } from 'src/enum'; import { AssetFileType, JobName, JobStatus, QueueName } from 'src/enum';
import { WithoutProperty } from 'src/repositories/asset.repository';
import { AssetDuplicateResult } from 'src/repositories/search.repository'; import { AssetDuplicateResult } from 'src/repositories/search.repository';
import { BaseService } from 'src/services/base.service'; import { BaseService } from 'src/services/base.service';
import { JobOf } from 'src/types'; import { JobItem, JobOf } from 'src/types';
import { getAssetFile } from 'src/utils/asset.util'; import { getAssetFile } from 'src/utils/asset.util';
import { isDuplicateDetectionEnabled } from 'src/utils/misc'; import { isDuplicateDetectionEnabled } from 'src/utils/misc';
import { usePagination } from 'src/utils/pagination';
@Injectable() @Injectable()
export class DuplicateService extends BaseService { export class DuplicateService extends BaseService {
@ -30,17 +28,21 @@ export class DuplicateService extends BaseService {
return JobStatus.SKIPPED; return JobStatus.SKIPPED;
} }
const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { let jobs: JobItem[] = [];
return force const queueAll = async () => {
? this.assetRepository.getAll(pagination, { isVisible: true }) await this.jobRepository.queueAll(jobs);
: this.assetRepository.getWithout(pagination, WithoutProperty.DUPLICATE); jobs = [];
}); };
for await (const assets of assetPagination) { const assets = this.assetJobRepository.streamForSearchDuplicates(force);
await this.jobRepository.queueAll( for await (const asset of assets) {
assets.map((asset) => ({ name: JobName.DUPLICATE_DETECTION, data: { id: asset.id } })), jobs.push({ name: JobName.DUPLICATE_DETECTION, data: { id: asset.id } });
); if (jobs.length >= JOBS_ASSET_PAGINATION_SIZE) {
await queueAll();
} }
}
await queueAll();
return JobStatus.SUCCESS; return JobStatus.SUCCESS;
} }