/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#define LOG_TAG "KvSyncManager"
#include "kvstore_sync_manager.h"
#include "log_print.h"

namespace OHOS {
namespace DistributedKv {
KvStoreSyncManager::KvStoreSyncManager() {}
KvStoreSyncManager::~KvStoreSyncManager() {}

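// Registers a sync operation. A delay of 0 runs the operation immediately on the
// caller's thread; a non-zero delay queues it on the schedule map and arms the timer.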
Status KvStoreSyncManager::AddSyncOperation(uintptr_t syncId, uint32_t delayMs, const SyncFunc &syncFunc,
                                            const SyncEnd &syncEnd)
{
    if (syncId == 0 || syncFunc == nullptr) {
        return Status::INVALID_ARGUMENT;
    }
    uint32_t opSeq = ++syncOpSeq_;
    SyncEnd endFunc;
    if (syncEnd != nullptr) {
        // Wrap the caller's end callback so the finished operation removes itself
        // from the corresponding syncing list before notifying the caller.
        endFunc = [opSeq, delayMs, syncEnd, this](const std::map<std::string, DistributedDB::DBStatus> &devices) {
            RemoveSyncingOp(opSeq, (delayMs == 0) ? realtimeSyncingOps_ : delaySyncingOps_);
            syncEnd(devices);
        };
    }

    auto beginTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(delayMs);
    KvSyncOperation syncOp{ syncId, opSeq, delayMs, syncFunc, endFunc, beginTime };
    if (delayMs == 0) {
        // Realtime sync: run it immediately; track it only when completion must be observed.
        if (endFunc != nullptr) {
            std::lock_guard<std::mutex> lock(syncOpsMutex_);
            realtimeSyncingOps_.push_back(syncOp);
        }
        auto status = syncFunc(endFunc);
        if (status != Status::SUCCESS) {
            RemoveSyncingOp(opSeq, realtimeSyncingOps_);
        }
        return status;
    }

    // Delayed sync: queue it and re-arm the timer if this operation is due
    // noticeably earlier than the currently scheduled wake-up.
    std::lock_guard<std::mutex> lock(syncOpsMutex_);
    scheduleSyncOps_.emplace(beginTime, syncOp);
    ZLOGD("add op %u delay %u count %zu.", opSeq, delayMs, scheduleSyncOps_.size());
    if ((scheduleSyncOps_.size() == 1) ||
        (nextScheduleTime_ > beginTime + std::chrono::milliseconds(GetExpireTimeRange(delayMs)))) {
        AddTimer(beginTime);
    }
    return Status::SUCCESS;
}

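// Allowed scheduling slack for a delayed operation: a fraction (1 / DELAY_TIME_RANGE_DIVISOR)
// of its delay, but never less than half of SYNC_MIN_DELAY_MS.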
uint32_t KvStoreSyncManager::GetExpireTimeRange(uint32_t delayMs) const
{
    uint32_t range = delayMs / DELAY_TIME_RANGE_DIVISOR;
    return std::max(range, SYNC_MIN_DELAY_MS >> 1);
}

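// Cancels every operation registered under syncId, whether it is still scheduled,
// currently syncing in realtime, or syncing after a delay.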
Status KvStoreSyncManager::RemoveSyncOperation(uintptr_t syncId)
{
    auto pred = [syncId](const KvSyncOperation &op) -> bool { return syncId == op.syncId; };

    std::lock_guard<std::mutex> lock(syncOpsMutex_);
    uint32_t count = DoRemoveSyncingOp(pred, realtimeSyncingOps_);
    count += DoRemoveSyncingOp(pred, delaySyncingOps_);

    auto &syncOps = scheduleSyncOps_;
    for (auto it = syncOps.begin(); it != syncOps.end();) {
        if (pred(it->second)) {
            count++;
            it = syncOps.erase(it);
        } else {
            ++it;
        }
    }
    return (count > 0) ? Status::SUCCESS : Status::ERROR;
}

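// Erases all operations in syncingOps that match pred and returns how many were removed.
// Every caller holds syncOpsMutex_ before invoking this helper.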
uint32_t KvStoreSyncManager::DoRemoveSyncingOp(OpPred pred, std::list<KvSyncOperation> &syncingOps)
{
    uint32_t count = 0;
    for (auto it = syncingOps.begin(); it != syncingOps.end();) {
        if (pred(*it)) {
            count++;
            it = syncingOps.erase(it);
        } else {
            ++it;
        }
    }
    return count;
}

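// Removes a single in-flight operation, identified by its sequence number, from the
// given syncing list. Used when a sync finishes or fails to start.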
Status KvStoreSyncManager::RemoveSyncingOp(uint32_t opSeq, std::list<KvSyncOperation> &syncingOps)
{
    auto pred = [opSeq](const KvSyncOperation &op) -> bool { return opSeq == op.opSeq; };

    ZLOGD("remove op %u", opSeq);
    std::lock_guard<std::mutex> lock(syncOpsMutex_);
    uint32_t count = DoRemoveSyncingOp(pred, syncingOps);
    return (count == 1) ? Status::SUCCESS : Status::ERROR;
}

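// Arms the executor pool to call Schedule() at expireTime and records that time so
// AddSyncOperation can decide whether an earlier wake-up is needed.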
void KvStoreSyncManager::AddTimer(const TimePoint &expireTime)
{
    ZLOGD("time %lld", expireTime.time_since_epoch().count());
    nextScheduleTime_ = expireTime;
    executors_->Schedule(
        expireTime - std::chrono::steady_clock::now(),
        [time = expireTime, this]() {
            Schedule(time);
        });
}

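// Moves every scheduled operation that is due at currentTime into syncOps (and, when it
// has an end callback, into delaySyncingOps_). Returns true if scheduling should be
// deferred because a recent realtime sync still has priority.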
bool KvStoreSyncManager::GetTimeoutSyncOps(const TimePoint &currentTime, std::list<KvSyncOperation> &syncOps)
{
    std::lock_guard<std::mutex> lock(syncOpsMutex_);
    if ((!realtimeSyncingOps_.empty()) && (!scheduleSyncOps_.empty())) {
        // The most recent in-flight realtime sync began within the last REALTIME_PRIOR_SYNCING_MS;
        // give it priority and postpone the delayed operations.
        auto priorSyncingTime = std::chrono::milliseconds(REALTIME_PRIOR_SYNCING_MS);
        if (currentTime < realtimeSyncingOps_.rbegin()->beginTime + priorSyncingTime) {
            return true;
        }
    }
    for (auto it = scheduleSyncOps_.begin(); it != scheduleSyncOps_.end();) {
        const auto &expireTime = it->first;
        const auto &op = it->second;
        // currentTime is still earlier than expireTime minus the allowed expire-time range,
        // so this operation (and every later entry in the ordered map) is not yet due.
        if (currentTime + std::chrono::milliseconds(GetExpireTimeRange(op.delayMs)) < expireTime) {
            break;
        }

        syncOps.push_back(op);
        if (op.syncEnd != nullptr) {
            delaySyncingOps_.push_back(op);
        }
        it = scheduleSyncOps_.erase(it);
    }
    return false;
}

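// Drops in-flight operations whose end callback has not arrived within SYNCING_TIMEOUT_MS
// of their begin time, so a lost callback cannot pin entries in the syncing lists forever.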
void KvStoreSyncManager::DoCheckSyncingTimeout(std::list<KvSyncOperation> &syncingOps)
{
    auto syncingTimeoutPred = [](const KvSyncOperation &op) -> bool {
        return op.beginTime + std::chrono::milliseconds(SYNCING_TIMEOUT_MS) < std::chrono::steady_clock::now();
    };

    uint32_t count = DoRemoveSyncingOp(syncingTimeoutPred, syncingOps);
    if (count > 0) {
        ZLOGI("remove %u syncing ops by timeout", count);
    }
}

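// Timer callback: executes the operations that are due, prunes timed-out in-flight entries,
// and re-arms the timer if scheduled operations remain.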
void KvStoreSyncManager::Schedule(const TimePoint &time)
{
    ZLOGD("timeout %lld", time.time_since_epoch().count());
    std::list<KvSyncOperation> syncOps;
    bool delaySchedule = GetTimeoutSyncOps(time, syncOps);

    for (const auto &op : syncOps) {
        op.syncFunc(op.syncEnd);
    }

    std::lock_guard<std::mutex> lock(syncOpsMutex_);
    DoCheckSyncingTimeout(realtimeSyncingOps_);
    DoCheckSyncingTimeout(delaySyncingOps_);
    if (!scheduleSyncOps_.empty()) {
        auto nextTime = scheduleSyncOps_.begin()->first;
        if (delaySchedule) {
            nextTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(SYNC_MIN_DELAY_MS);
        }
        AddTimer(nextTime);
    }
}
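
// Injects the executor pool used to schedule delayed sync operations.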
void KvStoreSyncManager::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
{
    executors_ = executors;
}
} // namespace DistributedKv
} // namespace OHOS