1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define LOG_TAG "KVDBWatcher"
17 
18 #include "kvdb_watcher.h"
19 
20 #include "error/general_error.h"
21 #include "ikvstore_observer.h"
22 #include "log_print.h"
23 #include "types.h"
24 #include "utils/anonymous.h"
25 
26 namespace OHOS::DistributedKv {
27 using namespace DistributedData;
28 using Error = DistributedData::GeneralError;
KVDBWatcher()29 KVDBWatcher::KVDBWatcher() {}
30 
OnChange(const Origin & origin,const PRIFields & primaryFields,ChangeInfo && values)31 int32_t KVDBWatcher::OnChange(const Origin &origin, const PRIFields &primaryFields, ChangeInfo &&values)
32 {
33     auto store = origin.store;
34     auto changeData = values.find(store);
35     if (changeData != values.end()) {
36         auto observer = GetObserver();
37         if (observer == nullptr) {
38             return E_NOT_INIT;
39         }
40         std::vector<std::string> keys[OP_BUTT]{};
41         keys[OP_INSERT] = ConvertToKeys(changeData->second[OP_INSERT]);
42         keys[OP_UPDATE] = ConvertToKeys(changeData->second[OP_UPDATE]);
43         keys[OP_DELETE] = ConvertToKeys(changeData->second[OP_DELETE]);
44         DataOrigin dataOrigin;
45         dataOrigin.id = origin.id;
46         dataOrigin.store = origin.store;
47         observer->OnChange(dataOrigin, std::move(keys));
48     }
49     return E_OK;
50 }
51 
OnChange(const Origin & origin,const Fields & fields,ChangeData && datas)52 int32_t KVDBWatcher::OnChange(const Origin &origin, const Fields &fields, ChangeData &&datas)
53 {
54     auto store = origin.store;
55     auto changeData = datas.find(store);
56     if (changeData != datas.end()) {
57         auto observer = GetObserver();
58         if (observer == nullptr) {
59             return E_NOT_INIT;
60         }
61         auto inserts = ConvertToEntries(changeData->second[OP_INSERT]);
62         auto updates = ConvertToEntries(changeData->second[OP_UPDATE]);
63         auto deletes = ConvertToEntries(changeData->second[OP_DELETE]);
64         ChangeNotification change(std::move(inserts), std::move(updates), std::move(deletes), {}, false);
65         observer->OnChange(change);
66     }
67     return E_OK;
68 }
69 
GetObserver() const70 sptr<IKvStoreObserver> KVDBWatcher::GetObserver() const
71 {
72     std::shared_lock<decltype(mutex_)> lock(mutex_);
73     return observer_;
74 }
75 
SetObserver(sptr<IKvStoreObserver> observer)76 void KVDBWatcher::SetObserver(sptr<IKvStoreObserver> observer)
77 {
78     std::unique_lock<decltype(mutex_)> lock(mutex_);
79     observer_ = std::move(observer);
80 }
81 
ConvertToEntries(const std::vector<Values> & values)82 std::vector<Entry> KVDBWatcher::ConvertToEntries(const std::vector<Values> &values)
83 {
84     std::vector<Entry> changeData{};
85     for (auto &info : values) {
86         auto key = std::get_if<Bytes>(&info[0]);
87         auto value = std::get_if<Bytes>(&info[1]);
88         if (key == nullptr || value == nullptr) {
89             continue;
90         }
91         Entry tmpEntry{ *key, *value };
92         changeData.push_back(std::move(tmpEntry));
93     }
94     return changeData;
95 }
96 
ConvertToKeys(const std::vector<PRIValue> & values)97 std::vector<std::string> KVDBWatcher::ConvertToKeys(const std::vector<PRIValue> &values)
98 {
99     std::vector<std::string> keys{};
100     for (auto &info : values) {
101         auto key = std::get_if<std::string>(&info);
102         if (key == nullptr) {
103             continue;
104         }
105         keys.push_back(std::move(*key));
106     }
107     return keys;
108 }
109 } // namespace OHOS::DistributedKv
110