Add a fixed size block transformer

Surprisingly, this is not a primitive. Or maybe I didn't find it.

The highWaterMark-ing didn't work, that seems more of a recommendation than an
enforcement.
This commit is contained in:
Manav Rathi 2024-04-27 16:57:40 +05:30
parent 31a70674ff
commit 536bcf1091
No known key found for this signature in database

View file

@ -501,16 +501,53 @@ const readFileOrPath = async (
const readFileOrPathStream = async (
fileOrPath: File | string,
): Promise<DataStream> => {
const N = ENCRYPTION_CHUNK_SIZE;
let underlyingStream: ReadableStream;
let chunkCount: number;
if (fileOrPath instanceof File) {
return getFileStream(fileOrPath, ENCRYPTION_CHUNK_SIZE);
const file = fileOrPath;
underlyingStream = file.stream();
chunkCount = Math.ceil(file.size / N);
} else {
const { response, size } = await readStream(
ensureElectron(),
fileOrPath,
);
const chunkCount = Math.ceil(size / ENCRYPTION_CHUNK_SIZE);
return { stream: response.body, chunkCount };
const path = fileOrPath;
const { response, size } = await readStream(ensureElectron(), path);
underlyingStream = response.body;
chunkCount = Math.ceil(size / N);
}
// Pipe the underlying stream through a transformer that emits
// ENCRYPTION_CHUNK_SIZE-ed chunks (except the last one, which can be
// smaller).
let pending: Uint8Array | undefined;
const transformer = new TransformStream<Uint8Array, Uint8Array>({
async transform(
chunk: Uint8Array,
controller: TransformStreamDefaultController,
) {
let next: Uint8Array;
if (pending) {
next = new Uint8Array(pending.length + chunk.length);
next.set(pending);
next.set(chunk, pending.length);
pending = undefined;
} else {
next = chunk;
}
while (next.length >= N) {
controller.enqueue(next.slice(0, N));
next = next.slice(N);
}
if (next.length) pending = next;
},
flush(controller: TransformStreamDefaultController) {
if (pending) controller.enqueue(pending);
},
});
const stream = underlyingStream.pipeThrough(transformer);
return { stream, chunkCount };
};
interface ReadAssetDetailsResult {
@ -770,8 +807,6 @@ const computeHash = async (
worker: Remote<DedicatedCryptoWorker>,
) => {
const { stream, chunkCount } = await readFileOrPathStream(fileOrPath);
// TODO(MR): ElectronFile
console.log("got stream and chunks", stream, chunkCount);
const hashState = await worker.initChunkHashing();
const streamReader = stream.getReader();