Add rooms topics management

This commit is contained in:
2023-12-23 14:54:21 +01:00
parent c9292fd613
commit 8679a23692
3 changed files with 89 additions and 44 deletions

View File

@@ -16,7 +16,7 @@ use matrix_sdk::{
}; };
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
use crate::matrix_interface::client::Client; use crate::matrix_interface::client::{Client, RoomTopicEvent};
use crate::matrix_interface::requester::Requester; use crate::matrix_interface::requester::Requester;
use crate::matrix_interface::worker_tasks::LoginStyle; use crate::matrix_interface::worker_tasks::LoginStyle;
@@ -148,24 +148,38 @@ async fn on_room(room_option: Option<Room>, rooms_ref: &UseAtomRef<ByIdRooms>) {
} }
} }
async fn on_room_topic(
room_topic_event_option: Option<RoomTopicEvent>,
rooms_ref: &UseAtomRef<ByIdRooms>,
) {
if let Some(room_topic_event) = room_topic_event_option {
let room_id = room_topic_event.0;
if let Some(room_ref) = rooms_ref.read().get(&room_id) {
let topic = room_topic_event.1;
let mut room = room_ref.borrow_mut();
room.topic = Some(RefCell::new(topic));
} else {
warn!("No room found with the \"{}\" id", room_id);
}
}
}
pub async fn sync_rooms( pub async fn sync_rooms(
mut rx: UnboundedReceiver<bool>, mut rx: UnboundedReceiver<bool>,
app_settings_ref: UseAtomRef<AppSettings>, app_settings_ref: UseAtomRef<AppSettings>,
rooms_ref: UseAtomRef<ByIdRooms>, rooms_ref: UseAtomRef<ByIdRooms>,
) { ) {
error!("=== SYNC ROOMS BEG ===");
while let Some(_is_logged) = rx.next().await { while let Some(_is_logged) = rx.next().await {
let app_settings_ref = app_settings_ref.read(); if let Some(requester) = &app_settings_ref.read().requester {
let requester = &app_settings_ref.requester; let mut room_stream = requester.rooms_receiver.stream();
let mut room_topic_stream = requester.room_topic_receiver.stream();
if requester.is_some() {
let rooms_receiver = &requester.as_ref().unwrap().rooms_receiver;
let mut room_stream = rooms_receiver.stream();
loop { loop {
select! { select! {
room = room_stream.next() => on_room(room, &rooms_ref).await, room = room_stream.next() => on_room(room, &rooms_ref).await,
room_topic_event = room_topic_stream.next() => on_room_topic(room_topic_event, &rooms_ref).await,
} }
} }
} }

View File

@@ -1,4 +1,5 @@
// TODO: make a choice: mpsc vs flume. // TODO: make a choice: mpsc vs flume.
use std::cell::RefCell;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -13,24 +14,27 @@ use matrix_sdk::{
config::SyncSettings, config::SyncSettings,
event_handler::Ctx, event_handler::Ctx,
room::Room as MatrixRoom, room::Room as MatrixRoom,
ruma::events::{ ruma::{
key::verification::{ events::{
done::{OriginalSyncKeyVerificationDoneEvent, ToDeviceKeyVerificationDoneEvent}, key::verification::{
key::{OriginalSyncKeyVerificationKeyEvent, ToDeviceKeyVerificationKeyEvent}, done::{OriginalSyncKeyVerificationDoneEvent, ToDeviceKeyVerificationDoneEvent},
request::ToDeviceKeyVerificationRequestEvent, key::{OriginalSyncKeyVerificationKeyEvent, ToDeviceKeyVerificationKeyEvent},
start::{OriginalSyncKeyVerificationStartEvent, ToDeviceKeyVerificationStartEvent}, request::ToDeviceKeyVerificationRequestEvent,
start::{OriginalSyncKeyVerificationStartEvent, ToDeviceKeyVerificationStartEvent},
},
presence::PresenceEvent,
reaction::ReactionEventContent,
room::{
member::{OriginalSyncRoomMemberEvent, RoomMemberEventContent},
message::RoomMessageEventContent,
name::RoomNameEventContent,
redaction::OriginalSyncRoomRedactionEvent,
topic::RoomTopicEventContent,
},
typing::SyncTypingEvent,
SyncMessageLikeEvent, SyncStateEvent,
}, },
presence::PresenceEvent, OwnedRoomId,
reaction::ReactionEventContent,
room::{
member::{OriginalSyncRoomMemberEvent, RoomMemberEventContent},
message::RoomMessageEventContent,
name::RoomNameEventContent,
redaction::OriginalSyncRoomRedactionEvent,
topic::RoomTopicEventContent,
},
typing::SyncTypingEvent,
SyncMessageLikeEvent, SyncStateEvent,
}, },
Client as MatrixClient, Client as MatrixClient,
}; };
@@ -45,14 +49,23 @@ pub enum ClientError {
Matrix(#[from] matrix_sdk::Error), Matrix(#[from] matrix_sdk::Error),
} }
pub struct RoomTopicEvent(pub OwnedRoomId, pub String);
#[derive(Clone)] #[derive(Clone)]
struct Senders { struct Senders {
rooms_sender: flume::Sender<Room>, rooms_sender: flume::Sender<Room>,
room_topic_sender: flume::Sender<RoomTopicEvent>,
} }
impl Senders { impl Senders {
fn new(rooms_sender: flume::Sender<Room>) -> Self { fn new(
Self { rooms_sender } rooms_sender: flume::Sender<Room>,
room_topic_sender: flume::Sender<RoomTopicEvent>,
) -> Self {
Self {
rooms_sender,
room_topic_sender,
}
} }
} }
@@ -65,13 +78,17 @@ pub struct Client {
} }
impl Client { impl Client {
pub fn new(client: Arc<MatrixClient>, rooms_sender: flume::Sender<Room>) -> Self { pub fn new(
client: Arc<MatrixClient>,
rooms_sender: flume::Sender<Room>,
room_topic_sender: flume::Sender<RoomTopicEvent>,
) -> Self {
Self { Self {
initialized: false, initialized: false,
client: Some(client), client: Some(client),
load_handle: None, load_handle: None,
sync_handle: None, sync_handle: None,
senders: Senders::new(rooms_sender), senders: Senders::new(rooms_sender, room_topic_sender),
} }
} }
@@ -91,21 +108,25 @@ impl Client {
dbg!(_ev); dbg!(_ev);
} }
async fn on_room_topic_event(ev: SyncStateEvent<RoomTopicEventContent>, room: MatrixRoom) { async fn on_room_topic_event(
ev: SyncStateEvent<RoomTopicEventContent>,
room: MatrixRoom,
senders: Ctx<Senders>,
) {
debug!("== on_room_topic_event =="); debug!("== on_room_topic_event ==");
dbg!(&ev); dbg!(&ev);
// if let SyncStateEvent::Original(ev) = ev {
// let room_id = room.room_id().to_owned();
// let store = reactive_store.read().unwrap().to_owned();
// if let Some(store_room) = store.rooms.get(&room_id) { if let SyncStateEvent::Original(ev) = ev {
// // store_room.write().unwrap().topic = Some(ev.content.topic); let room_topic_sender = &senders.room_topic_sender;
// // let _ = reactive_store.write(store); let room_id = room.room_id();
// println!("HOP");
// } else { if let Err(err) = room_topic_sender
// println!("No room with \"{room_id}\" id known"); .send_async(RoomTopicEvent(room_id.to_owned(), ev.content.topic))
// } .await
// } {
error!("Unable to publish the \"{}\" new topic: {}", room_id, err);
}
}
} }
async fn on_room_member_event(ev: SyncStateEvent<RoomMemberEventContent>, _room: MatrixRoom) { async fn on_room_member_event(ev: SyncStateEvent<RoomMemberEventContent>, _room: MatrixRoom) {
@@ -221,7 +242,9 @@ impl Client {
pub async fn spawn(homeserver_url: String) -> Requester { pub async fn spawn(homeserver_url: String) -> Requester {
let (tx, rx) = unbounded_channel::<WorkerTask>(); let (tx, rx) = unbounded_channel::<WorkerTask>();
let (rooms_sender, rooms_receiver) = unbounded::<Room>(); let (rooms_sender, rooms_receiver) = unbounded::<Room>();
let (room_topic_sender, room_topic_receiver) = unbounded::<RoomTopicEvent>();
let matrix_client = Arc::new( let matrix_client = Arc::new(
MatrixClient::builder() MatrixClient::builder()
@@ -231,7 +254,7 @@ impl Client {
.unwrap(), .unwrap(),
); );
let mut client = Client::new(matrix_client.clone(), rooms_sender); let mut client = Client::new(matrix_client.clone(), rooms_sender, room_topic_sender);
tokio::spawn({ tokio::spawn({
async move { async move {
@@ -243,6 +266,7 @@ impl Client {
matrix_client, matrix_client,
tx, tx,
rooms_receiver, rooms_receiver,
room_topic_receiver,
} }
} }
@@ -277,9 +301,14 @@ impl Client {
for matrix_rooms in vec![joined_matrix_rooms_ref, invited_matrix_rooms_ref] { for matrix_rooms in vec![joined_matrix_rooms_ref, invited_matrix_rooms_ref] {
for matrix_room in matrix_rooms.iter() { for matrix_room in matrix_rooms.iter() {
let topic = match matrix_room.topic() {
None => None,
Some(topic) => Some(RefCell::new(topic)),
};
let room = Room::new( let room = Room::new(
Arc::new(matrix_room.to_owned()), Arc::new(matrix_room.to_owned()),
None, topic,
matrix_room.is_direct().await.ok(), matrix_room.is_direct().await.ok(),
); );
if let Err(err) = rooms_sender.send_async(room).await { if let Err(err) = rooms_sender.send_async(room).await {

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use matrix_sdk::Client as MatrixClient; use matrix_sdk::Client as MatrixClient;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use super::client::RoomTopicEvent;
use super::worker_tasks::{oneshot, LoginStyle, WorkerTask}; use super::worker_tasks::{oneshot, LoginStyle, WorkerTask};
use crate::base::Room; use crate::base::Room;
@@ -11,6 +12,7 @@ pub struct Requester {
pub matrix_client: Arc<MatrixClient>, pub matrix_client: Arc<MatrixClient>,
pub tx: UnboundedSender<WorkerTask>, pub tx: UnboundedSender<WorkerTask>,
pub rooms_receiver: flume::Receiver<Room>, pub rooms_receiver: flume::Receiver<Room>,
pub room_topic_receiver: flume::Receiver<RoomTopicEvent>,
} }
impl Requester { impl Requester {