Improve persistence logic (#55)

* Improve persistence logic

* INSERT with ON CONFLICT

* specify patch version of `rand`

* fix gen_range
This commit is contained in:
Heyang Zhou 2022-10-02 12:10:35 +08:00 committed by GitHub
parent 124ff15d3d
commit 1e9b972585
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 14 additions and 4 deletions

1
Cargo.lock generated
View File

@ -1067,6 +1067,7 @@ dependencies = [
"operational-transform",
"parking_lot",
"pretty_env_logger",
"rand 0.8.3",
"serde",
"serde_json",
"sqlx",

View File

@ -14,6 +14,7 @@ log = "0.4.14"
operational-transform = { version = "0.6.0", features = ["serde"] }
parking_lot = "0.11.1"
pretty_env_logger = "0.4.0"
rand = "0.8.3"
serde = { version = "1.0.126", features = ["derive"] }
serde_json = "1.0.64"
sqlx = { version = "0.5.9", features = ["runtime-tokio-rustls", "sqlite"] }

View File

@ -49,10 +49,13 @@ impl Database {
pub async fn store(&self, document_id: &str, document: &PersistedDocument) -> Result<()> {
let result = sqlx::query(
r#"
INSERT OR REPLACE INTO
INSERT INTO
document (id, text, language)
VALUES
($1, $2, $3)"#,
($1, $2, $3)
ON CONFLICT(id) DO UPDATE SET
text = excluded.text,
language = excluded.language"#,
)
.bind(document_id)
.bind(&document.text)

View File

@ -8,6 +8,7 @@ use std::time::{Duration, SystemTime};
use dashmap::DashMap;
use log::{error, info};
use rand::Rng;
use serde::Serialize;
use tokio::time::{self, Instant};
use warp::{filters::BoxedFilter, ws::Ws, Filter, Rejection, Reply};
@ -208,19 +209,23 @@ async fn cleaner(state: ServerState, expiry_days: u32) {
}
const PERSIST_INTERVAL: Duration = Duration::from_secs(3);
const PERSIST_INTERVAL_JITTER: Duration = Duration::from_secs(1);
/// Persists changed documents after a fixed time interval.
async fn persister(id: String, rustpad: Arc<Rustpad>, db: Database) {
let mut last_revision = 0;
while !rustpad.killed() {
time::sleep(PERSIST_INTERVAL).await;
let interval = PERSIST_INTERVAL
+ rand::thread_rng().gen_range(Duration::ZERO..=PERSIST_INTERVAL_JITTER);
time::sleep(interval).await;
let revision = rustpad.revision();
if revision > last_revision {
info!("persisting revision {} for id = {}", revision, id);
if let Err(e) = db.store(&id, &rustpad.snapshot()).await {
error!("when persisting document {}: {}", id, e);
} else {
last_revision = revision;
}
last_revision = revision;
}
}
}