More logging for queue stats and worker

Temporarily add timeout to livesync queue
This commit is contained in:
Shailesh Pandit 2022-01-16 20:21:24 +05:30
parent 1fa4a8cf66
commit eed5497da8
4 changed files with 25 additions and 10 deletions

View file

@ -278,8 +278,7 @@ class MachineLearningService {
syncContext.syncedFiles.push(outOfSyncfile); syncContext.syncedFiles.push(outOfSyncfile);
} }
); );
syncContext.syncQueue.on('error', (error) => { syncContext.syncQueue.on('error', () => {
console.error('syncQueue: onError: ', error);
syncContext.syncQueue.clear(); syncContext.syncQueue.clear();
}); });
await syncContext.syncQueue.addAll(functions); await syncContext.syncQueue.addAll(functions);

View file

@ -4,6 +4,7 @@ import { eventBus, Events } from 'services/events';
import { File, FILE_TYPE } from 'services/fileService'; import { File, FILE_TYPE } from 'services/fileService';
import { FACE_CROPS_CACHE_NAME, MLSyncConfig } from 'types/machineLearning'; import { FACE_CROPS_CACHE_NAME, MLSyncConfig } from 'types/machineLearning';
import { getToken } from 'utils/common/key'; import { getToken } from 'utils/common/key';
import { logQueueStats } from 'utils/machineLearning';
import { getMLSyncJobConfig } from 'utils/machineLearning/config'; import { getMLSyncJobConfig } from 'utils/machineLearning/config';
import { MLWorkerWithProxy } from 'utils/machineLearning/worker'; import { MLWorkerWithProxy } from 'utils/machineLearning/worker';
import { logError } from 'utils/sentry'; import { logError } from 'utils/sentry';
@ -11,6 +12,7 @@ import mlIDbStorage from 'utils/storage/mlIDbStorage';
import { MLSyncJobResult, MLSyncJob } from './mlSyncJob'; import { MLSyncJobResult, MLSyncJob } from './mlSyncJob';
const LIVE_SYNC_IDLE_DEBOUNCE_SEC = 30; const LIVE_SYNC_IDLE_DEBOUNCE_SEC = 30;
const LIVE_SYNC_QUEUE_TIMEOUT_SEC = 300;
const LOCAL_FILES_UPDATED_DEBOUNCE_SEC = 30; const LOCAL_FILES_UPDATED_DEBOUNCE_SEC = 30;
class MLWorkManager { class MLWorkManager {
@ -21,7 +23,13 @@ class MLWorkManager {
private liveSyncWorker: MLWorkerWithProxy; private liveSyncWorker: MLWorkerWithProxy;
constructor() { constructor() {
this.liveSyncQueue = new PQueue({ concurrency: 1 }); this.liveSyncQueue = new PQueue({
concurrency: 1,
// TODO: temp, remove
timeout: LIVE_SYNC_QUEUE_TIMEOUT_SEC * 1000,
throwOnTimeout: true,
});
logQueueStats(this.liveSyncQueue, 'livesync');
const debouncedLiveSyncIdle = debounce( const debouncedLiveSyncIdle = debounce(
() => this.onLiveSyncIdle(), () => this.onLiveSyncIdle(),
@ -82,9 +90,10 @@ class MLWorkManager {
} }
try { try {
await this.syncLocalFile(arg.enteFile, arg.localFile); await this.syncLocalFile(arg.enteFile, arg.localFile);
} catch (e) { } catch (error) {
// console.error(e); console.error('Error in syncLocalFile', error);
logError(e, 'Failed in ML fileUploaded Handler'); this.liveSyncQueue.clear();
// logError(e, 'Failed in ML fileUploaded Handler');
} }
} }

View file

@ -514,9 +514,13 @@ export function getNearestPointIndex(
} }
export function logQueueStats(queue: PQueue, name: string) { export function logQueueStats(queue: PQueue, name: string) {
queue.on('active', () => { queue.on('active', () =>
console.log( console.log(
`queuestats: ${name}: Working on next item. Size: ${queue.size} Pending: ${queue.pending}` `queuestats: ${name}: Active, Size: ${queue.size} Pending: ${queue.pending}`
); )
}); );
queue.on('idle', () => console.log(`queuestats: ${name}: Idle`));
queue.on('error', (error) =>
console.error(`queuestats: ${name}: Error, `, error)
);
} }

View file

@ -15,6 +15,9 @@ export class MLWorkerWithProxy {
new URL('worker/machineLearning.worker', import.meta.url), new URL('worker/machineLearning.worker', import.meta.url),
{ name: 'ml-worker' } { name: 'ml-worker' }
); );
this.worker.onerror = (errorEvent) => {
console.error('Got error event from worker', errorEvent);
};
console.log('Initiated ml-worker'); console.log('Initiated ml-worker');
const comlink = wrap<typeof DedicatedMLWorker>(this.worker); const comlink = wrap<typeof DedicatedMLWorker>(this.worker);
this.proxy = new comlink(); this.proxy = new comlink();