1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::error::Error;
15 use std::io::{Read, Write};
16 use std::mem::take;
17 use std::sync::{Arc, Mutex};
18 
19 use ylong_http::request::uri::Uri;
20 
21 use crate::error::{ErrorKind, HttpClientError};
22 use crate::sync_impl::Connector;
23 use crate::util::dispatcher::{Conn, ConnDispatcher, Dispatcher};
24 use crate::util::pool::{Pool, PoolKey};
25 
26 pub(crate) struct ConnPool<C, S> {
27     pool: Pool<PoolKey, Conns<S>>,
28     connector: Arc<C>,
29 }
30 
31 impl<C: Connector> ConnPool<C, C::Stream> {
new(connector: C) -> Self32     pub(crate) fn new(connector: C) -> Self {
33         Self {
34             pool: Pool::new(),
35             connector: Arc::new(connector),
36         }
37     }
38 
connect_to(&self, uri: Uri) -> Result<Conn<C::Stream>, HttpClientError>39     pub(crate) fn connect_to(&self, uri: Uri) -> Result<Conn<C::Stream>, HttpClientError> {
40         let key = PoolKey::new(
41             uri.scheme().unwrap().clone(),
42             uri.authority().unwrap().clone(),
43         );
44 
45         self.pool
46             .get(key, Conns::new)
47             .conn(|| self.connector.clone().connect(&uri))
48     }
49 }
50 
51 pub(crate) struct Conns<S> {
52     list: Arc<Mutex<Vec<ConnDispatcher<S>>>>,
53 }
54 
55 impl<S> Conns<S> {
new() -> Self56     fn new() -> Self {
57         Self {
58             list: Arc::new(Mutex::new(Vec::new())),
59         }
60     }
61 }
62 
63 impl<S> Clone for Conns<S> {
clone(&self) -> Self64     fn clone(&self) -> Self {
65         Self {
66             list: self.list.clone(),
67         }
68     }
69 }
70 
71 impl<S: Read + Write + 'static> Conns<S> {
conn<F, E>(&self, connect_fn: F) -> Result<Conn<S>, HttpClientError> where F: FnOnce() -> Result<S, E>, E: Into<Box<dyn Error + Send + Sync>>,72     fn conn<F, E>(&self, connect_fn: F) -> Result<Conn<S>, HttpClientError>
73     where
74         F: FnOnce() -> Result<S, E>,
75         E: Into<Box<dyn Error + Send + Sync>>,
76     {
77         let mut list = self.list.lock().unwrap();
78         let mut conn = None;
79         let curr = take(&mut *list);
80         for dispatcher in curr.into_iter() {
81             // Discard invalid dispatchers.
82             if dispatcher.is_shutdown() {
83                 continue;
84             }
85             if conn.is_none() {
86                 conn = dispatcher.dispatch();
87             }
88             list.push(dispatcher);
89         }
90 
91         if let Some(conn) = conn {
92             Ok(conn)
93         } else {
94             let dispatcher = ConnDispatcher::http1(
95                 connect_fn().map_err(|e| HttpClientError::from_error(ErrorKind::Connect, e))?,
96             );
97             // We must be able to get the `Conn` here.
98             let conn = dispatcher.dispatch().unwrap();
99             list.push(dispatcher);
100             Ok(conn)
101         }
102     }
103 }
104