1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "sync_able_engine.h"
16
17 #include "db_dump_helper.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "parcel.h"
21 #include "ref_object.h"
22 #include "relational_sync_able_storage.h"
23 #include "runtime_context.h"
24 #include "user_change_monitor.h"
25
26 namespace DistributedDB {
SyncAbleEngine(ISyncInterface * store)27 SyncAbleEngine::SyncAbleEngine(ISyncInterface *store)
28 : syncer_(),
29 started_(false),
30 closed_(false),
31 isSyncModuleActiveCheck_(false),
32 isSyncNeedActive_(true),
33 store_(store),
34 userChangeListener_(nullptr)
35 {}
36
~SyncAbleEngine()37 SyncAbleEngine::~SyncAbleEngine()
38 {
39 if (userChangeListener_ != nullptr) {
40 userChangeListener_->Drop(true);
41 userChangeListener_ = nullptr;
42 }
43 }
44
45 // Start a sync action.
Sync(const ISyncer::SyncParma & parm,uint64_t connectionId)46 int SyncAbleEngine::Sync(const ISyncer::SyncParma &parm, uint64_t connectionId)
47 {
48 if (!started_) {
49 int errCode = StartSyncer();
50 if (!started_) {
51 return errCode;
52 }
53 }
54 return syncer_.Sync(parm, connectionId);
55 }
56
WakeUpSyncer()57 void SyncAbleEngine::WakeUpSyncer()
58 {
59 StartSyncer();
60 }
61
Close()62 void SyncAbleEngine::Close()
63 {
64 StopSyncer();
65 }
66
67 // Get The current virtual timestamp
GetTimestamp()68 uint64_t SyncAbleEngine::GetTimestamp()
69 {
70 if (NeedStartSyncer()) {
71 StartSyncer();
72 }
73 return syncer_.GetTimestamp();
74 }
75
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)76 int SyncAbleEngine::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
77 {
78 if (NeedStartSyncer()) {
79 int errCode = StartSyncer();
80 if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
81 return errCode;
82 }
83 }
84 return syncer_.EraseDeviceWaterMark(deviceId, isNeedHash, tableName);
85 }
86
87 // Start syncer
StartSyncer(bool isCheckSyncActive,bool isNeedActive)88 int SyncAbleEngine::StartSyncer(bool isCheckSyncActive, bool isNeedActive)
89 {
90 int errCode = E_OK;
91 {
92 std::unique_lock<std::mutex> lock(syncerOperateLock_);
93 errCode = StartSyncerWithNoLock(isCheckSyncActive, isNeedActive);
94 closed_ = false;
95 }
96 UserChangeHandle();
97 return errCode;
98 }
99
StartSyncerWithNoLock(bool isCheckSyncActive,bool isNeedActive)100 int SyncAbleEngine::StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive)
101 {
102 if (store_ == nullptr) {
103 LOGF("RDB got null sync interface.");
104 return -E_INVALID_ARGS;
105 }
106 if (!isCheckSyncActive) {
107 SetSyncModuleActive();
108 isNeedActive = GetSyncModuleActive();
109 }
110
111 int errCode = syncer_.Initialize(store_, isNeedActive);
112 if (errCode == E_OK) {
113 started_ = true;
114 } else {
115 LOGE("RDB start syncer failed, err:'%d'.", errCode);
116 }
117
118 bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
119 std::string label = store_->GetDbProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
120 if (isSyncDualTupleMode && isCheckSyncActive && !isNeedActive && (userChangeListener_ == nullptr)) {
121 // active to non_active
122 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
123 [this](void *) { ChangeUserListener(); }, UserChangeMonitor::USER_ACTIVE_TO_NON_ACTIVE_EVENT);
124 LOGI("[StartSyncerWithNoLock] [%.3s] After RegisterUserChangedListener", label.c_str());
125 } else if (isSyncDualTupleMode && (userChangeListener_ == nullptr)) {
126 EventType event = isNeedActive ?
127 UserChangeMonitor::USER_ACTIVE_EVENT : UserChangeMonitor::USER_NON_ACTIVE_EVENT;
128 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
129 [this](void *) { UserChangeHandle(); }, event);
130 LOGI("[StartSyncerWithNoLock] [%.3s] After RegisterUserChangedListener event=%d", label.c_str(), event);
131 }
132 return errCode;
133 }
134
135 // Stop syncer
StopSyncer()136 void SyncAbleEngine::StopSyncer()
137 {
138 NotificationChain::Listener *userChangeListener = nullptr;
139 {
140 std::unique_lock<std::mutex> lock(syncerOperateLock_);
141 StopSyncerWithNoLock(true);
142 userChangeListener = userChangeListener_;
143 userChangeListener_ = nullptr;
144 }
145 if (userChangeListener != nullptr) {
146 userChangeListener->Drop(true);
147 userChangeListener = nullptr;
148 }
149 }
150
StopSyncerWithNoLock(bool isClosedOperation)151 void SyncAbleEngine::StopSyncerWithNoLock(bool isClosedOperation)
152 {
153 ReSetSyncModuleActive();
154 syncer_.Close(isClosedOperation);
155 if (started_) {
156 started_ = false;
157 }
158 closed_ = isClosedOperation;
159 if (!isClosedOperation && userChangeListener_ != nullptr) {
160 userChangeListener_->Drop(false);
161 userChangeListener_ = nullptr;
162 }
163 }
164
UserChangeHandle()165 void SyncAbleEngine::UserChangeHandle()
166 {
167 if (store_ == nullptr) {
168 LOGD("[SyncAbleEngine] RDB got null sync interface in userChange.");
169 return;
170 }
171 bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
172 if (!isSyncDualTupleMode) {
173 LOGD("[SyncAbleEngine] no use syncDualTupleMode, abort userChange");
174 return;
175 }
176 std::unique_lock<std::mutex> lock(syncerOperateLock_);
177 if (closed_) {
178 LOGI("RDB is already closed");
179 return;
180 }
181 bool isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(store_->GetDbProperties());
182 bool isNeedChange = (isNeedActive != isSyncNeedActive_);
183 // non_active to active or active to non_active
184 if (isNeedChange) {
185 StopSyncerWithNoLock(); // will drop userChangeListener
186 isSyncModuleActiveCheck_ = true;
187 isSyncNeedActive_ = isNeedActive;
188 StartSyncerWithNoLock(true, isNeedActive);
189 }
190 }
191
ChangeUserListener()192 void SyncAbleEngine::ChangeUserListener()
193 {
194 // only active to non_active call, put into USER_NON_ACTIVE_EVENT listener from USER_ACTIVE_TO_NON_ACTIVE_EVENT
195 if (userChangeListener_ != nullptr) {
196 userChangeListener_->Drop(false);
197 userChangeListener_ = nullptr;
198 }
199 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
200 [this](void *) { UserChangeHandle(); }, UserChangeMonitor::USER_NON_ACTIVE_EVENT);
201 std::string label = store_->GetDbProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
202 LOGI("[ChangeUserListener] [%.3s] After RegisterUserChangedListener", label.c_str());
203 }
204
SetSyncModuleActive()205 void SyncAbleEngine::SetSyncModuleActive()
206 {
207 if (isSyncModuleActiveCheck_) {
208 return;
209 }
210
211 bool isSyncDualTupleMode = store_->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false);
212 if (!isSyncDualTupleMode) {
213 isSyncNeedActive_ = true;
214 isSyncModuleActiveCheck_ = true;
215 return;
216 }
217 isSyncNeedActive_ = RuntimeContext::GetInstance()->IsSyncerNeedActive(store_->GetDbProperties());
218 if (!isSyncNeedActive_) {
219 LOGI("syncer no need to active");
220 }
221 isSyncModuleActiveCheck_ = true;
222 }
223
GetSyncModuleActive()224 bool SyncAbleEngine::GetSyncModuleActive()
225 {
226 return isSyncNeedActive_;
227 }
228
ReSetSyncModuleActive()229 void SyncAbleEngine::ReSetSyncModuleActive()
230 {
231 isSyncModuleActiveCheck_ = false;
232 isSyncNeedActive_ = true;
233 }
234
GetLocalIdentity(std::string & outTarget)235 int SyncAbleEngine::GetLocalIdentity(std::string &outTarget)
236 {
237 if (!started_) {
238 StartSyncer();
239 }
240 return syncer_.GetLocalIdentity(outTarget);
241 }
242
StopSync(uint64_t connectionId)243 void SyncAbleEngine::StopSync(uint64_t connectionId)
244 {
245 if (started_) {
246 syncer_.StopSync(connectionId);
247 }
248 }
249
Dump(int fd)250 void SyncAbleEngine::Dump(int fd)
251 {
252 SyncerBasicInfo basicInfo = syncer_.DumpSyncerBasicInfo();
253 DBDumpHelper::Dump(fd, "\tisSyncActive = %d, isAutoSync = %d\n\n", basicInfo.isSyncActive,
254 basicInfo.isAutoSync);
255 if (basicInfo.isSyncActive) {
256 DBDumpHelper::Dump(fd, "\tDistributedDB Database Sync Module Message Info:\n");
257 syncer_.Dump(fd);
258 }
259 }
260
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)261 int SyncAbleEngine::RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout,
262 uint64_t connectionId, std::shared_ptr<ResultSet> &result)
263 {
264 if (!started_) {
265 int errCode = StartSyncer();
266 if (!started_) {
267 return errCode;
268 }
269 }
270 return syncer_.RemoteQuery(device, condition, timeout, connectionId, result);
271 }
272
NeedStartSyncer() const273 bool SyncAbleEngine::NeedStartSyncer() const
274 {
275 if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
276 LOGW("Engine communicator not ready!");
277 return false;
278 }
279 // don't start when check callback got not active
280 // equivalent to !(!isSyncNeedActive_ && isSyncModuleActiveCheck_)
281 return !started_ && (isSyncNeedActive_ || !isSyncModuleActiveCheck_);
282 }
283
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)284 int SyncAbleEngine::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
285 {
286 if (NeedStartSyncer()) {
287 int errCode = StartSyncer();
288 if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
289 return errCode;
290 }
291 }
292 return syncer_.GetHashDeviceId(clientId, hashDevId);
293 }
294 }
295