♻️ Replace flume with tokio and share Matrix client infos to chats

- Remove of the flume dependency.
- Add the capability to share data provided by the Matrix client to the ChatsWindow. Indeed, until the 0.6 Dioxus
  release, each window runs in a separate virtual DOM so the context and Fermi states are completely seperate
  (cf. https://discord.com/channels/899851952891002890/1188206938215948378).
This commit is contained in:
2023-12-25 23:14:43 +01:00
parent d7ba8130d3
commit ddeb94e887
8 changed files with 223 additions and 192 deletions

View File

@@ -23,7 +23,6 @@ tracing-subscriber = "0.3.18"
thiserror = "1.0.50" thiserror = "1.0.50"
turf = "0.7.0" turf = "0.7.0"
tokio = "1.34.0" tokio = "1.34.0"
flume = "0.11.0"
log = "0.4.20" log = "0.4.20"
tracing = "0.1.40" tracing = "0.1.40"
futures-util = "0.3.29" futures-util = "0.3.29"

View File

@@ -1,5 +1,3 @@
use futures::select;
// Cf. https://dioxuslabs.com/learn/0.4/reference/use_coroutine // Cf. https://dioxuslabs.com/learn/0.4/reference/use_coroutine
// In order to use/run the rx.next().await statement you will need to extend the [Stream] trait // In order to use/run the rx.next().await statement you will need to extend the [Stream] trait
// (used by [UnboundedReceiver]) by adding 'futures_util' as a dependency to your project // (used by [UnboundedReceiver]) by adding 'futures_util' as a dependency to your project
@@ -12,34 +10,36 @@ use fermi::*;
use matrix_sdk::room::Room as MatrixRoom; use matrix_sdk::room::Room as MatrixRoom;
use matrix_sdk::{ use matrix_sdk::{
room::RoomMember, room::RoomMember,
ruma::{OwnedMxcUri, OwnedRoomId, OwnedUserId}, ruma::{OwnedRoomId, OwnedUserId},
}; };
use tokio::select;
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
use crate::matrix_interface::client::{Client, RoomTopicEvent}; use crate::matrix_interface::client::{Client, RoomTopicEvent};
use crate::matrix_interface::requester::Receivers;
use crate::matrix_interface::requester::Requester; use crate::matrix_interface::requester::Requester;
use crate::matrix_interface::worker_tasks::LoginStyle; use crate::matrix_interface::worker_tasks::LoginStyle;
#[derive(Clone, Debug)] // #[derive(Clone, Debug)]
pub struct UserInfo { // pub struct UserInfo {
pub avatar_url: Option<OwnedMxcUri>, // pub avatar_url: Option<OwnedMxcUri>,
pub display_name: Option<String>, // pub display_name: Option<String>,
pub blurhash: Option<String>, // pub blurhash: Option<String>,
} // }
impl UserInfo { // impl UserInfo {
pub fn new( // pub fn new(
avatar_url: Option<OwnedMxcUri>, // avatar_url: Option<OwnedMxcUri>,
display_name: Option<String>, // display_name: Option<String>,
blurhash: Option<String>, // blurhash: Option<String>,
) -> Self { // ) -> Self {
Self { // Self {
avatar_url, // avatar_url,
display_name, // display_name,
blurhash, // blurhash,
} // }
} // }
} // }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Room { pub struct Room {
@@ -80,106 +80,101 @@ impl PartialEq for Room {
} }
pub type ByIdRooms = HashMap<OwnedRoomId, RefCell<Room>>; pub type ByIdRooms = HashMap<OwnedRoomId, RefCell<Room>>;
pub type ByIdUserInfos = HashMap<OwnedUserId, UserInfo>; // pub type ByIdUserInfos = HashMap<OwnedUserId, UserInfo>;
#[derive(Clone)] // #[derive(Clone)]
pub struct Store { // pub struct Store {
pub is_logged: bool, // pub is_logged: bool,
pub rooms: ByIdRooms, // pub rooms: ByIdRooms,
pub user_infos: ByIdUserInfos, // pub user_infos: ByIdUserInfos,
pub user_id: Option<OwnedUserId>, // pub user_id: Option<OwnedUserId>,
} // }
impl Store { // impl Store {
pub fn new() -> Self { // pub fn new() -> Self {
Self { // Self {
is_logged: false, // is_logged: false,
rooms: HashMap::new(), // rooms: HashMap::new(),
user_infos: HashMap::new(), // user_infos: HashMap::new(),
user_id: None, // user_id: None,
} // }
} // }
} // }
impl PartialEq for Store { // impl PartialEq for Store {
fn eq(&self, other: &Self) -> bool { // fn eq(&self, other: &Self) -> bool {
self.is_logged == other.is_logged // self.is_logged == other.is_logged
&& self.user_id == other.user_id // && self.user_id == other.user_id
&& self.user_infos.len() == other.user_infos.len() // && self.user_infos.len() == other.user_infos.len()
&& self // && self
.user_infos // .user_infos
.keys() // .keys()
.all(|k| other.user_infos.contains_key(k)) // .all(|k| other.user_infos.contains_key(k))
&& self.rooms.len() == other.rooms.len() // && self.rooms.len() == other.rooms.len()
&& self.rooms.keys().all(|k| other.rooms.contains_key(k)) // && self.rooms.keys().all(|k| other.rooms.contains_key(k))
} // }
} // }
impl Eq for Store {} // impl Eq for Store {}
pub struct AppSettings { pub struct AppSettings {
requester: Option<Box<Requester>>, pub requester: Option<RefCell<Requester>>,
pub store: Store,
} }
impl AppSettings { impl AppSettings {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self { requester: None }
requester: None,
store: Store::new(),
}
} }
pub fn set_requester(&mut self, requester: Box<Requester>) { pub fn set_requester(&mut self, requester: RefCell<Requester>) {
self.requester = Some(requester); self.requester = Some(requester);
} }
} }
pub static APP_SETTINGS: AtomRef<AppSettings> = AtomRef(|_| AppSettings::new()); pub static APP_SETTINGS: AtomRef<AppSettings> = AtomRef(|_| AppSettings::new());
async fn on_room(room_option: Option<Room>, rooms_ref: &UseAtomRef<ByIdRooms>) { async fn on_room(room: Room, rooms_ref: &UseAtomRef<ByIdRooms>) {
if let Some(room) = room_option { let room_id = room.id();
let room_id = room.id();
// TODO: Update rooms // TODO: Update rooms
rooms_ref rooms_ref
.write() .write()
.insert(room_id, RefCell::<Room>::new(room)); .insert(room_id, RefCell::<Room>::new(room));
}
} }
async fn on_room_topic( pub async fn on_room_topic(room_topic_event: RoomTopicEvent, rooms_ref: &UseAtomRef<ByIdRooms>) {
room_topic_event_option: Option<RoomTopicEvent>, let room_id = room_topic_event.0;
rooms_ref: &UseAtomRef<ByIdRooms>,
) {
if let Some(room_topic_event) = room_topic_event_option {
let room_id = room_topic_event.0;
if let Some(room_ref) = rooms_ref.read().get(&room_id) { if let Some(room_ref) = rooms_ref.read().get(&room_id) {
let topic = room_topic_event.1; let topic = room_topic_event.1;
let mut room = room_ref.borrow_mut(); let mut room = room_ref.borrow_mut();
room.topic = Some(RefCell::new(topic)); room.topic = Some(RefCell::new(topic));
} else { } else {
warn!("No room found with the \"{}\" id", room_id); warn!("No room found with the \"{}\" id", room_id);
}
} }
} }
pub async fn sync_rooms( pub async fn sync_rooms(
mut rx: UnboundedReceiver<bool>, mut rx: UnboundedReceiver<bool>,
app_settings_ref: UseAtomRef<AppSettings>, receivers: Receivers,
rooms_ref: UseAtomRef<ByIdRooms>, rooms_ref: UseAtomRef<ByIdRooms>,
) { ) {
while let Some(_is_logged) = rx.next().await { while let Some(_is_logged) = rx.next().await {
if let Some(requester) = &app_settings_ref.read().requester { let mut rooms_receiver = receivers.rooms_receiver.borrow_mut();
let mut room_stream = requester.rooms_receiver.stream(); let mut room_topic_receiver = receivers.room_topic_receiver.borrow_mut();
let mut room_topic_stream = requester.room_topic_receiver.stream();
loop { loop {
select! { select! {
room = room_stream.next() => on_room(room, &rooms_ref).await, res = rooms_receiver.recv() => {
room_topic_event = room_topic_stream.next() => on_room_topic(room_topic_event, &rooms_ref).await, if let Ok(room) = res {
on_room(room, &rooms_ref).await;
}
},
res = room_topic_receiver.recv() => {
if let Ok(room_topic_event) = res {
on_room_topic(room_topic_event, &rooms_ref).await;
}
} }
} }
} }
@@ -215,7 +210,7 @@ pub async fn login(
// invalid_login.modify(|_| true); // invalid_login.modify(|_| true);
} }
} }
app_settings_ref.write().set_requester(Box::new(client)); app_settings_ref.write().set_requester(RefCell::new(client));
} else { } else {
warn!("At least one of the following values is/are invalid: homeserver, username or password"); warn!("At least one of the following values is/are invalid: homeserver, username or password");
} }

View File

@@ -1,25 +1,40 @@
use dioxus::prelude::*; use dioxus::prelude::*;
use fermi::*;
use tracing::debug; use tracing::debug;
use crate::base::{sync_rooms, ROOMS};
use crate::components::avatar_selector::AvatarSelector; use crate::components::avatar_selector::AvatarSelector;
use crate::components::chats_window::edit_section::EditSection;
use crate::components::icons::DownArrowIcon; use crate::components::icons::DownArrowIcon;
use crate::matrix_interface::requester::Receivers;
use super::edit_section::EditSection;
turf::style_sheet!("src/components/chats_window/chats_window.scss"); turf::style_sheet!("src/components/chats_window/chats_window.scss");
pub fn ChatsWindow(cx: Scope) -> Element { pub struct ChatsWindowProps {
pub receivers: Receivers,
}
pub fn ChatsWindow(cx: Scope<ChatsWindowProps>) -> Element {
debug!("ChatsWindow rendering"); debug!("ChatsWindow rendering");
let room_names = vec![ let receivers = &cx.props.receivers;
"MON POTE",
"Second room",
"Third room",
"AAAAAAAAAAAA",
"BBBBBBBBBBBBBBBBBBB",
"CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC",
];
let rendered_room_tabs = room_names.into_iter().map(|room_name| { use_init_atom_root(cx);
let rooms_ref = use_atom_ref(cx, &ROOMS);
let sync_rooms_coro = use_coroutine(cx, |rx| {
to_owned![receivers];
sync_rooms(rx, receivers, rooms_ref.clone())
});
sync_rooms_coro.send(true);
let rooms = rooms_ref.read();
let rendered_room_tabs = rooms.values().map(|room| {
let room = room.borrow();
let room_name = room.name().unwrap_or(room.id().to_string());
rsx!( rsx!(
div { div {
class: ClassName::TAB, class: ClassName::TAB,

View File

@@ -1,8 +1,8 @@
use dioxus::prelude::*; use dioxus::prelude::*;
use fermi::*; // use fermi::*;
use tracing::debug; use tracing::debug;
use crate::base::APP_SETTINGS; // use crate::base::APP_SETTINGS;
use crate::components::avatar_selector::AvatarSelector; use crate::components::avatar_selector::AvatarSelector;
use crate::components::icons::DownArrowIcon; use crate::components::icons::DownArrowIcon;
@@ -13,28 +13,28 @@ static MESSAGE_PLACEHOLDER: &str = "<Enter a personal message>";
pub fn UserInfos(cx: Scope) -> Element { pub fn UserInfos(cx: Scope) -> Element {
debug!("UserInfos rendering"); debug!("UserInfos rendering");
let app_settings = use_atom_ref(cx, &APP_SETTINGS); // let app_settings = use_atom_ref(cx, &APP_SETTINGS);
let store = &app_settings.read().store; // let store = &app_settings.read().store;
println!("----------------------------------"); // println!("----------------------------------");
println!("UserInfos rendering"); // println!("UserInfos rendering");
// println!("store={:?}", &store); // // println!("store={:?}", &store);
dbg!(&store.user_id); // dbg!(&store.user_id);
println!("----------------------------------"); // println!("----------------------------------");
// let user_id = store.user_id..as_ref().unwrap(); // let user_id = store.user_id..as_ref().unwrap();
// let mut user_info_option = None; // let mut user_info_option = None;
let mut user_display_name_option = None; let user_display_name_option: Option<bool> = None;
let user_id_option = &store.user_id; // let user_id_option = &store.user_id;
if user_id_option.is_some() { // if user_id_option.is_some() {
let user_id = user_id_option.as_ref().unwrap(); // let user_id = user_id_option.as_ref().unwrap();
let user_info_option = store.user_infos.get(user_id); // let user_info_option = store.user_infos.get(user_id);
if user_info_option.is_some() { // if user_info_option.is_some() {
user_display_name_option = user_info_option.unwrap().display_name.as_ref(); // user_display_name_option = user_info_option.unwrap().display_name.as_ref();
} // }
} // }
cx.render(rsx! { cx.render(rsx! {
style { STYLE_SHEET }, style { STYLE_SHEET },

View File

@@ -2,14 +2,11 @@ use std::str::FromStr;
use dioxus::prelude::*; use dioxus::prelude::*;
use fermi::*; use fermi::*;
use matrix_sdk::ruma::user_id; use tracing::debug;
use tracing::{debug, error};
use crate::base::{APP_SETTINGS, SESSION}; use crate::base::SESSION;
use crate::components::avatar_selector::AvatarSelector; use crate::components::avatar_selector::AvatarSelector;
use crate::components::header::Header; use crate::components::header::Header;
use crate::matrix_interface::client::Client;
use crate::matrix_interface::worker_tasks::LoginStyle;
turf::style_sheet!("src/components/login.scss"); turf::style_sheet!("src/components/login.scss");

View File

@@ -9,7 +9,7 @@ pub mod components;
pub mod matrix_interface; pub mod matrix_interface;
use crate::base::{login, sync_rooms, APP_SETTINGS, ROOMS, SESSION}; use crate::base::{login, sync_rooms, APP_SETTINGS, ROOMS, SESSION};
use crate::components::chats_window::chats_window::ChatsWindow; use crate::components::chats_window::chats_window::{ChatsWindow, ChatsWindowProps};
use crate::components::main_window::MainWindow; use crate::components::main_window::MainWindow;
mod base; mod base;
@@ -23,47 +23,63 @@ fn App(cx: Scope) -> Element {
let session_ref = use_atom_ref(cx, &SESSION); let session_ref = use_atom_ref(cx, &SESSION);
let rooms_ref = use_atom_ref(cx, &ROOMS); let rooms_ref = use_atom_ref(cx, &ROOMS);
let chats_win_state = use_state(cx, || None);
let login_coro = use_coroutine(cx, |rx| { let login_coro = use_coroutine(cx, |rx| {
login(rx, app_settings_ref.clone(), session_ref.clone()) login(rx, app_settings_ref.clone(), session_ref.clone())
}); });
let sync_rooms_coro = use_coroutine(cx, |rx| { let mut sync_rooms_coro = None;
sync_rooms(rx, app_settings_ref.clone(), rooms_ref.clone())
});
let is_logged = session_ref.read().is_logged; if let Some(requester) = &app_settings_ref.read().requester {
sync_rooms_coro = Some(use_coroutine(cx, |rx| {
login_coro.send(is_logged); sync_rooms(rx, requester.borrow().receivers.clone(), rooms_ref.clone())
}));
if is_logged {
sync_rooms_coro.send(is_logged);
} }
let chats_win_state = use_state(cx, || None); if !session_ref.read().is_logged {
login_coro.send(false);
if is_logged && chats_win_state.is_none() { } else {
let chats_window = dioxus_desktop::use_window(cx); if let Some(coro) = sync_rooms_coro {
coro.send(true);
let chats_dom = VirtualDom::new(ChatsWindow);
let window_cfg = Config::default().with_custom_head(
r#"
<style type="text/css">
html, body {
height: 100%;
width: 100%;
margin: 0;
} }
#main, #bodywrap {
height: 100%; if chats_win_state.is_none() {
width: 100%; let chats_window = dioxus_desktop::use_window(cx);
let receivers = app_settings_ref
.read()
.requester
.as_ref()
.unwrap()
.borrow()
.receivers
.clone();
let chats_props = ChatsWindowProps { receivers };
let chats_dom = VirtualDom::new_with_props(ChatsWindow, chats_props);
let window_cfg = Config::default().with_custom_head(
r#"
<style type="text/css">
html, body {
height: 100%;
width: 100%;
margin: 0;
}
#main, #bodywrap {
height: 100%;
width: 100%;
}
</style>
"#
.to_owned(),
);
let chats_window_desktop_service = chats_window.new_window(chats_dom, window_cfg);
chats_win_state.set(Some(chats_window_desktop_service));
} }
</style>
"#
.to_owned(),
);
let chats_window_desktop_service = chats_window.new_window(chats_dom, window_cfg);
chats_win_state.set(Some(chats_window_desktop_service));
} }
cx.render(rsx! { cx.render(rsx! {

View File

@@ -1,12 +1,11 @@
// TODO: make a choice: mpsc vs flume.
use std::cell::RefCell; use std::cell::RefCell;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use dioxus::prelude::to_owned; use dioxus::prelude::to_owned;
use flume::{bounded, unbounded}; use tokio::sync::broadcast::Sender;
use flume::{Receiver as FlumeReceiver, Sender};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::{broadcast, oneshot};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
@@ -39,7 +38,7 @@ use matrix_sdk::{
Client as MatrixClient, Client as MatrixClient,
}; };
use super::requester::Requester; use super::requester::{Receivers, Requester};
use super::worker_tasks::{LoginStyle, WorkerTask}; use super::worker_tasks::{LoginStyle, WorkerTask};
use crate::base::Room; use crate::base::Room;
@@ -49,19 +48,17 @@ pub enum ClientError {
Matrix(#[from] matrix_sdk::Error), Matrix(#[from] matrix_sdk::Error),
} }
#[derive(Clone)]
pub struct RoomTopicEvent(pub OwnedRoomId, pub String); pub struct RoomTopicEvent(pub OwnedRoomId, pub String);
#[derive(Clone)] #[derive(Clone)]
struct Senders { struct Senders {
rooms_sender: flume::Sender<Room>, rooms_sender: Sender<Room>,
room_topic_sender: flume::Sender<RoomTopicEvent>, room_topic_sender: Sender<RoomTopicEvent>,
} }
impl Senders { impl Senders {
fn new( fn new(rooms_sender: Sender<Room>, room_topic_sender: Sender<RoomTopicEvent>) -> Self {
rooms_sender: flume::Sender<Room>,
room_topic_sender: flume::Sender<RoomTopicEvent>,
) -> Self {
Self { Self {
rooms_sender, rooms_sender,
room_topic_sender, room_topic_sender,
@@ -80,8 +77,8 @@ pub struct Client {
impl Client { impl Client {
pub fn new( pub fn new(
client: Arc<MatrixClient>, client: Arc<MatrixClient>,
rooms_sender: flume::Sender<Room>, rooms_sender: Sender<Room>,
room_topic_sender: flume::Sender<RoomTopicEvent>, room_topic_sender: Sender<RoomTopicEvent>,
) -> Self { ) -> Self {
Self { Self {
initialized: false, initialized: false,
@@ -105,7 +102,6 @@ impl Client {
async fn on_sync_state_event(_ev: SyncStateEvent<RoomNameEventContent>, _room: MatrixRoom) { async fn on_sync_state_event(_ev: SyncStateEvent<RoomNameEventContent>, _room: MatrixRoom) {
debug!("== on_sync_state_event =="); debug!("== on_sync_state_event ==");
dbg!(_ev);
} }
async fn on_room_topic_event( async fn on_room_topic_event(
@@ -113,25 +109,20 @@ impl Client {
room: MatrixRoom, room: MatrixRoom,
senders: Ctx<Senders>, senders: Ctx<Senders>,
) { ) {
debug!("== on_room_topic_event ==");
dbg!(&ev);
if let SyncStateEvent::Original(ev) = ev { if let SyncStateEvent::Original(ev) = ev {
let room_topic_sender = &senders.room_topic_sender; let room_topic_sender = &senders.room_topic_sender;
let room_id = room.room_id(); let room_id = room.room_id();
if let Err(err) = room_topic_sender if let Err(err) =
.send_async(RoomTopicEvent(room_id.to_owned(), ev.content.topic)) room_topic_sender.send(RoomTopicEvent(room_id.to_owned(), ev.content.topic))
.await
{ {
error!("Unable to publish the \"{}\" new topic: {}", room_id, err); error!("Unable to publish the \"{}\" new topic: {}", room_id, err);
} }
} }
} }
async fn on_room_member_event(ev: SyncStateEvent<RoomMemberEventContent>, _room: MatrixRoom) { async fn on_room_member_event(_ev: SyncStateEvent<RoomMemberEventContent>, _room: MatrixRoom) {
debug!("== on_room_member_event =="); debug!("== on_room_member_event ==");
dbg!(ev);
// // dbg!(room); // // dbg!(room);
// if room.invited_members_count() > 0 { // if room.invited_members_count() > 0 {
// dbg!(room); // dbg!(room);
@@ -165,12 +156,11 @@ impl Client {
} }
async fn on_original_sync_room_member_event( async fn on_original_sync_room_member_event(
ev: OriginalSyncRoomMemberEvent, _ev: OriginalSyncRoomMemberEvent,
room: MatrixRoom, room: MatrixRoom,
_client: MatrixClient, _client: MatrixClient,
) { ) {
debug!("== on_original_sync_room_member_event =="); debug!("== on_original_sync_room_member_event ==");
dbg!(ev);
let room_id = room.room_id(); let room_id = room.room_id();
dbg!(room_id); dbg!(room_id);
@@ -243,8 +233,8 @@ impl Client {
pub async fn spawn(homeserver_url: String) -> Requester { pub async fn spawn(homeserver_url: String) -> Requester {
let (tx, rx) = unbounded_channel::<WorkerTask>(); let (tx, rx) = unbounded_channel::<WorkerTask>();
let (rooms_sender, rooms_receiver) = unbounded::<Room>(); let (rooms_sender, rooms_receiver) = broadcast::channel(32);
let (room_topic_sender, room_topic_receiver) = unbounded::<RoomTopicEvent>(); let (room_topic_sender, room_topic_receiver) = broadcast::channel(32);
let matrix_client = Arc::new( let matrix_client = Arc::new(
MatrixClient::builder() MatrixClient::builder()
@@ -265,8 +255,10 @@ impl Client {
Requester { Requester {
matrix_client, matrix_client,
tx, tx,
rooms_receiver, receivers: Receivers {
room_topic_receiver, rooms_receiver: RefCell::new(rooms_receiver),
room_topic_receiver: RefCell::new(room_topic_receiver),
},
} }
} }
@@ -311,7 +303,8 @@ impl Client {
topic, topic,
matrix_room.is_direct().await.ok(), matrix_room.is_direct().await.ok(),
); );
if let Err(err) = rooms_sender.send_async(room).await {
if let Err(err) = rooms_sender.send(room) {
warn!("Error: {}", err); warn!("Error: {}", err);
} }
} }
@@ -344,7 +337,7 @@ impl Client {
} }
} }
let (synchronized_tx, synchronized_rx) = bounded(1); let (synchronized_tx, synchronized_rx) = oneshot::channel();
self.sync_handle = tokio::spawn({ self.sync_handle = tokio::spawn({
async move { async move {
@@ -370,7 +363,7 @@ impl Client {
Ok(()) Ok(())
} }
fn start_background_tasks(&mut self, synchronized_rx: FlumeReceiver<bool>) { fn start_background_tasks(&mut self, synchronized_rx: oneshot::Receiver<bool>) {
let client = self.client.clone().unwrap(); let client = self.client.clone().unwrap();
let rooms_sender_ref = &self.senders.rooms_sender; let rooms_sender_ref = &self.senders.rooms_sender;
@@ -378,7 +371,7 @@ impl Client {
to_owned![rooms_sender_ref]; to_owned![rooms_sender_ref];
async move { async move {
if let Err(err) = synchronized_rx.recv() { if let Err(err) = synchronized_rx.await {
error!("Unable to setup the rx channel notifying that the Matrix client is now synchronized ({err})"); error!("Unable to setup the rx channel notifying that the Matrix client is now synchronized ({err})");
} }

View File

@@ -1,18 +1,34 @@
use std::cell::RefCell;
use std::sync::Arc; use std::sync::Arc;
use matrix_sdk::Client as MatrixClient; use matrix_sdk::Client as MatrixClient;
use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use super::client::RoomTopicEvent; use super::client::RoomTopicEvent;
use super::worker_tasks::{oneshot, LoginStyle, WorkerTask}; use super::worker_tasks::{oneshot, LoginStyle, WorkerTask};
use crate::base::Room; use crate::base::Room;
#[derive(Debug)]
pub struct Receivers {
pub rooms_receiver: RefCell<Receiver<Room>>,
pub room_topic_receiver: RefCell<Receiver<RoomTopicEvent>>,
}
impl Clone for Receivers {
fn clone(&self) -> Self {
Self {
rooms_receiver: RefCell::new(self.rooms_receiver.borrow().resubscribe()),
room_topic_receiver: RefCell::new(self.room_topic_receiver.borrow().resubscribe()),
}
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct Requester { pub struct Requester {
pub matrix_client: Arc<MatrixClient>, pub matrix_client: Arc<MatrixClient>,
pub tx: UnboundedSender<WorkerTask>, pub tx: UnboundedSender<WorkerTask>,
pub rooms_receiver: flume::Receiver<Room>, pub receivers: Receivers,
pub room_topic_receiver: flume::Receiver<RoomTopicEvent>,
} }
impl Requester { impl Requester {