Compare commits

..

No commits in common. "31f69856a656460877be0627efa0e9060910ce97" and "5b258ae040101fe7d5c2c3f3194f084eee91957f" have entirely different histories.

5 changed files with 151 additions and 1233 deletions

1141
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
axum = "0.8.1"
redis = { version = "0.23.3", features = ["tokio-comp"] }
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.107"
sqlx = { version = "0.8.3", features = ["runtime-tokio", "postgres"] }
tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }

View file

@ -1,48 +1,14 @@
# KOreader Sync Server
This is a KOreader sync server, implemented in Rust. It uses the crates *axum*, *sqlx*, *serde* and *serde_json*.
This is a KOreader sync server, implemented in Rust. It uses the crates *axum*, *redis*, *serde* and *serde_json*.
## Requirements
- Rust toolchain for compilation
- A running PostgreSQL server
- Rust toolchain (≥ 1.63) for compilation
- A running Redis server
- Nginx (or Apache) webserver as a reverse proxy, since kosyncrs only listens locally and uses HTTP.
## Installation
Just compile it with `cargo build --release`. You can then copy the executable for example to `/usr/local/bin/`.
Just compile it with `cargo build --release`. You can then copy the executable for example to `/usr/local/bin/`. If you want to start the service automatically, you can adapt the example systemd file for your needs. You can also use nginx as a reverse proxy, so that the sync server listens on port 443.
If you want to start the service automatically, you can adapt the example systemd file for your needs. Please pay particularly attention to the PG_URL environment variable.You have to adjust the database username and password.
You can also use nginx as a reverse proxy, so that the sync server listens on port 443. An example file is provided.
### Database setup
First, create a new database user and set a password:
```
$ createuser -P kosync
```
Then, create a new database which is owned by the newly created user:
```
$ createdb -O kosync kosync
```
Connect to the database as user *kosync* (with psql) and create the database tables:
```sql
CREATE TABLE users (
id BIGSERIAL PRIMARY KEY,
username TEXT NOT NULL,
password TEXT
);
CREATE TABLE progresses (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT,
document TEXT UNIQUE NOT NULL,
progress TEXT NOT NULL,
percentage REAL NOT NULL,
device TEXT NOT NULL,
device_id TEXT NOT NULL,
timestamp BIGINT NOT NULL,
FOREIGN KEY (user_id)
REFERENCES users(id) ON DELETE CASCADE,
UNIQUE (user_id, document)
);
```
## Todo
- Test, if it really works, and if it works reliably. Feedback is very welcome!
- Make it more configurable

View file

@ -1,18 +1,12 @@
use axum::{
extract::{Path, State},
http::HeaderMap,
http::StatusCode,
routing::get,
routing::post,
routing::put,
extract::Path, http::HeaderMap, http::StatusCode, routing::get, routing::post, routing::put,
Json, Router,
};
use redis::Commands;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use sqlx::{postgres::PgPoolOptions, PgExecutor, PgPool};
use std::env;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Deserialize)]
pub struct User {
@ -36,23 +30,11 @@ pub struct GetProgress {
percentage: f32,
device: String,
device_id: String,
timestamp: i64,
timestamp: u64,
}
#[tokio::main]
async fn main() {
let pg_url: String = env::var("PG_URL")
.ok()
.and_then(|v| v.parse().ok())
.unwrap();
let db_pool = PgPoolOptions::new()
.max_connections(64)
.acquire_timeout(Duration::from_secs(5))
.connect(&pg_url)
.await
.expect("Can't connect to database.");
// build our application with a single route
let app = Router::new()
.route("/", get(root))
@ -60,13 +42,10 @@ async fn main() {
.route("/users/auth", get(auth_user))
.route("/syncs/progress", put(update_progress))
.route("/syncs/progress/{document}", get(get_progress))
.route("/healthcheck", get(healthcheck))
.with_state(db_pool);
.route("/healthcheck", get(healthcheck));
// run it with hyper on localhost:3003
let listener = tokio::net::TcpListener::bind("127.0.0.1:3003")
.await
.unwrap();
let listener = tokio::net::TcpListener::bind("127.0.0.1:3003").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
@ -74,31 +53,19 @@ async fn root() -> &'static str {
"KOreader sync server"
}
async fn create_user(
State(db_pool): State<PgPool>,
Json(payload): Json<User>,
) -> (StatusCode, Json<Value>) {
//let client = redis::Client::open("redis://127.0.0.1/").unwrap();
//let mut con = client.get_connection().unwrap();
async fn create_user(Json(payload): Json<User>) -> (StatusCode, Json<Value>) {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut con = client.get_connection().unwrap();
let username = payload.username;
let password = payload.password;
let row: (i64,) = sqlx::query_as("SELECT COUNT(id) FROM users WHERE username = $1")
.bind(&username)
.fetch_one(&db_pool)
.await
.unwrap();
let user_key = format!("user:{username}:key");
let does_exist: bool = if row.0 >= 1 { true } else { false };
let does_exist: bool = con.exists(&user_key).unwrap();
if does_exist == false {
sqlx::query("INSERT INTO users (username, password) VALUES ($1, $2)")
.bind(&username)
.bind(&password)
.execute(&db_pool)
.await
.unwrap();
let _: () = con.set(&user_key, password).unwrap();
} else {
return (
StatusCode::PAYMENT_REQUIRED,
@ -109,133 +76,110 @@ async fn create_user(
(StatusCode::CREATED, Json(json!({ "username" : username })))
}
async fn authorize(db: impl PgExecutor<'_>, username: &str, password: &str) -> bool {
fn authorize(username: &str, password: &str) -> bool {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut con = client.get_connection().unwrap();
if username.is_empty() || password.is_empty() {
return false;
}
let row: Option<(String,)> = sqlx::query_as("SELECT password FROM users WHERE username = $1")
.bind(&username)
.fetch_optional(db)
.await
.unwrap();
let user_key = format!("user:{username}:key");
if let Some(val) = row {
return password == val.0;
let redis_pw: String = con.get(&user_key).unwrap();
if password != redis_pw {
return false;
}
false
true
}
async fn auth_user(State(db_pool): State<PgPool>, headers: HeaderMap) -> (StatusCode, Json<Value>) {
async fn auth_user(headers: HeaderMap) -> (StatusCode, Json<Value>) {
let username = headers["x-auth-user"].to_str().unwrap_or("");
let password = headers["x-auth-key"].to_str().unwrap_or("");
let mut tx = db_pool.begin().await.unwrap();
if authorize(&mut *tx, username, password).await == true {
return (StatusCode::OK, Json(json!({"authorized" : "OK"})));
if authorize(&username, &password) == false {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"message" : "Unauthorized"})),
);
}
tx.commit().await.unwrap();
(
StatusCode::UNAUTHORIZED,
Json(json!({"message" : "Unauthorized"})),
)
(StatusCode::OK, Json(json!({"authorized" : "OK"})))
}
async fn update_progress(
State(db_pool): State<PgPool>,
headers: HeaderMap,
Json(payload): Json<UpdateProgress>,
) -> StatusCode {
async fn update_progress(headers: HeaderMap, Json(payload): Json<UpdateProgress>) -> StatusCode {
let username = headers["x-auth-user"].to_str().unwrap_or("");
let password = headers["x-auth-key"].to_str().unwrap_or("");
let mut tx = db_pool.begin().await.unwrap();
if authorize(&mut *tx, username, password).await == false {
if authorize(username, password) == false {
return StatusCode::UNAUTHORIZED;
}
tx.commit().await.unwrap();
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut con = client.get_connection().unwrap();
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let document = payload.document;
let row: (i64,) = sqlx::query_as("SELECT id FROM users WHERE username = $1")
.bind(&username)
.fetch_one(&db_pool)
.await
let doc_key = format!("user:{username}:document:{document}");
let _: () = con
.hset_multiple(
&doc_key,
&[
("percentage", &payload.percentage.to_string()),
("progress", &payload.progress),
("device", &payload.device),
("device_id", &payload.device_id),
("timestamp", &timestamp.to_string()),
],
)
.unwrap();
let user_id = row.0;
sqlx::query("INSERT INTO progresses (user_id, document, progress, percentage, device, device_id, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7) \
ON CONFLICT (user_id, document) DO UPDATE \
SET user_id = $1, document = $2, progress = $3, percentage = $4, device = $5, device_id = $6, timestamp = $7")
.bind(user_id)
.bind(&payload.document)
.bind(&payload.progress)
.bind(&payload.percentage)
.bind(&payload.device)
.bind(&payload.device_id)
.bind(timestamp as i64)
.execute(&db_pool)
.await
.unwrap();
StatusCode::OK
}
async fn get_progress(
State(db_pool): State<PgPool>,
headers: HeaderMap,
Path(document): Path<String>,
) -> (StatusCode, Json<Value>) {
let username = headers["x-auth-user"].to_str().unwrap_or("");
let password = headers["x-auth-key"].to_str().unwrap_or("");
let mut tx = db_pool.begin().await.unwrap();
if authorize(&mut *tx, username, password).await == false {
if authorize(username, password) == false {
return (StatusCode::UNAUTHORIZED, Json(json!("")));
}
tx.commit().await.unwrap();
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut con = client.get_connection().unwrap();
let row: (i64,) = sqlx::query_as("SELECT id FROM users WHERE username = $1")
.bind(&username)
.fetch_one(&db_pool)
.await
.unwrap();
let doc_key = format!("user:{username}:document:{document}");
let user_id = row.0;
let values: Vec<String> = con
.hget(
doc_key,
&["percentage", "progress", "device", "device_id", "timestamp"],
)
.unwrap_or_default();
let row: Option<(f32, String, String, String, i64)> = sqlx::query_as("SELECT percentage, progress, device, device_id, timestamp FROM progresses WHERE user_id = $1 AND document = $2")
.bind(user_id)
.bind(&document)
.fetch_optional(&db_pool)
.await
.unwrap();
if let Some(val) = row {
let progress = GetProgress {
percentage: val.0,
progress: val.1,
device: val.2,
device_id: val.3,
timestamp: val.4,
document,
};
return (StatusCode::OK, Json(json!(progress)));
if values.is_empty() {
return (StatusCode::OK, Json(json!({})));
}
(StatusCode::OK, Json(json!("")))
let res = GetProgress {
percentage: values[0].parse().unwrap(),
progress: values[1].clone(),
device: values[2].clone(),
device_id: values[3].clone(),
timestamp: values[4].parse().unwrap(),
document,
};
(StatusCode::OK, Json(json!(res)))
}
async fn healthcheck() -> (StatusCode, Json<Value>) {

View file

@ -11,7 +11,6 @@ User=myuser
Group=users
WorkingDirectory=/home/myuser
ExecStart=/usr/local/bin/kosyncrs
Environment="PG_URL=postgresql://kosync:password@localhost/kosync"
Restart=always
[Install]