1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define LOG_TAG "KVDBWatcher"
17
18 #include "kvdb_watcher.h"
19
20 #include "error/general_error.h"
21 #include "ikvstore_observer.h"
22 #include "log_print.h"
23 #include "types.h"
24 #include "utils/anonymous.h"
25
26 namespace OHOS::DistributedKv {
27 using namespace DistributedData;
28 using Error = DistributedData::GeneralError;
KVDBWatcher()29 KVDBWatcher::KVDBWatcher() {}
30
OnChange(const Origin & origin,const PRIFields & primaryFields,ChangeInfo && values)31 int32_t KVDBWatcher::OnChange(const Origin &origin, const PRIFields &primaryFields, ChangeInfo &&values)
32 {
33 auto store = origin.store;
34 auto changeData = values.find(store);
35 if (changeData != values.end()) {
36 auto observer = GetObserver();
37 if (observer == nullptr) {
38 return E_NOT_INIT;
39 }
40 std::vector<std::string> keys[OP_BUTT]{};
41 keys[OP_INSERT] = ConvertToKeys(changeData->second[OP_INSERT]);
42 keys[OP_UPDATE] = ConvertToKeys(changeData->second[OP_UPDATE]);
43 keys[OP_DELETE] = ConvertToKeys(changeData->second[OP_DELETE]);
44 DataOrigin dataOrigin;
45 dataOrigin.id = origin.id;
46 dataOrigin.store = origin.store;
47 observer->OnChange(dataOrigin, std::move(keys));
48 }
49 return E_OK;
50 }
51
OnChange(const Origin & origin,const Fields & fields,ChangeData && datas)52 int32_t KVDBWatcher::OnChange(const Origin &origin, const Fields &fields, ChangeData &&datas)
53 {
54 auto store = origin.store;
55 auto changeData = datas.find(store);
56 if (changeData != datas.end()) {
57 auto observer = GetObserver();
58 if (observer == nullptr) {
59 return E_NOT_INIT;
60 }
61 auto inserts = ConvertToEntries(changeData->second[OP_INSERT]);
62 auto updates = ConvertToEntries(changeData->second[OP_UPDATE]);
63 auto deletes = ConvertToEntries(changeData->second[OP_DELETE]);
64 ChangeNotification change(std::move(inserts), std::move(updates), std::move(deletes), {}, false);
65 observer->OnChange(change);
66 }
67 return E_OK;
68 }
69
GetObserver() const70 sptr<IKvStoreObserver> KVDBWatcher::GetObserver() const
71 {
72 std::shared_lock<decltype(mutex_)> lock(mutex_);
73 return observer_;
74 }
75
SetObserver(sptr<IKvStoreObserver> observer)76 void KVDBWatcher::SetObserver(sptr<IKvStoreObserver> observer)
77 {
78 std::unique_lock<decltype(mutex_)> lock(mutex_);
79 observer_ = std::move(observer);
80 }
81
ConvertToEntries(const std::vector<Values> & values)82 std::vector<Entry> KVDBWatcher::ConvertToEntries(const std::vector<Values> &values)
83 {
84 std::vector<Entry> changeData{};
85 for (auto &info : values) {
86 auto key = std::get_if<Bytes>(&info[0]);
87 auto value = std::get_if<Bytes>(&info[1]);
88 if (key == nullptr || value == nullptr) {
89 continue;
90 }
91 Entry tmpEntry{ *key, *value };
92 changeData.push_back(std::move(tmpEntry));
93 }
94 return changeData;
95 }
96
ConvertToKeys(const std::vector<PRIValue> & values)97 std::vector<std::string> KVDBWatcher::ConvertToKeys(const std::vector<PRIValue> &values)
98 {
99 std::vector<std::string> keys{};
100 for (auto &info : values) {
101 auto key = std::get_if<std::string>(&info);
102 if (key == nullptr) {
103 continue;
104 }
105 keys.push_back(std::move(*key));
106 }
107 return keys;
108 }
109 } // namespace OHOS::DistributedKv
110