1 // Copyright (C) 2024 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 mod manager; 15 16 use std::collections::HashMap; 17 use std::net::Shutdown; 18 use std::os::fd::AsRawFd; 19 20 pub(crate) use manager::{ClientManager, ClientManagerEntry}; 21 use ylong_http_client::Headers; 22 use ylong_runtime::net::UnixDatagram; 23 use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; 24 use ylong_runtime::sync::oneshot::{channel, Sender}; 25 26 use crate::error::ErrorCode; 27 use crate::task::notify::{NotifyData, SubscribeType}; 28 use crate::utils::{runtime_spawn, Recv}; 29 30 const REQUEST_MAGIC_NUM: u32 = 0x43434646; 31 const HEADERS_MAX_SIZE: u16 = 8 * 1024; 32 const POSITION_OF_LENGTH: u32 = 10; 33 34 #[derive(Debug)] 35 pub(crate) enum ClientEvent { 36 OpenChannel(u64, Sender<Result<i32, ErrorCode>>), 37 Subscribe(u32, u64, u64, u64, Sender<ErrorCode>), 38 Unsubscribe(u32, Sender<ErrorCode>), 39 TaskFinished(u32), 40 Terminate(u64, Sender<ErrorCode>), 41 SendResponse(u32, String, u32, String, Headers), 42 SendNotifyData(SubscribeType, NotifyData), 43 Shutdown, 44 } 45 46 pub(crate) enum MessageType { 47 HttpResponse = 0, 48 NotifyData, 49 } 50 51 impl ClientManagerEntry { open_channel(&self, pid: u64) -> Result<i32, ErrorCode>52 pub(crate) fn open_channel(&self, pid: u64) -> Result<i32, ErrorCode> { 53 let (tx, rx) = channel::<Result<i32, ErrorCode>>(); 54 let event = ClientEvent::OpenChannel(pid, tx); 55 if !self.send_event(event) { 56 return Err(ErrorCode::Other); 57 } 58 let rx = Recv::new(rx); 59 match rx.get() { 60 Some(ret) => ret, 61 None => { 62 error!("open channel fail, recv none"); 63 Err(ErrorCode::Other) 64 } 65 } 66 } 67 subscribe(&self, tid: u32, pid: u64, uid: u64, token_id: u64) -> ErrorCode68 pub(crate) fn subscribe(&self, tid: u32, pid: u64, uid: u64, token_id: u64) -> ErrorCode { 69 let (tx, rx) = channel::<ErrorCode>(); 70 let event = ClientEvent::Subscribe(tid, pid, uid, token_id, tx); 71 if !self.send_event(event) { 72 return ErrorCode::Other; 73 } 74 let rx = Recv::new(rx); 75 match rx.get() { 76 Some(ret) => ret, 77 None => { 78 error!("subscribe fail, recv none"); 79 ErrorCode::Other 80 } 81 } 82 } 83 unsubscribe(&self, tid: u32) -> ErrorCode84 pub(crate) fn unsubscribe(&self, tid: u32) -> ErrorCode { 85 let (tx, rx) = channel::<ErrorCode>(); 86 let event = ClientEvent::Unsubscribe(tid, tx); 87 if !self.send_event(event) { 88 return ErrorCode::Other; 89 } 90 let rx = Recv::new(rx); 91 match rx.get() { 92 Some(ret) => ret, 93 None => { 94 error!("unsubscribe failed"); 95 ErrorCode::Other 96 } 97 } 98 } 99 notify_task_finished(&self, tid: u32)100 pub(crate) fn notify_task_finished(&self, tid: u32) { 101 let event = ClientEvent::TaskFinished(tid); 102 self.send_event(event); 103 } 104 notify_process_terminate(&self, pid: u64) -> ErrorCode105 pub(crate) fn notify_process_terminate(&self, pid: u64) -> ErrorCode { 106 let (tx, rx) = channel::<ErrorCode>(); 107 let event = ClientEvent::Terminate(pid, tx); 108 if !self.send_event(event) { 109 return ErrorCode::Other; 110 } 111 let rx = Recv::new(rx); 112 match rx.get() { 113 Some(ret) => ret, 114 None => { 115 error!("notify_process_terminate failed"); 116 ErrorCode::Other 117 } 118 } 119 } 120 send_response( &self, tid: u32, version: String, status_code: u32, reason: String, headers: Headers, )121 pub(crate) fn send_response( 122 &self, 123 tid: u32, 124 version: String, 125 status_code: u32, 126 reason: String, 127 headers: Headers, 128 ) { 129 let event = ClientEvent::SendResponse(tid, version, status_code, reason, headers); 130 let _ = self.send_event(event); 131 } 132 send_notify_data(&self, subscribe_type: SubscribeType, notify_data: NotifyData)133 pub(crate) fn send_notify_data(&self, subscribe_type: SubscribeType, notify_data: NotifyData) { 134 let event = ClientEvent::SendNotifyData(subscribe_type, notify_data); 135 let _ = self.send_event(event); 136 } 137 } 138 139 // uid and token_id will be used later 140 pub(crate) struct Client { 141 pub(crate) pid: u64, 142 pub(crate) message_id: u32, 143 pub(crate) server_sock_fd: UnixDatagram, 144 pub(crate) client_sock_fd: UnixDatagram, 145 rx: UnboundedReceiver<ClientEvent>, 146 } 147 148 impl Client { constructor(pid: u64) -> Option<(UnboundedSender<ClientEvent>, i32)>149 pub(crate) fn constructor(pid: u64) -> Option<(UnboundedSender<ClientEvent>, i32)> { 150 let (tx, rx) = unbounded_channel(); 151 let (server_sock_fd, client_sock_fd) = match UnixDatagram::pair() { 152 Ok((server_sock_fd, client_sock_fd)) => (server_sock_fd, client_sock_fd), 153 Err(err) => { 154 error!("can't create a pair of sockets, {:?}", err); 155 return None; 156 } 157 }; 158 let client = Client { 159 pid, 160 message_id: 1, 161 server_sock_fd, 162 client_sock_fd, 163 rx, 164 }; 165 let fd = client.client_sock_fd.as_raw_fd(); 166 runtime_spawn(client.run()); 167 Some((tx, fd)) 168 } 169 run(mut self)170 async fn run(mut self) { 171 loop { 172 // for one task, only send last progress message 173 let mut progress_index = HashMap::new(); 174 let mut temp_notify_data: Vec<(SubscribeType, NotifyData)> = Vec::new(); 175 let mut len = self.rx.len(); 176 if len == 0 { 177 len = 1; 178 } 179 for index in 0..len { 180 let recv = match self.rx.recv().await { 181 Ok(message) => message, 182 Err(e) => { 183 error!("ClientManager recv error {:?}", e); 184 continue; 185 } 186 }; 187 match recv { 188 ClientEvent::Shutdown => { 189 let _ = self.client_sock_fd.shutdown(Shutdown::Both); 190 let _ = self.server_sock_fd.shutdown(Shutdown::Both); 191 self.rx.close(); 192 info!("client terminate, pid {}", self.pid); 193 return; 194 } 195 ClientEvent::SendResponse(tid, version, status_code, reason, headers) => { 196 self.handle_send_response(tid, version, status_code, reason, headers) 197 .await; 198 } 199 ClientEvent::SendNotifyData(subscribe_type, notify_data) => { 200 if subscribe_type == SubscribeType::Progress { 201 progress_index.insert(notify_data.task_id, index); 202 } 203 temp_notify_data.push((subscribe_type, notify_data)); 204 } 205 _ => {} 206 } 207 } 208 for (index, (subscribe_type, notify_data)) in temp_notify_data.into_iter().enumerate() { 209 if subscribe_type != SubscribeType::Progress 210 || progress_index.get(¬ify_data.task_id) == Some(&index) 211 { 212 self.handle_send_notify_data(subscribe_type, notify_data) 213 .await; 214 } 215 } 216 debug!("Client handle message done"); 217 } 218 } 219 handle_send_response( &mut self, tid: u32, version: String, status_code: u32, reason: String, headers: Headers, )220 async fn handle_send_response( 221 &mut self, 222 tid: u32, 223 version: String, 224 status_code: u32, 225 reason: String, 226 headers: Headers, 227 ) { 228 let mut response = Vec::<u8>::new(); 229 230 response.extend_from_slice(&REQUEST_MAGIC_NUM.to_le_bytes()); 231 232 response.extend_from_slice(&self.message_id.to_le_bytes()); 233 self.message_id += 1; 234 235 let message_type = MessageType::HttpResponse as u16; 236 response.extend_from_slice(&message_type.to_le_bytes()); 237 238 let message_body_size: u16 = 0; 239 response.extend_from_slice(&message_body_size.to_le_bytes()); 240 241 response.extend_from_slice(&tid.to_le_bytes()); 242 243 response.extend_from_slice(&version.into_bytes()); 244 response.push(b'\0'); 245 246 response.extend_from_slice(&status_code.to_le_bytes()); 247 248 response.extend_from_slice(&reason.into_bytes()); 249 response.push(b'\0'); 250 251 for (k, v) in headers { 252 response.extend_from_slice(k.as_bytes()); 253 response.push(b':'); 254 for (i, sub_value) in v.iter().enumerate() { 255 if i != 0 { 256 response.push(b','); 257 } 258 response.extend_from_slice(sub_value); 259 } 260 response.push(b'\n'); 261 } 262 263 let mut size = response.len() as u16; 264 if size > HEADERS_MAX_SIZE { 265 response.truncate(HEADERS_MAX_SIZE as usize); 266 size = HEADERS_MAX_SIZE; 267 } 268 debug!("send response size, {:?}", size); 269 let size = size.to_le_bytes(); 270 response[POSITION_OF_LENGTH as usize] = size[0]; 271 response[(POSITION_OF_LENGTH + 1) as usize] = size[1]; 272 273 self.send_message(response).await; 274 } 275 handle_send_notify_data( &mut self, subscribe_type: SubscribeType, notify_data: NotifyData, )276 async fn handle_send_notify_data( 277 &mut self, 278 subscribe_type: SubscribeType, 279 notify_data: NotifyData, 280 ) { 281 let mut message = Vec::<u8>::new(); 282 283 message.extend_from_slice(&REQUEST_MAGIC_NUM.to_le_bytes()); 284 285 message.extend_from_slice(&self.message_id.to_le_bytes()); 286 self.message_id += 1; 287 288 let message_type = MessageType::NotifyData as u16; 289 message.extend_from_slice(&message_type.to_le_bytes()); 290 291 let message_body_size: u16 = 0; 292 message.extend_from_slice(&message_body_size.to_le_bytes()); 293 294 message.extend_from_slice(&(subscribe_type as u32).to_le_bytes()); 295 296 message.extend_from_slice(¬ify_data.task_id.to_le_bytes()); 297 298 message.extend_from_slice(&(notify_data.progress.common_data.state as u32).to_le_bytes()); 299 300 let index = notify_data.progress.common_data.index; 301 message.extend_from_slice(&(index as u32).to_le_bytes()); 302 303 message.extend_from_slice(&(notify_data.progress.processed[index] as u64).to_le_bytes()); 304 305 message.extend_from_slice( 306 &(notify_data.progress.common_data.total_processed as u64).to_le_bytes(), 307 ); 308 309 message.extend_from_slice(&(notify_data.progress.sizes.len() as u32).to_le_bytes()); 310 for size in notify_data.progress.sizes { 311 message.extend_from_slice(&size.to_le_bytes()); 312 } 313 314 message.extend_from_slice(&(notify_data.progress.extras.len() as u32).to_le_bytes()); 315 for (key, value) in notify_data.progress.extras { 316 message.extend_from_slice(&key.into_bytes()); 317 message.push(b'\0'); 318 message.extend_from_slice(&value.into_bytes()); 319 message.push(b'\0'); 320 } 321 322 message.extend_from_slice(&(notify_data.action.repr as u32).to_le_bytes()); 323 324 message.extend_from_slice(&(notify_data.version as u32).to_le_bytes()); 325 326 message.extend_from_slice(&(notify_data.each_file_status.len() as u32).to_le_bytes()); 327 for status in notify_data.each_file_status { 328 message.extend_from_slice(&status.path.into_bytes()); 329 message.push(b'\0'); 330 message.extend_from_slice(&(status.reason.repr as u32).to_le_bytes()); 331 message.extend_from_slice(&status.message.into_bytes()); 332 message.push(b'\0'); 333 } 334 335 let size = message.len() as u16; 336 if subscribe_type == SubscribeType::Progress { 337 debug!( 338 "send tid {} {:?} size {}", 339 notify_data.task_id, subscribe_type, size 340 ); 341 } else { 342 info!( 343 "send tid {} {:?} size {}", 344 notify_data.task_id, subscribe_type, size 345 ); 346 } 347 348 let size = size.to_le_bytes(); 349 message[POSITION_OF_LENGTH as usize] = size[0]; 350 message[(POSITION_OF_LENGTH + 1) as usize] = size[1]; 351 352 self.send_message(message).await; 353 } 354 send_message(&mut self, message: Vec<u8>)355 async fn send_message(&mut self, message: Vec<u8>) { 356 let ret = self.server_sock_fd.send(&message).await; 357 match ret { 358 Ok(size) => { 359 debug!("send message ok, pid: {}, size: {}", self.pid, size); 360 let mut buf: [u8; 4] = [0; 4]; 361 let ret = self.server_sock_fd.recv(&mut buf).await; 362 if let Err(e) = ret { 363 error!("message len err, {:?}", e) 364 } 365 let len: u32 = u32::from_le_bytes(buf); 366 if len != message.len() as u32 { 367 error!("message len bad, send {:?}, recv {:?}", message.len(), len); 368 } else { 369 debug!("notify done, pid: {}", self.pid); 370 } 371 } 372 Err(err) => { 373 error!("message send error: {:?}", err); 374 } 375 } 376 } 377 } 378