[mob][photos] Move multlipart uploader in separate service

This commit is contained in:
Neeraj Gupta 2024-04-15 14:40:06 +05:30
parent 30d562ed1a
commit 5e32752ba4
7 changed files with 291 additions and 270 deletions

View file

@ -4,7 +4,7 @@ import 'dart:io';
import 'package:path/path.dart';
import 'package:path_provider/path_provider.dart';
import "package:photos/core/constants.dart";
import "package:photos/utils/multipart_upload_util.dart";
import "package:photos/module/upload/model/multipart.dart";
import 'package:sqflite/sqflite.dart';
class UploadLocksDB {

View file

@ -0,0 +1,43 @@
import "package:photos/module/upload/model/xml.dart";
class PartETag extends XmlParsableObject {
final int partNumber;
final String eTag;
PartETag(this.partNumber, this.eTag);
@override
String get elementName => "Part";
@override
Map<String, dynamic> toMap() {
return {
"PartNumber": partNumber,
"ETag": eTag,
};
}
}
class MultipartUploadURLs {
final String objectKey;
final List<String> partsURLs;
final String completeURL;
final List<bool>? partUploadStatus;
final Map<int, String>? partETags;
MultipartUploadURLs({
required this.objectKey,
required this.partsURLs,
required this.completeURL,
this.partUploadStatus,
this.partETags,
});
factory MultipartUploadURLs.fromMap(Map<String, dynamic> map) {
return MultipartUploadURLs(
objectKey: map["urls"]["objectKey"],
partsURLs: (map["urls"]["partURLs"] as List).cast<String>(),
completeURL: map["urls"]["completeURL"],
);
}
}

View file

@ -0,0 +1,41 @@
// ignore_for_file: implementation_imports
import "package:xml/xml.dart";
// used for classes that can be converted to xml
abstract class XmlParsableObject {
Map<String, dynamic> toMap();
String get elementName;
}
// for converting the response to xml
String convertJs2Xml(Map<String, dynamic> json) {
final builder = XmlBuilder();
buildXml(builder, json);
return builder.buildDocument().toXmlString(
pretty: true,
indent: ' ',
);
}
// for building the xml node tree recursively
void buildXml(XmlBuilder builder, dynamic node) {
if (node is Map<String, dynamic>) {
node.forEach((key, value) {
builder.element(key, nest: () => buildXml(builder, value));
});
} else if (node is List<dynamic>) {
for (var item in node) {
buildXml(builder, item);
}
} else if (node is XmlParsableObject) {
builder.element(
node.elementName,
nest: () {
buildXml(builder, node.toMap());
},
);
} else {
builder.text(node.toString());
}
}

View file

@ -0,0 +1,189 @@
import "dart:io";
import "dart:typed_data";
import "package:dio/dio.dart";
import "package:logging/logging.dart";
import "package:photos/core/constants.dart";
import "package:photos/db/upload_locks_db.dart";
import "package:photos/module/upload/model/multipart.dart";
import "package:photos/module/upload/model/xml.dart";
import "package:photos/services/feature_flag_service.dart";
import "package:photos/utils/crypto_util.dart";
class MultiPartUploader {
final Dio _enteDio;
final Dio _s3Dio;
final UploadLocksDB _db;
final FeatureFlagService _featureFlagService;
late final Logger _logger = Logger("MultiPartUploader");
MultiPartUploader(
this._enteDio,
this._s3Dio,
this._db,
this._featureFlagService,
) {}
Future<int> calculatePartCount(int fileSize) async {
final partCount = (fileSize / multipartPartSizeForUpload).ceil();
return partCount;
}
Future<MultipartUploadURLs> getMultipartUploadURLs(int count) async {
try {
assert(
_featureFlagService.isInternalUserOrDebugBuild(),
"Multipart upload should not be enabled for external users.",
);
final response = await _enteDio.get(
"/files/multipart-upload-urls",
queryParameters: {
"count": count,
},
);
return MultipartUploadURLs.fromMap(response.data);
} on Exception catch (e) {
_logger.severe('failed to get multipart url', e);
rethrow;
}
}
Future<void> createTableEntry(
String localId,
String fileHash,
MultipartUploadURLs urls,
String encryptedFilePath,
int fileSize,
Uint8List fileKey,
) async {
await _db.createTrackUploadsEntry(
localId,
fileHash,
urls,
encryptedFilePath,
fileSize,
CryptoUtil.bin2base64(fileKey),
);
}
Future<String> putExistingMultipartFile(
File encryptedFile,
String localId,
String fileHash,
) async {
final (urls, status) = await _db.getCachedLinks(localId, fileHash);
Map<int, String> etags = urls.partETags ?? {};
if (status == UploadLocksDB.trackStatus.pending) {
// upload individual parts and get their etags
etags = await uploadParts(urls, encryptedFile);
}
if (status != UploadLocksDB.trackStatus.completed) {
// complete the multipart upload
await completeMultipartUpload(urls.objectKey, etags, urls.completeURL);
}
return urls.objectKey;
}
Future<String> putMultipartFile(
MultipartUploadURLs urls,
File encryptedFile,
) async {
// upload individual parts and get their etags
final etags = await uploadParts(urls, encryptedFile);
// complete the multipart upload
await completeMultipartUpload(urls.objectKey, etags, urls.completeURL);
return urls.objectKey;
}
Future<Map<int, String>> uploadParts(
MultipartUploadURLs url,
File encryptedFile,
) async {
final partsURLs = url.partsURLs;
final partUploadStatus = url.partUploadStatus;
final partsLength = partsURLs.length;
final etags = url.partETags ?? <int, String>{};
for (int i = 0; i < partsLength; i++) {
if (i < (partUploadStatus?.length ?? 0) &&
(partUploadStatus?[i] ?? false)) {
continue;
}
final partURL = partsURLs[i];
final isLastPart = i == partsLength - 1;
final fileSize = isLastPart
? encryptedFile.lengthSync() % multipartPartSizeForUpload
: multipartPartSizeForUpload;
final response = await _s3Dio.put(
partURL,
data: encryptedFile.openRead(
i * multipartPartSizeForUpload,
isLastPart ? null : (i + 1) * multipartPartSizeForUpload,
),
options: Options(
headers: {
Headers.contentLengthHeader: fileSize,
},
),
);
final eTag = response.headers.value("etag");
if (eTag?.isEmpty ?? true) {
throw Exception('ETAG_MISSING');
}
etags[i] = eTag!;
await _db.updatePartStatus(url.objectKey, i, eTag);
}
await _db.updateTrackUploadStatus(
url.objectKey,
UploadLocksDB.trackStatus.uploaded,
);
return etags;
}
Future<void> completeMultipartUpload(
String objectKey,
Map<int, String> partEtags,
String completeURL,
) async {
final body = convertJs2Xml({
'CompleteMultipartUpload': partEtags.entries
.map(
(e) => PartETag(
e.key + 1,
e.value,
),
)
.toList(),
}).replaceAll('"', '').replaceAll('&quot;', '');
try {
await _s3Dio.post(
completeURL,
data: body,
options: Options(
contentType: "text/xml",
),
);
await _db.updateTrackUploadStatus(
objectKey,
UploadLocksDB.trackStatus.completed,
);
} catch (e) {
Logger("MultipartUpload").severe(e);
rethrow;
}
}
}

View file

@ -28,6 +28,7 @@ import 'package:photos/models/file/file_type.dart';
import "package:photos/models/metadata/file_magic.dart";
import 'package:photos/models/upload_url.dart';
import "package:photos/models/user_details.dart";
import "package:photos/module/upload/service/multipart.dart";
import 'package:photos/services/collections_service.dart';
import "package:photos/services/feature_flag_service.dart";
import "package:photos/services/file_magic_service.dart";
@ -38,7 +39,6 @@ import 'package:photos/utils/crypto_util.dart';
import 'package:photos/utils/file_download_util.dart';
import 'package:photos/utils/file_uploader_util.dart';
import "package:photos/utils/file_util.dart";
import "package:photos/utils/multipart_upload_util.dart";
import 'package:shared_preferences/shared_preferences.dart';
import 'package:tuple/tuple.dart';
import "package:uuid/uuid.dart";
@ -80,6 +80,7 @@ class FileUploader {
// cases, we don't want to clear the stale upload files. See #removeStaleFiles
// as it can result in clearing files which are still being force uploaded.
bool _hasInitiatedForceUpload = false;
late MultiPartUploader _multiPartUploader;
FileUploader._privateConstructor() {
Bus.instance.on<SubscriptionPurchasedEvent>().listen((event) {
@ -115,6 +116,12 @@ class FileUploader {
// ignore: unawaited_futures
_pollBackgroundUploadStatus();
}
_multiPartUploader = MultiPartUploader(
_enteDio,
_dio,
UploadLocksDB.instance,
FeatureFlagService.instance,
);
Bus.instance.on<LocalPhotosUpdatedEvent>().listen((event) {
if (event.type == EventType.deletedFromDevice ||
event.type == EventType.deletedFromEverywhere) {
@ -497,7 +504,7 @@ class FileUploader {
// Calculate the number of parts for the file. Multiple part upload
// is only enabled for internal users and debug builds till it's battle tested.
final count = FeatureFlagService.instance.isInternalUserOrDebugBuild()
? await calculatePartCount(
? await _multiPartUploader.calculatePartCount(
await encryptedFile.length(),
)
: 1;
@ -513,15 +520,15 @@ class FileUploader {
lockKey,
mediaUploadData.hashData!.fileHash!,
)) {
fileObjectKey = await putExistingMultipartFile(
fileObjectKey = await _multiPartUploader.putExistingMultipartFile(
encryptedFile,
lockKey,
mediaUploadData.hashData!.fileHash!,
);
} else {
final fileUploadURLs = await getMultipartUploadURLs(count);
await createTableEntry(
final fileUploadURLs =
await _multiPartUploader.getMultipartUploadURLs(count);
await _multiPartUploader.createTableEntry(
lockKey,
mediaUploadData.hashData!.fileHash!,
fileUploadURLs,
@ -529,7 +536,10 @@ class FileUploader {
await encryptedFile.length(),
fileAttributes.key!,
);
fileObjectKey = await putMultipartFile(fileUploadURLs, encryptedFile);
fileObjectKey = await _multiPartUploader.putMultipartFile(
fileUploadURLs,
encryptedFile,
);
}
}

View file

@ -1,222 +0,0 @@
// ignore_for_file: implementation_imports
import "dart:io";
import "dart:typed_data";
import "package:dio/dio.dart";
import "package:logging/logging.dart";
import "package:photos/core/constants.dart";
import "package:photos/core/network/network.dart";
import "package:photos/db/upload_locks_db.dart";
import "package:photos/services/feature_flag_service.dart";
import "package:photos/utils/crypto_util.dart";
import "package:photos/utils/xml_parser_util.dart";
final _enteDio = NetworkClient.instance.enteDio;
final _dio = NetworkClient.instance.getDio();
final _uploadLocksDb = UploadLocksDB.instance;
class PartETag extends XmlParsableObject {
final int partNumber;
final String eTag;
PartETag(this.partNumber, this.eTag);
@override
String get elementName => "Part";
@override
Map<String, dynamic> toMap() {
return {
"PartNumber": partNumber,
"ETag": eTag,
};
}
}
class MultipartUploadURLs {
final String objectKey;
final List<String> partsURLs;
final String completeURL;
final List<bool>? partUploadStatus;
final Map<int, String>? partETags;
MultipartUploadURLs({
required this.objectKey,
required this.partsURLs,
required this.completeURL,
this.partUploadStatus,
this.partETags,
});
factory MultipartUploadURLs.fromMap(Map<String, dynamic> map) {
return MultipartUploadURLs(
objectKey: map["urls"]["objectKey"],
partsURLs: (map["urls"]["partURLs"] as List).cast<String>(),
completeURL: map["urls"]["completeURL"],
);
}
}
Future<int> calculatePartCount(int fileSize) async {
final partCount = (fileSize / multipartPartSizeForUpload).ceil();
return partCount;
}
Future<MultipartUploadURLs> getMultipartUploadURLs(int count) async {
try {
assert(
FeatureFlagService.instance.isInternalUserOrDebugBuild(),
"Multipart upload should not be enabled for external users.",
);
final response = await _enteDio.get(
"/files/multipart-upload-urls",
queryParameters: {
"count": count,
},
);
return MultipartUploadURLs.fromMap(response.data);
} on Exception catch (e) {
Logger("MultipartUploadURL").severe(e);
rethrow;
}
}
Future<void> createTableEntry(
String localId,
String fileHash,
MultipartUploadURLs urls,
String encryptedFilePath,
int fileSize,
Uint8List fileKey,
) async {
await _uploadLocksDb.createTrackUploadsEntry(
localId,
fileHash,
urls,
encryptedFilePath,
fileSize,
CryptoUtil.bin2base64(fileKey),
);
}
Future<String> putExistingMultipartFile(
File encryptedFile,
String localId,
String fileHash,
) async {
final (urls, status) = await _uploadLocksDb.getCachedLinks(localId, fileHash);
Map<int, String> etags = urls.partETags ?? {};
if (status == UploadLocksDB.trackStatus.pending) {
// upload individual parts and get their etags
etags = await uploadParts(urls, encryptedFile);
}
if (status != UploadLocksDB.trackStatus.completed) {
// complete the multipart upload
await completeMultipartUpload(urls.objectKey, etags, urls.completeURL);
}
return urls.objectKey;
}
Future<String> putMultipartFile(
MultipartUploadURLs urls,
File encryptedFile,
) async {
// upload individual parts and get their etags
final etags = await uploadParts(urls, encryptedFile);
// complete the multipart upload
await completeMultipartUpload(urls.objectKey, etags, urls.completeURL);
return urls.objectKey;
}
Future<Map<int, String>> uploadParts(
MultipartUploadURLs url,
File encryptedFile,
) async {
final partsURLs = url.partsURLs;
final partUploadStatus = url.partUploadStatus;
final partsLength = partsURLs.length;
final etags = url.partETags ?? <int, String>{};
for (int i = 0; i < partsLength; i++) {
if (i < (partUploadStatus?.length ?? 0) &&
(partUploadStatus?[i] ?? false)) {
continue;
}
final partURL = partsURLs[i];
final isLastPart = i == partsLength - 1;
final fileSize = isLastPart
? encryptedFile.lengthSync() % multipartPartSizeForUpload
: multipartPartSizeForUpload;
final response = await _dio.put(
partURL,
data: encryptedFile.openRead(
i * multipartPartSizeForUpload,
isLastPart ? null : (i + 1) * multipartPartSizeForUpload,
),
options: Options(
headers: {
Headers.contentLengthHeader: fileSize,
},
),
);
final eTag = response.headers.value("etag");
if (eTag?.isEmpty ?? true) {
throw Exception('ETAG_MISSING');
}
etags[i] = eTag!;
await _uploadLocksDb.updatePartStatus(url.objectKey, i, eTag);
}
await _uploadLocksDb.updateTrackUploadStatus(
url.objectKey,
UploadLocksDB.trackStatus.uploaded,
);
return etags;
}
Future<void> completeMultipartUpload(
String objectKey,
Map<int, String> partEtags,
String completeURL,
) async {
final body = convertJs2Xml({
'CompleteMultipartUpload': partEtags.entries
.map(
(e) => PartETag(
e.key + 1,
e.value,
),
)
.toList(),
}).replaceAll('"', '').replaceAll('&quot;', '');
try {
await _dio.post(
completeURL,
data: body,
options: Options(
contentType: "text/xml",
),
);
await _uploadLocksDb.updateTrackUploadStatus(
objectKey,
UploadLocksDB.trackStatus.completed,
);
} catch (e) {
Logger("MultipartUpload").severe(e);
rethrow;
}
}

View file

@ -1,41 +1 @@
// ignore_for_file: implementation_imports
import "package:xml/xml.dart";
// used for classes that can be converted to xml
abstract class XmlParsableObject {
Map<String, dynamic> toMap();
String get elementName;
}
// for converting the response to xml
String convertJs2Xml(Map<String, dynamic> json) {
final builder = XmlBuilder();
buildXml(builder, json);
return builder.buildDocument().toXmlString(
pretty: true,
indent: ' ',
);
}
// for building the xml node tree recursively
void buildXml(XmlBuilder builder, dynamic node) {
if (node is Map<String, dynamic>) {
node.forEach((key, value) {
builder.element(key, nest: () => buildXml(builder, value));
});
} else if (node is List<dynamic>) {
for (var item in node) {
buildXml(builder, item);
}
} else if (node is XmlParsableObject) {
builder.element(
node.elementName,
nest: () {
buildXml(builder, node.toMap());
},
);
} else {
builder.text(node.toString());
}
}