This commit is contained in:
Manav Rathi 2024-05-14 14:56:33 +05:30
parent c5f02a47d4
commit 9c0f1fac92
No known key found for this signature in database
4 changed files with 116 additions and 121 deletions

View file

@ -1,7 +1,13 @@
import { haveWindow } from "@/next/env";
import log from "@/next/log";
import { ComlinkWorker } from "@/next/worker/comlink-worker";
import { APPS } from "@ente/shared/apps/constants";
import ComlinkCryptoWorker from "@ente/shared/crypto";
import ComlinkCryptoWorker, {
getDedicatedCryptoWorker,
} from "@ente/shared/crypto";
import { DedicatedCryptoWorker } from "@ente/shared/crypto/internal/crypto.worker";
import { CustomError, parseUploadErrorCodes } from "@ente/shared/error";
import PQueue from "p-queue";
import downloadManager from "services/download";
import { putEmbedding } from "services/embeddingService";
import { getLocalFiles } from "services/fileService";
@ -11,10 +17,23 @@ import mlIDbStorage, {
ML_SYNC_JOB_CONFIG_NAME,
} from "services/ml/db";
import {
BlurDetectionMethod,
BlurDetectionService,
ClipEmbedding,
ClusteringMethod,
ClusteringService,
Face,
FaceAlignmentMethod,
FaceAlignmentService,
FaceCropMethod,
FaceCropService,
FaceDetection,
FaceDetectionMethod,
FaceDetectionService,
FaceEmbeddingMethod,
FaceEmbeddingService,
Landmark,
MLLibraryData,
MLSearchConfig,
MLSyncConfig,
MLSyncContext,
@ -22,39 +41,18 @@ import {
MLSyncResult,
MlFileData,
} from "services/ml/types";
import { JobConfig } from "types/common/job";
import { EnteFile } from "types/file";
import { isInternalUserForML } from "utils/user";
import FaceService from "./faceService";
import PeopleService from "./peopleService";
import ReaderService from "./readerService";
import { haveWindow } from "@/next/env";
import { ComlinkWorker } from "@/next/worker/comlink-worker";
import { getDedicatedCryptoWorker } from "@ente/shared/crypto";
import { DedicatedCryptoWorker } from "@ente/shared/crypto/internal/crypto.worker";
import PQueue from "p-queue";
import {
BlurDetectionMethod,
BlurDetectionService,
ClusteringMethod,
ClusteringService,
FaceAlignmentMethod,
FaceAlignmentService,
FaceCropMethod,
FaceCropService,
FaceDetectionMethod,
FaceDetectionService,
FaceEmbeddingMethod,
FaceEmbeddingService,
MLLibraryData,
} from "services/ml/types";
import arcfaceAlignmentService from "./arcfaceAlignmentService";
import arcfaceCropService from "./arcfaceCropService";
import dbscanClusteringService from "./dbscanClusteringService";
import FaceService from "./faceService";
import hdbscanClusteringService from "./hdbscanClusteringService";
import laplacianBlurDetectionService from "./laplacianBlurDetectionService";
import type { JobConfig } from "./mlWorkManager";
import mobileFaceNetEmbeddingService from "./mobileFaceNetEmbeddingService";
import PeopleService from "./peopleService";
import ReaderService from "./readerService";
import yoloFaceDetectionService from "./yoloFaceDetectionService";
export const DEFAULT_ML_SYNC_JOB_CONFIG: JobConfig = {

View file

@ -8,10 +8,8 @@ import PQueue from "p-queue";
import { getMLSyncJobConfig } from "services/machineLearning/machineLearningService";
import mlIDbStorage from "services/ml/db";
import { MLSyncResult } from "services/ml/types";
import { JobResult } from "types/common/job";
import { EnteFile } from "types/file";
import { getDedicatedMLWorker } from "utils/comlink/ComlinkMLWorker";
import { SimpleJob } from "utils/common/job";
import { DedicatedMLWorker } from "worker/ml.worker";
import { logQueueStats } from "./machineLearningService";
@ -19,6 +17,98 @@ const LIVE_SYNC_IDLE_DEBOUNCE_SEC = 30;
const LIVE_SYNC_QUEUE_TIMEOUT_SEC = 300;
const LOCAL_FILES_UPDATED_DEBOUNCE_SEC = 30;
export type JobState = "Scheduled" | "Running" | "NotScheduled";
export interface JobConfig {
intervalSec: number;
maxItervalSec: number;
backoffMultiplier: number;
}
export interface JobResult {
shouldBackoff: boolean;
}
export class SimpleJob<R extends JobResult> {
private config: JobConfig;
private runCallback: () => Promise<R>;
private state: JobState;
private stopped: boolean;
private intervalSec: number;
private nextTimeoutId: ReturnType<typeof setTimeout>;
constructor(config: JobConfig, runCallback: () => Promise<R>) {
this.config = config;
this.runCallback = runCallback;
this.state = "NotScheduled";
this.stopped = true;
this.intervalSec = this.config.intervalSec;
}
public resetInterval() {
this.intervalSec = this.config.intervalSec;
}
public start() {
this.stopped = false;
this.resetInterval();
if (this.state !== "Running") {
this.scheduleNext();
} else {
log.info("Job already running, not scheduling");
}
}
private scheduleNext() {
if (this.state === "Scheduled" || this.nextTimeoutId) {
this.clearScheduled();
}
this.nextTimeoutId = setTimeout(
() => this.run(),
this.intervalSec * 1000,
);
this.state = "Scheduled";
log.info("Scheduled next job after: ", this.intervalSec);
}
async run() {
this.nextTimeoutId = undefined;
this.state = "Running";
try {
const jobResult = await this.runCallback();
if (jobResult && jobResult.shouldBackoff) {
this.intervalSec = Math.min(
this.config.maxItervalSec,
this.intervalSec * this.config.backoffMultiplier,
);
} else {
this.resetInterval();
}
log.info("Job completed");
} catch (e) {
console.error("Error while running Job: ", e);
} finally {
this.state = "NotScheduled";
!this.stopped && this.scheduleNext();
}
}
// currently client is responsible to terminate running job
public stop() {
this.stopped = true;
this.clearScheduled();
}
private clearScheduled() {
clearTimeout(this.nextTimeoutId);
this.nextTimeoutId = undefined;
this.state = "NotScheduled";
log.info("Cleared next job");
}
}
export interface MLSyncJobResult extends JobResult {
mlSyncResult: MLSyncResult;
}

View file

@ -1,11 +0,0 @@
export type JobState = "Scheduled" | "Running" | "NotScheduled";
export interface JobConfig {
intervalSec: number;
maxItervalSec: number;
backoffMultiplier: number;
}
export interface JobResult {
shouldBackoff: boolean;
}

View file

@ -1,82 +0,0 @@
import log from "@/next/log";
import { JobConfig, JobResult, JobState } from "types/common/job";
export class SimpleJob<R extends JobResult> {
private config: JobConfig;
private runCallback: () => Promise<R>;
private state: JobState;
private stopped: boolean;
private intervalSec: number;
private nextTimeoutId: ReturnType<typeof setTimeout>;
constructor(config: JobConfig, runCallback: () => Promise<R>) {
this.config = config;
this.runCallback = runCallback;
this.state = "NotScheduled";
this.stopped = true;
this.intervalSec = this.config.intervalSec;
}
public resetInterval() {
this.intervalSec = this.config.intervalSec;
}
public start() {
this.stopped = false;
this.resetInterval();
if (this.state !== "Running") {
this.scheduleNext();
} else {
log.info("Job already running, not scheduling");
}
}
private scheduleNext() {
if (this.state === "Scheduled" || this.nextTimeoutId) {
this.clearScheduled();
}
this.nextTimeoutId = setTimeout(
() => this.run(),
this.intervalSec * 1000,
);
this.state = "Scheduled";
log.info("Scheduled next job after: ", this.intervalSec);
}
async run() {
this.nextTimeoutId = undefined;
this.state = "Running";
try {
const jobResult = await this.runCallback();
if (jobResult && jobResult.shouldBackoff) {
this.intervalSec = Math.min(
this.config.maxItervalSec,
this.intervalSec * this.config.backoffMultiplier,
);
} else {
this.resetInterval();
}
log.info("Job completed");
} catch (e) {
console.error("Error while running Job: ", e);
} finally {
this.state = "NotScheduled";
!this.stopped && this.scheduleNext();
}
}
// currently client is responsible to terminate running job
public stop() {
this.stopped = true;
this.clearScheduled();
}
private clearScheduled() {
clearTimeout(this.nextTimeoutId);
this.nextTimeoutId = undefined;
this.state = "NotScheduled";
log.info("Cleared next job");
}
}