1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "communicator_proxy.h"
16 #include "db_constant.h"
17 #include "db_common.h"
18 #include "db_dump_helper.h"
19 #include "log_print.h"
20 
21 namespace DistributedDB {
CommunicatorProxy()22 CommunicatorProxy::CommunicatorProxy() : mainComm_(nullptr)
23 {
24 }
25 
~CommunicatorProxy()26 CommunicatorProxy::~CommunicatorProxy()
27 {
28     if (mainComm_ != nullptr) {
29         RefObject::DecObjRef(mainComm_);
30     }
31     mainComm_ = nullptr;
32 
33     std::lock_guard lock(devCommMapLock_);
34     for (const auto &iter : devCommMap_) {
35         RefObject::DecObjRef(devCommMap_[iter.first].second);
36     }
37     devCommMap_.clear();
38 }
39 
RegOnMessageCallback(const OnMessageCallback & onMessage,const Finalizer & inOper)40 int CommunicatorProxy::RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper)
41 {
42     if (mainComm_ != nullptr) {
43         (void) mainComm_->RegOnMessageCallback(onMessage, inOper);
44     }
45 
46     std::lock_guard lock(devCommMapLock_);
47     for (const auto &iter : devCommMap_) {
48         (void) devCommMap_[iter.first].second->RegOnMessageCallback(onMessage, inOper);
49     }
50     return E_OK;
51 }
52 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)53 int CommunicatorProxy::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
54 {
55     if (mainComm_ != nullptr) {
56         (void) mainComm_->RegOnConnectCallback(onConnect, inOper);
57     }
58 
59     std::lock_guard lock(devCommMapLock_);
60     for (const auto &iter : devCommMap_) {
61         (void) devCommMap_[iter.first].second->RegOnConnectCallback(onConnect, inOper);
62     }
63 
64     return E_OK;
65 }
66 
RegOnSendableCallback(const std::function<void (void)> & onSendable,const Finalizer & inOper)67 int CommunicatorProxy::RegOnSendableCallback(const std::function<void(void)> &onSendable, const Finalizer &inOper)
68 {
69     if (mainComm_ != nullptr) {
70         (void) mainComm_->RegOnSendableCallback(onSendable, inOper);
71     }
72 
73     std::lock_guard lock(devCommMapLock_);
74     for (const auto &iter : devCommMap_) {
75         (void) devCommMap_[iter.first].second->RegOnSendableCallback(onSendable, inOper);
76     }
77 
78     return E_OK;
79 }
80 
Activate()81 void CommunicatorProxy::Activate()
82 {
83     if (mainComm_ != nullptr) {
84         mainComm_->Activate();
85     }
86 
87     // use temp map to avoid active in lock
88     std::map<std::string, ICommunicator *> tempMap;
89     {
90         std::lock_guard lock(devCommMapLock_);
91         for (const auto &iter : devCommMap_) {
92             tempMap[iter.first] = devCommMap_[iter.first].second;
93             RefObject::IncObjRef(devCommMap_[iter.first].second);
94         }
95     }
96 
97     for (const auto &iter : tempMap) {
98         tempMap[iter.first]->Activate();
99         RefObject::DecObjRef(tempMap[iter.first]);
100     }
101 }
102 
GetCommunicatorMtuSize() const103 uint32_t CommunicatorProxy::GetCommunicatorMtuSize() const
104 {
105     if (mainComm_ == nullptr) {
106         return DBConstant::MIN_MTU_SIZE;
107     }
108     return mainComm_->GetCommunicatorMtuSize();
109 }
110 
GetCommunicatorMtuSize(const std::string & target) const111 uint32_t CommunicatorProxy::GetCommunicatorMtuSize(const std::string &target) const
112 {
113     ICommunicator *targetCommunicator = nullptr;
114     {
115         std::lock_guard<std::mutex> lock(devCommMapLock_);
116         if (devCommMap_.count(target) != 0) {
117             targetCommunicator = devCommMap_.at(target).second;
118             RefObject::IncObjRef(targetCommunicator);
119         }
120     }
121     if (targetCommunicator != nullptr) {
122         uint32_t mtuSize = targetCommunicator->GetCommunicatorMtuSize(target);
123         RefObject::DecObjRef(targetCommunicator);
124         return mtuSize;
125     }
126 
127     if (mainComm_ != nullptr) {
128         return mainComm_->GetCommunicatorMtuSize(target);
129     }
130 
131     return DBConstant::MIN_MTU_SIZE;
132 }
133 
GetTimeout() const134 uint32_t CommunicatorProxy::GetTimeout() const
135 {
136     if (mainComm_ == nullptr) {
137         return DBConstant::MIN_TIMEOUT;
138     }
139     return mainComm_->GetTimeout();
140 }
141 
GetTimeout(const std::string & target) const142 uint32_t CommunicatorProxy::GetTimeout(const std::string &target) const
143 {
144     ICommunicator *targetCommunicator = nullptr;
145     {
146         std::lock_guard<std::mutex> lock(devCommMapLock_);
147         if (devCommMap_.count(target) != 0) {
148             targetCommunicator = devCommMap_.at(target).second;
149             RefObject::IncObjRef(targetCommunicator);
150         }
151     }
152     if (targetCommunicator != nullptr) {
153         uint32_t timeout = targetCommunicator->GetTimeout(target);
154         RefObject::DecObjRef(targetCommunicator);
155         return timeout;
156     }
157 
158     if (mainComm_ != nullptr) {
159         return mainComm_->GetTimeout(target);
160     }
161 
162     return DBConstant::MIN_TIMEOUT;
163 }
164 
IsDeviceOnline(const std::string & device) const165 bool CommunicatorProxy::IsDeviceOnline(const std::string &device) const
166 {
167     return mainComm_->IsDeviceOnline(device);
168 }
169 
GetLocalIdentity(std::string & outTarget) const170 int CommunicatorProxy::GetLocalIdentity(std::string &outTarget) const
171 {
172     return mainComm_->GetLocalIdentity(outTarget);
173 }
174 
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const175 int CommunicatorProxy::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
176 {
177     ICommunicator *targetCommunicator = nullptr;
178     {
179         std::lock_guard<std::mutex> lock(devCommMapLock_);
180         if (devCommMap_.count(target) != 0) {
181             targetCommunicator = devCommMap_.at(target).second;
182             RefObject::IncObjRef(targetCommunicator);
183         }
184     }
185     if (targetCommunicator != nullptr) {
186         int errCode = targetCommunicator->GetRemoteCommunicatorVersion(target, outVersion);
187         RefObject::DecObjRef(targetCommunicator);
188         return errCode;
189     }
190 
191     if (mainComm_ != nullptr) {
192         return mainComm_->GetRemoteCommunicatorVersion(target, outVersion);
193     }
194 
195     return -E_NOT_INIT;
196 }
197 
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config)198 int CommunicatorProxy::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config)
199 {
200     return SendMessage(dstTarget, inMsg, config, nullptr);
201 }
202 
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config,const OnSendEnd & onEnd)203 int CommunicatorProxy::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config,
204     const OnSendEnd &onEnd)
205 {
206     ICommunicator *targetCommunicator = nullptr;
207     {
208         std::lock_guard<std::mutex> lock(devCommMapLock_);
209         if (devCommMap_.count(dstTarget) != 0) {
210             targetCommunicator = devCommMap_[dstTarget].second;
211             RefObject::IncObjRef(targetCommunicator);
212         }
213     }
214     if (targetCommunicator != nullptr) {
215         LOGD("[CommProxy] use equal label to send data");
216         int errCode = targetCommunicator->SendMessage(dstTarget, inMsg, config, onEnd);
217         RefObject::DecObjRef(targetCommunicator);
218         return errCode;
219     }
220 
221     if (mainComm_ != nullptr) {
222         return mainComm_->SendMessage(dstTarget, inMsg, config, onEnd);
223     }
224 
225     return -E_NOT_INIT;
226 }
227 
SetMainCommunicator(ICommunicator * communicator)228 void CommunicatorProxy::SetMainCommunicator(ICommunicator *communicator)
229 {
230     mainComm_ = communicator;
231     RefObject::IncObjRef(mainComm_);
232 }
233 
SetEqualCommunicator(ICommunicator * communicator,const std::string & identifier,const std::vector<std::string> & targets)234 void CommunicatorProxy::SetEqualCommunicator(ICommunicator *communicator, const std::string &identifier,
235     const std::vector<std::string> &targets)
236 {
237     std::lock_guard<std::mutex> lock(devCommMapLock_);
238     // Clear offline target
239     for (auto dev = devCommMap_.begin(); dev != devCommMap_.end();) {
240         if (identifier != dev->second.first) {
241             dev++;
242             continue;
243         }
244         auto iter = std::find_if(targets.begin(), targets.end(),
245             [&dev](const std::string &target) {
246                 return target == dev->first;
247             });
248         if (iter == targets.end()) {
249             RefObject::DecObjRef(devCommMap_[dev->first].second);
250             dev = devCommMap_.erase(dev);
251             continue;
252         }
253         dev++;
254     }
255 
256     // Add new online target
257     for (const auto &target : targets) {
258         if (devCommMap_.count(target) != 0) {
259             // change the identifier and dev relation
260             RefObject::DecObjRef(devCommMap_[target].second);
261         }
262         RefObject::IncObjRef(communicator);
263         devCommMap_[target] = {identifier, communicator};
264     }
265 }
266 
Dump(int fd)267 void CommunicatorProxy::Dump(int fd)
268 {
269     std::lock_guard<std::mutex> lock(devCommMapLock_);
270     for (const auto &[target, communicator] : devCommMap_) {
271         std::string label = DBCommon::TransferStringToHex(communicator.first);
272         DBDumpHelper::Dump(fd, "\t\ttarget = %s, label = %s\n", target.c_str(), label.c_str());
273     }
274 }
275 } // namespace DistributedDB