1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "virtual_communicator.h"
17 
18 #include "log_print.h"
19 #include "protocol_proto.h"
20 #include "single_ver_serialize_manager.h"
21 #include "sync_engine.h"
22 #include "virtual_communicator_aggregator.h"
23 
24 namespace DistributedDB {
RegOnMessageCallback(const OnMessageCallback & onMessage,const Finalizer & inOper)25 int VirtualCommunicator::RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper)
26 {
27     std::lock_guard<std::mutex> lock(onMessageLock_);
28     onMessage_ = onMessage;
29     return E_OK;
30 }
31 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)32 int VirtualCommunicator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
33 {
34     std::lock_guard<std::mutex> lock(onConnectLock_);
35     onConnect_ = onConnect;
36     return E_OK;
37 }
38 
RegOnSendableCallback(const std::function<void (void)> & onSendable,const Finalizer & inOper)39 int VirtualCommunicator::RegOnSendableCallback(const std::function<void(void)> &onSendable, const Finalizer &inOper)
40 {
41     return E_OK;
42 }
43 
Activate()44 void VirtualCommunicator::Activate()
45 {
46 }
47 
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config)48 int VirtualCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config)
49 {
50     return SendMessage(dstTarget, inMsg, config, nullptr);
51 }
52 
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config,const OnSendEnd & onEnd)53 int VirtualCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config,
54     const OnSendEnd &onEnd)
55 {
56     AutoLock lock(this);
57     if (IsKilled()) {
58         return -E_OBJ_IS_KILLED;
59     }
60     if (!isEnable_) {
61         LOGD("[VirtualCommunicator] the VirtualCommunicator disabled!");
62         return -E_PERIPHERAL_INTERFACE_FAIL;
63     }
64     if (dstTarget == deviceId_) {
65         delete inMsg;
66         inMsg = nullptr;
67         return E_OK;
68     }
69     Message *message = nullptr;
70     int errCode = TranslateMsg(inMsg, message);
71     if (errCode == -E_NOT_REGISTER) {
72         communicatorAggregator_->DispatchMessage(deviceId_, dstTarget, inMsg, onEnd);
73         return E_OK;
74     }
75     if (errCode != E_OK) {
76         return errCode;
77     }
78     delete inMsg;
79     inMsg = nullptr;
80     communicatorAggregator_->DispatchMessage(deviceId_, dstTarget, message, onEnd);
81     return E_OK;
82 }
83 
SetRemoteVersion(uint16_t remoteVersion)84 void VirtualCommunicator::SetRemoteVersion(uint16_t remoteVersion)
85 {
86     remoteVersion_ = remoteVersion;
87 }
88 
GetRemoteCommunicatorVersion(const std::string & deviceId,uint16_t & version) const89 int VirtualCommunicator::GetRemoteCommunicatorVersion(const std::string &deviceId, uint16_t &version) const
90 {
91     version = remoteVersion_;
92     return E_OK;
93 }
94 
CallbackOnMessage(const std::string & srcTarget,Message * inMsg)95 void VirtualCommunicator::CallbackOnMessage(const std::string &srcTarget, Message *inMsg)
96 {
97     std::lock_guard<std::mutex> lock(onMessageLock_);
98     if (isEnable_ && onMessage_ && (srcTarget != deviceId_) && ((inMsg->GetMessageId() != dropMsgId_) ||
99         (dropMsgTimes_ == 0))) {
100         onMessage_(srcTarget, inMsg);
101     } else {
102         LOGD("drop msg from dev=%s, localDev=%s", srcTarget.c_str(), deviceId_.c_str());
103         if (dropMsgTimes_ > 0) {
104             dropMsgTimes_--;
105         }
106         delete inMsg;
107         inMsg = nullptr;
108     }
109 }
110 
CallbackOnConnect(const std::string & target,bool isConnect) const111 void VirtualCommunicator::CallbackOnConnect(const std::string &target, bool isConnect) const
112 {
113     {
114         std::lock_guard<std::mutex> lock(devicesMapLock_);
115         if (target != deviceId_) {
116             onlineDevicesMap_[target] = isConnect;
117         }
118     }
119     std::lock_guard<std::mutex> lock(onConnectLock_);
120     if (isEnable_ && onConnect_) {
121         onConnect_(target, isConnect);
122     }
123 }
124 
GetCommunicatorMtuSize() const125 uint32_t VirtualCommunicator::GetCommunicatorMtuSize() const
126 {
127     return mtuSize_;
128 }
129 
GetCommunicatorMtuSize(const std::string & target) const130 uint32_t VirtualCommunicator::GetCommunicatorMtuSize(const std::string &target) const
131 {
132     return GetCommunicatorMtuSize();
133 }
134 
SetCommunicatorMtuSize(uint32_t mtuSize)135 void VirtualCommunicator::SetCommunicatorMtuSize(uint32_t mtuSize)
136 {
137     mtuSize_ = mtuSize;
138 }
139 
GetTimeout() const140 uint32_t VirtualCommunicator::GetTimeout() const
141 {
142     LOGD("[VirtualCommunicator] Get timeout %" PRIu32, timeout_);
143     return timeout_;
144 }
145 
GetTimeout(const std::string & target) const146 uint32_t VirtualCommunicator::GetTimeout(const std::string &target) const
147 {
148     return GetTimeout();
149 }
150 
SetTimeout(uint32_t timeout)151 void VirtualCommunicator::SetTimeout(uint32_t timeout)
152 {
153     timeout_ = timeout;
154 }
155 
GetLocalIdentity(std::string & outTarget) const156 int VirtualCommunicator::GetLocalIdentity(std::string &outTarget) const
157 {
158     outTarget = deviceId_;
159     return E_OK;
160 }
161 
GeneralVirtualSyncId()162 int VirtualCommunicator::GeneralVirtualSyncId()
163 {
164     std::lock_guard<std::mutex> lock(syncIdLock_);
165     currentSyncId_++;
166     return currentSyncId_;
167 }
168 
Disable()169 void VirtualCommunicator::Disable()
170 {
171     isEnable_ = false;
172 }
173 
Enable()174 void VirtualCommunicator::Enable()
175 {
176     isEnable_ = true;
177 }
178 
SetDeviceId(const std::string & deviceId)179 void VirtualCommunicator::SetDeviceId(const std::string &deviceId)
180 {
181     deviceId_ = deviceId;
182 }
183 
GetDeviceId() const184 std::string VirtualCommunicator::GetDeviceId() const
185 {
186     return deviceId_;
187 }
188 
IsEnabled() const189 bool VirtualCommunicator::IsEnabled() const
190 {
191     return isEnable_;
192 }
193 
IsDeviceOnline(const std::string & device) const194 bool VirtualCommunicator::IsDeviceOnline(const std::string &device) const
195 {
196     bool res = true;
197     {
198         std::lock_guard<std::mutex> lock(devicesMapLock_);
199         if (onlineDevicesMap_.find(device) != onlineDevicesMap_.end()) {
200             res = onlineDevicesMap_[device];
201         }
202     }
203     return res;
204 }
205 
~VirtualCommunicator()206 VirtualCommunicator::~VirtualCommunicator()
207 {
208 }
209 
VirtualCommunicator(const std::string & deviceId,VirtualCommunicatorAggregator * communicatorAggregator)210 VirtualCommunicator::VirtualCommunicator(const std::string &deviceId,
211     VirtualCommunicatorAggregator *communicatorAggregator)
212     : deviceId_(deviceId), communicatorAggregator_(communicatorAggregator)
213 {
214 }
215 
TranslateMsg(const Message * inMsg,Message * & outMsg)216 int VirtualCommunicator::TranslateMsg(const Message *inMsg, Message *&outMsg)
217 {
218     int errCode = E_OK;
219     std::shared_ptr<ExtendHeaderHandle> extendHandle = nullptr;
220     auto buffer = ProtocolProto::ToSerialBuffer(inMsg, extendHandle, false, errCode);
221     if (errCode != E_OK) {
222         return errCode;
223     }
224 
225     outMsg = ProtocolProto::ToMessage(buffer, errCode);
226     delete buffer;
227     buffer = nullptr;
228     return errCode;
229 }
230 
SetDropMessageTypeByDevice(MessageId msgid,uint32_t dropTimes)231 void VirtualCommunicator::SetDropMessageTypeByDevice(MessageId msgid, uint32_t dropTimes)
232 {
233     dropMsgId_ = msgid;
234     dropMsgTimes_ = dropTimes;
235     if (msgid == UNKNOW_MESSAGE) {
236         dropMsgTimes_ = 0;
237     }
238 }
239 } // namespace DistributedDB