Faces: Run background worker only when data has been updated #3124

This may reduce server load and prevent disks from spinning up.
We welcome tests reports!

Signed-off-by: Michael Mayer <michael@photoprism.app>
This commit is contained in:
Michael Mayer 2023-03-08 12:42:57 +01:00
parent c787945732
commit 0fbb4043c6
10 changed files with 102 additions and 56 deletions

View file

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/photoprism/photoprism/internal/face" "github.com/photoprism/photoprism/internal/face"
@ -14,6 +15,7 @@ import (
) )
var faceMutex = sync.Mutex{} var faceMutex = sync.Mutex{}
var UpdateFaces = atomic.Bool{}
// Face represents the face of a Subject. // Face represents the face of a Subject.
type Face struct { type Face struct {
@ -194,12 +196,13 @@ func (m *Face) ResolveCollision(embeddings face.Embeddings) (resolved bool, err
m.MatchedAt = &m.UpdatedAt m.MatchedAt = &m.UpdatedAt
m.Collisions++ m.Collisions++
m.CollisionRadius = dist m.CollisionRadius = dist
UpdateFaces.Store(true)
return true, m.Updates(Values{"Collisions": m.Collisions, "CollisionRadius": m.CollisionRadius, "FaceKind": m.FaceKind, "UpdatedAt": m.UpdatedAt, "MatchedAt": m.MatchedAt}) return true, m.Updates(Values{"Collisions": m.Collisions, "CollisionRadius": m.CollisionRadius, "FaceKind": m.FaceKind, "UpdatedAt": m.UpdatedAt, "MatchedAt": m.MatchedAt})
} else { } else {
m.MatchedAt = nil m.MatchedAt = nil
m.Collisions++ m.Collisions++
m.CollisionRadius = dist - 0.01 m.CollisionRadius = dist - 0.01
UpdateFaces.Store(true)
} }
err = m.Updates(Values{"Collisions": m.Collisions, "CollisionRadius": m.CollisionRadius, "MatchedAt": m.MatchedAt}) err = m.Updates(Values{"Collisions": m.Collisions, "CollisionRadius": m.CollisionRadius, "MatchedAt": m.MatchedAt})
@ -278,6 +281,8 @@ func (m *Face) SetSubjectUID(subjUid string) (err error) {
m.SubjUID = subjUid m.SubjUID = subjUid
} }
UpdateFaces.Store(true)
// Update related markers. // Update related markers.
if err = Db().Model(&Marker{}). if err = Db().Model(&Marker{}).
Where("face_id = ?", m.ID). Where("face_id = ?", m.ID).
@ -297,6 +302,8 @@ func (m *Face) RefreshPhotos() error {
return fmt.Errorf("empty face id") return fmt.Errorf("empty face id")
} }
UpdateFaces.Store(true)
var err error var err error
switch DbDialect() { switch DbDialect() {
case MySQL: case MySQL:
@ -331,6 +338,8 @@ func (m *Face) Create() error {
faceMutex.Lock() faceMutex.Lock()
defer faceMutex.Unlock() defer faceMutex.Unlock()
UpdateFaces.Store(true)
return Db().Create(m).Error return Db().Create(m).Error
} }
@ -340,6 +349,8 @@ func (m *Face) Delete() error {
return fmt.Errorf("empty id") return fmt.Errorf("empty id")
} }
UpdateFaces.Store(true)
// Remove face id from markers before deleting. // Remove face id from markers before deleting.
if err := Db().Model(&Marker{}). if err := Db().Model(&Marker{}).
Where("face_id = ?", m.ID). Where("face_id = ?", m.ID).
@ -356,6 +367,8 @@ func (m *Face) Update(attr string, value interface{}) error {
return fmt.Errorf("empty id") return fmt.Errorf("empty id")
} }
UpdateFaces.Store(true)
return UnscopedDb().Model(m).Update(attr, value).Error return UnscopedDb().Model(m).Update(attr, value).Error
} }
@ -365,6 +378,8 @@ func (m *Face) Updates(values interface{}) error {
return fmt.Errorf("empty id") return fmt.Errorf("empty id")
} }
UpdateFaces.Store(true)
return UnscopedDb().Model(m).Updates(values).Error return UnscopedDb().Model(m).Updates(values).Error
} }
@ -387,6 +402,7 @@ func FirstOrCreateFace(m *Face) *Face {
} }
return &result return &result
} else if err := m.Create(); err == nil { } else if err := m.Create(); err == nil {
UpdateFaces.Store(true)
return m return m
} else if findErr = UnscopedDb().Where("id = ?", m.ID).First(&result).Error; findErr == nil && result.ID != "" { } else if findErr = UnscopedDb().Where("id = ?", m.ID).First(&result).Error; findErr == nil && result.ID != "" {
if m.SubjUID != result.SubjUID { if m.SubjUID != result.SubjUID {

View file

@ -136,17 +136,20 @@ func (m *Marker) UpdateFile(file *File) (updated bool) {
log.Errorf("faces: failed assigning marker %s to file %s (%s)", m.MarkerUID, m.FileUID, err) log.Errorf("faces: failed assigning marker %s to file %s (%s)", m.MarkerUID, m.FileUID, err)
return false return false
} else { } else {
UpdateFaces.Store(true)
return true return true
} }
} }
// Updates multiple columns in the database. // Updates multiple columns in the database.
func (m *Marker) Updates(values interface{}) error { func (m *Marker) Updates(values interface{}) error {
UpdateFaces.Store(true)
return UnscopedDb().Model(m).Updates(values).Error return UnscopedDb().Model(m).Updates(values).Error
} }
// Update updates a column in the database. // Update updates a column in the database.
func (m *Marker) Update(attr string, value interface{}) error { func (m *Marker) Update(attr string, value interface{}) error {
UpdateFaces.Store(true)
return UnscopedDb().Model(m).Update(attr, value).Error return UnscopedDb().Model(m).Update(attr, value).Error
} }
@ -192,10 +195,10 @@ func (m *Marker) SaveForm(f form.Marker) (changed bool, err error) {
} }
if changed { if changed {
return changed, m.Save() return true, m.Save()
} }
return changed, nil return false, nil
} }
// HasFace tests if the marker already has the best matching face. // HasFace tests if the marker already has the best matching face.
@ -237,6 +240,8 @@ func (m *Marker) SetFace(f *Face, dist float64) (updated bool, err error) {
return false, nil return false, nil
} }
UpdateFaces.Store(true)
// Update face with known subject from marker? // Update face with known subject from marker?
if m.SubjSrc == SrcAuto || m.SubjUID == "" || f.SubjUID != "" { if m.SubjSrc == SrcAuto || m.SubjUID == "" || f.SubjUID != "" {
// Don't update if face has a known subject, or marker subject is unknown. // Don't update if face has a known subject, or marker subject is unknown.
@ -380,6 +385,8 @@ func (m *Marker) Save() error {
return err return err
} }
UpdateFaces.Store(true)
return Db().Save(m).Error return Db().Save(m).Error
} }
@ -389,6 +396,8 @@ func (m *Marker) Create() error {
return err return err
} }
UpdateFaces.Store(true)
return Db().Create(m).Error return Db().Create(m).Error
} }
@ -458,7 +467,7 @@ func (m *Marker) ClearSubject(src string) error {
if count, err := DeleteOrphanPeople(); err != nil { if count, err := DeleteOrphanPeople(); err != nil {
log.Errorf("faces: %s while clearing subject of marker %s [%s]", err, clean.Log(m.MarkerUID), time.Since(start)) log.Errorf("faces: %s while clearing subject of marker %s [%s]", err, clean.Log(m.MarkerUID), time.Since(start))
} else if count > 0 { } else if count > 0 {
log.Debugf("faces: %s marked as missing while clearing subject of marker %s [%s]", english.Plural(count, "person", "people"), clean.Log(m.MarkerUID), time.Since(start)) log.Debugf("faces: %s flagged as missing while clearing subject of marker %s [%s]", english.Plural(count, "person", "people"), clean.Log(m.MarkerUID), time.Since(start))
} }
}() }()
@ -532,6 +541,7 @@ func (m *Marker) ClearFace() (updated bool, err error) {
return false, m.Matched() return false, m.Matched()
} }
UpdateFaces.Store(true)
updated = true updated = true
// Remove face references. // Remove face references.

View file

@ -26,6 +26,7 @@ const (
PhotoUID = byte('p') PhotoUID = byte('p')
) )
var IndexUpdateInterval = 3 * time.Hour // 3 Hours
var MetadataUpdateInterval = 24 * 3 * time.Hour // 3 Days var MetadataUpdateInterval = 24 * 3 * time.Hour // 3 Days
var MetadataEstimateInterval = 24 * 7 * time.Hour // 7 Days var MetadataEstimateInterval = 24 * 7 * time.Hour // 7 Days

View file

@ -128,7 +128,7 @@ func (m *Subject) Delete() error {
return err return err
} }
log.Infof("subject: marked %s %s as missing", TypeString(m.SubjType), clean.Log(m.SubjName)) log.Infof("subject: flagged %s as missing", TypeString(m.SubjType), clean.Log(m.SubjName))
return Db().Delete(m).Error return Db().Delete(m).Error
} }

View file

@ -47,7 +47,7 @@ func (w *Faces) Start(opt FacesOptions) (err error) {
return fmt.Errorf("face recognition is disabled") return fmt.Errorf("face recognition is disabled")
} }
if err := mutex.FacesWorker.Start(); err != nil { if err = mutex.FacesWorker.Start(); err != nil {
return err return err
} }
@ -148,6 +148,8 @@ func (w *Faces) Start(opt FacesOptions) (err error) {
log.Debugf("faces: removed %d clusters [%s]", count, time.Since(start)) log.Debugf("faces: removed %d clusters [%s]", count, time.Since(start))
} }
entity.UpdateFaces.Store(false)
return nil return nil
} }

View file

@ -185,7 +185,7 @@ func FlagHiddenPhotos() (err error) {
return err return err
} else { } else {
// Log result. // Log result.
log.Infof("index: flagged %s as hidden or missing [%s]", english.Plural(int(n), "photo", "photos"), time.Since(start)) log.Infof("index: flagged %s as hidden [%s]", english.Plural(int(n), "photo", "photos"), time.Since(start))
} }
return nil return nil

View file

@ -7,6 +7,8 @@ import (
"runtime/debug" "runtime/debug"
"time" "time"
"github.com/dustin/go-humanize/english"
"github.com/photoprism/photoprism/internal/config" "github.com/photoprism/photoprism/internal/config"
"github.com/photoprism/photoprism/internal/entity" "github.com/photoprism/photoprism/internal/entity"
"github.com/photoprism/photoprism/internal/mutex" "github.com/photoprism/photoprism/internal/mutex"
@ -17,6 +19,7 @@ import (
// Meta represents a background metadata optimization worker. // Meta represents a background metadata optimization worker.
type Meta struct { type Meta struct {
conf *config.Config conf *config.Config
lastRun time.Time
} }
// NewMeta returns a new Meta worker. // NewMeta returns a new Meta worker.
@ -25,44 +28,48 @@ func NewMeta(conf *config.Config) *Meta {
} }
// originalsPath returns the original media files path as string. // originalsPath returns the original media files path as string.
func (m *Meta) originalsPath() string { func (w *Meta) originalsPath() string {
return m.conf.OriginalsPath() return w.conf.OriginalsPath()
} }
// Start metadata optimization routine. // Start metadata optimization routine.
func (m *Meta) Start(delay, interval time.Duration, force bool) (err error) { func (w *Meta) Start(delay, interval time.Duration, force bool) (err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
err = fmt.Errorf("metadata: %s (panic)\nstack: %s", r, debug.Stack()) err = fmt.Errorf("index: %s (worker panic)\nstack: %s", r, debug.Stack())
log.Error(err) log.Error(err)
} }
}() }()
if err := mutex.MetaWorker.Start(); err != nil { if err = mutex.MetaWorker.Start(); err != nil {
return err return err
} }
defer mutex.MetaWorker.Stop() defer mutex.MetaWorker.Stop()
log.Debugf("metadata: running face recognition") // Check time when worker was last executed.
updateIndex := force || w.lastRun.Before(time.Now().Add(-1*entity.IndexUpdateInterval))
// Run faces worker. // Run faces worker if needed.
if w := photoprism.NewFaces(m.conf); w.Disabled() { if updateIndex || entity.UpdateFaces.Load() {
log.Debugf("metadata: skipping face recognition") log.Debugf("index: running face recognition")
} else if err := w.Start(photoprism.FacesOptions{}); err != nil { if faces := photoprism.NewFaces(w.conf); faces.Disabled() {
log.Debugf("index: skipping face recognition")
} else if err := faces.Start(photoprism.FacesOptions{}); err != nil {
log.Warn(err) log.Warn(err)
} }
}
log.Debugf("metadata: starting routine check") // Refresh index metadata.
log.Debugf("index: updating metadata")
settings := m.conf.Settings() start := time.Now()
settings := w.conf.Settings()
done := make(map[string]bool) done := make(map[string]bool)
limit := 1000
limit := 50
offset := 0 offset := 0
optimized := 0 optimized := 0
// Run index optimization.
for { for {
photos, err := query.PhotosMetadataUpdate(limit, offset, delay, interval) photos, err := query.PhotosMetadataUpdate(limit, offset, delay, interval)
@ -72,13 +79,11 @@ func (m *Meta) Start(delay, interval time.Duration, force bool) (err error) {
if len(photos) == 0 { if len(photos) == 0 {
break break
} else if offset == 0 {
} }
for _, photo := range photos { for _, photo := range photos {
if mutex.MetaWorker.Canceled() { if mutex.MetaWorker.Canceled() {
return errors.New("metadata: check canceled") return errors.New("index: metadata update canceled")
} }
if done[photo.PhotoUID] { if done[photo.PhotoUID] {
@ -90,52 +95,57 @@ func (m *Meta) Start(delay, interval time.Duration, force bool) (err error) {
updated, merged, err := photo.Optimize(settings.StackMeta(), settings.StackUUID(), settings.Features.Estimates, force) updated, merged, err := photo.Optimize(settings.StackMeta(), settings.StackUUID(), settings.Features.Estimates, force)
if err != nil { if err != nil {
log.Errorf("metadata: %s (optimize photo)", err) log.Errorf("index: %s (metadata update)", err)
} else if updated { } else if updated {
optimized++ optimized++
log.Debugf("metadata: updated photo %s", photo.String()) log.Debugf("index: updated photo %s", photo.String())
} }
for _, p := range merged { for _, p := range merged {
log.Infof("metadata: merged %s", p.PhotoUID) log.Infof("index: merged %s", p.PhotoUID)
done[p.PhotoUID] = true done[p.PhotoUID] = true
} }
} }
if mutex.MetaWorker.Canceled() { if mutex.MetaWorker.Canceled() {
return errors.New("metadata: check canceled") return errors.New("index: metadata update canceled")
} }
offset += limit offset += limit
time.Sleep(100 * time.Millisecond)
} }
if optimized > 0 { if optimized > 0 {
log.Infof("metadata: updated %d photos", optimized) log.Infof("index: updated %s [%s]", english.Plural(optimized, "photo", "photos"), time.Since(start))
updateIndex = true
} }
// Only update index if necessary.
if updateIndex {
// Set photo quality scores to -1 if files are missing. // Set photo quality scores to -1 if files are missing.
if err := query.FlagHiddenPhotos(); err != nil { if err = query.FlagHiddenPhotos(); err != nil {
log.Warnf("metadata: %s (reset quality)", err.Error()) log.Warnf("index: %s (reset quality)", err.Error())
} }
// Run moments worker. // Run moments worker.
if w := photoprism.NewMoments(m.conf); w == nil { if moments := photoprism.NewMoments(w.conf); moments == nil {
log.Errorf("metadata: failed updating moments") log.Errorf("index: failed updating moments")
} else if err := w.Start(); err != nil { } else if err = moments.Start(); err != nil {
log.Warn(err) log.Warn(err)
} }
// Update precalculated photo and file counts. // Update precalculated photo and file counts.
if err := entity.UpdateCounts(); err != nil { if err = entity.UpdateCounts(); err != nil {
log.Warnf("index: %s (update counts)", err.Error()) log.Warnf("index: %s (update counts)", err.Error())
} }
// Update album, subject, and label cover thumbs. // Update album, subject, and label cover thumbs.
if err := query.UpdateCovers(); err != nil { if err = query.UpdateCovers(); err != nil {
log.Warnf("index: %s (update covers)", err) log.Warnf("index: %s (update covers)", err)
} }
}
// Update time when worker was last executed.
w.lastRun = entity.TimeStamp()
// Run garbage collection. // Run garbage collection.
runtime.GC() runtime.GC()

View file

@ -11,7 +11,7 @@ import (
"github.com/photoprism/photoprism/internal/mutex" "github.com/photoprism/photoprism/internal/mutex"
) )
func TestPrism_Start(t *testing.T) { func TestMeta_Start(t *testing.T) {
conf := config.TestConfig() conf := config.TestConfig()
t.Logf("database-dsn: %s", conf.DatabaseDsn()) t.Logf("database-dsn: %s", conf.DatabaseDsn())
@ -27,15 +27,22 @@ func TestPrism_Start(t *testing.T) {
delay := time.Second delay := time.Second
interval := time.Second interval := time.Second
// Mutex should prevent worker from starting.
if err := worker.Start(delay, interval, true); err == nil { if err := worker.Start(delay, interval, true); err == nil {
t.Fatal("error expected") t.Fatal("error expected")
} }
mutex.MetaWorker.Stop() mutex.MetaWorker.Stop()
// Start worker.
if err := worker.Start(delay, interval, true); err != nil { if err := worker.Start(delay, interval, true); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Rerun worker.
if err := worker.Start(delay, interval, false); err != nil {
t.Fatal(err)
}
} }
func TestMeta_originalsPath(t *testing.T) { func TestMeta_originalsPath(t *testing.T) {

View file

@ -39,7 +39,7 @@ func (w *Share) logError(err error) {
func (w *Share) Start() (err error) { func (w *Share) Start() (err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
err = fmt.Errorf("share: %s (panic)\nstack: %s", r, debug.Stack()) err = fmt.Errorf("share: %s (worker panic)\nstack: %s", r, debug.Stack())
log.Error(err) log.Error(err)
} }
}() }()

View file

@ -44,7 +44,7 @@ func (w *Sync) logWarn(err error) {
func (w *Sync) Start() (err error) { func (w *Sync) Start() (err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
err = fmt.Errorf("sync: %s (panic)\nstack: %s", r, debug.Stack()) err = fmt.Errorf("sync: %s (worker panic)\nstack: %s", r, debug.Stack())
log.Error(err) log.Error(err)
} }
}() }()