WIP: matrix session restore #4

Draft
finn wants to merge 2 commits from matrix-session-restore into main
10 changed files with 220 additions and 56 deletions

2
.gitignore vendored
View file

@ -1,3 +1,3 @@
/target /target
config.json config.json
matrix-meshtastic-bridge.db* /state

1
Cargo.lock generated
View file

@ -1747,6 +1747,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"env_logger", "env_logger",
"futures",
"log", "log",
"matrix-sdk", "matrix-sdk",
"meshtastic", "meshtastic",

View file

@ -6,10 +6,11 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0.89" anyhow = "1.0.89"
env_logger = "0.11.5" env_logger = "0.11.5"
futures = "0.3.31"
log = "0.4.22" log = "0.4.22"
matrix-sdk = { version = "0.7.1", features = ["anyhow"] } matrix-sdk = { version = "0.7.1", features = ["anyhow"] }
meshtastic = "0.1.6" meshtastic = "0.1.6"
serde = { version = "1.0.210", features = ["derive"]} serde = { version = "1.0.210", features = ["derive"]}
serde_json = "1.0.128" serde_json = "1.0.128"
sqlx = { version = "0.7.3", features = ["runtime-tokio-native-tls", "sqlite"] } sqlx = { version = "0.7.3", features = ["runtime-tokio-native-tls", "sqlite"] }
tokio = "1.40.0" tokio = { version = "1.40.0", features = ["full"] }

View file

@ -13,3 +13,9 @@ CREATE TABLE remote_user (
node_name VARCHAR(100) NOT NULL, node_name VARCHAR(100) NOT NULL,
room VARCHAR(100) NOT NULL room VARCHAR(100) NOT NULL
); );
CREATE TABLE matrix_state (
id INTEGER PRIMARY KEY NOT NULL,
key TEXT UNIQUE NOT NULL,
value TEXT NOT NULL
);

View file

@ -31,7 +31,7 @@ impl Bridge {
let (meshtastic_listener, meshtastic_stream_api) = stream_api.connect(serial_stream).await; let (meshtastic_listener, meshtastic_stream_api) = stream_api.connect(serial_stream).await;
// setup matrix client // setup matrix client
let matrix_client = matrix::build(config.matrix) let matrix_client = matrix::build(config.matrix, &db)
.await .await
.expect("error logging into matrix"); .expect("error logging into matrix");
matrix_client.sync_once(SyncSettings::default()).await?; matrix_client.sync_once(SyncSettings::default()).await?;

View file

@ -11,10 +11,15 @@ pub(crate) struct Config {
#[derive(serde::Deserialize, Debug)] #[derive(serde::Deserialize, Debug)]
pub(crate) struct MatrixConfig { pub(crate) struct MatrixConfig {
pub(crate) username: String, pub(crate) username: String,
#[serde(default = "get_matrix_password")]
pub(crate) password: String, pub(crate) password: String,
#[serde(default = "get_device_name")] #[serde(default = "get_matrix_device_name")]
pub(crate) device_name: String, pub(crate) device_name: String,
pub(crate) room: String, pub(crate) room: String,
#[serde(default = "get_matrix_state_dir")]
pub(crate) state_dir: String,
#[serde(default = "get_matrix_state_passphrase")]
pub(crate) state_passphrase: String,
} }
#[derive(serde::Deserialize, Debug)] #[derive(serde::Deserialize, Debug)]
@ -22,12 +27,30 @@ pub(crate) struct MeshtasticConfig {
pub(crate) device: String, pub(crate) device: String,
} }
fn get_device_name() -> String { fn get_matrix_device_name() -> String {
"meshtastic-bridge".to_string() "meshtastic-bridge".to_string()
} }
fn get_db_uri() -> String { fn get_db_uri() -> String {
"sqlite://matrix-meshtastic-bridge.db".to_string() "sqlite://state/matrix-meshtastic-bridge.db".to_string()
}
fn get_matrix_password() -> String {
match std::env::var("MATRIX_PASSWORD") {
Ok(p) => p,
Err(_) => "".to_string(),
}
}
fn get_matrix_state_dir() -> String {
"state".to_string()
}
fn get_matrix_state_passphrase() -> String {
match std::env::var("MATRIX_STATE_PASSPHRASE") {
Ok(p) => p,
Err(_) => "a".to_string(), // I'm sorry but encryption at rest is dumb if the decryption key is stored in the same place as the encrypted data
}
} }
pub(crate) async fn read_config() -> Config { pub(crate) async fn read_config() -> Config {

View file

@ -1,9 +1,6 @@
use meshtastic::protobufs::MyNodeInfo; use std::sync::Arc;
use sqlx::{
migrate::{MigrateDatabase, Migrator}, use sqlx::{migrate::MigrateDatabase, sqlite::SqlitePool, Pool, Sqlite};
sqlite::SqlitePool,
Executor, Pool, Sqlite,
};
#[derive(Debug, sqlx::FromRow)] #[derive(Debug, sqlx::FromRow)]
pub(crate) struct Device { pub(crate) struct Device {
@ -24,11 +21,17 @@ pub(crate) struct RemoteUser {
pub(crate) room: String, pub(crate) room: String,
} }
#[derive(Debug, sqlx::FromRow)]
struct MatrixState {
key: String,
value: String,
}
pub(crate) struct DB { pub(crate) struct DB {
pool: Pool<Sqlite>, pool: Pool<Sqlite>,
} }
pub(crate) async fn setup(db_url: String) -> Result<DB, Box<dyn std::error::Error>> { pub(crate) async fn setup(db_url: String) -> anyhow::Result<Arc<DB>> {
if !Sqlite::database_exists(&db_url).await.unwrap_or(false) { if !Sqlite::database_exists(&db_url).await.unwrap_or(false) {
log::debug!("Creating database {}", &db_url); log::debug!("Creating database {}", &db_url);
match Sqlite::create_database(&db_url).await { match Sqlite::create_database(&db_url).await {
@ -46,12 +49,12 @@ pub(crate) async fn setup(db_url: String) -> Result<DB, Box<dyn std::error::Erro
.await .await
.expect("error running db migrations"); .expect("error running db migrations");
Ok(DB { pool: pool }) Ok(Arc::new(DB { pool: pool }))
} }
impl DB { impl DB {
pub async fn get_device( pub async fn get_device(
self: DB, self: &DB,
node_num: i32, node_num: i32,
) -> Result<Option<Device>, Box<dyn std::error::Error>> { ) -> Result<Option<Device>, Box<dyn std::error::Error>> {
let query = sqlx::query_as::<_, Device>("SELECT * FROM devices WHERE node_num = ? LIMIT 1"); let query = sqlx::query_as::<_, Device>("SELECT * FROM devices WHERE node_num = ? LIMIT 1");
@ -72,7 +75,57 @@ impl DB {
Ok(()) Ok(())
} }
pub async fn close(self: DB) { pub async fn get_matrix_session(
self: &DB,
) -> Result<Option<matrix_sdk::matrix_auth::MatrixSession>, Box<dyn std::error::Error>> {
let session = match self.get_matrix_state("session".to_string()).await? {
Some(session_json) => serde_json::from_str(&session_json)?,
None => None,
};
Ok(session)
}
pub async fn set_matrix_session(
self: &DB,
session: matrix_sdk::matrix_auth::MatrixSession,
) -> anyhow::Result<()> {
let session_json = serde_json::to_string(&session)?;
self.set_matrix_state("session".to_string(), session_json)
.await
}
pub async fn get_matrix_sync_token(self: &DB) -> anyhow::Result<Option<String>> {
self.get_matrix_state("session_store".to_string()).await
}
pub async fn set_matrix_sync_token(self: &DB, token: String) -> anyhow::Result<()> {
self.set_matrix_state("session_store".to_string(), token)
.await
}
async fn set_matrix_state(self: &DB, key: String, value: String) -> anyhow::Result<()> {
let query = sqlx::query("INSERT INTO matrix_state (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value");
query.bind(key).bind(value).execute(&self.pool).await?;
Ok(())
}
pub async fn get_matrix_state(self: &DB, key: String) -> anyhow::Result<Option<String>> {
let query = sqlx::query_as::<_, MatrixState>(
"SELECT value FROM matrix_state WHERE 'key' = ? LIMIT 1",
);
let result = match query.bind(key).fetch_optional(&self.pool).await.unwrap() {
Some(row) => Some(row.value),
None => None,
};
Ok(result)
}
pub async fn close(self: &DB) {
self.pool.close().await self.pool.close().await
} }
} }

View file

@ -1,4 +1,4 @@
use matrix_sdk::{config::SyncSettings, ruma::RoomId}; use matrix_sdk::ruma::RoomId;
use tokio::signal; use tokio::signal;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -8,51 +8,38 @@ mod db;
mod matrix; mod matrix;
mod meshtastic; mod meshtastic;
// #[tokio::main]
// async fn main() -> anyhow::Result<()> {
// env_logger::init();
// let config = config::read_config().await;
// let bridge = bridge::Bridge::setup(config).await.unwrap();
// bridge.run().await.unwrap();
// Ok(())
// }
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
env_logger::init(); env_logger::init();
let config = config::read_config().await; let config = config::read_config().await;
let pool = db::setup(config.db).await.expect("error connecting to db"); let db = db::setup(config.db).await.expect("error connecting to db");
let (matrix_tx, matrix_rx) = mpsc::channel(32); let (matrix_tx, matrix_rx) = mpsc::channel(32);
let room_id = RoomId::parse(&config.matrix.room).expect("invalid room id"); let room_id = RoomId::parse(&config.matrix.room).expect("invalid room id");
log::debug!("matrix room: {:?}", room_id); log::debug!("matrix room: {:?}", room_id);
let matrix_client = matrix::build(config.matrix) let matrix_client = matrix::build(config.matrix, &db)
.await .await
.expect("error logging into matrix"); .expect("error logging into matrix");
matrix_client.sync_once(SyncSettings::default()).await?;
let meshtastic_client = meshtastic::build(config.meshtastic) let meshtastic_client = meshtastic::build(config.meshtastic)
.await .await
.expect("error connecting to meshtastic"); .expect("error connecting to meshtastic");
tokio::spawn(async { log::info!("matrix and meshtatic clients ready, spawning threads");
meshtastic_client
.receive(matrix_tx)
.await
.expect("error receiving message from meshtastic");
});
tokio::spawn(matrix::sender(matrix_client.clone(), matrix_rx, room_id)); let mut tasks = vec![];
tokio::spawn(matrix::sync(matrix_client.clone()));
tasks.push(tokio::spawn(matrix::sender(
matrix_client,
matrix_rx,
room_id,
)));
tasks.push(tokio::spawn(matrix::sync(matrix_client, db)));
tasks.push(tokio::spawn(meshtastic_client.receive(matrix_tx)));
match signal::ctrl_c().await { match signal::ctrl_c().await {
Ok(()) => {} Ok(()) => {}
@ -61,8 +48,14 @@ async fn main() -> anyhow::Result<()> {
} }
} }
log::info!("shutting down tasks");
meshtastic_client.close().await;
futures::future::join_all(tasks).await;
log::debug!("shutting down database connection"); log::debug!("shutting down database connection");
pool.close().await; db.close().await;
log::debug!("db connection closed"); log::debug!("db connection closed");
Ok(()) Ok(())

View file

@ -1,43 +1,72 @@
use crate::config; use std::sync::Arc;
use crate::{config, db};
use log; use log;
use matrix_sdk::{ use matrix_sdk::{
config::SyncSettings, config::SyncSettings,
ruma::{ ruma::{
api::client::filter::FilterDefinition,
events::room::{ events::room::{
member::StrippedRoomMemberEvent, member::StrippedRoomMemberEvent,
message::{RoomMessageEventContent, SyncRoomMessageEvent}, message::{RoomMessageEventContent, SyncRoomMessageEvent},
}, },
OwnedRoomId, TransactionId, UserId, OwnedRoomId, TransactionId, UserId,
}, },
Client, Room, Client, Error, LoopCtrl, Room,
}; };
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
pub(crate) async fn build( pub(crate) async fn build(
config: config::MatrixConfig, config: config::MatrixConfig,
db: &db::DB,
) -> Result<matrix_sdk::Client, anyhow::Error> { ) -> Result<matrix_sdk::Client, anyhow::Error> {
log::debug!("creating matrix client");
let user = UserId::parse(config.username) let user = UserId::parse(config.username)
.expect("invalid matrix username - should be a full mxid with server"); .expect("invalid matrix username - should be a full mxid with server");
log::debug!("storing matrix client state in {}", config.state_dir);
let client = Client::builder() let client = Client::builder()
.server_name(user.server_name()) .server_name(user.server_name())
.sqlite_store(config.state_dir, Some(&config.state_passphrase))
.build() .build()
.await?; .await?;
client log::debug!("checking if there's an existing session");
.matrix_auth() match db.get_matrix_session().await.unwrap() {
Some(session) => {
log::debug!("restoring session from database");
client.restore_session(session).await?;
}
None => {
log::debug!("no existing session. Logging in");
let matrix_auth = client.matrix_auth();
matrix_auth
.login_username(user, &config.password) .login_username(user, &config.password)
.initial_device_display_name(&config.device_name) .initial_device_display_name(&config.device_name)
.send() .send()
.await?; .await?;
log::debug!("creating new session");
let session = matrix_auth
.session()
.expect("A logged-in matrix client should have a session");
db.set_matrix_session(session).await.unwrap();
}
}
client.add_event_handler(|ev: SyncRoomMessageEvent| async move { client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
log::debug!("[matrix] Received a message {:?}", ev); log::debug!("[matrix] Received a message {:?}", ev);
}); });
client.add_event_handler(on_stripped_state_member); client.add_event_handler(on_stripped_state_member);
log::debug!("connected to matrix"); log::debug!("connected to matrix, syncing");
client.sync_once(SyncSettings::default()).await?;
log::info!("connected to matrix server and synchronized state, ready to continue startup");
return Ok(client); return Ok(client);
} }
@ -92,7 +121,7 @@ pub(crate) async fn sender(
client: Client, client: Client,
mut rx: Receiver<RoomMessageEventContent>, mut rx: Receiver<RoomMessageEventContent>,
room_id: OwnedRoomId, room_id: OwnedRoomId,
) { ) -> anyhow::Result<()> {
let room = match client.get_room(&room_id) { let room = match client.get_room(&room_id) {
Some(room) => room, Some(room) => room,
None => panic!("requested matrix room not found"), None => panic!("requested matrix room not found"),
@ -104,11 +133,61 @@ pub(crate) async fn sender(
Ok(response) => log::debug!("sent message to matrix: {:?}", response), Ok(response) => log::debug!("sent message to matrix: {:?}", response),
} }
} }
Ok(())
} }
pub(crate) async fn sync(client: Client) { pub(crate) async fn sync(client: Client, db: Arc<db::DB>) -> anyhow::Result<()> {
let filter = FilterDefinition::with_lazy_loading();
let mut sync_settings = SyncSettings::default().filter(filter.into());
if let Some(token) = db
.get_matrix_sync_token()
.await
.map_err(|err| Error::UnknownError(err.into()))?
{
sync_settings = sync_settings.token(token);
}
client client
.sync(SyncSettings::default()) .sync(SyncSettings::default())
.await .await
.expect("error syncing matrix"); .expect("error syncing matrix");
loop {
match client.sync_once(sync_settings.clone()).await {
Ok(response) => {
// This is the last time we need to provide this token, the sync method after
// will handle it on its own.
sync_settings = sync_settings.token(response.next_batch.clone());
db.set_matrix_sync_token(response.next_batch).await?;
break;
}
Err(error) => {
log::error!("An error occurred during initial sync: {error}");
log::error!("Trying again…");
}
}
}
log::info!("The client is ready! Listening to new messages…");
// Now that we've synced, let's attach a handler for incoming room messages.
// client.add_event_handler(on_room_message);
// This loops until we kill the program or an error happens.
client
.sync_with_result_callback(sync_settings, |sync_result| async move {
let response = sync_result?;
// We persist the token each time to be able to restore our session
db.set_matrix_sync_token(response.next_batch)
.await
.map_err(|err| Error::UnknownError(err.into()))?;
Ok(LoopCtrl::Continue)
})
.await?;
Ok(())
} }

View file

@ -1,3 +1,5 @@
use std::sync::Arc;
use matrix_sdk::ruma::events::room::message::RoomMessageEventContent; use matrix_sdk::ruma::events::room::message::RoomMessageEventContent;
use meshtastic::api::state::Connected; use meshtastic::api::state::Connected;
use meshtastic::api::{ConnectedStreamApi, StreamApi}; use meshtastic::api::{ConnectedStreamApi, StreamApi};
@ -33,7 +35,7 @@ impl MeshtasticClient {
pub async fn receive( pub async fn receive(
mut self: MeshtasticClient, mut self: MeshtasticClient,
matrix_sender: Sender<RoomMessageEventContent>, matrix_sender: Sender<RoomMessageEventContent>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> anyhow::Result<()> {
// let config_id = utils::generate_rand_id(); // let config_id = utils::generate_rand_id();
// let stream_api = stream_api.configure(config_id).await?; // let stream_api = stream_api.configure(config_id).await?;
@ -109,13 +111,19 @@ impl MeshtasticClient {
Ok(()) Ok(())
} }
pub async fn close(self: &mut MeshtasticClient) -> anyhow::Result<()> {
self.decoded_listener.close();
Ok(())
}
} }
async fn handle_decoded_packet( async fn handle_decoded_packet(
matrix_sender: &Sender<RoomMessageEventContent>, matrix_sender: &Sender<RoomMessageEventContent>,
// packet: MeshPacket, // packet: MeshPacket,
decoded: Data, decoded: Data,
) -> Result<(), Box<dyn std::error::Error>> { ) -> anyhow::Result<()> {
match decoded.portnum() { match decoded.portnum() {
meshtastic::protobufs::PortNum::TextMessageApp => { meshtastic::protobufs::PortNum::TextMessageApp => {
debug_message( debug_message(