Sync: Download remote files #225

Signed-off-by: Michael Mayer <michael@liquidbytes.net>
This commit is contained in:
Michael Mayer 2020-04-04 17:19:34 +02:00
parent 211ea8497c
commit c882b56f82
18 changed files with 383 additions and 73 deletions

View file

@ -31,4 +31,4 @@ INSERT INTO labels (id, label_uuid, label_slug, label_name, label_priority, labe
INSERT INTO labels (id, label_uuid, label_slug, label_name, label_priority, label_favorite) VALUES ('3', '14', 'cow', 'COW', -1, 1);
INSERT INTO photos_labels (photo_id, label_id, label_uncertainty, label_source) VALUES ('1', '1', '38', 'image');
INSERT INTO photos_labels (photo_id, label_id, label_uncertainty, label_source) VALUES ('1', '2', '10', 'image');
INSERT INTO accounts (id, acc_name, acc_owner, acc_url, acc_type, acc_key, acc_user, acc_pass, acc_error, acc_share, acc_sync, retry_limit, share_path, share_size, share_expires, sync_path, sync_interval, sync_upload, sync_download, sync_delete, sync_raw, sync_video, sync_sidecar, sync_start, synced_at, created_at, updated_at, deleted_at) VALUES (1, 'Test Account', 'Admin', 'http://webdav-dummy/', 'webdav', '', 'admin', 'photoprism', null, true, false, 3, '/Photos', null, null, null, null, null, null, null, null, null, null, null, null, '2020-03-06 02:06:51', '2020-03-28 14:06:00', null);
INSERT INTO accounts (id, acc_name, acc_owner, acc_url, acc_type, acc_key, acc_user, acc_pass, acc_error, acc_share, acc_sync, retry_limit, share_path, share_size, share_expires, sync_path, sync_interval, sync_upload, sync_download, sync_raw, sync_video, sync_sidecar, created_at, updated_at, deleted_at) VALUES (1, 'Test Account', 'Admin', 'http://webdav-dummy/', 'webdav', '', 'admin', 'photoprism', null, true, false, 3, '/Photos', null, null, null, null, null, null, null, null, null, '2020-03-06 02:06:51', '2020-03-28 14:06:00', null);

View file

@ -154,6 +154,15 @@
v-model="model.SyncDownload"
></v-checkbox>
</v-flex>
<v-flex xs12 sm6 class="px-2">
<v-checkbox
:disabled="!model.AccSync"
hide-details
color="secondary-dark"
:label="label.SyncFilenames"
v-model="model.SyncFilenames"
></v-checkbox>
</v-flex>
<v-flex xs12 sm6 class="px-2">
<v-checkbox
:disabled="!model.AccSync"
@ -358,8 +367,9 @@
ShareExpires: this.$gettext("Expires"),
SyncPath: this.$gettext("Location"),
SyncInterval: this.$gettext("Interval"),
SyncFilenames: this.$gettext("Preserve remote filenames"),
SyncStart: this.$gettext("Start"),
SyncDownload: this.$gettext("Import remote files"),
SyncDownload: this.$gettext("Download remote files"),
SyncUpload: this.$gettext("Upload local files"),
SyncDelete: this.$gettext("Remote delete"),
SyncRaw: this.$gettext("Sync RAW images"),

View file

@ -1,6 +1,5 @@
import Abstract from "model/abstract";
import Api from "../common/api";
import {DateTime} from "luxon";
class Account extends Abstract {
getDefaults() {
@ -14,6 +13,7 @@ class Account extends Abstract {
AccUser: "",
AccPass: "",
AccError: "",
AccErrors: 0,
AccShare: true,
AccSync: false,
RetryLimit: 3,
@ -21,15 +21,15 @@ class Account extends Abstract {
ShareSize: "",
ShareExpires: 0,
SyncPath: "/",
SyncStatus: "",
SyncInterval: 86400,
SyncDate: null,
SyncFilenames: false,
SyncUpload: false,
SyncDownload: true,
SyncDelete: false,
SyncRaw: true,
SyncVideo: true,
SyncSidecar: true,
SyncStart: null,
SyncedAt: null,
CreatedAt: "",
UpdatedAt: "",
DeletedAt: null,
@ -49,7 +49,7 @@ class Account extends Abstract {
}
Share(UUIDs, dest) {
const values = { Photos: UUIDs, Destination: dest };
const values = {Photos: UUIDs, Destination: dest};
return Api.post(this.getEntityResource() + "/share", values).then((response) => Promise.resolve(response.data));
}

View file

@ -30,7 +30,7 @@
<v-icon v-if="props.item.AccSync" color="secondary-dark">sync</v-icon>
<v-icon v-else color="secondary-dark">sync_disabled</v-icon>
</v-btn></td>
<td class="hidden-sm-and-down">{{ formatDate(props.item.SyncedAt) }}</td>
<td class="hidden-sm-and-down">{{ formatDate(props.item.SyncDate) }}</td>
<td class="hidden-xs-only text-xs-right" nowrap>
<v-btn icon small flat :ripple="false"
class="p-account-remove"
@ -94,7 +94,7 @@
{text: this.$gettext('Name'), value: 'AccName', sortable: false, align: 'left'},
{text: this.$gettext('Sharing'), value: 'AccShare', sortable: false, align: 'center'},
{text: this.$gettext('Sync'), value: 'AccSync', sortable: false, align: 'center'},
{text: this.$gettext('Synced'), value: 'SyncedAt', sortable: false, class: 'hidden-sm-and-down', align: 'left'},
{text: this.$gettext('Synced'), value: 'SyncDate', sortable: false, class: 'hidden-sm-and-down', align: 'left'},
{text: '', value: '', sortable: false, class: 'hidden-xs-only', align: 'right'},
],
};

View file

@ -75,7 +75,7 @@ func wsReader(ws *websocket.Conn, writeMutex *sync.Mutex, connId string, conf *c
func wsWriter(ws *websocket.Conn, writeMutex *sync.Mutex, connId string) {
pingTicker := time.NewTicker(15 * time.Second)
s := event.Subscribe("log.*", "notify.*", "index.*", "upload.*", "import.*", "config.*", "count.*", "photos.*", "albums.*", "labels.*")
s := event.Subscribe("log.*", "notify.*", "index.*", "upload.*", "import.*", "config.*", "count.*", "photos.*", "albums.*", "labels.*", "sync.*")
defer func() {
pingTicker.Stop()

View file

@ -13,42 +13,54 @@ import (
"github.com/ulule/deepcopier"
)
// Values stored in Account.SyncStatus. The sync worker looks at this value to
// decide which step to perform next for an account.
const (
// AccountSyncStatusRefresh: the list of remote files needs to be (re)read.
AccountSyncStatusRefresh = "refresh"
// AccountSyncStatusDownload: remote files are being downloaded.
AccountSyncStatusDownload = "download"
// AccountSyncStatusUpload: local files are being uploaded.
AccountSyncStatusUpload = "upload"
// AccountSyncStatusSynced: the last sync run has finished; the worker switches
// back to refresh once SyncDate is older than SyncInterval seconds.
AccountSyncStatusSynced = "synced"
)
// Account represents a remote service account for uploading, downloading or syncing media files.
type Account struct {
ID uint `gorm:"primary_key"`
AccName string `gorm:"type:varchar(128);"`
AccOwner string `gorm:"type:varchar(128);"`
AccURL string `gorm:"type:varbinary(512);"`
AccType string `gorm:"type:varbinary(256);"`
AccKey string `gorm:"type:varbinary(256);"`
AccUser string `gorm:"type:varbinary(256);"`
AccPass string `gorm:"type:varbinary(256);"`
AccError string `gorm:"type:varbinary(512);"`
AccShare bool
AccSync bool
RetryLimit int
SharePath string `gorm:"type:varbinary(256);"`
ShareSize string `gorm:"type:varbinary(16);"`
ShareExpires int
SyncPath string `gorm:"type:varbinary(256);"`
SyncStatus string `gorm:"type:varbinary(16);"`
SyncInterval int
SyncUpload bool
SyncDownload bool
SyncDelete bool
SyncRaw bool
SyncVideo bool
SyncSidecar bool
SyncStart sql.NullTime
SyncedAt sql.NullTime `deepcopier:"skip"`
CreatedAt time.Time `deepcopier:"skip"`
UpdatedAt time.Time `deepcopier:"skip"`
DeletedAt *time.Time `deepcopier:"skip" sql:"index"`
ID uint `gorm:"primary_key"`
AccName string `gorm:"type:varchar(128);"`
AccOwner string `gorm:"type:varchar(128);"`
AccURL string `gorm:"type:varbinary(512);"`
AccType string `gorm:"type:varbinary(256);"`
AccKey string `gorm:"type:varbinary(256);"`
AccUser string `gorm:"type:varbinary(256);"`
AccPass string `gorm:"type:varbinary(256);"`
AccError string `gorm:"type:varbinary(512);"`
AccErrors int
AccShare bool
AccSync bool
RetryLimit int
SharePath string `gorm:"type:varbinary(256);"`
ShareSize string `gorm:"type:varbinary(16);"`
ShareExpires int
SyncPath string `gorm:"type:varbinary(256);"`
SyncStatus string `gorm:"type:varbinary(16);"`
SyncInterval int
SyncDate sql.NullTime `deepcopier:"skip"`
SyncUpload bool
SyncDownload bool
SyncFilenames bool
SyncRaw bool
SyncVideo bool
SyncSidecar bool
CreatedAt time.Time `deepcopier:"skip"`
UpdatedAt time.Time `deepcopier:"skip"`
DeletedAt *time.Time `deepcopier:"skip" sql:"index"`
}
// CreateAccount creates a new account entity in the database.
func CreateAccount(form form.Account, db *gorm.DB) (model *Account, err error) {
model = &Account{}
model = &Account{
ShareSize: "",
ShareExpires: 0,
RetryLimit: 3,
SyncStatus: AccountSyncStatusRefresh,
}
err = model.Save(form, db)

View file

@ -7,6 +7,12 @@ import (
"github.com/photoprism/photoprism/internal/mutex"
)
// Values stored in FileSync.Status.
const (
// FileSyncNew is the initial status assigned by NewFileSync; the file has not been transferred yet.
FileSyncNew = "new"
// FileSyncDownloaded marks a remote file that was downloaded successfully.
FileSyncDownloaded = "downloaded"
// FileSyncUploaded presumably marks a local file that was uploaded to the
// remote account; it is not referenced by the code shown here — TODO confirm.
FileSyncUploaded = "uploaded"
)
// FileSync represents a one-to-many relation between File and Account for syncing with remote services.
type FileSync struct {
FileID uint `gorm:"index;"`
@ -33,7 +39,7 @@ func NewFileSync(accountID uint, remoteName string) *FileSync {
result := &FileSync{
AccountID: accountID,
RemoteName: remoteName,
Status: "new",
Status: FileSyncNew,
}
return result

View file

@ -1,37 +1,34 @@
package form
import (
"database/sql"
"github.com/photoprism/photoprism/internal/service"
"github.com/ulule/deepcopier"
)
// Account represents a remote service account form for uploading, downloading or syncing media files.
type Account struct {
AccName string `json:"AccName"`
AccOwner string `json:"AccOwner"`
AccURL string `json:"AccURL"`
AccType string `json:"AccType"`
AccKey string `json:"AccKey"`
AccUser string `json:"AccUser"`
AccPass string `json:"AccPass"`
AccError string `json:"AccError"`
AccShare bool `json:"AccShare"`
AccSync bool `json:"AccSync"`
RetryLimit int `json:"RetryLimit"`
SharePath string `json:"SharePath"`
ShareSize string `json:"ShareSize"`
ShareExpires int `json:"ShareExpires"`
SyncPath string `json:"SyncPath"`
SyncInterval int `json:"SyncInterval"`
SyncUpload bool `json:"SyncUpload"`
SyncDownload bool `json:"SyncDownload"`
SyncDelete bool `json:"SyncDelete"`
SyncRaw bool `json:"SyncRaw"`
SyncVideo bool `json:"SyncVideo"`
SyncSidecar bool `json:"SyncSidecar"`
SyncStart sql.NullTime `json:"SyncStart"`
AccName string `json:"AccName"`
AccOwner string `json:"AccOwner"`
AccURL string `json:"AccURL"`
AccType string `json:"AccType"`
AccKey string `json:"AccKey"`
AccUser string `json:"AccUser"`
AccPass string `json:"AccPass"`
AccError string `json:"AccError"`
AccShare bool `json:"AccShare"`
AccSync bool `json:"AccSync"`
RetryLimit int `json:"RetryLimit"`
SharePath string `json:"SharePath"`
ShareSize string `json:"ShareSize"`
ShareExpires int `json:"ShareExpires"`
SyncPath string `json:"SyncPath"`
SyncInterval int `json:"SyncInterval"`
SyncUpload bool `json:"SyncUpload"`
SyncDownload bool `json:"SyncDownload"`
SyncFilenames bool `json:"SyncFilenames"`
SyncRaw bool `json:"SyncRaw"`
SyncVideo bool `json:"SyncVideo"`
SyncSidecar bool `json:"SyncSidecar"`
}
func NewAccount(m interface{}) (f Account, err error) {

View file

@ -5,6 +5,7 @@ type AccountSearch struct {
Query string `form:"q"`
Share bool `form:"share"`
Sync bool `form:"sync"`
Status string `form:"status"`
Count int `form:"count" binding:"required"`
Offset int `form:"offset"`
Order string `form:"order"`

View file

@ -8,14 +8,14 @@ import (
)
func ServiceWorkers(conf *config.Config) chan bool {
ticker := time.NewTicker(5 * time.Minute)
ticker := time.NewTicker(1 * time.Minute) // TODO
stop := make(chan bool, 1)
go func() {
for {
select {
case <-stop:
log.Info("stopping service workers")
log.Info("shutting down service workers")
ticker.Stop()
mutex.Share.Cancel()
mutex.Sync.Cancel()

View file

@ -45,6 +45,10 @@ func (s *Share) Start() (err error) {
accounts, err := q.Accounts(f)
for _, a := range accounts {
if mutex.Share.Canceled() {
return nil
}
if a.AccType != service.TypeWebDAV {
continue
}
@ -65,6 +69,10 @@ func (s *Share) Start() (err error) {
existingDirs := make(map[string]string)
for _, file := range files {
if mutex.Share.Canceled() {
return nil
}
dir := filepath.Dir(file.RemoteName)
if _, ok := existingDirs[dir]; ok == false && dir != "/" && dir != "." {
@ -106,6 +114,10 @@ func (s *Share) Start() (err error) {
file.Status = entity.FileShareError
}
if mutex.Share.Canceled() {
return nil
}
if err := db.Save(&file).Error; err != nil {
log.Errorf("share: %s", err.Error())
}
@ -113,6 +125,10 @@ func (s *Share) Start() (err error) {
}
for _, a := range accounts {
if mutex.Share.Canceled() {
return nil
}
if a.AccType != service.TypeWebDAV {
continue
}
@ -132,6 +148,10 @@ func (s *Share) Start() (err error) {
client := webdav.New(a.AccURL, a.AccUser, a.AccPass)
for _, file := range files {
if mutex.Share.Canceled() {
return nil
}
if err := client.Delete(file.RemoteName); err != nil {
file.Errors++
file.Error = err.Error()

View file

@ -2,10 +2,16 @@ package photoprism
import (
	"fmt"
	"path/filepath"
	"time"

	"github.com/photoprism/photoprism/internal/config"
	"github.com/photoprism/photoprism/internal/entity"
	"github.com/photoprism/photoprism/internal/event"
	"github.com/photoprism/photoprism/internal/form"
	"github.com/photoprism/photoprism/internal/mutex"
	"github.com/photoprism/photoprism/internal/query"
	"github.com/photoprism/photoprism/internal/service"
	"github.com/photoprism/photoprism/internal/service/webdav"
)
// Sync represents a sync worker.
@ -19,15 +25,201 @@ func NewSync(conf *config.Config) *Sync {
}
// Start starts the sync worker.
func (c *Sync) Start() (err error) {
func (s *Sync) Start() (err error) {
if err := mutex.Sync.Start(); err != nil {
event.Error(fmt.Sprintf("import: %s", err.Error()))
event.Error(fmt.Sprintf("sync: %s", err.Error()))
return err
}
defer mutex.Sync.Stop()
log.Info("sync: start")
f := form.AccountSearch{
Sync: true,
}
db := s.conf.Db()
q := query.New(db)
accounts, err := q.Accounts(f)
for _, a := range accounts {
if a.AccType != service.TypeWebDAV {
continue
}
if a.AccErrors > a.RetryLimit {
log.Warnf("sync: %s failed more than %d times", a.AccName, a.RetryLimit)
continue
}
switch a.SyncStatus {
case entity.AccountSyncStatusRefresh:
if complete, err := s.getRemoteFiles(a); err != nil {
a.AccErrors++
a.AccError = err.Error()
} else if complete {
a.AccErrors = 0
a.AccError = ""
if a.SyncDownload {
a.SyncStatus = entity.AccountSyncStatusDownload
} else if a.SyncUpload {
a.SyncStatus = entity.AccountSyncStatusUpload
} else {
a.SyncStatus = entity.AccountSyncStatusSynced
a.SyncDate.Time = time.Now()
a.SyncDate.Valid = true
}
}
case entity.AccountSyncStatusDownload:
if complete, err := s.download(a); err != nil {
a.AccErrors++
a.AccError = err.Error()
} else if complete && a.SyncUpload {
a.SyncStatus = entity.AccountSyncStatusUpload
} else if complete {
a.SyncStatus = entity.AccountSyncStatusSynced
a.SyncDate.Time = time.Now()
a.SyncDate.Valid = true
}
case entity.AccountSyncStatusUpload:
if complete, err := s.upload(a); err != nil {
a.AccErrors++
a.AccError = err.Error()
} else if complete {
a.SyncStatus = entity.AccountSyncStatusSynced
a.SyncDate.Time = time.Now()
a.SyncDate.Valid = true
}
case entity.AccountSyncStatusSynced:
if a.SyncDate.Valid && a.SyncDate.Time.Before(time.Now().Add(time.Duration(-1*a.SyncInterval)*time.Second)) {
a.SyncStatus = entity.AccountSyncStatusRefresh
}
default:
a.SyncStatus = entity.AccountSyncStatusRefresh
}
if mutex.Sync.Canceled() {
return nil
}
if err := db.Save(&a).Error; err != nil {
log.Errorf("sync: %s", err.Error())
}
}
return err
}
// getRemoteFiles records the files found on the remote server for the given account.
//
// It asks the WebDAV client for the directories below a.SyncPath, lists the files
// of each of them (and of a.SyncPath itself) and calls FirstOrCreate on a FileSync
// entry for every file. An entry with status FileSyncDownloaded is reset to
// FileSyncNew when its RemoteDate differs from the date reported by the server
// (presumably FirstOrCreate loads the stored entry — TODO confirm).
//
// complete is true once all directories were processed. It is false with a nil
// error for non-WebDAV accounts and when the sync mutex was canceled; it is false
// with an error when listing directories or files failed.
func (s *Sync) getRemoteFiles(a entity.Account) (complete bool, err error) {
	if a.AccType != service.TypeWebDAV {
		return false, nil
	}

	db := s.conf.Db()
	client := webdav.New(a.AccURL, a.AccUser, a.AccPass)

	subDirs, err := client.Directories(a.SyncPath, true)

	if err != nil {
		log.Error(err)
		return false, err
	}

	// The sync path itself is scanned in addition to the directories found below it.
	dirs := append(subDirs.Abs(), a.SyncPath)

	for _, dir := range dirs {
		if mutex.Sync.Canceled() {
			return false, nil
		}

		files, err := client.Files(dir)

		if err != nil {
			log.Error(err)
			return false, err
		}

		for _, file := range files {
			if mutex.Sync.Canceled() {
				return false, nil
			}

			f := entity.NewFileSync(a.ID, file.Abs)
			f.RemoteDate = file.Date
			f.RemoteSize = file.Size

			f.FirstOrCreate(db)

			// The remote file changed after it was downloaded: schedule it again.
			if f.Status == entity.FileSyncDownloaded && !f.RemoteDate.Equal(file.Date) {
				f.Status = entity.FileSyncNew
				f.RemoteDate = file.Date
				f.RemoteSize = file.Size

				// Report a failed update instead of dropping it silently.
				if err := db.Save(&f).Error; err != nil {
					log.Errorf("sync: %s", err.Error())
				}
			}
		}
	}

	return true, nil
}
// download fetches remote files of the given account that still have the status FileSyncNew.
//
// Files are stored below the originals path when a.SyncFilenames is set, otherwise
// below "<import path>/sync/<account id>". A failed download increments the error
// counter of the file; a successful one sets its status to FileSyncDownloaded.
//
// complete is true only when the query returns no more new files, in which case a
// "sync.downloaded" event is published. After working on a batch the function
// returns complete=false, so the remaining state is checked on the next call.
//
// NOTE(review): files whose Errors exceed a.RetryLimit are skipped but keep the
// status FileSyncNew, so they are returned by the query again on every call.
func (s *Sync) download(a entity.Account) (complete bool, err error) {
	db := s.conf.Db()
	q := query.New(db)

	files, err := q.FileSyncs(a.ID, entity.FileSyncNew)

	if err != nil {
		log.Errorf("sync: %s", err.Error())
		return false, err
	}

	if len(files) == 0 {
		// TODO: Subscribe event to start indexing / importing
		event.Publish("sync.downloaded", event.Data{"account": a})
		return true, nil
	}

	client := webdav.New(a.AccURL, a.AccUser, a.AccPass)

	var baseDir string

	if a.SyncFilenames {
		baseDir = s.conf.OriginalsPath()
	} else {
		baseDir = fmt.Sprintf("%s/sync/%d", s.conf.ImportPath(), a.ID)
	}

	for _, file := range files {
		if mutex.Sync.Canceled() {
			return false, nil
		}

		if file.Errors > a.RetryLimit {
			log.Warnf("sync: downloading %s failed more than %d times", file.RemoteName, a.RetryLimit)
			continue
		}

		// The remote name is supplied by the server. Anchoring it at "/" before
		// joining removes leading ".." elements, so the result always stays
		// inside baseDir, and a name without a leading slash still gets a
		// path separator.
		localName := filepath.Join(baseDir, filepath.Join("/", file.RemoteName))

		if err := client.Download(file.RemoteName, localName); err != nil {
			file.Errors++
			file.Error = err.Error()
		} else {
			file.Status = entity.FileSyncDownloaded
		}

		if mutex.Sync.Canceled() {
			return false, nil
		}

		if err := db.Save(&file).Error; err != nil {
			log.Errorf("sync: %s", err.Error())
		}
	}

	return false, nil
}
// upload is a placeholder for uploading local files to the remote account.
//
// Nothing is transferred yet. The function reports completion right away:
// Start only leaves the upload status when complete is true, so returning
// false here would keep every account with SyncUpload enabled in the upload
// status and prevent it from ever being refreshed again.
func (s *Sync) upload(a entity.Account) (complete bool, err error) {
	return true, nil
}

View file

@ -17,6 +17,10 @@ func (q *Query) Accounts(f form.AccountSearch) (result []entity.Account, err err
s = s.Where("acc_sync = 1")
}
if f.Status != "" {
s = s.Where("sync_status = ?", f.Status)
}
s = s.Order("acc_name ASC")
if f.Count > 0 && f.Count <= 1000 {

View file

@ -38,7 +38,7 @@ func (q *Query) ExpiredFileShares(account entity.Account) (result []entity.FileS
s := q.db.Where(&entity.FileShare{})
exp := time.Now().Add(time.Duration(account.ShareExpires)*time.Second)
exp := time.Now().Add(time.Duration(-1*account.ShareExpires) * time.Second)
s = s.Where("account_id = ?", account.ID)
s = s.Where("status = ?", entity.FileShareShared)

View file

@ -0,0 +1,29 @@
package query
import (
"github.com/photoprism/photoprism/internal/entity"
)
// FileSyncs finds file sync entries ordered by creation date, oldest first.
//
// The result is filtered by accountId when it is greater than zero and by
// status when it is not empty. At most 100 entries are returned, each with its
// File relation preloaded.
func (q *Query) FileSyncs(accountId uint, status string) (result []entity.FileSync, err error) {
	stmt := q.db.Where(&entity.FileSync{})

	if accountId > 0 {
		stmt = stmt.Where("account_id = ?", accountId)
	}

	if status != "" {
		stmt = stmt.Where("status = ?", status)
	}

	err = stmt.
		Order("created_at ASC").
		Limit(100).Offset(0).
		Preload("File").
		Find(&result).Error

	return result, err
}

View file

@ -30,7 +30,7 @@ func NewFileInfo(info os.FileInfo, dir string) FileInfo {
result := FileInfo{
Name: info.Name(),
Abs: fmt.Sprintf("%s/%s", dir, info.Name()),
Abs: fmt.Sprintf("%s/%s", dir, info.Name()),
Size: info.Size(),
Date: info.ModTime(),
Dir: info.IsDir(),
@ -46,6 +46,13 @@ func (infos FileInfos) Swap(i, j int) { infos[i], infos[j] = infos[j], infos[i]
// Less reports whether the absolute name of element i sorts before that of element j.
func (infos FileInfos) Less(i, j int) bool {
	left, right := infos[i].Abs, infos[j].Abs

	return strings.Compare(left, right) < 0
}
// Abs collects the absolute names of all entries in their current order.
// The result is nil when there are no entries.
func (infos FileInfos) Abs() (result []string) {
	for i := range infos {
		result = append(result, infos[i].Abs)
	}

	return result
}
func NewFileInfos(infos []os.FileInfo, dir string) FileInfos {
var result FileInfos

View file

@ -1,3 +1,19 @@
sync
upload
download
temp
user
users
var
lib
share
thumb
thumbs
thumbnail
thumbnails
photos
import
export
abc
xyz
jpg

View file

@ -3,6 +3,22 @@ package txt
// Stopwords contains a list of stopwords for full-text indexing.
var Stopwords = map[string]bool{
"sync": true,
"upload": true,
"download": true,
"temp": true,
"user": true,
"users": true,
"var": true,
"lib": true,
"share": true,
"thumb": true,
"thumbs": true,
"thumbnail": true,
"thumbnails": true,
"photos": true,
"import": true,
"export": true,
"abc": true,
"xyz": true,
"jpg": true,