1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "SchedulerManager"
16 
17 #include "scheduler_manager.h"
18 
19 #include <cinttypes>
20 
21 #include "log_print.h"
22 #include "timer_info.h"
23 #include "uri_utils.h"
24 #include "utils/anonymous.h"
25 
26 namespace OHOS::DataShare {
// Upper bound (exclusive) on how far ahead of the current wall time a reminder may be set:
// 365 days expressed in milliseconds.
static constexpr int64_t MAX_MILLISECONDS = 365LL * 24 * 60 * 60 * 1000;
// Delay, in milliseconds, added to the current wall time when cached timers are restarted.
static constexpr int32_t DELAYED_MILLISECONDS = 200;
GetInstance()29 SchedulerManager &SchedulerManager::GetInstance()
30 {
31     static SchedulerManager instance;
32     return instance;
33 }
34 
Execute(const std::string & uri,const int32_t userId,const std::string & rdbDir,int version,const std::string & bundleName)35 void SchedulerManager::Execute(const std::string &uri, const int32_t userId, const std::string &rdbDir, int version,
36     const std::string &bundleName)
37 {
38     if (!URIUtils::IsDataProxyURI(uri)) {
39         return;
40     }
41     DistributedData::StoreMetaData meta;
42     meta.dataDir = rdbDir;
43     meta.bundleName = bundleName;
44     auto delegate = DBDelegate::Create(meta);
45     if (delegate == nullptr) {
46         ZLOGE("malloc fail %{public}s", DistributedData::Anonymous::Change(uri).c_str());
47         return;
48     }
49     std::vector<Key> keys = RdbSubscriberManager::GetInstance().GetKeysByUri(uri);
50     for (auto const &key : keys) {
51         ExecuteSchedulerSQL(rdbDir, userId, version, key, delegate);
52     }
53 }
54 
Execute(const Key & key,const int32_t userId,const std::string & rdbDir,int version)55 void SchedulerManager::Execute(const Key &key, const int32_t userId, const std::string &rdbDir, int version)
56 {
57     DistributedData::StoreMetaData meta;
58     meta.dataDir = rdbDir;
59     meta.bundleName = key.bundleName;
60     auto delegate = DBDelegate::Create(meta);
61     if (delegate == nullptr) {
62         ZLOGE("malloc fail %{public}s", DistributedData::Anonymous::Change(key.uri).c_str());
63         return;
64     }
65     ExecuteSchedulerSQL(rdbDir, userId, version, key, delegate);
66 }
67 
SetTimerTask(uint64_t & timerId,const std::function<void ()> & callback,int64_t reminderTime)68 bool SchedulerManager::SetTimerTask(uint64_t &timerId, const std::function<void()> &callback,
69     int64_t reminderTime)
70 {
71     auto timerInfo = std::make_shared<TimerInfo>();
72     timerInfo->SetType(timerInfo->TIMER_TYPE_EXACT);
73     timerInfo->SetRepeat(false);
74     auto wantAgent = std::shared_ptr<AbilityRuntime::WantAgent::WantAgent>();
75     timerInfo->SetWantAgent(wantAgent);
76     timerInfo->SetCallbackInfo(callback);
77     timerId = TimeServiceClient::GetInstance()->CreateTimer(timerInfo);
78     if (timerId == 0) {
79         return false;
80     }
81     TimeServiceClient::GetInstance()->StartTimer(timerId, static_cast<uint64_t>(reminderTime));
82     return true;
83 }
84 
DestoryTimerTask(int64_t timerId)85 void SchedulerManager::DestoryTimerTask(int64_t timerId)
86 {
87     if (timerId > 0) {
88         TimeServiceClient::GetInstance()->DestroyTimer(timerId);
89     }
90 }
91 
ResetTimerTask(int64_t timerId,int64_t reminderTime)92 void SchedulerManager::ResetTimerTask(int64_t timerId, int64_t reminderTime)
93 {
94     // This start also means reset, new one will replace old one
95     TimeServiceClient::GetInstance()->StartTimer(timerId, static_cast<uint64_t>(reminderTime));
96 }
97 
SetTimer(const std::string & dbPath,const int32_t userId,int version,const Key & key,int64_t reminderTime)98 void SchedulerManager::SetTimer(
99     const std::string &dbPath, const int32_t userId, int version, const Key &key, int64_t reminderTime)
100 {
101     std::lock_guard<std::mutex> lock(mutex_);
102     if (executor_ == nullptr) {
103         ZLOGE("executor_ is nullptr");
104         return;
105     }
106     int64_t now = 0;
107     TimeServiceClient::GetInstance()->GetWallTimeMs(now);
108     if (reminderTime <= now || reminderTime - now >= MAX_MILLISECONDS) {
109         ZLOGE("invalid args, %{public}" PRId64 ", %{public}" PRId64 ", subId=%{public}" PRId64
110             ", bundleName=%{public}s.", reminderTime, now, key.subscriberId, key.bundleName.c_str());
111         return;
112     }
113     auto duration = reminderTime - now;
114     ZLOGI("the task will notify in %{public}" PRId64 " ms, %{public}" PRId64 ", %{public}s.",
115           duration, key.subscriberId, key.bundleName.c_str());
116     auto it = timerCache_.find(key);
117     if (it != timerCache_.end()) {
118         ZLOGD("has current taskId, uri is %{private}s, subscriberId is %{public}" PRId64 ", bundleName is %{public}s",
119             DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
120         auto timerId = it->second;
121         ResetTimerTask(timerId, reminderTime);
122         return;
123     }
124     auto callback = [key, dbPath, version, userId, this]() {
125         ZLOGI("schedule notify start, uri is %{private}s, subscriberId is %{public}" PRId64 ", bundleName is "
126             "%{public}s", DistributedData::Anonymous::Change(key.uri).c_str(),
127             key.subscriberId, key.bundleName.c_str());
128         int64_t timerId = -1;
129         {
130             std::lock_guard<std::mutex> lock(mutex_);
131             auto it = timerCache_.find(key);
132             if (it != timerCache_.end()) {
133                 timerId = it->second;
134                 timerCache_.erase(key);
135             }
136         }
137         DestoryTimerTask(timerId);
138         Execute(key, userId, dbPath, version);
139         RdbSubscriberManager::GetInstance().EmitByKey(key, userId, dbPath, version);
140     };
141     uint64_t timerId = 0;
142     if (!SetTimerTask(timerId, callback, reminderTime)) {
143         ZLOGE("create timer failed.");
144         return;
145     }
146     ZLOGI("create new task success, uri is %{public}s, subscriberId is %{public}" PRId64 ", bundleName is %{public}s",
147         DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
148     timerCache_.emplace(key, timerId);
149 }
150 
ExecuteSchedulerSQL(const std::string & rdbDir,const int32_t userId,int version,const Key & key,std::shared_ptr<DBDelegate> delegate)151 void SchedulerManager::ExecuteSchedulerSQL(const std::string &rdbDir, const int32_t userId, int version, const Key &key,
152     std::shared_ptr<DBDelegate> delegate)
153 {
154     Template tpl;
155     if (!TemplateManager::GetInstance().Get(key, userId, tpl)) {
156         ZLOGE("template undefined, %{public}s, %{public}" PRId64 ", %{public}s",
157             DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
158         return;
159     }
160     if (tpl.scheduler_.empty()) {
161         ZLOGW("template scheduler_ empty, %{public}s, %{public}" PRId64 ", %{public}s",
162             DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
163         return;
164     }
165     GenRemindTimerFuncParams(rdbDir, userId, version, key, tpl.scheduler_);
166     auto resultSet = delegate->QuerySql(tpl.scheduler_);
167     if (resultSet == nullptr) {
168         ZLOGE("resultSet is nullptr, %{public}s, %{public}" PRId64 ", %{public}s",
169             DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
170         return;
171     }
172     int count;
173     int errCode = resultSet->GetRowCount(count);
174     if (errCode != E_OK || count == 0) {
175         ZLOGE("GetRowCount error, %{public}s, %{public}" PRId64 ", %{public}s, errorCode is %{public}d, count is "
176             "%{public}d",
177             DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str(), errCode,
178             count);
179         return;
180     }
181 }
182 
GenRemindTimerFuncParams(const std::string & rdbDir,const int32_t userId,int version,const Key & key,std::string & schedulerSQL)183 void SchedulerManager::GenRemindTimerFuncParams(
184     const std::string &rdbDir, const int32_t userId, int version, const Key &key, std::string &schedulerSQL)
185 {
186     auto index = schedulerSQL.find(REMIND_TIMER_FUNC);
187     if (index == std::string::npos) {
188         ZLOGW("not find remindTimer, sql is %{public}s", schedulerSQL.c_str());
189         return;
190     }
191     index += REMIND_TIMER_FUNC_LEN;
192     std::string keyStr = "'" + rdbDir + "', " + std::to_string(version) + ", '" + key.uri + "', " +
193                          std::to_string(key.subscriberId) + ", '" + key.bundleName + "', " + std::to_string(userId) +
194                          ", ";
195     schedulerSQL.insert(index, keyStr);
196     return;
197 }
198 
RemoveTimer(const Key & key)199 void SchedulerManager::RemoveTimer(const Key &key)
200 {
201     std::lock_guard<std::mutex> lock(mutex_);
202     if (executor_ == nullptr) {
203         ZLOGE("executor_ is nullptr");
204         return;
205     }
206     auto it = timerCache_.find(key);
207     if (it != timerCache_.end()) {
208         ZLOGW("RemoveTimer %{public}s %{public}s %{public}" PRId64,
209             DistributedData::Anonymous::Change(key.uri).c_str(), key.bundleName.c_str(), key.subscriberId);
210         DestoryTimerTask(it->second);
211         timerCache_.erase(key);
212     }
213 }
214 
ClearTimer()215 void SchedulerManager::ClearTimer()
216 {
217     ZLOGI("Clear all timer");
218     std::lock_guard<std::mutex> lock(mutex_);
219     if (executor_ == nullptr) {
220         ZLOGE("executor_ is nullptr");
221         return;
222     }
223     auto it = timerCache_.begin();
224     while (it != timerCache_.end()) {
225         DestoryTimerTask(it->second);
226         it = timerCache_.erase(it);
227     }
228 }
229 
SetExecutorPool(std::shared_ptr<ExecutorPool> executor)230 void SchedulerManager::SetExecutorPool(std::shared_ptr<ExecutorPool> executor)
231 {
232     executor_ = executor;
233 }
234 
ReExecuteAll()235 void SchedulerManager::ReExecuteAll()
236 {
237     std::lock_guard<std::mutex> lock(mutex_);
238     for (const auto &item : timerCache_) {
239         // restart in 200ms
240         auto timerId = item.second;
241         int64_t currentTime = 0;
242         TimeServiceClient::GetInstance()->GetWallTimeMs(currentTime);
243         ResetTimerTask(timerId, currentTime + DELAYED_MILLISECONDS);
244     }
245 }
246 } // namespace OHOS::DataShare
247 
248