️ Remove the periodic pooling to get the rooms (joined or not)

This commit is contained in:
2023-12-31 15:04:18 +01:00
parent 66f4ba6a7e
commit 04628ae10d
5 changed files with 170 additions and 114 deletions

View File

@@ -1,12 +1,11 @@
use std::sync::Arc;
use std::time::Duration;
use dioxus::prelude::*;
use tokio::sync::broadcast;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::{broadcast, oneshot};
use tokio::task::JoinHandle;
use tracing::{debug, error, warn};
use tracing::{debug, error};
use matrix_sdk::{
config::SyncSettings,
@@ -23,7 +22,9 @@ use matrix_sdk::{
presence::PresenceEvent,
reaction::ReactionEventContent,
room::{
member::{OriginalSyncRoomMemberEvent, RoomMemberEventContent},
member::{
OriginalSyncRoomMemberEvent, RoomMemberEventContent, StrippedRoomMemberEvent,
},
message::RoomMessageEventContent,
name::RoomNameEventContent,
redaction::OriginalSyncRoomRedactionEvent,
@@ -34,7 +35,7 @@ use matrix_sdk::{
},
OwnedRoomId,
},
Client as MatrixClient,
Client as MatrixClient, RoomState as MatrixRoomState,
};
use super::requester::{Receivers, Requester};
@@ -48,43 +49,37 @@ pub enum ClientError {
}
#[derive(Clone)]
pub struct RoomTopicEvent(pub OwnedRoomId, pub String);
pub enum RoomEvent {
TopicEvent(OwnedRoomId, String),
MemberEvent(OwnedRoomId, Room),
InviteEvent(OwnedRoomId, Room),
}
#[derive(Clone)]
struct Senders {
rooms_sender: Sender<Room>,
room_topic_sender: Sender<RoomTopicEvent>,
room_sender: Sender<RoomEvent>,
}
impl Senders {
fn new(rooms_sender: Sender<Room>, room_topic_sender: Sender<RoomTopicEvent>) -> Self {
Self {
rooms_sender,
room_topic_sender,
}
fn new(room_sender: Sender<RoomEvent>) -> Self {
Self { room_sender }
}
}
pub struct Client {
initialized: bool,
client: Option<Arc<MatrixClient>>,
load_handle: Option<JoinHandle<()>>,
sync_handle: Option<JoinHandle<()>>,
senders: Senders,
}
impl Client {
pub fn new(
client: Arc<MatrixClient>,
rooms_sender: Sender<Room>,
room_topic_sender: Sender<RoomTopicEvent>,
) -> Self {
pub fn new(client: Arc<MatrixClient>, room_sender: Sender<RoomEvent>) -> Self {
Self {
initialized: false,
client: Some(client),
load_handle: None,
sync_handle: None,
senders: Senders::new(rooms_sender, room_topic_sender),
senders: Senders::new(room_sender),
}
}
@@ -99,34 +94,88 @@ impl Client {
dbg!(_ev);
}
async fn on_sync_state_event(_ev: SyncStateEvent<RoomNameEventContent>, _room: MatrixRoom) {
debug!("== on_sync_state_event ==");
async fn on_sync_state_event(ev: SyncStateEvent<RoomNameEventContent>, _room: MatrixRoom) {
error!("== on_sync_state_event ==");
if let SyncStateEvent::Original(ev) = ev {
dbg!(ev);
}
}
async fn on_stripped_room_member_event(
ev: StrippedRoomMemberEvent,
matrix_client: MatrixClient,
matrix_room: MatrixRoom,
senders: Ctx<Senders>,
) {
if ev.state_key == matrix_client.user_id().unwrap() {
if matrix_room.state() == MatrixRoomState::Invited {
let room_id = matrix_room.room_id();
let room_topic = matrix_room.topic().map(RefCell::new);
let room = Room::new(
Arc::new(matrix_room.to_owned()),
room_topic,
matrix_room.is_direct().await.ok(),
);
if let Err(err) = senders
.room_sender
.send(RoomEvent::InviteEvent(room_id.to_owned(), room))
{
error!(
"Unable to publish the new room with \"{}\" id: {}",
room_id, err
);
}
}
}
}
async fn on_room_topic_event(
ev: SyncStateEvent<RoomTopicEventContent>,
room: MatrixRoom,
matrix_room: MatrixRoom,
senders: Ctx<Senders>,
) {
if let SyncStateEvent::Original(ev) = ev {
let room_topic_sender = &senders.room_topic_sender;
let room_id = room.room_id();
let room_id = matrix_room.room_id();
if let Err(err) =
room_topic_sender.send(RoomTopicEvent(room_id.to_owned(), ev.content.topic))
if let Err(err) = senders
.room_sender
.send(RoomEvent::TopicEvent(room_id.to_owned(), ev.content.topic))
{
error!("Unable to publish the \"{}\" new topic: {}", room_id, err);
}
}
}
async fn on_room_member_event(_ev: SyncStateEvent<RoomMemberEventContent>, _room: MatrixRoom) {
debug!("== on_room_member_event ==");
// // dbg!(room);
// if room.invited_members_count() > 0 {
// dbg!(room);
// }
// if let SyncStateEvent::Original(ev) = ev {}
async fn on_room_member_event(
ev: SyncStateEvent<RoomMemberEventContent>,
matrix_room: MatrixRoom,
senders: Ctx<Senders>,
) {
error!("== on_room_member_event ==");
// dbg!(&matrix_room);
dbg!(matrix_room.room_id());
dbg!(ev.membership());
if let SyncStateEvent::Original(_ev) = ev {
let room_sender = &senders.room_sender;
let room_id = matrix_room.room_id();
let room_topic = matrix_room.topic().map(RefCell::new);
let room = Room::new(
Arc::new(matrix_room.to_owned()),
room_topic,
matrix_room.is_direct().await.ok(),
);
if let Err(err) = room_sender.send(RoomEvent::MemberEvent(room_id.to_owned(), room)) {
error!(
"Unable to publish the new room with \"{}\" id: {}",
room_id, err
);
}
}
}
async fn on_sync_message_like_room_message_event(
@@ -230,8 +279,7 @@ impl Client {
pub async fn spawn(homeserver_url: String) -> Requester {
let (tx, rx) = unbounded_channel::<WorkerTask>();
let (rooms_sender, rooms_receiver) = broadcast::channel(32);
let (room_topic_sender, room_topic_receiver) = broadcast::channel(32);
let (room_sender, room_receiver) = broadcast::channel(32);
let matrix_client = Arc::new(
MatrixClient::builder()
@@ -241,7 +289,7 @@ impl Client {
.unwrap(),
);
let mut client = Client::new(matrix_client.clone(), rooms_sender, room_topic_sender);
let mut client = Client::new(matrix_client.clone(), room_sender);
tokio::spawn({
async move {
@@ -253,8 +301,7 @@ impl Client {
matrix_client,
tx,
receivers: Receivers {
rooms_receiver: RefCell::new(rooms_receiver),
room_topic_receiver: RefCell::new(room_topic_receiver),
room_receiver: RefCell::new(room_receiver),
},
}
}
@@ -267,6 +314,7 @@ impl Client {
let _ = client.add_event_handler(Client::on_sync_typing_event);
let _ = client.add_event_handler(Client::on_presence_event);
let _ = client.add_event_handler(Client::on_sync_state_event);
let _ = client.add_event_handler(Client::on_stripped_room_member_event);
let _ = client.add_event_handler(Client::on_sync_message_like_room_message_event);
let _ = client.add_event_handler(Client::on_sync_message_like_reaction_event);
let _ = client.add_event_handler(Client::on_original_sync_room_redaction_event);
@@ -284,36 +332,36 @@ impl Client {
self.initialized = true;
}
async fn refresh_rooms(matrix_client: &MatrixClient, rooms_sender: &Sender<Room>) {
let joined_matrix_rooms_ref = &matrix_client.joined_rooms();
let invited_matrix_rooms_ref = &matrix_client.invited_rooms();
// async fn refresh_rooms(matrix_client: &MatrixClient, room_sender: &Sender<RoomMemberEvent>) {
// let joined_matrix_rooms_ref = &matrix_client.joined_rooms();
// let invited_matrix_rooms_ref = &matrix_client.invited_rooms();
for matrix_rooms in [joined_matrix_rooms_ref, invited_matrix_rooms_ref] {
for matrix_room in matrix_rooms.iter() {
let topic = matrix_room.topic().map(RefCell::new);
let room = Room::new(
Arc::new(matrix_room.to_owned()),
topic,
matrix_room.is_direct().await.ok(),
);
// for matrix_rooms in [joined_matrix_rooms_ref, invited_matrix_rooms_ref] {
// for matrix_room in matrix_rooms.iter() {
// let topic = matrix_room.topic().map(RefCell::new);
// let room = Room::new(
// Arc::new(matrix_room.to_owned()),
// topic,
// matrix_room.is_direct().await.ok(),
// );
if let Err(err) = rooms_sender.send(room) {
warn!("Error: {}", err);
}
}
}
}
// if let Err(err) = room_sender.send(room) {
// warn!("Error: {}", err);
// }
// }
// }
// }
async fn refresh_rooms_forever(matrix_client: &MatrixClient, rooms_channel: &Sender<Room>) {
// TODO: Add interval to config
let mut interval = tokio::time::interval(Duration::from_secs(5));
// async fn refresh_rooms_forever(matrix_client: &MatrixClient, room_channel: &Sender<RoomEvent>) {
// // TODO: Add interval to config
// let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
Self::refresh_rooms(matrix_client, rooms_channel).await;
// loop {
// // Self::refresh_rooms(matrix_client, room_channel).await;
interval.tick().await;
}
}
// interval.tick().await;
// }
// }
async fn login_and_sync(&mut self, style: LoginStyle) -> anyhow::Result<()> {
let client = self.client.clone().unwrap();
@@ -330,7 +378,7 @@ impl Client {
}
}
let (synchronized_tx, synchronized_rx) = oneshot::channel();
// let (synchronized_tx, synchronized_rx) = oneshot::channel();
self.sync_handle = tokio::spawn({
async move {
@@ -339,9 +387,9 @@ impl Client {
debug!("User connected to the homeserver");
if let Err(err) = synchronized_tx.send(true) {
warn!("Unable to notify that the Matrix client is now synchronized ({err})");
}
// if let Err(err) = synchronized_tx.send(true) {
// warn!("Unable to notify that the Matrix client is now synchronized ({err})");
// }
loop {
let settings = SyncSettings::default();
@@ -351,32 +399,32 @@ impl Client {
})
.into();
self.start_background_tasks(synchronized_rx);
// self.start_background_tasks(synchronized_rx);
Ok(())
}
fn start_background_tasks(&mut self, synchronized_rx: oneshot::Receiver<bool>) {
let client = self.client.clone().unwrap();
let rooms_sender_ref = &self.senders.rooms_sender;
// fn start_background_tasks(&mut self, synchronized_rx: oneshot::Receiver<bool>) {
// let client = self.client.clone().unwrap();
// let room_sender_ref = &self.senders.room_sender;
self.load_handle = tokio::spawn({
to_owned![rooms_sender_ref];
// self.load_handle = tokio::spawn({
// to_owned![room_sender_ref];
async move {
if let Err(err) = synchronized_rx.await {
error!("Unable to setup the rx channel notifying that the Matrix client is now synchronized ({err})");
}
// async move {
// if let Err(err) = synchronized_rx.await {
// error!("Unable to setup the rx channel notifying that the Matrix client is now synchronized ({err})");
// }
let rooms_refresh = Self::refresh_rooms_forever(
client.as_ref(),
&rooms_sender_ref
);
let ((),) = tokio::join!(rooms_refresh);
}
})
.into();
}
// let rooms_refresh = Self::refresh_rooms_forever(
// client.as_ref(),
// &room_sender_ref
// );
// let ((),) = tokio::join!(rooms_refresh);
// }
// })
// .into();
// }
async fn work(&mut self, mut rx: UnboundedReceiver<WorkerTask>) {
loop {

View File

@@ -5,21 +5,18 @@ use matrix_sdk::Client as MatrixClient;
use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::UnboundedSender;
use super::client::RoomTopicEvent;
use super::client::RoomEvent;
use super::worker_tasks::{LoginStyle, WorkerTask};
use crate::base::Room;
use crate::utils::oneshot;
pub struct Receivers {
pub rooms_receiver: RefCell<Receiver<Room>>,
pub room_topic_receiver: RefCell<Receiver<RoomTopicEvent>>,
pub room_receiver: RefCell<Receiver<RoomEvent>>,
}
impl Clone for Receivers {
fn clone(&self) -> Self {
Self {
rooms_receiver: RefCell::new(self.rooms_receiver.borrow().resubscribe()),
room_topic_receiver: RefCell::new(self.room_topic_receiver.borrow().resubscribe()),
room_receiver: RefCell::new(self.room_receiver.borrow().resubscribe()),
}
}
}
@@ -33,6 +30,7 @@ pub struct Requester {
impl Requester {
pub async fn init(&self) -> anyhow::Result<()> {
let (reply, mut response) = oneshot();
// TODO: Handle error case.
self.tx.send(WorkerTask::Init(reply)).unwrap();
match response.recv().await {
@@ -43,6 +41,7 @@ impl Requester {
pub async fn login(&self, style: LoginStyle) -> anyhow::Result<()> {
let (reply, mut response) = oneshot();
// TODO: Handle error case.
self.tx.send(WorkerTask::Login(style, reply)).unwrap();
match response.recv().await {