1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <thread>
17 #include <unordered_set>
18 
19 #include "hitrace.h"
20 #include "distributed_object_impl.h"
21 #include "distributed_objectstore_impl.h"
22 #include "objectstore_errors.h"
23 #include "softbus_adapter.h"
24 #include "string_utils.h"
25 #include "asset_change_timer.h"
26 #include "object_radar_reporter.h"
27 
28 namespace OHOS::ObjectStore {
DistributedObjectStoreImpl(FlatObjectStore * flatObjectStore)29 DistributedObjectStoreImpl::DistributedObjectStoreImpl(FlatObjectStore *flatObjectStore)
30     : flatObjectStore_(flatObjectStore)
31 {
32 }
33 
~DistributedObjectStoreImpl()34 DistributedObjectStoreImpl::~DistributedObjectStoreImpl()
35 {
36     delete flatObjectStore_;
37 }
38 
CacheObject(const std::string & sessionId,FlatObjectStore * flatObjectStore)39 DistributedObject *DistributedObjectStoreImpl::CacheObject(
40     const std::string &sessionId, FlatObjectStore *flatObjectStore)
41 {
42     DistributedObjectImpl *object = new (std::nothrow) DistributedObjectImpl(sessionId, flatObjectStore);
43     if (object == nullptr) {
44         return nullptr;
45     }
46     std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
47     objects_.push_back(object);
48     return object;
49 }
50 
RemoveCacheObject(const std::string & sessionId)51 void DistributedObjectStoreImpl::RemoveCacheObject(const std::string &sessionId)
52 {
53     std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
54     auto iter = objects_.begin();
55     while (iter != objects_.end()) {
56         if ((*iter)->GetSessionId() == sessionId) {
57             delete *iter;
58             iter = objects_.erase(iter);
59         } else {
60             iter++;
61         }
62     }
63     return;
64 }
65 
CreateObject(const std::string & sessionId)66 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId)
67 {
68     DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
69     if (flatObjectStore_ == nullptr) {
70         LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
71         return nullptr;
72     }
73 
74     if (sessionId.empty()) {
75         LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
76         return nullptr;
77     }
78 
79     uint32_t status = flatObjectStore_->CreateObject(sessionId);
80     if (status != SUCCESS) {
81         LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
82         return nullptr;
83     }
84     return CacheObject(sessionId, flatObjectStore_);
85 }
86 
CreateObject(const std::string & sessionId,uint32_t & status)87 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId, uint32_t &status)
88 {
89     DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
90     if (flatObjectStore_ == nullptr) {
91         LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
92         status = ERR_NULL_OBJECTSTORE;
93         return nullptr;
94     }
95 
96     if (sessionId.empty()) {
97         LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
98         status = ERR_INVALID_ARGS;
99         return nullptr;
100     }
101 
102     status = flatObjectStore_->CreateObject(sessionId);
103     if (status != SUCCESS) {
104         LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
105         return nullptr;
106     }
107     return CacheObject(sessionId, flatObjectStore_);
108 }
109 
DeleteObject(const std::string & sessionId)110 uint32_t DistributedObjectStoreImpl::DeleteObject(const std::string &sessionId)
111 {
112     DataObjectHiTrace trace("DistributedObjectStoreImpl::DeleteObject");
113     if (flatObjectStore_ == nullptr) {
114         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
115         return ERR_NULL_OBJECTSTORE;
116     }
117     uint32_t status = flatObjectStore_->Delete(sessionId);
118     if (status != SUCCESS) {
119         LOG_ERROR("DistributedObjectStoreImpl::DeleteObject store delete err %{public}d", status);
120         return status;
121     }
122     RemoveCacheObject(sessionId);
123     return SUCCESS;
124 }
125 
Get(const std::string & sessionId,DistributedObject ** object)126 uint32_t DistributedObjectStoreImpl::Get(const std::string &sessionId, DistributedObject **object)
127 {
128     std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
129     auto iter = objects_.begin();
130     while (iter != objects_.end()) {
131         if ((*iter)->GetSessionId() == sessionId) {
132             *object = *iter;
133             return SUCCESS;
134         }
135         iter++;
136     }
137     LOG_ERROR("DistributedObjectStoreImpl::Get object err, no object");
138     return ERR_GET_OBJECT;
139 }
140 
Watch(DistributedObject * object,std::shared_ptr<ObjectWatcher> watcher)141 uint32_t DistributedObjectStoreImpl::Watch(DistributedObject *object, std::shared_ptr<ObjectWatcher> watcher)
142 {
143     if (object == nullptr) {
144         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
145         return ERR_NULL_OBJECT;
146     }
147     if (flatObjectStore_ == nullptr) {
148         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
149         return ERR_NULL_OBJECTSTORE;
150     }
151     std::lock_guard<std::mutex> lock(watchersLock_);
152     if (watchers_.count(object) != 0) {
153         LOG_ERROR("DistributedObjectStoreImpl::Watch already gets object");
154         return ERR_EXIST;
155     }
156     std::shared_ptr<WatcherProxy> watcherProxy = std::make_shared<WatcherProxy>(watcher, object->GetSessionId());
157     watcherProxy->SetAssetChangeCallBack(
158         [=](const std::string &sessionId, const std::string &assetKey, std::shared_ptr<ObjectWatcher> objectWatcher) {
159             AssetChangeTimer *assetChangeTimer = AssetChangeTimer::GetInstance(flatObjectStore_);
160             assetChangeTimer->OnAssetChanged(sessionId, assetKey, objectWatcher);
161         });
162     uint32_t status = flatObjectStore_->Watch(object->GetSessionId(), watcherProxy);
163     if (status != SUCCESS) {
164         LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
165         return status;
166     }
167     watchers_.insert_or_assign(object, watcherProxy);
168     LOG_INFO("DistributedObjectStoreImpl:Watch object success.");
169     return SUCCESS;
170 }
171 
UnWatch(DistributedObject * object)172 uint32_t DistributedObjectStoreImpl::UnWatch(DistributedObject *object)
173 {
174     if (object == nullptr) {
175         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
176         return ERR_NULL_OBJECT;
177     }
178     if (flatObjectStore_ == nullptr) {
179         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
180         return ERR_NULL_OBJECTSTORE;
181     }
182     uint32_t status = flatObjectStore_->UnWatch(object->GetSessionId());
183     if (status != SUCCESS) {
184         LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
185         return status;
186     }
187     std::lock_guard<std::mutex> lock(watchersLock_);
188     watchers_.erase(object);
189     LOG_INFO("DistributedObjectStoreImpl:UnWatch object success.");
190     return SUCCESS;
191 }
192 
SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)193 uint32_t DistributedObjectStoreImpl::SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)
194 {
195     if (flatObjectStore_ == nullptr) {
196         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
197         return ERR_NULL_OBJECTSTORE;
198     }
199     std::shared_ptr<StatusNotifierProxy> watcherProxy = std::make_shared<StatusNotifierProxy>(notifier);
200     return flatObjectStore_->SetStatusNotifier(watcherProxy);
201 }
202 
NotifyCachedStatus(const std::string & sessionId)203 void DistributedObjectStoreImpl::NotifyCachedStatus(const std::string &sessionId)
204 {
205     flatObjectStore_->CheckRetrieveCache(sessionId);
206 }
207 
WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher,const std::string & sessionId)208 WatcherProxy::WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher, const std::string &sessionId)
209     : FlatObjectWatcher(sessionId), objectWatcher_(objectWatcher)
210 {
211 }
212 
OnChanged(const std::string & sessionId,const std::vector<std::string> & changedData,bool enableTransfer)213 void WatcherProxy::OnChanged(
214     const std::string &sessionId, const std::vector<std::string> &changedData, bool enableTransfer)
215 {
216     std::unordered_set<std::string> transferKeys;
217     std::vector<std::string> otherKeys;
218     for (const auto &str : changedData) {
219         if (str.find(ASSET_DOT) == std::string::npos) {
220             if (str != DEVICEID_KEY) {
221                 otherKeys.push_back(str);
222             }
223         } else {
224             std::string assetKey;
225             if (FindChangedAssetKey(str, assetKey)) {
226                 transferKeys.insert(assetKey);
227             }
228         }
229     }
230     if (!enableTransfer) {
231         otherKeys.insert(otherKeys.end(), transferKeys.begin(), transferKeys.end());
232     } else if (assetChangeCallback_ != nullptr && !transferKeys.empty()) {
233         for (auto &assetKey : transferKeys) {
234             assetChangeCallback_(sessionId, assetKey, objectWatcher_);
235         }
236     }
237     if (!otherKeys.empty()) {
238         objectWatcher_->OnChanged(sessionId, otherKeys);
239     }
240 }
241 
FindChangedAssetKey(const std::string & changedKey,std::string & assetKey)242 bool WatcherProxy::FindChangedAssetKey(const std::string &changedKey, std::string &assetKey)
243 {
244     std::size_t dotPos = changedKey.find(ASSET_DOT);
245     if ((changedKey.size() > MODIFY_TIME_SUFFIX.length() && changedKey.substr(dotPos) == MODIFY_TIME_SUFFIX) ||
246             (changedKey.size() > SIZE_SUFFIX.length() && changedKey.substr(dotPos) == SIZE_SUFFIX)) {
247         assetKey = changedKey.substr(0, dotPos);
248         return true;
249     }
250     return false;
251 }
252 
SetAssetChangeCallBack(const AssetChangeCallback & assetChangeCallback)253 void WatcherProxy::SetAssetChangeCallBack(const AssetChangeCallback &assetChangeCallback)
254 {
255     assetChangeCallback_ = assetChangeCallback;
256 }
257 
GetInstance(const std::string & bundleName)258 DistributedObjectStore *DistributedObjectStore::GetInstance(const std::string &bundleName)
259 {
260     static std::mutex instLock_;
261     static DistributedObjectStore *instPtr = nullptr;
262     if (instPtr == nullptr) {
263         std::lock_guard<std::mutex> lock(instLock_);
264         if (instPtr == nullptr && !bundleName.empty()) {
265             RadarReporter::ReportStateStart(std::string(__FUNCTION__), CREATE, INIT_STORE, IDLE, START, bundleName);
266             LOG_INFO("new objectstore %{public}s", bundleName.c_str());
267             FlatObjectStore *flatObjectStore = new (std::nothrow) FlatObjectStore(bundleName);
268             if (flatObjectStore == nullptr) {
269                 LOG_ERROR("no memory for FlatObjectStore malloc!");
270                 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, INIT_STORE,
271                     RADAR_FAILED, NO_MEMORY, FINISHED);
272                 return nullptr;
273             }
274             // Use instMemory to make sure this singleton not free before other object.
275             // This operation needn't to malloc memory, we needn't to check nullptr.
276             instPtr = new (std::nothrow) DistributedObjectStoreImpl(flatObjectStore);
277             if (instPtr == nullptr) {
278                 delete flatObjectStore;
279                 LOG_ERROR("no memory for DistributedObjectStoreImpl malloc!");
280                 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, INIT_STORE,
281                     RADAR_FAILED, NO_MEMORY, FINISHED);
282                 return nullptr;
283             }
284             RadarReporter::ReportStage(std::string(__FUNCTION__), CREATE, INIT_STORE, RADAR_SUCCESS);
285         }
286     }
287     return instPtr;
288 }
289 
OnChanged(const std::string & sessionId,const std::string & networkId,const std::string & onlineStatus)290 void StatusNotifierProxy::OnChanged(
291     const std::string &sessionId, const std::string &networkId, const std::string &onlineStatus)
292 {
293     if (notifier != nullptr) {
294         notifier->OnChanged(sessionId, networkId, onlineStatus);
295     }
296 }
297 
StatusNotifierProxy(const std::shared_ptr<StatusNotifier> & notifier)298 StatusNotifierProxy::StatusNotifierProxy(const std::shared_ptr<StatusNotifier> &notifier) : notifier(notifier)
299 {
300 }
301 
~StatusNotifierProxy()302 StatusNotifierProxy::~StatusNotifierProxy()
303 {
304     LOG_ERROR("destroy");
305     notifier = nullptr;
306 }
307 } // namespace OHOS::ObjectStore
308