1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "observer_bridge.h"
16 #include "kvdb_service_client.h"
17 #include "kvstore_observer_client.h"
18 namespace OHOS::DistributedKv {
// Value used to initialise ObserverClient::realType_: no subscribe-type bit is set.
constexpr uint32_t INVALID_SUBSCRIBE_TYPE = 0U;
ObserverBridge(AppId appId,StoreId store,std::shared_ptr<Observer> observer,const Convertor & cvt)20 ObserverBridge::ObserverBridge(AppId appId, StoreId store, std::shared_ptr<Observer> observer, const Convertor &cvt)
21 : appId_(std::move(appId)), storeId_(std::move(store)), observer_(std::move(observer)), convert_(cvt)
22 {
23 }
24
~ObserverBridge()25 ObserverBridge::~ObserverBridge()
26 {
27 if (remote_ == nullptr) {
28 return;
29 }
30 auto service = KVDBServiceClient::GetInstance();
31 if (service == nullptr) {
32 return;
33 }
34 service->Unsubscribe(appId_, storeId_, remote_);
35 }
36
RegisterRemoteObserver(uint32_t realType)37 Status ObserverBridge::RegisterRemoteObserver(uint32_t realType)
38 {
39 std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
40 if (remote_ != nullptr) {
41 remote_->realType_ |= realType;
42 return SUCCESS;
43 }
44
45 auto service = KVDBServiceClient::GetInstance();
46 if (service == nullptr) {
47 return SERVER_UNAVAILABLE;
48 }
49
50 remote_ = new (std::nothrow) ObserverClient(observer_, convert_);
51 auto status = service->Subscribe(appId_, storeId_, remote_);
52 if (status != SUCCESS) {
53 remote_ = nullptr;
54 } else {
55 remote_->realType_ = realType;
56 }
57 return status;
58 }
59
UnregisterRemoteObserver(uint32_t realType)60 Status ObserverBridge::UnregisterRemoteObserver(uint32_t realType)
61 {
62 std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
63 if (remote_ == nullptr) {
64 return SUCCESS;
65 }
66
67 auto service = KVDBServiceClient::GetInstance();
68 if (service == nullptr) {
69 return SERVER_UNAVAILABLE;
70 }
71
72 Status status = Status::SUCCESS;
73 remote_->realType_ &= ~SUBSCRIBE_TYPE_LOCAL;
74 remote_->realType_ &= ~realType;
75 if (remote_->realType_ == 0) {
76 status = service->Unsubscribe(appId_, storeId_, remote_);
77 remote_ = nullptr;
78 }
79 return status;
80 }
81
OnChange(const DBChangedData & data)82 void ObserverBridge::OnChange(const DBChangedData &data)
83 {
84 std::string deviceId;
85 auto inserted = ConvertDB(data.GetEntriesInserted(), deviceId, convert_);
86 auto updated = ConvertDB(data.GetEntriesUpdated(), deviceId, convert_);
87 auto deleted = ConvertDB(data.GetEntriesDeleted(), deviceId, convert_);
88 ChangeNotification notice(std::move(inserted), std::move(updated), std::move(deleted), deviceId, false);
89 observer_->OnChange(notice);
90 }
91
ObserverClient(std::shared_ptr<Observer> observer,const Convertor & cvt)92 ObserverBridge::ObserverClient::ObserverClient(std::shared_ptr<Observer> observer, const Convertor &cvt)
93 : KvStoreObserverClient(observer), convert_(cvt), realType_(INVALID_SUBSCRIBE_TYPE)
94 {
95 }
96
OnChange(const ChangeNotification & data)97 void ObserverBridge::ObserverClient::OnChange(const ChangeNotification &data)
98 {
99 if ((realType_ & SUBSCRIBE_TYPE_REMOTE) != SUBSCRIBE_TYPE_REMOTE) {
100 return;
101 }
102 std::string deviceId;
103 auto inserted = ObserverBridge::ConvertDB(data.GetInsertEntries(), deviceId, convert_);
104 auto updated = ObserverBridge::ConvertDB(data.GetUpdateEntries(), deviceId, convert_);
105 auto deleted = ObserverBridge::ConvertDB(data.GetDeleteEntries(), deviceId, convert_);
106 ChangeNotification notice(std::move(inserted), std::move(updated), std::move(deleted), deviceId, false);
107 KvStoreObserverClient::OnChange(notice);
108 }
109
OnChange(const DataOrigin & origin,Keys && keys)110 void ObserverBridge::ObserverClient::OnChange(const DataOrigin &origin, Keys &&keys)
111 {
112 if ((realType_ & SUBSCRIBE_TYPE_CLOUD) != SUBSCRIBE_TYPE_CLOUD) {
113 return;
114 }
115 KvStoreObserverClient::OnChange(origin, std::move(keys));
116 }
117
118 template<class T>
ConvertDB(const T & dbEntries,std::string & deviceId,const Convertor & convert)119 std::vector<Entry> ObserverBridge::ConvertDB(const T &dbEntries, std::string &deviceId, const Convertor &convert)
120 {
121 std::vector<Entry> entries(dbEntries.size());
122 auto it = entries.begin();
123 for (const auto &dbEntry : dbEntries) {
124 Entry &entry = *it;
125 entry.key = convert.ToKey(DBKey(dbEntry.key), deviceId);
126 entry.value = dbEntry.value;
127 ++it;
128 }
129 return entries;
130 }
131
OnServiceDeath()132 void ObserverBridge::OnServiceDeath()
133 {
134 std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
135 if (remote_ == nullptr) {
136 return;
137 }
138 remote_ = nullptr;
139 }
140 } // namespace OHOS::DistributedKv
141