fix: add support for inital resumable

This commit is contained in:
Prateek Sunal 2024-04-10 19:36:57 +05:30
parent de06c8f38b
commit a4f89c0337
3 changed files with 175 additions and 25 deletions

View file

@ -3,16 +3,41 @@ import 'dart:io';
import 'package:path/path.dart';
import 'package:path_provider/path_provider.dart';
import "package:photos/utils/multipart_upload_util.dart";
import 'package:sqflite/sqflite.dart';
class UploadLocksDB {
static const _databaseName = "ente.upload_locks.db";
static const _databaseVersion = 1;
static const _table = "upload_locks";
static const _columnID = "id";
static const _columnOwner = "owner";
static const _columnTime = "time";
static const _table = (
table: "upload_locks",
columnID: "id",
columnOwner: "owner",
columnTime: "time",
);
static const _trackUploadTable = (
table: "track_uploads",
columnID: "id",
columnLocalID: "local_id",
columnFileHash: "file_hash",
columnEncryptedFilePath: "encrypted_file_path",
columnEncryptedFileSize: "encrypted_file_size",
columnFileKey: "file_key",
columnObjectKey: "object_key",
columnCompleteUrl: "complete_url",
columnCompletionStatus: "completion_status",
columnPartSize: "part_size",
);
static const _partsTable = (
table: "upload_parts",
columnObjectKey: "object_key",
columnPartNumber: "part_number",
columnPartUrl: "part_url",
columnPartStatus: "part_status",
);
UploadLocksDB._privateConstructor();
static final UploadLocksDB instance = UploadLocksDB._privateConstructor();
@ -37,10 +62,36 @@ class UploadLocksDB {
Future _onCreate(Database db, int version) async {
await db.execute(
'''
CREATE TABLE $_table (
$_columnID TEXT PRIMARY KEY NOT NULL,
$_columnOwner TEXT NOT NULL,
$_columnTime TEXT NOT NULL
CREATE TABLE ${_table.table} (
${_table.columnID} TEXT PRIMARY KEY NOT NULL,
${_table.columnOwner} TEXT NOT NULL,
${_table.columnTime} TEXT NOT NULL
)
''',
);
await db.execute(
'''
CREATE TABLE ${_trackUploadTable.table} (
${_trackUploadTable.columnID} TEXT PRIMARY KEY NOT NULL,
${_trackUploadTable.columnLocalID} TEXT NOT NULL,
${_trackUploadTable.columnFileHash} TEXT NOT NULL UNIQUE,
${_trackUploadTable.columnEncryptedFilePath} TEXT NOT NULL,
${_trackUploadTable.columnEncryptedFileSize} INTEGER NOT NULL,
${_trackUploadTable.columnFileKey} TEXT NOT NULL,
${_trackUploadTable.columnObjectKey} TEXT NOT NULL,
${_trackUploadTable.columnCompleteUrl} TEXT NOT NULL,
${_trackUploadTable.columnCompletionStatus} TEXT NOT NULL,
${_trackUploadTable.columnPartSize} INTEGER NOT NULL
)
''',
);
await db.execute(
'''
CREATE TABLE ${_partsTable.table} (
${_partsTable.columnObjectKey} TEXT PRIMARY KEY NOT NULL REFERENCES ${_trackUploadTable.table}(${_trackUploadTable.columnObjectKey}) ON DELETE CASCADE,
${_partsTable.columnPartNumber} INTEGER NOT NULL,
${_partsTable.columnPartUrl} TEXT NOT NULL,
${_partsTable.columnPartStatus} TEXT NOT NULL
)
''',
);
@ -48,23 +99,32 @@ class UploadLocksDB {
Future<void> clearTable() async {
final db = await instance.database;
await db.delete(_table);
await db.delete(_table.table);
}
Future<void> clearTrackTable() async {
final db = await instance.database;
await db.delete(_trackUploadTable.table);
}
Future<void> acquireLock(String id, String owner, int time) async {
final db = await instance.database;
final row = <String, dynamic>{};
row[_columnID] = id;
row[_columnOwner] = owner;
row[_columnTime] = time;
await db.insert(_table, row, conflictAlgorithm: ConflictAlgorithm.fail);
row[_table.columnID] = id;
row[_table.columnOwner] = owner;
row[_table.columnTime] = time;
await db.insert(
_table.table,
row,
conflictAlgorithm: ConflictAlgorithm.fail,
);
}
Future<bool> isLocked(String id, String owner) async {
final db = await instance.database;
final rows = await db.query(
_table,
where: '$_columnID = ? AND $_columnOwner = ?',
_table.table,
where: '${_table.columnID} = ? AND ${_table.columnOwner} = ?',
whereArgs: [id, owner],
);
return rows.length == 1;
@ -73,8 +133,8 @@ class UploadLocksDB {
Future<int> releaseLock(String id, String owner) async {
final db = await instance.database;
return db.delete(
_table,
where: '$_columnID = ? AND $_columnOwner = ?',
_table.table,
where: '${_table.columnID} = ? AND ${_table.columnOwner} = ?',
whereArgs: [id, owner],
);
}
@ -82,8 +142,8 @@ class UploadLocksDB {
Future<int> releaseLocksAcquiredByOwnerBefore(String owner, int time) async {
final db = await instance.database;
return db.delete(
_table,
where: '$_columnOwner = ? AND $_columnTime < ?',
_table.table,
where: '${_table.columnOwner} = ? AND ${_table.columnTime} < ?',
whereArgs: [owner, time],
);
}
@ -91,9 +151,63 @@ class UploadLocksDB {
Future<int> releaseAllLocksAcquiredBefore(int time) async {
final db = await instance.database;
return db.delete(
_table,
where: '$_columnTime < ?',
_table.table,
where: '${_table.columnTime} < ?',
whereArgs: [time],
);
}
// For multipart download tracking
Future<bool> doesExists(String localId, String hash) async {
final db = await instance.database;
final rows = await db.query(_trackUploadTable.table);
return rows.isNotEmpty;
}
Future<MultipartUploadURLs> getCachedLinks(
String localId,
String fileHash,
) async {
final db = await instance.database;
final rows = await db.query(
_trackUploadTable.table,
where:
'${_trackUploadTable.columnLocalID} = ? AND ${_trackUploadTable.columnFileHash} = ?',
whereArgs: [localId, fileHash],
);
if (rows.isEmpty) {
throw Exception("No cached links found for $localId and $fileHash");
}
final row = rows.first;
final objectKey = row[_trackUploadTable.columnObjectKey] as String;
final partsStatus = await db.query(
_partsTable.table,
where: '${_partsTable.columnObjectKey} = ?',
whereArgs: [objectKey],
);
final List<bool> partUploadStatus = [];
final List<String> partsURLs = List.generate(
partsStatus.length,
(index) => "",
);
for (final part in partsStatus) {
final partNumber = part[_partsTable.columnPartNumber] as int;
final partUrl = part[_partsTable.columnPartUrl] as String;
final partStatus = part[_partsTable.columnPartStatus] as String;
if (partStatus == "uploaded") {
partsURLs[partNumber] = partUrl;
partUploadStatus.add(partStatus == "uploaded");
}
}
final urls = MultipartUploadURLs(
objectKey: objectKey,
completeURL: row[_trackUploadTable.columnCompleteUrl] as String,
partsURLs: partsURLs,
partUploadStatus: partUploadStatus,
);
return urls;
}
}

View file

@ -503,8 +503,20 @@ class FileUploader {
final fileUploadURL = await _getUploadURL();
fileObjectKey = await _putFile(fileUploadURL, encryptedFile);
} else {
final fileUploadURLs = await getMultipartUploadURLs(count);
fileObjectKey = await putMultipartFile(fileUploadURLs, encryptedFile);
if (mediaUploadData.hashData?.fileHash != null &&
await _uploadLocks.doesExists(
lockKey,
mediaUploadData.hashData!.fileHash!,
)) {
fileObjectKey = await putExistingMultipartFile(
encryptedFile,
lockKey,
mediaUploadData.hashData!.fileHash!,
);
} else {
final fileUploadURLs = await getMultipartUploadURLs(count);
fileObjectKey = await putMultipartFile(fileUploadURLs, encryptedFile);
}
}
final metadata = await file.getMetadataForUpload(mediaUploadData);

View file

@ -6,6 +6,7 @@ import "package:dio/dio.dart";
import "package:logging/logging.dart";
import "package:photos/core/constants.dart";
import "package:photos/core/network/network.dart";
import "package:photos/db/upload_locks_db.dart";
import "package:photos/utils/xml_parser_util.dart";
final _enteDio = NetworkClient.instance.enteDio;
@ -33,11 +34,13 @@ class MultipartUploadURLs {
final String objectKey;
final List<String> partsURLs;
final String completeURL;
final List<bool>? partUploadStatus;
MultipartUploadURLs({
required this.objectKey,
required this.partsURLs,
required this.completeURL,
this.partUploadStatus,
});
factory MultipartUploadURLs.fromMap(Map<String, dynamic> map) {
@ -70,12 +73,28 @@ Future<MultipartUploadURLs> getMultipartUploadURLs(int count) async {
}
}
Future<String> putExistingMultipartFile(
File encryptedFile,
String localId,
String fileHash,
) async {
final urls = await UploadLocksDB.instance.getCachedLinks(localId, fileHash);
// upload individual parts and get their etags
final etags = await uploadParts(urls, encryptedFile);
// complete the multipart upload
await completeMultipartUpload(etags, urls.completeURL);
return urls.objectKey;
}
Future<String> putMultipartFile(
MultipartUploadURLs urls,
File encryptedFile,
) async {
// upload individual parts and get their etags
final etags = await uploadParts(urls.partsURLs, encryptedFile);
final etags = await uploadParts(urls, encryptedFile);
// complete the multipart upload
await completeMultipartUpload(etags, urls.completeURL);
@ -84,13 +103,18 @@ Future<String> putMultipartFile(
}
Future<Map<int, String>> uploadParts(
List<String> partsURLs,
MultipartUploadURLs url,
File encryptedFile,
) async {
final partsURLs = url.partsURLs;
final partUploadStatus = url.partUploadStatus;
final partsLength = partsURLs.length;
final etags = <int, String>{};
for (int i = 0; i < partsLength; i++) {
if (partUploadStatus?[i] ?? false) {
continue;
}
final partURL = partsURLs[i];
final isLastPart = i == partsLength - 1;
final fileSize = isLastPart