use std::cell::RefCell; use std::sync::Arc; use log::{debug, error}; use tokio::sync::broadcast; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::task::JoinHandle; use matrix_sdk::{ config::SyncSettings, event_handler::Ctx, room::Room as MatrixRoom, ruma::{ events::{ key::verification::{ done::{OriginalSyncKeyVerificationDoneEvent, ToDeviceKeyVerificationDoneEvent}, key::{OriginalSyncKeyVerificationKeyEvent, ToDeviceKeyVerificationKeyEvent}, request::ToDeviceKeyVerificationRequestEvent, start::{OriginalSyncKeyVerificationStartEvent, ToDeviceKeyVerificationStartEvent}, }, presence::PresenceEvent, reaction::ReactionEventContent, room::{ member::{ OriginalSyncRoomMemberEvent, RoomMemberEventContent, StrippedRoomMemberEvent, }, message::RoomMessageEventContent, name::RoomNameEventContent, redaction::OriginalSyncRoomRedactionEvent, topic::RoomTopicEventContent, }, typing::SyncTypingEvent, SyncMessageLikeEvent, SyncStateEvent, }, OwnedRoomId, }, Client as MatrixClient, RoomState as MatrixRoomState, }; use super::requester::{Receivers, Requester}; use super::worker_tasks::{LoginStyle, WorkerTask}; use crate::base::Room; #[derive(thiserror::Error, Debug)] pub enum ClientError { #[error("Matrix client error: {0}")] Matrix(#[from] matrix_sdk::Error), } #[derive(Clone)] pub enum RoomEvent { TopicEvent(OwnedRoomId, String), MemberEvent(OwnedRoomId, Room), InviteEvent(OwnedRoomId, Room), } #[derive(Clone)] struct Senders { room_sender: Sender, } impl Senders { fn new(room_sender: Sender) -> Self { Self { room_sender } } } pub struct Client { initialized: bool, client: Option>, sync_handle: Option>, senders: Senders, } impl Client { pub fn new(client: Arc, room_sender: Sender) -> Self { Self { initialized: false, client: Some(client), sync_handle: None, senders: Senders::new(room_sender), } } async fn on_sync_typing_event(_ev: SyncTypingEvent, room: MatrixRoom) { debug!("== on_sync_typing_event =="); let room_id = room.room_id().to_owned(); dbg!(room_id); } async fn on_presence_event(_ev: PresenceEvent) { debug!("== on_presence_event =="); dbg!(_ev); } async fn on_sync_state_event(ev: SyncStateEvent, _room: MatrixRoom) { error!("== on_sync_state_event =="); if let SyncStateEvent::Original(ev) = ev { dbg!(ev); } } async fn on_stripped_room_member_event( ev: StrippedRoomMemberEvent, matrix_client: MatrixClient, matrix_room: MatrixRoom, senders: Ctx, ) { if ev.state_key == matrix_client.user_id().unwrap() { if matrix_room.state() == MatrixRoomState::Invited { let room_id = matrix_room.room_id(); let room = Room::from_matrix_room(&matrix_room).await; if let Err(err) = senders .room_sender .send(RoomEvent::InviteEvent(room_id.to_owned(), room)) { error!( "Unable to publish the new room with \"{}\" id: {}", room_id, err ); } } } } async fn on_room_topic_event( ev: SyncStateEvent, matrix_room: MatrixRoom, senders: Ctx, ) { if let SyncStateEvent::Original(ev) = ev { let room_id = matrix_room.room_id(); if let Err(err) = senders .room_sender .send(RoomEvent::TopicEvent(room_id.to_owned(), ev.content.topic)) { error!("Unable to publish the \"{}\" new topic: {}", room_id, err); } } } async fn on_room_member_event( ev: SyncStateEvent, matrix_room: MatrixRoom, senders: Ctx, ) { if let SyncStateEvent::Original(_ev) = ev { let room_sender = &senders.room_sender; let room_id = matrix_room.room_id(); let room = Room::from_matrix_room(&matrix_room).await; if let Err(err) = room_sender.send(RoomEvent::MemberEvent(room_id.to_owned(), room)) { error!( "Unable to publish the new room with \"{}\" id: {}", room_id, err ); } } } async fn on_sync_message_like_room_message_event( ev: SyncMessageLikeEvent, _room: MatrixRoom, _client: MatrixClient, ) { debug!("== on_sync_message_like_room_message_event =="); dbg!(ev); } async fn on_sync_message_like_reaction_event( ev: SyncMessageLikeEvent, _room: MatrixRoom, ) { debug!("== on_sync_message_like_reaction_event =="); dbg!(ev); } async fn on_original_sync_room_redaction_event( ev: OriginalSyncRoomRedactionEvent, _room: MatrixRoom, ) { debug!("== on_original_sync_room_redaction_event =="); dbg!(ev); } async fn on_original_sync_room_member_event( _ev: OriginalSyncRoomMemberEvent, _room: MatrixRoom, _client: MatrixClient, ) { debug!("== on_original_sync_room_member_event =="); // let mut store = store_ctx.read().unwrap().to_owned(); // dbg!(store.rooms.keys()); // let is_direct = room.is_direct().await.ok(); // store.rooms.insert( // OwnedRoomId::from(room_id), // Arc::new(RwLock::new(Room::new(Arc::new(room), None, is_direct))), // ); // let _ = store_ctx.write(store); } async fn on_original_sync_key_verif_start_event( ev: OriginalSyncKeyVerificationStartEvent, _client: MatrixClient, ) { debug!("== on_original_sync_key_verif_start_event =="); dbg!(ev); } async fn on_original_sync_key_verif_key_event( ev: OriginalSyncKeyVerificationKeyEvent, _client: MatrixClient, ) { debug!("== on_original_sync_key_verif_key_event =="); dbg!(ev); } async fn on_original_sync_key_verif_done_event( ev: OriginalSyncKeyVerificationDoneEvent, _client: MatrixClient, ) { debug!("== on_original_sync_key_verif_done_event =="); dbg!(ev); } async fn on_device_key_verif_req_event( ev: ToDeviceKeyVerificationRequestEvent, _client: MatrixClient, ) { debug!("== on_device_key_verif_req_event =="); dbg!(ev); } async fn on_device_key_verif_start_event( ev: ToDeviceKeyVerificationStartEvent, _client: MatrixClient, ) { debug!("== on_device_key_verif_start_event =="); dbg!(ev); } async fn on_device_key_verif_key_event( ev: ToDeviceKeyVerificationKeyEvent, _client: MatrixClient, ) { debug!("== on_device_key_verif_key_event =="); dbg!(ev); } async fn on_device_key_verif_done_event( ev: ToDeviceKeyVerificationDoneEvent, _client: MatrixClient, ) { debug!("== on_device_key_verif_done_event =="); dbg!(ev); } pub async fn spawn(homeserver_url: String) -> Requester { let (tx, rx) = unbounded_channel::(); let (room_sender, room_receiver) = broadcast::channel(32); let matrix_client = Arc::new( MatrixClient::builder() .homeserver_url(&homeserver_url) .build() .await .unwrap(), ); let mut client = Client::new(matrix_client.clone(), room_sender); tokio::spawn({ async move { client.work(rx).await; } }); Requester { matrix_client, tx, receivers: Receivers { room_receiver: RefCell::new(room_receiver), }, } } fn init(&mut self) { let client = self.client.clone().unwrap(); client.add_event_handler_context(self.senders.clone()); let _ = client.add_event_handler(Client::on_sync_typing_event); let _ = client.add_event_handler(Client::on_presence_event); let _ = client.add_event_handler(Client::on_sync_state_event); let _ = client.add_event_handler(Client::on_stripped_room_member_event); let _ = client.add_event_handler(Client::on_sync_message_like_room_message_event); let _ = client.add_event_handler(Client::on_sync_message_like_reaction_event); let _ = client.add_event_handler(Client::on_original_sync_room_redaction_event); let _ = client.add_event_handler(Client::on_original_sync_room_member_event); let _ = client.add_event_handler(Client::on_original_sync_key_verif_start_event); let _ = client.add_event_handler(Client::on_original_sync_key_verif_key_event); let _ = client.add_event_handler(Client::on_original_sync_key_verif_done_event); let _ = client.add_event_handler(Client::on_device_key_verif_req_event); let _ = client.add_event_handler(Client::on_device_key_verif_start_event); let _ = client.add_event_handler(Client::on_device_key_verif_key_event); let _ = client.add_event_handler(Client::on_device_key_verif_done_event); let _ = client.add_event_handler(Client::on_room_topic_event); let _ = client.add_event_handler(Client::on_room_member_event); self.initialized = true; } // async fn refresh_rooms(matrix_client: &MatrixClient, room_sender: &Sender) { // let joined_matrix_rooms_ref = &matrix_client.joined_rooms(); // let invited_matrix_rooms_ref = &matrix_client.invited_rooms(); // for matrix_rooms in [joined_matrix_rooms_ref, invited_matrix_rooms_ref] { // for matrix_room in matrix_rooms.iter() { // let topic = matrix_room.topic().map(RefCell::new); // let room = Room::new( // Arc::new(matrix_room.to_owned()), // topic, // matrix_room.is_direct().await.ok(), // ); // if let Err(err) = room_sender.send(room) { // warn!("Error: {}", err); // } // } // } // } // async fn refresh_rooms_forever(matrix_client: &MatrixClient, room_channel: &Sender) { // // TODO: Add interval to config // let mut interval = tokio::time::interval(Duration::from_secs(5)); // loop { // // Self::refresh_rooms(matrix_client, room_channel).await; // interval.tick().await; // } // } async fn login_and_sync(&mut self, style: LoginStyle) -> anyhow::Result<()> { let client = self.client.clone().unwrap(); match style { LoginStyle::Password(username, password) => { let _resp = client .matrix_auth() .login_username(&username, &password) .initial_device_display_name("TODO") .send() .await .map_err(ClientError::from)?; } } // let (synchronized_tx, synchronized_rx) = oneshot::channel(); self.sync_handle = tokio::spawn({ async move { // Sync once so we receive the client state and old messages let sync_token_option = match client.sync_once(SyncSettings::default()).await { Ok(sync_response) => Some(sync_response.next_batch), Err(err) => { error!("Error during sync one: {}", err); None } }; if let Some(sync_token) = sync_token_option { let settings = SyncSettings::default().token(sync_token); debug!("User connected to the homeserver, start syncing"); let _ = client.sync(settings).await; } } }) .into(); // self.start_background_tasks(synchronized_rx); Ok(()) } // fn start_background_tasks(&mut self, synchronized_rx: oneshot::Receiver) { // let client = self.client.clone().unwrap(); // let room_sender_ref = &self.senders.room_sender; // self.load_handle = tokio::spawn({ // to_owned![room_sender_ref]; // async move { // if let Err(err) = synchronized_rx.await { // error!("Unable to setup the rx channel notifying that the Matrix client is now synchronized ({err})"); // } // let rooms_refresh = Self::refresh_rooms_forever( // client.as_ref(), // &room_sender_ref // ); // let ((),) = tokio::join!(rooms_refresh); // } // }) // .into(); // } async fn work(&mut self, mut rx: UnboundedReceiver) { loop { let task = rx.recv().await; match task { Some(task) => self.run(task).await, None => { break; } } } if let Some(handle) = self.sync_handle.take() { handle.abort(); } } async fn run(&mut self, task: WorkerTask) { match task { WorkerTask::Init(reply) => { assert!(!self.initialized); self.init(); reply.send(()).await; } WorkerTask::Login(style, reply) => { assert!(self.initialized); reply.send(self.login_and_sync(style).await).await; } } } }