1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "metadata/meta_data_manager.h"
17 #include <csignal>
18 #define LOG_TAG "MetaDataManager"
19 
20 #include "kv_store_nb_delegate.h"
21 #include "log_print.h"
22 #include "utils/anonymous.h"
23 
24 namespace OHOS::DistributedData {
25 class MetaObserver : public DistributedDB::KvStoreObserver {
26 public:
27     using Filter = MetaDataManager::Filter;
28     using MetaStore = MetaDataManager::MetaStore;
29     using Observer = MetaDataManager::Observer;
30     using DBOrigin = DistributedDB::Origin;
31     using DBChangeData = DistributedDB::ChangedData;
32     using Type = DistributedDB::Type;
33     MetaObserver(std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer,
34         bool isLocal = false);
35     virtual ~MetaObserver();
36 
37     // Database change callback
38     void OnChange(const DistributedDB::KvStoreChangedData &data) override;
39     void OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data) override;
40 
41     void HandleChanges(int32_t flag, std::vector<std::vector<Type>> &priData);
42 
43 private:
44     std::shared_ptr<MetaStore> metaStore_;
45     std::shared_ptr<Filter> filter_;
46     Observer observer_;
47 };
48 
MetaObserver(std::shared_ptr<MetaStore> metaStore,std::shared_ptr<Filter> filter,Observer observer,bool isLocal)49 MetaObserver::MetaObserver(
50     std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer, bool isLocal)
51     : metaStore_(std::move(metaStore)), filter_(std::move(filter)), observer_(std::move(observer))
52 {
53     if (metaStore_ != nullptr) {
54         int mode = isLocal ? DistributedDB::OBSERVER_CHANGES_LOCAL_ONLY
55                            : (DistributedDB::OBSERVER_CHANGES_NATIVE | DistributedDB::OBSERVER_CHANGES_FOREIGN);
56         auto status = metaStore_->RegisterObserver(filter_->GetKey(), mode, this);
57         if (!isLocal) {
58             status = metaStore_->RegisterObserver(filter_->GetKey(), DistributedDB::OBSERVER_CHANGES_CLOUD, this);
59         }
60         if (status != DistributedDB::DBStatus::OK) {
61             ZLOGE("register meta observer failed :%{public}d.", status);
62         }
63     }
64 }
65 
~MetaObserver()66 MetaObserver::~MetaObserver()
67 {
68     if (metaStore_ != nullptr) {
69         metaStore_->UnRegisterObserver(this);
70     }
71 }
72 
operator ()(const std::string & key) const73 bool MetaDataManager::Filter::operator()(const std::string &key) const
74 {
75     return key.find(pattern_) == 0;
76 }
77 
GetKey() const78 std::vector<uint8_t> MetaDataManager::Filter::GetKey() const
79 {
80     return std::vector<uint8_t>();
81 }
82 
Filter(const std::string & pattern)83 MetaDataManager::Filter::Filter(const std::string &pattern) : pattern_(pattern)
84 {
85 }
86 
OnChange(const DistributedDB::KvStoreChangedData & data)87 void MetaObserver::OnChange(const DistributedDB::KvStoreChangedData &data)
88 {
89     if (filter_ == nullptr) {
90         ZLOGE("filter_ is nullptr!");
91         return;
92     }
93     auto values = { &data.GetEntriesInserted(), &data.GetEntriesUpdated(), &data.GetEntriesDeleted() };
94     int32_t next = MetaDataManager::INSERT;
95     for (auto value : values) {
96         int32_t action = next++;
97         if (value->empty()) {
98             continue;
99         }
100         for (const auto &entry : *value) {
101             std::string key(entry.key.begin(), entry.key.end());
102             if (!(*filter_)(key)) {
103                 continue;
104             }
105             observer_(key, { entry.value.begin(), entry.value.end() }, action);
106         }
107     }
108 }
109 
OnChange(DBOrigin origin,const std::string & originalId,DBChangeData && data)110 void MetaObserver::OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data)
111 {
112     (void)origin;
113     (void)originalId;
114     HandleChanges(MetaDataManager::INSERT, data.primaryData[MetaDataManager::INSERT]);
115     HandleChanges(MetaDataManager::UPDATE, data.primaryData[MetaDataManager::UPDATE]);
116     HandleChanges(MetaDataManager::DELETE, data.primaryData[MetaDataManager::DELETE]);
117 }
118 
HandleChanges(int32_t flag,std::vector<std::vector<Type>> & priData)119 void MetaObserver::HandleChanges(int32_t flag, std::vector<std::vector<Type>> &priData)
120 {
121     if (priData.empty()) {
122         return;
123     }
124     if (filter_ == nullptr) {
125         ZLOGE("filter_ is nullptr!");
126         return;
127     }
128     for (const auto &priKey : priData) {
129         if (priKey.empty()) {
130             continue;
131         }
132         auto strValue = std::get_if<std::string>(&priKey[0]);
133         if (strValue != nullptr) {
134             auto key = *strValue;
135             if (!(*filter_)(key)) {
136                 continue;
137             }
138             observer_(key, "", flag);
139         }
140     }
141 }
142 
GetInstance()143 MetaDataManager &MetaDataManager::GetInstance()
144 {
145     static MetaDataManager instance;
146     return instance;
147 }
148 
// Defaulted constructor; the meta store is supplied afterwards through Initialize().
MetaDataManager::MetaDataManager() = default;
150 
// Empties the observer map so that no subscription entry outlives the manager.
MetaDataManager::~MetaDataManager()
{
    metaObservers_.Clear();
}
155 
Initialize(std::shared_ptr<MetaStore> metaStore,const Backup & backup)156 void MetaDataManager::Initialize(std::shared_ptr<MetaStore> metaStore, const Backup &backup)
157 {
158     if (metaStore == nullptr) {
159         return;
160     }
161 
162     std::lock_guard<decltype(mutex_)> lg(mutex_);
163     if (inited_) {
164         return;
165     }
166     metaStore_ = std::move(metaStore);
167     backup_ = backup;
168     inited_ = true;
169 }
170 
SetSyncer(const Syncer & syncer)171 void MetaDataManager::SetSyncer(const Syncer &syncer)
172 {
173     if (metaStore_ == nullptr) {
174         return;
175     }
176     syncer_ = syncer;
177 }
178 
SetCloudSyncer(const CloudSyncer & cloudSyncer)179 void MetaDataManager::SetCloudSyncer(const CloudSyncer &cloudSyncer)
180 {
181     if (metaStore_ == nullptr) {
182         return;
183     }
184     cloudSyncer_ = cloudSyncer;
185 }
186 
SaveMeta(const std::string & key,const Serializable & value,bool isLocal)187 bool MetaDataManager::SaveMeta(const std::string &key, const Serializable &value, bool isLocal)
188 {
189     if (!inited_) {
190         return false;
191     }
192 
193     auto data = Serializable::Marshall(value);
194     auto status = isLocal ? metaStore_->PutLocal({ key.begin(), key.end() }, { data.begin(), data.end() })
195                           : metaStore_->Put({ key.begin(), key.end() }, { data.begin(), data.end() });
196     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
197         ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
198             status, isLocal, Anonymous::Change(key).c_str());
199         StopSA();
200         return false;
201     }
202     if (status == DistributedDB::DBStatus::OK && backup_) {
203         backup_(metaStore_);
204     }
205     if (!isLocal && cloudSyncer_) {
206         cloudSyncer_();
207     }
208     if (status != DistributedDB::DBStatus::OK) {
209         ZLOGE("failed! status:%{public}d isLocal:%{public}d, key:%{public}s", status, isLocal,
210             Anonymous::Change(key).c_str());
211     }
212     return status == DistributedDB::DBStatus::OK;
213 }
214 
LoadMeta(const std::string & key,Serializable & value,bool isLocal)215 bool MetaDataManager::LoadMeta(const std::string &key, Serializable &value, bool isLocal)
216 {
217     if (!inited_) {
218         return false;
219     }
220 
221     DistributedDB::Value data;
222     auto status = isLocal ? metaStore_->GetLocal({ key.begin(), key.end() }, data)
223                           : metaStore_->Get({ key.begin(), key.end() }, data);
224     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
225         ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
226             status, isLocal, Anonymous::Change(key).c_str());
227         StopSA();
228         return false;
229     }
230     if (status != DistributedDB::DBStatus::OK) {
231         return false;
232     }
233     Serializable::Unmarshall({ data.begin(), data.end() }, value);
234     if (isLocal) {
235         data.assign(data.size(), 0);
236     }
237     return true;
238 }
239 
GetEntries(const std::string & prefix,std::vector<Bytes> & entries,bool isLocal)240 bool MetaDataManager::GetEntries(const std::string &prefix, std::vector<Bytes> &entries, bool isLocal)
241 {
242     std::vector<DistributedDB::Entry> dbEntries;
243     auto status = isLocal ? metaStore_->GetLocalEntries({ prefix.begin(), prefix.end() }, dbEntries)
244                           : metaStore_->GetEntries({ prefix.begin(), prefix.end() }, dbEntries);
245     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
246         ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d", status, isLocal);
247         StopSA();
248         return false;
249     }
250     if (status != DistributedDB::DBStatus::OK && status != DistributedDB::DBStatus::NOT_FOUND) {
251         ZLOGE("failed! prefix:%{public}s status:%{public}d isLocal:%{public}d", Anonymous::Change(prefix).c_str(),
252             status, isLocal);
253         return false;
254     }
255     entries.resize(dbEntries.size());
256     for (size_t i = 0; i < dbEntries.size(); ++i) {
257         entries[i] = std::move(dbEntries[i].value);
258     }
259     return true;
260 }
261 
DelMeta(const std::string & key,bool isLocal)262 bool MetaDataManager::DelMeta(const std::string &key, bool isLocal)
263 {
264     if (!inited_) {
265         return false;
266     }
267 
268     DistributedDB::Value data;
269     auto status = isLocal ? metaStore_->DeleteLocal({ key.begin(), key.end() })
270                           : metaStore_->Delete({ key.begin(), key.end() });
271     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
272         ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
273             status, isLocal, Anonymous::Change(key).c_str());
274         StopSA();
275         return false;
276     }
277     if (status == DistributedDB::DBStatus::OK && backup_) {
278         backup_(metaStore_);
279     }
280     if (!isLocal && cloudSyncer_) {
281         cloudSyncer_();
282     }
283     return ((status == DistributedDB::DBStatus::OK) || (status == DistributedDB::DBStatus::NOT_FOUND));
284 }
285 
Sync(const std::vector<std::string> & devices,OnComplete complete)286 bool MetaDataManager::Sync(const std::vector<std::string> &devices, OnComplete complete)
287 {
288     if (!inited_ || devices.empty()) {
289         return false;
290     }
291     auto status = metaStore_->Sync(devices, DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL, [complete](auto &dbResults) {
292         std::map<std::string, int32_t> results;
293         for (auto &[uuid, status] : dbResults) {
294             results.insert_or_assign(uuid, static_cast<int32_t>(status));
295         }
296         complete(results);
297     });
298     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
299         ZLOGE("db corrupted! status:%{public}d", status);
300         StopSA();
301         return false;
302     }
303     if (status != DistributedDB::OK) {
304         ZLOGW("meta data sync error %{public}d.", status);
305     }
306     return status == DistributedDB::OK;
307 }
308 
Subscribe(std::shared_ptr<Filter> filter,Observer observer)309 bool MetaDataManager::Subscribe(std::shared_ptr<Filter> filter, Observer observer)
310 {
311     if (!inited_) {
312         return false;
313     }
314 
315     return metaObservers_.ComputeIfAbsent("", [this, &observer, &filter](const std::string &key) -> auto {
316         return std::make_shared<MetaObserver>(metaStore_, filter, observer);
317     });
318 }
319 
Subscribe(std::string prefix,Observer observer,bool isLocal)320 bool MetaDataManager::Subscribe(std::string prefix, Observer observer, bool isLocal)
321 {
322     if (!inited_) {
323         return false;
324     }
325 
326     return metaObservers_.ComputeIfAbsent(prefix, [this, isLocal, &observer, &prefix](const std::string &key) -> auto {
327         return std::make_shared<MetaObserver>(metaStore_, std::make_shared<Filter>(prefix), observer, isLocal);
328     });
329 }
330 
Unsubscribe(std::string filter)331 bool MetaDataManager::Unsubscribe(std::string filter)
332 {
333     if (!inited_) {
334         return false;
335     }
336 
337     return metaObservers_.Erase(filter);
338 }
339 
StopSA()340 void MetaDataManager::StopSA()
341 {
342     ZLOGI("stop distributeddata");
343     int err = raise(SIGKILL);
344     if (err < 0) {
345         ZLOGE("stop distributeddata failed, errCode: %{public}d", err);
346     }
347 }
348 } // namespace OHOS::DistributedData