[web] Rework the video chunk decryptor stream logic (#1817)
This commit is contained in:
commit
371b8bf9cc
|
@ -11,7 +11,7 @@ import { Remote } from "comlink";
|
||||||
import isElectron from "is-electron";
|
import isElectron from "is-electron";
|
||||||
import * as ffmpeg from "services/ffmpeg";
|
import * as ffmpeg from "services/ffmpeg";
|
||||||
import { EnteFile } from "types/file";
|
import { EnteFile } from "types/file";
|
||||||
import { generateStreamFromArrayBuffer, getRenderableImage } from "utils/file";
|
import { getRenderableImage } from "utils/file";
|
||||||
import { PhotosDownloadClient } from "./clients/photos";
|
import { PhotosDownloadClient } from "./clients/photos";
|
||||||
import { PublicAlbumsDownloadClient } from "./clients/publicAlbums";
|
import { PublicAlbumsDownloadClient } from "./clients/publicAlbums";
|
||||||
|
|
||||||
|
@ -289,7 +289,7 @@ class DownloadManagerImpl {
|
||||||
await this.cryptoWorker.fromB64(file.file.decryptionHeader),
|
await this.cryptoWorker.fromB64(file.file.decryptionHeader),
|
||||||
file.key,
|
file.key,
|
||||||
);
|
);
|
||||||
return generateStreamFromArrayBuffer(decrypted);
|
return new Response(decrypted).body;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if (e.message === CustomError.PROCESSING_FAILED) {
|
if (e.message === CustomError.PROCESSING_FAILED) {
|
||||||
log.error(
|
log.error(
|
||||||
|
@ -318,81 +318,78 @@ class DownloadManagerImpl {
|
||||||
const contentLength = +res.headers.get("Content-Length") ?? 0;
|
const contentLength = +res.headers.get("Content-Length") ?? 0;
|
||||||
let downloadedBytes = 0;
|
let downloadedBytes = 0;
|
||||||
|
|
||||||
const stream = new ReadableStream({
|
const decryptionHeader = await this.cryptoWorker.fromB64(
|
||||||
start: async (controller) => {
|
file.file.decryptionHeader,
|
||||||
try {
|
);
|
||||||
const decryptionHeader = await this.cryptoWorker.fromB64(
|
const fileKey = await this.cryptoWorker.fromB64(file.key);
|
||||||
file.file.decryptionHeader,
|
const { pullState, decryptionChunkSize } =
|
||||||
);
|
await this.cryptoWorker.initChunkDecryption(
|
||||||
const fileKey = await this.cryptoWorker.fromB64(file.key);
|
decryptionHeader,
|
||||||
const { pullState, decryptionChunkSize } =
|
fileKey,
|
||||||
await this.cryptoWorker.initChunkDecryption(
|
);
|
||||||
decryptionHeader,
|
|
||||||
fileKey,
|
let leftoverBytes = new Uint8Array();
|
||||||
|
|
||||||
|
return new ReadableStream({
|
||||||
|
pull: async (controller) => {
|
||||||
|
// Each time pull is called, we want to enqueue at least once.
|
||||||
|
let didEnqueue = false;
|
||||||
|
do {
|
||||||
|
// done is a boolean and value is an Uint8Array. When done
|
||||||
|
// is true value will be empty.
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
|
||||||
|
let data: Uint8Array;
|
||||||
|
if (done) {
|
||||||
|
data = leftoverBytes;
|
||||||
|
} else {
|
||||||
|
downloadedBytes += value.length;
|
||||||
|
onDownloadProgress({
|
||||||
|
loaded: downloadedBytes,
|
||||||
|
total: contentLength,
|
||||||
|
});
|
||||||
|
|
||||||
|
data = new Uint8Array(
|
||||||
|
leftoverBytes.length + value.length,
|
||||||
);
|
);
|
||||||
|
data.set(new Uint8Array(leftoverBytes), 0);
|
||||||
let data = new Uint8Array();
|
data.set(new Uint8Array(value), leftoverBytes.length);
|
||||||
let more = true;
|
|
||||||
while (more) {
|
|
||||||
more = false;
|
|
||||||
|
|
||||||
// "done" is a Boolean and value a "Uint8Array"
|
|
||||||
const { done, value } = await reader.read();
|
|
||||||
|
|
||||||
// Is there more data to read?
|
|
||||||
if (!done) {
|
|
||||||
downloadedBytes += value.length;
|
|
||||||
onDownloadProgress({
|
|
||||||
loaded: downloadedBytes,
|
|
||||||
total: contentLength,
|
|
||||||
});
|
|
||||||
|
|
||||||
const buffer = new Uint8Array(
|
|
||||||
data.length + value.length,
|
|
||||||
);
|
|
||||||
buffer.set(new Uint8Array(data), 0);
|
|
||||||
buffer.set(new Uint8Array(value), data.length);
|
|
||||||
|
|
||||||
// Note that buffer.length might be a multiple of
|
|
||||||
// decryptionChunkSize. We let these accumulate, and
|
|
||||||
// drain it all with a nested while loop when done.
|
|
||||||
|
|
||||||
if (buffer.length > decryptionChunkSize) {
|
|
||||||
const { decryptedData } =
|
|
||||||
await this.cryptoWorker.decryptFileChunk(
|
|
||||||
buffer.slice(0, decryptionChunkSize),
|
|
||||||
pullState,
|
|
||||||
);
|
|
||||||
controller.enqueue(decryptedData);
|
|
||||||
data = buffer.slice(decryptionChunkSize);
|
|
||||||
} else {
|
|
||||||
data = buffer;
|
|
||||||
}
|
|
||||||
more = true;
|
|
||||||
} else {
|
|
||||||
while (data && data.length) {
|
|
||||||
const { decryptedData } =
|
|
||||||
await this.cryptoWorker.decryptFileChunk(
|
|
||||||
data.slice(0, decryptionChunkSize),
|
|
||||||
pullState,
|
|
||||||
);
|
|
||||||
controller.enqueue(decryptedData);
|
|
||||||
data =
|
|
||||||
data.length > decryptionChunkSize
|
|
||||||
? data.slice(decryptionChunkSize)
|
|
||||||
: undefined;
|
|
||||||
}
|
|
||||||
controller.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} catch (e) {
|
|
||||||
log.error("Failed to process file stream", e);
|
// data.length might be a multiple of decryptionChunkSize,
|
||||||
controller.error(e);
|
// and we might need multiple iterations to drain it all.
|
||||||
}
|
while (data.length >= decryptionChunkSize) {
|
||||||
|
const { decryptedData } =
|
||||||
|
await this.cryptoWorker.decryptFileChunk(
|
||||||
|
data.slice(0, decryptionChunkSize),
|
||||||
|
pullState,
|
||||||
|
);
|
||||||
|
controller.enqueue(decryptedData);
|
||||||
|
didEnqueue = true;
|
||||||
|
data = data.slice(decryptionChunkSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (done) {
|
||||||
|
// Send off the remaining bytes without waiting for a
|
||||||
|
// full chunk, no more bytes are going to come.
|
||||||
|
if (data.length) {
|
||||||
|
const { decryptedData } =
|
||||||
|
await this.cryptoWorker.decryptFileChunk(
|
||||||
|
data,
|
||||||
|
pullState,
|
||||||
|
);
|
||||||
|
controller.enqueue(decryptedData);
|
||||||
|
}
|
||||||
|
// Don't loop again even if we didn't enqueue.
|
||||||
|
didEnqueue = true;
|
||||||
|
controller.close();
|
||||||
|
} else {
|
||||||
|
// Save it for the next pull.
|
||||||
|
leftoverBytes = data;
|
||||||
|
}
|
||||||
|
} while (!didEnqueue);
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
return stream;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
trackDownloadProgress = (fileID: number, fileSize: number) => {
|
trackDownloadProgress = (fileID: number, fileSize: number) => {
|
||||||
|
|
|
@ -29,7 +29,6 @@ import {
|
||||||
getNonEmptyPersonalCollections,
|
getNonEmptyPersonalCollections,
|
||||||
} from "utils/collection";
|
} from "utils/collection";
|
||||||
import {
|
import {
|
||||||
generateStreamFromArrayBuffer,
|
|
||||||
getPersonalFiles,
|
getPersonalFiles,
|
||||||
getUpdatedEXIFFileForDownload,
|
getUpdatedEXIFFileForDownload,
|
||||||
mergeMetadata,
|
mergeMetadata,
|
||||||
|
@ -1026,7 +1025,6 @@ class ExportService {
|
||||||
videoExportName,
|
videoExportName,
|
||||||
);
|
);
|
||||||
|
|
||||||
const imageStream = generateStreamFromArrayBuffer(livePhoto.imageData);
|
|
||||||
await this.saveMetadataFile(
|
await this.saveMetadataFile(
|
||||||
collectionExportPath,
|
collectionExportPath,
|
||||||
imageExportName,
|
imageExportName,
|
||||||
|
@ -1035,10 +1033,9 @@ class ExportService {
|
||||||
await writeStream(
|
await writeStream(
|
||||||
electron,
|
electron,
|
||||||
`${collectionExportPath}/${imageExportName}`,
|
`${collectionExportPath}/${imageExportName}`,
|
||||||
imageStream,
|
new Response(livePhoto.imageData).body,
|
||||||
);
|
);
|
||||||
|
|
||||||
const videoStream = generateStreamFromArrayBuffer(livePhoto.videoData);
|
|
||||||
await this.saveMetadataFile(
|
await this.saveMetadataFile(
|
||||||
collectionExportPath,
|
collectionExportPath,
|
||||||
videoExportName,
|
videoExportName,
|
||||||
|
@ -1048,7 +1045,7 @@ class ExportService {
|
||||||
await writeStream(
|
await writeStream(
|
||||||
electron,
|
electron,
|
||||||
`${collectionExportPath}/${videoExportName}`,
|
`${collectionExportPath}/${videoExportName}`,
|
||||||
videoStream,
|
new Response(livePhoto.videoData).body,
|
||||||
);
|
);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
await fs.rm(`${collectionExportPath}/${imageExportName}`);
|
await fs.rm(`${collectionExportPath}/${imageExportName}`);
|
||||||
|
|
|
@ -262,15 +262,6 @@ export async function decryptFile(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export function generateStreamFromArrayBuffer(data: Uint8Array) {
|
|
||||||
return new ReadableStream({
|
|
||||||
async start(controller: ReadableStreamDefaultController) {
|
|
||||||
controller.enqueue(data);
|
|
||||||
controller.close();
|
|
||||||
},
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The returned blob.type is filled in, whenever possible, with the MIME type of
|
* The returned blob.type is filled in, whenever possible, with the MIME type of
|
||||||
* the data that we're dealing with.
|
* the data that we're dealing with.
|
||||||
|
@ -649,7 +640,7 @@ async function downloadFileDesktop(
|
||||||
imageFileName,
|
imageFileName,
|
||||||
fs.exists,
|
fs.exists,
|
||||||
);
|
);
|
||||||
const imageStream = generateStreamFromArrayBuffer(imageData);
|
const imageStream = new Response(imageData).body;
|
||||||
await writeStream(
|
await writeStream(
|
||||||
electron,
|
electron,
|
||||||
`${downloadDir}/${imageExportName}`,
|
`${downloadDir}/${imageExportName}`,
|
||||||
|
@ -661,7 +652,7 @@ async function downloadFileDesktop(
|
||||||
videoFileName,
|
videoFileName,
|
||||||
fs.exists,
|
fs.exists,
|
||||||
);
|
);
|
||||||
const videoStream = generateStreamFromArrayBuffer(videoData);
|
const videoStream = new Response(videoData).body;
|
||||||
await writeStream(
|
await writeStream(
|
||||||
electron,
|
electron,
|
||||||
`${downloadDir}/${videoExportName}`,
|
`${downloadDir}/${videoExportName}`,
|
||||||
|
|
|
@ -136,6 +136,10 @@ export const openBlobCache = async (
|
||||||
*
|
*
|
||||||
* new Blob([arrayBuffer, andOrAnyArray, andOrstring])
|
* new Blob([arrayBuffer, andOrAnyArray, andOrstring])
|
||||||
*
|
*
|
||||||
|
* To convert from a Uint8Array/ArrayBuffer/Blob to a ReadableStream
|
||||||
|
*
|
||||||
|
* new Response(array).body
|
||||||
|
*
|
||||||
* Refs:
|
* Refs:
|
||||||
* - https://github.com/yigitunallar/arraybuffer-vs-blob
|
* - https://github.com/yigitunallar/arraybuffer-vs-blob
|
||||||
* - https://stackoverflow.com/questions/11821096/what-is-the-difference-between-an-arraybuffer-and-a-blob
|
* - https://stackoverflow.com/questions/11821096/what-is-the-difference-between-an-arraybuffer-and-a-blob
|
||||||
|
|
Loading…
Reference in a new issue