1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "communicator_proxy.h"
16 #include "db_constant.h"
17 #include "db_common.h"
18 #include "db_dump_helper.h"
19 #include "log_print.h"
20
21 namespace DistributedDB {
CommunicatorProxy()22 CommunicatorProxy::CommunicatorProxy() : mainComm_(nullptr)
23 {
24 }
25
~CommunicatorProxy()26 CommunicatorProxy::~CommunicatorProxy()
27 {
28 if (mainComm_ != nullptr) {
29 RefObject::DecObjRef(mainComm_);
30 }
31 mainComm_ = nullptr;
32
33 std::lock_guard lock(devCommMapLock_);
34 for (const auto &iter : devCommMap_) {
35 RefObject::DecObjRef(devCommMap_[iter.first].second);
36 }
37 devCommMap_.clear();
38 }
39
RegOnMessageCallback(const OnMessageCallback & onMessage,const Finalizer & inOper)40 int CommunicatorProxy::RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper)
41 {
42 if (mainComm_ != nullptr) {
43 (void) mainComm_->RegOnMessageCallback(onMessage, inOper);
44 }
45
46 std::lock_guard lock(devCommMapLock_);
47 for (const auto &iter : devCommMap_) {
48 (void) devCommMap_[iter.first].second->RegOnMessageCallback(onMessage, inOper);
49 }
50 return E_OK;
51 }
52
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)53 int CommunicatorProxy::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
54 {
55 if (mainComm_ != nullptr) {
56 (void) mainComm_->RegOnConnectCallback(onConnect, inOper);
57 }
58
59 std::lock_guard lock(devCommMapLock_);
60 for (const auto &iter : devCommMap_) {
61 (void) devCommMap_[iter.first].second->RegOnConnectCallback(onConnect, inOper);
62 }
63
64 return E_OK;
65 }
66
RegOnSendableCallback(const std::function<void (void)> & onSendable,const Finalizer & inOper)67 int CommunicatorProxy::RegOnSendableCallback(const std::function<void(void)> &onSendable, const Finalizer &inOper)
68 {
69 if (mainComm_ != nullptr) {
70 (void) mainComm_->RegOnSendableCallback(onSendable, inOper);
71 }
72
73 std::lock_guard lock(devCommMapLock_);
74 for (const auto &iter : devCommMap_) {
75 (void) devCommMap_[iter.first].second->RegOnSendableCallback(onSendable, inOper);
76 }
77
78 return E_OK;
79 }
80
Activate()81 void CommunicatorProxy::Activate()
82 {
83 if (mainComm_ != nullptr) {
84 mainComm_->Activate();
85 }
86
87 // use temp map to avoid active in lock
88 std::map<std::string, ICommunicator *> tempMap;
89 {
90 std::lock_guard lock(devCommMapLock_);
91 for (const auto &iter : devCommMap_) {
92 tempMap[iter.first] = devCommMap_[iter.first].second;
93 RefObject::IncObjRef(devCommMap_[iter.first].second);
94 }
95 }
96
97 for (const auto &iter : tempMap) {
98 tempMap[iter.first]->Activate();
99 RefObject::DecObjRef(tempMap[iter.first]);
100 }
101 }
102
GetCommunicatorMtuSize() const103 uint32_t CommunicatorProxy::GetCommunicatorMtuSize() const
104 {
105 if (mainComm_ == nullptr) {
106 return DBConstant::MIN_MTU_SIZE;
107 }
108 return mainComm_->GetCommunicatorMtuSize();
109 }
110
GetCommunicatorMtuSize(const std::string & target) const111 uint32_t CommunicatorProxy::GetCommunicatorMtuSize(const std::string &target) const
112 {
113 ICommunicator *targetCommunicator = nullptr;
114 {
115 std::lock_guard<std::mutex> lock(devCommMapLock_);
116 if (devCommMap_.count(target) != 0) {
117 targetCommunicator = devCommMap_.at(target).second;
118 RefObject::IncObjRef(targetCommunicator);
119 }
120 }
121 if (targetCommunicator != nullptr) {
122 uint32_t mtuSize = targetCommunicator->GetCommunicatorMtuSize(target);
123 RefObject::DecObjRef(targetCommunicator);
124 return mtuSize;
125 }
126
127 if (mainComm_ != nullptr) {
128 return mainComm_->GetCommunicatorMtuSize(target);
129 }
130
131 return DBConstant::MIN_MTU_SIZE;
132 }
133
GetTimeout() const134 uint32_t CommunicatorProxy::GetTimeout() const
135 {
136 if (mainComm_ == nullptr) {
137 return DBConstant::MIN_TIMEOUT;
138 }
139 return mainComm_->GetTimeout();
140 }
141
GetTimeout(const std::string & target) const142 uint32_t CommunicatorProxy::GetTimeout(const std::string &target) const
143 {
144 ICommunicator *targetCommunicator = nullptr;
145 {
146 std::lock_guard<std::mutex> lock(devCommMapLock_);
147 if (devCommMap_.count(target) != 0) {
148 targetCommunicator = devCommMap_.at(target).second;
149 RefObject::IncObjRef(targetCommunicator);
150 }
151 }
152 if (targetCommunicator != nullptr) {
153 uint32_t timeout = targetCommunicator->GetTimeout(target);
154 RefObject::DecObjRef(targetCommunicator);
155 return timeout;
156 }
157
158 if (mainComm_ != nullptr) {
159 return mainComm_->GetTimeout(target);
160 }
161
162 return DBConstant::MIN_TIMEOUT;
163 }
164
IsDeviceOnline(const std::string & device) const165 bool CommunicatorProxy::IsDeviceOnline(const std::string &device) const
166 {
167 return mainComm_->IsDeviceOnline(device);
168 }
169
GetLocalIdentity(std::string & outTarget) const170 int CommunicatorProxy::GetLocalIdentity(std::string &outTarget) const
171 {
172 return mainComm_->GetLocalIdentity(outTarget);
173 }
174
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const175 int CommunicatorProxy::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
176 {
177 ICommunicator *targetCommunicator = nullptr;
178 {
179 std::lock_guard<std::mutex> lock(devCommMapLock_);
180 if (devCommMap_.count(target) != 0) {
181 targetCommunicator = devCommMap_.at(target).second;
182 RefObject::IncObjRef(targetCommunicator);
183 }
184 }
185 if (targetCommunicator != nullptr) {
186 int errCode = targetCommunicator->GetRemoteCommunicatorVersion(target, outVersion);
187 RefObject::DecObjRef(targetCommunicator);
188 return errCode;
189 }
190
191 if (mainComm_ != nullptr) {
192 return mainComm_->GetRemoteCommunicatorVersion(target, outVersion);
193 }
194
195 return -E_NOT_INIT;
196 }
197
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config)198 int CommunicatorProxy::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config)
199 {
200 return SendMessage(dstTarget, inMsg, config, nullptr);
201 }
202
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config,const OnSendEnd & onEnd)203 int CommunicatorProxy::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config,
204 const OnSendEnd &onEnd)
205 {
206 ICommunicator *targetCommunicator = nullptr;
207 {
208 std::lock_guard<std::mutex> lock(devCommMapLock_);
209 if (devCommMap_.count(dstTarget) != 0) {
210 targetCommunicator = devCommMap_[dstTarget].second;
211 RefObject::IncObjRef(targetCommunicator);
212 }
213 }
214 if (targetCommunicator != nullptr) {
215 LOGD("[CommProxy] use equal label to send data");
216 int errCode = targetCommunicator->SendMessage(dstTarget, inMsg, config, onEnd);
217 RefObject::DecObjRef(targetCommunicator);
218 return errCode;
219 }
220
221 if (mainComm_ != nullptr) {
222 return mainComm_->SendMessage(dstTarget, inMsg, config, onEnd);
223 }
224
225 return -E_NOT_INIT;
226 }
227
SetMainCommunicator(ICommunicator * communicator)228 void CommunicatorProxy::SetMainCommunicator(ICommunicator *communicator)
229 {
230 mainComm_ = communicator;
231 RefObject::IncObjRef(mainComm_);
232 }
233
SetEqualCommunicator(ICommunicator * communicator,const std::string & identifier,const std::vector<std::string> & targets)234 void CommunicatorProxy::SetEqualCommunicator(ICommunicator *communicator, const std::string &identifier,
235 const std::vector<std::string> &targets)
236 {
237 std::lock_guard<std::mutex> lock(devCommMapLock_);
238 // Clear offline target
239 for (auto dev = devCommMap_.begin(); dev != devCommMap_.end();) {
240 if (identifier != dev->second.first) {
241 dev++;
242 continue;
243 }
244 auto iter = std::find_if(targets.begin(), targets.end(),
245 [&dev](const std::string &target) {
246 return target == dev->first;
247 });
248 if (iter == targets.end()) {
249 RefObject::DecObjRef(devCommMap_[dev->first].second);
250 dev = devCommMap_.erase(dev);
251 continue;
252 }
253 dev++;
254 }
255
256 // Add new online target
257 for (const auto &target : targets) {
258 if (devCommMap_.count(target) != 0) {
259 // change the identifier and dev relation
260 RefObject::DecObjRef(devCommMap_[target].second);
261 }
262 RefObject::IncObjRef(communicator);
263 devCommMap_[target] = {identifier, communicator};
264 }
265 }
266
Dump(int fd)267 void CommunicatorProxy::Dump(int fd)
268 {
269 std::lock_guard<std::mutex> lock(devCommMapLock_);
270 for (const auto &[target, communicator] : devCommMap_) {
271 std::string label = DBCommon::TransferStringToHex(communicator.first);
272 DBDumpHelper::Dump(fd, "\t\ttarget = %s, label = %s\n", target.c_str(), label.c_str());
273 }
274 }
275 } // namespace DistributedDB