diff --git a/src/util/ConcurrentQueue.ts b/src/util/ConcurrentQueue.ts index e4d33bd..4ce7771 100644 --- a/src/util/ConcurrentQueue.ts +++ b/src/util/ConcurrentQueue.ts @@ -1,12 +1,11 @@ -export abstract class ConcurrentQueue { - protected items: [T, (result: any) => void, (error: Error) => void][] = []; - protected currentlyWorking = 0; +export abstract class ConcurrentQueue { + protected items: [T, (result: R) => void, (error: unknown) => void][] = []; - constructor(protected maxQueue = 5) {} + constructor(protected maxConcurrent = 5) {} - abstract handler(item: T): Promise; + abstract handler(item: T): Promise; - public async add(item: T): Promise { + public async add(item: T): Promise { return new Promise((resolve, reject) => { this.items.push([item, resolve, reject]); this.runWorkerIfFree(); @@ -14,27 +13,32 @@ export abstract class ConcurrentQueue { } private runWorkerIfFree() { - if (this.currentlyWorking < this.maxQueue) { - this.currentlyWorking++; + if (this.maxConcurrent > 0) { + this.maxConcurrent--; this.processQueue() + /* c8 ignore start: imposible to reach */ .catch((e) => { - console.error('process queue with error', e); + console.error('process queue with error, continue.', e); }) + /* c8 ignore stop: imposible to reach */ .finally(() => { - this.currentlyWorking--; + this.maxConcurrent++; }); } } private async processQueue() { - while (this.items.length > 0) { - const item = this.items.pop(); - if (item === undefined) { - break; - } - + let item: (typeof this.items)[0] | void; + while ((item = this.items.shift())) { const [payload, resolve, reject] = item; - await this.handler(payload).then(resolve).catch(reject); + + try { + resolve(await this.handler(payload)); + } catch (error: unknown) { + reject(error); + } finally { + await new Promise((resolve) => setImmediate(resolve)); + } } } } diff --git a/src/util/DecryptionQueue.ts b/src/util/DecryptionQueue.ts index e111d22..339035f 100644 --- a/src/util/DecryptionQueue.ts +++ b/src/util/DecryptionQueue.ts @@ -1,13 +1,13 @@ -import { DECRYPTION_WORKER_ACTION_NAME } from '~/decrypt-worker/constants'; +import { DECRYPTION_WORKER_ACTION_NAME, DecryptionResult } from '~/decrypt-worker/constants'; import { ConcurrentQueue } from './ConcurrentQueue'; import { WorkerClientBus } from './WorkerEventBus'; -export class DecryptionQueue extends ConcurrentQueue<{ id: string; blobURI: string }> { +export class DecryptionQueue extends ConcurrentQueue<{ id: string; blobURI: string }, DecryptionResult> { constructor(private workerClientBus: WorkerClientBus, maxQueue?: number) { super(maxQueue); } - async handler(item: { id: string; blobURI: string }): Promise { + async handler(item: { id: string; blobURI: string }): Promise { return this.workerClientBus.request(DECRYPTION_WORKER_ACTION_NAME.DECRYPT, item.blobURI); } }