1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "db_status_adapter.h"
16 
17 #include "db_common.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "param_check_utils.h"
21 #include "runtime_context.h"
22 
23 namespace DistributedDB {
DBStatusAdapter()24 DBStatusAdapter::DBStatusAdapter()
25     : dbInfoHandle_(nullptr),
26       localSendLabelExchange_(false),
27       cacheLocalSendLabelExchange_(false)
28 {
29 }
30 
SetDBInfoHandle(const std::shared_ptr<DBInfoHandle> & dbInfoHandle)31 void DBStatusAdapter::SetDBInfoHandle(const std::shared_ptr<DBInfoHandle> &dbInfoHandle)
32 {
33     {
34         std::lock_guard<std::mutex> autoLock(handleMutex_);
35         dbInfoHandle_ = dbInfoHandle;
36     }
37     NotifyRemoteOffline();
38     ClearAllCache();
39     LOGI("[DBStatusAdapter] SetDBInfoHandle Finish");
40 }
41 
IsSupport(const std::string & devInfo)42 bool DBStatusAdapter::IsSupport(const std::string &devInfo)
43 {
44     std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
45     if (dbInfoHandle == nullptr) {
46         LOGD("[DBStatusAdapter] dbInfoHandle not set");
47         return false;
48     }
49     if (IsSendLabelExchange()) {
50         return false;
51     }
52     std::lock_guard<std::mutex> autoLock(supportMutex_);
53     if (remoteOptimizeInfo_.find(devInfo) != remoteOptimizeInfo_.end()) {
54         return remoteOptimizeInfo_[devInfo];
55     }
56     remoteOptimizeInfo_[devInfo] = true;
57     return true;
58 }
59 
GetLocalDBInfos(std::vector<DBInfo> & dbInfos)60 int DBStatusAdapter::GetLocalDBInfos(std::vector<DBInfo> &dbInfos)
61 {
62     std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
63     if (dbInfoHandle == nullptr) {
64         LOGD("[DBStatusAdapter][GetLocalDBInfos] handle not set");
65         return -E_NOT_SUPPORT;
66     }
67     if (IsSendLabelExchange()) {
68         return -E_NOT_SUPPORT;
69     }
70     std::lock_guard<std::mutex> autoLock(localInfoMutex_);
71     for (const auto &info: localDBInfos_) {
72         if (info.isNeedSync) {
73             dbInfos.push_back(info);
74         }
75     }
76     return E_OK;
77 }
78 
SetDBStatusChangeCallback(const RemoteDBChangeCallback & remote,const LocalDBChangeCallback & local,const RemoteSupportChangeCallback & supportCallback)79 void DBStatusAdapter::SetDBStatusChangeCallback(const RemoteDBChangeCallback &remote,
80     const LocalDBChangeCallback &local, const RemoteSupportChangeCallback &supportCallback)
81 {
82     {
83         std::lock_guard<std::mutex> autoLock(callbackMutex_);
84         remoteCallback_ = remote;
85         localCallback_ = local;
86         supportCallback_ = supportCallback;
87     }
88     if (remote == nullptr || local == nullptr) {
89         LOGD("[DBStatusAdapter][SetDBStatusChangeCallback] remote or local DB change callback is NULL.");
90         return;
91     }
92     // avoid notify before set callback
93     bool triggerNow = false;
94     std::map<std::string, std::vector<DBInfo>> remoteDBInfos;
95     {
96         std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
97         remoteDBInfos = remoteDBInfos_;
98         triggerNow = !remoteDBInfos.empty();
99     }
100     bool triggerLocal = false;
101     {
102         std::lock_guard<std::mutex> autoLock(localInfoMutex_);
103         triggerLocal = !localDBInfos_.empty();
104         triggerNow |= triggerLocal;
105     }
106     // trigger callback async
107     if (!triggerNow) {
108         return;
109     }
110     int errCode = RuntimeContext::GetInstance()->ScheduleTask([triggerLocal, remoteDBInfos, remote, local]() {
111         for (const auto &[devInfo, dbInfos]: remoteDBInfos) {
112             remote(devInfo, dbInfos);
113         }
114         if (triggerLocal) {
115             local();
116         }
117     });
118     if (errCode != E_OK) {
119         LOGW("[DBStatusAdapter][SetDBStatusChangeCallback] Schedule Task failed! errCode = %d", errCode);
120     }
121 }
122 
NotifyDBInfos(const DeviceInfos & devInfos,const std::vector<DBInfo> & dbInfos)123 void DBStatusAdapter::NotifyDBInfos(const DeviceInfos &devInfos, const std::vector<DBInfo> &dbInfos)
124 {
125     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, devInfos, dbInfos]() {
126         if (IsSendLabelExchange()) {
127             LOGW("[DBStatusAdapter][NotifyDBInfos] local dev no support communication optimize");
128             return;
129         }
130         bool isLocal = IsLocalDeviceId(devInfos.identifier);
131         if (!isLocal && !IsSupport(devInfos.identifier)) {
132             LOGW("[DBStatusAdapter][NotifyDBInfos] no support dev %s", STR_MASK(devInfos.identifier));
133             return;
134         }
135         bool isChange = LoadIntoCache(isLocal, devInfos, dbInfos);
136         std::lock_guard<std::mutex> autoLock(callbackMutex_);
137         if (!isLocal && remoteCallback_ != nullptr) {
138             remoteCallback_(devInfos.identifier, dbInfos);
139         } else if (isLocal && localCallback_ != nullptr && isChange) {
140             localCallback_();
141         }
142     });
143     if (errCode != E_OK) {
144         LOGW("[DBStatusAdapter][NotifyDBInfos] Schedule Task failed! errCode = %d", errCode);
145     }
146 }
147 
TargetOffline(const std::string & device)148 void DBStatusAdapter::TargetOffline(const std::string &device)
149 {
150     std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
151     if (dbInfoHandle == nullptr) {
152         LOGD("[DBStatusAdapter][TargetOffline] handle not set");
153         return;
154     }
155     {
156         std::lock_guard<std::mutex> autoLock(supportMutex_);
157         if (remoteOptimizeInfo_.find(device) != remoteOptimizeInfo_.end()) {
158             remoteOptimizeInfo_.erase(device);
159         }
160     }
161     RuntimeContext::GetInstance()->RemoveRemoteSubscribe(device);
162 }
163 
IsNeedAutoSync(const std::string & userId,const std::string & appId,const std::string & storeId,const std::string & devInfo)164 bool DBStatusAdapter::IsNeedAutoSync(const std::string &userId, const std::string &appId, const std::string &storeId,
165     const std::string &devInfo)
166 {
167     std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
168     if (dbInfoHandle == nullptr || IsSendLabelExchange()) {
169         return true;
170     }
171     return dbInfoHandle->IsNeedAutoSync(userId, appId, storeId, { devInfo });
172 }
173 
SetRemoteOptimizeCommunication(const std::string & dev,bool optimize)174 void DBStatusAdapter::SetRemoteOptimizeCommunication(const std::string &dev, bool optimize)
175 {
176     std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
177     if (dbInfoHandle == nullptr) {
178         LOGD("[DBStatusAdapter][SetRemoteOptimizeCommunication] handle not set");
179         return;
180     }
181     bool triggerLocalCallback = false;
182     {
183         std::lock_guard<std::mutex> autoLock(supportMutex_);
184         if (remoteOptimizeInfo_.find(dev) == remoteOptimizeInfo_.end()) {
185             remoteOptimizeInfo_[dev] = optimize;
186             return;
187         }
188         if (remoteOptimizeInfo_[dev] == optimize) {
189             return;
190         }
191         if (remoteOptimizeInfo_[dev]) {
192             triggerLocalCallback = true;
193         }
194         remoteOptimizeInfo_[dev] = optimize;
195     }
196     if (!triggerLocalCallback) {
197         return;
198     }
199     LOGI("[DBStatusAdapter][SetRemoteOptimizeCommunication] remote dev %.3s optimize change!", dev.c_str());
200     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, dev]() {
201         RemoteSupportChangeCallback callback;
202         {
203             std::lock_guard<std::mutex> autoLock(callbackMutex_);
204             callback = supportCallback_;
205         }
206         if (callback) {
207             callback(dev);
208         }
209     });
210     if (errCode != E_OK) {
211         LOGW("[DBStatusAdapter][SetRemoteOptimizeCommunication] Schedule Task failed! errCode = %d", errCode);
212     }
213 }
214 
IsSendLabelExchange()215 bool DBStatusAdapter::IsSendLabelExchange()
216 {
217     std::shared_ptr<DBInfoHandle> dbInfoHandle = GetDBInfoHandle();
218     if (dbInfoHandle == nullptr) {
219         LOGD("[DBStatusAdapter][IsSendLabelExchange] handle not set");
220         return true;
221     }
222     {
223         std::lock_guard<std::mutex> autoLock(supportMutex_);
224         if (cacheLocalSendLabelExchange_) {
225             return localSendLabelExchange_;
226         }
227     }
228     bool isSupport = dbInfoHandle->IsSupport();
229     std::lock_guard<std::mutex> autoLock(supportMutex_);
230     LOGD("[DBStatusAdapter][IsSendLabelExchange] local support status is %d", isSupport);
231     localSendLabelExchange_ = !isSupport;
232     cacheLocalSendLabelExchange_ = true;
233     return localSendLabelExchange_;
234 }
235 
GetDBInfoHandle() const236 std::shared_ptr<DBInfoHandle> DBStatusAdapter::GetDBInfoHandle() const
237 {
238     std::lock_guard<std::mutex> autoLock(handleMutex_);
239     return dbInfoHandle_;
240 }
241 
LoadIntoCache(bool isLocal,const DeviceInfos & devInfos,const std::vector<DBInfo> & dbInfos)242 bool DBStatusAdapter::LoadIntoCache(bool isLocal, const DeviceInfos &devInfos, const std::vector<DBInfo> &dbInfos)
243 {
244     if (isLocal) {
245         std::lock_guard<std::mutex> autoLock(localInfoMutex_);
246         return MergeDBInfos(dbInfos, localDBInfos_);
247     }
248     std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
249     if (remoteDBInfos_.find(devInfos.identifier) == remoteDBInfos_.end()) {
250         remoteDBInfos_.insert({devInfos.identifier, {}});
251     }
252     return MergeDBInfos(dbInfos, remoteDBInfos_[devInfos.identifier]);
253 }
254 
ClearAllCache()255 void DBStatusAdapter::ClearAllCache()
256 {
257     {
258         std::lock_guard<std::mutex> autoLock(localInfoMutex_);
259         localDBInfos_.clear();
260     }
261     {
262         std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
263         remoteDBInfos_.clear();
264     }
265     std::lock_guard<std::mutex> autoLock(supportMutex_);
266     remoteOptimizeInfo_.clear();
267     cacheLocalSendLabelExchange_ = false;
268     localSendLabelExchange_ = false;
269     LOGD("[DBStatusAdapter] ClearAllCache ok");
270 }
271 
NotifyRemoteOffline()272 void DBStatusAdapter::NotifyRemoteOffline()
273 {
274     std::map<std::string, std::vector<DBInfo>> remoteOnlineDBInfos;
275     {
276         std::lock_guard<std::mutex> autoLock(remoteInfoMutex_);
277         for (const auto &[dev, dbInfos]: remoteDBInfos_) {
278             for (const auto &dbInfo: dbInfos) {
279                 if (dbInfo.isNeedSync) {
280                     DBInfo info = dbInfo;
281                     info.isNeedSync = false;
282                     remoteOnlineDBInfos[dev].push_back(info);
283                 }
284             }
285         }
286     }
287     RemoteDBChangeCallback callback;
288     {
289         std::lock_guard<std::mutex> autoLock(callbackMutex_);
290         callback = remoteCallback_;
291     }
292     if (callback == nullptr || remoteOnlineDBInfos.empty()) {
293         LOGD("[DBStatusAdapter] no need to notify offline when reset handle");
294         return;
295     }
296     for (const auto &[dev, dbInfos]: remoteOnlineDBInfos) {
297         callback(dev, dbInfos);
298     }
299     LOGD("[DBStatusAdapter] Notify offline ok");
300 }
301 
MergeDBInfos(const std::vector<DBInfo> & srcDbInfos,std::vector<DBInfo> & dstDbInfos)302 bool DBStatusAdapter::MergeDBInfos(const std::vector<DBInfo> &srcDbInfos, std::vector<DBInfo> &dstDbInfos)
303 {
304     bool isDbInfoChange = false;
305     for (const auto &srcInfo: srcDbInfos) {
306         if (!ParamCheckUtils::CheckStoreParameter({srcInfo.storeId, srcInfo.appId, srcInfo.userId}, false, "", true)) {
307             continue;
308         }
309         auto res = std::find_if(dstDbInfos.begin(), dstDbInfos.end(), [&srcInfo](const DBInfo &dstInfo) {
310             return srcInfo.appId == dstInfo.appId && srcInfo.userId == dstInfo.userId &&
311                 srcInfo.storeId == dstInfo.storeId && srcInfo.syncDualTupleMode == dstInfo.syncDualTupleMode;
312         });
313         if (res == dstDbInfos.end()) {
314             dstDbInfos.push_back(srcInfo);
315             isDbInfoChange = true;
316         } else if (res->isNeedSync != srcInfo.isNeedSync) {
317             res->isNeedSync = srcInfo.isNeedSync;
318             isDbInfoChange = true;
319         }
320     }
321     return isDbInfoChange;
322 }
323 
GetLocalDeviceId(std::string & deviceId)324 int DBStatusAdapter::GetLocalDeviceId(std::string &deviceId)
325 {
326     ICommunicatorAggregator *communicatorAggregator = nullptr;
327     int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator);
328     if (errCode != E_OK) {
329         LOGE("[DBStatusAdapter][GetLocalDeviceId] Get ICommunicatorAggregator error: %d", errCode);
330         return errCode;
331     }
332     return communicatorAggregator->GetLocalIdentity(deviceId);
333 }
334 
IsLocalDeviceId(const std::string & deviceId)335 bool DBStatusAdapter::IsLocalDeviceId(const std::string &deviceId)
336 {
337     std::string localId;
338     if (GetLocalDeviceId(localId) != E_OK) {
339         LOGE("[DBStatusAdapter][IsLocalDeviceId] Get local device ID failed.");
340         return false;
341     }
342     return deviceId == localId;
343 }
344 }