Implement share worker #225

Signed-off-by: Michael Mayer <michael@liquidbytes.net>
This commit is contained in:
Michael Mayer 2020-04-03 18:08:49 +02:00
parent 15d32016c6
commit 2e5840f3b0
18 changed files with 261 additions and 64 deletions

View file

@ -31,4 +31,4 @@ INSERT INTO labels (id, label_uuid, label_slug, label_name, label_priority, labe
INSERT INTO labels (id, label_uuid, label_slug, label_name, label_priority, label_favorite) VALUES ('3', '14', 'cow', 'COW', -1, 1); INSERT INTO labels (id, label_uuid, label_slug, label_name, label_priority, label_favorite) VALUES ('3', '14', 'cow', 'COW', -1, 1);
INSERT INTO photos_labels (photo_id, label_id, label_uncertainty, label_source) VALUES ('1', '1', '38', 'image'); INSERT INTO photos_labels (photo_id, label_id, label_uncertainty, label_source) VALUES ('1', '1', '38', 'image');
INSERT INTO photos_labels (photo_id, label_id, label_uncertainty, label_source) VALUES ('1', '2', '10', 'image'); INSERT INTO photos_labels (photo_id, label_id, label_uncertainty, label_source) VALUES ('1', '2', '10', 'image');
INSERT INTO accounts (id, acc_name, acc_owner, acc_url, acc_type, acc_key, acc_user, acc_pass, acc_error, acc_share, acc_sync, retry_limit, share_path, share_size, share_expires, share_exif, share_sidecar, sync_path, sync_interval, sync_upload, sync_download, sync_delete, sync_raw, sync_video, sync_sidecar, sync_start, synced_at, created_at, updated_at, deleted_at) VALUES (1, 'Test Account', 'Admin', 'http://webdav-dummy/', 'webdav', '', 'admin', 'photoprism', null, true, false, 3, '/Photos', null, null, true, false, null, null, null, null, null, null, null, null, null, null, '2020-03-06 02:06:51', '2020-03-28 14:06:00', null); INSERT INTO accounts (id, acc_name, acc_owner, acc_url, acc_type, acc_key, acc_user, acc_pass, acc_error, acc_share, acc_sync, retry_limit, share_path, share_size, share_expires, sync_path, sync_interval, sync_upload, sync_download, sync_delete, sync_raw, sync_video, sync_sidecar, sync_start, synced_at, created_at, updated_at, deleted_at) VALUES (1, 'Test Account', 'Admin', 'http://webdav-dummy/', 'webdav', '', 'admin', 'photoprism', null, true, false, 3, '/Photos', null, null, null, null, null, null, null, null, null, null, null, null, '2020-03-06 02:06:51', '2020-03-28 14:06:00', null);

View file

@ -113,26 +113,6 @@
:items="items.expires"> :items="items.expires">
</v-select> </v-select>
</v-flex> </v-flex>
<v-flex xs12 sm6 class="px-2">
<v-checkbox
:disabled="!model.AccShare"
browser-autocomplete="off"
hide-details
color="secondary-dark"
:label="label.ShareExif"
v-model="model.ShareExif"
></v-checkbox>
</v-flex>
<v-flex xs12 sm6 class="px-2">
<v-checkbox
:disabled="!model.AccShare"
browser-autocomplete="off"
hide-details
color="secondary-dark"
:label="label.ShareSidecar"
v-model="model.ShareSidecar"
></v-checkbox>
</v-flex>
</v-layout> </v-layout>
<v-layout row wrap v-else-if="scope === 'sync'"> <v-layout row wrap v-else-if="scope === 'sync'">
<v-flex xs12 sm6 class="pa-2"> <v-flex xs12 sm6 class="pa-2">
@ -376,8 +356,6 @@
SharePath: this.$gettext("Default Location"), SharePath: this.$gettext("Default Location"),
ShareSize: this.$gettext("Size"), ShareSize: this.$gettext("Size"),
ShareExpires: this.$gettext("Expires"), ShareExpires: this.$gettext("Expires"),
ShareExif: this.$gettext("Include metadata"),
ShareSidecar: this.$gettext("Include sidecar files"),
SyncPath: this.$gettext("Location"), SyncPath: this.$gettext("Location"),
SyncInterval: this.$gettext("Interval"), SyncInterval: this.$gettext("Interval"),
SyncStart: this.$gettext("Start"), SyncStart: this.$gettext("Start"),

View file

@ -18,10 +18,8 @@ class Account extends Abstract {
AccSync: false, AccSync: false,
RetryLimit: 3, RetryLimit: 3,
SharePath: "/", SharePath: "/",
ShareSize: "fit_2048", ShareSize: "",
ShareExpires: 0, ShareExpires: 0,
ShareExif: true,
ShareSidecar: false,
SyncPath: "/", SyncPath: "/",
SyncInterval: 86400, SyncInterval: 86400,
SyncUpload: false, SyncUpload: false,

View file

@ -138,7 +138,7 @@ func ShareWithAccount(router *gin.RouterGroup, conf *config.Config) {
return return
} }
w := webdav.Connect(m.AccURL, m.AccUser, m.AccPass) w := webdav.New(m.AccURL, m.AccUser, m.AccPass)
if err := w.CreateDir(dst); err != nil { if err := w.CreateDir(dst); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": txt.UcFirst(err.Error())}) c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": txt.UcFirst(err.Error())})

View file

@ -10,6 +10,7 @@ import (
"time" "time"
"github.com/photoprism/photoprism/internal/config" "github.com/photoprism/photoprism/internal/config"
"github.com/photoprism/photoprism/internal/photoprism"
"github.com/photoprism/photoprism/internal/server" "github.com/photoprism/photoprism/internal/server"
"github.com/photoprism/photoprism/pkg/fs" "github.com/photoprism/photoprism/pkg/fs"
"github.com/sevlyar/go-daemon" "github.com/sevlyar/go-daemon"
@ -115,11 +116,15 @@ func startAction(ctx *cli.Context) error {
// start web server // start web server
go server.Start(cctx, conf) go server.Start(cctx, conf)
// start share & sync service workers
stop := photoprism.ServiceWorkers(conf)
// set up proper shutdown of daemon and web server // set up proper shutdown of daemon and web server
quit := make(chan os.Signal) quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit <-quit
stop <- true
log.Info("shutting down...") log.Info("shutting down...")
conf.Shutdown() conf.Shutdown()
cancel() cancel()

View file

@ -199,6 +199,8 @@ func (c *Config) Init(ctx context.Context) error {
// Shutdown services and workers. // Shutdown services and workers.
func (c *Config) Shutdown() { func (c *Config) Shutdown() {
mutex.Worker.Cancel() mutex.Worker.Cancel()
mutex.Share.Cancel()
mutex.Sync.Cancel()
if err := c.CloseDb(); err != nil { if err := c.CloseDb(); err != nil {
log.Errorf("could not close database connection: %s", err) log.Errorf("could not close database connection: %s", err)

View file

@ -26,13 +26,12 @@ type Account struct {
AccError string `gorm:"type:varbinary(512);"` AccError string `gorm:"type:varbinary(512);"`
AccShare bool AccShare bool
AccSync bool AccSync bool
RetryLimit uint RetryLimit int
SharePath string `gorm:"type:varbinary(256);"` SharePath string `gorm:"type:varbinary(256);"`
ShareSize string `gorm:"type:varbinary(16);"` ShareSize string `gorm:"type:varbinary(16);"`
ShareExpires uint ShareExpires uint
ShareExif bool
ShareSidecar bool
SyncPath string `gorm:"type:varbinary(256);"` SyncPath string `gorm:"type:varbinary(256);"`
SyncStatus string `gorm:"type:varbinary(16);"`
SyncInterval uint SyncInterval uint
SyncUpload bool SyncUpload bool
SyncDownload bool SyncDownload bool
@ -87,7 +86,7 @@ func (m *Account) Delete(db *gorm.DB) error {
// Directories returns a list of directories or albums in an account. // Directories returns a list of directories or albums in an account.
func (m *Account) Directories() (result fs.FileInfos, err error) { func (m *Account) Directories() (result fs.FileInfos, err error) {
if m.AccType == string(service.TypeWebDAV) { if m.AccType == string(service.TypeWebDAV) {
c := webdav.Connect(m.AccURL, m.AccUser, m.AccPass) c := webdav.New(m.AccURL, m.AccUser, m.AccPass)
result, err = c.Directories("/", true) result, err = c.Directories("/", true)
} }

View file

@ -1,13 +1,19 @@
package entity package entity
import ( import (
"database/sql"
"time" "time"
"github.com/jinzhu/gorm" "github.com/jinzhu/gorm"
"github.com/photoprism/photoprism/internal/mutex" "github.com/photoprism/photoprism/internal/mutex"
) )
const (
FileShareNew = "new"
FileShareError = "error"
FileShareShared = "shared"
FileShareRemoved = "removed"
)
// FileShare represents a one-to-many relation between File and Account for pushing files to remote services. // FileShare represents a one-to-many relation between File and Account for pushing files to remote services.
type FileShare struct { type FileShare struct {
FileID uint `gorm:"primary_key;auto_increment:false"` FileID uint `gorm:"primary_key;auto_increment:false"`
@ -15,10 +21,9 @@ type FileShare struct {
RemoteName string `gorm:"primary_key;auto_increment:false;type:varbinary(256)"` RemoteName string `gorm:"primary_key;auto_increment:false;type:varbinary(256)"`
Status string `gorm:"type:varbinary(16);"` Status string `gorm:"type:varbinary(16);"`
Error string `gorm:"type:varbinary(512);"` Error string `gorm:"type:varbinary(512);"`
Errors int
File *File File *File
Account *Account Account *Account
SharedAt sql.NullTime
ExpiresAt sql.NullTime
CreatedAt time.Time CreatedAt time.Time
UpdatedAt time.Time UpdatedAt time.Time
} }

View file

@ -1,7 +1,6 @@
package entity package entity
import ( import (
"database/sql"
"time" "time"
"github.com/jinzhu/gorm" "github.com/jinzhu/gorm"
@ -17,9 +16,9 @@ type FileSync struct {
RemoteSize int64 RemoteSize int64
Status string `gorm:"type:varbinary(16);"` Status string `gorm:"type:varbinary(16);"`
Error string `gorm:"type:varbinary(512);"` Error string `gorm:"type:varbinary(512);"`
Errors int
File *File File *File
Account *Account Account *Account
SyncedAt sql.NullTime
CreatedAt time.Time CreatedAt time.Time
UpdatedAt time.Time UpdatedAt time.Time
} }

View file

@ -23,8 +23,6 @@ type Account struct {
SharePath string `json:"SharePath"` SharePath string `json:"SharePath"`
ShareSize string `json:"ShareSize"` ShareSize string `json:"ShareSize"`
ShareExpires uint `json:"ShareExpires"` ShareExpires uint `json:"ShareExpires"`
ShareExif bool `json:"ShareExif"`
ShareSidecar bool `json:"ShareSidecar"`
SyncPath string `json:"SyncPath"` SyncPath string `json:"SyncPath"`
SyncInterval uint `json:"SyncInterval"` SyncInterval uint `json:"SyncInterval"`
SyncUpload bool `json:"SyncUpload"` SyncUpload bool `json:"SyncUpload"`

View file

@ -4,6 +4,9 @@ import (
"sync" "sync"
) )
var Db = sync.Mutex{} var (
Db = sync.Mutex{}
var Worker = Busy{} Worker = Busy{}
Sync = Busy{}
Share = Busy{}
)

View file

@ -0,0 +1,48 @@
package photoprism
import (
"time"
"github.com/photoprism/photoprism/internal/config"
"github.com/photoprism/photoprism/internal/mutex"
)
func ServiceWorkers(conf *config.Config) chan bool {
ticker := time.NewTicker(5 * time.Minute)
stop := make(chan bool, 1)
go func() {
for {
select {
case <-stop:
log.Info("stopping service workers")
ticker.Stop()
mutex.Share.Cancel()
mutex.Sync.Cancel()
return
case <-ticker.C:
if !mutex.Share.Busy() {
go func() {
// Start
s := NewShare(conf)
if err := s.Start(); err != nil {
log.Error(err)
}
}()
}
if !mutex.Sync.Busy() {
go func() {
// Start
s := NewSync(conf)
if err := s.Start(); err != nil {
log.Error(err)
}
}()
}
}
}
}()
return stop
}

View file

@ -0,0 +1,104 @@
package photoprism
import (
"fmt"
"os"
"github.com/photoprism/photoprism/internal/config"
"github.com/photoprism/photoprism/internal/entity"
"github.com/photoprism/photoprism/internal/event"
"github.com/photoprism/photoprism/internal/form"
"github.com/photoprism/photoprism/internal/mutex"
"github.com/photoprism/photoprism/internal/query"
"github.com/photoprism/photoprism/internal/service"
"github.com/photoprism/photoprism/internal/service/webdav"
"github.com/photoprism/photoprism/internal/thumb"
)
// Share represents a share worker.
type Share struct {
conf *config.Config
}
// NewShare returns a new service share worker.
func NewShare(conf *config.Config) *Share {
return &Share{conf: conf}
}
// Start starts the share worker.
func (s *Share) Start() (err error) {
if err := mutex.Share.Start(); err != nil {
event.Error(fmt.Sprintf("share: %s", err.Error()))
return err
}
defer mutex.Share.Stop()
f := form.AccountSearch{
Share: true,
}
db := s.conf.Db()
q := query.New(db)
accounts, err := q.Accounts(f)
for _, a := range accounts {
if a.AccType != service.TypeWebDAV {
continue
}
files, err := q.FileShares(a.ID, entity.FileShareNew)
if err != nil {
log.Errorf("share: %s", err.Error())
continue
}
if len(files) == 0 {
continue
}
client := webdav.New(a.AccURL, a.AccUser, a.AccPass)
for _, file := range files {
srcFileName := s.conf.OriginalsPath() + string(os.PathSeparator) + file.File.FileName
if a.ShareSize != "" {
thumbType, ok := thumb.Types[a.ShareSize]
if !ok {
log.Errorf("share: invalid size %s", a.ShareSize)
continue
}
srcFileName, err = thumb.FromFile(srcFileName, file.File.FileHash, s.conf.ThumbnailsPath(), thumbType.Width, thumbType.Height, thumbType.Options...)
if err != nil {
log.Errorf("share: %s", err)
continue
}
}
if err := client.Upload(srcFileName, file.RemoteName); err != nil {
log.Errorf("share: %s", err.Error())
file.Errors++
file.Error = err.Error()
} else {
file.Errors = 0
file.Error = ""
file.Status = entity.FileShareShared
}
if a.RetryLimit >= 0 && file.Errors > a.RetryLimit {
file.Status = entity.FileShareError
}
if err := db.Save(&file).Error; err != nil {
log.Errorf("share: %s", err.Error())
}
}
}
return err
}

View file

@ -0,0 +1,33 @@
package photoprism
import (
"fmt"
"github.com/photoprism/photoprism/internal/config"
"github.com/photoprism/photoprism/internal/event"
"github.com/photoprism/photoprism/internal/mutex"
)
// Sync represents a sync worker.
type Sync struct {
conf *config.Config
}
// NewSync returns a new service sync worker.
func NewSync(conf *config.Config) *Sync {
return &Sync{conf: conf}
}
// Start starts the sync worker.
func (c *Sync) Start() (err error) {
if err := mutex.Sync.Start(); err != nil {
event.Error(fmt.Sprintf("import: %s", err.Error()))
return err
}
defer mutex.Sync.Stop()
log.Info("sync: start")
return err
}

View file

@ -0,0 +1,27 @@
package query
import "github.com/photoprism/photoprism/internal/entity"
// FileShares
func (q *Query) FileShares(accountId uint, status string) (result []entity.FileShare, err error) {
s := q.db.Where(&entity.FileShare{})
if accountId > 0 {
s = s.Where("account_id = ?", accountId)
}
if status != "" {
s = s.Where("status = ?", status)
}
s = s.Order("created_at ASC")
s = s.Limit(100).Offset(0)
s = s.Preload("File")
if err := s.Find(&result).Error; err != nil {
return result, err
}
return result, nil
}

View file

@ -20,21 +20,19 @@ import (
var log = event.Log var log = event.Log
var client = &http.Client{} var client = &http.Client{}
type Type string
const ( const (
TypeWeb Type = "web" TypeWeb = "web"
TypeWebDAV Type = "webdav" TypeWebDAV = "webdav"
TypeFacebook Type = "facebook" TypeFacebook = "facebook"
TypeTwitter Type = "twitter" TypeTwitter = "twitter"
TypeFlickr Type = "flickr" TypeFlickr = "flickr"
TypeInstagram Type = "instagram" TypeInstagram = "instagram"
TypeEyeEm Type = "eyeem" TypeEyeEm = "eyeem"
TypeTelegram Type = "telegram" TypeTelegram = "telegram"
TypeWhatsApp Type = "whatsapp" TypeWhatsApp = "whatsapp"
TypeGooglePhotos Type = "gphotos" TypeGooglePhotos = "gphotos"
TypeGoogleDrive Type = "gdrive" TypeGoogleDrive = "gdrive"
TypeOneDrive Type = "onedrive" TypeOneDrive = "onedrive"
) )
type Account struct { type Account struct {
@ -47,7 +45,7 @@ type Account struct {
} }
type Heuristic struct { type Heuristic struct {
ServiceType Type ServiceType string
Domains []string Domains []string
Paths []string Paths []string
Method string Method string

View file

@ -24,8 +24,8 @@ type Client struct {
client *gowebdav.Client client *gowebdav.Client
} }
// Connect creates a new WebDAV client. // New creates a new WebDAV client.
func Connect(url, user, pass string) Client { func New(url, user, pass string) Client {
clt := gowebdav.NewClient(url, user, pass) clt := gowebdav.NewClient(url, user, pass)
result := Client{client: clt} result := Client{client: clt}

View file

@ -16,13 +16,13 @@ const (
) )
func TestConnect(t *testing.T) { func TestConnect(t *testing.T) {
c := Connect(testUrl, testUser, testPass) c := New(testUrl, testUser, testPass)
assert.IsType(t, Client{}, c) assert.IsType(t, Client{}, c)
} }
func TestClient_Files(t *testing.T) { func TestClient_Files(t *testing.T) {
c := Connect(testUrl, testUser, testPass) c := New(testUrl, testUser, testPass)
assert.IsType(t, Client{}, c) assert.IsType(t, Client{}, c)
@ -38,7 +38,7 @@ func TestClient_Files(t *testing.T) {
} }
func TestClient_Directories(t *testing.T) { func TestClient_Directories(t *testing.T) {
c := Connect(testUrl, testUser, testPass) c := New(testUrl, testUser, testPass)
assert.IsType(t, Client{}, c) assert.IsType(t, Client{}, c)
@ -74,7 +74,7 @@ func TestClient_Directories(t *testing.T) {
} }
func TestClient_Download(t *testing.T) { func TestClient_Download(t *testing.T) {
c := Connect(testUrl, testUser, testPass) c := New(testUrl, testUser, testPass)
assert.IsType(t, Client{}, c) assert.IsType(t, Client{}, c)
@ -105,7 +105,7 @@ func TestClient_Download(t *testing.T) {
} }
func TestClient_DownloadDir(t *testing.T) { func TestClient_DownloadDir(t *testing.T) {
c := Connect(testUrl, testUser, testPass) c := New(testUrl, testUser, testPass)
assert.IsType(t, Client{}, c) assert.IsType(t, Client{}, c)
@ -135,7 +135,7 @@ func TestClient_DownloadDir(t *testing.T) {
} }
func TestClient_UploadAndDelete(t *testing.T) { func TestClient_UploadAndDelete(t *testing.T) {
c := Connect(testUrl, testUser, testPass) c := New(testUrl, testUser, testPass)
assert.IsType(t, Client{}, c) assert.IsType(t, Client{}, c)