feat: get web worker mechanism working (pass file around)
This commit is contained in:
parent
29b169cce6
commit
911ee2a2fa
@ -3,7 +3,7 @@ import React, { useId } from 'react';
|
|||||||
import { Box, Text } from '@chakra-ui/react';
|
import { Box, Text } from '@chakra-ui/react';
|
||||||
import { UnlockIcon } from '@chakra-ui/icons';
|
import { UnlockIcon } from '@chakra-ui/icons';
|
||||||
import { useAppDispatch } from './hooks';
|
import { useAppDispatch } from './hooks';
|
||||||
import { addNewFile } from './features/file-listing/fileListingSlice';
|
import { addNewFile, processFile } from './features/file-listing/fileListingSlice';
|
||||||
import { nanoid } from 'nanoid';
|
import { nanoid } from 'nanoid';
|
||||||
|
|
||||||
export function SelectFile() {
|
export function SelectFile() {
|
||||||
@ -15,13 +15,16 @@ export function SelectFile() {
|
|||||||
for (const file of e.target.files) {
|
for (const file of e.target.files) {
|
||||||
const blobURI = URL.createObjectURL(file);
|
const blobURI = URL.createObjectURL(file);
|
||||||
const fileName = file.name;
|
const fileName = file.name;
|
||||||
|
const fileId = 'file://' + nanoid();
|
||||||
|
// FIXME: this should be a single action/thunk that first adds the item, then updates it.
|
||||||
dispatch(
|
dispatch(
|
||||||
addNewFile({
|
addNewFile({
|
||||||
id: nanoid(),
|
id: fileId,
|
||||||
blobURI,
|
blobURI,
|
||||||
fileName,
|
fileName,
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
dispatch(processFile(fileId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1 +1,18 @@
|
|||||||
|
import { ConcurrentQueue } from '../util/ConcurrentQueue';
|
||||||
|
import { WorkerClientBus } from '../util/WorkerEventBus';
|
||||||
|
import { DECRYPTION_WORKER_ACTION_NAME } from './constants';
|
||||||
|
|
||||||
|
// TODO: Worker pool?
|
||||||
export const workerClient = new Worker(new URL('./worker', import.meta.url), { type: 'module' });
|
export const workerClient = new Worker(new URL('./worker', import.meta.url), { type: 'module' });
|
||||||
|
|
||||||
|
class DecryptionQueue extends ConcurrentQueue<{ id: string; blobURI: string }> {
|
||||||
|
constructor(private workerClientBus: WorkerClientBus, maxQueue?: number) {
|
||||||
|
super(maxQueue);
|
||||||
|
}
|
||||||
|
|
||||||
|
async handler(item: { id: string; blobURI: string }): Promise<void> {
|
||||||
|
return this.workerClientBus.request(DECRYPTION_WORKER_ACTION_NAME.DECRYPT, item.blobURI);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const decryptionQueue = new DecryptionQueue(new WorkerClientBus(workerClient));
|
||||||
|
3
src/decrypt-worker/constants.ts
Normal file
3
src/decrypt-worker/constants.ts
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
export enum DECRYPTION_WORKER_ACTION_NAME {
|
||||||
|
DECRYPT = 'DECRYPT',
|
||||||
|
}
|
@ -1,3 +1,12 @@
|
|||||||
onmessage = (e) => {
|
import { WorkerServerBus } from '../util/WorkerEventBus';
|
||||||
console.log(e.data);
|
import { DECRYPTION_WORKER_ACTION_NAME } from './constants';
|
||||||
};
|
|
||||||
|
const bus = new WorkerServerBus();
|
||||||
|
onmessage = bus.onmessage;
|
||||||
|
|
||||||
|
bus.addEventHandler(DECRYPTION_WORKER_ACTION_NAME.DECRYPT, async (blobURI) => {
|
||||||
|
const blob = await fetch(blobURI).then((r) => r.arrayBuffer());
|
||||||
|
// TODO: Implement decryptor for blob received here.
|
||||||
|
console.log(blob);
|
||||||
|
return { hello: true };
|
||||||
|
});
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
import { createSlice } from '@reduxjs/toolkit';
|
import { createSlice, createAsyncThunk } from '@reduxjs/toolkit';
|
||||||
import type { PayloadAction } from '@reduxjs/toolkit';
|
import type { PayloadAction } from '@reduxjs/toolkit';
|
||||||
import type { RootState } from '../../store';
|
import type { RootState } from '../../store';
|
||||||
|
import { decryptionQueue } from '../../decrypt-worker/client';
|
||||||
|
|
||||||
export enum ProcessState {
|
export enum ProcessState {
|
||||||
UNTOUCHED = 'UNTOUCHED',
|
UNTOUCHED = 'UNTOUCHED',
|
||||||
@ -39,6 +40,15 @@ const initialState: FileListingState = {
|
|||||||
displayMode: ListingMode.LIST,
|
displayMode: ListingMode.LIST,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export const processFile = createAsyncThunk('fileListing/processFile', async (fileId: string, thunkAPI) => {
|
||||||
|
const file = selectFiles(thunkAPI.getState() as RootState)[fileId];
|
||||||
|
if (!file) {
|
||||||
|
return thunkAPI.rejectWithValue('ERROR: File not found');
|
||||||
|
}
|
||||||
|
|
||||||
|
return decryptionQueue.add({ id: fileId, blobURI: file.raw });
|
||||||
|
});
|
||||||
|
|
||||||
export const fileListingSlice = createSlice({
|
export const fileListingSlice = createSlice({
|
||||||
name: 'fileListing',
|
name: 'fileListing',
|
||||||
initialState,
|
initialState,
|
||||||
|
40
src/util/ConcurrentQueue.ts
Normal file
40
src/util/ConcurrentQueue.ts
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
export abstract class ConcurrentQueue<T> {
|
||||||
|
protected items: [T, (result: any) => void, (error: Error) => void][] = [];
|
||||||
|
protected currentlyWorking = 0;
|
||||||
|
|
||||||
|
constructor(protected maxQueue = 5) {}
|
||||||
|
|
||||||
|
abstract handler(item: T): Promise<void>;
|
||||||
|
|
||||||
|
public async add<R = never>(item: T): Promise<R> {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
this.items.push([item, resolve, reject]);
|
||||||
|
this.runWorkerIfFree();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private runWorkerIfFree() {
|
||||||
|
if (this.currentlyWorking < this.maxQueue) {
|
||||||
|
this.currentlyWorking++;
|
||||||
|
this.processQueue()
|
||||||
|
.catch((e) => {
|
||||||
|
console.error('process queue with error', e);
|
||||||
|
})
|
||||||
|
.finally(() => {
|
||||||
|
this.currentlyWorking--;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async processQueue() {
|
||||||
|
while (true) {
|
||||||
|
const item = this.items.pop();
|
||||||
|
if (item === undefined) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
const [payload, resolve, reject] = item;
|
||||||
|
await this.handler(payload).then(resolve).catch(reject);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
62
src/util/WorkerEventBus.ts
Normal file
62
src/util/WorkerEventBus.ts
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
import { nanoid } from 'nanoid';
|
||||||
|
|
||||||
|
export class WorkerClientBus {
|
||||||
|
private idPromiseMap = new Map<string, [(data: any) => void, (error: Error) => void]>();
|
||||||
|
|
||||||
|
constructor(private worker: Worker) {
|
||||||
|
worker.addEventListener('message', (e) => {
|
||||||
|
const { id, result, error } = e.data;
|
||||||
|
const actionPromise = this.idPromiseMap.get(id);
|
||||||
|
if (!actionPromise) {
|
||||||
|
console.error('cound not fetch worker promise for action: %s', id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.idPromiseMap.delete(id);
|
||||||
|
|
||||||
|
const [resolve, reject] = actionPromise;
|
||||||
|
if (error) {
|
||||||
|
reject(error);
|
||||||
|
} else {
|
||||||
|
resolve(result);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async request<R = any, P = any>(actionName: string, payload: P): Promise<R> {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const id = nanoid();
|
||||||
|
this.idPromiseMap.set(id, [resolve, reject]);
|
||||||
|
this.worker.postMessage({
|
||||||
|
id,
|
||||||
|
action: actionName,
|
||||||
|
payload,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class WorkerServerBus {
|
||||||
|
private handlers = new Map<string, (payload: any) => Promise<any>>();
|
||||||
|
|
||||||
|
addEventHandler<R = any, P = any>(actionName: string, handler: (payload: P) => Promise<R>) {
|
||||||
|
this.handlers.set(actionName, handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
onmessage = async (e: MessageEvent<any>) => {
|
||||||
|
const { id, action, payload } = e.data;
|
||||||
|
const handler = this.handlers.get(action);
|
||||||
|
if (!handler) {
|
||||||
|
postMessage({ id, result: null, error: new Error('Handler missing for action ' + action) });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let result = undefined;
|
||||||
|
let error = undefined;
|
||||||
|
try {
|
||||||
|
result = await handler(payload);
|
||||||
|
} catch (e: unknown) {
|
||||||
|
error = e;
|
||||||
|
}
|
||||||
|
postMessage({ id, result, error });
|
||||||
|
};
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user