From b19cafc2b268ec69f47428c3eab891dd5d404f06 Mon Sep 17 00:00:00 2001 From: Finn Date: Tue, 15 Oct 2024 23:22:07 -0700 Subject: [PATCH] wip --- .gitignore | 2 +- Cargo.toml | 2 +- migrations/20241011053437_init.sql | 6 ++ src/bridge.rs | 2 +- src/config.rs | 29 ++++++++- src/db.rs | 67 ++++++++++++++++++--- src/main.rs | 14 ++--- src/matrix.rs | 95 ++++++++++++++++++++++++++---- 8 files changed, 186 insertions(+), 31 deletions(-) diff --git a/.gitignore b/.gitignore index 74bdf22..d9227c5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ /target config.json -matrix-meshtastic-bridge.db* +/state diff --git a/Cargo.toml b/Cargo.toml index 5651713..49dcadd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,4 +12,4 @@ meshtastic = "0.1.6" serde = { version = "1.0.210", features = ["derive"]} serde_json = "1.0.128" sqlx = { version = "0.7.3", features = ["runtime-tokio-native-tls", "sqlite"] } -tokio = "1.40.0" +tokio = { version = "1.40.0", features = ["full"] } diff --git a/migrations/20241011053437_init.sql b/migrations/20241011053437_init.sql index e8ed445..19801ec 100644 --- a/migrations/20241011053437_init.sql +++ b/migrations/20241011053437_init.sql @@ -13,3 +13,9 @@ CREATE TABLE remote_user ( node_name VARCHAR(100) NOT NULL, room VARCHAR(100) NOT NULL ); + +CREATE TABLE matrix_state ( + id INTEGER PRIMARY KEY NOT NULL, + key TEXT UNIQUE NOT NULL, + value TEXT NOT NULL +); diff --git a/src/bridge.rs b/src/bridge.rs index aaff398..6f72e8c 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -31,7 +31,7 @@ impl Bridge { let (meshtastic_listener, meshtastic_stream_api) = stream_api.connect(serial_stream).await; // setup matrix client - let matrix_client = matrix::build(config.matrix) + let matrix_client = matrix::build(config.matrix, &db) .await .expect("error logging into matrix"); matrix_client.sync_once(SyncSettings::default()).await?; diff --git a/src/config.rs b/src/config.rs index b483c54..da8a70e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -11,10 +11,15 @@ pub(crate) struct Config { #[derive(serde::Deserialize, Debug)] pub(crate) struct MatrixConfig { pub(crate) username: String, + #[serde(default = "get_matrix_password")] pub(crate) password: String, - #[serde(default = "get_device_name")] + #[serde(default = "get_matrix_device_name")] pub(crate) device_name: String, pub(crate) room: String, + #[serde(default = "get_matrix_state_dir")] + pub(crate) state_dir: String, + #[serde(default = "get_matrix_state_passphrase")] + pub(crate) state_passphrase: String, } #[derive(serde::Deserialize, Debug)] @@ -22,12 +27,30 @@ pub(crate) struct MeshtasticConfig { pub(crate) device: String, } -fn get_device_name() -> String { +fn get_matrix_device_name() -> String { "meshtastic-bridge".to_string() } fn get_db_uri() -> String { - "sqlite://matrix-meshtastic-bridge.db".to_string() + "sqlite://state/matrix-meshtastic-bridge.db".to_string() +} + +fn get_matrix_password() -> String { + match std::env::var("MATRIX_PASSWORD") { + Ok(p) => p, + Err(_) => "".to_string(), + } +} + +fn get_matrix_state_dir() -> String { + "state".to_string() +} + +fn get_matrix_state_passphrase() -> String { + match std::env::var("MATRIX_STATE_PASSPHRASE") { + Ok(p) => p, + Err(_) => "a".to_string(), // I'm sorry but encryption at rest is dumb if the decryption key is stored in the same place as the encrypted data + } } pub(crate) async fn read_config() -> Config { diff --git a/src/db.rs b/src/db.rs index 0845bf0..5523229 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,9 +1,4 @@ -use meshtastic::protobufs::MyNodeInfo; -use sqlx::{ - migrate::{MigrateDatabase, Migrator}, - sqlite::SqlitePool, - Executor, Pool, Sqlite, -}; +use sqlx::{migrate::MigrateDatabase, sqlite::SqlitePool, Pool, Sqlite}; #[derive(Debug, sqlx::FromRow)] pub(crate) struct Device { @@ -24,6 +19,12 @@ pub(crate) struct RemoteUser { pub(crate) room: String, } +#[derive(Debug, sqlx::FromRow)] +struct MatrixState { + key: String, + value: String, +} + pub(crate) struct DB { pool: Pool, } @@ -51,7 +52,7 @@ pub(crate) async fn setup(db_url: String) -> Result Result, Box> { let query = sqlx::query_as::<_, Device>("SELECT * FROM devices WHERE node_num = ? LIMIT 1"); @@ -72,7 +73,57 @@ impl DB { Ok(()) } - pub async fn close(self: DB) { + pub async fn get_matrix_session( + self: &DB, + ) -> Result, Box> { + let session = match self.get_matrix_state("session".to_string()).await? { + Some(session_json) => serde_json::from_str(&session_json)?, + None => None, + }; + + Ok(session) + } + + pub async fn set_matrix_session( + self: &DB, + session: matrix_sdk::matrix_auth::MatrixSession, + ) -> anyhow::Result<()> { + let session_json = serde_json::to_string(&session)?; + + self.set_matrix_state("session".to_string(), session_json) + .await + } + + pub async fn get_matrix_sync_token(self: &DB) -> anyhow::Result> { + self.get_matrix_state("session_store".to_string()).await + } + + pub async fn set_matrix_sync_token(self: &DB, token: String) -> anyhow::Result<()> { + self.set_matrix_state("session_store".to_string(), token) + .await + } + + async fn set_matrix_state(self: &DB, key: String, value: String) -> anyhow::Result<()> { + let query = sqlx::query("INSERT INTO matrix_state (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value"); + + query.bind(key).bind(value).execute(&self.pool).await?; + + Ok(()) + } + + pub async fn get_matrix_state(self: &DB, key: String) -> anyhow::Result> { + let query = sqlx::query_as::<_, MatrixState>( + "SELECT value FROM matrix_state WHERE 'key' = ? LIMIT 1", + ); + let result = match query.bind(key).fetch_optional(&self.pool).await.unwrap() { + Some(row) => Some(row.value), + None => None, + }; + + Ok(result) + } + + pub async fn close(self: &DB) { self.pool.close().await } } diff --git a/src/main.rs b/src/main.rs index d284df8..309080c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use matrix_sdk::{config::SyncSettings, ruma::RoomId}; +use matrix_sdk::ruma::RoomId; use tokio::signal; use tokio::sync::mpsc; @@ -27,23 +27,23 @@ async fn main() -> anyhow::Result<()> { let config = config::read_config().await; - let pool = db::setup(config.db).await.expect("error connecting to db"); + let db = db::setup(config.db).await.expect("error connecting to db"); let (matrix_tx, matrix_rx) = mpsc::channel(32); let room_id = RoomId::parse(&config.matrix.room).expect("invalid room id"); log::debug!("matrix room: {:?}", room_id); - let matrix_client = matrix::build(config.matrix) + let matrix_client = matrix::build(config.matrix, &db) .await .expect("error logging into matrix"); - matrix_client.sync_once(SyncSettings::default()).await?; - let meshtastic_client = meshtastic::build(config.meshtastic) .await .expect("error connecting to meshtastic"); + log::info!("matrix and meshtatic clients ready, spawning threads"); + tokio::spawn(async { meshtastic_client .receive(matrix_tx) @@ -52,7 +52,7 @@ async fn main() -> anyhow::Result<()> { }); tokio::spawn(matrix::sender(matrix_client.clone(), matrix_rx, room_id)); - tokio::spawn(matrix::sync(matrix_client.clone())); + tokio::spawn(matrix::sync(matrix_client.clone(), &db)); match signal::ctrl_c().await { Ok(()) => {} @@ -62,7 +62,7 @@ async fn main() -> anyhow::Result<()> { } log::debug!("shutting down database connection"); - pool.close().await; + db.close().await; log::debug!("db connection closed"); Ok(()) diff --git a/src/matrix.rs b/src/matrix.rs index 2e5fbc9..8528f66 100644 --- a/src/matrix.rs +++ b/src/matrix.rs @@ -1,35 +1,58 @@ -use crate::config; +use crate::{config, db}; use log; use matrix_sdk::{ config::SyncSettings, ruma::{ + api::client::filter::FilterDefinition, events::room::{ member::StrippedRoomMemberEvent, message::{RoomMessageEventContent, SyncRoomMessageEvent}, }, OwnedRoomId, TransactionId, UserId, }, - Client, Room, + Client, Error, LoopCtrl, Room, }; use tokio::sync::mpsc::Receiver; use tokio::time::{sleep, Duration}; pub(crate) async fn build( config: config::MatrixConfig, + db: &db::DB, ) -> Result { + log::debug!("creating matrix client"); let user = UserId::parse(config.username) .expect("invalid matrix username - should be a full mxid with server"); + + log::debug!("storing matrix client state in {}", config.state_dir); let client = Client::builder() .server_name(user.server_name()) + .sqlite_store(config.state_dir, Some(&config.state_passphrase)) .build() .await?; - client - .matrix_auth() - .login_username(user, &config.password) - .initial_device_display_name(&config.device_name) - .send() - .await?; + log::debug!("checking if there's an existing session"); + match db.get_matrix_session().await.unwrap() { + Some(session) => { + log::debug!("restoring session from database"); + client.restore_session(session).await?; + } + None => { + log::debug!("no existing session. Logging in"); + let matrix_auth = client.matrix_auth(); + matrix_auth + .login_username(user, &config.password) + .initial_device_display_name(&config.device_name) + .send() + .await?; + + log::debug!("creating new session"); + let session = matrix_auth + .session() + .expect("A logged-in matrix client should have a session"); + + db.set_matrix_session(session).await.unwrap(); + } + } client.add_event_handler(|ev: SyncRoomMessageEvent| async move { log::debug!("[matrix] Received a message {:?}", ev); @@ -37,7 +60,11 @@ pub(crate) async fn build( client.add_event_handler(on_stripped_state_member); - log::debug!("connected to matrix"); + log::debug!("connected to matrix, syncing"); + + client.sync_once(SyncSettings::default()).await?; + + log::info!("connected to matrix server and synchronized state, ready to continue startup"); return Ok(client); } @@ -106,9 +133,57 @@ pub(crate) async fn sender( } } -pub(crate) async fn sync(client: Client) { +pub(crate) async fn sync(client: Client, db: &db::DB) -> anyhow::Result<()> { + let filter = FilterDefinition::with_lazy_loading(); + let mut sync_settings = SyncSettings::default().filter(filter.into()); + + if let Some(token) = db + .get_matrix_sync_token() + .await + .map_err(|err| Error::UnknownError(err.into()))? + { + sync_settings = sync_settings.token(token); + } + client .sync(SyncSettings::default()) .await .expect("error syncing matrix"); + + loop { + match client.sync_once(sync_settings.clone()).await { + Ok(response) => { + // This is the last time we need to provide this token, the sync method after + // will handle it on its own. + sync_settings = sync_settings.token(response.next_batch.clone()); + db.set_matrix_sync_token(response.next_batch).await?; + break; + } + Err(error) => { + log::error!("An error occurred during initial sync: {error}"); + log::error!("Trying again…"); + } + } + } + + log::info!("The client is ready! Listening to new messages…"); + + // Now that we've synced, let's attach a handler for incoming room messages. + // client.add_event_handler(on_room_message); + + // This loops until we kill the program or an error happens. + client + .sync_with_result_callback(sync_settings, |sync_result| async move { + let response = sync_result?; + + // We persist the token each time to be able to restore our session + db.set_matrix_sync_token(response.next_batch) + .await + .map_err(|err| Error::UnknownError(err.into()))?; + + Ok(LoopCtrl::Continue) + }) + .await?; + + Ok(()) }