1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <thread>
17 #include <unordered_set>
18
19 #include "hitrace.h"
20 #include "distributed_object_impl.h"
21 #include "distributed_objectstore_impl.h"
22 #include "objectstore_errors.h"
23 #include "softbus_adapter.h"
24 #include "string_utils.h"
25 #include "asset_change_timer.h"
26 #include "object_radar_reporter.h"
27
28 namespace OHOS::ObjectStore {
DistributedObjectStoreImpl(FlatObjectStore * flatObjectStore)29 DistributedObjectStoreImpl::DistributedObjectStoreImpl(FlatObjectStore *flatObjectStore)
30 : flatObjectStore_(flatObjectStore)
31 {
32 }
33
~DistributedObjectStoreImpl()34 DistributedObjectStoreImpl::~DistributedObjectStoreImpl()
35 {
36 delete flatObjectStore_;
37 }
38
CacheObject(const std::string & sessionId,FlatObjectStore * flatObjectStore)39 DistributedObject *DistributedObjectStoreImpl::CacheObject(
40 const std::string &sessionId, FlatObjectStore *flatObjectStore)
41 {
42 DistributedObjectImpl *object = new (std::nothrow) DistributedObjectImpl(sessionId, flatObjectStore);
43 if (object == nullptr) {
44 return nullptr;
45 }
46 std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
47 objects_.push_back(object);
48 return object;
49 }
50
RemoveCacheObject(const std::string & sessionId)51 void DistributedObjectStoreImpl::RemoveCacheObject(const std::string &sessionId)
52 {
53 std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
54 auto iter = objects_.begin();
55 while (iter != objects_.end()) {
56 if ((*iter)->GetSessionId() == sessionId) {
57 delete *iter;
58 iter = objects_.erase(iter);
59 } else {
60 iter++;
61 }
62 }
63 return;
64 }
65
CreateObject(const std::string & sessionId)66 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId)
67 {
68 DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
69 if (flatObjectStore_ == nullptr) {
70 LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
71 return nullptr;
72 }
73
74 if (sessionId.empty()) {
75 LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
76 return nullptr;
77 }
78
79 uint32_t status = flatObjectStore_->CreateObject(sessionId);
80 if (status != SUCCESS) {
81 LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
82 return nullptr;
83 }
84 return CacheObject(sessionId, flatObjectStore_);
85 }
86
CreateObject(const std::string & sessionId,uint32_t & status)87 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId, uint32_t &status)
88 {
89 DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
90 if (flatObjectStore_ == nullptr) {
91 LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
92 status = ERR_NULL_OBJECTSTORE;
93 return nullptr;
94 }
95
96 if (sessionId.empty()) {
97 LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
98 status = ERR_INVALID_ARGS;
99 return nullptr;
100 }
101
102 status = flatObjectStore_->CreateObject(sessionId);
103 if (status != SUCCESS) {
104 LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
105 return nullptr;
106 }
107 return CacheObject(sessionId, flatObjectStore_);
108 }
109
DeleteObject(const std::string & sessionId)110 uint32_t DistributedObjectStoreImpl::DeleteObject(const std::string &sessionId)
111 {
112 DataObjectHiTrace trace("DistributedObjectStoreImpl::DeleteObject");
113 if (flatObjectStore_ == nullptr) {
114 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
115 return ERR_NULL_OBJECTSTORE;
116 }
117 uint32_t status = flatObjectStore_->Delete(sessionId);
118 if (status != SUCCESS) {
119 LOG_ERROR("DistributedObjectStoreImpl::DeleteObject store delete err %{public}d", status);
120 return status;
121 }
122 RemoveCacheObject(sessionId);
123 return SUCCESS;
124 }
125
Get(const std::string & sessionId,DistributedObject ** object)126 uint32_t DistributedObjectStoreImpl::Get(const std::string &sessionId, DistributedObject **object)
127 {
128 std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
129 auto iter = objects_.begin();
130 while (iter != objects_.end()) {
131 if ((*iter)->GetSessionId() == sessionId) {
132 *object = *iter;
133 return SUCCESS;
134 }
135 iter++;
136 }
137 LOG_ERROR("DistributedObjectStoreImpl::Get object err, no object");
138 return ERR_GET_OBJECT;
139 }
140
Watch(DistributedObject * object,std::shared_ptr<ObjectWatcher> watcher)141 uint32_t DistributedObjectStoreImpl::Watch(DistributedObject *object, std::shared_ptr<ObjectWatcher> watcher)
142 {
143 if (object == nullptr) {
144 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
145 return ERR_NULL_OBJECT;
146 }
147 if (flatObjectStore_ == nullptr) {
148 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
149 return ERR_NULL_OBJECTSTORE;
150 }
151 std::lock_guard<std::mutex> lock(watchersLock_);
152 if (watchers_.count(object) != 0) {
153 LOG_ERROR("DistributedObjectStoreImpl::Watch already gets object");
154 return ERR_EXIST;
155 }
156 std::shared_ptr<WatcherProxy> watcherProxy = std::make_shared<WatcherProxy>(watcher, object->GetSessionId());
157 watcherProxy->SetAssetChangeCallBack(
158 [=](const std::string &sessionId, const std::string &assetKey, std::shared_ptr<ObjectWatcher> objectWatcher) {
159 AssetChangeTimer *assetChangeTimer = AssetChangeTimer::GetInstance(flatObjectStore_);
160 assetChangeTimer->OnAssetChanged(sessionId, assetKey, objectWatcher);
161 });
162 uint32_t status = flatObjectStore_->Watch(object->GetSessionId(), watcherProxy);
163 if (status != SUCCESS) {
164 LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
165 return status;
166 }
167 watchers_.insert_or_assign(object, watcherProxy);
168 LOG_INFO("DistributedObjectStoreImpl:Watch object success.");
169 return SUCCESS;
170 }
171
UnWatch(DistributedObject * object)172 uint32_t DistributedObjectStoreImpl::UnWatch(DistributedObject *object)
173 {
174 if (object == nullptr) {
175 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
176 return ERR_NULL_OBJECT;
177 }
178 if (flatObjectStore_ == nullptr) {
179 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
180 return ERR_NULL_OBJECTSTORE;
181 }
182 uint32_t status = flatObjectStore_->UnWatch(object->GetSessionId());
183 if (status != SUCCESS) {
184 LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
185 return status;
186 }
187 std::lock_guard<std::mutex> lock(watchersLock_);
188 watchers_.erase(object);
189 LOG_INFO("DistributedObjectStoreImpl:UnWatch object success.");
190 return SUCCESS;
191 }
192
SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)193 uint32_t DistributedObjectStoreImpl::SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)
194 {
195 if (flatObjectStore_ == nullptr) {
196 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
197 return ERR_NULL_OBJECTSTORE;
198 }
199 std::shared_ptr<StatusNotifierProxy> watcherProxy = std::make_shared<StatusNotifierProxy>(notifier);
200 return flatObjectStore_->SetStatusNotifier(watcherProxy);
201 }
202
NotifyCachedStatus(const std::string & sessionId)203 void DistributedObjectStoreImpl::NotifyCachedStatus(const std::string &sessionId)
204 {
205 flatObjectStore_->CheckRetrieveCache(sessionId);
206 }
207
WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher,const std::string & sessionId)208 WatcherProxy::WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher, const std::string &sessionId)
209 : FlatObjectWatcher(sessionId), objectWatcher_(objectWatcher)
210 {
211 }
212
OnChanged(const std::string & sessionId,const std::vector<std::string> & changedData,bool enableTransfer)213 void WatcherProxy::OnChanged(
214 const std::string &sessionId, const std::vector<std::string> &changedData, bool enableTransfer)
215 {
216 std::unordered_set<std::string> transferKeys;
217 std::vector<std::string> otherKeys;
218 for (const auto &str : changedData) {
219 if (str.find(ASSET_DOT) == std::string::npos) {
220 if (str != DEVICEID_KEY) {
221 otherKeys.push_back(str);
222 }
223 } else {
224 std::string assetKey;
225 if (FindChangedAssetKey(str, assetKey)) {
226 transferKeys.insert(assetKey);
227 }
228 }
229 }
230 if (!enableTransfer) {
231 otherKeys.insert(otherKeys.end(), transferKeys.begin(), transferKeys.end());
232 } else if (assetChangeCallback_ != nullptr && !transferKeys.empty()) {
233 for (auto &assetKey : transferKeys) {
234 assetChangeCallback_(sessionId, assetKey, objectWatcher_);
235 }
236 }
237 if (!otherKeys.empty()) {
238 objectWatcher_->OnChanged(sessionId, otherKeys);
239 }
240 }
241
FindChangedAssetKey(const std::string & changedKey,std::string & assetKey)242 bool WatcherProxy::FindChangedAssetKey(const std::string &changedKey, std::string &assetKey)
243 {
244 std::size_t dotPos = changedKey.find(ASSET_DOT);
245 if ((changedKey.size() > MODIFY_TIME_SUFFIX.length() && changedKey.substr(dotPos) == MODIFY_TIME_SUFFIX) ||
246 (changedKey.size() > SIZE_SUFFIX.length() && changedKey.substr(dotPos) == SIZE_SUFFIX)) {
247 assetKey = changedKey.substr(0, dotPos);
248 return true;
249 }
250 return false;
251 }
252
SetAssetChangeCallBack(const AssetChangeCallback & assetChangeCallback)253 void WatcherProxy::SetAssetChangeCallBack(const AssetChangeCallback &assetChangeCallback)
254 {
255 assetChangeCallback_ = assetChangeCallback;
256 }
257
GetInstance(const std::string & bundleName)258 DistributedObjectStore *DistributedObjectStore::GetInstance(const std::string &bundleName)
259 {
260 static std::mutex instLock_;
261 static DistributedObjectStore *instPtr = nullptr;
262 if (instPtr == nullptr) {
263 std::lock_guard<std::mutex> lock(instLock_);
264 if (instPtr == nullptr && !bundleName.empty()) {
265 RadarReporter::ReportStateStart(std::string(__FUNCTION__), CREATE, INIT_STORE, IDLE, START, bundleName);
266 LOG_INFO("new objectstore %{public}s", bundleName.c_str());
267 FlatObjectStore *flatObjectStore = new (std::nothrow) FlatObjectStore(bundleName);
268 if (flatObjectStore == nullptr) {
269 LOG_ERROR("no memory for FlatObjectStore malloc!");
270 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, INIT_STORE,
271 RADAR_FAILED, NO_MEMORY, FINISHED);
272 return nullptr;
273 }
274 // Use instMemory to make sure this singleton not free before other object.
275 // This operation needn't to malloc memory, we needn't to check nullptr.
276 instPtr = new (std::nothrow) DistributedObjectStoreImpl(flatObjectStore);
277 if (instPtr == nullptr) {
278 delete flatObjectStore;
279 LOG_ERROR("no memory for DistributedObjectStoreImpl malloc!");
280 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, INIT_STORE,
281 RADAR_FAILED, NO_MEMORY, FINISHED);
282 return nullptr;
283 }
284 RadarReporter::ReportStage(std::string(__FUNCTION__), CREATE, INIT_STORE, RADAR_SUCCESS);
285 }
286 }
287 return instPtr;
288 }
289
OnChanged(const std::string & sessionId,const std::string & networkId,const std::string & onlineStatus)290 void StatusNotifierProxy::OnChanged(
291 const std::string &sessionId, const std::string &networkId, const std::string &onlineStatus)
292 {
293 if (notifier != nullptr) {
294 notifier->OnChanged(sessionId, networkId, onlineStatus);
295 }
296 }
297
StatusNotifierProxy(const std::shared_ptr<StatusNotifier> & notifier)298 StatusNotifierProxy::StatusNotifierProxy(const std::shared_ptr<StatusNotifier> ¬ifier) : notifier(notifier)
299 {
300 }
301
~StatusNotifierProxy()302 StatusNotifierProxy::~StatusNotifierProxy()
303 {
304 LOG_ERROR("destroy");
305 notifier = nullptr;
306 }
307 } // namespace OHOS::ObjectStore
308