1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "device_manager.h"
17
18 #include <algorithm>
19
20 #include "db_errno.h"
21 #include "log_print.h"
22 #include "message.h"
23 #include "message_transform.h"
24 #include "parcel.h"
25 #include "performance_analysis.h"
26 #include "sync_types.h"
27
28 namespace DistributedDB {
DeviceManager()29 DeviceManager::DeviceManager() : communicator_(nullptr)
30 {
31 }
32
~DeviceManager()33 DeviceManager::~DeviceManager()
34 {
35 if (communicator_ != nullptr) {
36 RefObject::DecObjRef(communicator_);
37 communicator_ = nullptr;
38 }
39 }
40
CalculateLen()41 uint32_t DeviceManager::CalculateLen()
42 {
43 return Parcel::GetUInt64Len();
44 }
45
RegisterTransformFunc()46 int DeviceManager::RegisterTransformFunc()
47 {
48 TransformFunc func;
49 func.computeFunc = [](const Message *msg) {
50 (void) msg;
51 return DeviceManager::CalculateLen();
52 };
53 // LocalDataChanged has no dataPct
54 func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
55 (void) buffer;
56 (void) length;
57 (void) inMsg;
58 return E_OK;
59 };
60 func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
61 (void) buffer;
62 (void) length;
63 (void) inMsg;
64 return E_OK;
65 };
66 return MessageTransform::RegTransformFunction(LOCAL_DATA_CHANGED, func);
67 }
68
69 // Initialize the DeviceManager
Initialize(ICommunicator * communicator,const std::function<void (std::string)> & onlineCallback,const std::function<void (std::string)> & offlineCallback)70 int DeviceManager::Initialize(ICommunicator *communicator, const std::function<void(std::string)> &onlineCallback,
71 const std::function<void(std::string)> &offlineCallback)
72 {
73 if (communicator == nullptr) {
74 return -E_INVALID_ARGS;
75 }
76 RefObject::IncObjRef(communicator);
77 communicator_ = communicator;
78 RegDeviceOnLineCallBack(onlineCallback);
79 RegDeviceOffLineCallBack(offlineCallback);
80 return E_OK;
81 }
82
RegDeviceOnLineCallBack(const std::function<void (std::string)> & callback)83 void DeviceManager::RegDeviceOnLineCallBack(const std::function<void(std::string)> &callback)
84 {
85 onlineCallback_ = callback;
86 }
87
RegDeviceOffLineCallBack(const std::function<void (std::string)> & callback)88 void DeviceManager::RegDeviceOffLineCallBack(const std::function<void(std::string)> &callback)
89 {
90 offlineCallback_ = callback;
91 }
92
OnDeviceConnectCallback(const std::string & targetDev,bool isConnect)93 void DeviceManager::OnDeviceConnectCallback(const std::string &targetDev, bool isConnect)
94 {
95 if (targetDev.empty()) {
96 LOGE("[DeviceManager] DeviceConnectCallback invalid device!");
97 }
98 if (isConnect) {
99 {
100 std::lock_guard<std::mutex> lockOnline(devicesLock_);
101 devices_.insert(targetDev);
102 }
103 if (onlineCallback_) {
104 onlineCallback_(targetDev);
105 LOGD("[DeviceManager] DeviceConnectCallback call online callback");
106 }
107 } else {
108 {
109 std::lock_guard<std::mutex> lockOffline(devicesLock_);
110 devices_.erase(targetDev);
111 }
112 if (offlineCallback_) {
113 offlineCallback_(targetDev);
114 }
115 }
116 }
117
GetOnlineDevices(std::vector<std::string> & devices) const118 void DeviceManager::GetOnlineDevices(std::vector<std::string> &devices) const
119 {
120 std::lock_guard<std::mutex> lock(devicesLock_);
121 devices.assign(devices_.begin(), devices_.end());
122 }
123
124 #ifndef OMIT_MULTI_VER
SendBroadCast(uint32_t msgId)125 int DeviceManager::SendBroadCast(uint32_t msgId)
126 {
127 if (msgId == LOCAL_DATA_CHANGED) {
128 return SendLocalDataChanged();
129 }
130 LOGE("[DeviceManager] invalid BroadCast msgId:%u", msgId);
131 return -E_INVALID_ARGS;
132 }
133
SendLocalDataChanged()134 int DeviceManager::SendLocalDataChanged()
135 {
136 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
137 if (performance != nullptr) {
138 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_SEND_LOCAL_DATA_CHANGED_TO_COMMIT_REQUEST_RECV);
139 }
140 std::vector<std::string> copyDevices;
141 GetOnlineDevices(copyDevices);
142 if (copyDevices.empty()) {
143 LOGI("[DeviceManager] no device online to SendLocalDataChanged!");
144 }
145 for (const auto &deviceId : copyDevices) {
146 Message *msg = new (std::nothrow) Message();
147 if (msg == nullptr) {
148 LOGE("[DeviceManager] Message alloc failed when SendBroadCast!");
149 return -E_OUT_OF_MEMORY;
150 }
151 msg->SetMessageId(LOCAL_DATA_CHANGED);
152 msg->SetTarget(deviceId);
153 SendConfig conf = {false, false, SEND_TIME_OUT, {}};
154 int errCode = communicator_->SendMessage(deviceId, msg, conf);
155 if (errCode != E_OK) {
156 LOGE("[DeviceManager] SendLocalDataChanged to dev %s{private} failed. err %d",
157 deviceId.c_str(), errCode);
158 delete msg;
159 msg = nullptr;
160 }
161 }
162 return E_OK;
163 }
164 #endif // OMIT_MULTI_VER
165
IsDeviceOnline(const std::string & deviceId) const166 bool DeviceManager::IsDeviceOnline(const std::string &deviceId) const
167 {
168 std::lock_guard<std::mutex> lock(devicesLock_);
169 auto iter = std::find(devices_.begin(), devices_.end(), deviceId);
170 return (iter != devices_.end());
171 }
172 } // namespace DistributedDB