/* * Copyright (c) 2022 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef DATA_SHARE_CALLBACKS_MANAGER_H #define DATA_SHARE_CALLBACKS_MANAGER_H #include #include #include #include "datashare_errno.h" #include "datashare_log.h" #include "datashare_template.h" namespace OHOS::DataShare { template class CallbacksManager { public: struct ObserverNodeOnEnabled { ObserverNodeOnEnabled(const std::shared_ptr &observer, bool isNotifyOnEnabled = false) : observer_(observer), isNotifyOnEnabled_(isNotifyOnEnabled) {}; std::shared_ptr observer_; bool isNotifyOnEnabled_; }; std::vector AddObservers(const std::vector &keys, void *subscriber, const std::shared_ptr observer, std::function &, const std::shared_ptr &observer)>, std::function &, const std::shared_ptr &observer, std::vector &)>); std::vector DelObservers(const std::vector &keys, void *subscriber, std::function &, std::vector &)> processOnLastDel = CallbacksManager::DefaultProcess); std::vector DelObservers(void *subscriber, std::function &, std::vector &)> processOnLastDel = CallbacksManager::DefaultProcess); std::vector EnableObservers(const std::vector &keys, void *subscriber, std::function> &)> processOnLocalEnabled, std::function &, std::vector &)>); std::vector DisableObservers(const std::vector &keys, void *subscriber, std::function &, std::vector &)> processOnLastDel = CallbacksManager::DefaultProcess); std::vector> GetEnabledObservers(const Key &); int GetAllSubscriberSize(); int GetAllSubscriberSize(const Key &key); std::vector GetKeys(); void SetObserversNotifiedOnEnabled(const Key &key); private: static void DefaultProcess(const std::vector &, std::vector &){}; struct ObserverNode { std::shared_ptr observer_; bool enabled_; void *subscriber_; bool isNotifyOnEnabled_; ObserverNode(const std::shared_ptr &observer, void *subscriber) : observer_(observer), subscriber_(subscriber) { enabled_ = true; isNotifyOnEnabled_ = false; }; }; void DelLocalObservers(const Key &key, void *subscriber, std::vector &lastDelKeys, std::vector &result); void DelLocalObservers(void *subscriber, std::vector &lastDelKeys, std::vector &result); std::recursive_mutex mutex_{}; std::map> callbacks_; }; template std::vector CallbacksManager::AddObservers(const std::vector &keys, void *subscriber, const std::shared_ptr observer, std::function &, const std::shared_ptr &observer)> processOnLocalAdd, std::function &, const std::shared_ptr &observer, std::vector &)> processOnFirstAdd) { std::vector result; std::vector firstRegisterKey; std::vector localRegisterKey; { std::lock_guard lck(mutex_); for (auto &key : keys) { std::vector> enabledObservers = GetEnabledObservers(key); if (enabledObservers.empty()) { callbacks_[key].emplace_back(observer, subscriber); firstRegisterKey.emplace_back(key); continue; } localRegisterKey.emplace_back(key); callbacks_[key].emplace_back(observer, subscriber); result.emplace_back(key, E_OK); } } if (!localRegisterKey.empty()) { processOnLocalAdd(localRegisterKey, observer); } processOnFirstAdd(firstRegisterKey, observer, result); return result; } template std::vector CallbacksManager::GetKeys() { std::vector keys; { std::lock_guard lck(mutex_); for (auto &it : callbacks_) { keys.emplace_back(it.first); } } return keys; } template void CallbacksManager::DelLocalObservers(void *subscriber, std::vector &lastDelKeys, std::vector &result) { for (auto &it : callbacks_) { DelLocalObservers(it.first, subscriber, lastDelKeys, result); } } template void CallbacksManager::DelLocalObservers(const Key &key, void *subscriber, std::vector &lastDelKeys, std::vector &result) { auto it = callbacks_.find(key); if (it == callbacks_.end()) { result.emplace_back(key, E_UNREGISTERED_EMPTY); return; } std::vector &callbacks = it->second; auto callbackIt = callbacks.begin(); while (callbackIt != callbacks.end()) { if (callbackIt->subscriber_ != subscriber) { callbackIt++; continue; } callbackIt = callbacks.erase(callbackIt); } if (!it->second.empty()) { result.emplace_back(key, E_OK); return; } lastDelKeys.emplace_back(key); } template std::vector CallbacksManager::DelObservers(void *subscriber, std::function &, std::vector &)> processOnLastDel) { std::vector result; std::vector lastDelKeys; { std::lock_guard lck(mutex_); DelLocalObservers(subscriber, lastDelKeys, result); if (lastDelKeys.empty()) { return result; } for (auto &key : lastDelKeys) { callbacks_.erase(key); } } processOnLastDel(lastDelKeys, result); return result; } template std::vector CallbacksManager::DelObservers(const std::vector &keys, void *subscriber, std::function &, std::vector &)> processOnLastDel) { std::vector result; std::vector lastDelKeys; { std::lock_guard lck(mutex_); for (auto &key : keys) { DelLocalObservers(key, subscriber, lastDelKeys, result); } if (lastDelKeys.empty()) { return result; } for (auto &key : lastDelKeys) { callbacks_.erase(key); } } processOnLastDel(lastDelKeys, result); return result; } template std::vector> CallbacksManager::GetEnabledObservers(const Key &inputKey) { std::lock_guard lck(mutex_); auto it = callbacks_.find(inputKey); if (it == callbacks_.end()) { return std::vector>(); } std::vector> results; for (const auto &value : it->second) { if (value.enabled_ && value.observer_ != nullptr) { results.emplace_back(value.observer_); } } return results; } template std::vector CallbacksManager::EnableObservers( const std::vector &keys, void *subscriber, std::function> &)> enableLocalFunc, std::function &, std::vector &)> enableServiceFunc) { std::vector result; std::vector sendServiceKeys; std::map> refreshObservers; { std::lock_guard lck(mutex_); for (auto &key : keys) { auto it = callbacks_.find(key); if (it == callbacks_.end()) { result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST); continue; } auto& allObservers = it->second; auto iterator = std::find_if(allObservers.begin(), allObservers.end(), [&subscriber](ObserverNode node) { if (node.subscriber_ == subscriber) { return true; } return false; }); if (iterator == allObservers.end()) { result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST); continue; } if (iterator->enabled_) { result.emplace_back(key, E_OK); continue; } std::vector> enabledObservers = GetEnabledObservers(key); if (enabledObservers.empty()) { sendServiceKeys.emplace_back(key); } refreshObservers[key].emplace_back(iterator->observer_, iterator->isNotifyOnEnabled_); iterator->enabled_ = true; } } enableServiceFunc(sendServiceKeys, result); enableLocalFunc(refreshObservers); return result; } template std::vector CallbacksManager::DisableObservers(const std::vector &keys, void *subscriber, std::function &, std::vector &)> processOnLastDisable) { std::vector result; std::vector lastDisabledKeys; { std::lock_guard lck(mutex_); for (auto &key : keys) { auto it = callbacks_.find(key); if (it == callbacks_.end()) { result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST); continue; } std::vector> enabledObservers = GetEnabledObservers(key); if (enabledObservers.empty()) { result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST); continue; } bool hasDisabled = false; for (auto &item : callbacks_[key]) { if (item.subscriber_ == subscriber) { item.enabled_ = false; item.isNotifyOnEnabled_ = false; hasDisabled = true; } } if (!hasDisabled) { result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST); continue; } enabledObservers = GetEnabledObservers(key); if (!enabledObservers.empty()) { result.emplace_back(key, E_OK); continue; } lastDisabledKeys.emplace_back(key); } } processOnLastDisable(lastDisabledKeys, result); return result; } template int CallbacksManager::GetAllSubscriberSize() { int count = 0; std::lock_guard lck(mutex_); for (auto &[key, value] : callbacks_) { count += GetAllSubscriberSize(key); } return count; } template int CallbacksManager::GetAllSubscriberSize(const Key &key) { std::lock_guard lck(mutex_); auto it = callbacks_.find(key); if (it == callbacks_.end()) { return 0; } return it->second.size(); } template void CallbacksManager::SetObserversNotifiedOnEnabled(const Key &key) { std::lock_guard lck(mutex_); auto it = callbacks_.find(key); if (it == callbacks_.end()) { return; } std::vector &callbacks = it->second; uint32_t num = 0; for (auto &observerNode : callbacks) { if (!observerNode.enabled_) { num++; observerNode.isNotifyOnEnabled_ = true; } } if (num > 0) { LOG_INFO("total %{public}zu, not refreshed %{public}u", callbacks.size(), num); } } } // namespace OHOS::DataShare #endif // DATA_SHARE_CALLBACKS_MANAGER_H