1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::mem::take;
15 use std::sync::{Arc, Mutex};
16 
17 #[cfg(feature = "http2")]
18 use ylong_http::request::uri::Scheme;
19 use ylong_http::request::uri::Uri;
20 
21 use crate::async_impl::connector::ConnInfo;
22 use crate::async_impl::Connector;
23 use crate::error::HttpClientError;
24 use crate::runtime::{AsyncRead, AsyncWrite};
25 #[cfg(feature = "http2")]
26 use crate::util::config::H2Config;
27 use crate::util::config::{HttpConfig, HttpVersion};
28 use crate::util::dispatcher::{Conn, ConnDispatcher, Dispatcher};
29 use crate::util::pool::{Pool, PoolKey};
30 
31 pub(crate) struct ConnPool<C, S> {
32     pool: Pool<PoolKey, Conns<S>>,
33     connector: Arc<C>,
34     config: HttpConfig,
35 }
36 
37 impl<C: Connector> ConnPool<C, C::Stream> {
new(config: HttpConfig, connector: C) -> Self38     pub(crate) fn new(config: HttpConfig, connector: C) -> Self {
39         Self {
40             pool: Pool::new(),
41             connector: Arc::new(connector),
42             config,
43         }
44     }
45 
connect_to(&self, uri: &Uri) -> Result<Conn<C::Stream>, HttpClientError>46     pub(crate) async fn connect_to(&self, uri: &Uri) -> Result<Conn<C::Stream>, HttpClientError> {
47         let key = PoolKey::new(
48             uri.scheme().unwrap().clone(),
49             uri.authority().unwrap().clone(),
50         );
51 
52         self.pool
53             .get(key, Conns::new)
54             .conn(self.config.clone(), self.connector.clone(), uri)
55             .await
56     }
57 }
58 
59 pub(crate) struct Conns<S> {
60     list: Arc<Mutex<Vec<ConnDispatcher<S>>>>,
61     #[cfg(feature = "http2")]
62     h2_conn: Arc<crate::runtime::AsyncMutex<Vec<ConnDispatcher<S>>>>,
63 }
64 
65 impl<S> Conns<S> {
new() -> Self66     fn new() -> Self {
67         Self {
68             list: Arc::new(Mutex::new(Vec::new())),
69 
70             #[cfg(feature = "http2")]
71             h2_conn: Arc::new(crate::runtime::AsyncMutex::new(Vec::with_capacity(1))),
72         }
73     }
74 }
75 
76 impl<S> Clone for Conns<S> {
clone(&self) -> Self77     fn clone(&self) -> Self {
78         Self {
79             list: self.list.clone(),
80 
81             #[cfg(feature = "http2")]
82             h2_conn: self.h2_conn.clone(),
83         }
84     }
85 }
86 
87 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Send + Sync + 'static> Conns<S> {
conn<C>( &mut self, config: HttpConfig, connector: Arc<C>, url: &Uri, ) -> Result<Conn<S>, HttpClientError> where C: Connector<Stream = S>,88     async fn conn<C>(
89         &mut self,
90         config: HttpConfig,
91         connector: Arc<C>,
92         url: &Uri,
93     ) -> Result<Conn<S>, HttpClientError>
94     where
95         C: Connector<Stream = S>,
96     {
97         match config.version {
98             #[cfg(feature = "http2")]
99             HttpVersion::Http2 => self.conn_h2(connector, url, config.http2_config).await,
100             #[cfg(feature = "http1_1")]
101             HttpVersion::Http1 => self.conn_h1(connector, url).await,
102             HttpVersion::Negotiate => {
103                 #[cfg(all(feature = "http1_1", not(feature = "http2")))]
104                 return self.conn_h1(connector, url).await;
105 
106                 #[cfg(all(feature = "http2", feature = "http1_1"))]
107                 return self
108                     .conn_negotiate(connector, url, config.http2_config)
109                     .await;
110             }
111         }
112     }
113 
conn_h1<C>(&self, connector: Arc<C>, url: &Uri) -> Result<Conn<S>, HttpClientError> where C: Connector<Stream = S>,114     async fn conn_h1<C>(&self, connector: Arc<C>, url: &Uri) -> Result<Conn<S>, HttpClientError>
115     where
116         C: Connector<Stream = S>,
117     {
118         if let Some(conn) = self.exist_h1_conn() {
119             return Ok(conn);
120         }
121         let dispatcher = ConnDispatcher::http1(connector.connect(url).await?);
122         Ok(self.dispatch_h1_conn(dispatcher))
123     }
124 
125     #[cfg(feature = "http2")]
conn_h2<C>( &self, connector: Arc<C>, url: &Uri, config: H2Config, ) -> Result<Conn<S>, HttpClientError> where C: Connector<Stream = S>,126     async fn conn_h2<C>(
127         &self,
128         connector: Arc<C>,
129         url: &Uri,
130         config: H2Config,
131     ) -> Result<Conn<S>, HttpClientError>
132     where
133         C: Connector<Stream = S>,
134     {
135         // The lock `h2_occupation` is used to prevent multiple coroutines from sending
136         // Requests at the same time under concurrent conditions,
137         // resulting in the creation of multiple tcp connections
138         let mut lock = self.h2_conn.lock().await;
139 
140         if let Some(conn) = Self::exist_h2_conn(&mut lock) {
141             return Ok(conn);
142         }
143         let stream = connector.connect(url).await?;
144         let details = stream.conn_detail();
145         let tls = if let Some(scheme) = url.scheme() {
146             *scheme == Scheme::HTTPS
147         } else {
148             false
149         };
150         match details.alpn() {
151             None if tls => return err_from_msg!(Connect, "The peer does not support http/2."),
152             Some(protocol) if protocol != b"h2" => {
153                 return err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.")
154             }
155             _ => {}
156         }
157 
158         Ok(Self::dispatch_h2_conn(config, stream, &mut lock))
159     }
160 
161     #[cfg(all(feature = "http2", feature = "http1_1"))]
conn_negotiate<C>( &self, connector: Arc<C>, url: &Uri, config: H2Config, ) -> Result<Conn<S>, HttpClientError> where C: Connector<Stream = S>,162     async fn conn_negotiate<C>(
163         &self,
164         connector: Arc<C>,
165         url: &Uri,
166         config: H2Config,
167     ) -> Result<Conn<S>, HttpClientError>
168     where
169         C: Connector<Stream = S>,
170     {
171         match *url.scheme().unwrap() {
172             Scheme::HTTPS => {
173                 let mut lock = self.h2_conn.lock().await;
174 
175                 if let Some(conn) = Self::exist_h2_conn(&mut lock) {
176                     return Ok(conn);
177                 }
178 
179                 if let Some(conn) = self.exist_h1_conn() {
180                     return Ok(conn);
181                 }
182 
183                 let stream = connector.connect(url).await?;
184                 let details = stream.conn_detail();
185 
186                 let protocol = if let Some(bytes) = details.alpn() {
187                     bytes
188                 } else {
189                     let dispatcher = ConnDispatcher::http1(stream);
190                     return Ok(self.dispatch_h1_conn(dispatcher));
191                 };
192 
193                 if protocol == b"http/1.1" {
194                     let dispatcher = ConnDispatcher::http1(stream);
195                     Ok(self.dispatch_h1_conn(dispatcher))
196                 } else if protocol == b"h2" {
197                     Ok(Self::dispatch_h2_conn(config, stream, &mut lock))
198                 } else {
199                     err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.")
200                 }
201             }
202             Scheme::HTTP => self.conn_h1(connector, url).await,
203         }
204     }
205 
dispatch_h1_conn(&self, dispatcher: ConnDispatcher<S>) -> Conn<S>206     fn dispatch_h1_conn(&self, dispatcher: ConnDispatcher<S>) -> Conn<S> {
207         // We must be able to get the `Conn` here.
208         let conn = dispatcher.dispatch().unwrap();
209         let mut list = self.list.lock().unwrap();
210         list.push(dispatcher);
211 
212         conn
213     }
214 
215     #[cfg(feature = "http2")]
dispatch_h2_conn( config: H2Config, stream: S, lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Conn<S>216     fn dispatch_h2_conn(
217         config: H2Config,
218         stream: S,
219         lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>,
220     ) -> Conn<S> {
221         let dispatcher = ConnDispatcher::http2(config, stream);
222         let conn = dispatcher.dispatch().unwrap();
223         lock.push(dispatcher);
224         conn
225     }
226 
exist_h1_conn(&self) -> Option<Conn<S>>227     fn exist_h1_conn(&self) -> Option<Conn<S>> {
228         let mut list = self.list.lock().unwrap();
229         let mut conn = None;
230         let curr = take(&mut *list);
231         // TODO Distinguish between http2 connections and http1 connections.
232         for dispatcher in curr.into_iter() {
233             // Discard invalid dispatchers.
234             if dispatcher.is_shutdown() {
235                 continue;
236             }
237             if conn.is_none() {
238                 conn = dispatcher.dispatch();
239             }
240             list.push(dispatcher);
241         }
242         conn
243     }
244 
245     #[cfg(feature = "http2")]
exist_h2_conn( lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Option<Conn<S>>246     fn exist_h2_conn(
247         lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>,
248     ) -> Option<Conn<S>> {
249         if let Some(dispatcher) = lock.pop() {
250             if !dispatcher.is_shutdown() {
251                 if let Some(conn) = dispatcher.dispatch() {
252                     lock.push(dispatcher);
253                     return Some(conn);
254                 }
255             }
256         }
257         None
258     }
259 }
260