diff --git a/src/lib/processing/workerPool.ts b/src/lib/processing/workerPool.ts new file mode 100644 index 0000000..ca9cfa0 --- /dev/null +++ b/src/lib/processing/workerPool.ts @@ -0,0 +1,128 @@ +// Worker pool for parallel image processing + +import type { Device, PipelineConfig } from '../types'; + +type PendingJob = { + resolve: (result: { blob: Blob; dataUrl: string }) => void; + reject: (error: Error) => void; +}; + +type JobPayload = { + imageData: ArrayBuffer; + device: Device; + config: PipelineConfig; +}; + +class PhotonWorkerPool { + private workers: Worker[] = []; + private availableWorkers: Worker[] = []; + private pendingJobs: Map = new Map(); + private jobQueue: Array<{ id: string; payload: JobPayload }> = []; + private initialized = false; + private initPromise: Promise | null = null; + private poolSize: number; + + constructor(poolSize = navigator.hardwareConcurrency || 4) { + this.poolSize = Math.max(2, Math.min(poolSize, 6)); + } + + private async init(): Promise { + if (this.initialized) return; + if (this.initPromise) return this.initPromise; + + this.initPromise = (async () => { + const WorkerConstructor = (await import('./photonWorker?worker')).default; + + for (let i = 0; i < this.poolSize; i++) { + const worker = new WorkerConstructor(); + worker.onmessage = (e: MessageEvent) => this.handleMessage(e, worker); + worker.onerror = (e: ErrorEvent) => this.handleError(e, worker); + this.workers.push(worker); + this.availableWorkers.push(worker); + } + + this.initialized = true; + })(); + + return this.initPromise; + } + + private handleMessage(e: MessageEvent, worker: Worker): void { + const { id, success, result, error } = e.data; + const pending = this.pendingJobs.get(id); + + if (pending) { + this.pendingJobs.delete(id); + + if (success) { + const blob = new Blob([result.blobData], { type: result.blobType }); + pending.resolve({ blob, dataUrl: result.dataUrl }); + } else { + pending.reject(new Error(error)); + } + } + + this.availableWorkers.push(worker); + this.processQueue(); + } + + private handleError(e: ErrorEvent, worker: Worker): void { + console.error('Worker error:', e); + this.availableWorkers.push(worker); + this.processQueue(); + } + + private processQueue(): void { + while (this.jobQueue.length > 0 && this.availableWorkers.length > 0) { + const job = this.jobQueue.shift()!; + const worker = this.availableWorkers.pop()!; + worker.postMessage({ id: job.id, type: 'process', payload: job.payload }, [ + job.payload.imageData + ]); + } + } + + async process( + file: File, + device: Device, + config: PipelineConfig + ): Promise<{ blob: Blob; dataUrl: string }> { + await this.init(); + + const id = crypto.randomUUID(); + const imageData = await file.arrayBuffer(); + + // Strip Svelte proxies before sending to worker + const plainDevice = JSON.parse(JSON.stringify(device)); + const plainConfig = JSON.parse(JSON.stringify(config)); + + return new Promise((resolve, reject) => { + this.pendingJobs.set(id, { resolve, reject }); + + const payload: JobPayload = { + imageData, + device: plainDevice, + config: plainConfig + }; + + if (this.availableWorkers.length > 0) { + const worker = this.availableWorkers.pop()!; + worker.postMessage({ id, type: 'process', payload }, [imageData]); + } else { + this.jobQueue.push({ id, payload }); + } + }); + } + + terminate(): void { + for (const worker of this.workers) { + worker.terminate(); + } + this.workers = []; + this.availableWorkers = []; + this.initialized = false; + this.initPromise = null; + } +} + +export const workerPool = new PhotonWorkerPool();