♻️ Rework the Matrix messaging Client

This commit is contained in:
2024-05-10 22:32:35 +02:00
parent 0a936dd12b
commit bc6b02bc34

View File

@@ -1,37 +1,46 @@
use std::borrow::Borrow; use std::{
use std::cell::RefCell; borrow::Borrow,
use std::sync::Arc; collections::HashMap,
use std::time::Duration; sync::{Arc, Mutex},
};
use async_std::task; use async_std::stream::StreamExt;
use dioxus::prelude::Task; use dioxus::prelude::Task;
use log::{debug, error};
use tokio::sync::broadcast;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use matrix_sdk::{ use matrix_sdk::{
config::SyncSettings, config::SyncSettings,
event_handler::Ctx, event_handler::Ctx,
room::Room as MatrixRoom, media::{MediaFormat, MediaThumbnailSize},
room::{Room, RoomMember},
ruma::{ ruma::{
api::client::media::get_content_thumbnail::v3::Method,
events::{ events::{
room::{ room::{
member::{RoomMemberEventContent, StrippedRoomMemberEvent}, member::{
OriginalSyncRoomMemberEvent, RoomMemberEventContent, StrippedRoomMemberEvent,
},
topic::RoomTopicEventContent, topic::RoomTopicEventContent,
}, },
SyncStateEvent, SyncStateEvent,
}, },
OwnedRoomId, uint, OwnedRoomId, RoomId, UserId,
}, },
Client as MatrixClient, RoomState as MatrixRoomState, Client as MatrixClient, RoomMemberships, RoomState,
}; };
use super::requester::{Receivers, Requester}; use tokio::sync::{
use super::worker_tasks::{LoginStyle, WorkerTask}; broadcast,
use crate::domain::model::room::Room; broadcast::{error::SendError, Receiver, Sender},
mpsc::{unbounded_channel, UnboundedReceiver},
};
use tracing::{debug, error, warn};
use super::{
account_event::AccountEvent,
requester::Requester,
room_event::{RoomEvent, RoomEventsReceiver},
worker_tasks::{LoginStyle, WorkerTask},
};
use crate::utils::oneshot;
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum ClientError { pub enum ClientError {
@@ -39,21 +48,45 @@ pub enum ClientError {
Matrix(#[from] matrix_sdk::Error), Matrix(#[from] matrix_sdk::Error),
} }
#[derive(Clone)]
pub enum RoomEvent {
TopicEvent(OwnedRoomId, String),
MemberEvent(OwnedRoomId, Room),
InviteEvent(OwnedRoomId, Room),
}
#[derive(Clone)] #[derive(Clone)]
struct Senders { struct Senders {
room_events_sender: Sender<RoomEvent>, account_events_sender: Sender<AccountEvent>,
room_events_senders: Arc<Mutex<HashMap<OwnedRoomId, Sender<RoomEvent>>>>,
} }
impl Senders { impl Senders {
fn new(room_events_sender: Sender<RoomEvent>) -> Self { fn new(account_events_sender: Sender<AccountEvent>) -> Self {
Self { room_events_sender } Self {
account_events_sender,
room_events_senders: Arc::new(Mutex::new(HashMap::new())),
}
}
fn contains(&self, room_id: &RoomId) -> bool {
let room_senders = self.room_events_senders.lock().unwrap();
room_senders.contains_key(room_id)
}
fn send(&self, room_id: &RoomId, event: RoomEvent) -> Result<usize, SendError<RoomEvent>> {
let room_senders = self.room_events_senders.lock().unwrap();
if let Some(room_sender) = room_senders.get(room_id) {
room_sender.send(event)
} else {
error!("No sender found for \"{}\" room id", room_id);
Ok(0)
}
}
fn add_room(&self, room_id: &OwnedRoomId) -> Option<RoomEventsReceiver> {
let mut senders = self.room_events_senders.lock().unwrap();
if !senders.contains_key(room_id) {
let (room_sender, room_receiver) = broadcast::channel(32);
senders.insert(room_id.clone(), room_sender);
Some(RoomEventsReceiver::new(room_receiver))
} else {
None
}
} }
} }
@@ -65,16 +98,173 @@ pub struct Client {
} }
impl Client { impl Client {
pub fn new(client: Arc<MatrixClient>, room_events_sender: Sender<RoomEvent>) -> Self { pub fn new(client: Arc<MatrixClient>, account_events_sender: Sender<AccountEvent>) -> Self {
Self { Self {
initialized: false, initialized: false,
client: Some(client), client: Some(client),
sync_task: None, sync_task: None,
senders: Senders::new(room_events_sender), senders: Senders::new(account_events_sender),
} }
} }
// async fn on_sync_typing_event(_ev: SyncTypingEvent, room: MatrixRoom) { async fn create_space(
senders: &Ctx<Senders>,
room_id: OwnedRoomId,
room: Option<&Room>,
) -> anyhow::Result<(), SendError<AccountEvent>> {
let mut name = None;
let mut topic = None;
if let Some(room) = room {
name = room.name();
topic = room.topic();
}
if let Some(receiver) = senders.add_room(&room_id) {
let (reply, mut response) = oneshot::<bool>();
let event = AccountEvent::NewSpace(room_id.clone(), name, topic, receiver, reply);
if let Err(err) = senders.account_events_sender.send(event) {
error!(
"Unable to publish the new room with \"{}\" id: {}",
room_id, err
);
return Err(err);
}
// We're expecting a response indicating that the client is able to compute the next RoomEvent
response.recv().await;
} else {
let events = vec![RoomEvent::NewTopic(topic), RoomEvent::NewName(name)];
for event in events {
if let Err(err) = senders.send(&room_id, event.clone()) {
error!(
"Unable to publish the {:?} event to the \"{}\" room: {}",
event, room_id, err
);
// return Err(err);
}
}
}
Ok(())
}
async fn create_room(
senders: &Ctx<Senders>,
room: &Room,
) -> anyhow::Result<(), SendError<AccountEvent>> {
let room_id = room.room_id().to_owned();
if let Some(receiver) = senders.add_room(&room_id) {
let (reply, mut response) = oneshot::<bool>();
let is_direct = match room.is_direct().await {
Ok(is_direct) => Some(is_direct),
Err(err) => {
error!("Unable to know if the room \"{room_id}\" is direct: {err}");
None
}
};
let mut parents = vec![];
// TODO: Remove unwrap
let mut spaces = room.parent_spaces().await.unwrap();
while let Some(parent) = spaces.next().await {
match parent {
Ok(parent) => match parent {
matrix_sdk::room::ParentSpace::Reciprocal(parent) => {
parents.push(parent.room_id().to_owned());
}
_ => todo!(),
},
Err(err) => {
error!("{}", err);
}
}
}
let event = AccountEvent::NewRoom(
room_id.clone(),
parents.clone(),
room.name(),
room.topic(),
is_direct,
room.state(),
receiver,
reply,
);
if let Err(err) = senders.account_events_sender.send(event) {
error!(
"Unable to publish the new room with \"{}\" id: {}",
room.room_id(),
err
);
return Err(err);
}
// We're expecting a response indicating that the client is able to compute the next RoomEvent
response.recv().await;
}
Ok(())
}
async fn add_room(
senders: &Ctx<Senders>,
room: &Room,
) -> anyhow::Result<(), SendError<AccountEvent>> {
let room_id = room.room_id().to_owned();
if room.is_space() {
Self::create_space(senders, room_id, Some(room)).await
} else {
let ret = Self::create_room(senders, room).await;
let mut parents = vec![];
// TODO: Remove unwrap
let mut spaces = room.parent_spaces().await.unwrap();
while let Some(parent) = spaces.next().await {
match parent {
Ok(parent) => match parent {
matrix_sdk::room::ParentSpace::Reciprocal(parent) => {
parents.push(parent.room_id().to_owned());
}
_ => {
warn!(
"Only ParentSpace::Reciprocal taken into account, skip {:?}",
parent
);
}
},
Err(err) => {
error!("{}", err);
}
}
}
error!("parents={:?}", &parents);
for parent in parents {
// Create a minimal space to make the relation consistent... its content will be sync later.
if !senders.contains(&parent) {
let _ = Self::create_space(senders, parent.clone(), None).await;
}
let event = RoomEvent::NewChild(room_id.clone());
if let Err(err) = senders.send(parent.as_ref(), event.clone()) {
error!(
"Unable to send the {:?} event to the \"{}\": {:?}",
event, parent, err
);
}
}
ret
}
}
// async fn on_sync_typing_event(_ev: SyncTypingEvent, room: Room) {
// debug!("== on_sync_typing_event =="); // debug!("== on_sync_typing_event ==");
// let room_id = room.room_id().to_owned(); // let room_id = room.room_id().to_owned();
// dbg!(room_id); // dbg!(room_id);
@@ -85,7 +275,7 @@ impl Client {
// dbg!(_ev); // dbg!(_ev);
// } // }
// async fn on_sync_state_event(ev: SyncStateEvent<RoomNameEventContent>, _room: MatrixRoom) { // async fn on_sync_state_event(ev: SyncStateEvent<RoomNameEventContent>, _room: Room) {
// error!("== on_sync_state_event =="); // error!("== on_sync_state_event ==");
// if let SyncStateEvent::Original(ev) = ev { // if let SyncStateEvent::Original(ev) = ev {
// dbg!(ev); // dbg!(ev);
@@ -94,32 +284,33 @@ impl Client {
// async fn on_original_sync_room_message_event( // async fn on_original_sync_room_message_event(
// ev: OriginalSyncRoomMessageEvent, // ev: OriginalSyncRoomMessageEvent,
// _matrix_room: MatrixRoom, // _room: Room,
// _senders: Ctx<Senders>, // _senders: Ctx<Senders>,
// ) { // ) {
// error!("== on_original_sync_room_message_event =="); // error!("== on_original_sync_room_message_event ==");
// error!("ev={:?}", ev.content); // error!("ev={:?}", ev.content);
// }
async fn on_stripped_room_member_event( async fn on_stripped_room_member_event(
ev: StrippedRoomMemberEvent, ev: StrippedRoomMemberEvent,
matrix_client: MatrixClient, matrix_client: MatrixClient,
matrix_room: MatrixRoom, room: Room,
senders: Ctx<Senders>, senders: Ctx<Senders>,
) { ) {
if ev.state_key == matrix_client.user_id().unwrap() error!("*** on_stripped_room_member_event ***");
&& matrix_room.state() == MatrixRoomState::Invited // error!("ev={:?}", ev);
{
let room_id = matrix_room.room_id();
let room = Room::from_matrix_room(&matrix_room).await;
if let Err(err) = senders if ev.state_key == matrix_client.user_id().unwrap()
.room_events_sender && room.state() == RoomState::Invited
.send(RoomEvent::InviteEvent(room_id.to_owned(), room)) && Self::add_room(&senders, &room).await.is_ok()
{ {
let room_id = room.room_id();
let event = RoomEvent::Invitation();
if let Err(err) = senders.send(room_id, event) {
error!( error!(
"Unable to publish the new room with \"{}\" id: {}", "Unable to publish the room \"{}\" invitation: {}",
room_id, err room.room_id(),
err
); );
} }
} }
@@ -127,45 +318,56 @@ impl Client {
async fn on_room_topic_event( async fn on_room_topic_event(
ev: SyncStateEvent<RoomTopicEventContent>, ev: SyncStateEvent<RoomTopicEventContent>,
matrix_room: MatrixRoom, room: Room,
senders: Ctx<Senders>, senders: Ctx<Senders>,
) { ) {
if let SyncStateEvent::Original(ev) = ev { error!("*** on_room_topic_event ***");
let room_id = matrix_room.room_id(); // error!("ev={:?}", ev);
if let Err(err) = senders if let SyncStateEvent::Original(ev) = ev {
.room_events_sender let _ = Self::add_room(&senders, &room).await;
.send(RoomEvent::TopicEvent(room_id.to_owned(), ev.content.topic))
{ let room_id = room.room_id();
error!("Unable to publish the \"{}\" new topic: {}", room_id, err); let event = RoomEvent::NewTopic(Some(ev.content.topic));
if let Err(err) = senders.send(room_id, event) {
error!(
"Unable to publish the room \"{}\" topic: {}",
room.room_id(),
err
);
} }
} }
} }
async fn on_room_member_event( async fn on_room_member_event(
ev: SyncStateEvent<RoomMemberEventContent>, ev: SyncStateEvent<RoomMemberEventContent>,
matrix_room: MatrixRoom, room: Room,
senders: Ctx<Senders>, senders: Ctx<Senders>,
) { ) {
if let SyncStateEvent::Original(_ev) = ev { error!("*** on_room_member_event ***");
let room_id = matrix_room.room_id(); // error!("ev={:?}", ev);
let room = Room::from_matrix_room(&matrix_room).await;
if let Err(err) = senders if let SyncStateEvent::Original(_ev) = ev {
.room_events_sender if Self::add_room(&senders, &room).await.is_ok() {
.send(RoomEvent::MemberEvent(room_id.to_owned(), room)) // let room_id = room.room_id();
{ // // TODO: Client shall only manage Matrix object... not BG92 ones.
error!( // let event = RoomEvent::Membership(RoomMember::new(ev.sender, room_id));
"Unable to publish the new room with \"{}\" id: {}", // if let Some(result) = senders.send(room_id, event) {
room_id, err // if let Err(err) = result {
); // error!(
// "Unable to publish the room \"{}\" membership: {}",
// room.room_id(),
// err
// );
// }
// }
} }
} }
} }
// async fn on_sync_message_like_room_message_event( // async fn on_sync_message_like_room_message_event(
// ev: SyncMessageLikeEvent<RoomMessageEventContent>, // ev: SyncMessageLikeEvent<RoomMessageEventContent>,
// _room: MatrixRoom, // _room: Room,
// _client: MatrixClient, // _client: MatrixClient,
// ) { // ) {
// debug!("== on_sync_message_like_room_message_event =="); // debug!("== on_sync_message_like_room_message_event ==");
@@ -174,7 +376,7 @@ impl Client {
// async fn on_sync_message_like_reaction_event( // async fn on_sync_message_like_reaction_event(
// ev: SyncMessageLikeEvent<ReactionEventContent>, // ev: SyncMessageLikeEvent<ReactionEventContent>,
// _room: MatrixRoom, // _room: Room,
// ) { // ) {
// debug!("== on_sync_message_like_reaction_event =="); // debug!("== on_sync_message_like_reaction_event ==");
// dbg!(ev); // dbg!(ev);
@@ -182,28 +384,28 @@ impl Client {
// async fn on_original_sync_room_redaction_event( // async fn on_original_sync_room_redaction_event(
// ev: OriginalSyncRoomRedactionEvent, // ev: OriginalSyncRoomRedactionEvent,
// _room: MatrixRoom, // _room: Room,
// ) { // ) {
// debug!("== on_original_sync_room_redaction_event =="); // debug!("== on_original_sync_room_redaction_event ==");
// dbg!(ev); // dbg!(ev);
// } // }
// async fn on_original_sync_room_member_event( async fn on_original_sync_room_member_event(
// _ev: OriginalSyncRoomMemberEvent, _ev: OriginalSyncRoomMemberEvent,
// _room: MatrixRoom, _room: Room,
// _client: MatrixClient, _client: MatrixClient,
// ) { ) {
// debug!("== on_original_sync_room_member_event =="); // debug!("== on_original_sync_room_member_event ==");
// error!("room={:?}", room);
// let mut store = store_ctx.read().unwrap().to_owned(); // let mut store = store_ctx.read().unwrap().to_owned();
// dbg!(store.rooms.keys()); // dbg!(store.rooms.keys());
// let is_direct = room.is_direct().await.ok(); // let is_direct = room.is_direct().await.ok();
// store.rooms.insert( // store.rooms.insert(
// OwnedRoomId::from(room_id), // OwnedRoomId::from(room_id),
// Arc::new(RwLock::new(Room::new(Arc::new(room), None, is_direct))), // Arc::new(RwLock::new(Room::new(Arc::new(room), None, is_direct))),
// ); // );
// let _ = store_ctx.write(store); // let _ = store_ctx.write(store);
// } }
// async fn on_original_sync_key_verif_start_event( // async fn on_original_sync_key_verif_start_event(
// ev: OriginalSyncKeyVerificationStartEvent, // ev: OriginalSyncKeyVerificationStartEvent,
@@ -265,11 +467,7 @@ impl Client {
// debug!("== on_room_event({}) ==", ev.) // debug!("== on_room_event({}) ==", ev.)
// } // }
pub async fn spawn(homeserver_url: String) -> Requester { pub async fn spawn(homeserver_url: String) -> (Requester, Receiver<AccountEvent>) {
let (tx, rx) = unbounded_channel::<WorkerTask>();
let (room_sender, room_receiver) = broadcast::channel(32);
let matrix_client = Arc::new( let matrix_client = Arc::new(
MatrixClient::builder() MatrixClient::builder()
.homeserver_url(&homeserver_url) .homeserver_url(&homeserver_url)
@@ -278,23 +476,22 @@ impl Client {
.unwrap(), .unwrap(),
); );
let mut client = Client::new(matrix_client.clone(), room_sender); let (worker_tasks_sender, worker_tasks_receiver) = unbounded_channel::<WorkerTask>();
let (account_events_sender, account_events_receiver) =
broadcast::channel::<AccountEvent>(32);
let mut client = Client::new(matrix_client, account_events_sender);
dioxus::prelude::spawn(async move { dioxus::prelude::spawn(async move {
client.work(rx).await; client.work(worker_tasks_receiver).await;
}); });
Requester { (Requester::new(worker_tasks_sender), account_events_receiver)
matrix_client,
tx,
receivers: Receivers {
room_receiver: RefCell::new(room_receiver),
},
}
} }
fn init(&mut self) { fn init(&mut self) {
if let Some(client) = self.client.borrow() { if let Some(client) = self.client.borrow() {
// TODO: Remove clone?
client.add_event_handler_context(self.senders.clone()); client.add_event_handler_context(self.senders.clone());
let _ = client.add_event_handler(Client::on_stripped_room_member_event); let _ = client.add_event_handler(Client::on_stripped_room_member_event);
@@ -309,7 +506,9 @@ impl Client {
// let _ = client.add_event_handler(Client::on_sync_message_like_room_message_event); // let _ = client.add_event_handler(Client::on_sync_message_like_room_message_event);
// let _ = client.add_event_handler(Client::on_sync_message_like_reaction_event); // let _ = client.add_event_handler(Client::on_sync_message_like_reaction_event);
// let _ = client.add_event_handler(Client::on_original_sync_room_redaction_event); // let _ = client.add_event_handler(Client::on_original_sync_room_redaction_event);
// let _ = client.add_event_handler(Client::on_original_sync_room_member_event);
let _ = client.add_event_handler(Client::on_original_sync_room_member_event);
// let _ = client.add_event_handler(Client::on_original_sync_key_verif_start_event); // let _ = client.add_event_handler(Client::on_original_sync_key_verif_start_event);
// let _ = client.add_event_handler(Client::on_original_sync_key_verif_key_event); // let _ = client.add_event_handler(Client::on_original_sync_key_verif_key_event);
// let _ = client.add_event_handler(Client::on_original_sync_key_verif_done_event); // let _ = client.add_event_handler(Client::on_original_sync_key_verif_done_event);
@@ -322,12 +521,12 @@ impl Client {
} }
} }
async fn login_and_sync(&mut self, style: LoginStyle) -> anyhow::Result<()> { async fn login(&mut self, style: LoginStyle) -> anyhow::Result<()> {
let client = self.client.clone().unwrap(); let client = self.client.as_ref().unwrap();
match style { match style {
LoginStyle::Password(username, password) => { LoginStyle::Password(username, password) => {
let _resp = client client
.matrix_auth() .matrix_auth()
.login_username(&username, &password) .login_username(&username, &password)
.initial_device_display_name("TODO") .initial_device_display_name("TODO")
@@ -337,7 +536,11 @@ impl Client {
} }
} }
let (synchronized_tx, synchronized_rx) = oneshot::channel::<bool>(); Ok(())
}
async fn run_forever(&mut self) {
let client = self.client.clone().unwrap();
let task = dioxus::prelude::spawn(async move { let task = dioxus::prelude::spawn(async move {
// Sync once so we receive the client state and old messages // Sync once so we receive the client state and old messages
@@ -350,87 +553,107 @@ impl Client {
}; };
if let Some(sync_token) = sync_token_option { if let Some(sync_token) = sync_token_option {
let settings = SyncSettings::default().token(sync_token);
debug!("User connected to the homeserver, start syncing"); debug!("User connected to the homeserver, start syncing");
if let Err(err) = synchronized_tx.send(true) { let settings = SyncSettings::default().token(sync_token);
error!("Unable to notify that the Matrix client is now synchronized ({err})");
}
let _ = client.sync(settings).await; let _ = client.sync(settings).await;
} }
}); });
self.sync_task = Some(task); self.sync_task = Some(task);
// self.start_background_tasks(synchronized_rx);
Ok(())
} }
// async fn register_room_events(&self, room_id: OwnedRoomId) { async fn get_display_name(&mut self) -> anyhow::Result<Option<String>> {
// let client = self.client.unwrap(); let client = self.client.as_ref().unwrap();
// client.add_room_event_handler(&room_id, Client::on_room_event); match client.account().get_display_name().await {
// } Ok(display_name) => Ok(display_name),
Err(err) => Err(err.into()),
}
}
// async fn refresh_rooms( async fn get_avatar(&mut self) -> anyhow::Result<Option<Vec<u8>>> {
// matrix_client: &Arc<MatrixClient>, let client = self.client.as_ref().unwrap();
// room_events_sender: &Sender<RoomEvent>,
// ) {
// let joined_matrix_rooms_ref = &matrix_client.joined_rooms();
// let invited_matrix_rooms_ref = &matrix_client.invited_rooms();
// for matrix_rooms in [joined_matrix_rooms_ref, invited_matrix_rooms_ref] { match client
// for matrix_room in matrix_rooms.iter() { .account()
// let room = Room::from_matrix_room(matrix_room).await; .get_avatar(MediaFormat::Thumbnail(MediaThumbnailSize {
// let event = RoomEvent::MemberEvent(room.id().clone(), room); method: Method::Scale,
width: uint!(256),
height: uint!(256),
}))
.await
{
Ok(avatar) => Ok(avatar),
Err(err) => Err(err.into()),
}
}
// if let Err(err) = room_events_sender.send(event) { async fn get_room_avatar(&mut self, room_id: &OwnedRoomId) -> anyhow::Result<Option<Vec<u8>>> {
// error!("Error: {}", err); let client = self.client.as_ref().unwrap();
// }
// }
// }
// }
// async fn refresh_rooms_forever( if let Some(room) = client.get_room(room_id) {
// matrix_client: Arc<MatrixClient>, match room
// room_events_sender: &Sender<RoomEvent>, .avatar(MediaFormat::Thumbnail(MediaThumbnailSize {
// ) { method: Method::Scale,
// // TODO: Add interval to config width: uint!(256),
// let period_sec = Duration::from_secs(5); height: uint!(256),
}))
.await
{
Ok(avatar) => Ok(avatar),
Err(err) => Err(err.into()),
}
} else {
error!("No room found with the \"{}\" id", room_id.as_str());
// TODO: Return an error if the room has not been found
Ok(None)
}
}
// loop { async fn get_room_members(&mut self, room_id: &OwnedRoomId) -> anyhow::Result<Vec<RoomMember>> {
// Self::refresh_rooms(&matrix_client, room_events_sender).await; let client = self.client.as_ref().unwrap();
// task::sleep(period_sec).await; if let Some(room) = client.get_room(room_id) {
// } match room.members(RoomMemberships::ACTIVE).await {
// } Ok(room_members) => Ok(room_members),
Err(err) => Err(err.into()),
}
} else {
error!("No room found with the \"{}\" id", room_id.as_str());
// TODO: Return an error if the room has not been found
Ok(vec![])
}
}
// fn start_background_tasks(&mut self, synchronized_rx: oneshot::Receiver<bool>) { async fn get_room_member_avatar(
// let client = self.client.clone().unwrap(); &self,
// let room_events_sender = self.senders.room_events_sender.clone(); room_id: &RoomId,
user_id: &UserId,
) -> anyhow::Result<Option<Vec<u8>>> {
let client = self.client.as_ref().unwrap();
// let task = dioxus::prelude::spawn(async move { if let Some(room) = client.get_room(room_id) {
// if let Err(err) = synchronized_rx.await { if let Ok(Some(room_member)) = room.get_member(user_id).await {
// error!("Unable to setup the rx channel notifying that the Matrix client is now synchronized ({err})"); let res = match room_member
// } .avatar(MediaFormat::Thumbnail(MediaThumbnailSize {
method: Method::Scale,
// debug!("Start room refreshing forever"); width: uint!(256),
height: uint!(256),
// let _ = Self::refresh_rooms_forever(client, &room_events_sender).await; }))
// }); .await
// self.background_task = Some(task); {
// } Ok(avatar) => Ok(avatar),
Err(err) => Err(err.into()),
};
return res;
}
}
Ok(None)
}
async fn work(&mut self, mut rx: UnboundedReceiver<WorkerTask>) { async fn work(&mut self, mut rx: UnboundedReceiver<WorkerTask>) {
loop { while let Some(task) = rx.recv().await {
match rx.recv().await { self.run(task).await;
Some(task) => self.run(task).await,
None => {
break;
}
}
} }
if let Some(task) = self.sync_task.take() { if let Some(task) = self.sync_task.take() {
@@ -441,17 +664,38 @@ impl Client {
async fn run(&mut self, task: WorkerTask) { async fn run(&mut self, task: WorkerTask) {
match task { match task {
WorkerTask::Init(reply) => { WorkerTask::Init(reply) => {
assert!(!self.initialized);
self.init(); self.init();
reply.send(()).await; reply.send(Ok(())).await;
}
WorkerTask::RunForever(reply) => {
{
self.run_forever().await;
reply.send(())
}
.await
} }
WorkerTask::Login(style, reply) => { WorkerTask::Login(style, reply) => {
assert!(self.initialized); reply.send(self.login(style).await).await;
reply.send(self.login_and_sync(style).await).await; }
} // WorkerTask::registerRoomEvents(room_id, reply) => { WorkerTask::GetDisplayName(reply) => {
// assert!(self.initialized); reply.send(self.get_display_name().await).await;
// reply.send(self.register_room_events(room_id).await).await; }
// } WorkerTask::GetAvatar(reply) => {
reply.send(self.get_avatar().await).await;
}
WorkerTask::GetRoomAvatar(id, reply) => {
reply.send(self.get_room_avatar(&id).await).await;
}
WorkerTask::GetRoomMembers(id, reply) => {
reply.send(self.get_room_members(&id).await).await;
}
WorkerTask::GetRoomMemberAvatar(room_id, user_id, reply) => {
reply
.send(self.get_room_member_avatar(&room_id, &user_id).await)
.await;
}
} }
} }
} }