1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "DelayNotify"
16 #include "delay_notify.h"
17 #include "logger.h"
18 namespace OHOS::NativeRdb {
19 using namespace OHOS::Rdb;
DelayNotify()20 DelayNotify::DelayNotify() : pauseCount_(0), task_(nullptr), pool_(nullptr)
21 {
22 }
23
~DelayNotify()24 DelayNotify::~DelayNotify()
25 {
26 if (pool_ == nullptr) {
27 return;
28 }
29 if (delaySyncTaskId_ != Executor::INVALID_TASK_ID) {
30 pool_->Remove(delaySyncTaskId_);
31 }
32 if (task_ != nullptr && changedData_.tableData.size() > 0) {
33 DistributedRdb::RdbNotifyConfig rdbNotifyConfig;
34 rdbNotifyConfig.delay_ = 0;
35 rdbNotifyConfig.isFull_ = isFull_;
36 auto errCode = task_(changedData_, rdbNotifyConfig);
37 if (errCode != 0) {
38 LOG_ERROR("NotifyDataChange is failed, err is %{public}d.", errCode);
39 }
40 }
41 }
42
UpdateNotify(const DistributedRdb::RdbChangedData & changedData,bool isFull)43 void DelayNotify::UpdateNotify(const DistributedRdb::RdbChangedData &changedData, bool isFull)
44 {
45 LOG_DEBUG("Update changed data.");
46 {
47 std::lock_guard<std::mutex> lock(mutex_);
48 for (auto& [k, v] : changedData.tableData) {
49 if (!v.isTrackedDataChange) {
50 continue;
51 }
52 auto it = changedData_.tableData.find(k);
53 if (it == changedData_.tableData.end()) {
54 changedData_.tableData.insert_or_assign(k, v);
55 }
56 }
57 isFull_ |= isFull;
58 }
59 StartTimer();
60 }
61
SetExecutorPool(std::shared_ptr<ExecutorPool> pool)62 void DelayNotify::SetExecutorPool(std::shared_ptr<ExecutorPool> pool)
63 {
64 if (pool_ != nullptr) {
65 return;
66 }
67 pool_ = pool;
68 }
69
SetTask(Task task)70 void DelayNotify::SetTask(Task task)
71 {
72 task_ = std::move(task);
73 }
74
StartTimer()75 void DelayNotify::StartTimer()
76 {
77 DistributedRdb::RdbChangedData changedData;
78 bool needExecTask = false;
79 bool isFull = false;
80 {
81 std::lock_guard<std::mutex> lock(mutex_);
82 changedData.tableData = changedData_.tableData;
83 isFull = isFull_;
84 if (pool_ == nullptr) {
85 return;
86 }
87
88 if (delaySyncTaskId_ == Executor::INVALID_TASK_ID) {
89 delaySyncTaskId_ = pool_->Schedule(std::chrono::milliseconds(autoSyncInterval_),
90 [this]() { ExecuteTask(); });
91 } else {
92 delaySyncTaskId_ =
93 pool_->Reset(delaySyncTaskId_, std::chrono::milliseconds(autoSyncInterval_));
94 }
95
96 if (changedData.tableData.empty()) {
97 return;
98 }
99
100 if (!isInitialized_) {
101 needExecTask = true;
102 lastTimePoint_ = std::chrono::steady_clock::now();
103 isInitialized_ = true;
104 } else {
105 Time curTime = std::chrono::steady_clock::now();
106 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(curTime - lastTimePoint_);
107 if (duration >= std::chrono::milliseconds(MAX_NOTIFY_INTERVAL)) {
108 needExecTask = true;
109 lastTimePoint_ = std::chrono::steady_clock::now();
110 }
111 }
112 }
113
114 if (needExecTask) {
115 DistributedRdb::RdbNotifyConfig rdbNotifyConfig;
116 rdbNotifyConfig.delay_ = SERVICE_INTERVAL;
117 rdbNotifyConfig.isFull_ = isFull;
118 task_(changedData, rdbNotifyConfig);
119 }
120 }
121
StopTimer()122 void DelayNotify::StopTimer()
123 {
124 if (pool_ != nullptr) {
125 pool_->Remove(delaySyncTaskId_);
126 }
127 delaySyncTaskId_ = Executor::INVALID_TASK_ID;
128 }
129
ExecuteTask()130 void DelayNotify::ExecuteTask()
131 {
132 LOG_DEBUG("Notify data change.");
133 DistributedRdb::RdbChangedData changedData;
134 bool isFull = false;
135 {
136 std::lock_guard<std::mutex> lock(mutex_);
137 changedData.tableData = std::move(changedData_.tableData);
138 isFull = isFull_;
139 RestoreDefaultSyncInterval();
140 StopTimer();
141 isFull_ = false;
142 isInitialized_ = false;
143 }
144 if (task_ != nullptr && (changedData.tableData.size() > 0 || isFull)) {
145 DistributedRdb::RdbNotifyConfig rdbNotifyConfig;
146 rdbNotifyConfig.delay_ = 0;
147 rdbNotifyConfig.isFull_ = isFull;
148 int errCode = task_(changedData, rdbNotifyConfig);
149 if (errCode != 0) {
150 LOG_ERROR("NotifyDataChange is failed, err is %{public}d.", errCode);
151 std::lock_guard<std::mutex> lock(mutex_);
152 for (auto& [k, v] : changedData.tableData) {
153 changedData_.tableData.insert_or_assign(k, v);
154 }
155 return;
156 }
157 }
158 }
159
SetAutoSyncInterval(uint32_t interval)160 void DelayNotify::SetAutoSyncInterval(uint32_t interval)
161 {
162 autoSyncInterval_ = interval;
163 }
164
RestoreDefaultSyncInterval()165 void DelayNotify::RestoreDefaultSyncInterval()
166 {
167 autoSyncInterval_ = AUTO_SYNC_INTERVAL;
168 }
169
Pause()170 void DelayNotify::Pause()
171 {
172 StopTimer();
173 pauseCount_.fetch_add(1, std::memory_order_relaxed);
174 }
175
Resume()176 void DelayNotify::Resume()
177 {
178 pauseCount_.fetch_sub(1, std::memory_order_relaxed);
179 if (pauseCount_.load() == 0) {
180 StartTimer();
181 }
182 }
183
PauseDelayNotify(std::shared_ptr<DelayNotify> delayNotifier)184 PauseDelayNotify::PauseDelayNotify(std::shared_ptr<DelayNotify> delayNotifier) : delayNotifier_(delayNotifier)
185 {
186 if (delayNotifier_ != nullptr) {
187 delayNotifier_->Pause();
188 delayNotifier_->SetAutoSyncInterval(AUTO_SYNC_MAX_INTERVAL);
189 }
190 }
191
~PauseDelayNotify()192 PauseDelayNotify::~PauseDelayNotify()
193 {
194 if (delayNotifier_ != nullptr) {
195 delayNotifier_->Resume();
196 }
197 }
198 }