1 /*
2 * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "event_loop.h"
16
17 #include <climits>
18 #include <functional>
19 #include <thread>
20
21 #if defined(__HIVIEW_OHOS__)
22 #include <pthread.h>
23 #include <sys/epoll.h>
24 #include <sys/eventfd.h>
25 #include <sys/prctl.h>
26 #elif defined(_WIN32)
27 #include <processthreadsapi.h>
28 #include <sstream>
29 #include <Synchapi.h>
30 #include <tchar.h>
31 #include <windows.h>
32 #endif
33
34 #include "audit.h"
35 #include "file_util.h"
36 #include "hiview_logger.h"
37 #include "memory_util.h"
38 #include "thread_util.h"
39 #include "time_util.h"
40 namespace OHOS {
41 namespace HiviewDFX {
42 namespace {
/**
 * Builds a future that is already fulfilled with the value false.
 *
 * @return a ready std::future<bool> whose get() yields false.
 */
std::future<bool> GetFalseFuture()
{
    std::promise<bool> readyPromise;
    std::future<bool> readyFuture = readyPromise.get_future();
    readyPromise.set_value(false);
    return readyFuture;
}
49 }
50
51 DEFINE_LOG_TAG("HiView-EventLoop");
52
EventLoop(const std::string & name)53 EventLoop::EventLoop(const std::string &name) : name_(name), nextWakeupTime_(0), currentProcessingEvent_(nullptr)
54 {}
55
~EventLoop()56 EventLoop::~EventLoop()
57 {
58 StopLoop();
59 }
60
InitEventQueueNotifier()61 bool EventLoop::InitEventQueueNotifier()
62 {
63 #if defined(__HIVIEW_OHOS__)
64 #if defined(USE_POLL)
65 for (int i = 0; i < 2; i++) { // 2:event queue fd size
66 if (eventQueueFd_[i] > 0) {
67 close(eventQueueFd_[i]);
68 eventQueueFd_[i] = -1;
69 }
70 }
71
72 if (pipe2(eventQueueFd_, O_CLOEXEC) != 0) {
73 HIVIEW_LOGW("Failed to create event queue fd.");
74 return false;
75 }
76
77 watchFds_[0].fd = eventQueueFd_[0];
78 watchFds_[0].events = POLLIN;
79 watchedFdSize_ = 1;
80 #else
81 #if defined EPOLL_CLOEXEC
82 sharedPollingFd_ = UniqueFd(epoll_create1(EPOLL_CLOEXEC));
83 #else
84 sharedPollingFd_ = UniqueFd(epoll_create(1024)); // listen 1024 sockets
85 #endif
86 pendingEventQueueFd_ = UniqueFd(eventfd(0, EFD_NONBLOCK));
87 struct epoll_event eventItem;
88 eventItem.events = EPOLLIN | EPOLLET;
89 eventItem.data.fd = pendingEventQueueFd_.Get();
90 int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, pendingEventQueueFd_.Get(), &eventItem);
91 if (result < 0) {
92 HIVIEW_LOGE("Fail to Create event poll queue.");
93 return false;
94 }
95 #endif
96 #elif defined(_WIN32)
97 watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX] = CreateEventA(NULL, FALSE, FALSE, NULL);
98 #endif
99 return true;
100 }
101
WakeUp()102 void EventLoop::WakeUp()
103 {
104 #if defined(__HIVIEW_OHOS__)
105 #ifdef USE_POLL
106 if (eventQueueFd_[1] > 0) {
107 int32_t count = 1;
108 write(eventQueueFd_[1], &count, sizeof(count));
109 }
110 #else
111 if (pendingEventQueueFd_.Get() > 0) {
112 eventfd_t count = 1;
113 write(pendingEventQueueFd_.Get(), &count, sizeof(count));
114 }
115 #endif
116 #elif defined(_WIN32)
117 SetEvent(watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX]);
118 #endif
119 }
120
StartLoop(bool createNewThread)121 void EventLoop::StartLoop(bool createNewThread)
122 {
123 std::lock_guard<std::mutex> lock(queueMutex_);
124 if (IsRunning()) {
125 return;
126 }
127 if (!InitEventQueueNotifier()) {
128 return;
129 }
130
131 isRunning_ = true;
132 if (createNewThread) {
133 thread_ = std::make_unique<std::thread>(&EventLoop::Run, this);
134 return;
135 }
136 // handle loop in current thread cases
137 Run();
138 }
139
StopLoop()140 void EventLoop::StopLoop()
141 {
142 needQuit_ = true;
143 if (!IsRunning()) {
144 return;
145 }
146
147 {
148 std::lock_guard<std::mutex> lock(queueMutex_);
149 while (!pendingEvents_.empty()) {
150 pendingEvents_.pop();
151 }
152 isRunning_ = false;
153 }
154
155 WakeUp();
156 if (thread_ != nullptr && thread_->joinable()) {
157 thread_->join();
158 }
159 }
160
AddEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task task)161 uint64_t EventLoop::AddEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event, const Task task)
162 {
163 if (needQuit_) {
164 return 0;
165 }
166
167 uint64_t now = NanoSecondSinceSystemStart();
168 if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
169 auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
170 GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
171 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
172 }
173
174 LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
175 loopEvent.event = std::move(event);
176 loopEvent.handler = handler;
177 loopEvent.task = task;
178 std::lock_guard<std::mutex> lock(queueMutex_);
179 pendingEvents_.push(std::move(loopEvent));
180 WakeUp();
181 return now;
182 }
183
AddEventForResult(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event)184 std::future<bool> EventLoop::AddEventForResult(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event)
185 {
186 if (needQuit_) {
187 return GetFalseFuture();
188 }
189
190 if (handler == nullptr || event == nullptr) {
191 return GetFalseFuture();
192 }
193
194 if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
195 auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
196 GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
197 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
198 }
199
200 auto bind = std::bind(&EventHandler::OnEventProxy, handler.get(), event);
201 auto task = std::make_shared<std::packaged_task<bool()>>(bind);
202 auto result = task->get_future();
203 uint64_t now = NanoSecondSinceSystemStart();
204 LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
205 loopEvent.taskType = LOOP_PACKAGED_TASK;
206 loopEvent.event = std::move(event);
207 loopEvent.handler = handler;
208 loopEvent.packagedTask = std::move(task);
209 std::lock_guard<std::mutex> lock(queueMutex_);
210 pendingEvents_.push(std::move(loopEvent));
211 WakeUp();
212 return result;
213 }
214
AddTimerEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task & task,uint64_t interval,bool repeat)215 uint64_t EventLoop::AddTimerEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event,
216 const Task &task, uint64_t interval, bool repeat)
217 {
218 if (needQuit_) {
219 return 0;
220 }
221
222 uint64_t now = NanoSecondSinceSystemStart();
223 uint64_t intervalMicro = interval * static_cast<uint64_t>(TimeUtil::SEC_TO_NANOSEC);
224 if (now + intervalMicro < now) {
225 HIVIEW_LOGW("Add Timer Event fail. The interval is too large. please check.");
226 return -1;
227 }
228
229 if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
230 auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
231 GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
232 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
233 }
234
235 LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
236 loopEvent.isRepeat = repeat;
237 loopEvent.taskType = LOOP_EVENT_TASK;
238 loopEvent.interval = intervalMicro;
239 loopEvent.targetTime = now + intervalMicro;
240 loopEvent.event = std::move(event);
241 loopEvent.handler = handler;
242 loopEvent.task = task;
243 std::lock_guard<std::mutex> lock(queueMutex_);
244 pendingEvents_.push(std::move(loopEvent));
245 HIVIEW_LOGI("task[%{public}" PRIu64 "|%{public}d] has been pushed into task queue.", interval,
246 static_cast<int>(repeat));
247 ResetTimerIfNeedLocked();
248 return now;
249 }
250
RemoveEvent(uint64_t seq)251 bool EventLoop::RemoveEvent(uint64_t seq)
252 {
253 std::lock_guard<std::mutex> lock(queueMutex_);
254 auto curEvent = currentProcessingEvent_.load(std::memory_order_relaxed);
255 if ((curEvent != nullptr) && (curEvent->seq == seq)) {
256 curEvent->seq = 0;
257 HIVIEW_LOGI("removing the current processing event.");
258 return false;
259 }
260 return pendingEvents_.remove(seq);
261 }
262
GetRawName() const263 std::string EventLoop::GetRawName() const
264 {
265 auto pos = name_.find('@');
266 return pos == std::string::npos ? name_ : name_.substr(0, pos);
267 }
268
ResetTimerIfNeedLocked()269 void EventLoop::ResetTimerIfNeedLocked()
270 {
271 const LoopEvent &event = pendingEvents_.top();
272 if (nextWakeupTime_ == event.targetTime) {
273 return;
274 }
275 WakeUp();
276 }
277
AddFileDescriptorEventCallback(const std::string & name,std::shared_ptr<FileDescriptorEventCallback> source)278 bool EventLoop::AddFileDescriptorEventCallback(
279 const std::string &name, std::shared_ptr<FileDescriptorEventCallback> source)
280 {
281 if (needQuit_) {
282 return false;
283 }
284
285 std::lock_guard<std::mutex> lock(queueMutex_);
286 #if defined(__HIVIEW_OHOS__)
287 if (eventSourceNameMap_.size() >= (MAX_WATCHED_FDS - 1)) {
288 HIVIEW_LOGW("Watched fds exceed 64.");
289 return false;
290 }
291
292 if (eventSourceNameMap_.find(name) != eventSourceNameMap_.end()) {
293 HIVIEW_LOGW("Exist fd callback with same name.");
294 return false;
295 }
296
297 int fd = source->GetPollFd();
298 if (fd <= 0) {
299 HIVIEW_LOGW("Invalid poll fd.");
300 return false;
301 }
302
303 #ifdef USE_POLL
304 eventSourceNameMap_[name] = fd;
305 eventSourceMap_[fd] = source;
306 modifyFdStatus_ = true;
307 WakeUp();
308 #else
309 struct epoll_event eventItem;
310 eventItem.events = source->GetPollType();
311 eventItem.data.fd = fd;
312 int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, fd, &eventItem);
313 if (result < 0) {
314 HIVIEW_LOGW("Fail to Add Fd callback.");
315 return false;
316 }
317
318 eventSourceNameMap_[name] = fd;
319 eventSourceMap_[fd] = source;
320 #endif
321 #elif defined(_WIN32)
322 // not supported yet
323 #endif
324 return true;
325 }
326
RemoveFileDescriptorEventCallback(const std::string & name)327 bool EventLoop::RemoveFileDescriptorEventCallback(const std::string &name)
328 {
329 std::lock_guard<std::mutex> lock(queueMutex_);
330 #if defined(__HIVIEW_OHOS__)
331 if (eventSourceNameMap_.find(name) == eventSourceNameMap_.end()) {
332 HIVIEW_LOGW("fd callback name is not existed.");
333 return false;
334 }
335
336 int fd = eventSourceNameMap_[name];
337 eventSourceNameMap_.erase(name);
338 eventSourceMap_.erase(fd);
339
340 #ifdef USE_POLL
341 modifyFdStatus_ = true;
342 WakeUp();
343 #else
344 if (epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_DEL, fd, nullptr) == -1) {
345 HIVIEW_LOGW("fail to remove watched fd.");
346 }
347 #endif
348 #elif defined(_WIN32)
349 // not supported yet
350 #endif
351 return true;
352 }
353
354 #ifdef USE_POLL
ModifyFdStatus()355 void EventLoop::ModifyFdStatus()
356 {
357 std::lock_guard<std::mutex> lock(queueMutex_);
358 modifyFdStatus_ = false;
359 int index = 1;
360 for (auto it = eventSourceMap_.begin(); it != eventSourceMap_.end(); it++) {
361 if (index > MAX_WATCHED_FDS - 1) {
362 break;
363 }
364
365 watchFds_[index].fd = it->first;
366 watchFds_[index].events = it->second->GetPollType();
367 index++;
368 watchedFdSize_ = index;
369 }
370 }
371
PollNextEvent(uint64_t timeout)372 void EventLoop::PollNextEvent(uint64_t timeout)
373 {
374 poll(watchFds_, watchedFdSize_, timeout);
375 isWaken_ = true;
376 if (modifyFdStatus_) {
377 ModifyFdStatus();
378 return;
379 }
380
381 if (watchFds_[0].revents & POLLIN) {
382 // new queued event arrived
383 int32_t val = 0;
384 read(watchFds_[0].fd, &val, sizeof(val));
385 return;
386 }
387
388 for (int i = 1; i < watchedFdSize_; i++) {
389 int32_t fd = watchFds_[i].fd;
390 std::lock_guard<std::mutex> lock(queueMutex_);
391 auto it = eventSourceMap_.find(fd);
392 if (it == eventSourceMap_.end()) {
393 continue;
394 }
395
396 int32_t pollType = it->second->GetPollType();
397 if (watchFds_[i].revents & pollType) {
398 it->second->OnFileDescriptorEvent(fd, watchFds_[i].revents);
399 }
400 }
401 }
402 #endif
403
Run()404 void EventLoop::Run()
405 {
406 if (MemoryUtil::DisableThreadCache() != 0 || MemoryUtil::DisableDelayFree() != 0) {
407 HIVIEW_LOGW("Failed to optimize memory for current thread");
408 }
409
410 InitThreadName();
411
412 while (true) {
413 uint64_t leftTimeNanosecond = ProcessQueuedEvent();
414 uint64_t leftTimeMill = INT_MAX;
415 if (leftTimeNanosecond != INT_MAX) {
416 leftTimeMill = (leftTimeNanosecond / static_cast<uint64_t>(TimeUtil::MILLISEC_TO_NANOSEC));
417 }
418 WaitNextEvent(leftTimeMill);
419 if (needQuit_) {
420 break;
421 }
422 }
423 }
424
InitThreadName()425 void EventLoop::InitThreadName()
426 {
427 // set thread name
428 const int maxLength = 16;
429 std::string restrictedName = name_;
430 if (name_.length() >= maxLength) {
431 HIVIEW_LOGW("%{public}s is too long for thread, please change to a shorter one.", name_.c_str());
432 restrictedName = name_.substr(0, maxLength - 1);
433 }
434 Thread::SetThreadDescription(restrictedName);
435
436 name_ = name_ + "@" + std::to_string(Thread::GetTid());
437 }
438
ProcessQueuedEvent()439 uint64_t EventLoop::ProcessQueuedEvent()
440 {
441 uint64_t leftTimeNanosecond = 0;
442 while (true) {
443 {
444 std::lock_guard<std::mutex> lock(queueMutex_);
445 if (pendingEvents_.empty()) {
446 return INT_MAX;
447 }
448 }
449 uint64_t now = NanoSecondSinceSystemStart();
450 LoopEvent event;
451 if (!FetchNextEvent(now, leftTimeNanosecond, event)) {
452 break;
453 }
454
455 ProcessEvent(event);
456
457 if (event.isRepeat && (event.interval > 0)) {
458 // force update time
459 now = NanoSecondSinceSystemStart();
460 ReInsertPeriodicEvent(now, event);
461 }
462
463 std::lock_guard<std::mutex> lock(queueMutex_);
464 currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
465 }
466 isWaken_ = false;
467 return leftTimeNanosecond;
468 }
469
FetchNextEvent(uint64_t now,uint64_t & leftTimeNanosecond,LoopEvent & out)470 bool EventLoop::FetchNextEvent(uint64_t now, uint64_t& leftTimeNanosecond, LoopEvent& out)
471 {
472 if (needQuit_) {
473 return false;
474 }
475
476 std::lock_guard<std::mutex> lock(queueMutex_);
477 if (pendingEvents_.empty()) {
478 return false;
479 }
480
481 size_t pendingSize = pendingEvents_.size();
482 const size_t warningPendingSize = 1000;
483 if ((pendingSize > warningPendingSize) && (pendingSize % warningPendingSize == 0)) {
484 HIVIEW_LOGW("%{public}s has %{public}zu pending events.", name_.c_str(), pendingSize);
485 }
486 pendingEvents_.ShrinkIfNeedLocked();
487
488 const LoopEvent &event = pendingEvents_.top();
489 if (event.targetTime > now) {
490 leftTimeNanosecond = event.targetTime - now;
491 nextWakeupTime_ = event.targetTime;
492 return false;
493 }
494
495 out = event;
496 pendingEvents_.pop();
497 currentProcessingEvent_.store(&out, std::memory_order_relaxed);
498 return true;
499 }
500
ProcessEvent(LoopEvent & event)501 void EventLoop::ProcessEvent(LoopEvent &event)
502 {
503 if (event.taskType == LOOP_EVENT_TASK) {
504 if (event.task != nullptr) {
505 event.task();
506 } else if ((event.handler != nullptr) && (event.event != nullptr)) {
507 event.handler->OnEventProxy(event.event);
508 } else {
509 HIVIEW_LOGW("Loop event task with null tasks.");
510 }
511 } else if (event.taskType == LOOP_PACKAGED_TASK) {
512 if (event.packagedTask != nullptr) {
513 event.packagedTask->operator()();
514 } else {
515 HIVIEW_LOGW("Loop packaged task with null tasks.");
516 }
517 } else {
518 HIVIEW_LOGW("unrecognized task type.");
519 }
520 }
521
ReInsertPeriodicEvent(uint64_t now,LoopEvent & event)522 void EventLoop::ReInsertPeriodicEvent(uint64_t now, LoopEvent &event)
523 {
524 std::lock_guard<std::mutex> lock(queueMutex_);
525 currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
526 if (event.seq == 0) {
527 return;
528 }
529
530 event.enqueueTime = now;
531 event.targetTime = now + event.interval;
532
533 if (Audit::IsEnabled() && (event.event != nullptr) && (event.handler != nullptr)) {
534 event.event->ResetTimestamp();
535 auto digest = event.event->sender_ + Audit::DOMAIN_DELIMITER + event.handler->GetHandlerInfo() +
536 Audit::DOMAIN_DELIMITER + GetName() + Audit::DOMAIN_DELIMITER + event.event->GetEventInfo();
537 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event.event->createTime_, digest);
538 }
539
540 pendingEvents_.push(std::move(event));
541 ResetTimerIfNeedLocked();
542 }
543
WaitNextEvent(uint64_t leftTimeMill)544 void EventLoop::WaitNextEvent(uint64_t leftTimeMill)
545 {
546 #if defined(__HIVIEW_OHOS__)
547 #ifdef USE_POLL
548 PollNextEvent(leftTimeMill);
549 #else
550 struct epoll_event eventItems[MAX_EVENT_SIZE];
551 int eventCount = epoll_wait(sharedPollingFd_.Get(), eventItems, MAX_EVENT_SIZE, leftTimeMill);
552 isWaken_ = true;
553 if (eventCount <= 0) {
554 // no event read from watched fd, process queued events
555 return;
556 }
557
558 for (int i = 0; i < eventCount; i++) {
559 int fd = eventItems[i].data.fd;
560 uint32_t events = eventItems[i].events;
561 if (fd == pendingEventQueueFd_.Get()) {
562 // new queued event arrived
563 eventfd_t val = 0;
564 read(fd, &val, sizeof(val));
565 return;
566 } else {
567 // process data source callbacks
568 auto it = eventSourceMap_.find(fd);
569 if (it != eventSourceMap_.end()) {
570 it->second->OnFileDescriptorEvent(fd, events);
571 }
572 }
573 }
574 #endif
575 #elif defined(_WIN32)
576 DWORD dWaitTime = (leftTimeMill >= INFINITE) ? INFINITE : static_cast<DWORD>(leftTimeMill);
577 DWORD result = WaitForMultipleObjects(MAX_HANDLE_ARRAY_SIZE, watchHandleList_, TRUE, dWaitTime);
578 #endif
579 }
580
NanoSecondSinceSystemStart()581 uint64_t EventLoop::NanoSecondSinceSystemStart()
582 {
583 auto nanoNow = std::chrono::steady_clock::now().time_since_epoch();
584 return static_cast<uint64_t>(nanoNow.count());
585 }
586 } // namespace HiviewDFX
587 } // namespace OHOS
588