WIP: matrix session restore #4
10 changed files with 220 additions and 56 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -1,3 +1,3 @@
|
||||||
/target
|
/target
|
||||||
config.json
|
config.json
|
||||||
matrix-meshtastic-bridge.db*
|
/state
|
||||||
|
|
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -1747,6 +1747,7 @@ version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
|
"futures",
|
||||||
"log",
|
"log",
|
||||||
"matrix-sdk",
|
"matrix-sdk",
|
||||||
"meshtastic",
|
"meshtastic",
|
||||||
|
|
|
@ -6,10 +6,11 @@ edition = "2021"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0.89"
|
anyhow = "1.0.89"
|
||||||
env_logger = "0.11.5"
|
env_logger = "0.11.5"
|
||||||
|
futures = "0.3.31"
|
||||||
log = "0.4.22"
|
log = "0.4.22"
|
||||||
matrix-sdk = { version = "0.7.1", features = ["anyhow"] }
|
matrix-sdk = { version = "0.7.1", features = ["anyhow"] }
|
||||||
meshtastic = "0.1.6"
|
meshtastic = "0.1.6"
|
||||||
serde = { version = "1.0.210", features = ["derive"]}
|
serde = { version = "1.0.210", features = ["derive"]}
|
||||||
serde_json = "1.0.128"
|
serde_json = "1.0.128"
|
||||||
sqlx = { version = "0.7.3", features = ["runtime-tokio-native-tls", "sqlite"] }
|
sqlx = { version = "0.7.3", features = ["runtime-tokio-native-tls", "sqlite"] }
|
||||||
tokio = "1.40.0"
|
tokio = { version = "1.40.0", features = ["full"] }
|
||||||
|
|
|
@ -13,3 +13,9 @@ CREATE TABLE remote_user (
|
||||||
node_name VARCHAR(100) NOT NULL,
|
node_name VARCHAR(100) NOT NULL,
|
||||||
room VARCHAR(100) NOT NULL
|
room VARCHAR(100) NOT NULL
|
||||||
);
|
);
|
||||||
|
|
||||||
|
CREATE TABLE matrix_state (
|
||||||
|
id INTEGER PRIMARY KEY NOT NULL,
|
||||||
|
key TEXT UNIQUE NOT NULL,
|
||||||
|
value TEXT NOT NULL
|
||||||
|
);
|
||||||
|
|
|
@ -31,7 +31,7 @@ impl Bridge {
|
||||||
let (meshtastic_listener, meshtastic_stream_api) = stream_api.connect(serial_stream).await;
|
let (meshtastic_listener, meshtastic_stream_api) = stream_api.connect(serial_stream).await;
|
||||||
|
|
||||||
// setup matrix client
|
// setup matrix client
|
||||||
let matrix_client = matrix::build(config.matrix)
|
let matrix_client = matrix::build(config.matrix, &db)
|
||||||
.await
|
.await
|
||||||
.expect("error logging into matrix");
|
.expect("error logging into matrix");
|
||||||
matrix_client.sync_once(SyncSettings::default()).await?;
|
matrix_client.sync_once(SyncSettings::default()).await?;
|
||||||
|
|
|
@ -11,10 +11,15 @@ pub(crate) struct Config {
|
||||||
#[derive(serde::Deserialize, Debug)]
|
#[derive(serde::Deserialize, Debug)]
|
||||||
pub(crate) struct MatrixConfig {
|
pub(crate) struct MatrixConfig {
|
||||||
pub(crate) username: String,
|
pub(crate) username: String,
|
||||||
|
#[serde(default = "get_matrix_password")]
|
||||||
pub(crate) password: String,
|
pub(crate) password: String,
|
||||||
#[serde(default = "get_device_name")]
|
#[serde(default = "get_matrix_device_name")]
|
||||||
pub(crate) device_name: String,
|
pub(crate) device_name: String,
|
||||||
pub(crate) room: String,
|
pub(crate) room: String,
|
||||||
|
#[serde(default = "get_matrix_state_dir")]
|
||||||
|
pub(crate) state_dir: String,
|
||||||
|
#[serde(default = "get_matrix_state_passphrase")]
|
||||||
|
pub(crate) state_passphrase: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(serde::Deserialize, Debug)]
|
#[derive(serde::Deserialize, Debug)]
|
||||||
|
@ -22,12 +27,30 @@ pub(crate) struct MeshtasticConfig {
|
||||||
pub(crate) device: String,
|
pub(crate) device: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_device_name() -> String {
|
fn get_matrix_device_name() -> String {
|
||||||
"meshtastic-bridge".to_string()
|
"meshtastic-bridge".to_string()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_db_uri() -> String {
|
fn get_db_uri() -> String {
|
||||||
"sqlite://matrix-meshtastic-bridge.db".to_string()
|
"sqlite://state/matrix-meshtastic-bridge.db".to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_matrix_password() -> String {
|
||||||
|
match std::env::var("MATRIX_PASSWORD") {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(_) => "".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_matrix_state_dir() -> String {
|
||||||
|
"state".to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_matrix_state_passphrase() -> String {
|
||||||
|
match std::env::var("MATRIX_STATE_PASSPHRASE") {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(_) => "a".to_string(), // I'm sorry but encryption at rest is dumb if the decryption key is stored in the same place as the encrypted data
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn read_config() -> Config {
|
pub(crate) async fn read_config() -> Config {
|
||||||
|
|
73
src/db.rs
73
src/db.rs
|
@ -1,9 +1,6 @@
|
||||||
use meshtastic::protobufs::MyNodeInfo;
|
use std::sync::Arc;
|
||||||
use sqlx::{
|
|
||||||
migrate::{MigrateDatabase, Migrator},
|
use sqlx::{migrate::MigrateDatabase, sqlite::SqlitePool, Pool, Sqlite};
|
||||||
sqlite::SqlitePool,
|
|
||||||
Executor, Pool, Sqlite,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Debug, sqlx::FromRow)]
|
#[derive(Debug, sqlx::FromRow)]
|
||||||
pub(crate) struct Device {
|
pub(crate) struct Device {
|
||||||
|
@ -24,11 +21,17 @@ pub(crate) struct RemoteUser {
|
||||||
pub(crate) room: String,
|
pub(crate) room: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, sqlx::FromRow)]
|
||||||
|
struct MatrixState {
|
||||||
|
key: String,
|
||||||
|
value: String,
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) struct DB {
|
pub(crate) struct DB {
|
||||||
pool: Pool<Sqlite>,
|
pool: Pool<Sqlite>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn setup(db_url: String) -> Result<DB, Box<dyn std::error::Error>> {
|
pub(crate) async fn setup(db_url: String) -> anyhow::Result<Arc<DB>> {
|
||||||
if !Sqlite::database_exists(&db_url).await.unwrap_or(false) {
|
if !Sqlite::database_exists(&db_url).await.unwrap_or(false) {
|
||||||
log::debug!("Creating database {}", &db_url);
|
log::debug!("Creating database {}", &db_url);
|
||||||
match Sqlite::create_database(&db_url).await {
|
match Sqlite::create_database(&db_url).await {
|
||||||
|
@ -46,12 +49,12 @@ pub(crate) async fn setup(db_url: String) -> Result<DB, Box<dyn std::error::Erro
|
||||||
.await
|
.await
|
||||||
.expect("error running db migrations");
|
.expect("error running db migrations");
|
||||||
|
|
||||||
Ok(DB { pool: pool })
|
Ok(Arc::new(DB { pool: pool }))
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DB {
|
impl DB {
|
||||||
pub async fn get_device(
|
pub async fn get_device(
|
||||||
self: DB,
|
self: &DB,
|
||||||
node_num: i32,
|
node_num: i32,
|
||||||
) -> Result<Option<Device>, Box<dyn std::error::Error>> {
|
) -> Result<Option<Device>, Box<dyn std::error::Error>> {
|
||||||
let query = sqlx::query_as::<_, Device>("SELECT * FROM devices WHERE node_num = ? LIMIT 1");
|
let query = sqlx::query_as::<_, Device>("SELECT * FROM devices WHERE node_num = ? LIMIT 1");
|
||||||
|
@ -72,7 +75,57 @@ impl DB {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn close(self: DB) {
|
pub async fn get_matrix_session(
|
||||||
|
self: &DB,
|
||||||
|
) -> Result<Option<matrix_sdk::matrix_auth::MatrixSession>, Box<dyn std::error::Error>> {
|
||||||
|
let session = match self.get_matrix_state("session".to_string()).await? {
|
||||||
|
Some(session_json) => serde_json::from_str(&session_json)?,
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(session)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn set_matrix_session(
|
||||||
|
self: &DB,
|
||||||
|
session: matrix_sdk::matrix_auth::MatrixSession,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let session_json = serde_json::to_string(&session)?;
|
||||||
|
|
||||||
|
self.set_matrix_state("session".to_string(), session_json)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_matrix_sync_token(self: &DB) -> anyhow::Result<Option<String>> {
|
||||||
|
self.get_matrix_state("session_store".to_string()).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn set_matrix_sync_token(self: &DB, token: String) -> anyhow::Result<()> {
|
||||||
|
self.set_matrix_state("session_store".to_string(), token)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn set_matrix_state(self: &DB, key: String, value: String) -> anyhow::Result<()> {
|
||||||
|
let query = sqlx::query("INSERT INTO matrix_state (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value");
|
||||||
|
|
||||||
|
query.bind(key).bind(value).execute(&self.pool).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_matrix_state(self: &DB, key: String) -> anyhow::Result<Option<String>> {
|
||||||
|
let query = sqlx::query_as::<_, MatrixState>(
|
||||||
|
"SELECT value FROM matrix_state WHERE 'key' = ? LIMIT 1",
|
||||||
|
);
|
||||||
|
let result = match query.bind(key).fetch_optional(&self.pool).await.unwrap() {
|
||||||
|
Some(row) => Some(row.value),
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn close(self: &DB) {
|
||||||
self.pool.close().await
|
self.pool.close().await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
47
src/main.rs
47
src/main.rs
|
@ -1,4 +1,4 @@
|
||||||
use matrix_sdk::{config::SyncSettings, ruma::RoomId};
|
use matrix_sdk::ruma::RoomId;
|
||||||
use tokio::signal;
|
use tokio::signal;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
@ -8,51 +8,38 @@ mod db;
|
||||||
mod matrix;
|
mod matrix;
|
||||||
mod meshtastic;
|
mod meshtastic;
|
||||||
|
|
||||||
// #[tokio::main]
|
|
||||||
// async fn main() -> anyhow::Result<()> {
|
|
||||||
// env_logger::init();
|
|
||||||
|
|
||||||
// let config = config::read_config().await;
|
|
||||||
|
|
||||||
// let bridge = bridge::Bridge::setup(config).await.unwrap();
|
|
||||||
|
|
||||||
// bridge.run().await.unwrap();
|
|
||||||
|
|
||||||
// Ok(())
|
|
||||||
// }
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
|
|
||||||
let config = config::read_config().await;
|
let config = config::read_config().await;
|
||||||
|
|
||||||
let pool = db::setup(config.db).await.expect("error connecting to db");
|
let db = db::setup(config.db).await.expect("error connecting to db");
|
||||||
|
|
||||||
let (matrix_tx, matrix_rx) = mpsc::channel(32);
|
let (matrix_tx, matrix_rx) = mpsc::channel(32);
|
||||||
|
|
||||||
let room_id = RoomId::parse(&config.matrix.room).expect("invalid room id");
|
let room_id = RoomId::parse(&config.matrix.room).expect("invalid room id");
|
||||||
log::debug!("matrix room: {:?}", room_id);
|
log::debug!("matrix room: {:?}", room_id);
|
||||||
|
|
||||||
let matrix_client = matrix::build(config.matrix)
|
let matrix_client = matrix::build(config.matrix, &db)
|
||||||
.await
|
.await
|
||||||
.expect("error logging into matrix");
|
.expect("error logging into matrix");
|
||||||
|
|
||||||
matrix_client.sync_once(SyncSettings::default()).await?;
|
|
||||||
|
|
||||||
let meshtastic_client = meshtastic::build(config.meshtastic)
|
let meshtastic_client = meshtastic::build(config.meshtastic)
|
||||||
.await
|
.await
|
||||||
.expect("error connecting to meshtastic");
|
.expect("error connecting to meshtastic");
|
||||||
|
|
||||||
tokio::spawn(async {
|
log::info!("matrix and meshtatic clients ready, spawning threads");
|
||||||
meshtastic_client
|
|
||||||
.receive(matrix_tx)
|
|
||||||
.await
|
|
||||||
.expect("error receiving message from meshtastic");
|
|
||||||
});
|
|
||||||
|
|
||||||
tokio::spawn(matrix::sender(matrix_client.clone(), matrix_rx, room_id));
|
let mut tasks = vec![];
|
||||||
tokio::spawn(matrix::sync(matrix_client.clone()));
|
|
||||||
|
tasks.push(tokio::spawn(matrix::sender(
|
||||||
|
matrix_client,
|
||||||
|
matrix_rx,
|
||||||
|
room_id,
|
||||||
|
)));
|
||||||
|
tasks.push(tokio::spawn(matrix::sync(matrix_client, db)));
|
||||||
|
tasks.push(tokio::spawn(meshtastic_client.receive(matrix_tx)));
|
||||||
|
|
||||||
match signal::ctrl_c().await {
|
match signal::ctrl_c().await {
|
||||||
Ok(()) => {}
|
Ok(()) => {}
|
||||||
|
@ -61,8 +48,14 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log::info!("shutting down tasks");
|
||||||
|
|
||||||
|
meshtastic_client.close().await;
|
||||||
|
|
||||||
|
futures::future::join_all(tasks).await;
|
||||||
|
|
||||||
log::debug!("shutting down database connection");
|
log::debug!("shutting down database connection");
|
||||||
pool.close().await;
|
db.close().await;
|
||||||
log::debug!("db connection closed");
|
log::debug!("db connection closed");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
101
src/matrix.rs
101
src/matrix.rs
|
@ -1,35 +1,60 @@
|
||||||
use crate::config;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use crate::{config, db};
|
||||||
use log;
|
use log;
|
||||||
use matrix_sdk::{
|
use matrix_sdk::{
|
||||||
config::SyncSettings,
|
config::SyncSettings,
|
||||||
ruma::{
|
ruma::{
|
||||||
|
api::client::filter::FilterDefinition,
|
||||||
events::room::{
|
events::room::{
|
||||||
member::StrippedRoomMemberEvent,
|
member::StrippedRoomMemberEvent,
|
||||||
message::{RoomMessageEventContent, SyncRoomMessageEvent},
|
message::{RoomMessageEventContent, SyncRoomMessageEvent},
|
||||||
},
|
},
|
||||||
OwnedRoomId, TransactionId, UserId,
|
OwnedRoomId, TransactionId, UserId,
|
||||||
},
|
},
|
||||||
Client, Room,
|
Client, Error, LoopCtrl, Room,
|
||||||
};
|
};
|
||||||
use tokio::sync::mpsc::Receiver;
|
use tokio::sync::mpsc::Receiver;
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
|
|
||||||
pub(crate) async fn build(
|
pub(crate) async fn build(
|
||||||
config: config::MatrixConfig,
|
config: config::MatrixConfig,
|
||||||
|
db: &db::DB,
|
||||||
) -> Result<matrix_sdk::Client, anyhow::Error> {
|
) -> Result<matrix_sdk::Client, anyhow::Error> {
|
||||||
|
log::debug!("creating matrix client");
|
||||||
let user = UserId::parse(config.username)
|
let user = UserId::parse(config.username)
|
||||||
.expect("invalid matrix username - should be a full mxid with server");
|
.expect("invalid matrix username - should be a full mxid with server");
|
||||||
|
|
||||||
|
log::debug!("storing matrix client state in {}", config.state_dir);
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
.server_name(user.server_name())
|
.server_name(user.server_name())
|
||||||
|
.sqlite_store(config.state_dir, Some(&config.state_passphrase))
|
||||||
.build()
|
.build()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
client
|
log::debug!("checking if there's an existing session");
|
||||||
.matrix_auth()
|
match db.get_matrix_session().await.unwrap() {
|
||||||
.login_username(user, &config.password)
|
Some(session) => {
|
||||||
.initial_device_display_name(&config.device_name)
|
log::debug!("restoring session from database");
|
||||||
.send()
|
client.restore_session(session).await?;
|
||||||
.await?;
|
}
|
||||||
|
None => {
|
||||||
|
log::debug!("no existing session. Logging in");
|
||||||
|
let matrix_auth = client.matrix_auth();
|
||||||
|
matrix_auth
|
||||||
|
.login_username(user, &config.password)
|
||||||
|
.initial_device_display_name(&config.device_name)
|
||||||
|
.send()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
log::debug!("creating new session");
|
||||||
|
let session = matrix_auth
|
||||||
|
.session()
|
||||||
|
.expect("A logged-in matrix client should have a session");
|
||||||
|
|
||||||
|
db.set_matrix_session(session).await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
|
client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
|
||||||
log::debug!("[matrix] Received a message {:?}", ev);
|
log::debug!("[matrix] Received a message {:?}", ev);
|
||||||
|
@ -37,7 +62,11 @@ pub(crate) async fn build(
|
||||||
|
|
||||||
client.add_event_handler(on_stripped_state_member);
|
client.add_event_handler(on_stripped_state_member);
|
||||||
|
|
||||||
log::debug!("connected to matrix");
|
log::debug!("connected to matrix, syncing");
|
||||||
|
|
||||||
|
client.sync_once(SyncSettings::default()).await?;
|
||||||
|
|
||||||
|
log::info!("connected to matrix server and synchronized state, ready to continue startup");
|
||||||
|
|
||||||
return Ok(client);
|
return Ok(client);
|
||||||
}
|
}
|
||||||
|
@ -92,7 +121,7 @@ pub(crate) async fn sender(
|
||||||
client: Client,
|
client: Client,
|
||||||
mut rx: Receiver<RoomMessageEventContent>,
|
mut rx: Receiver<RoomMessageEventContent>,
|
||||||
room_id: OwnedRoomId,
|
room_id: OwnedRoomId,
|
||||||
) {
|
) -> anyhow::Result<()> {
|
||||||
let room = match client.get_room(&room_id) {
|
let room = match client.get_room(&room_id) {
|
||||||
Some(room) => room,
|
Some(room) => room,
|
||||||
None => panic!("requested matrix room not found"),
|
None => panic!("requested matrix room not found"),
|
||||||
|
@ -104,11 +133,61 @@ pub(crate) async fn sender(
|
||||||
Ok(response) => log::debug!("sent message to matrix: {:?}", response),
|
Ok(response) => log::debug!("sent message to matrix: {:?}", response),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn sync(client: Client) {
|
pub(crate) async fn sync(client: Client, db: Arc<db::DB>) -> anyhow::Result<()> {
|
||||||
|
let filter = FilterDefinition::with_lazy_loading();
|
||||||
|
let mut sync_settings = SyncSettings::default().filter(filter.into());
|
||||||
|
|
||||||
|
if let Some(token) = db
|
||||||
|
.get_matrix_sync_token()
|
||||||
|
.await
|
||||||
|
.map_err(|err| Error::UnknownError(err.into()))?
|
||||||
|
{
|
||||||
|
sync_settings = sync_settings.token(token);
|
||||||
|
}
|
||||||
|
|
||||||
client
|
client
|
||||||
.sync(SyncSettings::default())
|
.sync(SyncSettings::default())
|
||||||
.await
|
.await
|
||||||
.expect("error syncing matrix");
|
.expect("error syncing matrix");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match client.sync_once(sync_settings.clone()).await {
|
||||||
|
Ok(response) => {
|
||||||
|
// This is the last time we need to provide this token, the sync method after
|
||||||
|
// will handle it on its own.
|
||||||
|
sync_settings = sync_settings.token(response.next_batch.clone());
|
||||||
|
db.set_matrix_sync_token(response.next_batch).await?;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
log::error!("An error occurred during initial sync: {error}");
|
||||||
|
log::error!("Trying again…");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log::info!("The client is ready! Listening to new messages…");
|
||||||
|
|
||||||
|
// Now that we've synced, let's attach a handler for incoming room messages.
|
||||||
|
// client.add_event_handler(on_room_message);
|
||||||
|
|
||||||
|
// This loops until we kill the program or an error happens.
|
||||||
|
client
|
||||||
|
.sync_with_result_callback(sync_settings, |sync_result| async move {
|
||||||
|
let response = sync_result?;
|
||||||
|
|
||||||
|
// We persist the token each time to be able to restore our session
|
||||||
|
db.set_matrix_sync_token(response.next_batch)
|
||||||
|
.await
|
||||||
|
.map_err(|err| Error::UnknownError(err.into()))?;
|
||||||
|
|
||||||
|
Ok(LoopCtrl::Continue)
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use matrix_sdk::ruma::events::room::message::RoomMessageEventContent;
|
use matrix_sdk::ruma::events::room::message::RoomMessageEventContent;
|
||||||
use meshtastic::api::state::Connected;
|
use meshtastic::api::state::Connected;
|
||||||
use meshtastic::api::{ConnectedStreamApi, StreamApi};
|
use meshtastic::api::{ConnectedStreamApi, StreamApi};
|
||||||
|
@ -33,7 +35,7 @@ impl MeshtasticClient {
|
||||||
pub async fn receive(
|
pub async fn receive(
|
||||||
mut self: MeshtasticClient,
|
mut self: MeshtasticClient,
|
||||||
matrix_sender: Sender<RoomMessageEventContent>,
|
matrix_sender: Sender<RoomMessageEventContent>,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> anyhow::Result<()> {
|
||||||
// let config_id = utils::generate_rand_id();
|
// let config_id = utils::generate_rand_id();
|
||||||
// let stream_api = stream_api.configure(config_id).await?;
|
// let stream_api = stream_api.configure(config_id).await?;
|
||||||
|
|
||||||
|
@ -109,13 +111,19 @@ impl MeshtasticClient {
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn close(self: &mut MeshtasticClient) -> anyhow::Result<()> {
|
||||||
|
self.decoded_listener.close();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_decoded_packet(
|
async fn handle_decoded_packet(
|
||||||
matrix_sender: &Sender<RoomMessageEventContent>,
|
matrix_sender: &Sender<RoomMessageEventContent>,
|
||||||
// packet: MeshPacket,
|
// packet: MeshPacket,
|
||||||
decoded: Data,
|
decoded: Data,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> anyhow::Result<()> {
|
||||||
match decoded.portnum() {
|
match decoded.portnum() {
|
||||||
meshtastic::protobufs::PortNum::TextMessageApp => {
|
meshtastic::protobufs::PortNum::TextMessageApp => {
|
||||||
debug_message(
|
debug_message(
|
||||||
|
|
Loading…
Reference in a new issue