1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "SchedulerManager"
16
17 #include "scheduler_manager.h"
18
19 #include <cinttypes>
20
21 #include "log_print.h"
22 #include "timer_info.h"
23 #include "uri_utils.h"
24 #include "utils/anonymous.h"
25
26 namespace OHOS::DataShare {
// Upper bound (exclusive) on how far ahead a reminder may be scheduled: 365 days in milliseconds.
static constexpr int64_t MAX_MILLISECONDS = 365LL * 24 * 60 * 60 * 1000;
// Delay, in milliseconds, added to the current wall time when cached timers are re-armed.
static constexpr int32_t DELAYED_MILLISECONDS = 200;
GetInstance()29 SchedulerManager &SchedulerManager::GetInstance()
30 {
31 static SchedulerManager instance;
32 return instance;
33 }
34
Execute(const std::string & uri,const int32_t userId,const std::string & rdbDir,int version,const std::string & bundleName)35 void SchedulerManager::Execute(const std::string &uri, const int32_t userId, const std::string &rdbDir, int version,
36 const std::string &bundleName)
37 {
38 if (!URIUtils::IsDataProxyURI(uri)) {
39 return;
40 }
41 DistributedData::StoreMetaData meta;
42 meta.dataDir = rdbDir;
43 meta.bundleName = bundleName;
44 auto delegate = DBDelegate::Create(meta);
45 if (delegate == nullptr) {
46 ZLOGE("malloc fail %{public}s", DistributedData::Anonymous::Change(uri).c_str());
47 return;
48 }
49 std::vector<Key> keys = RdbSubscriberManager::GetInstance().GetKeysByUri(uri);
50 for (auto const &key : keys) {
51 ExecuteSchedulerSQL(rdbDir, userId, version, key, delegate);
52 }
53 }
54
Execute(const Key & key,const int32_t userId,const std::string & rdbDir,int version)55 void SchedulerManager::Execute(const Key &key, const int32_t userId, const std::string &rdbDir, int version)
56 {
57 DistributedData::StoreMetaData meta;
58 meta.dataDir = rdbDir;
59 meta.bundleName = key.bundleName;
60 auto delegate = DBDelegate::Create(meta);
61 if (delegate == nullptr) {
62 ZLOGE("malloc fail %{public}s", DistributedData::Anonymous::Change(key.uri).c_str());
63 return;
64 }
65 ExecuteSchedulerSQL(rdbDir, userId, version, key, delegate);
66 }
67
SetTimerTask(uint64_t & timerId,const std::function<void ()> & callback,int64_t reminderTime)68 bool SchedulerManager::SetTimerTask(uint64_t &timerId, const std::function<void()> &callback,
69 int64_t reminderTime)
70 {
71 auto timerInfo = std::make_shared<TimerInfo>();
72 timerInfo->SetType(timerInfo->TIMER_TYPE_EXACT);
73 timerInfo->SetRepeat(false);
74 auto wantAgent = std::shared_ptr<AbilityRuntime::WantAgent::WantAgent>();
75 timerInfo->SetWantAgent(wantAgent);
76 timerInfo->SetCallbackInfo(callback);
77 timerId = TimeServiceClient::GetInstance()->CreateTimer(timerInfo);
78 if (timerId == 0) {
79 return false;
80 }
81 TimeServiceClient::GetInstance()->StartTimer(timerId, static_cast<uint64_t>(reminderTime));
82 return true;
83 }
84
DestoryTimerTask(int64_t timerId)85 void SchedulerManager::DestoryTimerTask(int64_t timerId)
86 {
87 if (timerId > 0) {
88 TimeServiceClient::GetInstance()->DestroyTimer(timerId);
89 }
90 }
91
ResetTimerTask(int64_t timerId,int64_t reminderTime)92 void SchedulerManager::ResetTimerTask(int64_t timerId, int64_t reminderTime)
93 {
94 // This start also means reset, new one will replace old one
95 TimeServiceClient::GetInstance()->StartTimer(timerId, static_cast<uint64_t>(reminderTime));
96 }
97
SetTimer(const std::string & dbPath,const int32_t userId,int version,const Key & key,int64_t reminderTime)98 void SchedulerManager::SetTimer(
99 const std::string &dbPath, const int32_t userId, int version, const Key &key, int64_t reminderTime)
100 {
101 std::lock_guard<std::mutex> lock(mutex_);
102 if (executor_ == nullptr) {
103 ZLOGE("executor_ is nullptr");
104 return;
105 }
106 int64_t now = 0;
107 TimeServiceClient::GetInstance()->GetWallTimeMs(now);
108 if (reminderTime <= now || reminderTime - now >= MAX_MILLISECONDS) {
109 ZLOGE("invalid args, %{public}" PRId64 ", %{public}" PRId64 ", subId=%{public}" PRId64
110 ", bundleName=%{public}s.", reminderTime, now, key.subscriberId, key.bundleName.c_str());
111 return;
112 }
113 auto duration = reminderTime - now;
114 ZLOGI("the task will notify in %{public}" PRId64 " ms, %{public}" PRId64 ", %{public}s.",
115 duration, key.subscriberId, key.bundleName.c_str());
116 auto it = timerCache_.find(key);
117 if (it != timerCache_.end()) {
118 ZLOGD("has current taskId, uri is %{private}s, subscriberId is %{public}" PRId64 ", bundleName is %{public}s",
119 DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
120 auto timerId = it->second;
121 ResetTimerTask(timerId, reminderTime);
122 return;
123 }
124 auto callback = [key, dbPath, version, userId, this]() {
125 ZLOGI("schedule notify start, uri is %{private}s, subscriberId is %{public}" PRId64 ", bundleName is "
126 "%{public}s", DistributedData::Anonymous::Change(key.uri).c_str(),
127 key.subscriberId, key.bundleName.c_str());
128 int64_t timerId = -1;
129 {
130 std::lock_guard<std::mutex> lock(mutex_);
131 auto it = timerCache_.find(key);
132 if (it != timerCache_.end()) {
133 timerId = it->second;
134 timerCache_.erase(key);
135 }
136 }
137 DestoryTimerTask(timerId);
138 Execute(key, userId, dbPath, version);
139 RdbSubscriberManager::GetInstance().EmitByKey(key, userId, dbPath, version);
140 };
141 uint64_t timerId = 0;
142 if (!SetTimerTask(timerId, callback, reminderTime)) {
143 ZLOGE("create timer failed.");
144 return;
145 }
146 ZLOGI("create new task success, uri is %{public}s, subscriberId is %{public}" PRId64 ", bundleName is %{public}s",
147 DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
148 timerCache_.emplace(key, timerId);
149 }
150
ExecuteSchedulerSQL(const std::string & rdbDir,const int32_t userId,int version,const Key & key,std::shared_ptr<DBDelegate> delegate)151 void SchedulerManager::ExecuteSchedulerSQL(const std::string &rdbDir, const int32_t userId, int version, const Key &key,
152 std::shared_ptr<DBDelegate> delegate)
153 {
154 Template tpl;
155 if (!TemplateManager::GetInstance().Get(key, userId, tpl)) {
156 ZLOGE("template undefined, %{public}s, %{public}" PRId64 ", %{public}s",
157 DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
158 return;
159 }
160 if (tpl.scheduler_.empty()) {
161 ZLOGW("template scheduler_ empty, %{public}s, %{public}" PRId64 ", %{public}s",
162 DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
163 return;
164 }
165 GenRemindTimerFuncParams(rdbDir, userId, version, key, tpl.scheduler_);
166 auto resultSet = delegate->QuerySql(tpl.scheduler_);
167 if (resultSet == nullptr) {
168 ZLOGE("resultSet is nullptr, %{public}s, %{public}" PRId64 ", %{public}s",
169 DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
170 return;
171 }
172 int count;
173 int errCode = resultSet->GetRowCount(count);
174 if (errCode != E_OK || count == 0) {
175 ZLOGE("GetRowCount error, %{public}s, %{public}" PRId64 ", %{public}s, errorCode is %{public}d, count is "
176 "%{public}d",
177 DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str(), errCode,
178 count);
179 return;
180 }
181 }
182
GenRemindTimerFuncParams(const std::string & rdbDir,const int32_t userId,int version,const Key & key,std::string & schedulerSQL)183 void SchedulerManager::GenRemindTimerFuncParams(
184 const std::string &rdbDir, const int32_t userId, int version, const Key &key, std::string &schedulerSQL)
185 {
186 auto index = schedulerSQL.find(REMIND_TIMER_FUNC);
187 if (index == std::string::npos) {
188 ZLOGW("not find remindTimer, sql is %{public}s", schedulerSQL.c_str());
189 return;
190 }
191 index += REMIND_TIMER_FUNC_LEN;
192 std::string keyStr = "'" + rdbDir + "', " + std::to_string(version) + ", '" + key.uri + "', " +
193 std::to_string(key.subscriberId) + ", '" + key.bundleName + "', " + std::to_string(userId) +
194 ", ";
195 schedulerSQL.insert(index, keyStr);
196 return;
197 }
198
RemoveTimer(const Key & key)199 void SchedulerManager::RemoveTimer(const Key &key)
200 {
201 std::lock_guard<std::mutex> lock(mutex_);
202 if (executor_ == nullptr) {
203 ZLOGE("executor_ is nullptr");
204 return;
205 }
206 auto it = timerCache_.find(key);
207 if (it != timerCache_.end()) {
208 ZLOGW("RemoveTimer %{public}s %{public}s %{public}" PRId64,
209 DistributedData::Anonymous::Change(key.uri).c_str(), key.bundleName.c_str(), key.subscriberId);
210 DestoryTimerTask(it->second);
211 timerCache_.erase(key);
212 }
213 }
214
ClearTimer()215 void SchedulerManager::ClearTimer()
216 {
217 ZLOGI("Clear all timer");
218 std::lock_guard<std::mutex> lock(mutex_);
219 if (executor_ == nullptr) {
220 ZLOGE("executor_ is nullptr");
221 return;
222 }
223 auto it = timerCache_.begin();
224 while (it != timerCache_.end()) {
225 DestoryTimerTask(it->second);
226 it = timerCache_.erase(it);
227 }
228 }
229
SetExecutorPool(std::shared_ptr<ExecutorPool> executor)230 void SchedulerManager::SetExecutorPool(std::shared_ptr<ExecutorPool> executor)
231 {
232 executor_ = executor;
233 }
234
ReExecuteAll()235 void SchedulerManager::ReExecuteAll()
236 {
237 std::lock_guard<std::mutex> lock(mutex_);
238 for (const auto &item : timerCache_) {
239 // restart in 200ms
240 auto timerId = item.second;
241 int64_t currentTime = 0;
242 TimeServiceClient::GetInstance()->GetWallTimeMs(currentTime);
243 ResetTimerTask(timerId, currentTime + DELAYED_MILLISECONDS);
244 }
245 }
246 } // namespace OHOS::DataShare
247
248