feat: add worker pool for parallel processing

- Dynamic pool sizing based on CPU cores
- Job queue with automatic worker assignment
- Strips Svelte proxies before sending to workers
- Graceful error handling and cleanup

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2026-05-13 18:10:39 -04:00
parent c06a51cad9
commit ee71b91868

View File

@@ -0,0 +1,128 @@
// Worker pool for parallel image processing
import type { Device, PipelineConfig } from '../types';
type PendingJob = {
resolve: (result: { blob: Blob; dataUrl: string }) => void;
reject: (error: Error) => void;
};
type JobPayload = {
imageData: ArrayBuffer;
device: Device;
config: PipelineConfig;
};
class PhotonWorkerPool {
private workers: Worker[] = [];
private availableWorkers: Worker[] = [];
private pendingJobs: Map<string, PendingJob> = new Map();
private jobQueue: Array<{ id: string; payload: JobPayload }> = [];
private initialized = false;
private initPromise: Promise<void> | null = null;
private poolSize: number;
constructor(poolSize = navigator.hardwareConcurrency || 4) {
this.poolSize = Math.max(2, Math.min(poolSize, 6));
}
private async init(): Promise<void> {
if (this.initialized) return;
if (this.initPromise) return this.initPromise;
this.initPromise = (async () => {
const WorkerConstructor = (await import('./photonWorker?worker')).default;
for (let i = 0; i < this.poolSize; i++) {
const worker = new WorkerConstructor();
worker.onmessage = (e: MessageEvent) => this.handleMessage(e, worker);
worker.onerror = (e: ErrorEvent) => this.handleError(e, worker);
this.workers.push(worker);
this.availableWorkers.push(worker);
}
this.initialized = true;
})();
return this.initPromise;
}
private handleMessage(e: MessageEvent, worker: Worker): void {
const { id, success, result, error } = e.data;
const pending = this.pendingJobs.get(id);
if (pending) {
this.pendingJobs.delete(id);
if (success) {
const blob = new Blob([result.blobData], { type: result.blobType });
pending.resolve({ blob, dataUrl: result.dataUrl });
} else {
pending.reject(new Error(error));
}
}
this.availableWorkers.push(worker);
this.processQueue();
}
private handleError(e: ErrorEvent, worker: Worker): void {
console.error('Worker error:', e);
this.availableWorkers.push(worker);
this.processQueue();
}
private processQueue(): void {
while (this.jobQueue.length > 0 && this.availableWorkers.length > 0) {
const job = this.jobQueue.shift()!;
const worker = this.availableWorkers.pop()!;
worker.postMessage({ id: job.id, type: 'process', payload: job.payload }, [
job.payload.imageData
]);
}
}
async process(
file: File,
device: Device,
config: PipelineConfig
): Promise<{ blob: Blob; dataUrl: string }> {
await this.init();
const id = crypto.randomUUID();
const imageData = await file.arrayBuffer();
// Strip Svelte proxies before sending to worker
const plainDevice = JSON.parse(JSON.stringify(device));
const plainConfig = JSON.parse(JSON.stringify(config));
return new Promise((resolve, reject) => {
this.pendingJobs.set(id, { resolve, reject });
const payload: JobPayload = {
imageData,
device: plainDevice,
config: plainConfig
};
if (this.availableWorkers.length > 0) {
const worker = this.availableWorkers.pop()!;
worker.postMessage({ id, type: 'process', payload }, [imageData]);
} else {
this.jobQueue.push({ id, payload });
}
});
}
terminate(): void {
for (const worker of this.workers) {
worker.terminate();
}
this.workers = [];
this.availableWorkers = [];
this.initialized = false;
this.initPromise = null;
}
}
export const workerPool = new PhotonWorkerPool();