This commit is contained in:
Manav Rathi 2024-05-13 15:37:35 +05:30
parent b82507f74c
commit 1b8b840ecf
No known key found for this signature in database

View file

@ -10,6 +10,7 @@ import { ReadableStream } from "node:stream/web";
import { pathToFileURL } from "node:url";
import log from "./log";
import { ensure } from "./utils/common";
import { deleteTempFile } from "./utils/temp";
/**
* Register a protocol handler that we use for streaming large files between the
@ -34,125 +35,117 @@ import { ensure } from "./utils/common";
* Depends on {@link registerPrivilegedSchemes}.
*/
export const registerStreamProtocol = () => {
protocol.handle("stream", async (request: Request) => {
const url = request.url;
// The request URL contains the command to run as the host, and the
// pathname of the file(s) as the search params.
const { host, searchParams } = new URL(url);
switch (host) {
case "read":
return handleRead(ensure(searchParams.get("path")));
case "read-zip":
return handleReadZip(
ensure(searchParams.get("zipPath")),
ensure(searchParams.get("entryName")),
);
case "write":
return handleWrite(ensure(searchParams.get("path")), request);
case "convert-to-mp4":
return handleConvertToMP4(searchParams.get("token"), request);
default:
return new Response("", { status: 404 });
protocol.handle("stream", (request: Request) => {
try {
return handleStreamRequest(request);
} catch (e) {
log.error(`Failed to handle stream request for ${request.url}`, e);
return new Response(String(e), { status: 500 });
}
});
};
const handleRead = async (path: string) => {
try {
const res = await net.fetch(pathToFileURL(path).toString());
if (res.ok) {
// net.fetch already seems to add "Content-Type" and "Last-Modified"
// headers, but I couldn't find documentation for this. In any case,
// since we already are stat-ting the file for the "Content-Length",
// we explicitly add the "X-Last-Modified-Ms" too,
//
// 1. Guaranteeing its presence,
//
// 2. Having it be in the exact format we want (no string <-> date
// conversions),
//
// 3. Retaining milliseconds.
const handleStreamRequest = async (request: Request): Promise<Response> => {
const url = request.url;
// The request URL contains the command to run as the host, and the
// pathname of the file(s) as the search params.
const { host, searchParams } = new URL(url);
switch (host) {
case "read":
return handleRead(ensure(searchParams.get("path")));
const stat = await fs.stat(path);
case "read-zip":
return handleReadZip(
ensure(searchParams.get("zipPath")),
ensure(searchParams.get("entryName")),
);
// Add the file's size as the Content-Length header.
const fileSize = stat.size;
res.headers.set("Content-Length", `${fileSize}`);
case "write":
return handleWrite(ensure(searchParams.get("path")), request);
// Add the file's last modified time (as epoch milliseconds).
const mtimeMs = stat.mtimeMs;
res.headers.set("X-Last-Modified-Ms", `${mtimeMs}`);
case "convert-to-mp4": {
const token = searchParams.get("token");
const done = searchParams.get("done") !== null;
return token
? done
? handleConvertToMP4ReadDone(token)
: handleConvertToMP4Read(token)
: handleConvertToMP4Write(request);
}
return res;
} catch (e) {
log.error(`Failed to read stream at ${path}`, e);
return new Response(`Failed to read stream: ${String(e)}`, {
status: 500,
});
default:
return new Response("", { status: 404 });
}
};
const handleRead = async (path: string) => {
const res = await net.fetch(pathToFileURL(path).toString());
if (res.ok) {
// net.fetch already seems to add "Content-Type" and "Last-Modified"
// headers, but I couldn't find documentation for this. In any case,
// since we already are stat-ting the file for the "Content-Length", we
// explicitly add the "X-Last-Modified-Ms" too,
//
// 1. Guaranteeing its presence,
//
// 2. Having it be in the exact format we want (no string <-> date
// conversions),
//
// 3. Retaining milliseconds.
const stat = await fs.stat(path);
// Add the file's size as the Content-Length header.
const fileSize = stat.size;
res.headers.set("Content-Length", `${fileSize}`);
// Add the file's last modified time (as epoch milliseconds).
const mtimeMs = stat.mtimeMs;
res.headers.set("X-Last-Modified-Ms", `${mtimeMs}`);
}
return res;
};
const handleReadZip = async (zipPath: string, entryName: string) => {
try {
const zip = new StreamZip.async({ file: zipPath });
const entry = await zip.entry(entryName);
if (!entry) return new Response("", { status: 404 });
const zip = new StreamZip.async({ file: zipPath });
const entry = await zip.entry(entryName);
if (!entry) return new Response("", { status: 404 });
// This returns an "old style" NodeJS.ReadableStream.
const stream = await zip.stream(entry);
// Convert it into a new style NodeJS.Readable.
const nodeReadable = new Readable().wrap(stream);
// Then convert it into a Web stream.
const webReadableStreamAny = Readable.toWeb(nodeReadable);
// However, we get a ReadableStream<any> now. This doesn't go into the
// `BodyInit` expected by the Response constructor, which wants a
// ReadableStream<Uint8Array>. Force a cast.
const webReadableStream =
webReadableStreamAny as ReadableStream<Uint8Array>;
// This returns an "old style" NodeJS.ReadableStream.
const stream = await zip.stream(entry);
// Convert it into a new style NodeJS.Readable.
const nodeReadable = new Readable().wrap(stream);
// Then convert it into a Web stream.
const webReadableStreamAny = Readable.toWeb(nodeReadable);
// However, we get a ReadableStream<any> now. This doesn't go into the
// `BodyInit` expected by the Response constructor, which wants a
// ReadableStream<Uint8Array>. Force a cast.
const webReadableStream =
webReadableStreamAny as ReadableStream<Uint8Array>;
// Close the zip handle when the underlying stream closes.
stream.on("end", () => void zip.close());
// Close the zip handle when the underlying stream closes.
stream.on("end", () => void zip.close());
return new Response(webReadableStream, {
headers: {
// We don't know the exact type, but it doesn't really matter,
// just set it to a generic binary content-type so that the
// browser doesn't tinker with it thinking of it as text.
"Content-Type": "application/octet-stream",
"Content-Length": `${entry.size}`,
// While it is documented that entry.time is the modification
// time, the units are not mentioned. By seeing the source code,
// we can verify that it is indeed epoch milliseconds. See
// `parseZipTime` in the node-stream-zip source,
// https://github.com/antelle/node-stream-zip/blob/master/node_stream_zip.js
"X-Last-Modified-Ms": `${entry.time}`,
},
});
} catch (e) {
log.error(
`Failed to read entry ${entryName} from zip file at ${zipPath}`,
e,
);
return new Response(`Failed to read stream: ${String(e)}`, {
status: 500,
});
}
return new Response(webReadableStream, {
headers: {
// We don't know the exact type, but it doesn't really matter, just
// set it to a generic binary content-type so that the browser
// doesn't tinker with it thinking of it as text.
"Content-Type": "application/octet-stream",
"Content-Length": `${entry.size}`,
// While it is documented that entry.time is the modification time,
// the units are not mentioned. By seeing the source code, we can
// verify that it is indeed epoch milliseconds. See `parseZipTime`
// in the node-stream-zip source,
// https://github.com/antelle/node-stream-zip/blob/master/node_stream_zip.js
"X-Last-Modified-Ms": `${entry.time}`,
},
});
};
const handleWrite = async (path: string, request: Request) => {
try {
await writeStream(path, ensure(request.body));
return new Response("", { status: 200 });
} catch (e) {
log.error(`Failed to write stream to ${path}`, e);
return new Response(`Failed to write stream: ${String(e)}`, {
status: 500,
});
}
await writeStream(path, ensure(request.body));
return new Response("", { status: 200 });
};
/**
@ -188,6 +181,18 @@ const writeNodeStream = async (filePath: string, fileStream: Readable) => {
});
};
/**
* A map from token to file paths for convert-to-mp4 requests that we have
* received.
*/
const convertToMP4Results = new Map<string, string>();
/**
* Clear any in-memory state for in-flight convert-to-mp4 requests. Meant to be
* called during logout.
*/
export const clearConvertToMP4Results = () => convertToMP4Results.clear();
/**
* [Note: Convert to MP4]
*
@ -208,6 +213,9 @@ const writeNodeStream = async (filePath: string, fileStream: Readable) => {
* renderer main stream://convert-to-mp4?token=<token>
* response.body is the converted video
*
* renderer main stream://convert-to-mp4?token=<token>&done
* 200 OK
*
* Note that the conversion itself is not streaming. The conversion still
* happens in a single shot, we are just streaming the data across the IPC
* boundary to allow us to pass large amounts of data without running out of
@ -215,29 +223,38 @@ const writeNodeStream = async (filePath: string, fileStream: Readable) => {
*
* See also: [Note: IPC streams]
*/
const handleConvertToMP4 = (token: string | undefined, request: Request) => {
// try {
// if (token) {
// } else {
// await writeStream(path, ensure(request.body));
// return new Response("", { status: 200 });
// }
// } catch (e) {
// log.error("Failed to handle convert-to-mp4 stream", e);
// return new Response(`Failed to write stream: ${String(e)}`, {
// status: 500,
// });
// }
const handleConvertToMP4Write = (request: Request) => {
/*
try {
const inputTempFilePath = await makeTempFilePath();
await writeStream(path, ensure(request.body));
return new Response("", { status: 200 });
}
} catch (e) {
log.error("Failed to handle convert-to-mp4 stream", e);
return new Response(`Failed to write stream: ${String(e)}`, {
status: 500,
});
}*/
};
/**
* A map from token to file paths for convert-to-mp4 requests that we have
* received.
*/
const convertToMP4Results = new Map<string, string>();
const handleConvertToMP4Read = async (token: string) => {
const filePath = convertToMP4Results.get(token);
if (!filePath)
return new Response(`Unknown token ${token}`, { status: 404 });
/**
* Clear any in-memory state for in-flight convert-to-mp4 requests. Meant to be
* called during logout.
*/
export const clearConvertToMP4Results = () => convertToMP4Results.clear();
return net.fetch(pathToFileURL(filePath).toString());
};
const handleConvertToMP4ReadDone = async (token: string) => {
const filePath = convertToMP4Results.get(token);
if (!filePath)
return new Response(`Unknown token ${token}`, { status: 404 });
await deleteTempFile(filePath);
convertToMP4Results.delete(token);
return new Response("", { status: 200 });
};