1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "virtual_communicator.h"
17
18 #include "log_print.h"
19 #include "protocol_proto.h"
20 #include "single_ver_serialize_manager.h"
21 #include "sync_engine.h"
22 #include "virtual_communicator_aggregator.h"
23
24 namespace DistributedDB {
RegOnMessageCallback(const OnMessageCallback & onMessage,const Finalizer & inOper)25 int VirtualCommunicator::RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper)
26 {
27 std::lock_guard<std::mutex> lock(onMessageLock_);
28 onMessage_ = onMessage;
29 return E_OK;
30 }
31
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)32 int VirtualCommunicator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
33 {
34 std::lock_guard<std::mutex> lock(onConnectLock_);
35 onConnect_ = onConnect;
36 return E_OK;
37 }
38
RegOnSendableCallback(const std::function<void (void)> & onSendable,const Finalizer & inOper)39 int VirtualCommunicator::RegOnSendableCallback(const std::function<void(void)> &onSendable, const Finalizer &inOper)
40 {
41 return E_OK;
42 }
43
Activate()44 void VirtualCommunicator::Activate()
45 {
46 }
47
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config)48 int VirtualCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config)
49 {
50 return SendMessage(dstTarget, inMsg, config, nullptr);
51 }
52
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config,const OnSendEnd & onEnd)53 int VirtualCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config,
54 const OnSendEnd &onEnd)
55 {
56 AutoLock lock(this);
57 if (IsKilled()) {
58 return -E_OBJ_IS_KILLED;
59 }
60 if (!isEnable_) {
61 LOGD("[VirtualCommunicator] the VirtualCommunicator disabled!");
62 return -E_PERIPHERAL_INTERFACE_FAIL;
63 }
64 if (dstTarget == deviceId_) {
65 delete inMsg;
66 inMsg = nullptr;
67 return E_OK;
68 }
69 Message *message = nullptr;
70 int errCode = TranslateMsg(inMsg, message);
71 if (errCode == -E_NOT_REGISTER) {
72 communicatorAggregator_->DispatchMessage(deviceId_, dstTarget, inMsg, onEnd);
73 return E_OK;
74 }
75 if (errCode != E_OK) {
76 return errCode;
77 }
78 delete inMsg;
79 inMsg = nullptr;
80 communicatorAggregator_->DispatchMessage(deviceId_, dstTarget, message, onEnd);
81 return E_OK;
82 }
83
SetRemoteVersion(uint16_t remoteVersion)84 void VirtualCommunicator::SetRemoteVersion(uint16_t remoteVersion)
85 {
86 remoteVersion_ = remoteVersion;
87 }
88
GetRemoteCommunicatorVersion(const std::string & deviceId,uint16_t & version) const89 int VirtualCommunicator::GetRemoteCommunicatorVersion(const std::string &deviceId, uint16_t &version) const
90 {
91 version = remoteVersion_;
92 return E_OK;
93 }
94
CallbackOnMessage(const std::string & srcTarget,Message * inMsg)95 void VirtualCommunicator::CallbackOnMessage(const std::string &srcTarget, Message *inMsg)
96 {
97 std::lock_guard<std::mutex> lock(onMessageLock_);
98 if (isEnable_ && onMessage_ && (srcTarget != deviceId_) && ((inMsg->GetMessageId() != dropMsgId_) ||
99 (dropMsgTimes_ == 0))) {
100 onMessage_(srcTarget, inMsg);
101 } else {
102 LOGD("drop msg from dev=%s, localDev=%s", srcTarget.c_str(), deviceId_.c_str());
103 if (dropMsgTimes_ > 0) {
104 dropMsgTimes_--;
105 }
106 delete inMsg;
107 inMsg = nullptr;
108 }
109 }
110
CallbackOnConnect(const std::string & target,bool isConnect) const111 void VirtualCommunicator::CallbackOnConnect(const std::string &target, bool isConnect) const
112 {
113 {
114 std::lock_guard<std::mutex> lock(devicesMapLock_);
115 if (target != deviceId_) {
116 onlineDevicesMap_[target] = isConnect;
117 }
118 }
119 std::lock_guard<std::mutex> lock(onConnectLock_);
120 if (isEnable_ && onConnect_) {
121 onConnect_(target, isConnect);
122 }
123 }
124
GetCommunicatorMtuSize() const125 uint32_t VirtualCommunicator::GetCommunicatorMtuSize() const
126 {
127 return mtuSize_;
128 }
129
GetCommunicatorMtuSize(const std::string & target) const130 uint32_t VirtualCommunicator::GetCommunicatorMtuSize(const std::string &target) const
131 {
132 return GetCommunicatorMtuSize();
133 }
134
SetCommunicatorMtuSize(uint32_t mtuSize)135 void VirtualCommunicator::SetCommunicatorMtuSize(uint32_t mtuSize)
136 {
137 mtuSize_ = mtuSize;
138 }
139
GetTimeout() const140 uint32_t VirtualCommunicator::GetTimeout() const
141 {
142 LOGD("[VirtualCommunicator] Get timeout %" PRIu32, timeout_);
143 return timeout_;
144 }
145
GetTimeout(const std::string & target) const146 uint32_t VirtualCommunicator::GetTimeout(const std::string &target) const
147 {
148 return GetTimeout();
149 }
150
SetTimeout(uint32_t timeout)151 void VirtualCommunicator::SetTimeout(uint32_t timeout)
152 {
153 timeout_ = timeout;
154 }
155
GetLocalIdentity(std::string & outTarget) const156 int VirtualCommunicator::GetLocalIdentity(std::string &outTarget) const
157 {
158 outTarget = deviceId_;
159 return E_OK;
160 }
161
GeneralVirtualSyncId()162 int VirtualCommunicator::GeneralVirtualSyncId()
163 {
164 std::lock_guard<std::mutex> lock(syncIdLock_);
165 currentSyncId_++;
166 return currentSyncId_;
167 }
168
Disable()169 void VirtualCommunicator::Disable()
170 {
171 isEnable_ = false;
172 }
173
Enable()174 void VirtualCommunicator::Enable()
175 {
176 isEnable_ = true;
177 }
178
SetDeviceId(const std::string & deviceId)179 void VirtualCommunicator::SetDeviceId(const std::string &deviceId)
180 {
181 deviceId_ = deviceId;
182 }
183
GetDeviceId() const184 std::string VirtualCommunicator::GetDeviceId() const
185 {
186 return deviceId_;
187 }
188
IsEnabled() const189 bool VirtualCommunicator::IsEnabled() const
190 {
191 return isEnable_;
192 }
193
IsDeviceOnline(const std::string & device) const194 bool VirtualCommunicator::IsDeviceOnline(const std::string &device) const
195 {
196 bool res = true;
197 {
198 std::lock_guard<std::mutex> lock(devicesMapLock_);
199 if (onlineDevicesMap_.find(device) != onlineDevicesMap_.end()) {
200 res = onlineDevicesMap_[device];
201 }
202 }
203 return res;
204 }
205
~VirtualCommunicator()206 VirtualCommunicator::~VirtualCommunicator()
207 {
208 }
209
VirtualCommunicator(const std::string & deviceId,VirtualCommunicatorAggregator * communicatorAggregator)210 VirtualCommunicator::VirtualCommunicator(const std::string &deviceId,
211 VirtualCommunicatorAggregator *communicatorAggregator)
212 : deviceId_(deviceId), communicatorAggregator_(communicatorAggregator)
213 {
214 }
215
TranslateMsg(const Message * inMsg,Message * & outMsg)216 int VirtualCommunicator::TranslateMsg(const Message *inMsg, Message *&outMsg)
217 {
218 int errCode = E_OK;
219 std::shared_ptr<ExtendHeaderHandle> extendHandle = nullptr;
220 auto buffer = ProtocolProto::ToSerialBuffer(inMsg, extendHandle, false, errCode);
221 if (errCode != E_OK) {
222 return errCode;
223 }
224
225 outMsg = ProtocolProto::ToMessage(buffer, errCode);
226 delete buffer;
227 buffer = nullptr;
228 return errCode;
229 }
230
SetDropMessageTypeByDevice(MessageId msgid,uint32_t dropTimes)231 void VirtualCommunicator::SetDropMessageTypeByDevice(MessageId msgid, uint32_t dropTimes)
232 {
233 dropMsgId_ = msgid;
234 dropMsgTimes_ = dropTimes;
235 if (msgid == UNKNOW_MESSAGE) {
236 dropMsgTimes_ = 0;
237 }
238 }
239 } // namespace DistributedDB