// TODO: make a choice: mpsc vs flume. use std::sync::Arc; use std::time::Duration; use dioxus::prelude::to_owned; use flume::{bounded, unbounded}; use flume::{Receiver as FlumeReceiver, Sender}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::task::JoinHandle; use tracing::{debug, error, warn}; use matrix_sdk::{ config::SyncSettings, event_handler::Ctx, room::Room as MatrixRoom, ruma::events::{ key::verification::{ done::{OriginalSyncKeyVerificationDoneEvent, ToDeviceKeyVerificationDoneEvent}, key::{OriginalSyncKeyVerificationKeyEvent, ToDeviceKeyVerificationKeyEvent}, request::ToDeviceKeyVerificationRequestEvent, start::{OriginalSyncKeyVerificationStartEvent, ToDeviceKeyVerificationStartEvent}, }, presence::PresenceEvent, reaction::ReactionEventContent, room::{ member::{OriginalSyncRoomMemberEvent, RoomMemberEventContent}, message::RoomMessageEventContent, name::RoomNameEventContent, redaction::OriginalSyncRoomRedactionEvent, topic::RoomTopicEventContent, }, typing::SyncTypingEvent, SyncMessageLikeEvent, SyncStateEvent, }, Client as MatrixClient, }; use super::requester::Requester; use super::worker_tasks::{LoginStyle, WorkerTask}; use crate::base::Room; #[derive(thiserror::Error, Debug)] pub enum ClientError { #[error("Matrix client error: {0}")] Matrix(#[from] matrix_sdk::Error), } #[derive(Clone)] struct Senders { rooms_sender: flume::Sender, } impl Senders { fn new(rooms_sender: flume::Sender) -> Self { Self { rooms_sender } } } pub struct Client { initialized: bool, client: Option>, load_handle: Option>, sync_handle: Option>, senders: Senders, } impl Client { pub fn new(client: Arc, rooms_sender: flume::Sender) -> Self { Self { initialized: false, client: Some(client), load_handle: None, sync_handle: None, senders: Senders::new(rooms_sender), } } async fn on_sync_typing_event(_ev: SyncTypingEvent, room: MatrixRoom) { debug!("== on_sync_typing_event =="); let room_id = room.room_id().to_owned(); dbg!(room_id); } async fn on_presence_event(_ev: PresenceEvent) { debug!("== on_presence_event =="); dbg!(_ev); } async fn on_sync_state_event(_ev: SyncStateEvent, _room: MatrixRoom) { debug!("== on_sync_state_event =="); dbg!(_ev); } async fn on_room_topic_event(ev: SyncStateEvent, room: MatrixRoom) { debug!("== on_room_topic_event =="); dbg!(&ev); // if let SyncStateEvent::Original(ev) = ev { // let room_id = room.room_id().to_owned(); // let store = reactive_store.read().unwrap().to_owned(); // if let Some(store_room) = store.rooms.get(&room_id) { // // store_room.write().unwrap().topic = Some(ev.content.topic); // // let _ = reactive_store.write(store); // println!("HOP"); // } else { // println!("No room with \"{room_id}\" id known"); // } // } } async fn on_room_member_event(ev: SyncStateEvent, _room: MatrixRoom) { debug!("== on_room_member_event =="); dbg!(ev); // // dbg!(room); // if room.invited_members_count() > 0 { // dbg!(room); // } // if let SyncStateEvent::Original(ev) = ev {} } async fn on_sync_message_like_room_message_event( ev: SyncMessageLikeEvent, _room: MatrixRoom, _client: MatrixClient, ) { debug!("== on_sync_message_like_room_message_event =="); dbg!(ev); } async fn on_sync_message_like_reaction_event( ev: SyncMessageLikeEvent, _room: MatrixRoom, ) { debug!("== on_sync_message_like_reaction_event =="); dbg!(ev); } async fn on_original_sync_room_redaction_event( ev: OriginalSyncRoomRedactionEvent, _room: MatrixRoom, ) { debug!("== on_original_sync_room_redaction_event =="); dbg!(ev); } async fn on_original_sync_room_member_event( ev: OriginalSyncRoomMemberEvent, room: MatrixRoom, _client: MatrixClient, ) { debug!("== on_original_sync_room_member_event =="); dbg!(ev); let room_id = room.room_id(); dbg!(room_id); // let mut store = store_ctx.read().unwrap().to_owned(); // dbg!(store.rooms.keys()); // let is_direct = room.is_direct().await.ok(); // store.rooms.insert( // OwnedRoomId::from(room_id), // Arc::new(RwLock::new(Room::new(Arc::new(room), None, is_direct))), // ); // let _ = store_ctx.write(store); } async fn on_original_sync_key_verif_start_event( ev: OriginalSyncKeyVerificationStartEvent, _client: MatrixClient, ) { debug!("== on_original_sync_key_verif_start_event =="); dbg!(ev); } async fn on_original_sync_key_verif_key_event( ev: OriginalSyncKeyVerificationKeyEvent, _client: MatrixClient, ) { debug!("== on_original_sync_key_verif_key_event =="); dbg!(ev); } async fn on_original_sync_key_verif_done_event( ev: OriginalSyncKeyVerificationDoneEvent, _client: MatrixClient, ) { debug!("== on_original_sync_key_verif_done_event =="); dbg!(ev); } async fn on_device_key_verif_req_event( ev: ToDeviceKeyVerificationRequestEvent, _client: MatrixClient, ) { debug!("== on_device_key_verif_req_event =="); dbg!(ev); } async fn on_device_key_verif_start_event( ev: ToDeviceKeyVerificationStartEvent, _client: MatrixClient, ) { debug!("== on_device_key_verif_start_event =="); dbg!(ev); } async fn on_device_key_verif_key_event( ev: ToDeviceKeyVerificationKeyEvent, _client: MatrixClient, ) { debug!("== on_device_key_verif_key_event =="); dbg!(ev); } async fn on_device_key_verif_done_event( ev: ToDeviceKeyVerificationDoneEvent, _client: MatrixClient, ) { debug!("== on_device_key_verif_done_event =="); dbg!(ev); } pub async fn spawn(homeserver_url: String) -> Requester { let (tx, rx) = unbounded_channel::(); let (rooms_sender, rooms_receiver) = unbounded::(); let matrix_client = Arc::new( MatrixClient::builder() .homeserver_url(&homeserver_url) .build() .await .unwrap(), ); let mut client = Client::new(matrix_client.clone(), rooms_sender); tokio::spawn({ async move { client.work(rx).await; } }); Requester { matrix_client, tx, rooms_receiver, } } fn init(&mut self) { let client = self.client.clone().unwrap(); client.add_event_handler_context(self.senders.clone()); let _ = client.add_event_handler(Client::on_sync_typing_event); let _ = client.add_event_handler(Client::on_presence_event); let _ = client.add_event_handler(Client::on_sync_state_event); let _ = client.add_event_handler(Client::on_sync_message_like_room_message_event); let _ = client.add_event_handler(Client::on_sync_message_like_reaction_event); let _ = client.add_event_handler(Client::on_original_sync_room_redaction_event); let _ = client.add_event_handler(Client::on_original_sync_room_member_event); let _ = client.add_event_handler(Client::on_original_sync_key_verif_start_event); let _ = client.add_event_handler(Client::on_original_sync_key_verif_key_event); let _ = client.add_event_handler(Client::on_original_sync_key_verif_done_event); let _ = client.add_event_handler(Client::on_device_key_verif_req_event); let _ = client.add_event_handler(Client::on_device_key_verif_start_event); let _ = client.add_event_handler(Client::on_device_key_verif_key_event); let _ = client.add_event_handler(Client::on_device_key_verif_done_event); let _ = client.add_event_handler(Client::on_room_topic_event); let _ = client.add_event_handler(Client::on_room_member_event); self.initialized = true; } async fn refresh_rooms(matrix_client: &MatrixClient, rooms_sender: &Sender) { let joined_matrix_rooms_ref = &matrix_client.joined_rooms(); let invited_matrix_rooms_ref = &matrix_client.invited_rooms(); for matrix_rooms in vec![joined_matrix_rooms_ref, invited_matrix_rooms_ref] { for matrix_room in matrix_rooms.iter() { let room = Room::new( Arc::new(matrix_room.to_owned()), None, matrix_room.is_direct().await.ok(), ); if let Err(err) = rooms_sender.send_async(room).await { warn!("Error: {}", err); } } } } async fn refresh_rooms_forever(matrix_client: &MatrixClient, rooms_channel: &Sender) { // TODO: Add interval to config let mut interval = tokio::time::interval(Duration::from_secs(5)); loop { Self::refresh_rooms(matrix_client, rooms_channel).await; interval.tick().await; } } async fn login_and_sync(&mut self, style: LoginStyle) -> anyhow::Result<()> { let client = self.client.clone().unwrap(); match style { LoginStyle::Password(username, password) => { let _resp = client .matrix_auth() .login_username(&username, &password) .initial_device_display_name("TODO") .send() .await .map_err(ClientError::from)?; } } let (synchronized_tx, synchronized_rx) = bounded(1); self.sync_handle = tokio::spawn({ async move { // Sync once so we receive the client state and old messages let _ = client.sync_once(SyncSettings::default()).await; debug!("User connected to the homeserver"); if let Err(err) = synchronized_tx.send(true) { warn!("Unable to notify that the Matrix client is now synchronized ({err})"); } loop { let settings = SyncSettings::default(); let _ = client.sync(settings).await; } } }) .into(); self.start_background_tasks(synchronized_rx); Ok(()) } fn start_background_tasks(&mut self, synchronized_rx: FlumeReceiver) { let client = self.client.clone().unwrap(); let rooms_sender_ref = &self.senders.rooms_sender; self.load_handle = tokio::spawn({ to_owned![rooms_sender_ref]; async move { if let Err(err) = synchronized_rx.recv() { error!("Unable to setup the rx channel notifying that the Matrix client is now synchronized ({err})"); } let rooms_refresh = Self::refresh_rooms_forever( client.as_ref(), &rooms_sender_ref ); let ((),) = tokio::join!(rooms_refresh); } }) .into(); } async fn work(&mut self, mut rx: UnboundedReceiver) { loop { let task = rx.recv().await; match task { Some(task) => self.run(task).await, None => { break; } } } if let Some(handle) = self.sync_handle.take() { handle.abort(); } } async fn run(&mut self, task: WorkerTask) { match task { WorkerTask::Init(reply) => { assert_eq!(self.initialized, false); self.init(); reply.send(()); } WorkerTask::Login(style, reply) => { assert!(self.initialized); reply.send(self.login_and_sync(style).await); } } } }