1 // Copyright (C) 2024 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 mod manager;
15 
16 use std::collections::HashMap;
17 use std::net::Shutdown;
18 use std::os::fd::AsRawFd;
19 
20 pub(crate) use manager::{ClientManager, ClientManagerEntry};
21 use ylong_http_client::Headers;
22 use ylong_runtime::net::UnixDatagram;
23 use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
24 use ylong_runtime::sync::oneshot::{channel, Sender};
25 
26 use crate::error::ErrorCode;
27 use crate::task::notify::{NotifyData, SubscribeType};
28 use crate::utils::{runtime_spawn, Recv};
29 
30 const REQUEST_MAGIC_NUM: u32 = 0x43434646;
31 const HEADERS_MAX_SIZE: u16 = 8 * 1024;
32 const POSITION_OF_LENGTH: u32 = 10;
33 
34 #[derive(Debug)]
35 pub(crate) enum ClientEvent {
36     OpenChannel(u64, Sender<Result<i32, ErrorCode>>),
37     Subscribe(u32, u64, u64, u64, Sender<ErrorCode>),
38     Unsubscribe(u32, Sender<ErrorCode>),
39     TaskFinished(u32),
40     Terminate(u64, Sender<ErrorCode>),
41     SendResponse(u32, String, u32, String, Headers),
42     SendNotifyData(SubscribeType, NotifyData),
43     Shutdown,
44 }
45 
46 pub(crate) enum MessageType {
47     HttpResponse = 0,
48     NotifyData,
49 }
50 
51 impl ClientManagerEntry {
open_channel(&self, pid: u64) -> Result<i32, ErrorCode>52     pub(crate) fn open_channel(&self, pid: u64) -> Result<i32, ErrorCode> {
53         let (tx, rx) = channel::<Result<i32, ErrorCode>>();
54         let event = ClientEvent::OpenChannel(pid, tx);
55         if !self.send_event(event) {
56             return Err(ErrorCode::Other);
57         }
58         let rx = Recv::new(rx);
59         match rx.get() {
60             Some(ret) => ret,
61             None => {
62                 error!("open channel fail, recv none");
63                 Err(ErrorCode::Other)
64             }
65         }
66     }
67 
subscribe(&self, tid: u32, pid: u64, uid: u64, token_id: u64) -> ErrorCode68     pub(crate) fn subscribe(&self, tid: u32, pid: u64, uid: u64, token_id: u64) -> ErrorCode {
69         let (tx, rx) = channel::<ErrorCode>();
70         let event = ClientEvent::Subscribe(tid, pid, uid, token_id, tx);
71         if !self.send_event(event) {
72             return ErrorCode::Other;
73         }
74         let rx = Recv::new(rx);
75         match rx.get() {
76             Some(ret) => ret,
77             None => {
78                 error!("subscribe fail, recv none");
79                 ErrorCode::Other
80             }
81         }
82     }
83 
unsubscribe(&self, tid: u32) -> ErrorCode84     pub(crate) fn unsubscribe(&self, tid: u32) -> ErrorCode {
85         let (tx, rx) = channel::<ErrorCode>();
86         let event = ClientEvent::Unsubscribe(tid, tx);
87         if !self.send_event(event) {
88             return ErrorCode::Other;
89         }
90         let rx = Recv::new(rx);
91         match rx.get() {
92             Some(ret) => ret,
93             None => {
94                 error!("unsubscribe failed");
95                 ErrorCode::Other
96             }
97         }
98     }
99 
notify_task_finished(&self, tid: u32)100     pub(crate) fn notify_task_finished(&self, tid: u32) {
101         let event = ClientEvent::TaskFinished(tid);
102         self.send_event(event);
103     }
104 
notify_process_terminate(&self, pid: u64) -> ErrorCode105     pub(crate) fn notify_process_terminate(&self, pid: u64) -> ErrorCode {
106         let (tx, rx) = channel::<ErrorCode>();
107         let event = ClientEvent::Terminate(pid, tx);
108         if !self.send_event(event) {
109             return ErrorCode::Other;
110         }
111         let rx = Recv::new(rx);
112         match rx.get() {
113             Some(ret) => ret,
114             None => {
115                 error!("notify_process_terminate failed");
116                 ErrorCode::Other
117             }
118         }
119     }
120 
send_response( &self, tid: u32, version: String, status_code: u32, reason: String, headers: Headers, )121     pub(crate) fn send_response(
122         &self,
123         tid: u32,
124         version: String,
125         status_code: u32,
126         reason: String,
127         headers: Headers,
128     ) {
129         let event = ClientEvent::SendResponse(tid, version, status_code, reason, headers);
130         let _ = self.send_event(event);
131     }
132 
send_notify_data(&self, subscribe_type: SubscribeType, notify_data: NotifyData)133     pub(crate) fn send_notify_data(&self, subscribe_type: SubscribeType, notify_data: NotifyData) {
134         let event = ClientEvent::SendNotifyData(subscribe_type, notify_data);
135         let _ = self.send_event(event);
136     }
137 }
138 
139 // uid and token_id will be used later
140 pub(crate) struct Client {
141     pub(crate) pid: u64,
142     pub(crate) message_id: u32,
143     pub(crate) server_sock_fd: UnixDatagram,
144     pub(crate) client_sock_fd: UnixDatagram,
145     rx: UnboundedReceiver<ClientEvent>,
146 }
147 
148 impl Client {
constructor(pid: u64) -> Option<(UnboundedSender<ClientEvent>, i32)>149     pub(crate) fn constructor(pid: u64) -> Option<(UnboundedSender<ClientEvent>, i32)> {
150         let (tx, rx) = unbounded_channel();
151         let (server_sock_fd, client_sock_fd) = match UnixDatagram::pair() {
152             Ok((server_sock_fd, client_sock_fd)) => (server_sock_fd, client_sock_fd),
153             Err(err) => {
154                 error!("can't create a pair of sockets, {:?}", err);
155                 return None;
156             }
157         };
158         let client = Client {
159             pid,
160             message_id: 1,
161             server_sock_fd,
162             client_sock_fd,
163             rx,
164         };
165         let fd = client.client_sock_fd.as_raw_fd();
166         runtime_spawn(client.run());
167         Some((tx, fd))
168     }
169 
run(mut self)170     async fn run(mut self) {
171         loop {
172             // for one task, only send last progress message
173             let mut progress_index = HashMap::new();
174             let mut temp_notify_data: Vec<(SubscribeType, NotifyData)> = Vec::new();
175             let mut len = self.rx.len();
176             if len == 0 {
177                 len = 1;
178             }
179             for index in 0..len {
180                 let recv = match self.rx.recv().await {
181                     Ok(message) => message,
182                     Err(e) => {
183                         error!("ClientManager recv error {:?}", e);
184                         continue;
185                     }
186                 };
187                 match recv {
188                     ClientEvent::Shutdown => {
189                         let _ = self.client_sock_fd.shutdown(Shutdown::Both);
190                         let _ = self.server_sock_fd.shutdown(Shutdown::Both);
191                         self.rx.close();
192                         info!("client terminate, pid {}", self.pid);
193                         return;
194                     }
195                     ClientEvent::SendResponse(tid, version, status_code, reason, headers) => {
196                         self.handle_send_response(tid, version, status_code, reason, headers)
197                             .await;
198                     }
199                     ClientEvent::SendNotifyData(subscribe_type, notify_data) => {
200                         if subscribe_type == SubscribeType::Progress {
201                             progress_index.insert(notify_data.task_id, index);
202                         }
203                         temp_notify_data.push((subscribe_type, notify_data));
204                     }
205                     _ => {}
206                 }
207             }
208             for (index, (subscribe_type, notify_data)) in temp_notify_data.into_iter().enumerate() {
209                 if subscribe_type != SubscribeType::Progress
210                     || progress_index.get(&notify_data.task_id) == Some(&index)
211                 {
212                     self.handle_send_notify_data(subscribe_type, notify_data)
213                         .await;
214                 }
215             }
216             debug!("Client handle message done");
217         }
218     }
219 
handle_send_response( &mut self, tid: u32, version: String, status_code: u32, reason: String, headers: Headers, )220     async fn handle_send_response(
221         &mut self,
222         tid: u32,
223         version: String,
224         status_code: u32,
225         reason: String,
226         headers: Headers,
227     ) {
228         let mut response = Vec::<u8>::new();
229 
230         response.extend_from_slice(&REQUEST_MAGIC_NUM.to_le_bytes());
231 
232         response.extend_from_slice(&self.message_id.to_le_bytes());
233         self.message_id += 1;
234 
235         let message_type = MessageType::HttpResponse as u16;
236         response.extend_from_slice(&message_type.to_le_bytes());
237 
238         let message_body_size: u16 = 0;
239         response.extend_from_slice(&message_body_size.to_le_bytes());
240 
241         response.extend_from_slice(&tid.to_le_bytes());
242 
243         response.extend_from_slice(&version.into_bytes());
244         response.push(b'\0');
245 
246         response.extend_from_slice(&status_code.to_le_bytes());
247 
248         response.extend_from_slice(&reason.into_bytes());
249         response.push(b'\0');
250 
251         for (k, v) in headers {
252             response.extend_from_slice(k.as_bytes());
253             response.push(b':');
254             for (i, sub_value) in v.iter().enumerate() {
255                 if i != 0 {
256                     response.push(b',');
257                 }
258                 response.extend_from_slice(sub_value);
259             }
260             response.push(b'\n');
261         }
262 
263         let mut size = response.len() as u16;
264         if size > HEADERS_MAX_SIZE {
265             response.truncate(HEADERS_MAX_SIZE as usize);
266             size = HEADERS_MAX_SIZE;
267         }
268         debug!("send response size, {:?}", size);
269         let size = size.to_le_bytes();
270         response[POSITION_OF_LENGTH as usize] = size[0];
271         response[(POSITION_OF_LENGTH + 1) as usize] = size[1];
272 
273         self.send_message(response).await;
274     }
275 
handle_send_notify_data( &mut self, subscribe_type: SubscribeType, notify_data: NotifyData, )276     async fn handle_send_notify_data(
277         &mut self,
278         subscribe_type: SubscribeType,
279         notify_data: NotifyData,
280     ) {
281         let mut message = Vec::<u8>::new();
282 
283         message.extend_from_slice(&REQUEST_MAGIC_NUM.to_le_bytes());
284 
285         message.extend_from_slice(&self.message_id.to_le_bytes());
286         self.message_id += 1;
287 
288         let message_type = MessageType::NotifyData as u16;
289         message.extend_from_slice(&message_type.to_le_bytes());
290 
291         let message_body_size: u16 = 0;
292         message.extend_from_slice(&message_body_size.to_le_bytes());
293 
294         message.extend_from_slice(&(subscribe_type as u32).to_le_bytes());
295 
296         message.extend_from_slice(&notify_data.task_id.to_le_bytes());
297 
298         message.extend_from_slice(&(notify_data.progress.common_data.state as u32).to_le_bytes());
299 
300         let index = notify_data.progress.common_data.index;
301         message.extend_from_slice(&(index as u32).to_le_bytes());
302 
303         message.extend_from_slice(&(notify_data.progress.processed[index] as u64).to_le_bytes());
304 
305         message.extend_from_slice(
306             &(notify_data.progress.common_data.total_processed as u64).to_le_bytes(),
307         );
308 
309         message.extend_from_slice(&(notify_data.progress.sizes.len() as u32).to_le_bytes());
310         for size in notify_data.progress.sizes {
311             message.extend_from_slice(&size.to_le_bytes());
312         }
313 
314         message.extend_from_slice(&(notify_data.progress.extras.len() as u32).to_le_bytes());
315         for (key, value) in notify_data.progress.extras {
316             message.extend_from_slice(&key.into_bytes());
317             message.push(b'\0');
318             message.extend_from_slice(&value.into_bytes());
319             message.push(b'\0');
320         }
321 
322         message.extend_from_slice(&(notify_data.action.repr as u32).to_le_bytes());
323 
324         message.extend_from_slice(&(notify_data.version as u32).to_le_bytes());
325 
326         message.extend_from_slice(&(notify_data.each_file_status.len() as u32).to_le_bytes());
327         for status in notify_data.each_file_status {
328             message.extend_from_slice(&status.path.into_bytes());
329             message.push(b'\0');
330             message.extend_from_slice(&(status.reason.repr as u32).to_le_bytes());
331             message.extend_from_slice(&status.message.into_bytes());
332             message.push(b'\0');
333         }
334 
335         let size = message.len() as u16;
336         if subscribe_type == SubscribeType::Progress {
337             debug!(
338                 "send tid {} {:?} size {}",
339                 notify_data.task_id, subscribe_type, size
340             );
341         } else {
342             info!(
343                 "send tid {} {:?} size {}",
344                 notify_data.task_id, subscribe_type, size
345             );
346         }
347 
348         let size = size.to_le_bytes();
349         message[POSITION_OF_LENGTH as usize] = size[0];
350         message[(POSITION_OF_LENGTH + 1) as usize] = size[1];
351 
352         self.send_message(message).await;
353     }
354 
send_message(&mut self, message: Vec<u8>)355     async fn send_message(&mut self, message: Vec<u8>) {
356         let ret = self.server_sock_fd.send(&message).await;
357         match ret {
358             Ok(size) => {
359                 debug!("send message ok, pid: {}, size: {}", self.pid, size);
360                 let mut buf: [u8; 4] = [0; 4];
361                 let ret = self.server_sock_fd.recv(&mut buf).await;
362                 if let Err(e) = ret {
363                     error!("message len err, {:?}", e)
364                 }
365                 let len: u32 = u32::from_le_bytes(buf);
366                 if len != message.len() as u32 {
367                     error!("message len bad, send {:?}, recv {:?}", message.len(), len);
368                 } else {
369                     debug!("notify done, pid: {}", self.pid);
370                 }
371             }
372             Err(err) => {
373                 error!("message send error: {:?}", err);
374             }
375         }
376     }
377 }
378