1 /*
2  * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "event_loop.h"
16 
17 #include <climits>
18 #include <functional>
19 #include <thread>
20 
21 #if defined(__HIVIEW_OHOS__)
22 #include <pthread.h>
23 #include <sys/epoll.h>
24 #include <sys/eventfd.h>
25 #include <sys/prctl.h>
26 #elif defined(_WIN32)
27 #include <processthreadsapi.h>
28 #include <sstream>
29 #include <Synchapi.h>
30 #include <tchar.h>
31 #include <windows.h>
32 #endif
33 
34 #include "audit.h"
35 #include "file_util.h"
36 #include "hiview_logger.h"
37 #include "memory_util.h"
38 #include "thread_util.h"
39 #include "time_util.h"
40 namespace OHOS {
41 namespace HiviewDFX {
42 namespace {
// Builds a future that is already fulfilled with `false`; used by callers that
// must hand back a std::future<bool> without queuing any work.
std::future<bool> GetFalseFuture()
{
    std::promise<bool> readyPromise;
    std::future<bool> readyFuture = readyPromise.get_future();
    readyPromise.set_value(false);
    return readyFuture;
}
49 }
50 
// Log tag for this translation unit (presumably picked up by the HIVIEW_LOG* macros below).
DEFINE_LOG_TAG("HiView-EventLoop");
52 
EventLoop(const std::string & name)53 EventLoop::EventLoop(const std::string &name) : name_(name), nextWakeupTime_(0), currentProcessingEvent_(nullptr)
54 {}
55 
/**
 * Stops the loop on destruction. StopLoop() sets the quit flag, drops pending
 * events and joins the worker thread if one is joinable.
 */
EventLoop::~EventLoop()
{
    StopLoop();
}
60 
InitEventQueueNotifier()61 bool EventLoop::InitEventQueueNotifier()
62 {
63 #if defined(__HIVIEW_OHOS__)
64 #if defined(USE_POLL)
65     for (int i = 0; i < 2; i++) { // 2:event queue fd size
66         if (eventQueueFd_[i] > 0) {
67             close(eventQueueFd_[i]);
68             eventQueueFd_[i] = -1;
69         }
70     }
71 
72     if (pipe2(eventQueueFd_, O_CLOEXEC) != 0) {
73         HIVIEW_LOGW("Failed to create event queue fd.");
74         return false;
75     }
76 
77     watchFds_[0].fd = eventQueueFd_[0];
78     watchFds_[0].events = POLLIN;
79     watchedFdSize_ = 1;
80 #else
81 #if defined EPOLL_CLOEXEC
82     sharedPollingFd_ = UniqueFd(epoll_create1(EPOLL_CLOEXEC));
83 #else
84     sharedPollingFd_ = UniqueFd(epoll_create(1024)); // listen 1024 sockets
85 #endif
86     pendingEventQueueFd_ = UniqueFd(eventfd(0, EFD_NONBLOCK));
87     struct epoll_event eventItem;
88     eventItem.events = EPOLLIN | EPOLLET;
89     eventItem.data.fd = pendingEventQueueFd_.Get();
90     int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, pendingEventQueueFd_.Get(), &eventItem);
91     if (result < 0) {
92         HIVIEW_LOGE("Fail to Create event poll queue.");
93         return false;
94     }
95 #endif
96 #elif defined(_WIN32)
97     watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX] = CreateEventA(NULL, FALSE, FALSE, NULL);
98 #endif
99     return true;
100 }
101 
WakeUp()102 void EventLoop::WakeUp()
103 {
104 #if defined(__HIVIEW_OHOS__)
105 #ifdef USE_POLL
106     if (eventQueueFd_[1] > 0) {
107         int32_t count = 1;
108         write(eventQueueFd_[1], &count, sizeof(count));
109     }
110 #else
111     if (pendingEventQueueFd_.Get() > 0) {
112         eventfd_t count = 1;
113         write(pendingEventQueueFd_.Get(), &count, sizeof(count));
114     }
115 #endif
116 #elif defined(_WIN32)
117     SetEvent(watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX]);
118 #endif
119 }
120 
StartLoop(bool createNewThread)121 void EventLoop::StartLoop(bool createNewThread)
122 {
123     std::lock_guard<std::mutex> lock(queueMutex_);
124     if (IsRunning()) {
125         return;
126     }
127     if (!InitEventQueueNotifier()) {
128         return;
129     }
130 
131     isRunning_ = true;
132     if (createNewThread) {
133         thread_ = std::make_unique<std::thread>(&EventLoop::Run, this);
134         return;
135     }
136     // handle loop in current thread cases
137     Run();
138 }
139 
StopLoop()140 void EventLoop::StopLoop()
141 {
142     needQuit_ = true;
143     if (!IsRunning()) {
144         return;
145     }
146 
147     {
148         std::lock_guard<std::mutex> lock(queueMutex_);
149         while (!pendingEvents_.empty()) {
150             pendingEvents_.pop();
151         }
152         isRunning_ = false;
153     }
154 
155     WakeUp();
156     if (thread_ != nullptr && thread_->joinable()) {
157         thread_->join();
158     }
159 }
160 
AddEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task task)161 uint64_t EventLoop::AddEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event, const Task task)
162 {
163     if (needQuit_) {
164         return 0;
165     }
166 
167     uint64_t now = NanoSecondSinceSystemStart();
168     if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
169         auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
170                       GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
171         Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
172     }
173 
174     LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
175     loopEvent.event = std::move(event);
176     loopEvent.handler = handler;
177     loopEvent.task = task;
178     std::lock_guard<std::mutex> lock(queueMutex_);
179     pendingEvents_.push(std::move(loopEvent));
180     WakeUp();
181     return now;
182 }
183 
AddEventForResult(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event)184 std::future<bool> EventLoop::AddEventForResult(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event)
185 {
186     if (needQuit_) {
187         return GetFalseFuture();
188     }
189 
190     if (handler == nullptr || event == nullptr) {
191         return GetFalseFuture();
192     }
193 
194     if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
195         auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
196                       GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
197         Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
198     }
199 
200     auto bind = std::bind(&EventHandler::OnEventProxy, handler.get(), event);
201     auto task = std::make_shared<std::packaged_task<bool()>>(bind);
202     auto result = task->get_future();
203     uint64_t now = NanoSecondSinceSystemStart();
204     LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
205     loopEvent.taskType = LOOP_PACKAGED_TASK;
206     loopEvent.event = std::move(event);
207     loopEvent.handler = handler;
208     loopEvent.packagedTask = std::move(task);
209     std::lock_guard<std::mutex> lock(queueMutex_);
210     pendingEvents_.push(std::move(loopEvent));
211     WakeUp();
212     return result;
213 }
214 
AddTimerEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task & task,uint64_t interval,bool repeat)215 uint64_t EventLoop::AddTimerEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event,
216     const Task &task, uint64_t interval, bool repeat)
217 {
218     if (needQuit_) {
219         return 0;
220     }
221 
222     uint64_t now = NanoSecondSinceSystemStart();
223     uint64_t intervalMicro = interval * static_cast<uint64_t>(TimeUtil::SEC_TO_NANOSEC);
224     if (now + intervalMicro < now) {
225         HIVIEW_LOGW("Add Timer Event fail. The interval is too large. please check.");
226         return -1;
227     }
228 
229     if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
230         auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
231                       GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
232         Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
233     }
234 
235     LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
236     loopEvent.isRepeat = repeat;
237     loopEvent.taskType = LOOP_EVENT_TASK;
238     loopEvent.interval = intervalMicro;
239     loopEvent.targetTime = now + intervalMicro;
240     loopEvent.event = std::move(event);
241     loopEvent.handler = handler;
242     loopEvent.task = task;
243     std::lock_guard<std::mutex> lock(queueMutex_);
244     pendingEvents_.push(std::move(loopEvent));
245     HIVIEW_LOGI("task[%{public}" PRIu64 "|%{public}d] has been pushed into task queue.", interval,
246         static_cast<int>(repeat));
247     ResetTimerIfNeedLocked();
248     return now;
249 }
250 
RemoveEvent(uint64_t seq)251 bool EventLoop::RemoveEvent(uint64_t seq)
252 {
253     std::lock_guard<std::mutex> lock(queueMutex_);
254     auto curEvent = currentProcessingEvent_.load(std::memory_order_relaxed);
255     if ((curEvent != nullptr) && (curEvent->seq == seq)) {
256         curEvent->seq = 0;
257         HIVIEW_LOGI("removing the current processing event.");
258         return false;
259     }
260     return pendingEvents_.remove(seq);
261 }
262 
GetRawName() const263 std::string EventLoop::GetRawName() const
264 {
265     auto pos = name_.find('@');
266     return pos == std::string::npos ? name_ : name_.substr(0, pos);
267 }
268 
ResetTimerIfNeedLocked()269 void EventLoop::ResetTimerIfNeedLocked()
270 {
271     const LoopEvent &event = pendingEvents_.top();
272     if (nextWakeupTime_ == event.targetTime) {
273         return;
274     }
275     WakeUp();
276 }
277 
AddFileDescriptorEventCallback(const std::string & name,std::shared_ptr<FileDescriptorEventCallback> source)278 bool EventLoop::AddFileDescriptorEventCallback(
279     const std::string &name, std::shared_ptr<FileDescriptorEventCallback> source)
280 {
281     if (needQuit_) {
282         return false;
283     }
284 
285     std::lock_guard<std::mutex> lock(queueMutex_);
286 #if defined(__HIVIEW_OHOS__)
287     if (eventSourceNameMap_.size() >= (MAX_WATCHED_FDS - 1)) {
288         HIVIEW_LOGW("Watched fds exceed 64.");
289         return false;
290     }
291 
292     if (eventSourceNameMap_.find(name) != eventSourceNameMap_.end()) {
293         HIVIEW_LOGW("Exist fd callback with same name.");
294         return false;
295     }
296 
297     int fd = source->GetPollFd();
298     if (fd <= 0) {
299         HIVIEW_LOGW("Invalid poll fd.");
300         return false;
301     }
302 
303 #ifdef USE_POLL
304     eventSourceNameMap_[name] = fd;
305     eventSourceMap_[fd] = source;
306     modifyFdStatus_ = true;
307     WakeUp();
308 #else
309     struct epoll_event eventItem;
310     eventItem.events = source->GetPollType();
311     eventItem.data.fd = fd;
312     int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, fd, &eventItem);
313     if (result < 0) {
314         HIVIEW_LOGW("Fail to Add Fd callback.");
315         return false;
316     }
317 
318     eventSourceNameMap_[name] = fd;
319     eventSourceMap_[fd] = source;
320 #endif
321 #elif defined(_WIN32)
322     // not supported yet
323 #endif
324     return true;
325 }
326 
RemoveFileDescriptorEventCallback(const std::string & name)327 bool EventLoop::RemoveFileDescriptorEventCallback(const std::string &name)
328 {
329     std::lock_guard<std::mutex> lock(queueMutex_);
330 #if defined(__HIVIEW_OHOS__)
331     if (eventSourceNameMap_.find(name) == eventSourceNameMap_.end()) {
332         HIVIEW_LOGW("fd callback name is not existed.");
333         return false;
334     }
335 
336     int fd = eventSourceNameMap_[name];
337     eventSourceNameMap_.erase(name);
338     eventSourceMap_.erase(fd);
339 
340 #ifdef USE_POLL
341     modifyFdStatus_ = true;
342     WakeUp();
343 #else
344     if (epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_DEL, fd, nullptr) == -1) {
345         HIVIEW_LOGW("fail to remove watched fd.");
346     }
347 #endif
348 #elif defined(_WIN32)
349     // not supported yet
350 #endif
351     return true;
352 }
353 
354 #ifdef USE_POLL
ModifyFdStatus()355 void EventLoop::ModifyFdStatus()
356 {
357     std::lock_guard<std::mutex> lock(queueMutex_);
358     modifyFdStatus_ = false;
359     int index = 1;
360     for (auto it = eventSourceMap_.begin(); it != eventSourceMap_.end(); it++) {
361         if (index > MAX_WATCHED_FDS - 1) {
362             break;
363         }
364 
365         watchFds_[index].fd = it->first;
366         watchFds_[index].events = it->second->GetPollType();
367         index++;
368         watchedFdSize_ = index;
369     }
370 }
371 
PollNextEvent(uint64_t timeout)372 void EventLoop::PollNextEvent(uint64_t timeout)
373 {
374     poll(watchFds_, watchedFdSize_, timeout);
375     isWaken_ = true;
376     if (modifyFdStatus_) {
377         ModifyFdStatus();
378         return;
379     }
380 
381     if (watchFds_[0].revents & POLLIN) {
382         // new queued event arrived
383         int32_t val = 0;
384         read(watchFds_[0].fd, &val, sizeof(val));
385         return;
386     }
387 
388     for (int i = 1; i < watchedFdSize_; i++) {
389         int32_t fd = watchFds_[i].fd;
390         std::lock_guard<std::mutex> lock(queueMutex_);
391         auto it = eventSourceMap_.find(fd);
392         if (it == eventSourceMap_.end()) {
393             continue;
394         }
395 
396         int32_t pollType = it->second->GetPollType();
397         if (watchFds_[i].revents & pollType) {
398             it->second->OnFileDescriptorEvent(fd, watchFds_[i].revents);
399         }
400     }
401 }
402 #endif
403 
Run()404 void EventLoop::Run()
405 {
406     if (MemoryUtil::DisableThreadCache() != 0 || MemoryUtil::DisableDelayFree() != 0) {
407         HIVIEW_LOGW("Failed to optimize memory for current thread");
408     }
409 
410     InitThreadName();
411 
412     while (true) {
413         uint64_t leftTimeNanosecond = ProcessQueuedEvent();
414         uint64_t leftTimeMill = INT_MAX;
415         if (leftTimeNanosecond != INT_MAX) {
416             leftTimeMill = (leftTimeNanosecond / static_cast<uint64_t>(TimeUtil::MILLISEC_TO_NANOSEC));
417         }
418         WaitNextEvent(leftTimeMill);
419         if (needQuit_) {
420             break;
421         }
422     }
423 }
424 
InitThreadName()425 void EventLoop::InitThreadName()
426 {
427     // set thread name
428     const int maxLength = 16;
429     std::string restrictedName = name_;
430     if (name_.length() >= maxLength) {
431         HIVIEW_LOGW("%{public}s is too long for thread, please change to a shorter one.", name_.c_str());
432         restrictedName = name_.substr(0, maxLength - 1);
433     }
434     Thread::SetThreadDescription(restrictedName);
435 
436     name_ = name_ + "@" + std::to_string(Thread::GetTid());
437 }
438 
/**
 * Dispatches every queued event that is currently due.
 *
 * @return INT_MAX when the queue is found empty; otherwise the value
 *         FetchNextEvent() left in leftTimeNanosecond when it declined to
 *         hand out an event (time until the top event is due, or 0 if it
 *         never set it, e.g. because the loop is quitting).
 */
uint64_t EventLoop::ProcessQueuedEvent()
{
    uint64_t leftTimeNanosecond = 0;
    while (true) {
        {
            // only the emptiness check is done under this lock; FetchNextEvent re-checks under its own
            std::lock_guard<std::mutex> lock(queueMutex_);
            if (pendingEvents_.empty()) {
                return INT_MAX;
            }
        }
        uint64_t now = NanoSecondSinceSystemStart();
        // FetchNextEvent publishes the address of this local through currentProcessingEvent_,
        // so it must be cleared again before `event` goes out of scope (done at the end of the body).
        LoopEvent event;
        if (!FetchNextEvent(now, leftTimeNanosecond, event)) {
            break;
        }

        // runs without queueMutex_ held
        ProcessEvent(event);

        if (event.isRepeat && (event.interval > 0)) {
            // force update time
            now = NanoSecondSinceSystemStart();
            ReInsertPeriodicEvent(now, event);
        }

        // unpublish the in-flight event under the lock RemoveEvent uses to inspect it
        std::lock_guard<std::mutex> lock(queueMutex_);
        currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
    }
    isWaken_ = false;
    return leftTimeNanosecond;
}
469 
FetchNextEvent(uint64_t now,uint64_t & leftTimeNanosecond,LoopEvent & out)470 bool EventLoop::FetchNextEvent(uint64_t now, uint64_t& leftTimeNanosecond, LoopEvent& out)
471 {
472     if (needQuit_) {
473         return false;
474     }
475 
476     std::lock_guard<std::mutex> lock(queueMutex_);
477     if (pendingEvents_.empty()) {
478         return false;
479     }
480 
481     size_t pendingSize = pendingEvents_.size();
482     const size_t warningPendingSize = 1000;
483     if ((pendingSize > warningPendingSize) && (pendingSize % warningPendingSize == 0)) {
484         HIVIEW_LOGW("%{public}s has %{public}zu pending events.", name_.c_str(), pendingSize);
485     }
486     pendingEvents_.ShrinkIfNeedLocked();
487 
488     const LoopEvent &event = pendingEvents_.top();
489     if (event.targetTime > now) {
490         leftTimeNanosecond = event.targetTime - now;
491         nextWakeupTime_ = event.targetTime;
492         return false;
493     }
494 
495     out = event;
496     pendingEvents_.pop();
497     currentProcessingEvent_.store(&out, std::memory_order_relaxed);
498     return true;
499 }
500 
ProcessEvent(LoopEvent & event)501 void EventLoop::ProcessEvent(LoopEvent &event)
502 {
503     if (event.taskType == LOOP_EVENT_TASK) {
504         if (event.task != nullptr) {
505             event.task();
506         } else if ((event.handler != nullptr) && (event.event != nullptr)) {
507             event.handler->OnEventProxy(event.event);
508         } else {
509             HIVIEW_LOGW("Loop event task with null tasks.");
510         }
511     } else if (event.taskType == LOOP_PACKAGED_TASK) {
512         if (event.packagedTask != nullptr) {
513             event.packagedTask->operator()();
514         } else {
515             HIVIEW_LOGW("Loop packaged task with null tasks.");
516         }
517     } else {
518         HIVIEW_LOGW("unrecognized task type.");
519     }
520 }
521 
ReInsertPeriodicEvent(uint64_t now,LoopEvent & event)522 void EventLoop::ReInsertPeriodicEvent(uint64_t now, LoopEvent &event)
523 {
524     std::lock_guard<std::mutex> lock(queueMutex_);
525     currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
526     if (event.seq == 0) {
527         return;
528     }
529 
530     event.enqueueTime = now;
531     event.targetTime = now + event.interval;
532 
533     if (Audit::IsEnabled() && (event.event != nullptr) && (event.handler != nullptr)) {
534         event.event->ResetTimestamp();
535         auto digest = event.event->sender_ + Audit::DOMAIN_DELIMITER + event.handler->GetHandlerInfo() +
536                       Audit::DOMAIN_DELIMITER + GetName() + Audit::DOMAIN_DELIMITER + event.event->GetEventInfo();
537         Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event.event->createTime_, digest);
538     }
539 
540     pendingEvents_.push(std::move(event));
541     ResetTimerIfNeedLocked();
542 }
543 
WaitNextEvent(uint64_t leftTimeMill)544 void EventLoop::WaitNextEvent(uint64_t leftTimeMill)
545 {
546 #if defined(__HIVIEW_OHOS__)
547 #ifdef USE_POLL
548     PollNextEvent(leftTimeMill);
549 #else
550     struct epoll_event eventItems[MAX_EVENT_SIZE];
551     int eventCount = epoll_wait(sharedPollingFd_.Get(), eventItems, MAX_EVENT_SIZE, leftTimeMill);
552     isWaken_ = true;
553     if (eventCount <= 0) {
554         // no event read from watched fd, process queued events
555         return;
556     }
557 
558     for (int i = 0; i < eventCount; i++) {
559         int fd = eventItems[i].data.fd;
560         uint32_t events = eventItems[i].events;
561         if (fd == pendingEventQueueFd_.Get()) {
562             // new queued event arrived
563             eventfd_t val = 0;
564             read(fd, &val, sizeof(val));
565             return;
566         } else {
567             // process data source callbacks
568             auto it = eventSourceMap_.find(fd);
569             if (it != eventSourceMap_.end()) {
570                 it->second->OnFileDescriptorEvent(fd, events);
571             }
572         }
573     }
574 #endif
575 #elif defined(_WIN32)
576     DWORD dWaitTime = (leftTimeMill >= INFINITE) ? INFINITE : static_cast<DWORD>(leftTimeMill);
577     DWORD result = WaitForMultipleObjects(MAX_HANDLE_ARRAY_SIZE, watchHandleList_, TRUE, dWaitTime);
578 #endif
579 }
580 
NanoSecondSinceSystemStart()581 uint64_t EventLoop::NanoSecondSinceSystemStart()
582 {
583     auto nanoNow = std::chrono::steady_clock::now().time_since_epoch();
584     return static_cast<uint64_t>(nanoNow.count());
585 }
586 }  // namespace HiviewDFX
587 }  // namespace OHOS
588