fix: type safe ConcurrentQueue

This commit is contained in:
鲁树人 2023-05-18 00:04:37 +01:00
parent 880a3c67fe
commit 1e761610dd
2 changed files with 24 additions and 20 deletions

View File

@ -1,12 +1,11 @@
export abstract class ConcurrentQueue<T> { export abstract class ConcurrentQueue<T, R = unknown> {
protected items: [T, (result: any) => void, (error: Error) => void][] = []; protected items: [T, (result: R) => void, (error: unknown) => void][] = [];
protected currentlyWorking = 0;
constructor(protected maxQueue = 5) {} constructor(protected maxConcurrent = 5) {}
abstract handler(item: T): Promise<void>; abstract handler(item: T): Promise<R>;
public async add<R = never>(item: T): Promise<R> { public async add(item: T): Promise<R> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this.items.push([item, resolve, reject]); this.items.push([item, resolve, reject]);
this.runWorkerIfFree(); this.runWorkerIfFree();
@ -14,27 +13,32 @@ export abstract class ConcurrentQueue<T> {
} }
private runWorkerIfFree() { private runWorkerIfFree() {
if (this.currentlyWorking < this.maxQueue) { if (this.maxConcurrent > 0) {
this.currentlyWorking++; this.maxConcurrent--;
this.processQueue() this.processQueue()
/* c8 ignore start: imposible to reach */
.catch((e) => { .catch((e) => {
console.error('process queue with error', e); console.error('process queue with error, continue.', e);
}) })
/* c8 ignore stop: imposible to reach */
.finally(() => { .finally(() => {
this.currentlyWorking--; this.maxConcurrent++;
}); });
} }
} }
private async processQueue() { private async processQueue() {
while (this.items.length > 0) { let item: (typeof this.items)[0] | void;
const item = this.items.pop(); while ((item = this.items.shift())) {
if (item === undefined) {
break;
}
const [payload, resolve, reject] = item; const [payload, resolve, reject] = item;
await this.handler(payload).then(resolve).catch(reject);
try {
resolve(await this.handler(payload));
} catch (error: unknown) {
reject(error);
} finally {
await new Promise((resolve) => setImmediate(resolve));
}
} }
} }
} }

View File

@ -1,13 +1,13 @@
import { DECRYPTION_WORKER_ACTION_NAME } from '~/decrypt-worker/constants'; import { DECRYPTION_WORKER_ACTION_NAME, DecryptionResult } from '~/decrypt-worker/constants';
import { ConcurrentQueue } from './ConcurrentQueue'; import { ConcurrentQueue } from './ConcurrentQueue';
import { WorkerClientBus } from './WorkerEventBus'; import { WorkerClientBus } from './WorkerEventBus';
export class DecryptionQueue extends ConcurrentQueue<{ id: string; blobURI: string }> { export class DecryptionQueue extends ConcurrentQueue<{ id: string; blobURI: string }, DecryptionResult> {
constructor(private workerClientBus: WorkerClientBus<DECRYPTION_WORKER_ACTION_NAME>, maxQueue?: number) { constructor(private workerClientBus: WorkerClientBus<DECRYPTION_WORKER_ACTION_NAME>, maxQueue?: number) {
super(maxQueue); super(maxQueue);
} }
async handler(item: { id: string; blobURI: string }): Promise<void> { async handler(item: { id: string; blobURI: string }): Promise<DecryptionResult> {
return this.workerClientBus.request(DECRYPTION_WORKER_ACTION_NAME.DECRYPT, item.blobURI); return this.workerClientBus.request(DECRYPTION_WORKER_ACTION_NAME.DECRYPT, item.blobURI);
} }
} }