1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "communicator.h"
17 #include "db_common.h"
18 #include "log_print.h"
19 #include "protocol_proto.h"
20 
21 namespace DistributedDB {
Communicator(CommunicatorAggregator * inCommAggregator,const LabelType & inLabel)22 Communicator::Communicator(CommunicatorAggregator *inCommAggregator, const LabelType &inLabel)
23     : commAggrHandle_(inCommAggregator), commLabel_(inLabel)
24 {
25     RefObject::IncObjRef(commAggrHandle_); // Rely on CommunicatorAggregator, hold its reference.
26 }
27 
~Communicator()28 Communicator:: ~Communicator()
29 {
30     RefObject::DecObjRef(commAggrHandle_); // Communicator no longer hold the reference of CommunicatorAggregator.
31     onMessageHandle_ = nullptr;
32     onConnectHandle_ = nullptr;
33     onSendableHandle_ = nullptr;
34     commAggrHandle_ = nullptr;
35 }
36 
RegOnMessageCallback(const OnMessageCallback & onMessage,const Finalizer & inOper)37 int Communicator::RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper)
38 {
39     std::lock_guard<std::mutex> messageHandleLockGuard(messageHandleMutex_);
40     return RegCallBack(onMessage, onMessageHandle_, inOper, onMessageFinalizer_);
41 }
42 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)43 int Communicator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
44 {
45     std::lock_guard<std::mutex> connectHandleLockGuard(connectHandleMutex_);
46     int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
47     if (onConnect && errCode == E_OK) {
48         // Register action and success
49         for (auto &entry : onlineTargets_) {
50             LOGI("[Comm][RegConnect] Label=%.3s, online target=%s{private}.", VEC_TO_STR(commLabel_), entry.c_str());
51             onConnectHandle_(entry, true);
52         }
53     }
54     return errCode;
55 }
56 
RegOnSendableCallback(const std::function<void (void)> & onSendable,const Finalizer & inOper)57 int Communicator::RegOnSendableCallback(const std::function<void(void)> &onSendable, const Finalizer &inOper)
58 {
59     std::lock_guard<std::mutex> sendableHandleLockGuard(sendableHandleMutex_);
60     return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_);
61 }
62 
Activate()63 void Communicator::Activate()
64 {
65     commAggrHandle_->ActivateCommunicator(commLabel_);
66 }
67 
GetCommunicatorMtuSize() const68 uint32_t Communicator::GetCommunicatorMtuSize() const
69 {
70     return commAggrHandle_->GetCommunicatorAggregatorMtuSize();
71 }
72 
GetCommunicatorMtuSize(const std::string & target) const73 uint32_t Communicator::GetCommunicatorMtuSize(const std::string &target) const
74 {
75     return commAggrHandle_->GetCommunicatorAggregatorMtuSize(target);
76 }
77 
GetLocalIdentity(std::string & outTarget) const78 int Communicator::GetLocalIdentity(std::string &outTarget) const
79 {
80     return commAggrHandle_->GetLocalIdentity(outTarget);
81 }
82 
GetTimeout() const83 uint32_t Communicator::GetTimeout() const
84 {
85     return commAggrHandle_->GetCommunicatorAggregatorTimeout();
86 }
87 
GetTimeout(const std::string & target) const88 uint32_t Communicator::GetTimeout(const std::string &target) const
89 {
90     return commAggrHandle_->GetCommunicatorAggregatorTimeout(target);
91 }
92 
IsDeviceOnline(const std::string & device) const93 bool Communicator::IsDeviceOnline(const std::string &device) const
94 {
95     return commAggrHandle_->IsDeviceOnline(device);
96 }
97 
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config)98 int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config)
99 {
100     return SendMessage(dstTarget, inMsg, config, nullptr);
101 }
102 
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config,const OnSendEnd & onEnd)103 int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config,
104     const OnSendEnd &onEnd)
105 {
106     if (dstTarget.empty() || inMsg == nullptr) {
107         return -E_INVALID_ARGS;
108     }
109     std::shared_ptr<ExtendHeaderHandle> extendHandle = nullptr;
110     if (config.isNeedExtendHead) {
111         extendHandle = commAggrHandle_->GetExtendHeaderHandle(config.paramInfo);
112         if (extendHandle == nullptr) {
113             LOGE("[Comm][Send] get extendHandle failed");
114             return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
115         }
116     }
117     int error = E_OK;
118     // if error is not E_OK , null pointer will be returned
119     SerialBuffer *buffer = ProtocolProto::ToSerialBuffer(inMsg, extendHandle, false, error);
120     extendHandle = nullptr;
121     if (error != E_OK) {
122         LOGE("[Comm][Send] Serial fail, label=%.3s, error=%d.", VEC_TO_STR(commLabel_), error);
123         return error;
124     }
125     int errCode = ProtocolProto::SetDivergeHeader(buffer, commLabel_);
126     if (errCode != E_OK) {
127         LOGE("[Comm][Send] Set header fail, label=%.3s, errCode=%d.", VEC_TO_STR(commLabel_), errCode);
128         delete buffer;
129         buffer = nullptr;
130         return errCode;
131     }
132 
133     TaskConfig taskConfig {config.nonBlock, config.timeout, inMsg->GetPriority()};
134     errCode = commAggrHandle_->ScheduleSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, taskConfig, onEnd);
135     if (errCode == E_OK) {
136         // if ok, free inMsg, otherwise the caller should take over inMsg
137         delete inMsg;
138         inMsg = nullptr;
139     } else {
140         // if send fails, free buffer, otherwise buffer should be taked over by comminucator aggregator
141         delete buffer;
142         buffer = nullptr;
143     }
144     return errCode;
145 }
146 
OnBufferReceive(const std::string & srcTarget,const SerialBuffer * inBuf)147 void Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf)
148 {
149     std::lock_guard<std::mutex> messageHandleLockGuard(messageHandleMutex_);
150     if (srcTarget.size() != 0 && inBuf != nullptr && onMessageHandle_) {
151         int error = E_OK;
152         // if error is not E_OK, null pointer will be returned
153         Message *message = ProtocolProto::ToMessage(inBuf, error);
154         delete inBuf;
155         inBuf = nullptr;
156         // message is not nullptr if error is E_OK or error is E_NOT_REGISTER.
157         // for the former case the message will be handled and release by sync module.
158         // for the latter case the message is released in TriggerUnknownMessageFeedback.
159         if (error != E_OK) {
160             LOGE("[Comm][Receive] ToMessage fail, label=%.3s, error=%d.", VEC_TO_STR(commLabel_), error);
161             if (error == -E_VERSION_NOT_SUPPORT) {
162                 TriggerVersionNegotiation(srcTarget);
163             } else if (error == -E_NOT_REGISTER) {
164                 TriggerUnknownMessageFeedback(srcTarget, message);
165             }
166             return;
167         }
168         LOGI("[Comm][Receive] label=%.3s, srcTarget=%s{private}.", VEC_TO_STR(commLabel_), srcTarget.c_str());
169         onMessageHandle_(srcTarget, message);
170     } else {
171         LOGE("[Comm][Receive] label=%.3s, src.size=%zu or buf or handle invalid.", VEC_TO_STR(commLabel_),
172             srcTarget.size());
173         if (inBuf != nullptr) {
174             delete inBuf;
175             inBuf = nullptr;
176         }
177     }
178 }
179 
OnConnectChange(const std::string & target,bool isConnect)180 void Communicator::OnConnectChange(const std::string &target, bool isConnect)
181 {
182     std::lock_guard<std::mutex> connectHandleLockGuard(connectHandleMutex_);
183     if (target.size() == 0) {
184         LOGE("[Comm][Connect] Target size zero, label=%.3s.", VEC_TO_STR(commLabel_));
185         return;
186     }
187     if (isConnect) {
188         onlineTargets_.insert(target);
189     } else {
190         onlineTargets_.erase(target);
191     }
192     LOGI("[Comm][Connect] Label=%.3s, target=%s{private}, Online=%d", VEC_TO_STR(commLabel_), target.c_str(),
193         isConnect);
194     if (onConnectHandle_) {
195         onConnectHandle_(target, isConnect);
196     } else {
197         LOGI("[Comm][Connect] Handle invalid currently.");
198     }
199 }
200 
OnSendAvailable()201 void Communicator::OnSendAvailable()
202 {
203     std::lock_guard<std::mutex> sendableHandleLockGuard(sendableHandleMutex_);
204     if (onSendableHandle_) {
205         onSendableHandle_();
206     }
207 }
208 
GetCommunicatorLabel() const209 LabelType Communicator::GetCommunicatorLabel() const
210 {
211     return commLabel_;
212 }
213 
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const214 int Communicator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
215 {
216     return commAggrHandle_->GetRemoteCommunicatorVersion(target, outVersion);
217 }
218 
TriggerVersionNegotiation(const std::string & dstTarget)219 void Communicator::TriggerVersionNegotiation(const std::string &dstTarget)
220 {
221     LOGI("[Comm][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
222     int errCode = E_OK;
223     SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
224     if (errCode != E_OK) {
225         LOGE("[Comm][TrigVer] Build empty frame fail, errCode=%d", errCode);
226         return;
227     }
228 
229     TaskConfig config{true, 0, Priority::HIGH};
230     errCode = commAggrHandle_->ScheduleSendTask(dstTarget, buffer, FrameType::EMPTY, config);
231     if (errCode != E_OK) {
232         LOGE("[Comm][TrigVer] Send empty frame fail, errCode=%d", errCode);
233         // if send fails, free buffer, otherwise buffer will be taked over by comminucator aggregator
234         delete buffer;
235         buffer = nullptr;
236     }
237 }
238 
TriggerUnknownMessageFeedback(const std::string & dstTarget,Message * & oriMsg)239 void Communicator::TriggerUnknownMessageFeedback(const std::string &dstTarget, Message* &oriMsg)
240 {
241     if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
242         LOGI("[Comm][TrigFeedback] Do nothing for unknown message with type not request.");
243         // Do not have to do feedback if the message is not a request type message
244         delete oriMsg;
245         oriMsg = nullptr;
246         return;
247     }
248 
249     LOGI("[Comm][TrigFeedback] Do unknown message feedback with target=%s{private}.", dstTarget.c_str());
250     oriMsg->SetMessageType(TYPE_RESPONSE);
251     oriMsg->SetErrorNo(E_FEEDBACK_UNKNOWN_MESSAGE);
252 
253     int errCode = E_OK;
254     SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, commLabel_, errCode);
255     delete oriMsg;
256     oriMsg = nullptr;
257     if (errCode != E_OK) {
258         LOGE("[Comm][TrigFeedback] Build unknown message feedback frame fail, errCode=%d", errCode);
259         return;
260     }
261 
262     TaskConfig config{true, 0, Priority::HIGH};
263     errCode = commAggrHandle_->ScheduleSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
264     if (errCode != E_OK) {
265         LOGE("[Comm][TrigFeedback] Send unknown message feedback frame fail, errCode=%d", errCode);
266         // if send fails, free buffer, otherwise buffer will be taked over by comminucator aggregator
267         delete buffer;
268         buffer = nullptr;
269     }
270 }
271 
// NOTE(review): presumably expands to the object-tag helper definitions for Communicator;
// the macro is defined outside this file, so confirm against its definition.
272 DEFINE_OBJECT_TAG_FACILITIES(Communicator)
273 } // namespace DistributedDB
274