From 6b8731eea6b3f337d169750e663cfdfe8fb92fca Mon Sep 17 00:00:00 2001 From: Jixun Wu Date: Mon, 8 May 2023 17:36:10 +0100 Subject: [PATCH] feat: get web worker mechanism working (pass file around) --- src/SelectFile.tsx | 7 ++- src/decrypt-worker/client.ts | 17 +++++ src/decrypt-worker/constants.ts | 3 + src/decrypt-worker/worker.ts | 15 ++++- src/features/file-listing/fileListingSlice.ts | 12 +++- src/util/ConcurrentQueue.ts | 40 ++++++++++++ src/util/WorkerEventBus.ts | 62 +++++++++++++++++++ 7 files changed, 150 insertions(+), 6 deletions(-) create mode 100644 src/decrypt-worker/constants.ts create mode 100644 src/util/ConcurrentQueue.ts create mode 100644 src/util/WorkerEventBus.ts diff --git a/src/SelectFile.tsx b/src/SelectFile.tsx index 19d6f7a..fca2afc 100644 --- a/src/SelectFile.tsx +++ b/src/SelectFile.tsx @@ -3,7 +3,7 @@ import React, { useId } from 'react'; import { Box, Text } from '@chakra-ui/react'; import { UnlockIcon } from '@chakra-ui/icons'; import { useAppDispatch } from './hooks'; -import { addNewFile } from './features/file-listing/fileListingSlice'; +import { addNewFile, processFile } from './features/file-listing/fileListingSlice'; import { nanoid } from 'nanoid'; export function SelectFile() { @@ -15,13 +15,16 @@ export function SelectFile() { for (const file of e.target.files) { const blobURI = URL.createObjectURL(file); const fileName = file.name; + const fileId = 'file://' + nanoid(); + // FIXME: this should be a single action/thunk that first adds the item, then updates it. dispatch( addNewFile({ - id: nanoid(), + id: fileId, blobURI, fileName, }) ); + dispatch(processFile(fileId)); } } diff --git a/src/decrypt-worker/client.ts b/src/decrypt-worker/client.ts index f966b7b..a81f2b2 100644 --- a/src/decrypt-worker/client.ts +++ b/src/decrypt-worker/client.ts @@ -1 +1,18 @@ +import { ConcurrentQueue } from '../util/ConcurrentQueue'; +import { WorkerClientBus } from '../util/WorkerEventBus'; +import { DECRYPTION_WORKER_ACTION_NAME } from './constants'; + +// TODO: Worker pool? export const workerClient = new Worker(new URL('./worker', import.meta.url), { type: 'module' }); + +class DecryptionQueue extends ConcurrentQueue<{ id: string; blobURI: string }> { + constructor(private workerClientBus: WorkerClientBus, maxQueue?: number) { + super(maxQueue); + } + + async handler(item: { id: string; blobURI: string }): Promise { + return this.workerClientBus.request(DECRYPTION_WORKER_ACTION_NAME.DECRYPT, item.blobURI); + } +} + +export const decryptionQueue = new DecryptionQueue(new WorkerClientBus(workerClient)); diff --git a/src/decrypt-worker/constants.ts b/src/decrypt-worker/constants.ts new file mode 100644 index 0000000..0cbfbdf --- /dev/null +++ b/src/decrypt-worker/constants.ts @@ -0,0 +1,3 @@ +export enum DECRYPTION_WORKER_ACTION_NAME { + DECRYPT = 'DECRYPT', +} diff --git a/src/decrypt-worker/worker.ts b/src/decrypt-worker/worker.ts index 3304a7a..629973e 100644 --- a/src/decrypt-worker/worker.ts +++ b/src/decrypt-worker/worker.ts @@ -1,3 +1,12 @@ -onmessage = (e) => { - console.log(e.data); -}; +import { WorkerServerBus } from '../util/WorkerEventBus'; +import { DECRYPTION_WORKER_ACTION_NAME } from './constants'; + +const bus = new WorkerServerBus(); +onmessage = bus.onmessage; + +bus.addEventHandler(DECRYPTION_WORKER_ACTION_NAME.DECRYPT, async (blobURI) => { + const blob = await fetch(blobURI).then((r) => r.arrayBuffer()); + // TODO: Implement decryptor for blob received here. + console.log(blob); + return { hello: true }; +}); diff --git a/src/features/file-listing/fileListingSlice.ts b/src/features/file-listing/fileListingSlice.ts index adb4cb0..1118182 100644 --- a/src/features/file-listing/fileListingSlice.ts +++ b/src/features/file-listing/fileListingSlice.ts @@ -1,6 +1,7 @@ -import { createSlice } from '@reduxjs/toolkit'; +import { createSlice, createAsyncThunk } from '@reduxjs/toolkit'; import type { PayloadAction } from '@reduxjs/toolkit'; import type { RootState } from '../../store'; +import { decryptionQueue } from '../../decrypt-worker/client'; export enum ProcessState { UNTOUCHED = 'UNTOUCHED', @@ -39,6 +40,15 @@ const initialState: FileListingState = { displayMode: ListingMode.LIST, }; +export const processFile = createAsyncThunk('fileListing/processFile', async (fileId: string, thunkAPI) => { + const file = selectFiles(thunkAPI.getState() as RootState)[fileId]; + if (!file) { + return thunkAPI.rejectWithValue('ERROR: File not found'); + } + + return decryptionQueue.add({ id: fileId, blobURI: file.raw }); +}); + export const fileListingSlice = createSlice({ name: 'fileListing', initialState, diff --git a/src/util/ConcurrentQueue.ts b/src/util/ConcurrentQueue.ts new file mode 100644 index 0000000..784e586 --- /dev/null +++ b/src/util/ConcurrentQueue.ts @@ -0,0 +1,40 @@ +export abstract class ConcurrentQueue { + protected items: [T, (result: any) => void, (error: Error) => void][] = []; + protected currentlyWorking = 0; + + constructor(protected maxQueue = 5) {} + + abstract handler(item: T): Promise; + + public async add(item: T): Promise { + return new Promise((resolve, reject) => { + this.items.push([item, resolve, reject]); + this.runWorkerIfFree(); + }); + } + + private runWorkerIfFree() { + if (this.currentlyWorking < this.maxQueue) { + this.currentlyWorking++; + this.processQueue() + .catch((e) => { + console.error('process queue with error', e); + }) + .finally(() => { + this.currentlyWorking--; + }); + } + } + + private async processQueue() { + while (true) { + const item = this.items.pop(); + if (item === undefined) { + break; + } + + const [payload, resolve, reject] = item; + await this.handler(payload).then(resolve).catch(reject); + } + } +} diff --git a/src/util/WorkerEventBus.ts b/src/util/WorkerEventBus.ts new file mode 100644 index 0000000..cab16b5 --- /dev/null +++ b/src/util/WorkerEventBus.ts @@ -0,0 +1,62 @@ +import { nanoid } from 'nanoid'; + +export class WorkerClientBus { + private idPromiseMap = new Map void, (error: Error) => void]>(); + + constructor(private worker: Worker) { + worker.addEventListener('message', (e) => { + const { id, result, error } = e.data; + const actionPromise = this.idPromiseMap.get(id); + if (!actionPromise) { + console.error('cound not fetch worker promise for action: %s', id); + return; + } + this.idPromiseMap.delete(id); + + const [resolve, reject] = actionPromise; + if (error) { + reject(error); + } else { + resolve(result); + } + }); + } + + async request(actionName: string, payload: P): Promise { + return new Promise((resolve, reject) => { + const id = nanoid(); + this.idPromiseMap.set(id, [resolve, reject]); + this.worker.postMessage({ + id, + action: actionName, + payload, + }); + }); + } +} + +export class WorkerServerBus { + private handlers = new Map Promise>(); + + addEventHandler(actionName: string, handler: (payload: P) => Promise) { + this.handlers.set(actionName, handler); + } + + onmessage = async (e: MessageEvent) => { + const { id, action, payload } = e.data; + const handler = this.handlers.get(action); + if (!handler) { + postMessage({ id, result: null, error: new Error('Handler missing for action ' + action) }); + return; + } + + let result = undefined; + let error = undefined; + try { + result = await handler(payload); + } catch (e: unknown) { + error = e; + } + postMessage({ id, result, error }); + }; +}