Sync: Upload local files #225

Signed-off-by: Michael Mayer <michael@liquidbytes.net>
This commit is contained in:
Michael Mayer 2020-04-07 12:51:01 +02:00
parent b020b4e415
commit f1b3b4b6bc
10 changed files with 421 additions and 264 deletions

View file

@ -0,0 +1,30 @@
package query
import (
"github.com/photoprism/photoprism/internal/entity"
"github.com/photoprism/photoprism/pkg/fs"
)
// AccountUploads returns the files that should be uploaded to a remote account.
//
// A file qualifies when it is not flagged as missing and has no files_sync
// entry (with a file id greater than zero) for the given account. RAW files
// are only included when the account has SyncRaw enabled. Results are ordered
// by file name; a limit greater than zero restricts the number of results.
func (q *Query) AccountUploads(a entity.Account, limit int) (results []entity.File, err error) {
	stmt := q.db.Where("files.file_missing = 0")
	stmt = stmt.Where("files.id NOT IN (SELECT file_id FROM files_sync WHERE file_id > 0 AND account_id = ?)", a.ID)

	// Leave out RAW files unless the account explicitly syncs them.
	if !a.SyncRaw {
		stmt = stmt.Where("files.file_type <> ? OR files.file_type IS NULL", fs.TypeRaw)
	}

	stmt = stmt.Order("files.file_name ASC")

	if limit > 0 {
		stmt = stmt.Limit(limit).Offset(0)
	}

	err = stmt.Find(&results).Error

	return results, err
}

View file

@ -0,0 +1,30 @@
package query
import (
"testing"
"github.com/photoprism/photoprism/internal/entity"
"github.com/stretchr/testify/assert"
"github.com/photoprism/photoprism/internal/config"
)
func TestQuery_AccountUploads(t *testing.T) {
conf := config.TestConfig()
q := New(conf.Db())
a := entity.Account{ID: 1, SyncRaw: false}
t.Run("find uploads", func(t *testing.T) {
results, err := q.AccountUploads(a, 10)
if err != nil {
t.Fatal(err)
}
// t.Logf("uploads: %+v", results)
assert.GreaterOrEqual(t, len(results), 1)
})
}

View file

@ -1,13 +1,10 @@
package query package query
import ( import (
"errors"
"os"
"github.com/photoprism/photoprism/internal/entity" "github.com/photoprism/photoprism/internal/entity"
) )
// FileSyncs returns up to 100 file syncs for a given account id and status. // FileSyncs returns a list of FileSync entities for a given account and status.
func (q *Query) FileSyncs(accountId uint, status string, limit int) (result []entity.FileSync, err error) { func (q *Query) FileSyncs(accountId uint, status string, limit int) (result []entity.FileSync, err error) {
s := q.db.Where(&entity.FileSync{}) s := q.db.Where(&entity.FileSync{})
@ -33,21 +30,3 @@ func (q *Query) FileSyncs(accountId uint, status string, limit int) (result []en
return result, nil return result, nil
} }
// SetDownloadFileID stores the local file id on file syncs of remote downloads.
//
// An empty filename is rejected with an error. The filename is prefixed with
// the OS path separator when it does not start with one, and is then matched
// against remote_name. Only entries with status entity.FileSyncDownloaded and
// file_id = 0 are updated.
func (q *Query) SetDownloadFileID(filename string, fileId uint) error {
	if filename == "" {
		return errors.New("sync: can't update, filename empty")
	}

	// TODO: Might break on Windows
	sep := string(os.PathSeparator)

	if filename[:1] != sep {
		filename = sep + filename
	}

	return q.db.Model(entity.FileSync{}).
		Where("remote_name = ? AND status = ? AND file_id = 0", filename, entity.FileSyncDownloaded).
		Update("file_id", fileId).
		Error
}

View file

@ -0,0 +1,26 @@
package query
import (
"errors"
"os"
"github.com/photoprism/photoprism/internal/entity"
)
// SetDownloadFileID stores the local file id on file syncs of remote downloads.
//
// An empty filename is rejected with an error. The filename is prefixed with
// the OS path separator when it does not start with one, and is then matched
// against remote_name. Only entries with status entity.FileSyncDownloaded and
// file_id = 0 are updated.
func (q *Query) SetDownloadFileID(filename string, fileId uint) error {
	if filename == "" {
		return errors.New("sync: can't update, filename empty")
	}

	// TODO: Might break on Windows
	sep := string(os.PathSeparator)

	if filename[:1] != sep {
		filename = sep + filename
	}

	return q.db.Model(entity.FileSync{}).
		Where("remote_name = ? AND status = ? AND file_id = 0", filename, entity.FileSyncDownloaded).
		Update("file_id", fileId).
		Error
}

View file

@ -166,7 +166,7 @@ func (c Client) DownloadDir(from, to string, recursive, force bool) (errs []erro
// CreateDir recursively creates directories if they don't exist. // CreateDir recursively creates directories if they don't exist.
func (c Client) CreateDir(dir string) error { func (c Client) CreateDir(dir string) error {
if dir == "" || dir == "/" { if dir == "" || dir == "/" || dir == "." {
return nil return nil
} }

View file

@ -2,7 +2,7 @@ package workers
import ( import (
"fmt" "fmt"
"os" "path"
"path/filepath" "path/filepath"
"github.com/photoprism/photoprism/internal/config" "github.com/photoprism/photoprism/internal/config"
@ -77,14 +77,14 @@ func (s *Share) Start() (err error) {
dir := filepath.Dir(file.RemoteName) dir := filepath.Dir(file.RemoteName)
if _, ok := existingDirs[dir]; ok == false && dir != "/" && dir != "." { if _, ok := existingDirs[dir]; !ok {
if err := client.CreateDir(dir); err != nil { if err := client.CreateDir(dir); err != nil {
log.Errorf("share: could not create directory %s", dir) log.Errorf("share: could not create directory %s", dir)
continue continue
} }
} }
srcFileName := s.conf.OriginalsPath() + string(os.PathSeparator) + file.File.FileName srcFileName := path.Join(s.conf.OriginalsPath(), file.File.FileName)
if a.ShareSize != "" { if a.ShareSize != "" {
thumbType, ok := thumb.Types[a.ShareSize] thumbType, ok := thumb.Types[a.ShareSize]

View file

@ -9,12 +9,8 @@ import (
"github.com/photoprism/photoprism/internal/event" "github.com/photoprism/photoprism/internal/event"
"github.com/photoprism/photoprism/internal/form" "github.com/photoprism/photoprism/internal/form"
"github.com/photoprism/photoprism/internal/mutex" "github.com/photoprism/photoprism/internal/mutex"
"github.com/photoprism/photoprism/internal/photoprism"
"github.com/photoprism/photoprism/internal/query" "github.com/photoprism/photoprism/internal/query"
"github.com/photoprism/photoprism/internal/remote" "github.com/photoprism/photoprism/internal/remote"
"github.com/photoprism/photoprism/internal/remote/webdav"
"github.com/photoprism/photoprism/internal/service"
"github.com/photoprism/photoprism/pkg/fs"
) )
// Sync represents a sync worker. // Sync represents a sync worker.
@ -23,8 +19,6 @@ type Sync struct {
q *query.Query q *query.Query
} }
// Downloads maps a string key to the file syncs that belong together.
type Downloads map[string][]entity.FileSync
// NewSync returns a new service sync worker. // NewSync returns a new service sync worker.
func NewSync(conf *config.Config) *Sync { func NewSync(conf *config.Config) *Sync {
return &Sync{ return &Sync{
@ -33,11 +27,6 @@ func NewSync(conf *config.Config) *Sync {
} }
} }
// DownloadPath returns the temporary download path, which is the "sync"
// folder below the configured temp path.
func (s *Sync) DownloadPath() string {
	tempPath := s.conf.TempPath()

	return tempPath + "/sync"
}
// Start starts the sync worker. // Start starts the sync worker.
func (s *Sync) Start() (err error) { func (s *Sync) Start() (err error) {
if err := mutex.Sync.Start(); err != nil { if err := mutex.Sync.Start(); err != nil {
@ -68,7 +57,7 @@ func (s *Sync) Start() (err error) {
switch a.SyncStatus { switch a.SyncStatus {
case entity.AccountSyncStatusRefresh: case entity.AccountSyncStatusRefresh:
if complete, err := s.getRemoteFiles(a); err != nil { if complete, err := s.refresh(a); err != nil {
a.AccErrors++ a.AccErrors++
a.AccError = err.Error() a.AccError = err.Error()
} else if complete { } else if complete {
@ -130,229 +119,3 @@ func (s *Sync) Start() (err error) {
return err return err
} }
// getRemoteFiles updates the local list of remote files so that they can be
// downloaded later.
//
// Only WebDAV accounts are handled; for other account types nothing is done
// and (false, nil) is returned. The sync path and all of its sub directories
// are listed and a file sync entry is created for every remote file that does
// not have one yet. Images and sidecar files are marked as new, RAW files only
// when the account has SyncRaw enabled; everything else is ignored.
//
// Returns (true, nil) when all directories were processed, (false, nil) when
// the worker was canceled and (false, err) when listing directories or files
// failed. Errors while saving a single entry are logged and do not abort the
// refresh.
func (s *Sync) getRemoteFiles(a entity.Account) (complete bool, err error) {
	if a.AccType != remote.ServiceWebDAV {
		return false, nil
	}

	db := s.conf.Db()
	client := webdav.New(a.AccURL, a.AccUser, a.AccPass)

	subDirs, err := client.Directories(a.SyncPath, true)

	if err != nil {
		log.Error(err)
		return false, err
	}

	// The sync path itself has to be scanned as well.
	dirs := append(subDirs.Abs(), a.SyncPath)

	for _, dir := range dirs {
		if mutex.Sync.Canceled() {
			return false, nil
		}

		files, err := client.Files(dir)

		if err != nil {
			log.Error(err)
			return false, err
		}

		for _, file := range files {
			if mutex.Sync.Canceled() {
				return false, nil
			}

			f := entity.NewFileSync(a.ID, file.Abs)
			f.Status = entity.FileSyncIgnore
			f.RemoteDate = file.Date
			f.RemoteSize = file.Size

			// Select supported types for download
			mediaType := fs.GetMediaType(file.Name)

			switch mediaType {
			case fs.MediaImage, fs.MediaSidecar:
				f.Status = entity.FileSyncNew
			case fs.MediaRaw:
				if a.SyncRaw {
					f.Status = entity.FileSyncNew
				}
			}

			// NOTE(review): presumably loads an already existing entry into f,
			// which is why the status is checked again below - confirm.
			f.FirstOrCreate(db)

			// RAW files that were ignored are picked up once SyncRaw is enabled.
			if f.Status == entity.FileSyncIgnore && mediaType == fs.MediaRaw && a.SyncRaw {
				f.Status = entity.FileSyncNew

				if err := db.Save(&f).Error; err != nil {
					log.Errorf("sync: %s", err.Error())
				}
			}

			// Files with a different remote date are downloaded again.
			if f.Status == entity.FileSyncDownloaded && !f.RemoteDate.Equal(file.Date) {
				f.Status = entity.FileSyncNew
				f.RemoteDate = file.Date
				f.RemoteSize = file.Size

				if err := db.Save(&f).Error; err != nil {
					log.Errorf("sync: %s", err.Error())
				}
			}
		}
	}

	return true, nil
}
// relatedDownloads fetches up to 1000 new file syncs of the account from the
// database and groups them by the result of fs.AbsBase for their remote name.
// On a query error the empty result is returned together with the error.
func (s *Sync) relatedDownloads(a entity.Account) (result Downloads, err error) {
	const maxResults = 1000

	result = make(Downloads)

	// Get remote files from database
	pending, err := s.q.FileSyncs(a.ID, entity.FileSyncNew, maxResults)

	if err != nil {
		return result, err
	}

	// Group results by directory and base name
	for idx := range pending {
		key := fs.AbsBase(pending[idx].RemoteName)
		result[key] = append(result[key], pending[idx])

		// Skip last 50 to make sure we see all related files
		if idx > maxResults-50 {
			break
		}
	}

	return result, nil
}
// download fetches one batch of new remote files for the account and hands
// them over for indexing or importing.
//
// Returns (true, nil) when there is nothing left to download (after
// publishing "sync.downloaded"), (false, err) when the pending downloads
// can't be queried, and (false, nil) after a batch was processed or the
// worker was canceled.
func (s *Sync) download(a entity.Account) (complete bool, err error) {
db := s.conf.Db()
// Set up index worker
// The channel is unbuffered, so sending a job blocks until the worker takes it.
indexJobs := make(chan photoprism.IndexJob)
go photoprism.IndexWorker(indexJobs)
defer close(indexJobs)
// Set up import worker
importJobs := make(chan photoprism.ImportJob)
go photoprism.ImportWorker(importJobs)
defer close(importJobs)
relatedFiles, err := s.relatedDownloads(a)
if err != nil {
log.Errorf("sync: %s", err.Error())
return false, err
}
// No new file syncs left: report completion.
if len(relatedFiles) == 0 {
log.Infof("sync: download complete for %s", a.AccName)
event.Publish("sync.downloaded", event.Data{"account": a})
return true, nil
}
log.Infof("sync: downloading from %s", a.AccName)
client := webdav.New(a.AccURL, a.AccUser, a.AccPass)
// Files go to the originals path when SyncFilenames is set (they get indexed below),
// otherwise to an account specific folder in the download path (they get imported below).
var baseDir string
if a.SyncFilenames {
baseDir = s.conf.OriginalsPath()
} else {
baseDir = fmt.Sprintf("%s/%d", s.DownloadPath(), a.ID)
}
// Names of media files that were already added to an index or import job.
done := make(map[string]bool)
for _, files := range relatedFiles {
// First pass: download all files of the group and store the outcome.
for _, file := range files {
if mutex.Sync.Canceled() {
return false, nil
}
if file.Errors > a.RetryLimit {
log.Warnf("sync: downloading %s failed more than %d times", file.RemoteName, a.RetryLimit)
continue
}
localName := baseDir + file.RemoteName
if err := client.Download(file.RemoteName, localName, false); err != nil {
log.Errorf("sync: %s", err.Error())
file.Errors++
file.Error = err.Error()
} else {
log.Infof("sync: downloaded %s from %s", file.RemoteName, a.AccName)
file.Status = entity.FileSyncDownloaded
}
// NOTE(review): when canceled here, the outcome of the download above is not saved;
// presumably the file is then downloaded again on the next run - confirm this is intended.
if mutex.Sync.Canceled() {
return false, nil
}
if err := db.Save(&file).Error; err != nil {
log.Errorf("sync: %s", err.Error())
}
}
// Second pass: create index or import jobs for the photos of the group.
for _, file := range files {
mf, err := photoprism.NewMediaFile(baseDir + file.RemoteName)
if err != nil || !mf.IsPhoto() {
continue
}
related, err := mf.RelatedFiles()
if err != nil {
log.Warnf("sync: %s", err.Error())
continue
}
// Only keep related files that are not part of an earlier job.
var rf photoprism.MediaFiles
for _, f := range related.Files {
if done[f.FileName()] {
continue
}
rf = append(rf, f)
done[f.FileName()] = true
}
done[mf.FileName()] = true
related.Files = rf
if a.SyncFilenames {
log.Infof("sync: indexing %s and related files", file.RemoteName)
indexJobs <- photoprism.IndexJob{
FileName: mf.FileName(),
Related: related,
IndexOpt: photoprism.IndexOptionsAll(),
Ind: service.Index(),
}
} else {
log.Infof("sync: importing %s and related files", file.RemoteName)
importJobs <- photoprism.ImportJob{
FileName: mf.FileName(),
Related: related,
IndexOpt: photoprism.IndexOptionsAll(),
ImportOpt: photoprism.ImportOptionsMove(baseDir),
Imp: service.Import(),
}
}
}
}
// Always false here: completion is only reported once no new file syncs are found.
return false, nil
}
// upload is a placeholder for uploading local files to a remote account.
// It performs no work and always returns (false, nil).
func (s *Sync) upload(a entity.Account) (complete bool, err error) {
// TODO: Not implemented yet
return false, nil
}

View file

@ -0,0 +1,170 @@
package workers
import (
"fmt"
"github.com/photoprism/photoprism/internal/entity"
"github.com/photoprism/photoprism/internal/event"
"github.com/photoprism/photoprism/internal/mutex"
"github.com/photoprism/photoprism/internal/photoprism"
"github.com/photoprism/photoprism/internal/remote/webdav"
"github.com/photoprism/photoprism/internal/service"
"github.com/photoprism/photoprism/pkg/fs"
)
// Downloads maps a string key to the file syncs that belong together.
type Downloads map[string][]entity.FileSync
// downloadPath returns the temporary download path, which is the "sync"
// folder below the configured temp path.
func (s *Sync) downloadPath() string {
	tempPath := s.conf.TempPath()

	return tempPath + "/sync"
}
// relatedDownloads fetches up to 1000 new file syncs of the account from the
// database and groups them by the result of fs.AbsBase for their remote name.
// On a query error the empty result is returned together with the error.
func (s *Sync) relatedDownloads(a entity.Account) (result Downloads, err error) {
	const maxResults = 1000

	result = make(Downloads)

	// Get remote files from database
	pending, err := s.q.FileSyncs(a.ID, entity.FileSyncNew, maxResults)

	if err != nil {
		return result, err
	}

	// Group results by directory and base name
	for idx := range pending {
		key := fs.AbsBase(pending[idx].RemoteName)
		result[key] = append(result[key], pending[idx])

		// Skip last 50 to make sure we see all related files
		if idx > maxResults-50 {
			break
		}
	}

	return result, nil
}
// download fetches one batch of new remote files for the account and hands
// them over for indexing or importing.
//
// Returns (true, nil) when there is nothing left to download (after
// publishing "sync.downloaded"), (false, err) when the pending downloads
// can't be queried, and (false, nil) after a batch was processed or the
// worker was canceled.
func (s *Sync) download(a entity.Account) (complete bool, err error) {
db := s.conf.Db()
// Set up index worker
// The channel is unbuffered, so sending a job blocks until the worker takes it.
indexJobs := make(chan photoprism.IndexJob)
go photoprism.IndexWorker(indexJobs)
defer close(indexJobs)
// Set up import worker
importJobs := make(chan photoprism.ImportJob)
go photoprism.ImportWorker(importJobs)
defer close(importJobs)
relatedFiles, err := s.relatedDownloads(a)
if err != nil {
log.Errorf("sync: %s", err.Error())
return false, err
}
// No new file syncs left: report completion.
if len(relatedFiles) == 0 {
log.Infof("sync: download complete for %s", a.AccName)
event.Publish("sync.downloaded", event.Data{"account": a})
return true, nil
}
log.Infof("sync: downloading from %s", a.AccName)
client := webdav.New(a.AccURL, a.AccUser, a.AccPass)
// Files go to the originals path when SyncFilenames is set (they get indexed below),
// otherwise to an account specific folder in the download path (they get imported below).
var baseDir string
if a.SyncFilenames {
baseDir = s.conf.OriginalsPath()
} else {
baseDir = fmt.Sprintf("%s/%d", s.downloadPath(), a.ID)
}
// Names of media files that were already added to an index or import job.
done := make(map[string]bool)
for _, files := range relatedFiles {
// First pass: download all files of the group and store the outcome.
for _, file := range files {
if mutex.Sync.Canceled() {
return false, nil
}
if file.Errors > a.RetryLimit {
log.Warnf("sync: downloading %s failed more than %d times", file.RemoteName, a.RetryLimit)
continue
}
localName := baseDir + file.RemoteName
if err := client.Download(file.RemoteName, localName, false); err != nil {
log.Errorf("sync: %s", err.Error())
file.Errors++
file.Error = err.Error()
} else {
log.Infof("sync: downloaded %s from %s", file.RemoteName, a.AccName)
file.Status = entity.FileSyncDownloaded
}
// NOTE(review): when canceled here, the outcome of the download above is not saved;
// presumably the file is then downloaded again on the next run - confirm this is intended.
if mutex.Sync.Canceled() {
return false, nil
}
if err := db.Save(&file).Error; err != nil {
log.Errorf("sync: %s", err.Error())
}
}
// Second pass: create index or import jobs for the photos of the group.
for _, file := range files {
mf, err := photoprism.NewMediaFile(baseDir + file.RemoteName)
if err != nil || !mf.IsPhoto() {
continue
}
related, err := mf.RelatedFiles()
if err != nil {
log.Warnf("sync: %s", err.Error())
continue
}
// Only keep related files that are not part of an earlier job.
var rf photoprism.MediaFiles
for _, f := range related.Files {
if done[f.FileName()] {
continue
}
rf = append(rf, f)
done[f.FileName()] = true
}
done[mf.FileName()] = true
related.Files = rf
if a.SyncFilenames {
log.Infof("sync: indexing %s and related files", file.RemoteName)
indexJobs <- photoprism.IndexJob{
FileName: mf.FileName(),
Related: related,
IndexOpt: photoprism.IndexOptionsAll(),
Ind: service.Index(),
}
} else {
log.Infof("sync: importing %s and related files", file.RemoteName)
importJobs <- photoprism.ImportJob{
FileName: mf.FileName(),
Related: related,
IndexOpt: photoprism.IndexOptionsAll(),
ImportOpt: photoprism.ImportOptionsMove(baseDir),
Imp: service.Import(),
}
}
}
}
// Always false here: completion is only reported once no new file syncs are found.
return false, nil
}

View file

@ -0,0 +1,82 @@
package workers
import (
"github.com/photoprism/photoprism/internal/entity"
"github.com/photoprism/photoprism/internal/mutex"
"github.com/photoprism/photoprism/internal/remote"
"github.com/photoprism/photoprism/internal/remote/webdav"
"github.com/photoprism/photoprism/pkg/fs"
)
// refresh updates the local list of remote files so that they can be
// downloaded in batches.
//
// Only WebDAV accounts are handled; for other account types nothing is done
// and (false, nil) is returned. The sync path and all of its sub directories
// are listed and a file sync entry is created for every remote file that does
// not have one yet. Images and sidecar files are marked as new, RAW files only
// when the account has SyncRaw enabled; everything else is ignored.
//
// Returns (true, nil) when all directories were processed, (false, nil) when
// the worker was canceled and (false, err) when listing directories or files
// failed. Errors while saving a single entry are logged and do not abort the
// refresh.
func (s *Sync) refresh(a entity.Account) (complete bool, err error) {
	if a.AccType != remote.ServiceWebDAV {
		return false, nil
	}

	db := s.conf.Db()
	client := webdav.New(a.AccURL, a.AccUser, a.AccPass)

	subDirs, err := client.Directories(a.SyncPath, true)

	if err != nil {
		log.Error(err)
		return false, err
	}

	// The sync path itself has to be scanned as well.
	dirs := append(subDirs.Abs(), a.SyncPath)

	for _, dir := range dirs {
		if mutex.Sync.Canceled() {
			return false, nil
		}

		files, err := client.Files(dir)

		if err != nil {
			log.Error(err)
			return false, err
		}

		for _, file := range files {
			if mutex.Sync.Canceled() {
				return false, nil
			}

			f := entity.NewFileSync(a.ID, file.Abs)
			f.Status = entity.FileSyncIgnore
			f.RemoteDate = file.Date
			f.RemoteSize = file.Size

			// Select supported types for download
			mediaType := fs.GetMediaType(file.Name)

			switch mediaType {
			case fs.MediaImage, fs.MediaSidecar:
				f.Status = entity.FileSyncNew
			case fs.MediaRaw:
				if a.SyncRaw {
					f.Status = entity.FileSyncNew
				}
			}

			// NOTE(review): presumably loads an already existing entry into f,
			// which is why the status is checked again below - confirm.
			f.FirstOrCreate(db)

			// RAW files that were ignored are picked up once SyncRaw is enabled.
			if f.Status == entity.FileSyncIgnore && mediaType == fs.MediaRaw && a.SyncRaw {
				f.Status = entity.FileSyncNew

				if err := db.Save(&f).Error; err != nil {
					log.Errorf("sync: %s", err.Error())
				}
			}

			// Files with a different remote date are downloaded again.
			if f.Status == entity.FileSyncDownloaded && !f.RemoteDate.Equal(file.Date) {
				f.Status = entity.FileSyncNew
				f.RemoteDate = file.Date
				f.RemoteSize = file.Size

				if err := db.Save(&f).Error; err != nil {
					log.Errorf("sync: %s", err.Error())
				}
			}
		}
	}

	return true, nil
}

View file

@ -0,0 +1,77 @@
package workers
import (
"path"
"path/filepath"
"time"
"github.com/photoprism/photoprism/internal/entity"
"github.com/photoprism/photoprism/internal/event"
"github.com/photoprism/photoprism/internal/mutex"
"github.com/photoprism/photoprism/internal/remote/webdav"
)
// upload uploads local files to a remote account in batches of up to 250 files.
//
// The files to upload are taken from AccountUploads. Each successfully
// uploaded file gets a file sync entry with status entity.FileSyncUploaded.
// Files that fail are skipped without an entry.
//
// Returns (true, nil) when there is nothing left to upload (after publishing
// "sync.uploaded"), (false, err) when the upload list can't be queried, and
// (false, nil) after a batch was processed or the worker was canceled.
func (s *Sync) upload(a entity.Account) (complete bool, err error) {
	db := s.conf.Db()
	q := s.q

	maxResults := 250

	// Get upload file list from database
	files, err := q.AccountUploads(a, maxResults)

	if err != nil {
		return false, err
	}

	if len(files) == 0 {
		log.Infof("sync: upload complete for %s", a.AccName)
		event.Publish("sync.uploaded", event.Data{"account": a})
		return true, nil
	}

	client := webdav.New(a.AccURL, a.AccUser, a.AccPass)

	// Remote directories that were already created during this run.
	existingDirs := make(map[string]string)

	for _, file := range files {
		if mutex.Sync.Canceled() {
			return false, nil
		}

		// Local names use the OS specific separator, remote names always use "/".
		fileName := filepath.Join(s.conf.OriginalsPath(), file.FileName)
		remoteName := path.Join(a.SyncPath, filepath.ToSlash(file.FileName))
		remoteDir := path.Dir(remoteName)

		if _, ok := existingDirs[remoteDir]; !ok {
			if err := client.CreateDir(remoteDir); err != nil {
				log.Errorf("sync: could not create remote directory %s", remoteDir)
				continue // try again next time
			}

			// Remember the directory so that it is only created once per batch.
			existingDirs[remoteDir] = remoteDir
		}

		if err := client.Upload(fileName, remoteName); err != nil {
			log.Errorf("sync: %s", err.Error())
			continue // try again next time
		}

		log.Infof("sync: uploaded %s to %s on %s", fileName, remoteName, a.AccName)

		fileSync := entity.NewFileSync(a.ID, remoteName)
		fileSync.Status = entity.FileSyncUploaded
		fileSync.RemoteDate = time.Now()
		fileSync.RemoteSize = file.FileSize
		fileSync.FileID = file.ID
		fileSync.Error = ""
		fileSync.Errors = 0

		// Always record a completed upload, even when the worker was canceled
		// in the meantime: otherwise the file would be uploaded again next
		// time. Cancellation is handled at the start of the next iteration.
		if err := db.Save(&fileSync).Error; err != nil {
			log.Errorf("sync: %s", err.Error())
		}
	}

	return false, nil
}