1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::error::Error; 15 use std::io::{Read, Write}; 16 use std::mem::take; 17 use std::sync::{Arc, Mutex}; 18 19 use ylong_http::request::uri::Uri; 20 21 use crate::error::{ErrorKind, HttpClientError}; 22 use crate::sync_impl::Connector; 23 use crate::util::dispatcher::{Conn, ConnDispatcher, Dispatcher}; 24 use crate::util::pool::{Pool, PoolKey}; 25 26 pub(crate) struct ConnPool<C, S> { 27 pool: Pool<PoolKey, Conns<S>>, 28 connector: Arc<C>, 29 } 30 31 impl<C: Connector> ConnPool<C, C::Stream> { new(connector: C) -> Self32 pub(crate) fn new(connector: C) -> Self { 33 Self { 34 pool: Pool::new(), 35 connector: Arc::new(connector), 36 } 37 } 38 connect_to(&self, uri: Uri) -> Result<Conn<C::Stream>, HttpClientError>39 pub(crate) fn connect_to(&self, uri: Uri) -> Result<Conn<C::Stream>, HttpClientError> { 40 let key = PoolKey::new( 41 uri.scheme().unwrap().clone(), 42 uri.authority().unwrap().clone(), 43 ); 44 45 self.pool 46 .get(key, Conns::new) 47 .conn(|| self.connector.clone().connect(&uri)) 48 } 49 } 50 51 pub(crate) struct Conns<S> { 52 list: Arc<Mutex<Vec<ConnDispatcher<S>>>>, 53 } 54 55 impl<S> Conns<S> { new() -> Self56 fn new() -> Self { 57 Self { 58 list: Arc::new(Mutex::new(Vec::new())), 59 } 60 } 61 } 62 63 impl<S> Clone for Conns<S> { clone(&self) -> Self64 fn clone(&self) -> Self { 65 Self { 66 list: self.list.clone(), 67 } 68 } 69 } 70 71 impl<S: Read + Write + 'static> Conns<S> { conn<F, E>(&self, connect_fn: F) -> Result<Conn<S>, HttpClientError> where F: FnOnce() -> Result<S, E>, E: Into<Box<dyn Error + Send + Sync>>,72 fn conn<F, E>(&self, connect_fn: F) -> Result<Conn<S>, HttpClientError> 73 where 74 F: FnOnce() -> Result<S, E>, 75 E: Into<Box<dyn Error + Send + Sync>>, 76 { 77 let mut list = self.list.lock().unwrap(); 78 let mut conn = None; 79 let curr = take(&mut *list); 80 for dispatcher in curr.into_iter() { 81 // Discard invalid dispatchers. 82 if dispatcher.is_shutdown() { 83 continue; 84 } 85 if conn.is_none() { 86 conn = dispatcher.dispatch(); 87 } 88 list.push(dispatcher); 89 } 90 91 if let Some(conn) = conn { 92 Ok(conn) 93 } else { 94 let dispatcher = ConnDispatcher::http1( 95 connect_fn().map_err(|e| HttpClientError::from_error(ErrorKind::Connect, e))?, 96 ); 97 // We must be able to get the `Conn` here. 98 let conn = dispatcher.dispatch().unwrap(); 99 list.push(dispatcher); 100 Ok(conn) 101 } 102 } 103 } 104