1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "observer_bridge.h"
16 #include "kvdb_service_client.h"
17 #include "kvstore_observer_client.h"
18 namespace OHOS::DistributedKv {
19 constexpr uint32_t INVALID_SUBSCRIBE_TYPE = 0;
ObserverBridge(AppId appId,StoreId store,std::shared_ptr<Observer> observer,const Convertor & cvt)20 ObserverBridge::ObserverBridge(AppId appId, StoreId store, std::shared_ptr<Observer> observer, const Convertor &cvt)
21     : appId_(std::move(appId)), storeId_(std::move(store)), observer_(std::move(observer)), convert_(cvt)
22 {
23 }
24 
~ObserverBridge()25 ObserverBridge::~ObserverBridge()
26 {
27     if (remote_ == nullptr) {
28         return;
29     }
30     auto service = KVDBServiceClient::GetInstance();
31     if (service == nullptr) {
32         return;
33     }
34     service->Unsubscribe(appId_, storeId_, remote_);
35 }
36 
RegisterRemoteObserver(uint32_t realType)37 Status ObserverBridge::RegisterRemoteObserver(uint32_t realType)
38 {
39     std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
40     if (remote_ != nullptr) {
41         remote_->realType_ |= realType;
42         return SUCCESS;
43     }
44 
45     auto service = KVDBServiceClient::GetInstance();
46     if (service == nullptr) {
47         return SERVER_UNAVAILABLE;
48     }
49 
50     remote_ = new (std::nothrow) ObserverClient(observer_, convert_);
51     auto status = service->Subscribe(appId_, storeId_, remote_);
52     if (status != SUCCESS) {
53         remote_ = nullptr;
54     } else {
55         remote_->realType_ = realType;
56     }
57     return status;
58 }
59 
UnregisterRemoteObserver(uint32_t realType)60 Status ObserverBridge::UnregisterRemoteObserver(uint32_t realType)
61 {
62     std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
63     if (remote_ == nullptr) {
64         return SUCCESS;
65     }
66 
67     auto service = KVDBServiceClient::GetInstance();
68     if (service == nullptr) {
69         return SERVER_UNAVAILABLE;
70     }
71 
72     Status status = Status::SUCCESS;
73     remote_->realType_ &= ~SUBSCRIBE_TYPE_LOCAL;
74     remote_->realType_ &= ~realType;
75     if (remote_->realType_ == 0) {
76         status = service->Unsubscribe(appId_, storeId_, remote_);
77         remote_ = nullptr;
78     }
79     return status;
80 }
81 
OnChange(const DBChangedData & data)82 void ObserverBridge::OnChange(const DBChangedData &data)
83 {
84     std::string deviceId;
85     auto inserted = ConvertDB(data.GetEntriesInserted(), deviceId, convert_);
86     auto updated = ConvertDB(data.GetEntriesUpdated(), deviceId, convert_);
87     auto deleted = ConvertDB(data.GetEntriesDeleted(), deviceId, convert_);
88     ChangeNotification notice(std::move(inserted), std::move(updated), std::move(deleted), deviceId, false);
89     observer_->OnChange(notice);
90 }
91 
ObserverClient(std::shared_ptr<Observer> observer,const Convertor & cvt)92 ObserverBridge::ObserverClient::ObserverClient(std::shared_ptr<Observer> observer, const Convertor &cvt)
93     : KvStoreObserverClient(observer), convert_(cvt), realType_(INVALID_SUBSCRIBE_TYPE)
94 {
95 }
96 
OnChange(const ChangeNotification & data)97 void ObserverBridge::ObserverClient::OnChange(const ChangeNotification &data)
98 {
99     if ((realType_ & SUBSCRIBE_TYPE_REMOTE) != SUBSCRIBE_TYPE_REMOTE) {
100         return;
101     }
102     std::string deviceId;
103     auto inserted = ObserverBridge::ConvertDB(data.GetInsertEntries(), deviceId, convert_);
104     auto updated = ObserverBridge::ConvertDB(data.GetUpdateEntries(), deviceId, convert_);
105     auto deleted = ObserverBridge::ConvertDB(data.GetDeleteEntries(), deviceId, convert_);
106     ChangeNotification notice(std::move(inserted), std::move(updated), std::move(deleted), deviceId, false);
107     KvStoreObserverClient::OnChange(notice);
108 }
109 
OnChange(const DataOrigin & origin,Keys && keys)110 void ObserverBridge::ObserverClient::OnChange(const DataOrigin &origin, Keys &&keys)
111 {
112     if ((realType_ & SUBSCRIBE_TYPE_CLOUD) != SUBSCRIBE_TYPE_CLOUD) {
113         return;
114     }
115     KvStoreObserverClient::OnChange(origin, std::move(keys));
116 }
117 
118 template<class T>
ConvertDB(const T & dbEntries,std::string & deviceId,const Convertor & convert)119 std::vector<Entry> ObserverBridge::ConvertDB(const T &dbEntries, std::string &deviceId, const Convertor &convert)
120 {
121     std::vector<Entry> entries(dbEntries.size());
122     auto it = entries.begin();
123     for (const auto &dbEntry : dbEntries) {
124         Entry &entry = *it;
125         entry.key = convert.ToKey(DBKey(dbEntry.key), deviceId);
126         entry.value = dbEntry.value;
127         ++it;
128     }
129     return entries;
130 }
131 
OnServiceDeath()132 void ObserverBridge::OnServiceDeath()
133 {
134     std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
135     if (remote_ == nullptr) {
136         return;
137     }
138     remote_ = nullptr;
139 }
140 } // namespace OHOS::DistributedKv
141