/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 #include "kv_store_observer_impl.h"
16
OnChange(const DistributedDB::KvStoreChangedData & data)17 void KvStoreObserverImpl::OnChange(const DistributedDB::KvStoreChangedData &data)
18 {
19 onChangeTime_ = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
20 insertedEntries_ = data.GetEntriesInserted();
21 updatedEntries_ = data.GetEntriesUpdated();
22 deleteEntries_ = data.GetEntriesDeleted();
23 MST_LOG("insertedEntries_.size():%zu, updatedEntries_.size():%zu, deleteEntries_.size():%zu",
24 insertedEntries_.size(), updatedEntries_.size(), deleteEntries_.size());
25 if (isSaveCumulatedData_) {
26 if (insertedEntries_.size() != 0) {
27 for (const auto &entry : insertedEntries_) {
28 cumulatedInsertList_.push_back(entry);
29 }
30 }
31 if (updatedEntries_.size() != 0) {
32 for (const auto &entry : updatedEntries_) {
33 cumulatedUpdateList_.push_back(entry);
34 }
35 }
36 if (deleteEntries_.size() != 0) {
37 for (const auto &entry : deleteEntries_) {
38 cumulatedDeleteList_.push_back(entry);
39 }
40 }
41 }
42 {
43 std::unique_lock<std::mutex> waitChangeLock(waitChangeMutex_);
44 changed_++;
45 MST_LOG("comes a change,changed[%d]!!!", changed_);
46 }
47 waitChangeCv_.notify_all();
48 }
49
KvStoreObserverImpl()50 KvStoreObserverImpl::KvStoreObserverImpl()
51 {
52 onChangeTime_ = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
53 isSaveCumulatedData_ = false;
54 Clear();
55 }
56
~KvStoreObserverImpl()57 KvStoreObserverImpl::~KvStoreObserverImpl()
58 {
59 }
60
// Copy constructor: takes over the entry lists, change counter, change timestamp and cumulation
// state of `other`. The mutex and condition variable are not copied; this object keeps its own.
KvStoreObserverImpl::KvStoreObserverImpl(const KvStoreObserverImpl &other)
{
    // The copy assignment operator copies exactly the same set of members.
    *this = other;
}
73
// Copy assignment: copies the entry lists, change counter, change timestamp and cumulation state
// from `other`. The mutex and condition variable of this object are left untouched.
// Returns a reference to this object.
KvStoreObserverImpl& KvStoreObserverImpl::operator=(const KvStoreObserverImpl &other)
{
    if (this == &other) {
        return *this; // self-assignment: nothing to copy
    }

    // State of the most recent change.
    insertedEntries_ = other.insertedEntries_;
    updatedEntries_ = other.updatedEntries_;
    deleteEntries_ = other.deleteEntries_;
    changed_ = other.changed_;
    onChangeTime_ = other.onChangeTime_;

    // Cumulation flag and cumulated history.
    isSaveCumulatedData_ = other.isSaveCumulatedData_;
    cumulatedInsertList_ = other.cumulatedInsertList_;
    cumulatedUpdateList_ = other.cumulatedUpdateList_;
    cumulatedDeleteList_ = other.cumulatedDeleteList_;
    return *this;
}
89
GetInsertList() const90 const std::list<DistributedDB::Entry> KvStoreObserverImpl::GetInsertList() const
91 {
92 return insertedEntries_;
93 }
94
GetUpdateList() const95 const std::list<DistributedDB::Entry> KvStoreObserverImpl::GetUpdateList() const
96 {
97 return updatedEntries_;
98 }
99
GetDeleteList() const100 const std::list<DistributedDB::Entry> KvStoreObserverImpl::GetDeleteList() const
101 {
102 return deleteEntries_;
103 }
104
GetChanged() const105 int KvStoreObserverImpl::GetChanged() const
106 {
107 return changed_;
108 }
109
WaitUntilReachChangeCount(unsigned int countGoal,uint32_t timeout) const110 void KvStoreObserverImpl::WaitUntilReachChangeCount(unsigned int countGoal, uint32_t timeout) const // timeout in second
111 {
112 if (countGoal == 0 || changed_ >= countGoal) {
113 return;
114 }
115 // Change count has not reach countGoal
116 auto waitChangeFunc = [this, countGoal]()->bool {
117 MST_LOG("############################ realChanged=%d, countGoal=%u", this->changed_, countGoal);
118 return this->changed_ >= countGoal;
119 };
120 MST_LOG("############################ BEGIN ############################");
121 std::unique_lock<std::mutex> waitChangeLock(waitChangeMutex_);
122 if (timeout == 0) {
123 waitChangeCv_.wait(waitChangeLock, waitChangeFunc);
124 } else {
125 waitChangeCv_.wait_for(waitChangeLock, std::chrono::seconds(timeout), waitChangeFunc);
126 }
127 MST_LOG("############################ E N D ############################");
128 }
129
WaitUntilReachRecordCount(unsigned int countExpect,ListType waitWhat,uint32_t timeout) const130 void KvStoreObserverImpl::WaitUntilReachRecordCount(unsigned int countExpect, ListType waitWhat, uint32_t timeout) const
131 {
132 MST_LOG("###########################- BEGIN list type: %d -###########################", waitWhat);
133 std::function<bool(void)> waitRecordsFunc;
134 switch (waitWhat) {
135 case INSERT_LIST:
136 if (countExpect == 0 || cumulatedInsertList_.size() >= countExpect) {
137 return;
138 }
139 // Change count has not reach countExpect
140 waitRecordsFunc = [this, countExpect]()->bool {
141 MST_LOG("[NbObserver][Wait] #### this->cumulatedInsertList_.size()=%zu, countExpect=%d ####",
142 this->cumulatedInsertList_.size(), countExpect);
143 return this->cumulatedInsertList_.size() >= countExpect;
144 };
145 break;
146 case UPDATE_LIST:
147 if (countExpect == 0 || cumulatedUpdateList_.size() >= countExpect) {
148 return;
149 }
150 // Change count has not reach countExpect
151 waitRecordsFunc = [this, countExpect]()->bool {
152 MST_LOG("[NbObserver][Wait] #### this->cumulatedUpdateList_.size()=%zu, countExpect=%d ####",
153 this->cumulatedUpdateList_.size(), countExpect);
154 return this->cumulatedUpdateList_.size() >= countExpect;
155 };
156 break;
157 case DELETE_LIST:
158 if (countExpect == 0 || cumulatedDeleteList_.size() >= countExpect) {
159 return;
160 }
161 // Change count has not reach countExpect
162 waitRecordsFunc = [this, countExpect]()->bool {
163 MST_LOG("[NbObserver][Wait] #### this->cumulatedDeleteList_.size()=%zu, countExpect=%d ####",
164 this->cumulatedDeleteList_.size(), countExpect);
165 return this->cumulatedDeleteList_.size() >= countExpect;
166 };
167 break;
168 default:
169 break;
170 }
171
172 std::unique_lock<std::mutex> waitRecordsLock(waitChangeMutex_);
173 if (timeout == 0) {
174 waitChangeCv_.wait(waitRecordsLock, waitRecordsFunc);
175 } else {
176 waitChangeCv_.wait_for(waitRecordsLock, std::chrono::seconds(timeout), waitRecordsFunc);
177 }
178
179 MST_LOG("############################ E-N-D ############################");
180 }
181
GetOnChangeTime()182 microClock_type KvStoreObserverImpl::GetOnChangeTime()
183 {
184 return onChangeTime_;
185 }
186
Clear()187 void KvStoreObserverImpl::Clear()
188 {
189 insertedEntries_.clear();
190 updatedEntries_.clear();
191 deleteEntries_.clear();
192 changed_ = 0;
193 cumulatedInsertList_.clear();
194 cumulatedUpdateList_.clear();
195 cumulatedDeleteList_.clear();
196 }
197
SetCumulatedFlag(bool isSaveCumulatedData)198 void KvStoreObserverImpl::SetCumulatedFlag(bool isSaveCumulatedData)
199 {
200 isSaveCumulatedData_ = isSaveCumulatedData;
201 }
202
GetCumulatedFlag() const203 bool KvStoreObserverImpl::GetCumulatedFlag() const
204 {
205 return isSaveCumulatedData_;
206 }
207
GetCumulatedInsertList() const208 const std::list<DistributedDB::Entry> KvStoreObserverImpl::GetCumulatedInsertList() const
209 {
210 return cumulatedInsertList_;
211 }
212
GetCumulatedUpdateList() const213 const std::list<DistributedDB::Entry> KvStoreObserverImpl::GetCumulatedUpdateList() const
214 {
215 return cumulatedUpdateList_;
216 }
217
GetCumulatedDeleteList() const218 const std::list<DistributedDB::Entry> KvStoreObserverImpl::GetCumulatedDeleteList() const
219 {
220 return cumulatedDeleteList_;
221 }