1 /* 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #include <mutex> 17 #include <unordered_set> 18 #include <map> 19 #include <memory> 20 21 #include "emitter_log.h" 22 #include "inner_event.h" 23 #include "event_handler_impl.h" 24 #include "cj_fn_invoker.h" 25 #include "emitter.h" 26 27 using InnerEvent = OHOS::AppExecFwk::InnerEvent; 28 using Priority = OHOS::AppExecFwk::EventQueue::Priority; 29 30 namespace OHOS::EventsEmitter { 31 const int32_t SUCCESS = 0; 32 struct EventDataWorker { 33 CEventData data; 34 InnerEvent::EventId eventId; 35 CallbackInfo* callbackInfo; 36 }; 37 38 static std::mutex g_emitterInsMutex; 39 static std::map<InnerEvent::EventId, std::unordered_set<CallbackInfo *>> g_emitterImpls; 40 std::shared_ptr<EventHandlerImpl> eventHandler = EventHandlerImpl::GetEventHandler(); 41 CallbackImpl(std::string name,std::function<void (CEventData)> callback)42 CallbackImpl::CallbackImpl(std::string name, std::function<void(CEventData)> callback) 43 : name(name), callback(callback) 44 {} 45 ~CallbackInfo()46 CallbackInfo::~CallbackInfo() 47 { 48 callbackImpl = nullptr; 49 } 50 IsExistValidCallback(const InnerEvent::EventId & eventId)51 bool IsExistValidCallback(const InnerEvent::EventId &eventId) 52 { 53 std::lock_guard<std::mutex> lock(g_emitterInsMutex); 54 auto subscribe = g_emitterImpls.find(eventId); 55 if (subscribe == g_emitterImpls.end()) { 56 LOGW("emit has no callback"); 57 return false; 58 } 59 for (auto callbackInfo : subscribe->second) { 60 if (!callbackInfo->isDeleted) { 61 return true; 62 } 63 } 64 return false; 65 } 66 EmitWithEventData(InnerEvent::EventId eventId,uint32_t priority,CEventData data)67 void EmitWithEventData(InnerEvent::EventId eventId, uint32_t priority, CEventData data) 68 { 69 if (!IsExistValidCallback(eventId)) { 70 LOGE("Invalid callback"); 71 return; 72 } 73 std::unique_ptr<CEventData> dataPtr; 74 if (data.size == 0) { 75 dataPtr = std::make_unique<CEventData>(); 76 } else { 77 dataPtr = std::make_unique<CEventData>(data); 78 } 79 auto event = InnerEvent::Get(eventId, dataPtr); 80 eventHandler->SendEvent(event, 0, static_cast<Priority>(priority)); 81 } 82 SearchCallbackInfo(const InnerEvent::EventId & eventIdValue,const std::string & callbackName)83 CallbackInfo *SearchCallbackInfo(const InnerEvent::EventId &eventIdValue, const std::string &callbackName) 84 { 85 auto subscribe = g_emitterImpls.find(eventIdValue); 86 if (subscribe == g_emitterImpls.end()) { 87 return nullptr; 88 } 89 for (auto callbackInfo : subscribe->second) { 90 if (callbackInfo->isDeleted) { 91 continue; 92 } 93 if (callbackInfo->callbackImpl->name == callbackName) { 94 LOGD("Callback found.") 95 return callbackInfo; 96 } 97 } 98 LOGD("Callback not found.") 99 return nullptr; 100 } 101 UpdateOnceFlag(CallbackInfo * callbackInfo,bool once)102 void UpdateOnceFlag(CallbackInfo *callbackInfo, bool once) 103 { 104 if (!once) { 105 if (callbackInfo->once) { 106 LOGD("On change once to on"); 107 callbackInfo->once = false; 108 } else { 109 LOGD("On already on"); 110 } 111 } else { 112 if (callbackInfo->once) { 113 LOGD("Once already once"); 114 } else { 115 LOGD("Once change on to once"); 116 callbackInfo->once = true; 117 } 118 } 119 } 120 OutPutEventIdLog(const InnerEvent::EventId & eventId)121 void OutPutEventIdLog(const InnerEvent::EventId &eventId) 122 { 123 if (eventId.index() == OHOS::AppExecFwk::TYPE_U32_INDEX) { 124 LOGD("Event id value: %{public}u", std::get<uint32_t>(eventId)); 125 } else { 126 LOGD("Event id value: %{public}s", std::get<std::string>(eventId).c_str()); 127 } 128 } 129 OnOrOnce(InnerEvent::EventId eventId,CallbackImpl ** callbackImpl,bool once)130 int32_t OnOrOnce(InnerEvent::EventId eventId, CallbackImpl **callbackImpl, bool once) 131 { 132 OutPutEventIdLog(eventId); 133 std::lock_guard<std::mutex> lock(g_emitterInsMutex); 134 auto callback = *callbackImpl; 135 auto callbackInfo = SearchCallbackInfo(eventId, callback->name); 136 if (callbackInfo != nullptr) { 137 UpdateOnceFlag(callbackInfo, once); 138 delete callback; 139 *callbackImpl = nullptr; 140 return SUCCESS; 141 } 142 callbackInfo = new (std::nothrow) CallbackInfo(); 143 if (!callbackInfo) { 144 LOGE("new callbackInfo failed"); 145 delete callback; 146 *callbackImpl = nullptr; 147 return MEMORY_ERROR; 148 } 149 callbackInfo->callbackImpl = callback; 150 callbackInfo->once = once; 151 g_emitterImpls[eventId].insert(callbackInfo); 152 return SUCCESS; 153 } 154 On(uint32_t eventId,CallbackImpl * callback)155 int32_t Emitter::On(uint32_t eventId, CallbackImpl *callback) 156 { 157 InnerEvent::EventId id = eventId; 158 return OnOrOnce(id, &callback, false); 159 } 160 On(char * eventId,CallbackImpl * callback)161 int32_t Emitter::On(char* eventId, CallbackImpl *callback) 162 { 163 InnerEvent::EventId id = std::string(eventId); 164 return OnOrOnce(id, &callback, false); 165 } 166 Once(uint32_t eventId,CallbackImpl * callback)167 int32_t Emitter::Once(uint32_t eventId, CallbackImpl *callback) 168 { 169 InnerEvent::EventId id = eventId; 170 return OnOrOnce(id, &callback, true); 171 } 172 Once(char * eventId,CallbackImpl * callback)173 int32_t Emitter::Once(char* eventId, CallbackImpl *callback) 174 { 175 InnerEvent::EventId id = std::string(eventId); 176 return OnOrOnce(id, &callback, true); 177 } 178 Unsubscribe(InnerEvent::EventId eventId)179 void Unsubscribe(InnerEvent::EventId eventId) 180 { 181 std::lock_guard<std::mutex> lock(g_emitterInsMutex); 182 auto subscribe = g_emitterImpls.find(eventId); 183 if (subscribe != g_emitterImpls.end()) { 184 for (auto callbackInfo : subscribe->second) { 185 callbackInfo->isDeleted = true; 186 } 187 } 188 } 189 Unsubscribe(InnerEvent::EventId eventId,CallbackImpl * callback)190 void Unsubscribe(InnerEvent::EventId eventId, CallbackImpl *callback) 191 { 192 std::lock_guard<std::mutex> lock(g_emitterInsMutex); 193 auto callbackInfo = SearchCallbackInfo(eventId, callback->name); 194 if (callbackInfo != nullptr) { 195 callbackInfo->isDeleted = true; 196 } 197 } 198 Off(uint32_t eventId)199 void Emitter::Off(uint32_t eventId) 200 { 201 InnerEvent::EventId id = eventId; 202 Unsubscribe(id); 203 } 204 Off(char * eventId)205 void Emitter::Off(char* eventId) 206 { 207 InnerEvent::EventId id = std::string(eventId); 208 Unsubscribe(id); 209 } 210 Off(uint32_t eventId,CallbackImpl * callback)211 void Emitter::Off(uint32_t eventId, CallbackImpl *callback) 212 { 213 InnerEvent::EventId id = eventId; 214 Unsubscribe(id, callback); 215 } 216 Off(char * eventId,CallbackImpl * callback)217 void Emitter::Off(char* eventId, CallbackImpl *callback) 218 { 219 InnerEvent::EventId id = std::string(eventId); 220 Unsubscribe(id, callback); 221 } 222 Emit(uint32_t eventId,uint32_t priority,CEventData data)223 void Emitter::Emit(uint32_t eventId, uint32_t priority, CEventData data) 224 { 225 InnerEvent::EventId id = eventId; 226 EmitWithEventData(id, priority, data); 227 } 228 Emit(char * eventId,uint32_t priority,CEventData data)229 void Emitter::Emit(char* eventId, uint32_t priority, CEventData data) 230 { 231 InnerEvent::EventId id = std::string(eventId); 232 EmitWithEventData(id, priority, data); 233 } 234 GetListenerCountByEventId(InnerEvent::EventId eventId)235 uint32_t GetListenerCountByEventId(InnerEvent::EventId eventId) 236 { 237 uint32_t count = 0; 238 std::lock_guard<std::mutex> lock(g_emitterInsMutex); 239 auto subscribe = g_emitterImpls.find(eventId); 240 if (subscribe != g_emitterImpls.end()) { 241 for (auto callbackInfo : subscribe->second) { 242 if (!callbackInfo->isDeleted) { 243 ++count; 244 } 245 } 246 } 247 return count; 248 } 249 GetListenerCount(uint32_t eventId)250 uint32_t Emitter::GetListenerCount(uint32_t eventId) 251 { 252 InnerEvent::EventId id = eventId; 253 return GetListenerCountByEventId(id); 254 } 255 GetListenerCount(std::string eventId)256 uint32_t Emitter::GetListenerCount(std::string eventId) 257 { 258 InnerEvent::EventId id = eventId; 259 return GetListenerCountByEventId(id); 260 } 261 FreeCEventData(CEventData & eventData)262 void FreeCEventData(CEventData &eventData) 263 { 264 auto params = reinterpret_cast<CParameter *>(eventData.parameters); 265 for (int i = 0; i < eventData.size; i++) { 266 free(params[i].key); 267 free(params[i].value); 268 params[i].key = nullptr; 269 params[i].value = nullptr; 270 } 271 free(params); 272 params = nullptr; 273 } 274 ProcessCallback(const InnerEvent::Pointer & event,std::unordered_set<CallbackInfo * > & callbackInfos)275 void ProcessCallback(const InnerEvent::Pointer& event, std::unordered_set<CallbackInfo *>& callbackInfos) 276 { 277 auto value = event->GetUniqueObject<CEventData>(); 278 CEventData eventData = { .parameters = nullptr, .size = 0}; 279 if (value != nullptr) { 280 eventData = *value; 281 } 282 for (auto iter = callbackInfos.begin(); iter != callbackInfos.end();) { 283 CallbackInfo* callbackInfo = *iter; 284 if (callbackInfo->once || callbackInfo->isDeleted) { 285 LOGI("once callback or isDeleted callback"); 286 iter = callbackInfos.erase(iter); 287 if (callbackInfo->processed) { 288 delete callbackInfo->callbackImpl; 289 delete callbackInfo; 290 callbackInfo = nullptr; 291 continue; 292 } 293 } else { 294 ++iter; 295 } 296 if (!callbackInfo->isDeleted) { 297 callbackInfo->callbackImpl->callback(eventData); 298 } else { 299 LOGD("callback is deleted."); 300 } 301 callbackInfo->processed = true; 302 } 303 FreeCEventData(eventData); 304 } 305 ProcessEvent(const InnerEvent::Pointer & event)306 void EventHandlerImpl::ProcessEvent(const InnerEvent::Pointer& event) 307 { 308 LOGI("ProcessEvent"); 309 InnerEvent::EventId eventId = event->GetInnerEventIdEx(); 310 OutPutEventIdLog(eventId); 311 std::unordered_set<CallbackInfo *> callbackInfos; 312 { 313 std::lock_guard<std::mutex> lock(g_emitterInsMutex); 314 auto subscribe = g_emitterImpls.find(eventId); 315 if (subscribe == g_emitterImpls.end()) { 316 LOGW("ProcessEvent has no callback"); 317 return; 318 } 319 callbackInfos = subscribe->second; 320 } 321 LOGD("size = %{public}zu", callbackInfos.size()); 322 ProcessCallback(event, callbackInfos); 323 if (callbackInfos.empty()) { 324 g_emitterImpls.erase(eventId); 325 LOGD("ProcessEvent delete the last callback"); 326 } else { 327 std::lock_guard<std::mutex> lock(g_emitterInsMutex); 328 auto subscribe = g_emitterImpls.find(eventId); 329 subscribe->second = callbackInfos; 330 } 331 } 332 }