/*
 * Copyright (c) 2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 #include "db_status_adapter.h"
16
17 #include "db_common.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "param_check_utils.h"
21 #include "runtime_context.h"
22
23 namespace DistributedDB {
DBStatusAdapter()24 DBStatusAdapter::DBStatusAdapter()
25 : dbInfoHandle_(nullptr),
26 localSendLabelExchange_(false),
27 cacheLocalSendLabelExchange_(false)
28 {
29 }
30
SetDBInfoHandle(const std::shared_ptr<DBInfoHandle> & dbInfoHandle)31 void DBStatusAdapter::SetDBInfoHandle(const std::shared_ptr<DBInfoHandle> &dbInfoHandle)
32 {
33 {
34 std::lock_guard<std::mutex> autoLock(handleMutex_);
35 dbInfoHandle_ = dbInfoHandle;
36 }
37 NotifyRemoteOffline();
38 ClearAllCache();
39 LOGI("[DBStatusAdapter] SetDBInfoHandle Finish");
40 }
41
IsSupport(const std::string & devInfo)42 bool DBStatusAdapter::IsSupport(const std::string &devInfo)
43 {
44 std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
45 if (dbInfoHandle == nullptr) {
46 LOGD("[DBStatusAdapter] dbInfoHandle not set");
47 return false;
48 }
49 if (IsSendLabelExchange()) {
50 return false;
51 }
52 std::lock_guard<std::mutex> autoLock(supportMutex_);
53 if (remoteOptimizeInfo_.find(devInfo) != remoteOptimizeInfo_.end()) {
54 return remoteOptimizeInfo_[devInfo];
55 }
56 remoteOptimizeInfo_[devInfo] = true;
57 return true;
58 }
59
GetLocalDBInfos(std::vector<DBInfo> & dbInfos)60 int DBStatusAdapter::GetLocalDBInfos(std::vector<DBInfo> &dbInfos)
61 {
62 std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
63 if (dbInfoHandle == nullptr) {
64 LOGD("[DBStatusAdapter][GetLocalDBInfos] handle not set");
65 return -E_NOT_SUPPORT;
66 }
67 if (IsSendLabelExchange()) {
68 return -E_NOT_SUPPORT;
69 }
70 std::lock_guard<std::mutex> autoLock(localInfoMutex_);
71 for (const auto &info: localDBInfos_) {
72 if (info.isNeedSync) {
73 dbInfos.push_back(info);
74 }
75 }
76 return E_OK;
77 }
78
SetDBStatusChangeCallback(const RemoteDBChangeCallback & remote,const LocalDBChangeCallback & local,const RemoteSupportChangeCallback & supportCallback)79 void DBStatusAdapter::SetDBStatusChangeCallback(const RemoteDBChangeCallback &remote,
80 const LocalDBChangeCallback &local, const RemoteSupportChangeCallback &supportCallback)
81 {
82 {
83 std::lock_guard<std::mutex> autoLock(callbackMutex_);
84 remoteCallback_ = remote;
85 localCallback_ = local;
86 supportCallback_ = supportCallback;
87 }
88 if (remote == nullptr || local == nullptr) {
89 LOGD("[DBStatusAdapter][SetDBStatusChangeCallback] remote or local DB change callback is NULL.");
90 return;
91 }
92 // avoid notify before set callback
93 bool triggerNow = false;
94 std::map<std::string, std::vector<DBInfo>> remoteDBInfos;
95 {
96 std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
97 remoteDBInfos = remoteDBInfos_;
98 triggerNow = !remoteDBInfos.empty();
99 }
100 bool triggerLocal = false;
101 {
102 std::lock_guard<std::mutex> autoLock(localInfoMutex_);
103 triggerLocal = !localDBInfos_.empty();
104 triggerNow |= triggerLocal;
105 }
106 // trigger callback async
107 if (!triggerNow) {
108 return;
109 }
110 int errCode = RuntimeContext::GetInstance()->ScheduleTask([triggerLocal, remoteDBInfos, remote, local]() {
111 for (const auto &[devInfo, dbInfos]: remoteDBInfos) {
112 remote(devInfo, dbInfos);
113 }
114 if (triggerLocal) {
115 local();
116 }
117 });
118 if (errCode != E_OK) {
119 LOGW("[DBStatusAdapter][SetDBStatusChangeCallback] Schedule Task failed! errCode = %d", errCode);
120 }
121 }
122
NotifyDBInfos(const DeviceInfos & devInfos,const std::vector<DBInfo> & dbInfos)123 void DBStatusAdapter::NotifyDBInfos(const DeviceInfos &devInfos, const std::vector<DBInfo> &dbInfos)
124 {
125 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, devInfos, dbInfos]() {
126 if (IsSendLabelExchange()) {
127 LOGW("[DBStatusAdapter][NotifyDBInfos] local dev no support communication optimize");
128 return;
129 }
130 bool isLocal = IsLocalDeviceId(devInfos.identifier);
131 if (!isLocal && !IsSupport(devInfos.identifier)) {
132 LOGW("[DBStatusAdapter][NotifyDBInfos] no support dev %s", STR_MASK(devInfos.identifier));
133 return;
134 }
135 bool isChange = LoadIntoCache(isLocal, devInfos, dbInfos);
136 std::lock_guard<std::mutex> autoLock(callbackMutex_);
137 if (!isLocal && remoteCallback_ != nullptr) {
138 remoteCallback_(devInfos.identifier, dbInfos);
139 } else if (isLocal && localCallback_ != nullptr && isChange) {
140 localCallback_();
141 }
142 });
143 if (errCode != E_OK) {
144 LOGW("[DBStatusAdapter][NotifyDBInfos] Schedule Task failed! errCode = %d", errCode);
145 }
146 }
147
TargetOffline(const std::string & device)148 void DBStatusAdapter::TargetOffline(const std::string &device)
149 {
150 std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
151 if (dbInfoHandle == nullptr) {
152 LOGD("[DBStatusAdapter][TargetOffline] handle not set");
153 return;
154 }
155 {
156 std::lock_guard<std::mutex> autoLock(supportMutex_);
157 if (remoteOptimizeInfo_.find(device) != remoteOptimizeInfo_.end()) {
158 remoteOptimizeInfo_.erase(device);
159 }
160 }
161 RuntimeContext::GetInstance()->RemoveRemoteSubscribe(device);
162 }
163
IsNeedAutoSync(const std::string & userId,const std::string & appId,const std::string & storeId,const std::string & devInfo)164 bool DBStatusAdapter::IsNeedAutoSync(const std::string &userId, const std::string &appId, const std::string &storeId,
165 const std::string &devInfo)
166 {
167 std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
168 if (dbInfoHandle == nullptr || IsSendLabelExchange()) {
169 return true;
170 }
171 return dbInfoHandle->IsNeedAutoSync(userId, appId, storeId, { devInfo });
172 }
173
SetRemoteOptimizeCommunication(const std::string & dev,bool optimize)174 void DBStatusAdapter::SetRemoteOptimizeCommunication(const std::string &dev, bool optimize)
175 {
176 std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
177 if (dbInfoHandle == nullptr) {
178 LOGD("[DBStatusAdapter][SetRemoteOptimizeCommunication] handle not set");
179 return;
180 }
181 bool triggerLocalCallback = false;
182 {
183 std::lock_guard<std::mutex> autoLock(supportMutex_);
184 if (remoteOptimizeInfo_.find(dev) == remoteOptimizeInfo_.end()) {
185 remoteOptimizeInfo_[dev] = optimize;
186 return;
187 }
188 if (remoteOptimizeInfo_[dev] == optimize) {
189 return;
190 }
191 if (remoteOptimizeInfo_[dev]) {
192 triggerLocalCallback = true;
193 }
194 remoteOptimizeInfo_[dev] = optimize;
195 }
196 if (!triggerLocalCallback) {
197 return;
198 }
199 LOGI("[DBStatusAdapter][SetRemoteOptimizeCommunication] remote dev %.3s optimize change!", dev.c_str());
200 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, dev]() {
201 RemoteSupportChangeCallback callback;
202 {
203 std::lock_guard<std::mutex> autoLock(callbackMutex_);
204 callback = supportCallback_;
205 }
206 if (callback) {
207 callback(dev);
208 }
209 });
210 if (errCode != E_OK) {
211 LOGW("[DBStatusAdapter][SetRemoteOptimizeCommunication] Schedule Task failed! errCode = %d", errCode);
212 }
213 }
214
IsSendLabelExchange()215 bool DBStatusAdapter::IsSendLabelExchange()
216 {
217 std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
218 if (dbInfoHandle == nullptr) {
219 LOGD("[DBStatusAdapter][IsSendLabelExchange] handle not set");
220 return true;
221 }
222 {
223 std::lock_guard<std::mutex> autoLock(supportMutex_);
224 if (cacheLocalSendLabelExchange_) {
225 return localSendLabelExchange_;
226 }
227 }
228 bool isSupport = dbInfoHandle->IsSupport();
229 std::lock_guard<std::mutex> autoLock(supportMutex_);
230 LOGD("[DBStatusAdapter][IsSendLabelExchange] local support status is %d", isSupport);
231 localSendLabelExchange_ = !isSupport;
232 cacheLocalSendLabelExchange_ = true;
233 return localSendLabelExchange_;
234 }
235
GetDBInfoHandle() const236 std::shared_ptr<DBInfoHandle> DBStatusAdapter::GetDBInfoHandle() const
237 {
238 std::lock_guard<std::mutex> autoLock(handleMutex_);
239 return dbInfoHandle_;
240 }
241
LoadIntoCache(bool isLocal,const DeviceInfos & devInfos,const std::vector<DBInfo> & dbInfos)242 bool DBStatusAdapter::LoadIntoCache(bool isLocal, const DeviceInfos &devInfos, const std::vector<DBInfo> &dbInfos)
243 {
244 if (isLocal) {
245 std::lock_guard<std::mutex> autoLock(localInfoMutex_);
246 return MergeDBInfos(dbInfos, localDBInfos_);
247 }
248 std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
249 if (remoteDBInfos_.find(devInfos.identifier) == remoteDBInfos_.end()) {
250 remoteDBInfos_.insert({devInfos.identifier, {}});
251 }
252 return MergeDBInfos(dbInfos, remoteDBInfos_[devInfos.identifier]);
253 }
254
ClearAllCache()255 void DBStatusAdapter::ClearAllCache()
256 {
257 {
258 std::lock_guard<std::mutex> autoLock(localInfoMutex_);
259 localDBInfos_.clear();
260 }
261 {
262 std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
263 remoteDBInfos_.clear();
264 }
265 std::lock_guard<std::mutex> autoLock(supportMutex_);
266 remoteOptimizeInfo_.clear();
267 cacheLocalSendLabelExchange_ = false;
268 localSendLabelExchange_ = false;
269 LOGD("[DBStatusAdapter] ClearAllCache ok");
270 }
271
NotifyRemoteOffline()272 void DBStatusAdapter::NotifyRemoteOffline()
273 {
274 std::map<std::string, std::vector<DBInfo>> remoteOnlineDBInfos;
275 {
276 std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
277 for (const auto &[dev, dbInfos]: remoteDBInfos_) {
278 for (const auto &dbInfo: dbInfos) {
279 if (dbInfo.isNeedSync) {
280 DBInfo info = dbInfo;
281 info.isNeedSync = false;
282 remoteOnlineDBInfos[dev].push_back(info);
283 }
284 }
285 }
286 }
287 RemoteDBChangeCallback callback;
288 {
289 std::lock_guard<std::mutex> autoLock(callbackMutex_);
290 callback = remoteCallback_;
291 }
292 if (callback == nullptr || remoteOnlineDBInfos.empty()) {
293 LOGD("[DBStatusAdapter] no need to notify offline when reset handle");
294 return;
295 }
296 for (const auto &[dev, dbInfos]: remoteOnlineDBInfos) {
297 callback(dev, dbInfos);
298 }
299 LOGD("[DBStatusAdapter] Notify offline ok");
300 }
301
MergeDBInfos(const std::vector<DBInfo> & srcDbInfos,std::vector<DBInfo> & dstDbInfos)302 bool DBStatusAdapter::MergeDBInfos(const std::vector<DBInfo> &srcDbInfos, std::vector<DBInfo> &dstDbInfos)
303 {
304 bool isDbInfoChange = false;
305 for (const auto &srcInfo: srcDbInfos) {
306 if (!ParamCheckUtils::CheckStoreParameter({srcInfo.storeId, srcInfo.appId, srcInfo.userId}, false, "", true)) {
307 continue;
308 }
309 auto res = std::find_if(dstDbInfos.begin(), dstDbInfos.end(), [&srcInfo](const DBInfo &dstInfo) {
310 return srcInfo.appId == dstInfo.appId && srcInfo.userId == dstInfo.userId &&
311 srcInfo.storeId == dstInfo.storeId && srcInfo.syncDualTupleMode == dstInfo.syncDualTupleMode;
312 });
313 if (res == dstDbInfos.end()) {
314 dstDbInfos.push_back(srcInfo);
315 isDbInfoChange = true;
316 } else if (res->isNeedSync != srcInfo.isNeedSync) {
317 res->isNeedSync = srcInfo.isNeedSync;
318 isDbInfoChange = true;
319 }
320 }
321 return isDbInfoChange;
322 }
323
GetLocalDeviceId(std::string & deviceId)324 int DBStatusAdapter::GetLocalDeviceId(std::string &deviceId)
325 {
326 ICommunicatorAggregator *communicatorAggregator = nullptr;
327 int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
328 if (errCode != E_OK) {
329 LOGE("[DBStatusAdapter][GetLocalDeviceId] Get ICommunicatorAggregator error: %d", errCode);
330 return errCode;
331 }
332 return communicatorAggregator->GetLocalIdentity(deviceId);
333 }
334
IsLocalDeviceId(const std::string & deviceId)335 bool DBStatusAdapter::IsLocalDeviceId(const std::string &deviceId)
336 {
337 std::string localId;
338 if (GetLocalDeviceId(localId) != E_OK) {
339 LOGE("[DBStatusAdapter][IsLocalDeviceId] Get local device ID failed.");
340 return false;
341 }
342 return deviceId == localId;
343 }
344 }