From 3f06a494a9058e9cee547efd53362e1085e33dae Mon Sep 17 00:00:00 2001 From: Jason Rasmussen Date: Fri, 7 Mar 2025 17:22:57 -0500 Subject: [PATCH] refactor: queue asset deletes via stream (#16706) --- server/src/repositories/asset.repository.ts | 8 +++++ server/src/services/asset.service.spec.ts | 21 ++++++----- server/src/services/asset.service.ts | 35 +++++++++++-------- .../repositories/asset.repository.mock.ts | 1 + 4 files changed, 42 insertions(+), 23 deletions(-) diff --git a/server/src/repositories/asset.repository.ts b/server/src/repositories/asset.repository.ts index 1efcb3875be..5f020ba8361 100644 --- a/server/src/repositories/asset.repository.ts +++ b/server/src/repositories/asset.repository.ts @@ -637,6 +637,14 @@ export class AssetRepository { return this.storageTemplateAssetQuery().stream() as AsyncIterableIterator; } + streamDeletedAssets(trashedBefore: Date) { + return this.db + .selectFrom('assets') + .select(['id', 'isOffline']) + .where('assets.deletedAt', '<=', trashedBefore) + .stream(); + } + @GenerateSql( ...Object.values(WithProperty).map((property) => ({ name: property, diff --git a/server/src/services/asset.service.spec.ts b/server/src/services/asset.service.spec.ts index f91f600bb15..9286008581a 100755 --- a/server/src/services/asset.service.spec.ts +++ b/server/src/services/asset.service.spec.ts @@ -11,7 +11,8 @@ import { authStub } from 'test/fixtures/auth.stub'; import { faceStub } from 'test/fixtures/face.stub'; import { partnerStub } from 'test/fixtures/partner.stub'; import { userStub } from 'test/fixtures/user.stub'; -import { newTestService, ServiceMocks } from 'test/utils'; +import { factory } from 'test/small.factory'; +import { makeStream, newTestService, ServiceMocks } from 'test/utils'; import { vitest } from 'vitest'; const stats: AssetStats = { @@ -473,28 +474,30 @@ describe(AssetService.name, () => { }); it('should immediately queue assets for deletion if trash is disabled', async () => { - mocks.asset.getAll.mockResolvedValue({ hasNextPage: false, items: [assetStub.image] }); + const asset = factory.asset({ isOffline: false }); + + mocks.asset.streamDeletedAssets.mockReturnValue(makeStream([asset])); mocks.systemMetadata.get.mockResolvedValue({ trash: { enabled: false } }); await expect(sut.handleAssetDeletionCheck()).resolves.toBe(JobStatus.SUCCESS); - expect(mocks.asset.getAll).toHaveBeenCalledWith(expect.anything(), { trashedBefore: new Date() }); + expect(mocks.asset.streamDeletedAssets).toHaveBeenCalledWith(new Date()); expect(mocks.job.queueAll).toHaveBeenCalledWith([ - { name: JobName.ASSET_DELETION, data: { id: assetStub.image.id, deleteOnDisk: true } }, + { name: JobName.ASSET_DELETION, data: { id: asset.id, deleteOnDisk: true } }, ]); }); it('should queue assets for deletion after trash duration', async () => { - mocks.asset.getAll.mockResolvedValue({ hasNextPage: false, items: [assetStub.image] }); + const asset = factory.asset({ isOffline: false }); + + mocks.asset.streamDeletedAssets.mockReturnValue(makeStream([asset])); mocks.systemMetadata.get.mockResolvedValue({ trash: { enabled: true, days: 7 } }); await expect(sut.handleAssetDeletionCheck()).resolves.toBe(JobStatus.SUCCESS); - expect(mocks.asset.getAll).toHaveBeenCalledWith(expect.anything(), { - trashedBefore: DateTime.now().minus({ days: 7 }).toJSDate(), - }); + expect(mocks.asset.streamDeletedAssets).toHaveBeenCalledWith(DateTime.now().minus({ days: 7 }).toJSDate()); expect(mocks.job.queueAll).toHaveBeenCalledWith([ - { name: JobName.ASSET_DELETION, data: { id: assetStub.image.id, deleteOnDisk: true } }, + { name: JobName.ASSET_DELETION, data: { id: asset.id, deleteOnDisk: true } }, ]); }); }); diff --git a/server/src/services/asset.service.ts b/server/src/services/asset.service.ts index df66d405b7a..56b7f7743c6 100644 --- a/server/src/services/asset.service.ts +++ b/server/src/services/asset.service.ts @@ -25,7 +25,6 @@ import { AssetStatus, JobName, JobStatus, Permission, QueueName } from 'src/enum import { BaseService } from 'src/services/base.service'; import { ISidecarWriteJob, JobItem, JobOf } from 'src/types'; import { getAssetFiles, getMyPartnerIds, onAfterUnlink, onBeforeLink, onBeforeUnlink } from 'src/utils/asset.util'; -import { usePagination } from 'src/utils/pagination'; @Injectable() export class AssetService extends BaseService { @@ -156,22 +155,30 @@ export class AssetService extends BaseService { const trashedBefore = DateTime.now() .minus(Duration.fromObject({ days: trashedDays })) .toJSDate(); - const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => - this.assetRepository.getAll(pagination, { trashedBefore }), - ); - for await (const assets of assetPagination) { - await this.jobRepository.queueAll( - assets.map((asset) => ({ - name: JobName.ASSET_DELETION, - data: { - id: asset.id, - deleteOnDisk: !asset.isOffline, - }, - })), - ); + let chunk: Array<{ id: string; isOffline: boolean }> = []; + const queueChunk = async () => { + if (chunk.length > 0) { + await this.jobRepository.queueAll( + chunk.map(({ id, isOffline }) => ({ + name: JobName.ASSET_DELETION, + data: { id, deleteOnDisk: !isOffline }, + })), + ); + chunk = []; + } + }; + + const assets = this.assetRepository.streamDeletedAssets(trashedBefore); + for await (const asset of assets) { + chunk.push(asset); + if (chunk.length >= JOBS_ASSET_PAGINATION_SIZE) { + await queueChunk(); + } } + await queueChunk(); + return JobStatus.SUCCESS; } diff --git a/server/test/repositories/asset.repository.mock.ts b/server/test/repositories/asset.repository.mock.ts index 66b56b0ecc9..19464f7ff28 100644 --- a/server/test/repositories/asset.repository.mock.ts +++ b/server/test/repositories/asset.repository.mock.ts @@ -46,5 +46,6 @@ export const newAssetRepositoryMock = (): Mocked