index : sfrs

A Standard Notes Sync Server written in Rust

diff options
context:
space:
mode:
authorPeter Cai <[email protected]>2020-02-23 12:51:26 +0800
committerPeter Cai <[email protected]>2020-02-23 12:51:26 +0800
commit2eaf7d24e066dfd63a075f62a4f4f90fcafb99f4 (patch)
treefaa392ad35b4705a8ad10829372237154d76383b
parentd4c4be4d639f516732fb0227cf0ffdae6c5d641d (diff)
downloadsfrs-2eaf7d24e066dfd63a075f62a4f4f90fcafb99f4.tar.gz
enable busy_timeout for SQLite
although we have a global RwLock for database access, it still can fail due to disk sync delays. Though unlikely, it has happened once or twice during testing. Let's just enable busy_timeout to avoid this issue. Since we have RwLock anyway, a busy_timeout should not be much of a problem. Unfortunately this has to be enabled via implementing our own wrapper trait.
-rw-r--r--src/api.rs16
-rw-r--r--src/db.rs105
-rw-r--r--src/item.rs11
-rw-r--r--src/main.rs32
-rw-r--r--src/tokens.rs7
-rw-r--r--src/user.rs17
6 files changed, 134 insertions, 54 deletions
diff --git a/src/api.rs b/src/api.rs
index 186ee5d..de5a758 100644
--- a/src/api.rs
+++ b/src/api.rs
@@ -88,8 +88,8 @@ fn auth_sign_in(db: DbConn, params: Json<SignInParams>) -> Custom<JsonResp<AuthR
// Shared logic for all interfaces that needs to do an automatic sign-in
fn _sign_in(db: DbConn, mail: &str, passwd: &str) -> Custom<JsonResp<AuthResult>> {
// Try to find the user first
- let res = user::User::find_user_by_email(&db, mail)
- .and_then(|u| u.create_token(&db, passwd)
+ let res = user::User::find_user_by_email(&db.0, mail)
+ .and_then(|u| u.create_token(&db.0, passwd)
.map(|x| (u.uuid, u.email, x)));
match res {
Ok((uuid, email, token)) => success_resp(AuthResult {
@@ -123,7 +123,7 @@ impl Into<AuthParams> for user::User {
#[get("/auth/params?<email>")]
fn auth_params(db: DbConn, email: String) -> Custom<JsonResp<AuthParams>> {
- match user::User::find_user_by_email(&db, &email) {
+ match user::User::find_user_by_email(&db.0, &email) {
Ok(u) => success_resp(u.into()),
Err(user::UserOpError(e)) =>
error_resp(Status::InternalServerError, vec![e])
@@ -139,9 +139,9 @@ struct ChangePwParams {
#[post("/auth/change_pw", format = "json", data = "<params>")]
fn auth_change_pw(db: DbConn, params: Json<ChangePwParams>) -> Custom<JsonResp<()>> {
- let res = user::User::find_user_by_email(&db, &params.email)
+ let res = user::User::find_user_by_email(&db.0, &params.email)
.and_then(|u|
- u.change_pw(&db, &params.current_password, &params.password));
+ u.change_pw(&db.0, &params.current_password, &params.password));
match res {
Ok(_) => Custom(Status::NoContent, Json(Response::Success(()))),
Err(user::UserOpError(e)) =>
@@ -216,7 +216,7 @@ fn items_sync(
// Remember that we have a mutex at the beginning of this function,
// so all that can change the current_max_id for the current user
// is operations later in this function.
- let new_sync_token = match item::SyncItem::get_current_max_id(&db, &u) {
+ let new_sync_token = match item::SyncItem::get_current_max_id(&db.0, &u) {
Ok(Some(id)) => Some(id.to_string()),
Ok(None) => None,
Err(item::ItemOpError(e)) =>
@@ -247,7 +247,7 @@ fn items_sync(
};
// First, retrieve what the client needs
- let result = item::SyncItem::items_of_user(&db, &u,
+ let result = item::SyncItem::items_of_user(&db.0, &u,
from_id, None, inner_params.limit);
match result {
@@ -307,7 +307,7 @@ fn items_sync(
it.updated_at =
Some(chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true));
- match item::SyncItem::items_insert(&db, &u, &it) {
+ match item::SyncItem::items_insert(&db.0, &u, &it) {
Err(item::ItemOpError(e)) => {
return error_resp(Status::InternalServerError, vec![e]);
},
diff --git a/src/db.rs b/src/db.rs
new file mode 100644
index 0000000..2be6493
--- /dev/null
+++ b/src/db.rs
@@ -0,0 +1,105 @@
+use diesel::connection::{SimpleConnection, Connection};
+use diesel::deserialize::{Queryable, QueryableByName};
+use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
+use diesel::result::{ConnectionResult, QueryResult};
+use diesel::sqlite::{Sqlite, SqliteConnection};
+use diesel::sql_types::*;
+use rocket_contrib::databases::{r2d2, DatabaseConfig, Poolable};
+use std::sync::RwLock;
+
+// We need a global RwLock for SQLite
+// This is unfortunate when we still use SQLite
+// but should be mostly fine for our purpose
+// (however, due to disk sync delays, the RwLock alone
+// may still produce some SQLITE_BUSY errors randomly.
+// We implemented a wrapper later in this module to enable busy_timeout
+// to avoid this.)
+lazy_static! {
+ pub static ref DB_LOCK: RwLock<()> = RwLock::new(());
+}
+
+#[macro_export]
+macro_rules! lock_db_write {
+ () => {
+ crate::DB_LOCK.write()
+ .map_err(|_| "Cannot lock database for writing".into())
+ };
+}
+
+#[macro_export]
+macro_rules! lock_db_read {
+ () => {
+ crate::DB_LOCK.read()
+ .map_err(|_| "Cannot lock database for reading".into())
+ };
+}
+
+pub trait SqliteLike = Connection<Backend = Sqlite>;
+
+pub struct BusyWaitSqliteConnection(SqliteConnection);
+
+impl Poolable for BusyWaitSqliteConnection {
+ type Manager = diesel::r2d2::ConnectionManager<BusyWaitSqliteConnection>;
+ type Error = r2d2::Error;
+
+ fn pool(config: DatabaseConfig) -> Result<r2d2::Pool<Self::Manager>, Self::Error> {
+ let manager = diesel::r2d2::ConnectionManager::new(config.url);
+ r2d2::Pool::builder().max_size(config.pool_size).build(manager)
+ }
+}
+
+// Enable busy_timeout for SQLite connections by re-implementing the Connection trait
+// (Note: busy_timeout is never the best solution, so the global RwLock is still needed,
+// and this busy_timeout is just to make sure that we won't fail due to disk sync lagging behind
+// when we acquire the RwLock because it may take some time for the SQLite lock state to be written to disk)
+// <https://stackoverflow.com/questions/57123453/how-to-use-diesel-with-sqlite-connections-and-avoid-database-is-locked-type-of>
+impl SimpleConnection for BusyWaitSqliteConnection {
+ fn batch_execute(&self, query: &str) -> QueryResult<()> {
+ self.0.batch_execute(query)
+ }
+}
+
+impl Connection for BusyWaitSqliteConnection {
+ type Backend = <SqliteConnection as Connection>::Backend;
+ type TransactionManager = <SqliteConnection as Connection>::TransactionManager;
+
+ fn establish(database_url: &str) -> ConnectionResult<Self> {
+ let c = SqliteConnection::establish(database_url)?;
+ c.batch_execute("PRAGMA foreign_keys = ON; PRAGMA busy_timeout = 60000;")
+ .unwrap();
+ Ok(Self(c))
+ }
+
+ fn execute(&self, query: &str) -> QueryResult<usize> {
+ self.0.execute(query)
+ }
+
+ fn query_by_index<T, U>(&self, source: T) -> QueryResult<Vec<U>>
+ where
+ T: AsQuery,
+ T::Query: QueryFragment<Self::Backend> + QueryId,
+ Self::Backend: HasSqlType<T::SqlType>,
+ U: Queryable<T::SqlType, Self::Backend>,
+ {
+ self.0.query_by_index(source)
+ }
+
+ fn query_by_name<T, U>(&self, source: &T) -> QueryResult<Vec<U>>
+ where
+ T: QueryFragment<Self::Backend> + QueryId,
+ U: QueryableByName<Self::Backend>,
+ {
+ self.0.query_by_name(source)
+ }
+
+ fn execute_returning_count<T>(&self, source: &T) -> QueryResult<usize>
+ where
+ T: QueryFragment<Self::Backend> + QueryId,
+ {
+ self.0.execute_returning_count(source)
+ }
+
+ fn transaction_manager(&self) -> &Self::TransactionManager {
+ self.0.transaction_manager()
+ }
+} \ No newline at end of file
diff --git a/src/item.rs b/src/item.rs
index d812ce7..5398bf6 100644
--- a/src/item.rs
+++ b/src/item.rs
@@ -1,10 +1,9 @@
use crate::schema::items;
use crate::schema::items::dsl::*;
-use crate::{lock_db_write, lock_db_read};
+use crate::{SqliteLike, lock_db_write, lock_db_read};
use crate::user;
use diesel::dsl::max;
use diesel::prelude::*;
-use diesel::sqlite::SqliteConnection;
use serde::{Serialize, Deserialize};
use std::vec::Vec;
@@ -85,7 +84,7 @@ impl Into<SyncItem> for Item {
impl SyncItem {
pub fn items_of_user(
- db: &SqliteConnection, u: &user::User,
+ db: &impl SqliteLike, u: &user::User,
since_id: Option<i64>, max_id: Option<i64>,
limit: Option<i64>
) -> Result<Vec<Item>, ItemOpError> {
@@ -110,7 +109,7 @@ impl SyncItem {
})
}
- pub fn find_item_by_uuid(db: &SqliteConnection, u: &user::User, i: &str) -> Result<Item, ItemOpError> {
+ pub fn find_item_by_uuid(db: &impl SqliteLike, u: &user::User, i: &str) -> Result<Item, ItemOpError> {
lock_db_read!()
.and_then(|_| {
items.filter(owner.eq(u.id).and(uuid.eq(i)))
@@ -123,7 +122,7 @@ impl SyncItem {
// Remember that IDs do not identify item; instead, they are incremented to the largest value
// every time an item is updated (see Self::items_insert).
// The ID returned by this function is more like a "timestamp" of the latest "state"
- pub fn get_current_max_id(db: &SqliteConnection, u: &user::User) -> Result<Option<i64>, ItemOpError> {
+ pub fn get_current_max_id(db: &impl SqliteLike, u: &user::User) -> Result<Option<i64>, ItemOpError> {
lock_db_read!()
.and_then(|_| {
items.filter(owner.eq(u.id))
@@ -133,7 +132,7 @@ impl SyncItem {
})
}
- pub fn items_insert(db: &SqliteConnection, u: &user::User, it: &SyncItem) -> Result<i64, ItemOpError> {
+ pub fn items_insert(db: &impl SqliteLike, u: &user::User, it: &SyncItem) -> Result<i64, ItemOpError> {
// First, try to find the original item, if any, delete it, and insert a new one with the same UUID
// This way, the ID is updated each time an item is updated
// This method acts both as insertion and update
diff --git a/src/main.rs b/src/main.rs
index 227f159..851e279 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,4 +1,4 @@
-#![feature(proc_macro_hygiene, decl_macro)]
+#![feature(proc_macro_hygiene, decl_macro, trait_alias)]
#[macro_use]
extern crate rocket;
@@ -13,6 +13,7 @@ extern crate serde;
#[macro_use]
extern crate lazy_static;
+mod db;
mod schema;
mod api;
mod tokens;
@@ -23,42 +24,19 @@ mod lock;
#[cfg(test)]
mod tests;
+pub use db::*;
+
use diesel::prelude::*;
-use diesel::sqlite::SqliteConnection;
use dotenv::dotenv;
use rocket::Rocket;
use rocket::config::{Config, Environment, Value};
use std::collections::HashMap;
use std::env;
-use std::sync::RwLock;
embed_migrations!();
-// We need a global RwLock for SQLite
-// This is unfortunate when we still use SQLite
-// but should be mostly fine for our purpose
-lazy_static! {
- pub static ref DB_LOCK: RwLock<()> = RwLock::new(());
-}
-
-#[macro_export]
-macro_rules! lock_db_write {
- () => {
- crate::DB_LOCK.write()
- .map_err(|_| "Cannot lock database for writing".into())
- };
-}
-
-#[macro_export]
-macro_rules! lock_db_read {
- () => {
- crate::DB_LOCK.read()
- .map_err(|_| "Cannot lock database for reading".into())
- };
-}
-
#[database("db")]
-pub struct DbConn(SqliteConnection);
+pub struct DbConn(BusyWaitSqliteConnection);
#[get("/")]
fn index() -> &'static str {
diff --git a/src/tokens.rs b/src/tokens.rs
index caf367d..12720c8 100644
--- a/src/tokens.rs
+++ b/src/tokens.rs
@@ -1,8 +1,7 @@
use crate::schema::tokens;
use crate::schema::tokens::dsl::*;
-use crate::{lock_db_write, lock_db_read};
+use crate::{SqliteLike, lock_db_write, lock_db_read};
use chrono::NaiveDateTime;
-use diesel::sqlite::SqliteConnection;
use diesel::prelude::*;
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
use uuid::Uuid;
@@ -17,7 +16,7 @@ pub struct Token {
impl Token {
// Return user id if any
- pub fn find_token_by_id(db: &SqliteConnection, tid: &str) -> Option<i32> {
+ pub fn find_token_by_id(db: &impl SqliteLike, tid: &str) -> Option<i32> {
(lock_db_read!() as Result<RwLockReadGuard<()>, String>).ok()
.and_then(|_| {
tokens.filter(id.eq(tid))
@@ -34,7 +33,7 @@ impl Token {
}
// Create a new token for a user
- pub fn create_token(db: &SqliteConnection, user: i32) -> Option<String> {
+ pub fn create_token(db: &impl SqliteLike, user: i32) -> Option<String> {
let tid = Uuid::new_v4().to_hyphenated().to_string();
(lock_db_write!() as Result<RwLockWriteGuard<()>, String>).ok()
.and_then(|_| {
diff --git a/src/user.rs b/src/user.rs
index 5a19069..8c8c74e 100644
--- a/src/user.rs
+++ b/src/user.rs
@@ -1,9 +1,8 @@
use crate::schema::users;
use crate::schema::users::dsl::*;
-use crate::{lock_db_write, lock_db_read};
+use crate::{SqliteLike, lock_db_write, lock_db_read};
use ::uuid::Uuid;
use diesel::prelude::*;
-use diesel::sqlite::SqliteConnection;
use rocket::request;
use rocket::http::Status;
use serde::Deserialize;
@@ -114,7 +113,7 @@ struct NewUserInsert {
}
impl User {
- pub fn create(db: &SqliteConnection, new_user: &NewUser) -> Result<String, UserOpError> {
+ pub fn create(db: &impl SqliteLike, new_user: &NewUser) -> Result<String, UserOpError> {
let uid = Uuid::new_v4().to_hyphenated().to_string();
let user_hashed = NewUserInsert {
uuid: uid.clone(),
@@ -136,7 +135,7 @@ impl User {
}
}
- pub fn find_user_by_email(db: &SqliteConnection, user_email: &str) -> Result<User, UserOpError> {
+ pub fn find_user_by_email(db: &impl SqliteLike, user_email: &str) -> Result<User, UserOpError> {
let mut results = lock_db_read!()
.and_then(|_| users.filter(email.eq(user_email))
.limit(1)
@@ -149,7 +148,7 @@ impl User {
}
}
- pub fn find_user_by_id(db: &SqliteConnection, user_id: i32) -> Result<User, UserOpError> {
+ pub fn find_user_by_id(db: &impl SqliteLike, user_id: i32) -> Result<User, UserOpError> {
let mut results = lock_db_read!()
.and_then(|_| users.filter(id.eq(user_id))
.limit(1)
@@ -162,14 +161,14 @@ impl User {
}
}
- pub fn find_user_by_token(db: &SqliteConnection, token: &str) -> Result<User, UserOpError> {
+ pub fn find_user_by_token(db: &impl SqliteLike, token: &str) -> Result<User, UserOpError> {
crate::tokens::Token::find_token_by_id(db, token)
.ok_or("Invalid token".into())
.and_then(|uid| Self::find_user_by_id(db, uid))
}
// Create a JWT token for the current user if password matches
- pub fn create_token(&self, db: &SqliteConnection, passwd: &str) -> Result<String, UserOpError> {
+ pub fn create_token(&self, db: &impl SqliteLike, passwd: &str) -> Result<String, UserOpError> {
if self.password != passwd {
Err(UserOpError::new("Password mismatch"))
} else {
@@ -180,7 +179,7 @@ impl User {
// Change the password in database, if old password is provided
// The current instance of User model will not be mutated
- pub fn change_pw(&self, db: &SqliteConnection, passwd: &str, new_passwd: &str) -> Result<(), UserOpError> {
+ pub fn change_pw(&self, db: &impl SqliteLike, passwd: &str, new_passwd: &str) -> Result<(), UserOpError> {
if self.password != passwd {
Err(UserOpError::new("Password mismatch"))
} else {
@@ -212,7 +211,7 @@ impl<'a, 'r> request::FromRequest<'a, 'r> for User {
}
let result = Self::find_user_by_token(
- &request.guard::<crate::DbConn>().unwrap(), &token[7..]);
+ &request.guard::<crate::DbConn>().unwrap().0, &token[7..]);
match result {
Ok(u) => request::Outcome::Success(u),
Err(err) => request::Outcome::Failure((Status::Unauthorized, err))