mirror of
https://github.com/immich-app/immich
synced 2025-06-07 11:01:09 +00:00
refactor: smart search queue (#17977)
This commit is contained in:
parent
038a82c4f1
commit
2e8a286540
@ -194,6 +194,24 @@ where
|
|||||||
"asset_files"."assetId" = $1
|
"asset_files"."assetId" = $1
|
||||||
and "asset_files"."type" = $2
|
and "asset_files"."type" = $2
|
||||||
|
|
||||||
|
-- AssetJobRepository.streamForEncodeClip
|
||||||
|
select
|
||||||
|
"assets"."id"
|
||||||
|
from
|
||||||
|
"assets"
|
||||||
|
inner join "asset_job_status" as "job_status" on "assetId" = "assets"."id"
|
||||||
|
where
|
||||||
|
"job_status"."previewAt" is not null
|
||||||
|
and "assets"."isVisible" = $1
|
||||||
|
and not exists (
|
||||||
|
select
|
||||||
|
from
|
||||||
|
"smart_search"
|
||||||
|
where
|
||||||
|
"assetId" = "assets"."id"
|
||||||
|
)
|
||||||
|
and "assets"."deletedAt" is null
|
||||||
|
|
||||||
-- AssetJobRepository.getForClipEncoding
|
-- AssetJobRepository.getForClipEncoding
|
||||||
select
|
select
|
||||||
"assets"."id",
|
"assets"."id",
|
||||||
|
@ -135,6 +135,23 @@ export class AssetJobRepository {
|
|||||||
.execute();
|
.execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@GenerateSql({ params: [], stream: true })
|
||||||
|
streamForEncodeClip(force?: boolean) {
|
||||||
|
return this.db
|
||||||
|
.selectFrom('assets')
|
||||||
|
.select(['assets.id'])
|
||||||
|
.innerJoin('asset_job_status as job_status', 'assetId', 'assets.id')
|
||||||
|
.where('job_status.previewAt', 'is not', null)
|
||||||
|
.where('assets.isVisible', '=', true)
|
||||||
|
.$if(!force, (qb) =>
|
||||||
|
qb.where((eb) =>
|
||||||
|
eb.not((eb) => eb.exists(eb.selectFrom('smart_search').whereRef('assetId', '=', 'assets.id'))),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.where('assets.deletedAt', 'is', null)
|
||||||
|
.stream();
|
||||||
|
}
|
||||||
|
|
||||||
@GenerateSql({ params: [DummyValue.UUID] })
|
@GenerateSql({ params: [DummyValue.UUID] })
|
||||||
getForClipEncoding(id: string) {
|
getForClipEncoding(id: string) {
|
||||||
return this.db
|
return this.db
|
||||||
|
@ -49,7 +49,6 @@ export enum WithoutProperty {
|
|||||||
THUMBNAIL = 'thumbnail',
|
THUMBNAIL = 'thumbnail',
|
||||||
ENCODED_VIDEO = 'encoded-video',
|
ENCODED_VIDEO = 'encoded-video',
|
||||||
EXIF = 'exif',
|
EXIF = 'exif',
|
||||||
SMART_SEARCH = 'smart-search',
|
|
||||||
DUPLICATE = 'duplicate',
|
DUPLICATE = 'duplicate',
|
||||||
FACES = 'faces',
|
FACES = 'faces',
|
||||||
SIDECAR = 'sidecar',
|
SIDECAR = 'sidecar',
|
||||||
@ -571,15 +570,6 @@ export class AssetRepository {
|
|||||||
.where((eb) => eb.or([eb('assets.sidecarPath', '=', ''), eb('assets.sidecarPath', 'is', null)]))
|
.where((eb) => eb.or([eb('assets.sidecarPath', '=', ''), eb('assets.sidecarPath', 'is', null)]))
|
||||||
.where('assets.isVisible', '=', true),
|
.where('assets.isVisible', '=', true),
|
||||||
)
|
)
|
||||||
.$if(property === WithoutProperty.SMART_SEARCH, (qb) =>
|
|
||||||
qb
|
|
||||||
.innerJoin('asset_job_status as job_status', 'assetId', 'assets.id')
|
|
||||||
.where('job_status.previewAt', 'is not', null)
|
|
||||||
.where('assets.isVisible', '=', true)
|
|
||||||
.where((eb) =>
|
|
||||||
eb.not((eb) => eb.exists(eb.selectFrom('smart_search').whereRef('assetId', '=', 'assets.id'))),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.$if(property === WithoutProperty.THUMBNAIL, (qb) =>
|
.$if(property === WithoutProperty.THUMBNAIL, (qb) =>
|
||||||
qb
|
qb
|
||||||
.innerJoin('asset_job_status as job_status', 'assetId', 'assets.id')
|
.innerJoin('asset_job_status as job_status', 'assetId', 'assets.id')
|
||||||
|
@ -1,11 +1,10 @@
|
|||||||
import { SystemConfig } from 'src/config';
|
import { SystemConfig } from 'src/config';
|
||||||
import { ImmichWorker, JobName, JobStatus } from 'src/enum';
|
import { ImmichWorker, JobName, JobStatus } from 'src/enum';
|
||||||
import { WithoutProperty } from 'src/repositories/asset.repository';
|
|
||||||
import { SmartInfoService } from 'src/services/smart-info.service';
|
import { SmartInfoService } from 'src/services/smart-info.service';
|
||||||
import { getCLIPModelInfo } from 'src/utils/misc';
|
import { getCLIPModelInfo } from 'src/utils/misc';
|
||||||
import { assetStub } from 'test/fixtures/asset.stub';
|
import { assetStub } from 'test/fixtures/asset.stub';
|
||||||
import { systemConfigStub } from 'test/fixtures/system-config.stub';
|
import { systemConfigStub } from 'test/fixtures/system-config.stub';
|
||||||
import { newTestService, ServiceMocks } from 'test/utils';
|
import { makeStream, newTestService, ServiceMocks } from 'test/utils';
|
||||||
|
|
||||||
describe(SmartInfoService.name, () => {
|
describe(SmartInfoService.name, () => {
|
||||||
let sut: SmartInfoService;
|
let sut: SmartInfoService;
|
||||||
@ -152,38 +151,31 @@ describe(SmartInfoService.name, () => {
|
|||||||
|
|
||||||
await sut.handleQueueEncodeClip({});
|
await sut.handleQueueEncodeClip({});
|
||||||
|
|
||||||
expect(mocks.asset.getAll).not.toHaveBeenCalled();
|
|
||||||
expect(mocks.asset.getWithout).not.toHaveBeenCalled();
|
expect(mocks.asset.getWithout).not.toHaveBeenCalled();
|
||||||
expect(mocks.search.setDimensionSize).not.toHaveBeenCalled();
|
expect(mocks.search.setDimensionSize).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should queue the assets without clip embeddings', async () => {
|
it('should queue the assets without clip embeddings', async () => {
|
||||||
mocks.asset.getWithout.mockResolvedValue({
|
mocks.assetJob.streamForEncodeClip.mockReturnValue(makeStream([assetStub.image]));
|
||||||
items: [assetStub.image],
|
|
||||||
hasNextPage: false,
|
|
||||||
});
|
|
||||||
|
|
||||||
await sut.handleQueueEncodeClip({ force: false });
|
await sut.handleQueueEncodeClip({ force: false });
|
||||||
|
|
||||||
expect(mocks.job.queueAll).toHaveBeenCalledWith([
|
expect(mocks.job.queueAll).toHaveBeenCalledWith([
|
||||||
{ name: JobName.SMART_SEARCH, data: { id: assetStub.image.id } },
|
{ name: JobName.SMART_SEARCH, data: { id: assetStub.image.id } },
|
||||||
]);
|
]);
|
||||||
expect(mocks.asset.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.SMART_SEARCH);
|
expect(mocks.assetJob.streamForEncodeClip).toHaveBeenCalledWith(false);
|
||||||
expect(mocks.search.setDimensionSize).not.toHaveBeenCalled();
|
expect(mocks.search.setDimensionSize).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should queue all the assets', async () => {
|
it('should queue all the assets', async () => {
|
||||||
mocks.asset.getAll.mockResolvedValue({
|
mocks.assetJob.streamForEncodeClip.mockReturnValue(makeStream([assetStub.image]));
|
||||||
items: [assetStub.image],
|
|
||||||
hasNextPage: false,
|
|
||||||
});
|
|
||||||
|
|
||||||
await sut.handleQueueEncodeClip({ force: true });
|
await sut.handleQueueEncodeClip({ force: true });
|
||||||
|
|
||||||
expect(mocks.job.queueAll).toHaveBeenCalledWith([
|
expect(mocks.job.queueAll).toHaveBeenCalledWith([
|
||||||
{ name: JobName.SMART_SEARCH, data: { id: assetStub.image.id } },
|
{ name: JobName.SMART_SEARCH, data: { id: assetStub.image.id } },
|
||||||
]);
|
]);
|
||||||
expect(mocks.asset.getAll).toHaveBeenCalled();
|
expect(mocks.assetJob.streamForEncodeClip).toHaveBeenCalledWith(true);
|
||||||
expect(mocks.search.setDimensionSize).toHaveBeenCalledExactlyOnceWith(512);
|
expect(mocks.search.setDimensionSize).toHaveBeenCalledExactlyOnceWith(512);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -3,12 +3,10 @@ import { SystemConfig } from 'src/config';
|
|||||||
import { JOBS_ASSET_PAGINATION_SIZE } from 'src/constants';
|
import { JOBS_ASSET_PAGINATION_SIZE } from 'src/constants';
|
||||||
import { OnEvent, OnJob } from 'src/decorators';
|
import { OnEvent, OnJob } from 'src/decorators';
|
||||||
import { DatabaseLock, ImmichWorker, JobName, JobStatus, QueueName } from 'src/enum';
|
import { DatabaseLock, ImmichWorker, JobName, JobStatus, QueueName } from 'src/enum';
|
||||||
import { WithoutProperty } from 'src/repositories/asset.repository';
|
|
||||||
import { ArgOf } from 'src/repositories/event.repository';
|
import { ArgOf } from 'src/repositories/event.repository';
|
||||||
import { BaseService } from 'src/services/base.service';
|
import { BaseService } from 'src/services/base.service';
|
||||||
import { JobOf } from 'src/types';
|
import { JobItem, JobOf } from 'src/types';
|
||||||
import { getCLIPModelInfo, isSmartSearchEnabled } from 'src/utils/misc';
|
import { getCLIPModelInfo, isSmartSearchEnabled } from 'src/utils/misc';
|
||||||
import { usePagination } from 'src/utils/pagination';
|
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class SmartInfoService extends BaseService {
|
export class SmartInfoService extends BaseService {
|
||||||
@ -79,18 +77,18 @@ export class SmartInfoService extends BaseService {
|
|||||||
await this.searchRepository.setDimensionSize(dimSize);
|
await this.searchRepository.setDimensionSize(dimSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => {
|
let queue: JobItem[] = [];
|
||||||
return force
|
const assets = this.assetJobRepository.streamForEncodeClip(force);
|
||||||
? this.assetRepository.getAll(pagination, { isVisible: true })
|
for await (const asset of assets) {
|
||||||
: this.assetRepository.getWithout(pagination, WithoutProperty.SMART_SEARCH);
|
queue.push({ name: JobName.SMART_SEARCH, data: { id: asset.id } });
|
||||||
});
|
if (queue.length >= JOBS_ASSET_PAGINATION_SIZE) {
|
||||||
|
await this.jobRepository.queueAll(queue);
|
||||||
for await (const assets of assetPagination) {
|
queue = [];
|
||||||
await this.jobRepository.queueAll(
|
}
|
||||||
assets.map((asset) => ({ name: JobName.SMART_SEARCH, data: { id: asset.id } })),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await this.jobRepository.queueAll(queue);
|
||||||
|
|
||||||
return JobStatus.SUCCESS;
|
return JobStatus.SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user