1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "kv_store_observer_impl.h"
16 
OnChange(const DistributedDB::KvStoreChangedData & data)17 void KvStoreObserverImpl::OnChange(const DistributedDB::KvStoreChangedData &data)
18 {
19     onChangeTime_ = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
20     insertedEntries_ = data.GetEntriesInserted();
21     updatedEntries_ = data.GetEntriesUpdated();
22     deleteEntries_ = data.GetEntriesDeleted();
23     MST_LOG("insertedEntries_.size():%zu, updatedEntries_.size():%zu, deleteEntries_.size():%zu",
24         insertedEntries_.size(), updatedEntries_.size(), deleteEntries_.size());
25     if (isSaveCumulatedData_) {
26         if (insertedEntries_.size() != 0) {
27             for (const auto &entry : insertedEntries_) {
28                 cumulatedInsertList_.push_back(entry);
29             }
30         }
31         if (updatedEntries_.size() != 0) {
32             for (const auto &entry : updatedEntries_) {
33                 cumulatedUpdateList_.push_back(entry);
34             }
35         }
36         if (deleteEntries_.size() != 0) {
37             for (const auto &entry : deleteEntries_) {
38                 cumulatedDeleteList_.push_back(entry);
39             }
40         }
41     }
42     {
43         std::unique_lock<std::mutex> waitChangeLock(waitChangeMutex_);
44         changed_++;
45         MST_LOG("comes a change,changed[%d]!!!", changed_);
46     }
47     waitChangeCv_.notify_all();
48 }
49 
KvStoreObserverImpl()50 KvStoreObserverImpl::KvStoreObserverImpl()
51 {
52     onChangeTime_ = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
53     isSaveCumulatedData_ = false;
54     Clear();
55 }
56 
~KvStoreObserverImpl()57 KvStoreObserverImpl::~KvStoreObserverImpl()
58 {
59 }
60 
// Copy constructor: duplicates the recorded state of `other`.
// Only the data listed below is copied; waitChangeMutex_ and waitChangeCv_
// are not taken from `other`.
KvStoreObserverImpl::KvStoreObserverImpl(const KvStoreObserverImpl &other)
{
    // Bookkeeping of the notifications seen so far.
    changed_ = other.changed_;
    onChangeTime_ = other.onChangeTime_;
    isSaveCumulatedData_ = other.isSaveCumulatedData_;

    // Entries of the latest notification.
    insertedEntries_ = other.insertedEntries_;
    updatedEntries_ = other.updatedEntries_;
    deleteEntries_ = other.deleteEntries_;

    // Entries accumulated across notifications.
    cumulatedInsertList_ = other.cumulatedInsertList_;
    cumulatedUpdateList_ = other.cumulatedUpdateList_;
    cumulatedDeleteList_ = other.cumulatedDeleteList_;
}
73 
// Copy assignment: overwrites the recorded state with that of `other` and
// returns *this. Self-assignment leaves the object untouched.
// waitChangeMutex_ and waitChangeCv_ are not assigned.
KvStoreObserverImpl& KvStoreObserverImpl::operator=(const KvStoreObserverImpl &other)
{
    if (this == &other) {
        return *this;
    }

    insertedEntries_ = other.insertedEntries_;
    updatedEntries_ = other.updatedEntries_;
    deleteEntries_ = other.deleteEntries_;

    changed_ = other.changed_;
    onChangeTime_ = other.onChangeTime_;
    isSaveCumulatedData_ = other.isSaveCumulatedData_;

    cumulatedInsertList_ = other.cumulatedInsertList_;
    cumulatedUpdateList_ = other.cumulatedUpdateList_;
    cumulatedDeleteList_ = other.cumulatedDeleteList_;
    return *this;
}
89 
GetInsertList() const90 const std::list<DistributedDB::Entry> KvStoreObserverImpl::GetInsertList() const
91 {
92     return insertedEntries_;
93 }
94 
GetUpdateList() const95 const std::list<DistributedDB::Entry> KvStoreObserverImpl::GetUpdateList() const
96 {
97     return updatedEntries_;
98 }
99 
GetDeleteList() const100 const std::list<DistributedDB::Entry> KvStoreObserverImpl::GetDeleteList() const
101 {
102     return deleteEntries_;
103 }
104 
GetChanged() const105 int KvStoreObserverImpl::GetChanged() const
106 {
107     return changed_;
108 }
109 
WaitUntilReachChangeCount(unsigned int countGoal,uint32_t timeout) const110 void KvStoreObserverImpl::WaitUntilReachChangeCount(unsigned int countGoal, uint32_t timeout) const // timeout in second
111 {
112     if (countGoal == 0 || changed_ >= countGoal) {
113         return;
114     }
115     // Change count has not reach countGoal
116     auto waitChangeFunc = [this, countGoal]()->bool {
117         MST_LOG("############################ realChanged=%d, countGoal=%u", this->changed_, countGoal);
118         return this->changed_ >= countGoal;
119     };
120     MST_LOG("############################ BEGIN ############################");
121     std::unique_lock<std::mutex> waitChangeLock(waitChangeMutex_);
122     if (timeout == 0) {
123         waitChangeCv_.wait(waitChangeLock, waitChangeFunc);
124     } else {
125         waitChangeCv_.wait_for(waitChangeLock, std::chrono::seconds(timeout), waitChangeFunc);
126     }
127     MST_LOG("############################ E N D ############################");
128 }
129 
WaitUntilReachRecordCount(unsigned int countExpect,ListType waitWhat,uint32_t timeout) const130 void KvStoreObserverImpl::WaitUntilReachRecordCount(unsigned int countExpect, ListType waitWhat, uint32_t timeout) const
131 {
132     MST_LOG("###########################- BEGIN list type: %d -###########################", waitWhat);
133     std::function<bool(void)> waitRecordsFunc;
134     switch (waitWhat) {
135         case INSERT_LIST:
136             if (countExpect == 0 || cumulatedInsertList_.size() >= countExpect) {
137                 return;
138             }
139             // Change count has not reach countExpect
140             waitRecordsFunc = [this, countExpect]()->bool {
141                 MST_LOG("[NbObserver][Wait] #### this->cumulatedInsertList_.size()=%zu, countExpect=%d ####",
142                     this->cumulatedInsertList_.size(), countExpect);
143                 return this->cumulatedInsertList_.size() >= countExpect;
144             };
145             break;
146         case UPDATE_LIST:
147             if (countExpect == 0 || cumulatedUpdateList_.size() >= countExpect) {
148                 return;
149             }
150             // Change count has not reach countExpect
151             waitRecordsFunc = [this, countExpect]()->bool {
152                 MST_LOG("[NbObserver][Wait] #### this->cumulatedUpdateList_.size()=%zu, countExpect=%d ####",
153                     this->cumulatedUpdateList_.size(), countExpect);
154                 return this->cumulatedUpdateList_.size() >= countExpect;
155             };
156             break;
157         case DELETE_LIST:
158             if (countExpect == 0 || cumulatedDeleteList_.size() >= countExpect) {
159                 return;
160             }
161             // Change count has not reach countExpect
162             waitRecordsFunc = [this, countExpect]()->bool {
163                 MST_LOG("[NbObserver][Wait] #### this->cumulatedDeleteList_.size()=%zu, countExpect=%d ####",
164                     this->cumulatedDeleteList_.size(), countExpect);
165                 return this->cumulatedDeleteList_.size() >= countExpect;
166             };
167             break;
168         default:
169             break;
170     }
171 
172     std::unique_lock<std::mutex> waitRecordsLock(waitChangeMutex_);
173     if (timeout == 0) {
174         waitChangeCv_.wait(waitRecordsLock, waitRecordsFunc);
175     } else {
176         waitChangeCv_.wait_for(waitRecordsLock, std::chrono::seconds(timeout), waitRecordsFunc);
177     }
178 
179     MST_LOG("############################ E-N-D ############################");
180 }
181 
GetOnChangeTime()182 microClock_type KvStoreObserverImpl::GetOnChangeTime()
183 {
184     return onChangeTime_;
185 }
186 
Clear()187 void KvStoreObserverImpl::Clear()
188 {
189     insertedEntries_.clear();
190     updatedEntries_.clear();
191     deleteEntries_.clear();
192     changed_ = 0;
193     cumulatedInsertList_.clear();
194     cumulatedUpdateList_.clear();
195     cumulatedDeleteList_.clear();
196 }
197 
SetCumulatedFlag(bool isSaveCumulatedData)198 void KvStoreObserverImpl::SetCumulatedFlag(bool isSaveCumulatedData)
199 {
200     isSaveCumulatedData_ = isSaveCumulatedData;
201 }
202 
GetCumulatedFlag() const203 bool KvStoreObserverImpl::GetCumulatedFlag() const
204 {
205     return isSaveCumulatedData_;
206 }
207 
GetCumulatedInsertList() const208 const std::list<DistributedDB::Entry> KvStoreObserverImpl::GetCumulatedInsertList() const
209 {
210     return cumulatedInsertList_;
211 }
212 
GetCumulatedUpdateList() const213 const std::list<DistributedDB::Entry> KvStoreObserverImpl::GetCumulatedUpdateList() const
214 {
215     return cumulatedUpdateList_;
216 }
217 
GetCumulatedDeleteList() const218 const std::list<DistributedDB::Entry> KvStoreObserverImpl::GetCumulatedDeleteList() const
219 {
220     return cumulatedDeleteList_;
221 }