ente/lib/utils/file_uploader.dart

534 lines
18 KiB
Dart
Raw Normal View History

2020-11-09 12:28:43 +00:00
import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:io' as io;
2021-02-13 12:28:35 +00:00
import 'dart:math';
2020-11-16 16:35:16 +00:00
import 'package:connectivity/connectivity.dart';
import 'package:dio/dio.dart';
import 'package:flutter_sodium/flutter_sodium.dart';
import 'package:logging/logging.dart';
import 'package:photos/core/configuration.dart';
import 'package:photos/core/constants.dart';
import 'package:photos/core/event_bus.dart';
2020-11-19 18:22:30 +00:00
import 'package:photos/core/network.dart';
2020-11-09 16:10:43 +00:00
import 'package:photos/db/files_db.dart';
import 'package:photos/events/subscription_purchased_event.dart';
import 'package:photos/models/encryption_result.dart';
import 'package:photos/models/file.dart';
import 'package:photos/models/location.dart';
import 'package:photos/models/upload_url.dart';
import 'package:photos/repositories/file_repository.dart';
import 'package:photos/services/collections_service.dart';
2021-01-13 08:50:14 +00:00
import 'package:photos/services/sync_service.dart';
import 'package:photos/utils/crypto_util.dart';
2020-11-22 18:07:40 +00:00
import 'package:photos/utils/file_util.dart';
class FileUploader {
/// Logger for everything the uploader reports.
final Logger _logger = Logger("FileUploader");

/// HTTP client obtained from [Network].
final _dio = Network.instance.getDio();

/// Upload requests keyed by the generated ID of the file they belong to.
final LinkedHashMap<int, FileUploadItem> _queue =
    LinkedHashMap<int, FileUploadItem>();

/// Upper bound on uploads that may be in flight at the same time.
final int kMaximumConcurrentUploads = 4;

/// Upper bound on how often an oversized thumbnail is re-compressed.
final int kMaximumThumbnailCompressionAttempts = 2;

/// Upper bound on the attempts made to PUT a single file.
final int kMaximumUploadAttempts = 4;

/// Number of uploads that have been started and have not finished yet.
int _currentlyUploading = 0;

/// Upload URLs fetched from the server that have not been used yet.
final Queue<UploadURL> _uploadURLs = Queue<UploadURL>();
/// Creates the uploader and subscribes to [SubscriptionPurchasedEvent].
///
/// Whenever such an event arrives, the remembered upload-URL fetch is
/// discarded so that the next request for URLs starts a fresh fetch.
FileUploader._privateConstructor() {
  final purchases = Bus.instance.on<SubscriptionPurchasedEvent>();
  purchases.listen((_) {
    _uploadURLFetchInProgress = null;
  });
}

/// The shared uploader instance.
static FileUploader instance = FileUploader._privateConstructor();
2020-11-09 16:10:43 +00:00
/// Queues [file] for upload to [collectionID].
///
/// Returns a future that completes with the uploaded file. If the file is
/// already queued for the same collection, the existing future is returned.
/// If it is queued for a different collection, the returned future waits for
/// that upload and then adds the uploaded file to [collectionID].
Future<File> upload(File file, int collectionID) {
  final queued = _queue[file.generatedID];

  // Not queued yet: enqueue it and kick the queue.
  if (queued == null) {
    final completer = Completer<File>();
    _queue[file.generatedID] = FileUploadItem(file, collectionID, completer);
    _pollQueue();
    return completer.future;
  }

  // Already queued for this very collection: share the pending result.
  if (queued.collectionID == collectionID) {
    return queued.completer.future;
  }

  // Queued for another collection: wait for that upload, then also add the
  // uploaded file to the requested collection.
  return queued.completer.future.then((uploadedFile) async {
    await CollectionsService.instance
        .addToCollection(collectionID, [uploadedFile]);
    return uploadedFile;
  });
}
2020-11-09 16:10:43 +00:00
/// Uploads [file] to [collectionID] immediately instead of waiting for its
/// turn in the queue.
///
/// The upload is started with `forcedUpload: true`, which makes the
/// network-condition check in [_tryToUpload] non-fatal.
///
/// * If the file is not queued, it is registered as in progress and uploaded.
/// * If the file is already being uploaded, this waits for that upload and
///   adds the result to [collectionID] when it ended up in another
///   collection.
/// * If the file is queued but not started, the queued request is taken over:
///   the file is uploaded now, and the original requester's future is
///   completed afterwards (after adding the file to the collection that
///   request asked for, when it differs from [collectionID]).
///
/// Returns the uploaded file; completes with an error if the upload fails.
Future<File> forceUpload(File file, int collectionID) async {
  final queued = _queue[file.generatedID];

  // If the file hasn't been queued yet, ez.
  if (queued == null) {
    final completer = Completer<File>();
    _queue[file.generatedID] = FileUploadItem(
      file,
      collectionID,
      completer,
      status: UploadStatus.in_progress,
    );
    _encryptAndUploadFileToCollection(file, collectionID, forcedUpload: true);
    return completer.future;
  }

  // If the file is being uploaded right now, wait and proceed.
  if (queued.status == UploadStatus.in_progress) {
    final uploadedFile = await queued.completer.future;
    if (uploadedFile.collectionID != collectionID) {
      await CollectionsService.instance
          .addToCollection(collectionID, [uploadedFile]);
    }
    return uploadedFile;
  }

  // The file is yet to be processed. Replace the queued entry with an
  // in-progress one instead of merely removing it:
  // _encryptAndUploadFileToCollection reports its outcome through the entry
  // it finds in the queue, so the entry has to exist while the upload runs.
  final completer = Completer<File>();
  _queue[file.generatedID] = FileUploadItem(
    file,
    collectionID,
    completer,
    status: UploadStatus.in_progress,
  );
  _encryptAndUploadFileToCollection(file, collectionID, forcedUpload: true);

  File uploadedFile;
  try {
    uploadedFile = await completer.future;
  } catch (e, s) {
    // Let the original requester know as well; otherwise its future would
    // stay pending forever.
    queued.completer.completeError(e, s);
    rethrow;
  }

  if (queued.collectionID == collectionID) {
    queued.completer.complete(uploadedFile);
  } else {
    // The original request targeted another collection; honour it before
    // triggering its callback, without delaying the forced caller.
    CollectionsService.instance
        .addToCollection(queued.collectionID, [uploadedFile]).then((_) {
      queued.completer.complete(uploadedFile);
    }, onError: (e, s) {
      queued.completer.completeError(e, s);
    });
  }
  return uploadedFile;
}
/// Drops every queued upload that has not been started yet.
///
/// The future of each dropped request is completed with a
/// [SyncStopRequestedError]. Uploads that are already in progress are left
/// untouched.
void clearQueue() {
  // Collect the keys first so the map is not modified while iterating it.
  final pendingIDs = _queue.entries
      .where((entry) => entry.value.status == UploadStatus.not_started)
      .map((entry) => entry.key)
      .toList();
  for (final id in pendingIDs) {
    final removed = _queue.remove(id);
    removed.completer.completeError(SyncStopRequestedError());
  }
}
2020-11-09 12:28:43 +00:00
/// Starts the next not-yet-started upload, if there is capacity for it.
///
/// When the sync service asks to stop, pending uploads are cleared first.
/// At most one upload is started per call.
void _pollQueue() {
  if (SyncService.instance.shouldStopSync()) {
    clearQueue();
  }
  if (_queue.isEmpty || _currentlyUploading >= kMaximumConcurrentUploads) {
    return;
  }
  for (final item in _queue.values) {
    if (item.status == UploadStatus.not_started) {
      item.status = UploadStatus.in_progress;
      _encryptAndUploadFileToCollection(item.file, item.collectionID);
      // Only the first pending entry is started; stop iterating right away.
      return;
    }
  }
}
2020-11-09 16:10:43 +00:00
/// Runs one upload of [file] into [collectionID] and reports the outcome.
///
/// The outcome is delivered through the completer of the queue entry for
/// [file], which is removed from the queue. On success the uploaded file is
/// also returned. On failure the error goes to the queue entry's completer
/// and the returned future completes with `null`; if there is no queue entry
/// to notify, the error is rethrown so that it is not silently lost.
///
/// The in-flight counter is maintained and the queue is polled again once
/// the upload has finished, whatever the outcome.
Future<File> _encryptAndUploadFileToCollection(File file, int collectionID,
    {bool forcedUpload = false}) async {
  _currentlyUploading++;
  try {
    final uploadedFile = await _tryToUpload(file, collectionID, forcedUpload);
    // The entry may be missing (forceUpload can take it out of the queue
    // before calling this method), so do not dereference it blindly.
    final item = _queue.remove(file.generatedID);
    if (item != null) {
      item.completer.complete(uploadedFile);
    }
    return uploadedFile;
  } catch (e, s) {
    final item = _queue.remove(file.generatedID);
    if (item == null) {
      // Nobody is listening on a completer; surface the error to the caller.
      rethrow;
    }
    item.completer.completeError(e, s);
    return null;
  } finally {
    _currentlyUploading--;
    _pollQueue();
  }
}
/// Encrypts [file] and its thumbnail, uploads both, and registers the result
/// with the server.
///
/// For a file that already has an `uploadedFileID`, its existing key is
/// reused and the server-side file is updated; otherwise a new file is
/// created in [collectionID].
///
/// Throws [WiFiUnavailableError] when the device is not on Wi-Fi, backup over
/// mobile data is not enabled and [forcedUpload] is false. Throws
/// [InvalidFileError] when no thumbnail can be generated. Failures inside the
/// upload itself are logged (except [NoActiveSubscriptionError]) and rethrown
/// with their original stack trace. Temporary encrypted files are always
/// removed, and on iOS the source file handed out by the asset is deleted.
Future<File> _tryToUpload(
    File file, int collectionID, bool forcedUpload) async {
  final connectivityResult = await (Connectivity().checkConnectivity());
  var canUploadUnderCurrentNetworkConditions =
      (connectivityResult == ConnectivityResult.wifi ||
          Configuration.instance.shouldBackupOverMobileData());
  if (!canUploadUnderCurrentNetworkConditions && !forcedUpload) {
    throw WiFiUnavailableError();
  }

  final tempDirectory = Configuration.instance.getTempDirectory();
  final encryptedFilePath =
      tempDirectory + file.generatedID.toString() + ".encrypted";
  final encryptedThumbnailPath =
      tempDirectory + file.generatedID.toString() + "_thumbnail.encrypted";

  var sourceFile;
  try {
    // Placing this in the try-catch block to safe guard against: https://github.com/CaiJingLong/flutter_photo_manager/issues/405
    _logger.info("Trying to upload " + file.toString());
    sourceFile = (await (await file.getAsset()).originFile);

    // Re-uploads keep the key the file was originally encrypted with.
    var key;
    var isAlreadyUploadedFile = file.uploadedFileID != null;
    if (isAlreadyUploadedFile) {
      key = decryptFileKey(file);
    } else {
      key = null;
    }

    // Remove leftovers from an earlier attempt before encrypting.
    if (io.File(encryptedFilePath).existsSync()) {
      io.File(encryptedFilePath).deleteSync();
    }
    final encryptedFile = io.File(encryptedFilePath);
    final fileAttributes = await CryptoUtil.encryptFile(
      sourceFile.path,
      encryptedFilePath,
      key: key,
    );

    var thumbnailData = (await (await file.getAsset()).thumbDataWithSize(
      THUMBNAIL_LARGE_SIZE,
      THUMBNAIL_LARGE_SIZE,
      quality: 50,
    ));
    if (thumbnailData == null) {
      _logger.severe("Could not generate thumbnail for " + file.toString());
      await FilesDB.instance.deleteLocalFile(file.localID);
      throw InvalidFileError();
    }

    // Shrink the thumbnail until it fits the limit, within a bounded number
    // of attempts.
    int compressionAttempts = 0;
    while (thumbnailData.length > THUMBNAIL_DATA_LIMIT &&
        compressionAttempts < kMaximumThumbnailCompressionAttempts) {
      _logger.info("Thumbnail size " + thumbnailData.length.toString());
      thumbnailData = await compressThumbnail(thumbnailData);
      _logger.info(
          "Compressed thumbnail size " + thumbnailData.length.toString());
      compressionAttempts++;
    }

    final encryptedThumbnailData =
        CryptoUtil.encryptChaCha(thumbnailData, fileAttributes.key);
    if (io.File(encryptedThumbnailPath).existsSync()) {
      io.File(encryptedThumbnailPath).deleteSync();
    }
    final encryptedThumbnailFile = io.File(encryptedThumbnailPath);
    encryptedThumbnailFile
        .writeAsBytesSync(encryptedThumbnailData.encryptedData);

    final fileUploadURL = await _getUploadURL();
    String fileObjectKey = await _putFile(fileUploadURL, encryptedFile);

    final thumbnailUploadURL = await _getUploadURL();
    String thumbnailObjectKey =
        await _putFile(thumbnailUploadURL, encryptedThumbnailFile);

    // h4ck to fetch location data if missing (thank you Android Q+) lazily only during uploads
    if (file.location == null ||
        (file.location.latitude == 0 && file.location.longitude == 0)) {
      final latLong = await (await file.getAsset()).latlngAsync();
      file.location = Location(latLong.latitude, latLong.longitude);
    }

    final encryptedMetadataData = CryptoUtil.encryptChaCha(
        utf8.encode(jsonEncode(file.getMetadata())), fileAttributes.key);
    final fileDecryptionHeader = Sodium.bin2base64(fileAttributes.header);
    final thumbnailDecryptionHeader =
        Sodium.bin2base64(encryptedThumbnailData.header);
    final encryptedMetadata =
        Sodium.bin2base64(encryptedMetadataData.encryptedData);
    final metadataDecryptionHeader =
        Sodium.bin2base64(encryptedMetadataData.header);

    if (isAlreadyUploadedFile) {
      final updatedFile = await _updateFile(
        file,
        fileObjectKey,
        fileDecryptionHeader,
        thumbnailObjectKey,
        thumbnailDecryptionHeader,
        encryptedMetadata,
        metadataDecryptionHeader,
      );
      // Update across all collections
      await FilesDB.instance.updateUploadedFileAcrossCollections(updatedFile);
      FileRepository.instance.reloadFiles();
      return updatedFile;
    } else {
      final uploadedFile = await _uploadFile(
        file,
        collectionID,
        fileAttributes,
        fileObjectKey,
        fileDecryptionHeader,
        thumbnailObjectKey,
        thumbnailDecryptionHeader,
        encryptedMetadata,
        metadataDecryptionHeader,
      );
      await FilesDB.instance.update(uploadedFile);
      FileRepository.instance.reloadFiles();
      return uploadedFile;
    }
  } catch (e, s) {
    if (!(e is NoActiveSubscriptionError)) {
      _logger.severe(
          "File upload failed for " + file.generatedID.toString(), e, s);
    }
    // rethrow (instead of `throw e`) keeps the original stack trace.
    rethrow;
  } finally {
    if (io.Platform.isIOS && sourceFile != null) {
      sourceFile.deleteSync();
    }
    if (io.File(encryptedFilePath).existsSync()) {
      io.File(encryptedFilePath).deleteSync();
    }
    if (io.File(encryptedThumbnailPath).existsSync()) {
      io.File(encryptedThumbnailPath).deleteSync();
    }
  }
}
/// Registers a newly uploaded file with the server and fills in [file].
///
/// The file key from [fileAttributes] is encrypted with the key of
/// [collectionID] and sent, together with the object keys and decryption
/// headers, in a POST to `/files`. On success the server-assigned id,
/// updation time and owner, plus the key material and headers, are stored on
/// [file], which is then returned.
///
/// On an HTTP 426 response the pending queue is cleared and a
/// [StorageLimitExceededError] is thrown; every other [DioError] is rethrown
/// unchanged.
Future<File> _uploadFile(
  File file,
  int collectionID,
  EncryptionResult fileAttributes,
  String fileObjectKey,
  String fileDecryptionHeader,
  String thumbnailObjectKey,
  String thumbnailDecryptionHeader,
  String encryptedMetadata,
  String metadataDecryptionHeader,
) async {
  final encryptedFileKeyData = CryptoUtil.encryptSync(
    fileAttributes.key,
    CollectionsService.instance.getCollectionKey(collectionID),
  );
  final encryptedKey = Sodium.bin2base64(encryptedFileKeyData.encryptedData);
  final keyDecryptionNonce = Sodium.bin2base64(encryptedFileKeyData.nonce);
  final request = {
    "collectionID": collectionID,
    "encryptedKey": encryptedKey,
    "keyDecryptionNonce": keyDecryptionNonce,
    "file": {
      "objectKey": fileObjectKey,
      "decryptionHeader": fileDecryptionHeader,
    },
    "thumbnail": {
      "objectKey": thumbnailObjectKey,
      "decryptionHeader": thumbnailDecryptionHeader,
    },
    "metadata": {
      "encryptedData": encryptedMetadata,
      "decryptionHeader": metadataDecryptionHeader,
    }
  };
  try {
    final response = await _dio.post(
      Configuration.instance.getHttpEndpoint() + "/files",
      options: Options(
          headers: {"X-Auth-Token": Configuration.instance.getToken()}),
      data: request,
    );
    final data = response.data;
    file.uploadedFileID = data["id"];
    file.collectionID = collectionID;
    file.updationTime = data["updationTime"];
    file.ownerID = data["ownerID"];
    file.encryptedKey = encryptedKey;
    file.keyDecryptionNonce = keyDecryptionNonce;
    file.fileDecryptionHeader = fileDecryptionHeader;
    file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
    file.metadataDecryptionHeader = metadataDecryptionHeader;
    return file;
  } on DioError catch (e) {
    // Guard against a missing response so that a failure without one is
    // reported as the DioError it is rather than as a null dereference.
    if (e.response?.statusCode == 426) {
      _onStorageLimitExceeded();
    }
    rethrow;
  }
}
/// Sends the re-uploaded contents of an already uploaded [file] to the
/// server and refreshes [file] from the response.
///
/// The request carries the file's existing `uploadedFileID` together with
/// the new object keys and decryption headers, and is POSTed to `/files`.
/// On success the returned id and updation time and the new headers are
/// stored on [file], which is then returned.
///
/// On an HTTP 426 response the pending queue is cleared and a
/// [StorageLimitExceededError] is thrown; every other [DioError] is rethrown
/// unchanged.
Future<File> _updateFile(
  File file,
  String fileObjectKey,
  String fileDecryptionHeader,
  String thumbnailObjectKey,
  String thumbnailDecryptionHeader,
  String encryptedMetadata,
  String metadataDecryptionHeader,
) async {
  final request = {
    "id": file.uploadedFileID,
    "file": {
      "objectKey": fileObjectKey,
      "decryptionHeader": fileDecryptionHeader,
    },
    "thumbnail": {
      "objectKey": thumbnailObjectKey,
      "decryptionHeader": thumbnailDecryptionHeader,
    },
    "metadata": {
      "encryptedData": encryptedMetadata,
      "decryptionHeader": metadataDecryptionHeader,
    }
  };
  try {
    final response = await _dio.post(
      Configuration.instance.getHttpEndpoint() + "/files",
      options: Options(
          headers: {"X-Auth-Token": Configuration.instance.getToken()}),
      data: request,
    );
    final data = response.data;
    file.uploadedFileID = data["id"];
    file.updationTime = data["updationTime"];
    file.fileDecryptionHeader = fileDecryptionHeader;
    file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
    file.metadataDecryptionHeader = metadataDecryptionHeader;
    return file;
  } on DioError catch (e) {
    // Guard against a missing response so that a failure without one is
    // reported as the DioError it is rather than as a null dereference.
    if (e.response?.statusCode == 426) {
      _onStorageLimitExceeded();
    }
    rethrow;
  }
}
2020-11-09 17:43:40 +00:00
/// Hands out the next unused upload URL.
///
/// When no URL is available, a fetch is awaited first; the first URL in the
/// pool is then removed from it and returned.
Future<UploadURL> _getUploadURL() async {
  final poolIsDrained = _uploadURLs.isEmpty;
  if (poolIsDrained) {
    await _fetchUploadURLs();
  }
  final nextURL = _uploadURLs.removeFirst();
  return nextURL;
}
/// The upload-URL fetch that is currently running, or `null` when none is.
Future<void> _uploadURLFetchInProgress;

/// Fetches a batch of upload URLs from the server into the URL pool.
///
/// Concurrent callers share a single request: while a fetch is running, its
/// future is returned instead of starting another one. The number of URLs
/// requested is twice the queue length, capped at 42.
///
/// The in-progress marker is cleared when the fetch finishes, whether it
/// succeeded or failed, so a failed fetch neither blocks later fetches nor
/// leaves waiting callers on a future that never completes. Every caller
/// sharing a failed fetch receives its error.
///
/// On HTTP 402 the pending queue is cleared and a
/// [NoActiveSubscriptionError] is thrown; on HTTP 426 the pending queue is
/// cleared and a [StorageLimitExceededError] is thrown. Any other [DioError]
/// is rethrown unchanged.
Future<void> _fetchUploadURLs() async {
  _uploadURLFetchInProgress ??= Future<void>(() async {
    try {
      final response = await _dio.get(
        Configuration.instance.getHttpEndpoint() + "/files/upload-urls",
        queryParameters: {
          "count": min(42, 2 * _queue.length), // m4gic number
        },
        options: Options(
            headers: {"X-Auth-Token": Configuration.instance.getToken()}),
      );
      final urls = (response.data["urls"] as List)
          .map((e) => UploadURL.fromMap(e))
          .toList();
      _uploadURLs.addAll(urls);
    } on DioError catch (e) {
      // Guard against a missing response so the DioError itself surfaces.
      final statusCode = e.response?.statusCode;
      if (statusCode == 402) {
        _onExpiredSubscription();
      } else if (statusCode == 426) {
        _onStorageLimitExceeded();
      }
      rethrow;
    } finally {
      // Always release the marker so the next caller can start a new fetch.
      _uploadURLFetchInProgress = null;
    }
  });
  return _uploadURLFetchInProgress;
}
/// Drops all not-yet-started uploads and signals that the storage limit has
/// been exceeded.
///
/// Always throws [StorageLimitExceededError].
void _onStorageLimitExceeded() {
  final error = StorageLimitExceededError();
  clearQueue();
  throw error;
}
/// Drops all not-yet-started uploads and signals that there is no active
/// subscription.
///
/// Always throws [NoActiveSubscriptionError].
void _onExpiredSubscription() {
  final error = NoActiveSubscriptionError();
  clearQueue();
  throw error;
}
/// PUTs [file] to [uploadURL] and returns the object key of the URL that the
/// file was finally uploaded to.
///
/// [contentLength] overrides the `Content-Length` header; when omitted, the
/// file's length on disk is used. [attempt] is the 1-based number of the
/// current attempt.
///
/// If the first attempt fails because the content is larger than the
/// declared length, the upload is retried once against the same URL with the
/// length of the bytes actually read. Any other failure is retried against a
/// freshly obtained upload URL until [kMaximumUploadAttempts] attempts have
/// been made, after which the [DioError] is rethrown.
Future<String> _putFile(
  UploadURL uploadURL,
  io.File file, {
  int contentLength,
  int attempt = 1,
}) async {
  final fileSize = contentLength ?? file.lengthSync();
  final startTime = DateTime.now().millisecondsSinceEpoch;
  _logger.info(
      "Putting file of size " + fileSize.toString() + " to " + uploadURL.url);
  try {
    await _dio.put(
      uploadURL.url,
      data: file.openRead(),
      options: Options(
        headers: {
          Headers.contentLengthHeader: fileSize,
        },
      ),
    );
    _logger.info("Upload speed : " +
        (file.lengthSync() /
                (DateTime.now().millisecondsSinceEpoch - startTime))
            .toString() +
        " kilo bytes per second");
    return uploadURL.objectKey;
  } on DioError catch (e) {
    if (e.message.startsWith(
            "HttpException: Content size exceeds specified contentLength.") &&
        attempt == 1) {
      return _putFile(uploadURL, file,
          contentLength: file.readAsBytesSync().length, attempt: 2);
    } else if (attempt < kMaximumUploadAttempts) {
      _logger.info("Retrying upload that failed ", e);
      final newUploadURL = await _getUploadURL();
      // `attempt + 1`, not `attempt++`: the postfix form passes the old
      // value, so the counter would never advance and retries never stop.
      return _putFile(newUploadURL, file,
          contentLength: file.readAsBytesSync().length, attempt: attempt + 1);
    } else {
      rethrow;
    }
  }
}
}
2020-11-09 12:28:43 +00:00
/// One upload request held in the uploader's queue.
class FileUploadItem {
  /// The file to be uploaded.
  final File file;

  /// The collection the file was requested to be uploaded to.
  final int collectionID;

  /// Completed with the uploaded file, or with an error, once the request
  /// has been dealt with.
  final Completer<File> completer;

  /// Where this request currently stands; mutable so the uploader can mark
  /// it as in progress.
  UploadStatus status;

  /// Creates a request that is [UploadStatus.not_started] unless [status]
  /// says otherwise.
  FileUploadItem(
    this.file,
    this.collectionID,
    this.completer, {
    this.status = UploadStatus.not_started,
  });
}
/// Progress of a [FileUploadItem] through the upload queue.
enum UploadStatus {
// Default status of a newly created FileUploadItem.
not_started,
// Set when the queue picks the item up, or when a forced upload registers it.
in_progress,
// NOTE(review): never assigned in the visible code of this file.
completed,
}
2020-11-16 16:35:16 +00:00
/// Thrown during an upload when no thumbnail could be generated for the file.
class InvalidFileError extends Error {}
/// Thrown when a non-forced upload is attempted while the device is not on
/// Wi-Fi and backup over mobile data is not enabled.
class WiFiUnavailableError extends Error {}
2021-01-13 08:50:14 +00:00
/// Delivered to uploads that had not started yet when the upload queue was
/// cleared.
class SyncStopRequestedError extends Error {}
2021-02-02 16:35:38 +00:00
/// Thrown when the server answers an upload-URL request with HTTP 402.
class NoActiveSubscriptionError extends Error {}
2021-02-02 19:01:32 +00:00
/// Thrown when the server answers an upload-related request with HTTP 426.
class StorageLimitExceededError extends Error {}