1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "metadata/meta_data_manager.h"
17 #include <csignal>
18 #define LOG_TAG "MetaDataManager"
19
20 #include "kv_store_nb_delegate.h"
21 #include "log_print.h"
22 #include "utils/anonymous.h"
23
24 namespace OHOS::DistributedData {
25 class MetaObserver : public DistributedDB::KvStoreObserver {
26 public:
27 using Filter = MetaDataManager::Filter;
28 using MetaStore = MetaDataManager::MetaStore;
29 using Observer = MetaDataManager::Observer;
30 using DBOrigin = DistributedDB::Origin;
31 using DBChangeData = DistributedDB::ChangedData;
32 using Type = DistributedDB::Type;
33 MetaObserver(std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer,
34 bool isLocal = false);
35 virtual ~MetaObserver();
36
37 // Database change callback
38 void OnChange(const DistributedDB::KvStoreChangedData &data) override;
39 void OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data) override;
40
41 void HandleChanges(int32_t flag, std::vector<std::vector<Type>> &priData);
42
43 private:
44 std::shared_ptr<MetaStore> metaStore_;
45 std::shared_ptr<Filter> filter_;
46 Observer observer_;
47 };
48
MetaObserver(std::shared_ptr<MetaStore> metaStore,std::shared_ptr<Filter> filter,Observer observer,bool isLocal)49 MetaObserver::MetaObserver(
50 std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer, bool isLocal)
51 : metaStore_(std::move(metaStore)), filter_(std::move(filter)), observer_(std::move(observer))
52 {
53 if (metaStore_ != nullptr) {
54 int mode = isLocal ? DistributedDB::OBSERVER_CHANGES_LOCAL_ONLY
55 : (DistributedDB::OBSERVER_CHANGES_NATIVE | DistributedDB::OBSERVER_CHANGES_FOREIGN);
56 auto status = metaStore_->RegisterObserver(filter_->GetKey(), mode, this);
57 if (!isLocal) {
58 status = metaStore_->RegisterObserver(filter_->GetKey(), DistributedDB::OBSERVER_CHANGES_CLOUD, this);
59 }
60 if (status != DistributedDB::DBStatus::OK) {
61 ZLOGE("register meta observer failed :%{public}d.", status);
62 }
63 }
64 }
65
~MetaObserver()66 MetaObserver::~MetaObserver()
67 {
68 if (metaStore_ != nullptr) {
69 metaStore_->UnRegisterObserver(this);
70 }
71 }
72
operator ()(const std::string & key) const73 bool MetaDataManager::Filter::operator()(const std::string &key) const
74 {
75 return key.find(pattern_) == 0;
76 }
77
GetKey() const78 std::vector<uint8_t> MetaDataManager::Filter::GetKey() const
79 {
80 return std::vector<uint8_t>();
81 }
82
Filter(const std::string & pattern)83 MetaDataManager::Filter::Filter(const std::string &pattern) : pattern_(pattern)
84 {
85 }
86
OnChange(const DistributedDB::KvStoreChangedData & data)87 void MetaObserver::OnChange(const DistributedDB::KvStoreChangedData &data)
88 {
89 if (filter_ == nullptr) {
90 ZLOGE("filter_ is nullptr!");
91 return;
92 }
93 auto values = { &data.GetEntriesInserted(), &data.GetEntriesUpdated(), &data.GetEntriesDeleted() };
94 int32_t next = MetaDataManager::INSERT;
95 for (auto value : values) {
96 int32_t action = next++;
97 if (value->empty()) {
98 continue;
99 }
100 for (const auto &entry : *value) {
101 std::string key(entry.key.begin(), entry.key.end());
102 if (!(*filter_)(key)) {
103 continue;
104 }
105 observer_(key, { entry.value.begin(), entry.value.end() }, action);
106 }
107 }
108 }
109
OnChange(DBOrigin origin,const std::string & originalId,DBChangeData && data)110 void MetaObserver::OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data)
111 {
112 (void)origin;
113 (void)originalId;
114 HandleChanges(MetaDataManager::INSERT, data.primaryData[MetaDataManager::INSERT]);
115 HandleChanges(MetaDataManager::UPDATE, data.primaryData[MetaDataManager::UPDATE]);
116 HandleChanges(MetaDataManager::DELETE, data.primaryData[MetaDataManager::DELETE]);
117 }
118
HandleChanges(int32_t flag,std::vector<std::vector<Type>> & priData)119 void MetaObserver::HandleChanges(int32_t flag, std::vector<std::vector<Type>> &priData)
120 {
121 if (priData.empty()) {
122 return;
123 }
124 if (filter_ == nullptr) {
125 ZLOGE("filter_ is nullptr!");
126 return;
127 }
128 for (const auto &priKey : priData) {
129 if (priKey.empty()) {
130 continue;
131 }
132 auto strValue = std::get_if<std::string>(&priKey[0]);
133 if (strValue != nullptr) {
134 auto key = *strValue;
135 if (!(*filter_)(key)) {
136 continue;
137 }
138 observer_(key, "", flag);
139 }
140 }
141 }
142
GetInstance()143 MetaDataManager &MetaDataManager::GetInstance()
144 {
145 static MetaDataManager instance;
146 return instance;
147 }
148
149 MetaDataManager::MetaDataManager() = default;
150
~MetaDataManager()151 MetaDataManager::~MetaDataManager()
152 {
153 metaObservers_.Clear();
154 }
155
Initialize(std::shared_ptr<MetaStore> metaStore,const Backup & backup)156 void MetaDataManager::Initialize(std::shared_ptr<MetaStore> metaStore, const Backup &backup)
157 {
158 if (metaStore == nullptr) {
159 return;
160 }
161
162 std::lock_guard<decltype(mutex_)> lg(mutex_);
163 if (inited_) {
164 return;
165 }
166 metaStore_ = std::move(metaStore);
167 backup_ = backup;
168 inited_ = true;
169 }
170
SetSyncer(const Syncer & syncer)171 void MetaDataManager::SetSyncer(const Syncer &syncer)
172 {
173 if (metaStore_ == nullptr) {
174 return;
175 }
176 syncer_ = syncer;
177 }
178
SetCloudSyncer(const CloudSyncer & cloudSyncer)179 void MetaDataManager::SetCloudSyncer(const CloudSyncer &cloudSyncer)
180 {
181 if (metaStore_ == nullptr) {
182 return;
183 }
184 cloudSyncer_ = cloudSyncer;
185 }
186
SaveMeta(const std::string & key,const Serializable & value,bool isLocal)187 bool MetaDataManager::SaveMeta(const std::string &key, const Serializable &value, bool isLocal)
188 {
189 if (!inited_) {
190 return false;
191 }
192
193 auto data = Serializable::Marshall(value);
194 auto status = isLocal ? metaStore_->PutLocal({ key.begin(), key.end() }, { data.begin(), data.end() })
195 : metaStore_->Put({ key.begin(), key.end() }, { data.begin(), data.end() });
196 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
197 ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
198 status, isLocal, Anonymous::Change(key).c_str());
199 StopSA();
200 return false;
201 }
202 if (status == DistributedDB::DBStatus::OK && backup_) {
203 backup_(metaStore_);
204 }
205 if (!isLocal && cloudSyncer_) {
206 cloudSyncer_();
207 }
208 if (status != DistributedDB::DBStatus::OK) {
209 ZLOGE("failed! status:%{public}d isLocal:%{public}d, key:%{public}s", status, isLocal,
210 Anonymous::Change(key).c_str());
211 }
212 return status == DistributedDB::DBStatus::OK;
213 }
214
LoadMeta(const std::string & key,Serializable & value,bool isLocal)215 bool MetaDataManager::LoadMeta(const std::string &key, Serializable &value, bool isLocal)
216 {
217 if (!inited_) {
218 return false;
219 }
220
221 DistributedDB::Value data;
222 auto status = isLocal ? metaStore_->GetLocal({ key.begin(), key.end() }, data)
223 : metaStore_->Get({ key.begin(), key.end() }, data);
224 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
225 ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
226 status, isLocal, Anonymous::Change(key).c_str());
227 StopSA();
228 return false;
229 }
230 if (status != DistributedDB::DBStatus::OK) {
231 return false;
232 }
233 Serializable::Unmarshall({ data.begin(), data.end() }, value);
234 if (isLocal) {
235 data.assign(data.size(), 0);
236 }
237 return true;
238 }
239
GetEntries(const std::string & prefix,std::vector<Bytes> & entries,bool isLocal)240 bool MetaDataManager::GetEntries(const std::string &prefix, std::vector<Bytes> &entries, bool isLocal)
241 {
242 std::vector<DistributedDB::Entry> dbEntries;
243 auto status = isLocal ? metaStore_->GetLocalEntries({ prefix.begin(), prefix.end() }, dbEntries)
244 : metaStore_->GetEntries({ prefix.begin(), prefix.end() }, dbEntries);
245 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
246 ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d", status, isLocal);
247 StopSA();
248 return false;
249 }
250 if (status != DistributedDB::DBStatus::OK && status != DistributedDB::DBStatus::NOT_FOUND) {
251 ZLOGE("failed! prefix:%{public}s status:%{public}d isLocal:%{public}d", Anonymous::Change(prefix).c_str(),
252 status, isLocal);
253 return false;
254 }
255 entries.resize(dbEntries.size());
256 for (size_t i = 0; i < dbEntries.size(); ++i) {
257 entries[i] = std::move(dbEntries[i].value);
258 }
259 return true;
260 }
261
DelMeta(const std::string & key,bool isLocal)262 bool MetaDataManager::DelMeta(const std::string &key, bool isLocal)
263 {
264 if (!inited_) {
265 return false;
266 }
267
268 DistributedDB::Value data;
269 auto status = isLocal ? metaStore_->DeleteLocal({ key.begin(), key.end() })
270 : metaStore_->Delete({ key.begin(), key.end() });
271 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
272 ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
273 status, isLocal, Anonymous::Change(key).c_str());
274 StopSA();
275 return false;
276 }
277 if (status == DistributedDB::DBStatus::OK && backup_) {
278 backup_(metaStore_);
279 }
280 if (!isLocal && cloudSyncer_) {
281 cloudSyncer_();
282 }
283 return ((status == DistributedDB::DBStatus::OK) || (status == DistributedDB::DBStatus::NOT_FOUND));
284 }
285
Sync(const std::vector<std::string> & devices,OnComplete complete)286 bool MetaDataManager::Sync(const std::vector<std::string> &devices, OnComplete complete)
287 {
288 if (!inited_ || devices.empty()) {
289 return false;
290 }
291 auto status = metaStore_->Sync(devices, DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL, [complete](auto &dbResults) {
292 std::map<std::string, int32_t> results;
293 for (auto &[uuid, status] : dbResults) {
294 results.insert_or_assign(uuid, static_cast<int32_t>(status));
295 }
296 complete(results);
297 });
298 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
299 ZLOGE("db corrupted! status:%{public}d", status);
300 StopSA();
301 return false;
302 }
303 if (status != DistributedDB::OK) {
304 ZLOGW("meta data sync error %{public}d.", status);
305 }
306 return status == DistributedDB::OK;
307 }
308
Subscribe(std::shared_ptr<Filter> filter,Observer observer)309 bool MetaDataManager::Subscribe(std::shared_ptr<Filter> filter, Observer observer)
310 {
311 if (!inited_) {
312 return false;
313 }
314
315 return metaObservers_.ComputeIfAbsent("", [this, &observer, &filter](const std::string &key) -> auto {
316 return std::make_shared<MetaObserver>(metaStore_, filter, observer);
317 });
318 }
319
Subscribe(std::string prefix,Observer observer,bool isLocal)320 bool MetaDataManager::Subscribe(std::string prefix, Observer observer, bool isLocal)
321 {
322 if (!inited_) {
323 return false;
324 }
325
326 return metaObservers_.ComputeIfAbsent(prefix, [this, isLocal, &observer, &prefix](const std::string &key) -> auto {
327 return std::make_shared<MetaObserver>(metaStore_, std::make_shared<Filter>(prefix), observer, isLocal);
328 });
329 }
330
Unsubscribe(std::string filter)331 bool MetaDataManager::Unsubscribe(std::string filter)
332 {
333 if (!inited_) {
334 return false;
335 }
336
337 return metaObservers_.Erase(filter);
338 }
339
StopSA()340 void MetaDataManager::StopSA()
341 {
342 ZLOGI("stop distributeddata");
343 int err = raise(SIGKILL);
344 if (err < 0) {
345 ZLOGE("stop distributeddata failed, errCode: %{public}d", err);
346 }
347 }
348 } // namespace OHOS::DistributedData