import 'dart:async'; import 'dart:collection'; import 'dart:convert'; import 'dart:io' as io; import 'dart:math'; import 'dart:typed_data'; import 'package:connectivity/connectivity.dart'; import 'package:dio/dio.dart'; import 'package:flutter_sodium/flutter_sodium.dart'; import 'package:logging/logging.dart'; import 'package:path/path.dart'; import 'package:photos/core/configuration.dart'; import 'package:photos/core/errors.dart'; import 'package:photos/core/event_bus.dart'; import 'package:photos/core/network.dart'; import 'package:photos/db/files_db.dart'; import 'package:photos/db/upload_locks_db.dart'; import 'package:photos/events/files_updated_event.dart'; import 'package:photos/events/local_photos_updated_event.dart'; import 'package:photos/events/subscription_purchased_event.dart'; import 'package:photos/main.dart'; import 'package:photos/models/encryption_result.dart'; import 'package:photos/models/file.dart'; import 'package:photos/models/file_type.dart'; import 'package:photos/models/upload_url.dart'; import 'package:photos/services/collections_service.dart'; import 'package:photos/services/local_sync_service.dart'; import 'package:photos/services/sync_service.dart'; import 'package:photos/utils/crypto_util.dart'; import 'package:photos/utils/file_download_util.dart'; import 'package:photos/utils/file_uploader_util.dart'; import 'package:shared_preferences/shared_preferences.dart'; class FileUploader { static const kMaximumConcurrentUploads = 4; static const kMaximumConcurrentVideoUploads = 2; static const kMaximumThumbnailCompressionAttempts = 2; static const kMaximumUploadAttempts = 4; static const kBlockedUploadsPollFrequency = Duration(seconds: 2); static const kFileUploadTimeout = Duration(minutes: 50); final _logger = Logger("FileUploader"); final _dio = Network.instance.getDio(); final _queue = LinkedHashMap(); final _uploadLocks = UploadLocksDB.instance; final kSafeBufferForLockExpiry = Duration(days: 1).inMicroseconds; final kBGTaskDeathTimeout = Duration(seconds: 5).inMicroseconds; final _uploadURLs = Queue(); // Maintains the count of files in the current upload session. // Upload session is the period between the first entry into the _queue and last entry out of the _queue int _totalCountInUploadSession = 0; // _uploadCounter indicates number of uploads which are currently in progress int _uploadCounter = 0; int _videoUploadCounter = 0; ProcessType _processType; bool _isBackground; SharedPreferences _prefs; FileUploader._privateConstructor() { Bus.instance.on().listen((event) { _uploadURLFetchInProgress = null; }); } static FileUploader instance = FileUploader._privateConstructor(); Future init(bool isBackground) async { _prefs = await SharedPreferences.getInstance(); _isBackground = isBackground; _processType = isBackground ? ProcessType.background : ProcessType.foreground; final currentTime = DateTime.now().microsecondsSinceEpoch; await _uploadLocks.releaseLocksAcquiredByOwnerBefore( _processType.toString(), currentTime, ); await _uploadLocks .releaseAllLocksAcquiredBefore(currentTime - kSafeBufferForLockExpiry); if (!isBackground) { await _prefs.reload(); final isBGTaskDead = (_prefs.getInt(kLastBGTaskHeartBeatTime) ?? 0) < (currentTime - kBGTaskDeathTimeout); if (isBGTaskDead) { await _uploadLocks.releaseLocksAcquiredByOwnerBefore( ProcessType.background.toString(), currentTime, ); _logger.info("BG task was found dead, cleared all locks"); } _pollBackgroundUploadStatus(); } Bus.instance.on().listen((event) { if (event.type == EventType.deletedFromDevice || event.type == EventType.deletedFromEverywhere) { removeFromQueueWhere( (file) { for (final updatedFile in event.updatedFiles) { if (file.generatedID == updatedFile.generatedID) { return true; } } return false; }, InvalidFileError("File already deleted"), ); } }); } Future upload(File file, int collectionID) { // If the file hasn't been queued yet, queue it _totalCountInUploadSession++; if (!_queue.containsKey(file.localID)) { final completer = Completer(); _queue[file.localID] = FileUploadItem(file, collectionID, completer); _pollQueue(); return completer.future; } // If the file exists in the queue for a matching collectionID, // return the existing future final item = _queue[file.localID]; if (item.collectionID == collectionID) { _totalCountInUploadSession--; return item.completer.future; } // Else wait for the existing upload to complete, // and add it to the relevant collection return item.completer.future.then((uploadedFile) { return CollectionsService.instance .addToCollection(collectionID, [uploadedFile]).then((aVoid) { return uploadedFile; }); }); } Future forceUpload(File file, int collectionID) async { _logger.info( "Force uploading " + file.toString() + " into collection " + collectionID.toString(), ); _totalCountInUploadSession++; // If the file hasn't been queued yet, ez. if (!_queue.containsKey(file.localID)) { final completer = Completer(); _queue[file.localID] = FileUploadItem( file, collectionID, completer, status: UploadStatus.in_progress, ); _encryptAndUploadFileToCollection(file, collectionID, forcedUpload: true); return completer.future; } var item = _queue[file.localID]; // If the file is being uploaded right now, wait and proceed if (item.status == UploadStatus.in_progress || item.status == UploadStatus.in_background) { _totalCountInUploadSession--; final uploadedFile = await item.completer.future; if (uploadedFile.collectionID == collectionID) { // Do nothing } else { await CollectionsService.instance .addToCollection(collectionID, [uploadedFile]); } return uploadedFile; } else { // If the file is yet to be processed, // 1. Set the status to in_progress // 2. Force upload the file // 3. Add to the relevant collection item = _queue[file.localID]; item.status = UploadStatus.in_progress; final uploadedFile = await _encryptAndUploadFileToCollection( file, collectionID, forcedUpload: true, ); if (item.collectionID == collectionID) { return uploadedFile; } else { await CollectionsService.instance .addToCollection(item.collectionID, [uploadedFile]); return uploadedFile; } } } int getCurrentSessionUploadCount() { return _totalCountInUploadSession; } void clearQueue(final Error reason) { final List uploadsToBeRemoved = []; _queue.entries .where((entry) => entry.value.status == UploadStatus.not_started) .forEach((pendingUpload) { uploadsToBeRemoved.add(pendingUpload.key); }); for (final id in uploadsToBeRemoved) { _queue.remove(id).completer.completeError(reason); } _totalCountInUploadSession = 0; } void removeFromQueueWhere(final bool Function(File) fn, final Error reason) { List uploadsToBeRemoved = []; _queue.entries .where((entry) => entry.value.status == UploadStatus.not_started) .forEach((pendingUpload) { if (fn(pendingUpload.value.file)) { uploadsToBeRemoved.add(pendingUpload.key); } }); for (final id in uploadsToBeRemoved) { _queue.remove(id).completer.completeError(reason); } _totalCountInUploadSession -= uploadsToBeRemoved.length; } void _pollQueue() { if (SyncService.instance.shouldStopSync()) { clearQueue(SyncStopRequestedError()); } if (_queue.isEmpty) { // Upload session completed _totalCountInUploadSession = 0; return; } if (_uploadCounter < kMaximumConcurrentUploads) { var pendingEntry = _queue.entries .firstWhere( (entry) => entry.value.status == UploadStatus.not_started, orElse: () => null, ) ?.value; if (pendingEntry != null && pendingEntry.file.fileType == FileType.video && _videoUploadCounter >= kMaximumConcurrentVideoUploads) { // check if there's any non-video entry which can be queued for upload pendingEntry = _queue.entries .firstWhere( (entry) => entry.value.status == UploadStatus.not_started && entry.value.file.fileType != FileType.video, orElse: () => null, ) ?.value; } if (pendingEntry != null) { pendingEntry.status = UploadStatus.in_progress; _encryptAndUploadFileToCollection( pendingEntry.file, pendingEntry.collectionID, ); } } } Future _encryptAndUploadFileToCollection( File file, int collectionID, { bool forcedUpload = false, }) async { _uploadCounter++; if (file.fileType == FileType.video) { _videoUploadCounter++; } final localID = file.localID; try { final uploadedFile = await _tryToUpload(file, collectionID, forcedUpload).timeout( kFileUploadTimeout, onTimeout: () { final message = "Upload timed out for file " + file.toString(); _logger.severe(message); throw TimeoutException(message); }, ); _queue.remove(localID).completer.complete(uploadedFile); return uploadedFile; } catch (e) { if (e is LockAlreadyAcquiredError) { _queue[localID].status = UploadStatus.in_background; return _queue[localID].completer.future; } else { _queue.remove(localID).completer.completeError(e); return null; } } finally { _uploadCounter--; if (file.fileType == FileType.video) { _videoUploadCounter--; } _pollQueue(); } } Future _tryToUpload( File file, int collectionID, bool forcedUpload, ) async { final connectivityResult = await (Connectivity().checkConnectivity()); var canUploadUnderCurrentNetworkConditions = (connectivityResult == ConnectivityResult.wifi || Configuration.instance.shouldBackupOverMobileData()); if (!canUploadUnderCurrentNetworkConditions && !forcedUpload) { throw WiFiUnavailableError(); } final fileOnDisk = await FilesDB.instance.getFile(file.generatedID); final wasAlreadyUploaded = fileOnDisk.uploadedFileID != null && fileOnDisk.updationTime != -1 && fileOnDisk.collectionID == collectionID; if (wasAlreadyUploaded) { return fileOnDisk; } try { await _uploadLocks.acquireLock( file.localID, _processType.toString(), DateTime.now().microsecondsSinceEpoch, ); } catch (e) { _logger.warning("Lock was already taken for " + file.toString()); throw LockAlreadyAcquiredError(); } final tempDirectory = Configuration.instance.getTempDirectory(); final encryptedFilePath = tempDirectory + file.generatedID.toString() + (_isBackground ? "_bg" : "") + ".encrypted"; final encryptedThumbnailPath = tempDirectory + file.generatedID.toString() + "_thumbnail" + (_isBackground ? "_bg" : "") + ".encrypted"; MediaUploadData mediaUploadData; try { _logger.info( "Trying to upload " + file.toString() + ", isForced: " + forcedUpload.toString(), ); try { mediaUploadData = await getUploadDataFromEnteFile(file); } catch (e) { if (e is InvalidFileError) { await _onInvalidFileError(file, e); } else { rethrow; } } Uint8List key; bool isUpdatedFile = file.uploadedFileID != null && file.updationTime == -1; if (isUpdatedFile) { _logger.info("File was updated " + file.toString()); key = decryptFileKey(file); } else { key = null; } if (io.File(encryptedFilePath).existsSync()) { await io.File(encryptedFilePath).delete(); } final encryptedFile = io.File(encryptedFilePath); final fileAttributes = await CryptoUtil.encryptFile( mediaUploadData.sourceFile.path, encryptedFilePath, key: key, ); var thumbnailData = mediaUploadData.thumbnail; final encryptedThumbnailData = await CryptoUtil.encryptChaCha(thumbnailData, fileAttributes.key); if (io.File(encryptedThumbnailPath).existsSync()) { await io.File(encryptedThumbnailPath).delete(); } final encryptedThumbnailFile = io.File(encryptedThumbnailPath); await encryptedThumbnailFile .writeAsBytes(encryptedThumbnailData.encryptedData); final thumbnailUploadURL = await _getUploadURL(); String thumbnailObjectKey = await _putFile(thumbnailUploadURL, encryptedThumbnailFile); final fileUploadURL = await _getUploadURL(); String fileObjectKey = await _putFile(fileUploadURL, encryptedFile); final metadata = await file.getMetadataForUpload(mediaUploadData.sourceFile); final encryptedMetadataData = await CryptoUtil.encryptChaCha( utf8.encode(jsonEncode(metadata)), fileAttributes.key, ); final fileDecryptionHeader = Sodium.bin2base64(fileAttributes.header); final thumbnailDecryptionHeader = Sodium.bin2base64(encryptedThumbnailData.header); final encryptedMetadata = Sodium.bin2base64(encryptedMetadataData.encryptedData); final metadataDecryptionHeader = Sodium.bin2base64(encryptedMetadataData.header); if (SyncService.instance.shouldStopSync()) { throw SyncStopRequestedError(); } File remoteFile; if (isUpdatedFile) { remoteFile = await _updateFile( file, fileObjectKey, fileDecryptionHeader, await encryptedFile.length(), thumbnailObjectKey, thumbnailDecryptionHeader, await encryptedThumbnailFile.length(), encryptedMetadata, metadataDecryptionHeader, ); // Update across all collections await FilesDB.instance.updateUploadedFileAcrossCollections(remoteFile); } else { final encryptedFileKeyData = CryptoUtil.encryptSync( fileAttributes.key, CollectionsService.instance.getCollectionKey(collectionID), ); final encryptedKey = Sodium.bin2base64(encryptedFileKeyData.encryptedData); final keyDecryptionNonce = Sodium.bin2base64(encryptedFileKeyData.nonce); remoteFile = await _uploadFile( file, collectionID, encryptedKey, keyDecryptionNonce, fileAttributes, fileObjectKey, fileDecryptionHeader, await encryptedFile.length(), thumbnailObjectKey, thumbnailDecryptionHeader, await encryptedThumbnailFile.length(), encryptedMetadata, metadataDecryptionHeader, ); if (mediaUploadData.isDeleted) { _logger.info("File found to be deleted"); remoteFile.localID = null; } await FilesDB.instance.update(remoteFile); } if (!_isBackground) { Bus.instance.fire(LocalPhotosUpdatedEvent([remoteFile])); } _logger.info("File upload complete for " + remoteFile.toString()); return remoteFile; } catch (e, s) { if (!(e is NoActiveSubscriptionError || e is StorageLimitExceededError || e is FileTooLargeForPlanError)) { _logger.severe("File upload failed for " + file.toString(), e, s); } rethrow; } finally { if (io.Platform.isIOS && mediaUploadData != null && mediaUploadData.sourceFile != null) { await mediaUploadData.sourceFile.delete(); } if (io.File(encryptedFilePath).existsSync()) { await io.File(encryptedFilePath).delete(); } if (io.File(encryptedThumbnailPath).existsSync()) { await io.File(encryptedThumbnailPath).delete(); } await _uploadLocks.releaseLock(file.localID, _processType.toString()); } } Future _onInvalidFileError(File file, InvalidFileError e) async { String ext = file.title == null ? "no title" : extension(file.title); _logger.severe( "Invalid file: (ext: $ext) encountered: " + file.toString(), e, ); await FilesDB.instance.deleteLocalFile(file); await LocalSyncService.instance.trackInvalidFile(file); throw e; } Future _uploadFile( File file, int collectionID, String encryptedKey, String keyDecryptionNonce, EncryptionResult fileAttributes, String fileObjectKey, String fileDecryptionHeader, int fileSize, String thumbnailObjectKey, String thumbnailDecryptionHeader, int thumbnailSize, String encryptedMetadata, String metadataDecryptionHeader, { int attempt = 1, }) async { final request = { "collectionID": collectionID, "encryptedKey": encryptedKey, "keyDecryptionNonce": keyDecryptionNonce, "file": { "objectKey": fileObjectKey, "decryptionHeader": fileDecryptionHeader, "size": fileSize, }, "thumbnail": { "objectKey": thumbnailObjectKey, "decryptionHeader": thumbnailDecryptionHeader, "size": thumbnailSize, }, "metadata": { "encryptedData": encryptedMetadata, "decryptionHeader": metadataDecryptionHeader, } }; try { final response = await _dio.post( Configuration.instance.getHttpEndpoint() + "/files", options: Options( headers: {"X-Auth-Token": Configuration.instance.getToken()}, ), data: request, ); final data = response.data; file.uploadedFileID = data["id"]; file.collectionID = collectionID; file.updationTime = data["updationTime"]; file.ownerID = data["ownerID"]; file.encryptedKey = encryptedKey; file.keyDecryptionNonce = keyDecryptionNonce; file.fileDecryptionHeader = fileDecryptionHeader; file.thumbnailDecryptionHeader = thumbnailDecryptionHeader; file.metadataDecryptionHeader = metadataDecryptionHeader; return file; } on DioError catch (e) { if (e.response?.statusCode == 413) { throw FileTooLargeForPlanError(); } else if (e.response?.statusCode == 426) { _onStorageLimitExceeded(); } else if (attempt < kMaximumUploadAttempts) { _logger.info("Upload file failed, will retry in 3 seconds"); await Future.delayed(Duration(seconds: 3)); return _uploadFile( file, collectionID, encryptedKey, keyDecryptionNonce, fileAttributes, fileObjectKey, fileDecryptionHeader, fileSize, thumbnailObjectKey, thumbnailDecryptionHeader, thumbnailSize, encryptedMetadata, metadataDecryptionHeader, attempt: attempt + 1, ); } rethrow; } } Future _updateFile( File file, String fileObjectKey, String fileDecryptionHeader, int fileSize, String thumbnailObjectKey, String thumbnailDecryptionHeader, int thumbnailSize, String encryptedMetadata, String metadataDecryptionHeader, { int attempt = 1, }) async { final request = { "id": file.uploadedFileID, "file": { "objectKey": fileObjectKey, "decryptionHeader": fileDecryptionHeader, "size": fileSize, }, "thumbnail": { "objectKey": thumbnailObjectKey, "decryptionHeader": thumbnailDecryptionHeader, "size": thumbnailSize, }, "metadata": { "encryptedData": encryptedMetadata, "decryptionHeader": metadataDecryptionHeader, } }; try { final response = await _dio.put( Configuration.instance.getHttpEndpoint() + "/files/update", options: Options( headers: {"X-Auth-Token": Configuration.instance.getToken()}, ), data: request, ); final data = response.data; file.uploadedFileID = data["id"]; file.updationTime = data["updationTime"]; file.fileDecryptionHeader = fileDecryptionHeader; file.thumbnailDecryptionHeader = thumbnailDecryptionHeader; file.metadataDecryptionHeader = metadataDecryptionHeader; return file; } on DioError catch (e) { if (e.response?.statusCode == 426) { _onStorageLimitExceeded(); } else if (attempt < kMaximumUploadAttempts) { _logger.info("Update file failed, will retry in 3 seconds"); await Future.delayed(Duration(seconds: 3)); return _updateFile( file, fileObjectKey, fileDecryptionHeader, fileSize, thumbnailObjectKey, thumbnailDecryptionHeader, thumbnailSize, encryptedMetadata, metadataDecryptionHeader, attempt: attempt + 1, ); } rethrow; } } Future _getUploadURL() async { if (_uploadURLs.isEmpty) { await _fetchUploadURLs(); } return _uploadURLs.removeFirst(); } Future _uploadURLFetchInProgress; Future _fetchUploadURLs() async { _uploadURLFetchInProgress ??= Future(() async { try { final response = await _dio.get( Configuration.instance.getHttpEndpoint() + "/files/upload-urls", queryParameters: { "count": min(42, 2 * _queue.length), // m4gic number }, options: Options( headers: {"X-Auth-Token": Configuration.instance.getToken()}, ), ); final urls = (response.data["urls"] as List) .map((e) => UploadURL.fromMap(e)) .toList(); _uploadURLs.addAll(urls); } on DioError catch (e, s) { if (e.response != null) { if (e.response.statusCode == 402) { final error = NoActiveSubscriptionError(); clearQueue(error); throw error; } else if (e.response.statusCode == 426) { final error = StorageLimitExceededError(); clearQueue(error); throw error; } else { _logger.severe("Could not fetch upload URLs", e, s); } } rethrow; } finally { _uploadURLFetchInProgress = null; } }); return _uploadURLFetchInProgress; } void _onStorageLimitExceeded() { clearQueue(StorageLimitExceededError()); throw StorageLimitExceededError(); } Future _putFile( UploadURL uploadURL, io.File file, { int contentLength, int attempt = 1, }) async { final fileSize = contentLength ?? await file.length(); _logger.info( "Putting object for " + file.toString() + " of size: " + fileSize.toString(), ); final startTime = DateTime.now().millisecondsSinceEpoch; try { await _dio.put( uploadURL.url, data: file.openRead(), options: Options( headers: { Headers.contentLengthHeader: fileSize, }, ), ); _logger.info( "Upload speed : " + (fileSize / (DateTime.now().millisecondsSinceEpoch - startTime)) .toString() + " kilo bytes per second", ); return uploadURL.objectKey; } on DioError catch (e) { if (e.message.startsWith( "HttpException: Content size exceeds specified contentLength.", ) && attempt == 1) { return _putFile( uploadURL, file, contentLength: (await file.readAsBytes()).length, attempt: 2, ); } else if (attempt < kMaximumUploadAttempts) { final newUploadURL = await _getUploadURL(); return _putFile( newUploadURL, file, contentLength: (await file.readAsBytes()).length, attempt: attempt + 1, ); } else { _logger.info( "Upload failed for file with size " + fileSize.toString(), e, ); rethrow; } } } Future _pollBackgroundUploadStatus() async { final blockedUploads = _queue.entries .where((e) => e.value.status == UploadStatus.in_background) .toList(); for (final upload in blockedUploads) { final file = upload.value.file; final isStillLocked = await _uploadLocks.isLocked( file.localID, ProcessType.background.toString(), ); if (!isStillLocked) { final completer = _queue.remove(upload.key).completer; final dbFile = await FilesDB.instance.getFile(upload.value.file.generatedID); if (dbFile.uploadedFileID != null) { _logger.info("Background upload success detected"); completer.complete(dbFile); } else { _logger.info("Background upload failure detected"); completer.completeError(SilentlyCancelUploadsError()); } } } Future.delayed(kBlockedUploadsPollFrequency, () async { await _pollBackgroundUploadStatus(); }); } } class FileUploadItem { final File file; final int collectionID; final Completer completer; UploadStatus status; FileUploadItem( this.file, this.collectionID, this.completer, { this.status = UploadStatus.not_started, }); } enum UploadStatus { not_started, in_progress, in_background, completed, } enum ProcessType { background, foreground, }