Compare commits

..

No commits in common. "31f69856a656460877be0627efa0e9060910ce97" and "5b258ae040101fe7d5c2c3f3194f084eee91957f" have entirely different histories.

5 changed files with 151 additions and 1233 deletions

1141
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -7,7 +7,7 @@ edition = "2021"
[dependencies] [dependencies]
axum = "0.8.1" axum = "0.8.1"
redis = { version = "0.23.3", features = ["tokio-comp"] }
serde = { version = "1.0.188", features = ["derive"] } serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.107" serde_json = "1.0.107"
sqlx = { version = "0.8.3", features = ["runtime-tokio", "postgres"] }
tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] } tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }

View file

@ -1,48 +1,14 @@
# KOreader Sync Server # KOreader Sync Server
This is a KOreader sync server, implemented in Rust. It uses the crates *axum*, *sqlx*, *serde* and *serde_json*. This is a KOreader sync server, implemented in Rust. It uses the crates *axum*, *redis*, *serde* and *serde_json*.
## Requirements ## Requirements
- Rust toolchain for compilation - Rust toolchain (≥ 1.63) for compilation
- A running PostgreSQL server - A running Redis server
- Nginx (or Apache) webserver as a reverse proxy, since kosyncrs only listens locally and uses HTTP. - Nginx (or Apache) webserver as a reverse proxy, since kosyncrs only listens locally and uses HTTP.
## Installation ## Installation
Just compile it with `cargo build --release`. You can then copy the executable for example to `/usr/local/bin/`. Just compile it with `cargo build --release`. You can then copy the executable for example to `/usr/local/bin/`. If you want to start the service automatically, you can adapt the example systemd file for your needs. You can also use nginx as a reverse proxy, so that the sync server listens on port 443.
If you want to start the service automatically, you can adapt the example systemd file for your needs. Please pay particularly attention to the PG_URL environment variable.You have to adjust the database username and password. ## Todo
- Test, if it really works, and if it works reliably. Feedback is very welcome!
You can also use nginx as a reverse proxy, so that the sync server listens on port 443. An example file is provided. - Make it more configurable
### Database setup
First, create a new database user and set a password:
```
$ createuser -P kosync
```
Then, create a new database which is owned by the newly created user:
```
$ createdb -O kosync kosync
```
Connect to the database as user *kosync* (with psql) and create the database tables:
```sql
CREATE TABLE users (
id BIGSERIAL PRIMARY KEY,
username TEXT NOT NULL,
password TEXT
);
CREATE TABLE progresses (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT,
document TEXT UNIQUE NOT NULL,
progress TEXT NOT NULL,
percentage REAL NOT NULL,
device TEXT NOT NULL,
device_id TEXT NOT NULL,
timestamp BIGINT NOT NULL,
FOREIGN KEY (user_id)
REFERENCES users(id) ON DELETE CASCADE,
UNIQUE (user_id, document)
);
```

View file

@ -1,18 +1,12 @@
use axum::{ use axum::{
extract::{Path, State}, extract::Path, http::HeaderMap, http::StatusCode, routing::get, routing::post, routing::put,
http::HeaderMap,
http::StatusCode,
routing::get,
routing::post,
routing::put,
Json, Router, Json, Router,
}; };
use redis::Commands;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::{json, Value}; use serde_json::{json, Value};
use sqlx::{postgres::PgPoolOptions, PgExecutor, PgPool};
use std::env; use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct User { pub struct User {
@ -36,23 +30,11 @@ pub struct GetProgress {
percentage: f32, percentage: f32,
device: String, device: String,
device_id: String, device_id: String,
timestamp: i64, timestamp: u64,
} }
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let pg_url: String = env::var("PG_URL")
.ok()
.and_then(|v| v.parse().ok())
.unwrap();
let db_pool = PgPoolOptions::new()
.max_connections(64)
.acquire_timeout(Duration::from_secs(5))
.connect(&pg_url)
.await
.expect("Can't connect to database.");
// build our application with a single route // build our application with a single route
let app = Router::new() let app = Router::new()
.route("/", get(root)) .route("/", get(root))
@ -60,13 +42,10 @@ async fn main() {
.route("/users/auth", get(auth_user)) .route("/users/auth", get(auth_user))
.route("/syncs/progress", put(update_progress)) .route("/syncs/progress", put(update_progress))
.route("/syncs/progress/{document}", get(get_progress)) .route("/syncs/progress/{document}", get(get_progress))
.route("/healthcheck", get(healthcheck)) .route("/healthcheck", get(healthcheck));
.with_state(db_pool);
// run it with hyper on localhost:3003 // run it with hyper on localhost:3003
let listener = tokio::net::TcpListener::bind("127.0.0.1:3003") let listener = tokio::net::TcpListener::bind("127.0.0.1:3003").await.unwrap();
.await
.unwrap();
axum::serve(listener, app).await.unwrap(); axum::serve(listener, app).await.unwrap();
} }
@ -74,31 +53,19 @@ async fn root() -> &'static str {
"KOreader sync server" "KOreader sync server"
} }
async fn create_user( async fn create_user(Json(payload): Json<User>) -> (StatusCode, Json<Value>) {
State(db_pool): State<PgPool>, let client = redis::Client::open("redis://127.0.0.1/").unwrap();
Json(payload): Json<User>, let mut con = client.get_connection().unwrap();
) -> (StatusCode, Json<Value>) {
//let client = redis::Client::open("redis://127.0.0.1/").unwrap();
//let mut con = client.get_connection().unwrap();
let username = payload.username; let username = payload.username;
let password = payload.password; let password = payload.password;
let row: (i64,) = sqlx::query_as("SELECT COUNT(id) FROM users WHERE username = $1") let user_key = format!("user:{username}:key");
.bind(&username)
.fetch_one(&db_pool)
.await
.unwrap();
let does_exist: bool = if row.0 >= 1 { true } else { false }; let does_exist: bool = con.exists(&user_key).unwrap();
if does_exist == false { if does_exist == false {
sqlx::query("INSERT INTO users (username, password) VALUES ($1, $2)") let _: () = con.set(&user_key, password).unwrap();
.bind(&username)
.bind(&password)
.execute(&db_pool)
.await
.unwrap();
} else { } else {
return ( return (
StatusCode::PAYMENT_REQUIRED, StatusCode::PAYMENT_REQUIRED,
@ -109,133 +76,110 @@ async fn create_user(
(StatusCode::CREATED, Json(json!({ "username" : username }))) (StatusCode::CREATED, Json(json!({ "username" : username })))
} }
async fn authorize(db: impl PgExecutor<'_>, username: &str, password: &str) -> bool { fn authorize(username: &str, password: &str) -> bool {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut con = client.get_connection().unwrap();
if username.is_empty() || password.is_empty() { if username.is_empty() || password.is_empty() {
return false; return false;
} }
let row: Option<(String,)> = sqlx::query_as("SELECT password FROM users WHERE username = $1") let user_key = format!("user:{username}:key");
.bind(&username)
.fetch_optional(db)
.await
.unwrap();
if let Some(val) = row { let redis_pw: String = con.get(&user_key).unwrap();
return password == val.0;
if password != redis_pw {
return false;
} }
false true
} }
async fn auth_user(State(db_pool): State<PgPool>, headers: HeaderMap) -> (StatusCode, Json<Value>) { async fn auth_user(headers: HeaderMap) -> (StatusCode, Json<Value>) {
let username = headers["x-auth-user"].to_str().unwrap_or(""); let username = headers["x-auth-user"].to_str().unwrap_or("");
let password = headers["x-auth-key"].to_str().unwrap_or(""); let password = headers["x-auth-key"].to_str().unwrap_or("");
let mut tx = db_pool.begin().await.unwrap(); if authorize(&username, &password) == false {
return (
if authorize(&mut *tx, username, password).await == true { StatusCode::UNAUTHORIZED,
return (StatusCode::OK, Json(json!({"authorized" : "OK"}))); Json(json!({"message" : "Unauthorized"})),
);
} }
tx.commit().await.unwrap(); (StatusCode::OK, Json(json!({"authorized" : "OK"})))
(
StatusCode::UNAUTHORIZED,
Json(json!({"message" : "Unauthorized"})),
)
} }
async fn update_progress( async fn update_progress(headers: HeaderMap, Json(payload): Json<UpdateProgress>) -> StatusCode {
State(db_pool): State<PgPool>,
headers: HeaderMap,
Json(payload): Json<UpdateProgress>,
) -> StatusCode {
let username = headers["x-auth-user"].to_str().unwrap_or(""); let username = headers["x-auth-user"].to_str().unwrap_or("");
let password = headers["x-auth-key"].to_str().unwrap_or(""); let password = headers["x-auth-key"].to_str().unwrap_or("");
let mut tx = db_pool.begin().await.unwrap(); if authorize(username, password) == false {
if authorize(&mut *tx, username, password).await == false {
return StatusCode::UNAUTHORIZED; return StatusCode::UNAUTHORIZED;
} }
tx.commit().await.unwrap(); let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut con = client.get_connection().unwrap();
let timestamp = SystemTime::now() let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
.unwrap() .unwrap()
.as_secs(); .as_secs();
let document = payload.document;
let row: (i64,) = sqlx::query_as("SELECT id FROM users WHERE username = $1") let doc_key = format!("user:{username}:document:{document}");
.bind(&username) let _: () = con
.fetch_one(&db_pool) .hset_multiple(
.await &doc_key,
&[
("percentage", &payload.percentage.to_string()),
("progress", &payload.progress),
("device", &payload.device),
("device_id", &payload.device_id),
("timestamp", &timestamp.to_string()),
],
)
.unwrap(); .unwrap();
let user_id = row.0;
sqlx::query("INSERT INTO progresses (user_id, document, progress, percentage, device, device_id, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7) \
ON CONFLICT (user_id, document) DO UPDATE \
SET user_id = $1, document = $2, progress = $3, percentage = $4, device = $5, device_id = $6, timestamp = $7")
.bind(user_id)
.bind(&payload.document)
.bind(&payload.progress)
.bind(&payload.percentage)
.bind(&payload.device)
.bind(&payload.device_id)
.bind(timestamp as i64)
.execute(&db_pool)
.await
.unwrap();
StatusCode::OK StatusCode::OK
} }
async fn get_progress( async fn get_progress(
State(db_pool): State<PgPool>,
headers: HeaderMap, headers: HeaderMap,
Path(document): Path<String>, Path(document): Path<String>,
) -> (StatusCode, Json<Value>) { ) -> (StatusCode, Json<Value>) {
let username = headers["x-auth-user"].to_str().unwrap_or(""); let username = headers["x-auth-user"].to_str().unwrap_or("");
let password = headers["x-auth-key"].to_str().unwrap_or(""); let password = headers["x-auth-key"].to_str().unwrap_or("");
let mut tx = db_pool.begin().await.unwrap(); if authorize(username, password) == false {
if authorize(&mut *tx, username, password).await == false {
return (StatusCode::UNAUTHORIZED, Json(json!(""))); return (StatusCode::UNAUTHORIZED, Json(json!("")));
} }
tx.commit().await.unwrap(); let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut con = client.get_connection().unwrap();
let row: (i64,) = sqlx::query_as("SELECT id FROM users WHERE username = $1") let doc_key = format!("user:{username}:document:{document}");
.bind(&username)
.fetch_one(&db_pool)
.await
.unwrap();
let user_id = row.0; let values: Vec<String> = con
.hget(
doc_key,
&["percentage", "progress", "device", "device_id", "timestamp"],
)
.unwrap_or_default();
let row: Option<(f32, String, String, String, i64)> = sqlx::query_as("SELECT percentage, progress, device, device_id, timestamp FROM progresses WHERE user_id = $1 AND document = $2") if values.is_empty() {
.bind(user_id) return (StatusCode::OK, Json(json!({})));
.bind(&document)
.fetch_optional(&db_pool)
.await
.unwrap();
if let Some(val) = row {
let progress = GetProgress {
percentage: val.0,
progress: val.1,
device: val.2,
device_id: val.3,
timestamp: val.4,
document,
};
return (StatusCode::OK, Json(json!(progress)));
} }
(StatusCode::OK, Json(json!(""))) let res = GetProgress {
percentage: values[0].parse().unwrap(),
progress: values[1].clone(),
device: values[2].clone(),
device_id: values[3].clone(),
timestamp: values[4].parse().unwrap(),
document,
};
(StatusCode::OK, Json(json!(res)))
} }
async fn healthcheck() -> (StatusCode, Json<Value>) { async fn healthcheck() -> (StatusCode, Json<Value>) {

View file

@ -11,7 +11,6 @@ User=myuser
Group=users Group=users
WorkingDirectory=/home/myuser WorkingDirectory=/home/myuser
ExecStart=/usr/local/bin/kosyncrs ExecStart=/usr/local/bin/kosyncrs
Environment="PG_URL=postgresql://kosync:password@localhost/kosync"
Restart=always Restart=always
[Install] [Install]