import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:io' as io;
import 'dart:math';

import 'package:connectivity/connectivity.dart';
import 'package:dio/dio.dart';
import 'package:flutter_sodium/flutter_sodium.dart';
import 'package:logging/logging.dart';
import 'package:photos/core/configuration.dart';
import 'package:photos/core/constants.dart';
import 'package:photos/core/errors.dart';
import 'package:photos/core/event_bus.dart';
import 'package:photos/core/network.dart';
import 'package:photos/db/files_db.dart';
import 'package:photos/db/upload_locks_db.dart';
import 'package:photos/events/local_photos_updated_event.dart';
import 'package:photos/events/subscription_purchased_event.dart';
import 'package:photos/main.dart';
import 'package:photos/models/encryption_result.dart';
import 'package:photos/models/file.dart';
import 'package:photos/models/location.dart';
import 'package:photos/models/upload_url.dart';
import 'package:photos/services/collections_service.dart';
import 'package:photos/services/local_sync_service.dart';
import 'package:photos/services/sync_service.dart';
import 'package:photos/utils/crypto_util.dart';
import 'package:photos/utils/file_util.dart';
import 'package:shared_preferences/shared_preferences.dart';

/// Queues local files, encrypts them and uploads them to the remote endpoint.
///
/// Files are keyed in the queue by their `localID`. At most
/// [kMaximumConcurrentUploads] queued uploads are started by [_pollQueue];
/// [forceUpload] bypasses the queue ordering and the network-type check.
/// A per-file lock in [UploadLocksDB] is taken before an upload so that the
/// foreground and background processes do not upload the same file twice.
class FileUploader {
  static const kMaximumConcurrentUploads = 4;
  static const kMaximumThumbnailCompressionAttempts = 2;
  static const kMaximumUploadAttempts = 4;
  static const kBlockedUploadsPollFrequency = Duration(seconds: 2);

  final _logger = Logger("FileUploader");
  final _dio = Network.instance.getDio();

  // Pending and in-flight uploads, keyed by the file's localID. Insertion
  // order is the order in which [_pollQueue] picks up pending items.
  final _queue = LinkedHashMap();
  final _uploadLocks = UploadLocksDB.instance;

  // Locks acquired more than this long ago (in microseconds) are released
  // unconditionally in [init].
  final kSafeBufferForLockExpiry = Duration(days: 1).inMicroseconds;

  // If the last background heartbeat is older than this (in microseconds),
  // [init] treats the background task as dead and releases its locks.
  final kBGTaskDeathTimeout = Duration(seconds: 5).inMicroseconds;

  // Upload URLs fetched from the server and not yet consumed.
  final _uploadURLs = Queue();
  int _currentlyUploading = 0;

  ProcessType _processType;
  bool _isBackground;
  SharedPreferences _prefs;

  // The in-flight upload URL fetch, if any; shared by concurrent callers of
  // [_fetchUploadURLs].
  Future _uploadURLFetchInProgress;

  FileUploader._privateConstructor() {
    // NOTE(review): no event type argument is visible here, so as written
    // this reacts to every event on the bus. The import of
    // subscription_purchased_event.dart suggests a subscription limited to
    // that event type was intended - confirm.
    Bus.instance.on().listen((event) {
      _uploadURLFetchInProgress = null;
    });
  }

  static FileUploader instance = FileUploader._privateConstructor();

  /// Prepares the uploader for the current process.
  ///
  /// Releases the locks previously held by this process type, releases all
  /// locks older than [kSafeBufferForLockExpiry], and, when running in the
  /// foreground, releases the background task's locks if its heartbeat is
  /// older than [kBGTaskDeathTimeout] and starts polling for uploads that are
  /// blocked on the background process.
  Future init(bool isBackground) async {
    _prefs = await SharedPreferences.getInstance();
    _isBackground = isBackground;
    _processType =
        isBackground ? ProcessType.background : ProcessType.foreground;
    final currentTime = DateTime.now().microsecondsSinceEpoch;
    await _uploadLocks.releaseLocksAcquiredByOwnerBefore(
        _processType.toString(), currentTime);
    await _uploadLocks
        .releaseAllLocksAcquiredBefore(currentTime - kSafeBufferForLockExpiry);
    if (!isBackground) {
      // Reload so that a heartbeat written by the other process is visible.
      await _prefs.reload();
      final lastHeartBeat = _prefs.getInt(kLastBGTaskHeartBeatTime) ?? 0;
      final isBGTaskDead = lastHeartBeat < (currentTime - kBGTaskDeathTimeout);
      if (isBGTaskDead) {
        await _uploadLocks.releaseLocksAcquiredByOwnerBefore(
            ProcessType.background.toString(), currentTime);
        _logger.info("BG task was found dead, cleared all locks");
      }
      // Deliberately not awaited: the poller reschedules itself forever.
      _pollBackgroundUploadStatus();
    }
  }

  /// Queues [file] for upload into [collectionID].
  ///
  /// Returns a future that completes with the uploaded file. If the file is
  /// already queued for another collection, the existing upload is awaited
  /// and the result is then added to [collectionID].
  Future upload(File file, int collectionID) {
    // If the file hasn't been queued yet, queue it
    if (!_queue.containsKey(file.localID)) {
      final completer = Completer();
      _queue[file.localID] = FileUploadItem(file, collectionID, completer);
      _pollQueue();
      return completer.future;
    }

    // If the file exists in the queue for a matching collectionID,
    // return the existing future
    final item = _queue[file.localID];
    if (item.collectionID == collectionID) {
      return item.completer.future;
    }

    // Else wait for the existing upload to complete,
    // and add it to the relevant collection
    return item.completer.future.then((uploadedFile) {
      return CollectionsService.instance
          .addToCollection(collectionID, [uploadedFile]).then((aVoid) {
        return uploadedFile;
      });
    });
  }

  /// Uploads [file] into [collectionID] immediately, skipping the queue order
  /// and the Wi-Fi/mobile-data check.
  ///
  /// If the file is already being uploaded, that upload is awaited instead.
  /// Completes with the uploaded file; in the "queued but not started" case
  /// it completes with `null` when the upload fails (the failure itself is
  /// reported through the queued item's completer).
  Future forceUpload(File file, int collectionID) async {
    _logger.info("Force uploading $file into collection $collectionID");

    // If the file hasn't been queued yet, register it as in progress and
    // start uploading right away.
    if (!_queue.containsKey(file.localID)) {
      final completer = Completer();
      _queue[file.localID] = FileUploadItem(
        file,
        collectionID,
        completer,
        status: UploadStatus.in_progress,
      );
      _encryptAndUploadFileToCollection(file, collectionID, forcedUpload: true);
      return completer.future;
    }

    final item = _queue[file.localID];

    // If the file is being uploaded right now, wait and proceed
    if (item.status == UploadStatus.in_progress ||
        item.status == UploadStatus.in_background) {
      final uploadedFile = await item.completer.future;
      if (uploadedFile.collectionID != collectionID) {
        await CollectionsService.instance
            .addToCollection(collectionID, [uploadedFile]);
      }
      return uploadedFile;
    }

    // If the file is yet to be processed,
    // 1. Set the status to in_progress
    // 2. Force upload the file
    // 3. Add to the collection it was originally queued for
    item.status = UploadStatus.in_progress;
    final uploadedFile = await _encryptAndUploadFileToCollection(
        file, collectionID,
        forcedUpload: true);
    // A failed upload yields null; there is nothing to add to a collection
    // in that case.
    if (uploadedFile != null && item.collectionID != collectionID) {
      await CollectionsService.instance
          .addToCollection(item.collectionID, [uploadedFile]);
    }
    return uploadedFile;
  }

  /// Returns the number of items (pending and in flight) in the queue.
  int queueSize() {
    return _queue.length;
  }

  /// Removes every upload that has not started yet, failing each with
  /// [reason].
  void clearQueue(final Error reason) {
    removeFromQueueWhere((file) => true, reason);
  }

  /// Removes the not-yet-started uploads whose file satisfies [fn], failing
  /// each with [reason].
  void removeFromQueueWhere(final bool Function(File) fn, final Error reason) {
    // Collect the keys first so the map is not modified while iterating it.
    final uploadsToBeRemoved = _queue.entries
        .where((entry) => entry.value.status == UploadStatus.not_started)
        .where((entry) => fn(entry.value.file))
        .map((entry) => entry.key)
        .toList();
    for (final id in uploadsToBeRemoved) {
      _queue.remove(id).completer.completeError(reason);
    }
  }

  /// Starts the next pending upload if a slot is free.
  ///
  /// Pending uploads are dropped first when the sync service asks to stop.
  void _pollQueue() {
    if (SyncService.instance.shouldStopSync()) {
      clearQueue(SyncStopRequestedError());
    }
    if (_queue.isNotEmpty && _currentlyUploading < kMaximumConcurrentUploads) {
      final firstPendingEntry = _queue.entries
          .firstWhere((entry) => entry.value.status == UploadStatus.not_started,
              orElse: () => null)
          ?.value;
      if (firstPendingEntry != null) {
        firstPendingEntry.status = UploadStatus.in_progress;
        _encryptAndUploadFileToCollection(
            firstPendingEntry.file, firstPendingEntry.collectionID);
      }
    }
  }

  /// Runs one upload and settles the queued item's completer.
  ///
  /// On success the item is removed from the queue and completed with the
  /// uploaded file. If the file's lock is held elsewhere the item is marked
  /// [UploadStatus.in_background] and its future is returned unresolved. Any
  /// other failure completes the item with the error and returns `null`.
  Future _encryptAndUploadFileToCollection(File file, int collectionID,
      {bool forcedUpload = false}) async {
    _currentlyUploading++;
    final localID = file.localID;
    try {
      final uploadedFile = await _tryToUpload(file, collectionID, forcedUpload);
      _queue.remove(localID).completer.complete(uploadedFile);
      return uploadedFile;
    } catch (e) {
      if (e is LockAlreadyAcquiredError) {
        _queue[localID].status = UploadStatus.in_background;
        return _queue[localID].completer.future;
      } else {
        _queue.remove(localID).completer.completeError(e);
        return null;
      }
    } finally {
      _currentlyUploading--;
      _pollQueue();
    }
  }

  /// Encrypts [file] and its thumbnail, uploads both and registers the file
  /// with the server.
  ///
  /// Throws [WiFiUnavailableError] when the network conditions do not allow
  /// an upload and [forcedUpload] is false, [LockAlreadyAcquiredError] when
  /// the file's upload lock cannot be acquired, [SyncStopRequestedError] when
  /// a stop was requested before the file is registered, and whatever
  /// [_onInvalidFileError] throws when the asset, its source file or its
  /// thumbnail cannot be read.
  Future _tryToUpload(
      File file, int collectionID, bool forcedUpload) async {
    final connectivityResult = await (Connectivity().checkConnectivity());
    final canUploadUnderCurrentNetworkConditions =
        (connectivityResult == ConnectivityResult.wifi ||
            Configuration.instance.shouldBackupOverMobileData());
    if (!canUploadUnderCurrentNetworkConditions && !forcedUpload) {
      throw WiFiUnavailableError();
    }

    try {
      await _uploadLocks.acquireLock(
        file.localID,
        _processType.toString(),
        DateTime.now().microsecondsSinceEpoch,
      );
    } catch (e) {
      _logger.warning("Lock was already taken for $file");
      throw LockAlreadyAcquiredError();
    }

    // Background and foreground processes use distinct temp file names.
    final tempDirectory = Configuration.instance.getTempDirectory();
    final encryptedFilePath = tempDirectory +
        file.generatedID.toString() +
        (_isBackground ? "_bg" : "") +
        ".encrypted";
    final encryptedThumbnailPath = tempDirectory +
        file.generatedID.toString() +
        "_thumbnail" +
        (_isBackground ? "_bg" : "") +
        ".encrypted";
    io.File sourceFile;

    try {
      _logger.info("Trying to upload $file, isForced: $forcedUpload");
      // The timeouts are to safeguard against https://github.com/CaiJingLong/flutter_photo_manager/issues/467
      final asset = await file
          .getAsset()
          .timeout(Duration(seconds: 3))
          .catchError((e) async {
        if (e is TimeoutException) {
          _logger.info("Asset fetch timed out for $file");
          return await file.getAsset();
        } else {
          throw e;
        }
      });
      if (asset == null) {
        await _onInvalidFileError(file);
      }
      sourceFile = await asset.originFile
          .timeout(Duration(seconds: 3))
          .catchError((e) async {
        if (e is TimeoutException) {
          _logger.info("Origin file fetch timed out for $file");
          return await asset.originFile;
        } else {
          throw e;
        }
      });
      if (!sourceFile.existsSync()) {
        await _onInvalidFileError(file);
      }

      // A file that already exists remotely is re-encrypted with its
      // existing key; otherwise no key is passed to the encryption call.
      final isAlreadyUploadedFile = file.uploadedFileID != null;
      final key = isAlreadyUploadedFile ? decryptFileKey(file) : null;

      final encryptedFile = io.File(encryptedFilePath);
      if (encryptedFile.existsSync()) {
        encryptedFile.deleteSync();
      }
      final fileAttributes = await CryptoUtil.encryptFile(
        sourceFile.path,
        encryptedFilePath,
        key: key,
      );

      var thumbnailData = await asset.thumbDataWithSize(
        THUMBNAIL_LARGE_SIZE,
        THUMBNAIL_LARGE_SIZE,
        quality: 50,
      );
      if (thumbnailData == null) {
        await _onInvalidFileError(file);
      }
      var compressionAttempts = 0;
      while (thumbnailData.length > THUMBNAIL_DATA_LIMIT &&
          compressionAttempts < kMaximumThumbnailCompressionAttempts) {
        _logger.info("Thumbnail size ${thumbnailData.length}");
        thumbnailData = await compressThumbnail(thumbnailData);
        _logger.info("Compressed thumbnail size ${thumbnailData.length}");
        compressionAttempts++;
      }

      final encryptedThumbnailData =
          await CryptoUtil.encryptChaCha(thumbnailData, fileAttributes.key);
      final encryptedThumbnailFile = io.File(encryptedThumbnailPath);
      if (encryptedThumbnailFile.existsSync()) {
        encryptedThumbnailFile.deleteSync();
      }
      encryptedThumbnailFile
          .writeAsBytesSync(encryptedThumbnailData.encryptedData);

      final thumbnailUploadURL = await _getUploadURL();
      final thumbnailObjectKey =
          await _putFile(thumbnailUploadURL, encryptedThumbnailFile);

      final fileUploadURL = await _getUploadURL();
      final fileObjectKey = await _putFile(fileUploadURL, encryptedFile);

      // h4ck to fetch location data if missing (thank you Android Q+) lazily only during uploads
      if (file.location == null ||
          (file.location.latitude == 0 && file.location.longitude == 0)) {
        final latLong = await asset.latlngAsync();
        file.location = Location(latLong.latitude, latLong.longitude);
      }

      if (file.title == null || file.title.isEmpty) {
        _logger.severe("Title was missing");
        file.title = await asset.titleAsync;
      }

      final encryptedMetadataData = await CryptoUtil.encryptChaCha(
          utf8.encode(jsonEncode(file.getMetadata())), fileAttributes.key);
      final fileDecryptionHeader = Sodium.bin2base64(fileAttributes.header);
      final thumbnailDecryptionHeader =
          Sodium.bin2base64(encryptedThumbnailData.header);
      final encryptedMetadata =
          Sodium.bin2base64(encryptedMetadataData.encryptedData);
      final metadataDecryptionHeader =
          Sodium.bin2base64(encryptedMetadataData.header);

      if (SyncService.instance.shouldStopSync()) {
        throw SyncStopRequestedError();
      }

      File remoteFile;
      if (isAlreadyUploadedFile) {
        remoteFile = await _updateFile(
          file,
          fileObjectKey,
          fileDecryptionHeader,
          thumbnailObjectKey,
          thumbnailDecryptionHeader,
          encryptedMetadata,
          metadataDecryptionHeader,
        );
        // Update across all collections
        await FilesDB.instance.updateUploadedFileAcrossCollections(remoteFile);
      } else {
        remoteFile = await _uploadFile(
          file,
          collectionID,
          fileAttributes,
          fileObjectKey,
          fileDecryptionHeader,
          thumbnailObjectKey,
          thumbnailDecryptionHeader,
          encryptedMetadata,
          metadataDecryptionHeader,
        );
        if (asset == null || !(await asset.exists)) {
          _logger.info("File found to be deleted");
          remoteFile.localID = null;
        }
        await FilesDB.instance.update(remoteFile);
      }
      if (!_isBackground) {
        Bus.instance.fire(LocalPhotosUpdatedEvent([remoteFile]));
      }
      _logger.info("File upload complete for $remoteFile");
      return remoteFile;
    } catch (e, s) {
      if (!(e is NoActiveSubscriptionError ||
          e is StorageLimitExceededError)) {
        _logger.severe("File upload failed for $file", e, s);
      }
      rethrow;
    } finally {
      // The lock must be released even if removing a temp file throws,
      // otherwise the file stays locked until the lock is released on a later
      // init().
      try {
        if (io.Platform.isIOS && sourceFile != null) {
          sourceFile.deleteSync();
        }
        if (io.File(encryptedFilePath).existsSync()) {
          io.File(encryptedFilePath).deleteSync();
        }
        if (io.File(encryptedThumbnailPath).existsSync()) {
          io.File(encryptedThumbnailPath).deleteSync();
        }
      } finally {
        await _uploadLocks.releaseLock(file.localID, _processType.toString());
      }
    }
  }

  /// Removes [file] from the local DB, records it as invalid and throws
  /// [InvalidFileError].
  Future _onInvalidFileError(File file) async {
    _logger.warning("Invalid file encountered: $file");
    await FilesDB.instance.deleteLocalFile(file.localID);
    await LocalSyncService.instance.trackInvalidFile(file);
    throw InvalidFileError();
  }

  /// Registers a newly uploaded file with the server and copies the returned
  /// identifiers and the encryption headers onto [file].
  ///
  /// The file key is encrypted with the key of [collectionID] before being
  /// sent. A 426 response clears the queue and throws
  /// [StorageLimitExceededError]; other [DioError]s are rethrown.
  Future _uploadFile(
    File file,
    int collectionID,
    EncryptionResult fileAttributes,
    String fileObjectKey,
    String fileDecryptionHeader,
    String thumbnailObjectKey,
    String thumbnailDecryptionHeader,
    String encryptedMetadata,
    String metadataDecryptionHeader,
  ) async {
    final encryptedFileKeyData = CryptoUtil.encryptSync(
      fileAttributes.key,
      CollectionsService.instance.getCollectionKey(collectionID),
    );
    final encryptedKey = Sodium.bin2base64(encryptedFileKeyData.encryptedData);
    final keyDecryptionNonce = Sodium.bin2base64(encryptedFileKeyData.nonce);
    final request = {
      "collectionID": collectionID,
      "encryptedKey": encryptedKey,
      "keyDecryptionNonce": keyDecryptionNonce,
      "file": {
        "objectKey": fileObjectKey,
        "decryptionHeader": fileDecryptionHeader,
      },
      "thumbnail": {
        "objectKey": thumbnailObjectKey,
        "decryptionHeader": thumbnailDecryptionHeader,
      },
      "metadata": {
        "encryptedData": encryptedMetadata,
        "decryptionHeader": metadataDecryptionHeader,
      }
    };
    try {
      final response = await _dio.post(
        Configuration.instance.getHttpEndpoint() + "/files",
        options: Options(
            headers: {"X-Auth-Token": Configuration.instance.getToken()}),
        data: request,
      );
      final data = response.data;
      file.uploadedFileID = data["id"];
      file.collectionID = collectionID;
      file.updationTime = data["updationTime"];
      file.ownerID = data["ownerID"];
      file.encryptedKey = encryptedKey;
      file.keyDecryptionNonce = keyDecryptionNonce;
      file.fileDecryptionHeader = fileDecryptionHeader;
      file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
      file.metadataDecryptionHeader = metadataDecryptionHeader;
      return file;
    } on DioError catch (e) {
      if (e.response?.statusCode == 426) {
        _onStorageLimitExceeded();
      }
      rethrow;
    }
  }

  /// Sends the new object keys and headers for an already uploaded [file] to
  /// the server and copies the response fields onto [file].
  ///
  /// A 426 response clears the queue and throws [StorageLimitExceededError];
  /// other [DioError]s are rethrown.
  Future _updateFile(
    File file,
    String fileObjectKey,
    String fileDecryptionHeader,
    String thumbnailObjectKey,
    String thumbnailDecryptionHeader,
    String encryptedMetadata,
    String metadataDecryptionHeader,
  ) async {
    final request = {
      "id": file.uploadedFileID,
      "file": {
        "objectKey": fileObjectKey,
        "decryptionHeader": fileDecryptionHeader,
      },
      "thumbnail": {
        "objectKey": thumbnailObjectKey,
        "decryptionHeader": thumbnailDecryptionHeader,
      },
      "metadata": {
        "encryptedData": encryptedMetadata,
        "decryptionHeader": metadataDecryptionHeader,
      }
    };
    try {
      final response = await _dio.post(
        Configuration.instance.getHttpEndpoint() + "/files",
        options: Options(
            headers: {"X-Auth-Token": Configuration.instance.getToken()}),
        data: request,
      );
      final data = response.data;
      file.uploadedFileID = data["id"];
      file.updationTime = data["updationTime"];
      file.fileDecryptionHeader = fileDecryptionHeader;
      file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
      file.metadataDecryptionHeader = metadataDecryptionHeader;
      return file;
    } on DioError catch (e) {
      if (e.response?.statusCode == 426) {
        _onStorageLimitExceeded();
      }
      rethrow;
    }
  }

  /// Returns the next unused upload URL, fetching a new batch first when none
  /// are cached.
  Future _getUploadURL() async {
    if (_uploadURLs.isEmpty) {
      await _fetchUploadURLs();
    }
    return _uploadURLs.removeFirst();
  }

  /// Fetches a batch of upload URLs into [_uploadURLs].
  ///
  /// Concurrent callers share a single in-flight request. A 402 response
  /// clears the queue and throws [NoActiveSubscriptionError]; a 426 response
  /// clears the queue and throws [StorageLimitExceededError].
  Future _fetchUploadURLs() async {
    if (_uploadURLFetchInProgress == null) {
      _uploadURLFetchInProgress = Future(() async {
        try {
          final response = await _dio.get(
            Configuration.instance.getHttpEndpoint() + "/files/upload-urls",
            queryParameters: {
              "count": min(42, 2 * _queue.length), // m4gic number
            },
            options: Options(
                headers: {"X-Auth-Token": Configuration.instance.getToken()}),
          );
          final urls = (response.data["urls"] as List)
              .map((e) => UploadURL.fromMap(e))
              .toList();
          _uploadURLs.addAll(urls);
        } on DioError catch (e) {
          if (e.response != null) {
            if (e.response.statusCode == 402) {
              final error = NoActiveSubscriptionError();
              clearQueue(error);
              throw error;
            } else if (e.response.statusCode == 426) {
              final error = StorageLimitExceededError();
              clearQueue(error);
              throw error;
            }
          }
          rethrow;
        } finally {
          // Reset on every outcome, not only success/DioError, so that a
          // failed fetch is never cached and handed to later callers.
          _uploadURLFetchInProgress = null;
        }
      });
    }
    return _uploadURLFetchInProgress;
  }

  /// Fails all pending uploads with [StorageLimitExceededError] and throws
  /// the same error type to the caller.
  void _onStorageLimitExceeded() {
    clearQueue(StorageLimitExceededError());
    throw StorageLimitExceededError();
  }

  /// PUTs [file] to [uploadURL] and returns the URL's object key.
  ///
  /// [contentLength] overrides the length sent in the Content-Length header;
  /// it defaults to the file's length. If the first attempt fails with a
  /// "Content size exceeds specified contentLength" error, the same URL is
  /// retried once with the length obtained by reading the file. Other
  /// failures are retried with a fresh upload URL until [attempt] reaches
  /// [kMaximumUploadAttempts], after which the error is rethrown.
  Future _putFile(
    UploadURL uploadURL,
    io.File file, {
    int contentLength,
    int attempt = 1,
  }) async {
    final fileSize = contentLength ?? file.lengthSync();
    final startTime = DateTime.now().millisecondsSinceEpoch;
    try {
      await _dio.put(
        uploadURL.url,
        data: file.openRead(),
        options: Options(
          headers: {
            Headers.contentLengthHeader: fileSize,
          },
        ),
      );
      // bytes per millisecond, reported as kilobytes per second
      _logger.info("Upload speed : " +
          (file.lengthSync() /
                  (DateTime.now().millisecondsSinceEpoch - startTime))
              .toString() +
          " kilo bytes per second");
      return uploadURL.objectKey;
    } on DioError catch (e) {
      if (e.message.startsWith(
              "HttpException: Content size exceeds specified contentLength.") &&
          attempt == 1) {
        return _putFile(uploadURL, file,
            contentLength: file.readAsBytesSync().length, attempt: 2);
      } else if (attempt < kMaximumUploadAttempts) {
        final newUploadURL = await _getUploadURL();
        // `attempt + 1`, not `attempt++`: the post-increment would pass the
        // old value and the retry limit would never be reached.
        return _putFile(newUploadURL, file,
            contentLength: file.readAsBytesSync().length,
            attempt: attempt + 1);
      } else {
        _logger.info("Upload failed for file with size $fileSize", e);
        rethrow;
      }
    }
  }

  /// Settles uploads that were deferred because the background process held
  /// the file's lock, then reschedules itself after
  /// [kBlockedUploadsPollFrequency].
  ///
  /// Once the background lock is gone, the queued item completes with the DB
  /// copy of the file if it has an `uploadedFileID`, and otherwise fails with
  /// [SilentlyCancelUploadsError].
  Future _pollBackgroundUploadStatus() async {
    final blockedUploads = _queue.entries
        .where((e) => e.value.status == UploadStatus.in_background)
        .toList();
    for (final upload in blockedUploads) {
      final file = upload.value.file;
      final isStillLocked = await _uploadLocks.isLocked(
          file.localID, ProcessType.background.toString());
      if (!isStillLocked) {
        final completer = _queue.remove(upload.key).completer;
        final dbFile =
            await FilesDB.instance.getFile(upload.value.file.generatedID);
        if (dbFile.uploadedFileID != null) {
          _logger.info("Background upload success detected");
          completer.complete(dbFile);
        } else {
          _logger.info("Background upload failure detected");
          completer.completeError(SilentlyCancelUploadsError());
        }
      }
    }
    Future.delayed(kBlockedUploadsPollFrequency, () async {
      await _pollBackgroundUploadStatus();
    });
  }
}

/// A queued upload: the file, its target collection, the completer that is
/// settled when the upload finishes, and the current [status].
class FileUploadItem {
  final File file;
  final int collectionID;
  final Completer completer;
  UploadStatus status;

  FileUploadItem(
    this.file,
    this.collectionID,
    this.completer, {
    this.status = UploadStatus.not_started,
  });
}

/// Lifecycle state of a [FileUploadItem].
enum UploadStatus {
  not_started,
  in_progress,
  in_background,
  completed,
}

/// The kind of process the uploader runs in; its string form is used as the
/// owner of upload locks.
enum ProcessType {
  background,
  foreground,
}