1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "communicator.h"
17 #include "db_common.h"
18 #include "log_print.h"
19 #include "protocol_proto.h"
20
21 namespace DistributedDB {
Communicator(CommunicatorAggregator * inCommAggregator,const LabelType & inLabel)22 Communicator::Communicator(CommunicatorAggregator *inCommAggregator, const LabelType &inLabel)
23 : commAggrHandle_(inCommAggregator), commLabel_(inLabel)
24 {
25 RefObject::IncObjRef(commAggrHandle_); // Rely on CommunicatorAggregator, hold its reference.
26 }
27
~Communicator()28 Communicator:: ~Communicator()
29 {
30 RefObject::DecObjRef(commAggrHandle_); // Communicator no longer hold the reference of CommunicatorAggregator.
31 onMessageHandle_ = nullptr;
32 onConnectHandle_ = nullptr;
33 onSendableHandle_ = nullptr;
34 commAggrHandle_ = nullptr;
35 }
36
RegOnMessageCallback(const OnMessageCallback & onMessage,const Finalizer & inOper)37 int Communicator::RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper)
38 {
39 std::lock_guard<std::mutex> messageHandleLockGuard(messageHandleMutex_);
40 return RegCallBack(onMessage, onMessageHandle_, inOper, onMessageFinalizer_);
41 }
42
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)43 int Communicator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
44 {
45 std::lock_guard<std::mutex> connectHandleLockGuard(connectHandleMutex_);
46 int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
47 if (onConnect && errCode == E_OK) {
48 // Register action and success
49 for (auto &entry : onlineTargets_) {
50 LOGI("[Comm][RegConnect] Label=%.3s, online target=%s{private}.", VEC_TO_STR(commLabel_), entry.c_str());
51 onConnectHandle_(entry, true);
52 }
53 }
54 return errCode;
55 }
56
RegOnSendableCallback(const std::function<void (void)> & onSendable,const Finalizer & inOper)57 int Communicator::RegOnSendableCallback(const std::function<void(void)> &onSendable, const Finalizer &inOper)
58 {
59 std::lock_guard<std::mutex> sendableHandleLockGuard(sendableHandleMutex_);
60 return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_);
61 }
62
Activate()63 void Communicator::Activate()
64 {
65 commAggrHandle_->ActivateCommunicator(commLabel_);
66 }
67
GetCommunicatorMtuSize() const68 uint32_t Communicator::GetCommunicatorMtuSize() const
69 {
70 return commAggrHandle_->GetCommunicatorAggregatorMtuSize();
71 }
72
GetCommunicatorMtuSize(const std::string & target) const73 uint32_t Communicator::GetCommunicatorMtuSize(const std::string &target) const
74 {
75 return commAggrHandle_->GetCommunicatorAggregatorMtuSize(target);
76 }
77
GetLocalIdentity(std::string & outTarget) const78 int Communicator::GetLocalIdentity(std::string &outTarget) const
79 {
80 return commAggrHandle_->GetLocalIdentity(outTarget);
81 }
82
GetTimeout() const83 uint32_t Communicator::GetTimeout() const
84 {
85 return commAggrHandle_->GetCommunicatorAggregatorTimeout();
86 }
87
GetTimeout(const std::string & target) const88 uint32_t Communicator::GetTimeout(const std::string &target) const
89 {
90 return commAggrHandle_->GetCommunicatorAggregatorTimeout(target);
91 }
92
IsDeviceOnline(const std::string & device) const93 bool Communicator::IsDeviceOnline(const std::string &device) const
94 {
95 return commAggrHandle_->IsDeviceOnline(device);
96 }
97
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config)98 int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config)
99 {
100 return SendMessage(dstTarget, inMsg, config, nullptr);
101 }
102
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config,const OnSendEnd & onEnd)103 int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config,
104 const OnSendEnd &onEnd)
105 {
106 if (dstTarget.empty() || inMsg == nullptr) {
107 return -E_INVALID_ARGS;
108 }
109 std::shared_ptr<ExtendHeaderHandle> extendHandle = nullptr;
110 if (config.isNeedExtendHead) {
111 extendHandle = commAggrHandle_->GetExtendHeaderHandle(config.paramInfo);
112 if (extendHandle == nullptr) {
113 LOGE("[Comm][Send] get extendHandle failed");
114 return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
115 }
116 }
117 int error = E_OK;
118 // if error is not E_OK , null pointer will be returned
119 SerialBuffer *buffer = ProtocolProto::ToSerialBuffer(inMsg, extendHandle, false, error);
120 extendHandle = nullptr;
121 if (error != E_OK) {
122 LOGE("[Comm][Send] Serial fail, label=%.3s, error=%d.", VEC_TO_STR(commLabel_), error);
123 return error;
124 }
125 int errCode = ProtocolProto::SetDivergeHeader(buffer, commLabel_);
126 if (errCode != E_OK) {
127 LOGE("[Comm][Send] Set header fail, label=%.3s, errCode=%d.", VEC_TO_STR(commLabel_), errCode);
128 delete buffer;
129 buffer = nullptr;
130 return errCode;
131 }
132
133 TaskConfig taskConfig {config.nonBlock, config.timeout, inMsg->GetPriority()};
134 errCode = commAggrHandle_->ScheduleSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, taskConfig, onEnd);
135 if (errCode == E_OK) {
136 // if ok, free inMsg, otherwise the caller should take over inMsg
137 delete inMsg;
138 inMsg = nullptr;
139 } else {
140 // if send fails, free buffer, otherwise buffer should be taked over by comminucator aggregator
141 delete buffer;
142 buffer = nullptr;
143 }
144 return errCode;
145 }
146
OnBufferReceive(const std::string & srcTarget,const SerialBuffer * inBuf)147 void Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf)
148 {
149 std::lock_guard<std::mutex> messageHandleLockGuard(messageHandleMutex_);
150 if (srcTarget.size() != 0 && inBuf != nullptr && onMessageHandle_) {
151 int error = E_OK;
152 // if error is not E_OK, null pointer will be returned
153 Message *message = ProtocolProto::ToMessage(inBuf, error);
154 delete inBuf;
155 inBuf = nullptr;
156 // message is not nullptr if error is E_OK or error is E_NOT_REGISTER.
157 // for the former case the message will be handled and release by sync module.
158 // for the latter case the message is released in TriggerUnknownMessageFeedback.
159 if (error != E_OK) {
160 LOGE("[Comm][Receive] ToMessage fail, label=%.3s, error=%d.", VEC_TO_STR(commLabel_), error);
161 if (error == -E_VERSION_NOT_SUPPORT) {
162 TriggerVersionNegotiation(srcTarget);
163 } else if (error == -E_NOT_REGISTER) {
164 TriggerUnknownMessageFeedback(srcTarget, message);
165 }
166 return;
167 }
168 LOGI("[Comm][Receive] label=%.3s, srcTarget=%s{private}.", VEC_TO_STR(commLabel_), srcTarget.c_str());
169 onMessageHandle_(srcTarget, message);
170 } else {
171 LOGE("[Comm][Receive] label=%.3s, src.size=%zu or buf or handle invalid.", VEC_TO_STR(commLabel_),
172 srcTarget.size());
173 if (inBuf != nullptr) {
174 delete inBuf;
175 inBuf = nullptr;
176 }
177 }
178 }
179
OnConnectChange(const std::string & target,bool isConnect)180 void Communicator::OnConnectChange(const std::string &target, bool isConnect)
181 {
182 std::lock_guard<std::mutex> connectHandleLockGuard(connectHandleMutex_);
183 if (target.size() == 0) {
184 LOGE("[Comm][Connect] Target size zero, label=%.3s.", VEC_TO_STR(commLabel_));
185 return;
186 }
187 if (isConnect) {
188 onlineTargets_.insert(target);
189 } else {
190 onlineTargets_.erase(target);
191 }
192 LOGI("[Comm][Connect] Label=%.3s, target=%s{private}, Online=%d", VEC_TO_STR(commLabel_), target.c_str(),
193 isConnect);
194 if (onConnectHandle_) {
195 onConnectHandle_(target, isConnect);
196 } else {
197 LOGI("[Comm][Connect] Handle invalid currently.");
198 }
199 }
200
OnSendAvailable()201 void Communicator::OnSendAvailable()
202 {
203 std::lock_guard<std::mutex> sendableHandleLockGuard(sendableHandleMutex_);
204 if (onSendableHandle_) {
205 onSendableHandle_();
206 }
207 }
208
GetCommunicatorLabel() const209 LabelType Communicator::GetCommunicatorLabel() const
210 {
211 return commLabel_;
212 }
213
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const214 int Communicator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
215 {
216 return commAggrHandle_->GetRemoteCommunicatorVersion(target, outVersion);
217 }
218
TriggerVersionNegotiation(const std::string & dstTarget)219 void Communicator::TriggerVersionNegotiation(const std::string &dstTarget)
220 {
221 LOGI("[Comm][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
222 int errCode = E_OK;
223 SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
224 if (errCode != E_OK) {
225 LOGE("[Comm][TrigVer] Build empty frame fail, errCode=%d", errCode);
226 return;
227 }
228
229 TaskConfig config{true, 0, Priority::HIGH};
230 errCode = commAggrHandle_->ScheduleSendTask(dstTarget, buffer, FrameType::EMPTY, config);
231 if (errCode != E_OK) {
232 LOGE("[Comm][TrigVer] Send empty frame fail, errCode=%d", errCode);
233 // if send fails, free buffer, otherwise buffer will be taked over by comminucator aggregator
234 delete buffer;
235 buffer = nullptr;
236 }
237 }
238
TriggerUnknownMessageFeedback(const std::string & dstTarget,Message * & oriMsg)239 void Communicator::TriggerUnknownMessageFeedback(const std::string &dstTarget, Message* &oriMsg)
240 {
241 if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
242 LOGI("[Comm][TrigFeedback] Do nothing for unknown message with type not request.");
243 // Do not have to do feedback if the message is not a request type message
244 delete oriMsg;
245 oriMsg = nullptr;
246 return;
247 }
248
249 LOGI("[Comm][TrigFeedback] Do unknown message feedback with target=%s{private}.", dstTarget.c_str());
250 oriMsg->SetMessageType(TYPE_RESPONSE);
251 oriMsg->SetErrorNo(E_FEEDBACK_UNKNOWN_MESSAGE);
252
253 int errCode = E_OK;
254 SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, commLabel_, errCode);
255 delete oriMsg;
256 oriMsg = nullptr;
257 if (errCode != E_OK) {
258 LOGE("[Comm][TrigFeedback] Build unknown message feedback frame fail, errCode=%d", errCode);
259 return;
260 }
261
262 TaskConfig config{true, 0, Priority::HIGH};
263 errCode = commAggrHandle_->ScheduleSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
264 if (errCode != E_OK) {
265 LOGE("[Comm][TrigFeedback] Send unknown message feedback frame fail, errCode=%d", errCode);
266 // if send fails, free buffer, otherwise buffer will be taked over by comminucator aggregator
267 delete buffer;
268 buffer = nullptr;
269 }
270 }
271
272 DEFINE_OBJECT_TAG_FACILITIES(Communicator)
273 } // namespace DistributedDB
274