1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "sync_able_engine.h"
16 
17 #include "db_dump_helper.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "parcel.h"
21 #include "ref_object.h"
22 #include "relational_sync_able_storage.h"
23 #include "runtime_context.h"
24 #include "user_change_monitor.h"
25 
26 namespace DistributedDB {
SyncAbleEngine(ISyncInterface * store)27 SyncAbleEngine::SyncAbleEngine(ISyncInterface *store)
28     : syncer_(),
29       started_(false),
30       closed_(false),
31       isSyncModuleActiveCheck_(false),
32       isSyncNeedActive_(true),
33       store_(store),
34       userChangeListener_(nullptr)
35 {}
36 
~SyncAbleEngine()37 SyncAbleEngine::~SyncAbleEngine()
38 {
39     if (userChangeListener_ != nullptr) {
40         userChangeListener_->Drop(true);
41         userChangeListener_ = nullptr;
42     }
43 }
44 
45 // Start a sync action.
Sync(const ISyncer::SyncParma & parm,uint64_t connectionId)46 int SyncAbleEngine::Sync(const ISyncer::SyncParma &parm, uint64_t connectionId)
47 {
48     if (!started_) {
49         int errCode = StartSyncer();
50         if (!started_) {
51             return errCode;
52         }
53     }
54     return syncer_.Sync(parm, connectionId);
55 }
56 
WakeUpSyncer()57 void SyncAbleEngine::WakeUpSyncer()
58 {
59     StartSyncer();
60 }
61 
Close()62 void SyncAbleEngine::Close()
63 {
64     StopSyncer();
65 }
66 
67 // Get The current virtual timestamp
GetTimestamp()68 uint64_t SyncAbleEngine::GetTimestamp()
69 {
70     if (NeedStartSyncer()) {
71         StartSyncer();
72     }
73     return syncer_.GetTimestamp();
74 }
75 
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)76 int SyncAbleEngine::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
77 {
78     if (NeedStartSyncer()) {
79         int errCode = StartSyncer();
80         if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
81             return errCode;
82         }
83     }
84     return syncer_.EraseDeviceWaterMark(deviceId, isNeedHash, tableName);
85 }
86 
87 // Start syncer
StartSyncer(bool isCheckSyncActive,bool isNeedActive)88 int SyncAbleEngine::StartSyncer(bool isCheckSyncActive, bool isNeedActive)
89 {
90     int errCode = E_OK;
91     {
92         std::unique_lock<std::mutex> lock(syncerOperateLock_);
93         errCode = StartSyncerWithNoLock(isCheckSyncActive, isNeedActive);
94         closed_ = false;
95     }
96     UserChangeHandle();
97     return errCode;
98 }
99 
StartSyncerWithNoLock(bool isCheckSyncActive,bool isNeedActive)100 int SyncAbleEngine::StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive)
101 {
102     if (store_ == nullptr) {
103         LOGF("RDB got null sync interface.");
104         return -E_INVALID_ARGS;
105     }
106     if (!isCheckSyncActive) {
107         SetSyncModuleActive();
108         isNeedActive = GetSyncModuleActive();
109     }
110 
111     int errCode = syncer_.Initialize(store_, isNeedActive);
112     if (errCode == E_OK) {
113         started_ = true;
114     } else {
115         LOGE("RDB start syncer failed, err:'%d'.", errCode);
116     }
117 
118     bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
119     std::string label = store_->GetDbProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
120     if (isSyncDualTupleMode && isCheckSyncActive && !isNeedActive && (userChangeListener_ == nullptr)) {
121         // active to non_active
122         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
123             [this](void *) { ChangeUserListener(); }, UserChangeMonitor::USER_ACTIVE_TO_NON_ACTIVE_EVENT);
124         LOGI("[StartSyncerWithNoLock] [%.3s] After RegisterUserChangedListener", label.c_str());
125     } else if (isSyncDualTupleMode && (userChangeListener_ == nullptr)) {
126         EventType event = isNeedActive ?
127             UserChangeMonitor::USER_ACTIVE_EVENT : UserChangeMonitor::USER_NON_ACTIVE_EVENT;
128         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
129             [this](void *) { UserChangeHandle(); }, event);
130         LOGI("[StartSyncerWithNoLock] [%.3s] After RegisterUserChangedListener event=%d", label.c_str(), event);
131     }
132     return errCode;
133 }
134 
135 // Stop syncer
StopSyncer()136 void SyncAbleEngine::StopSyncer()
137 {
138     NotificationChain::Listener *userChangeListener = nullptr;
139     {
140         std::unique_lock<std::mutex> lock(syncerOperateLock_);
141         StopSyncerWithNoLock(true);
142         userChangeListener = userChangeListener_;
143         userChangeListener_ = nullptr;
144     }
145     if (userChangeListener != nullptr) {
146         userChangeListener->Drop(true);
147         userChangeListener = nullptr;
148     }
149 }
150 
StopSyncerWithNoLock(bool isClosedOperation)151 void SyncAbleEngine::StopSyncerWithNoLock(bool isClosedOperation)
152 {
153     ReSetSyncModuleActive();
154     syncer_.Close(isClosedOperation);
155     if (started_) {
156         started_ = false;
157     }
158     closed_ = isClosedOperation;
159     if (!isClosedOperation && userChangeListener_ != nullptr) {
160         userChangeListener_->Drop(false);
161         userChangeListener_ = nullptr;
162     }
163 }
164 
UserChangeHandle()165 void SyncAbleEngine::UserChangeHandle()
166 {
167     if (store_ == nullptr) {
168         LOGD("[SyncAbleEngine] RDB got null sync interface in userChange.");
169         return;
170     }
171     bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
172     if (!isSyncDualTupleMode) {
173         LOGD("[SyncAbleEngine] no use syncDualTupleMode, abort userChange");
174         return;
175     }
176     std::unique_lock<std::mutex> lock(syncerOperateLock_);
177     if (closed_) {
178         LOGI("RDB is already closed");
179         return;
180     }
181     bool isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(store_->GetDbProperties());
182     bool isNeedChange = (isNeedActive != isSyncNeedActive_);
183     // non_active to active or active to non_active
184     if (isNeedChange) {
185         StopSyncerWithNoLock(); // will drop userChangeListener
186         isSyncModuleActiveCheck_ = true;
187         isSyncNeedActive_ = isNeedActive;
188         StartSyncerWithNoLock(true, isNeedActive);
189     }
190 }
191 
ChangeUserListener()192 void SyncAbleEngine::ChangeUserListener()
193 {
194     // only active to non_active call, put into USER_NON_ACTIVE_EVENT listener from USER_ACTIVE_TO_NON_ACTIVE_EVENT
195     if (userChangeListener_ != nullptr) {
196         userChangeListener_->Drop(false);
197         userChangeListener_ = nullptr;
198     }
199     userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
200         [this](void *) { UserChangeHandle(); }, UserChangeMonitor::USER_NON_ACTIVE_EVENT);
201     std::string label = store_->GetDbProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
202     LOGI("[ChangeUserListener] [%.3s] After RegisterUserChangedListener", label.c_str());
203 }
204 
SetSyncModuleActive()205 void SyncAbleEngine::SetSyncModuleActive()
206 {
207     if (isSyncModuleActiveCheck_) {
208         return;
209     }
210 
211     bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
212     if (!isSyncDualTupleMode) {
213         isSyncNeedActive_ = true;
214         isSyncModuleActiveCheck_ = true;
215         return;
216     }
217     isSyncNeedActive_ = RuntimeContext::GetInstance()->IsSyncerNeedActive(store_->GetDbProperties());
218     if (!isSyncNeedActive_) {
219         LOGI("syncer no need to active");
220     }
221     isSyncModuleActiveCheck_ = true;
222 }
223 
GetSyncModuleActive()224 bool SyncAbleEngine::GetSyncModuleActive()
225 {
226     return isSyncNeedActive_;
227 }
228 
ReSetSyncModuleActive()229 void SyncAbleEngine::ReSetSyncModuleActive()
230 {
231     isSyncModuleActiveCheck_ = false;
232     isSyncNeedActive_ = true;
233 }
234 
GetLocalIdentity(std::string & outTarget)235 int SyncAbleEngine::GetLocalIdentity(std::string &outTarget)
236 {
237     if (!started_) {
238         StartSyncer();
239     }
240     return syncer_.GetLocalIdentity(outTarget);
241 }
242 
StopSync(uint64_t connectionId)243 void SyncAbleEngine::StopSync(uint64_t connectionId)
244 {
245     if (started_) {
246         syncer_.StopSync(connectionId);
247     }
248 }
249 
Dump(int fd)250 void SyncAbleEngine::Dump(int fd)
251 {
252     SyncerBasicInfo basicInfo = syncer_.DumpSyncerBasicInfo();
253     DBDumpHelper::Dump(fd, "\tisSyncActive = %d, isAutoSync = %d\n\n", basicInfo.isSyncActive,
254         basicInfo.isAutoSync);
255     if (basicInfo.isSyncActive) {
256         DBDumpHelper::Dump(fd, "\tDistributedDB Database Sync Module Message Info:\n");
257         syncer_.Dump(fd);
258     }
259 }
260 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)261 int SyncAbleEngine::RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout,
262     uint64_t connectionId, std::shared_ptr<ResultSet> &result)
263 {
264     if (!started_) {
265         int errCode = StartSyncer();
266         if (!started_) {
267             return errCode;
268         }
269     }
270     return syncer_.RemoteQuery(device, condition, timeout, connectionId, result);
271 }
272 
NeedStartSyncer() const273 bool SyncAbleEngine::NeedStartSyncer() const
274 {
275     if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
276         LOGW("Engine communicator not ready!");
277         return false;
278     }
279     // don't start when check callback got not active
280     // equivalent to !(!isSyncNeedActive_ && isSyncModuleActiveCheck_)
281     return !started_ && (isSyncNeedActive_ || !isSyncModuleActiveCheck_);
282 }
283 
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)284 int SyncAbleEngine::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
285 {
286     if (NeedStartSyncer()) {
287         int errCode = StartSyncer();
288         if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
289             return errCode;
290         }
291     }
292     return syncer_.GetHashDeviceId(clientId, hashDevId);
293 }
294 }
295