ente/lib/utils/file_uploader.dart

560 lines
18 KiB
Dart
Raw Normal View History

2020-11-09 12:28:43 +00:00
import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:io' as io;
2021-02-13 12:28:35 +00:00
import 'dart:math';
2020-11-16 16:35:16 +00:00
import 'package:connectivity/connectivity.dart';
import 'package:dio/dio.dart';
import 'package:flutter_sodium/flutter_sodium.dart';
import 'package:logging/logging.dart';
import 'package:photos/core/configuration.dart';
import 'package:photos/core/constants.dart';
2021-02-26 09:21:47 +00:00
import 'package:photos/core/errors.dart';
import 'package:photos/core/event_bus.dart';
2020-11-19 18:22:30 +00:00
import 'package:photos/core/network.dart';
2020-11-09 16:10:43 +00:00
import 'package:photos/db/files_db.dart';
import 'package:photos/db/upload_locks_db.dart';
import 'package:photos/events/subscription_purchased_event.dart';
import 'package:photos/models/encryption_result.dart';
import 'package:photos/models/file.dart';
import 'package:photos/models/location.dart';
import 'package:photos/models/upload_url.dart';
import 'package:photos/repositories/file_repository.dart';
import 'package:photos/services/collections_service.dart';
2021-01-13 08:50:14 +00:00
import 'package:photos/services/sync_service.dart';
import 'package:photos/utils/crypto_util.dart';
2020-11-22 18:07:40 +00:00
import 'package:photos/utils/file_util.dart';
/// Queues local files and uploads them, encrypted, to the remote server.
///
/// A single shared [instance] owns the pending-upload queue, a pool of
/// pre-fetched upload URLs and the upload locks database handle.
class FileUploader {
  final Logger _logger = Logger("FileUploader");
  final _dio = Network.instance.getDio();

  /// Pending and in-flight uploads keyed by the file's local ID, kept in
  /// insertion order.
  final LinkedHashMap<String, FileUploadItem> _queue = LinkedHashMap();
  final _uploadLocks = UploadLocksDB.instance;

  /// Upper bound on uploads that [_pollQueue] lets run at the same time.
  final int kMaximumConcurrentUploads = 4;

  /// How many times an oversized thumbnail is re-compressed before giving up.
  final int kMaximumThumbnailCompressionAttempts = 2;

  /// Upper bound on the `attempt` counter used by [_putFile].
  final int kMaximumUploadAttempts = 4;

  /// Age (in microseconds) after which any lock is released by [init].
  final int kSafeBufferForLockExpiry = Duration(days: 1).inMicroseconds;

  int _currentlyUploading = 0;
  LockOwner _lockOwner;

  /// Upload URLs fetched from the server that have not been consumed yet.
  final Queue<UploadURL> _uploadURLs = Queue();

  FileUploader._privateConstructor() {
    // A purchased subscription clears the in-progress marker so that the next
    // request for upload URLs goes to the server again.
    Bus.instance
        .on<SubscriptionPurchasedEvent>()
        .listen((_) => _uploadURLFetchInProgress = null);
  }

  static FileUploader instance = FileUploader._privateConstructor();
/// Records which process owns this uploader and clears out stale locks.
///
/// [isBackground] selects [LockOwner.background_process]; otherwise the
/// foreground owner is used. Every lock this owner acquired before now is
/// released first, followed by every lock (from any owner) that is older than
/// [kSafeBufferForLockExpiry].
Future<void> init(bool isBackground) async {
  if (isBackground) {
    _lockOwner = LockOwner.background_process;
  } else {
    _lockOwner = LockOwner.foreground_process;
  }

  final ownLocksCutoff = DateTime.now().microsecondsSinceEpoch;
  await _uploadLocks.releaseLocksAcquiredByOwnerBefore(
    _lockOwner,
    ownLocksCutoff,
  );

  final expiredLocksCutoff =
      DateTime.now().microsecondsSinceEpoch - kSafeBufferForLockExpiry;
  await _uploadLocks.releaseAllLocksAcquiredBefore(expiredLocksCutoff);
}
2020-11-09 16:10:43 +00:00
/// Schedules [file] for upload into [collectionID].
///
/// Returns a future that completes with the uploaded file. A file that is
/// already queued is never queued twice: the existing upload is reused and,
/// when it targets a different collection, the uploaded file is additionally
/// added to [collectionID] once that upload finishes.
Future<File> upload(File file, int collectionID) {
  final localID = file.localID;

  if (_queue.containsKey(localID)) {
    final queued = _queue[localID];
    // Same destination: hand back the future of the upload already queued.
    if (queued.collectionID == collectionID) {
      return queued.completer.future;
    }
    // Different destination: wait for the existing upload, then link the
    // uploaded file into the requested collection as well.
    return queued.completer.future.then((uploadedFile) async {
      await CollectionsService.instance
          .addToCollection(collectionID, [uploadedFile]);
      return uploadedFile;
    });
  }

  // First time this file is seen: enqueue it and kick the queue.
  final completer = Completer<File>();
  _queue[localID] = FileUploadItem(file, collectionID, completer);
  _pollQueue();
  return completer.future;
}
2020-11-09 16:10:43 +00:00
/// Uploads [file] into [collectionID] right away, bypassing the queue order
/// and the network-condition check (the upload is started with
/// `forcedUpload: true`).
///
/// Three situations are handled:
///  * the file is not queued: it is registered as in-progress and uploaded;
///  * the file is currently uploading: the running upload is awaited and the
///    result is added to [collectionID] if it landed elsewhere;
///  * the file is queued but not started: it is uploaded now into
///    [collectionID] and afterwards added to the collection it was originally
///    queued for, if that differs.
///
/// Completes with the uploaded file, or with the upload's error.
Future<File> forceUpload(File file, int collectionID) async {
  _logger.info("Force uploading $file into collection $collectionID");

  // If the file hasn't been queued yet, ez.
  if (!_queue.containsKey(file.localID)) {
    final completer = Completer<File>();
    _queue[file.localID] = FileUploadItem(
      file,
      collectionID,
      completer,
      status: UploadStatus.in_progress,
    );
    // Not awaited: the outcome is delivered through the completer.
    _encryptAndUploadFileToCollection(file, collectionID, forcedUpload: true);
    return completer.future;
  }

  final item = _queue[file.localID];

  // If the file is being uploaded right now, wait and proceed.
  if (item.status == UploadStatus.in_progress) {
    final uploadedFile = await item.completer.future;
    if (uploadedFile.collectionID != collectionID) {
      await CollectionsService.instance
          .addToCollection(collectionID, [uploadedFile]);
    }
    return uploadedFile;
  }

  // The file is yet to be processed:
  // 1. Set the status to in_progress
  // 2. Force upload the file
  // 3. Add to the relevant collection
  item.status = UploadStatus.in_progress;
  final uploadedFile = await _encryptAndUploadFileToCollection(
    file,
    collectionID,
    forcedUpload: true,
  );
  if (uploadedFile == null) {
    // The upload failed. _encryptAndUploadFileToCollection reports failures by
    // completing the item's completer with the error and returning null, so
    // surface that same error here instead of returning null or passing a
    // null file on to addToCollection.
    return item.completer.future;
  }
  if (item.collectionID != collectionID) {
    await CollectionsService.instance
        .addToCollection(item.collectionID, [uploadedFile]);
  }
  return uploadedFile;
}
2021-02-25 16:14:27 +00:00
/// Drops every upload that has not started yet and fails its future with
/// [reason].
///
/// Uploads that are already in progress stay in the queue untouched.
void clearQueue(final Error reason) {
  // Snapshot the keys first so the queue is not mutated while iterating it.
  final pendingIDs = _queue.entries
      .where((entry) => entry.value.status == UploadStatus.not_started)
      .map((entry) => entry.key)
      .toList();
  for (final id in pendingIDs) {
    _queue.remove(id).completer.completeError(reason);
  }
}
2020-11-09 12:28:43 +00:00
/// Starts the oldest not-yet-started upload if a concurrency slot is free.
///
/// When the sync service asks for sync to stop, all pending uploads are
/// cancelled first. At most one upload is started per call.
void _pollQueue() {
  if (SyncService.instance.shouldStopSync()) {
    clearQueue(SyncStopRequestedError());
  }
  if (_queue.isEmpty || _currentlyUploading >= kMaximumConcurrentUploads) {
    return;
  }

  // The queue preserves insertion order, so the first match is the oldest.
  FileUploadItem nextPending;
  for (final candidate in _queue.values) {
    if (candidate.status == UploadStatus.not_started) {
      nextPending = candidate;
      break;
    }
  }
  if (nextPending == null) {
    return;
  }

  nextPending.status = UploadStatus.in_progress;
  _encryptAndUploadFileToCollection(
    nextPending.file,
    nextPending.collectionID,
  );
}
2020-11-09 16:10:43 +00:00
/// Runs one upload and reports its outcome through the queued completer.
///
/// The queue entry for [file] is removed in either case. On success the
/// completer is completed with the uploaded file, which is also returned. On
/// failure the completer is completed with the error and `null` is returned.
/// Afterwards the concurrency counter is decremented and the queue is polled
/// again.
Future<File> _encryptAndUploadFileToCollection(File file, int collectionID,
    {bool forcedUpload = false}) async {
  _currentlyUploading++;
  File outcome;
  try {
    outcome = await _tryToUpload(file, collectionID, forcedUpload);
    _queue.remove(file.localID).completer.complete(outcome);
  } catch (error) {
    _queue.remove(file.localID).completer.completeError(error);
    outcome = null;
  } finally {
    _currentlyUploading--;
    _pollQueue();
  }
  return outcome;
}
/// Encrypts [file] and its thumbnail, uploads both, and registers the result
/// with the server.
///
/// Steps, in order:
///  1. Refuse with [WiFiUnavailableError] unless on Wi-Fi, backup over mobile
///     data is enabled, or [forcedUpload] is set.
///  2. Acquire the upload lock for the file; any failure to do so is reported
///     as [LockAlreadyAcquiredError].
///  3. Encrypt the origin file (reusing the existing file key when the file
///     already has an `uploadedFileID`) and the thumbnail into temp files.
///  4. Upload thumbnail and file, then create or update the remote file entry
///     and persist it in [FilesDB].
///
/// Temp files are removed and the lock is released whether or not the upload
/// succeeds. Returns the file populated with its remote attributes.
Future<File> _tryToUpload(
    File file, int collectionID, bool forcedUpload) async {
  final connectivityResult = await (Connectivity().checkConnectivity());
  final canUploadUnderCurrentNetworkConditions =
      (connectivityResult == ConnectivityResult.wifi ||
          Configuration.instance.shouldBackupOverMobileData());
  if (!canUploadUnderCurrentNetworkConditions && !forcedUpload) {
    throw WiFiUnavailableError();
  }

  try {
    await _uploadLocks.acquireLock(
      file.localID,
      _lockOwner,
      DateTime.now().microsecondsSinceEpoch,
    );
  } catch (e) {
    _logger.warning("Lock was already taken for $file");
    throw LockAlreadyAcquiredError();
  }

  final tempDirectory = Configuration.instance.getTempDirectory();
  final encryptedFilePath =
      tempDirectory + file.generatedID.toString() + ".encrypted";
  final encryptedThumbnailPath =
      tempDirectory + file.generatedID.toString() + "_thumbnail.encrypted";
  var sourceFile;

  try {
    _logger.info("Trying to upload $file, isForced: $forcedUpload");
    final asset = await file.getAsset();
    if (asset == null) {
      // Always throws InvalidFileError.
      await _onInvalidFileError(file);
    }
    sourceFile = (await asset.originFile);

    // A file that was uploaded before keeps its existing key; a null key is
    // passed through to CryptoUtil.encryptFile otherwise.
    final isAlreadyUploadedFile = file.uploadedFileID != null;
    final dynamic key = isAlreadyUploadedFile ? decryptFileKey(file) : null;

    // Start from a clean slate in case an earlier attempt left a file behind.
    final encryptedFile = io.File(encryptedFilePath);
    if (encryptedFile.existsSync()) {
      encryptedFile.deleteSync();
    }
    final fileAttributes = await CryptoUtil.encryptFile(
      sourceFile.path,
      encryptedFilePath,
      key: key,
    );

    var thumbnailData = await asset.thumbDataWithSize(
      THUMBNAIL_LARGE_SIZE,
      THUMBNAIL_LARGE_SIZE,
      quality: 50,
    );
    if (thumbnailData == null) {
      // Always throws InvalidFileError.
      await _onInvalidFileError(file);
    }
    // Re-compress the thumbnail a bounded number of times while it is still
    // above the size limit.
    int compressionAttempts = 0;
    while (thumbnailData.length > THUMBNAIL_DATA_LIMIT &&
        compressionAttempts < kMaximumThumbnailCompressionAttempts) {
      _logger.info("Thumbnail size ${thumbnailData.length}");
      thumbnailData = await compressThumbnail(thumbnailData);
      _logger.info("Compressed thumbnail size ${thumbnailData.length}");
      compressionAttempts++;
    }

    final encryptedThumbnailData =
        CryptoUtil.encryptChaCha(thumbnailData, fileAttributes.key);
    final encryptedThumbnailFile = io.File(encryptedThumbnailPath);
    if (encryptedThumbnailFile.existsSync()) {
      encryptedThumbnailFile.deleteSync();
    }
    encryptedThumbnailFile
        .writeAsBytesSync(encryptedThumbnailData.encryptedData);

    final thumbnailUploadURL = await _getUploadURL();
    final String thumbnailObjectKey =
        await _putFile(thumbnailUploadURL, encryptedThumbnailFile);

    final fileUploadURL = await _getUploadURL();
    final String fileObjectKey = await _putFile(fileUploadURL, encryptedFile);

    // Hack: location data that is missing (thank you Android Q+) is fetched
    // lazily, only during uploads.
    if (file.location == null ||
        (file.location.latitude == 0 && file.location.longitude == 0)) {
      final latLong = await (await file.getAsset()).latlngAsync();
      file.location = Location(latLong.latitude, latLong.longitude);
    }

    final encryptedMetadataData = CryptoUtil.encryptChaCha(
        utf8.encode(jsonEncode(file.getMetadata())), fileAttributes.key);
    final fileDecryptionHeader = Sodium.bin2base64(fileAttributes.header);
    final thumbnailDecryptionHeader =
        Sodium.bin2base64(encryptedThumbnailData.header);
    final encryptedMetadata =
        Sodium.bin2base64(encryptedMetadataData.encryptedData);
    final metadataDecryptionHeader =
        Sodium.bin2base64(encryptedMetadataData.header);

    var remoteFile;
    if (isAlreadyUploadedFile) {
      remoteFile = await _updateFile(
        file,
        fileObjectKey,
        fileDecryptionHeader,
        thumbnailObjectKey,
        thumbnailDecryptionHeader,
        encryptedMetadata,
        metadataDecryptionHeader,
      );
      // Update across all collections
      await FilesDB.instance.updateUploadedFileAcrossCollections(remoteFile);
      FileRepository.instance.reloadFiles();
    } else {
      remoteFile = await _uploadFile(
        file,
        collectionID,
        fileAttributes,
        fileObjectKey,
        fileDecryptionHeader,
        thumbnailObjectKey,
        thumbnailDecryptionHeader,
        encryptedMetadata,
        metadataDecryptionHeader,
      );
      await FilesDB.instance.update(remoteFile);
      FileRepository.instance.reloadFiles();
    }
    _logger.info("File upload complete for $remoteFile");
    return remoteFile;
  } catch (e, s) {
    // Subscription and storage-limit failures are expected conditions and are
    // not logged as severe.
    if (!(e is NoActiveSubscriptionError || e is StorageLimitExceededError)) {
      _logger.severe("File upload failed for $file", e, s);
    }
    // rethrow (rather than `throw e`) keeps the original stack trace.
    rethrow;
  } finally {
    try {
      // NOTE(review): the origin file is deleted on iOS only, presumably
      // because it is a temporary copy there; confirm against the asset API.
      if (io.Platform.isIOS && sourceFile != null) {
        sourceFile.deleteSync();
      }
      if (io.File(encryptedFilePath).existsSync()) {
        io.File(encryptedFilePath).deleteSync();
      }
      if (io.File(encryptedThumbnailPath).existsSync()) {
        io.File(encryptedThumbnailPath).deleteSync();
      }
    } finally {
      // Release the lock even if temp-file cleanup throws, so a failed
      // cleanup cannot leave the file locked.
      await _uploadLocks.releaseLock(file.localID, _lockOwner);
    }
  }
}
2021-02-26 09:04:02 +00:00
/// Handles a local file that cannot be uploaded.
///
/// Logs the file, deletes its local entry from [FilesDB] and then always
/// throws [InvalidFileError]; the returned future never completes normally.
Future _onInvalidFileError(File file) async {
  _logger.severe("Invalid file encountered: $file");
  final invalidLocalID = file.localID;
  await FilesDB.instance.deleteLocalFile(invalidLocalID);
  throw InvalidFileError();
}
/// Registers a newly uploaded file with the server.
///
/// The file key from [fileAttributes] is encrypted with the key of
/// [collectionID] and posted, together with the object keys and decryption
/// headers, to the `/files` endpoint. On success [file] is updated in place
/// with the remote ID, owner, timestamps and key material, and returned.
///
/// A 426 response clears the pending queue and throws
/// [StorageLimitExceededError]; any other [DioError] is rethrown unchanged.
Future<File> _uploadFile(
  File file,
  int collectionID,
  EncryptionResult fileAttributes,
  String fileObjectKey,
  String fileDecryptionHeader,
  String thumbnailObjectKey,
  String thumbnailDecryptionHeader,
  String encryptedMetadata,
  String metadataDecryptionHeader,
) async {
  final encryptedFileKeyData = CryptoUtil.encryptSync(
    fileAttributes.key,
    CollectionsService.instance.getCollectionKey(collectionID),
  );
  final encryptedKey = Sodium.bin2base64(encryptedFileKeyData.encryptedData);
  final keyDecryptionNonce = Sodium.bin2base64(encryptedFileKeyData.nonce);
  final request = {
    "collectionID": collectionID,
    "encryptedKey": encryptedKey,
    "keyDecryptionNonce": keyDecryptionNonce,
    "file": {
      "objectKey": fileObjectKey,
      "decryptionHeader": fileDecryptionHeader,
    },
    "thumbnail": {
      "objectKey": thumbnailObjectKey,
      "decryptionHeader": thumbnailDecryptionHeader,
    },
    "metadata": {
      "encryptedData": encryptedMetadata,
      "decryptionHeader": metadataDecryptionHeader,
    }
  };
  try {
    final response = await _dio.post(
      Configuration.instance.getHttpEndpoint() + "/files",
      options: Options(
          headers: {"X-Auth-Token": Configuration.instance.getToken()}),
      data: request,
    );
    final data = response.data;
    file.uploadedFileID = data["id"];
    file.collectionID = collectionID;
    file.updationTime = data["updationTime"];
    file.ownerID = data["ownerID"];
    file.encryptedKey = encryptedKey;
    file.keyDecryptionNonce = keyDecryptionNonce;
    file.fileDecryptionHeader = fileDecryptionHeader;
    file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
    file.metadataDecryptionHeader = metadataDecryptionHeader;
    return file;
  } on DioError catch (e) {
    if (e.response?.statusCode == 426) {
      // Throws StorageLimitExceededError, so the rethrow below is skipped.
      _onStorageLimitExceeded();
    }
    // rethrow (rather than `throw e`) keeps the original stack trace.
    rethrow;
  }
}
/// Replaces the remote contents of an already uploaded [file].
///
/// Posts the file's existing `uploadedFileID` together with the new object
/// keys and decryption headers. On success [file] is updated in place with
/// the returned ID, update time and the new decryption headers, and returned.
///
/// A 426 response clears the pending queue and throws
/// [StorageLimitExceededError]; any other [DioError] is rethrown unchanged.
Future<File> _updateFile(
  File file,
  String fileObjectKey,
  String fileDecryptionHeader,
  String thumbnailObjectKey,
  String thumbnailDecryptionHeader,
  String encryptedMetadata,
  String metadataDecryptionHeader,
) async {
  final request = {
    "id": file.uploadedFileID,
    "file": {
      "objectKey": fileObjectKey,
      "decryptionHeader": fileDecryptionHeader,
    },
    "thumbnail": {
      "objectKey": thumbnailObjectKey,
      "decryptionHeader": thumbnailDecryptionHeader,
    },
    "metadata": {
      "encryptedData": encryptedMetadata,
      "decryptionHeader": metadataDecryptionHeader,
    }
  };
  try {
    // NOTE(review): this is the same method and path that _uploadFile uses to
    // create a file; presumably the server tells the two apart by the "id"
    // field. Confirm against the API.
    final response = await _dio.post(
      Configuration.instance.getHttpEndpoint() + "/files",
      options: Options(
          headers: {"X-Auth-Token": Configuration.instance.getToken()}),
      data: request,
    );
    final data = response.data;
    file.uploadedFileID = data["id"];
    file.updationTime = data["updationTime"];
    file.fileDecryptionHeader = fileDecryptionHeader;
    file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
    file.metadataDecryptionHeader = metadataDecryptionHeader;
    return file;
  } on DioError catch (e) {
    if (e.response?.statusCode == 426) {
      // Throws StorageLimitExceededError, so the rethrow below is skipped.
      _onStorageLimitExceeded();
    }
    // rethrow (rather than `throw e`) keeps the original stack trace.
    rethrow;
  }
}
2020-11-09 17:43:40 +00:00
/// Takes the next upload URL from the local pool.
///
/// When the pool is empty a new batch is fetched from the server first.
Future<UploadURL> _getUploadURL() async {
  final poolIsEmpty = _uploadURLs.isEmpty;
  if (poolIsEmpty) {
    await _fetchUploadURLs();
  }
  return _uploadURLs.removeFirst();
}
/// The fetch of upload URLs that is currently running, or `null` if none is.
Future<void> _uploadURLFetchInProgress;

/// Fetches a fresh batch of upload URLs into [_uploadURLs].
///
/// Concurrent callers share a single request: while one is running, every
/// call returns the same future. That future completes when the URLs have
/// been added, or fails with the request's error for every waiter.
Future<void> _fetchUploadURLs() {
  // The request is started in a microtask so that the in-progress marker is
  // assigned before any part of the request body (including its `finally`,
  // which clears the marker) can run.
  return _uploadURLFetchInProgress ??=
      Future<void>.microtask(_requestUploadURLs);
}

/// Performs the actual upload-URL request for [_fetchUploadURLs].
///
/// A 402 response clears the pending queue and throws
/// [NoActiveSubscriptionError]; a 426 response does the same with
/// [StorageLimitExceededError]. Other errors are rethrown unchanged.
Future<void> _requestUploadURLs() async {
  try {
    final response = await _dio.get(
      Configuration.instance.getHttpEndpoint() + "/files/upload-urls",
      queryParameters: {
        "count": min(42, 2 * _queue.length), // m4gic number
      },
      options: Options(
          headers: {"X-Auth-Token": Configuration.instance.getToken()}),
    );
    final urls = (response.data["urls"] as List)
        .map((e) => UploadURL.fromMap(e))
        .toList();
    _uploadURLs.addAll(urls);
  } on DioError catch (e) {
    if (e.response != null) {
      if (e.response.statusCode == 402) {
        _onExpiredSubscription();
      } else if (e.response.statusCode == 426) {
        _onStorageLimitExceeded();
      }
    }
    rethrow;
  } finally {
    // Always clear the marker. Leaving it set after a failure would hand every
    // later caller a future that can never succeed and block all further
    // fetches.
    _uploadURLFetchInProgress = null;
  }
}
/// Reacts to the server reporting that the storage limit is exceeded.
///
/// Fails every not-yet-started upload with a [StorageLimitExceededError] and
/// then always throws a [StorageLimitExceededError] to the caller.
void _onStorageLimitExceeded() {
  final queueError = StorageLimitExceededError();
  clearQueue(queueError);
  throw StorageLimitExceededError();
}
/// Reacts to the server reporting that there is no active subscription.
///
/// Fails every not-yet-started upload with a [NoActiveSubscriptionError] and
/// then always throws a [NoActiveSubscriptionError] to the caller.
void _onExpiredSubscription() {
  final queueError = NoActiveSubscriptionError();
  clearQueue(queueError);
  throw NoActiveSubscriptionError();
}
/// Streams [file] to [uploadURL] and returns the URL's object key.
///
/// [contentLength] overrides the value sent in the content-length header;
/// when omitted, `file.lengthSync()` is used. [attempt] is the 1-based number
/// of the current try.
///
/// Failure handling:
///  * if the first attempt fails because the content exceeds the declared
///    length, the same URL is retried once with the length obtained by
///    reading the file fully;
///  * otherwise, while [attempt] is below [kMaximumUploadAttempts], a fresh
///    upload URL is requested and the upload is retried;
///  * once the attempts are exhausted the [DioError] is rethrown.
Future<String> _putFile(
  UploadURL uploadURL,
  io.File file, {
  int contentLength,
  int attempt = 1,
}) async {
  final fileSize = contentLength ?? file.lengthSync();
  final startTime = DateTime.now().millisecondsSinceEpoch;
  try {
    await _dio.put(
      uploadURL.url,
      data: file.openRead(),
      options: Options(
        headers: {
          Headers.contentLengthHeader: fileSize,
        },
      ),
    );
    // Bytes per millisecond, reported as kilobytes per second.
    final elapsedMillis = DateTime.now().millisecondsSinceEpoch - startTime;
    _logger.info("Upload speed : ${file.lengthSync() / elapsedMillis}"
        " kilo bytes per second");
    return uploadURL.objectKey;
  } on DioError catch (e) {
    if (e.message.startsWith(
            "HttpException: Content size exceeds specified contentLength.") &&
        attempt == 1) {
      return _putFile(uploadURL, file,
          contentLength: file.readAsBytesSync().length, attempt: 2);
    } else if (attempt < kMaximumUploadAttempts) {
      _logger.info("Retrying upload that failed ", e);
      final newUploadURL = await _getUploadURL();
      // Pass attempt + 1: a post-increment here would hand the callee the
      // old value, so the counter would never advance and the retries would
      // never stop.
      return _putFile(newUploadURL, file,
          contentLength: file.readAsBytesSync().length, attempt: attempt + 1);
    } else {
      // rethrow (rather than `throw e`) keeps the original stack trace.
      rethrow;
    }
  }
}
}
2020-11-09 12:28:43 +00:00
/// One entry of the [FileUploader] queue.
class FileUploadItem {
  /// The local file to be uploaded.
  final File file;

  /// The collection this upload was queued for.
  final int collectionID;

  /// Completed with the uploaded file, or with the error that ended the
  /// upload.
  final Completer<File> completer;

  /// Where this entry currently is in its lifecycle.
  UploadStatus status;

  /// Creates a queue entry; [status] defaults to [UploadStatus.not_started].
  FileUploadItem(
    this.file,
    this.collectionID,
    this.completer, {
    this.status = UploadStatus.not_started,
  });
}
/// Lifecycle state of a [FileUploadItem].
enum UploadStatus {
  /// Queued and waiting for a free upload slot; the default for new items.
  not_started,

  /// Picked up for upload, either from the queue or by a forced upload.
  in_progress,

  /// NOTE(review): not assigned anywhere in the visible code; finished items
  /// are removed from the queue instead. Confirm whether it is still needed.
  completed,
}