From 0881212e4fc57d44c867ecaf69b696c18d437117 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Mon, 15 Apr 2024 15:40:50 +0530 Subject: [PATCH] Rearrange --- desktop/src/main.ts | 54 ++++-------- desktop/src/main/fs.ts | 67 +------------- desktop/src/main/services/ffmpeg.ts | 2 +- desktop/src/main/services/imageProcessor.ts | 2 +- desktop/src/main/services/ml-clip.ts | 2 +- desktop/src/main/services/ml.ts | 2 +- desktop/src/main/stream.ts | 97 +++++++++++++++++++++ 7 files changed, 118 insertions(+), 108 deletions(-) create mode 100644 desktop/src/main/stream.ts diff --git a/desktop/src/main.ts b/desktop/src/main.ts index a8c94aa30..8526e2363 100644 --- a/desktop/src/main.ts +++ b/desktop/src/main.ts @@ -27,6 +27,7 @@ import { setupAutoUpdater } from "./main/services/app-update"; import autoLauncher from "./main/services/autoLauncher"; import { initWatcher } from "./main/services/chokidar"; import { userPreferences } from "./main/stores/user-preferences"; +import { registerStreamProtocol } from "./main/stream"; import { isDev } from "./main/util"; /** @@ -91,24 +92,26 @@ const setupRendererServer = () => serveNextAt(rendererURL); /** * Register privileged schemes. * - * We have two priviliged schemes: + * We have two privileged schemes: * * 1. "ente", used for serving our web app (@see {@link setupRendererServer}). * * 2. "stream", used for streaming IPC (@see {@link registerStreamProtocol}). * * Both of these need some privileges, however, the documentation for Electron's - * registerSchemesAsPrivileged says: + * [registerSchemesAsPrivileged](https://www.electronjs.org/docs/latest/api/protocol) + * says: * * > This method ... can be called only once. - * > - * > https://www.electronjs.org/docs/latest/api/protocol#protocolisprotocolregisteredscheme-deprecated * - * The library we use for the "ente" scheme already calls it once when we invoke - * {@link setupRendererServer}. Luckily, in practice it seems that the last call - * wins, and we don't need to modify the next-electron-server to prevent it from - * calling registerSchemesAsPrivileged. Instead, both schemes get registered - * fine, but we do need to repeat what next-electron-server had done. + * The library we use for the "ente" scheme, next-electron-server, already calls + * it once when we invoke {@link setupRendererServer}. + * + * In practice calling it multiple times just causes the values to be + * overwritten, and the last call wins. So we don't need to modify + * next-electron-server to prevent it from calling registerSchemesAsPrivileged. + * Instead, we (a) repeat what next-electron-server had done here, and (b) + * ensure that we're called after {@link setupRendererServer}. */ const registerPrivilegedSchemes = () => { protocol.registerSchemesAsPrivileged([ @@ -264,32 +267,6 @@ const setupTrayItem = (mainWindow: BrowserWindow) => { tray.setContextMenu(createTrayContextMenu(mainWindow)); }; -/** - * Register a protocol handler that we use for streaming large files between the - * main process (node) and the renderer process (browser) layer. - * - * [Note: IPC streams] - * - * When running without node integration, there is no direct way to pass streams - * across IPC. And passing the entire contents of the file is not feasible for - * large video files because of the memory pressure the copying would entail. - * - * As an alternative, we register a custom protocol handler that can provided a - * bi-directional stream. The renderer can stream data to the node side by - * streaming the request. The node side can stream to the renderer side by - * streaming the response. - * - * See also: [Note: Transferring large amount of data over IPC] - * - * Depends on: {@link registerPrivilegedSchemes} - */ -const registerStreamProtocol = () => { - protocol.handle("stream", (request: Request) => { - log.info({ e: "Got incoming stream", request }); - return new Response("", { status: 200 }); - }); -}; - /** * Older versions of our app used to maintain a cache dir using the main * process. This has been deprecated in favor of using a normal web cache. @@ -349,16 +326,15 @@ const main = () => { // Note that some Electron APIs can only be used after this event occurs. app.on("ready", async () => { mainWindow = await createMainWindow(); - const watcher = initWatcher(mainWindow); - setupTrayItem(mainWindow); Menu.setApplicationMenu(await createApplicationMenu(mainWindow)); + setupTrayItem(mainWindow); attachIPCHandlers(); - attachFSWatchIPCHandlers(watcher); + attachFSWatchIPCHandlers(initWatcher(mainWindow)); + registerStreamProtocol(); if (!isDev) setupAutoUpdater(mainWindow); handleDownloads(mainWindow); handleExternalLinks(mainWindow); addAllowOriginHeader(mainWindow); - registerStreamProtocol(); try { deleteLegacyDiskCacheDirIfExists(); diff --git a/desktop/src/main/fs.ts b/desktop/src/main/fs.ts index d8cfe7714..a870a7ab5 100644 --- a/desktop/src/main/fs.ts +++ b/desktop/src/main/fs.ts @@ -1,9 +1,9 @@ /** * @file file system related functions exposed over the context bridge. */ -import { createWriteStream, existsSync } from "node:fs"; +import { existsSync } from "node:fs"; import fs from "node:fs/promises"; -import { Readable } from "node:stream"; +import { writeStream } from "./stream"; export const fsExists = (path: string) => existsSync(path); @@ -23,69 +23,6 @@ export const fsReadTextFile = async (filePath: string) => export const fsWriteFile = (path: string, contents: string) => fs.writeFile(path, contents); -/** - * Write a (web) ReadableStream to a file at the given {@link filePath}. - * - * The returned promise resolves when the write completes. - * - * @param filePath The local filesystem path where the file should be written. - * @param readableStream A [web - * ReadableStream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) - */ -export const writeStream = (filePath: string, readableStream: ReadableStream) => - writeNodeStream(filePath, convertWebReadableStreamToNode(readableStream)); - -/** - * Convert a Web ReadableStream into a Node.js ReadableStream - * - * This can be used to, for example, write a ReadableStream obtained via - * `net.fetch` into a file using the Node.js `fs` APIs - */ -const convertWebReadableStreamToNode = (readableStream: ReadableStream) => { - const reader = readableStream.getReader(); - const rs = new Readable(); - - rs._read = async () => { - try { - const result = await reader.read(); - - if (!result.done) { - rs.push(Buffer.from(result.value)); - } else { - rs.push(null); - return; - } - } catch (e) { - rs.emit("error", e); - } - }; - - return rs; -}; - -const writeNodeStream = async ( - filePath: string, - fileStream: NodeJS.ReadableStream, -) => { - const writeable = createWriteStream(filePath); - - fileStream.on("error", (error) => { - writeable.destroy(error); // Close the writable stream with an error - }); - - fileStream.pipe(writeable); - - await new Promise((resolve, reject) => { - writeable.on("finish", resolve); - writeable.on("error", async (e: unknown) => { - if (existsSync(filePath)) { - await fs.unlink(filePath); - } - reject(e); - }); - }); -}; - /* TODO: Audit below this */ export const saveStreamToDisk = writeStream; diff --git a/desktop/src/main/services/ffmpeg.ts b/desktop/src/main/services/ffmpeg.ts index e9639a26f..3072d5ee7 100644 --- a/desktop/src/main/services/ffmpeg.ts +++ b/desktop/src/main/services/ffmpeg.ts @@ -2,7 +2,7 @@ import pathToFfmpeg from "ffmpeg-static"; import { existsSync } from "node:fs"; import fs from "node:fs/promises"; import { ElectronFile } from "../../types/ipc"; -import { writeStream } from "../fs"; +import { writeStream } from "../stream"; import log from "../log"; import { generateTempFilePath, getTempDirPath } from "../temp"; import { execAsync } from "../util"; diff --git a/desktop/src/main/services/imageProcessor.ts b/desktop/src/main/services/imageProcessor.ts index 890e0e634..d87fb0c5f 100644 --- a/desktop/src/main/services/imageProcessor.ts +++ b/desktop/src/main/services/imageProcessor.ts @@ -2,7 +2,7 @@ import { existsSync } from "fs"; import fs from "node:fs/promises"; import path from "path"; import { CustomErrors, ElectronFile } from "../../types/ipc"; -import { writeStream } from "../fs"; +import { writeStream } from "../stream"; import log from "../log"; import { isPlatform } from "../platform"; import { generateTempFilePath } from "../temp"; diff --git a/desktop/src/main/services/ml-clip.ts b/desktop/src/main/services/ml-clip.ts index 63fa75148..af8198a3c 100644 --- a/desktop/src/main/services/ml-clip.ts +++ b/desktop/src/main/services/ml-clip.ts @@ -11,7 +11,7 @@ import fs from "node:fs/promises"; import * as ort from "onnxruntime-node"; import Tokenizer from "../../thirdparty/clip-bpe-ts/mod"; import { CustomErrors } from "../../types/ipc"; -import { writeStream } from "../fs"; +import { writeStream } from "../stream"; import log from "../log"; import { generateTempFilePath } from "../temp"; import { deleteTempFile } from "./ffmpeg"; diff --git a/desktop/src/main/services/ml.ts b/desktop/src/main/services/ml.ts index 10402db21..e1d68e2dd 100644 --- a/desktop/src/main/services/ml.ts +++ b/desktop/src/main/services/ml.ts @@ -15,7 +15,7 @@ import { existsSync } from "fs"; import fs from "node:fs/promises"; import path from "node:path"; import * as ort from "onnxruntime-node"; -import { writeStream } from "../fs"; +import { writeStream } from "../stream"; import log from "../log"; /** diff --git a/desktop/src/main/stream.ts b/desktop/src/main/stream.ts new file mode 100644 index 000000000..2edf59937 --- /dev/null +++ b/desktop/src/main/stream.ts @@ -0,0 +1,97 @@ +/** + * @file stream data to-from renderer using a custom protocol handler. + */ +import { protocol } from "electron/main"; +import { createWriteStream, existsSync } from "node:fs"; +import fs from "node:fs/promises"; +import { Readable } from "node:stream"; +import log from "./log"; + +/** + * Register a protocol handler that we use for streaming large files between the + * main process (node) and the renderer process (browser) layer. + * + * [Note: IPC streams] + * + * When running without node integration, there is no direct way to pass streams + * across IPC. And passing the entire contents of the file is not feasible for + * large video files because of the memory pressure the copying would entail. + * + * As an alternative, we register a custom protocol handler that can provided a + * bi-directional stream. The renderer can stream data to the node side by + * streaming the request. The node side can stream to the renderer side by + * streaming the response. + * + * See also: [Note: Transferring large amount of data over IPC] + * + * Depends on {@link registerPrivilegedSchemes}. + */ +export const registerStreamProtocol = () => { + protocol.handle("stream", (request: Request) => { + log.info({ e: "Got incoming stream", request }); + return new Response("", { status: 200 }); + }); +}; + +/** + * Write a (web) ReadableStream to a file at the given {@link filePath}. + * + * The returned promise resolves when the write completes. + * + * @param filePath The local filesystem path where the file should be written. + * @param readableStream A [web + * ReadableStream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) + */ +export const writeStream = (filePath: string, readableStream: ReadableStream) => + writeNodeStream(filePath, convertWebReadableStreamToNode(readableStream)); + +/** + * Convert a Web ReadableStream into a Node.js ReadableStream + * + * This can be used to, for example, write a ReadableStream obtained via + * `net.fetch` into a file using the Node.js `fs` APIs + */ +const convertWebReadableStreamToNode = (readableStream: ReadableStream) => { + const reader = readableStream.getReader(); + const rs = new Readable(); + + rs._read = async () => { + try { + const result = await reader.read(); + + if (!result.done) { + rs.push(Buffer.from(result.value)); + } else { + rs.push(null); + return; + } + } catch (e) { + rs.emit("error", e); + } + }; + + return rs; +}; + +const writeNodeStream = async ( + filePath: string, + fileStream: NodeJS.ReadableStream, +) => { + const writeable = createWriteStream(filePath); + + fileStream.on("error", (error) => { + writeable.destroy(error); // Close the writable stream with an error + }); + + fileStream.pipe(writeable); + + await new Promise((resolve, reject) => { + writeable.on("finish", resolve); + writeable.on("error", async (e: unknown) => { + if (existsSync(filePath)) { + await fs.unlink(filePath); + } + reject(e); + }); + }); +};