ente/src/services/upload/queueProcessor.ts

72 lines
2 KiB
TypeScript
Raw Normal View History

import { CustomError } from 'utils/common/errorUtil';
2021-08-26 10:22:05 +00:00
interface RequestQueueItem {
2021-08-26 11:00:41 +00:00
request: (canceller?: RequestCanceller) => Promise<any>;
successCallback: (response: any) => void;
failureCallback: (error: Error) => void;
2021-08-26 10:22:05 +00:00
isCanceled: { status: boolean };
canceller: { exec: () => void };
}
2021-09-12 10:26:16 +00:00
export interface RequestCanceller {
2021-08-26 11:00:41 +00:00
exec: () => void;
}
export default class QueueProcessor<T> {
2021-08-26 10:22:05 +00:00
private requestQueue: RequestQueueItem[] = [];
private requestInProcessing = 0;
constructor(private maxParallelProcesses: number) {}
2021-09-12 10:26:16 +00:00
public queueUpRequest(
request: (canceller?: RequestCanceller) => Promise<T>
) {
2021-08-26 10:22:05 +00:00
const isCanceled = { status: false };
const canceller: RequestCanceller = {
exec: () => {
isCanceled.status = true;
},
};
const promise = new Promise<T>((resolve, reject) => {
2021-08-26 10:22:05 +00:00
this.requestQueue.push({
request,
successCallback: resolve,
failureCallback: reject,
2021-08-26 10:22:05 +00:00
isCanceled,
canceller,
});
this.pollQueue();
});
2021-08-26 11:00:41 +00:00
return { promise, canceller };
2021-08-26 10:22:05 +00:00
}
async pollQueue() {
if (this.requestInProcessing < this.maxParallelProcesses) {
this.requestInProcessing++;
await this.processQueue();
this.requestInProcessing--;
}
}
public async processQueue() {
while (this.requestQueue.length > 0) {
const queueItem = this.requestQueue.pop();
let response = null;
2021-08-26 10:22:05 +00:00
if (queueItem.isCanceled.status) {
queueItem.failureCallback(Error(CustomError.REQUEST_CANCELLED));
2021-08-26 10:22:05 +00:00
} else {
try {
response = await queueItem.request(queueItem.canceller);
queueItem.successCallback(response);
} catch (e) {
queueItem.failureCallback(e);
}
2021-08-26 10:22:05 +00:00
}
}
}
}