1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "io_poller.h"
17 #include "sched/execute_ctx.h"
18 #include "eu/co_routine.h"
19 #include "dfx/log/ffrt_log_api.h"
20 #include "ffrt_trace.h"
21 #include "internal_inc/assert.h"
22 #include "internal_inc/types.h"
23 #include "tm/scpu_task.h"
24 #include "util/ffrt_facade.h"
25 #include "util/name_manager.h"
26 
27 namespace ffrt {
// NOTE(review): not referenced anywhere in the visible part of this file; presumably an upper
// bound on CPU indices used by code outside this view — confirm it is still needed.
constexpr unsigned int DEFAULT_CPUINDEX_LIMIT = 7;
29 struct IOPollerInstance: public IOPoller {
__anon089f3b050102ffrt::IOPollerInstance30     IOPollerInstance() noexcept: m_runner([&] { RunForever(); })
31     {
32         pthread_setname_np(m_runner.native_handle(), IO_POLLER_NAME);
33     }
34 
RunForeverffrt::IOPollerInstance35     void RunForever() noexcept
36     {
37         struct sched_param param;
38         param.sched_priority = 1;
39         int ret = pthread_setschedparam(pthread_self(), SCHED_RR, &param);
40         if (ret != 0) {
41             FFRT_LOGW("[%d] set priority warn ret[%d] eno[%d]\n", pthread_self(), ret, errno);
42         }
43         while (!m_exitFlag.load(std::memory_order_relaxed)) {
44             IOPoller::PollOnce(-1);
45         }
46     }
47 
~IOPollerInstanceffrt::IOPollerInstance48     ~IOPollerInstance() noexcept override
49     {
50         Stop();
51         m_runner.join();
52     }
53 private:
Stopffrt::IOPollerInstance54     void Stop() noexcept
55     {
56         m_exitFlag.store(true, std::memory_order_relaxed);
57         std::atomic_thread_fence(std::memory_order_acq_rel);
58         IOPoller::WakeUp();
59     }
60 
61 private:
62     std::thread m_runner;
63     std::atomic<bool> m_exitFlag { false };
64 };
65 
GetIOPoller()66 IOPoller& GetIOPoller() noexcept
67 {
68     static IOPollerInstance inst;
69     return inst;
70 }
71 
IOPoller()72 IOPoller::IOPoller() noexcept: m_epFd { ::epoll_create1(EPOLL_CLOEXEC) },
73     m_events(32)
74 {
75     FFRT_ASSERT(m_epFd >= 0);
76     {
77         m_wakeData.data = nullptr;
78         m_wakeData.fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
79         FFRT_ASSERT(m_wakeData.fd >= 0);
80         epoll_event ev{ .events = EPOLLIN, .data = { .ptr = static_cast<void*>(&m_wakeData) } };
81         if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, m_wakeData.fd, &ev) < 0) {
82             std::terminate();
83         }
84     }
85 }
86 
~IOPoller()87 IOPoller::~IOPoller() noexcept
88 {
89     ::close(m_wakeData.fd);
90     ::close(m_epFd);
91 }
92 
CasStrong(std::atomic<int> & a,int cmp,int exc)93 bool IOPoller::CasStrong(std::atomic<int>& a, int cmp, int exc)
94 {
95     return a.compare_exchange_strong(cmp, exc);
96 }
97 
WakeUp()98 void IOPoller::WakeUp() noexcept
99 {
100     uint64_t one = 1;
101     ssize_t n = ::write(m_wakeData.fd, &one, sizeof one);
102     FFRT_ASSERT(n == sizeof one);
103 }
104 
WaitFdEvent(int fd)105 void IOPoller::WaitFdEvent(int fd) noexcept
106 {
107     auto ctx = ExecuteCtx::Cur();
108     if (!ctx->task) {
109         FFRT_LOGI("nonworker shall not call this fun.");
110         return;
111     }
112     struct WakeData data = {.fd = fd, .data = static_cast<void *>(ctx->task)};
113 
114     epoll_event ev = { .events = EPOLLIN, .data = {.ptr = static_cast<void*>(&data)} };
115     FFRT_BLOCK_TRACER(ctx->task->gid, fd);
116     if (ThreadWaitMode(ctx->task)) {
117         std::unique_lock<std::mutex> lck(ctx->task->mutex_);
118         if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, fd, &ev) == 0) {
119             if (FFRT_UNLIKELY(LegacyMode(ctx->task))) {
120                 ctx->task->blockType = BlockType::BLOCK_THREAD;
121             }
122             reinterpret_cast<SCPUEUTask*>(ctx->task)->waitCond_.wait(lck);
123         }
124         return;
125     }
126 
127     CoWait([&](CPUEUTask *task)->bool {
128         (void)task;
129         if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, fd, &ev) == 0) {
130             return true;
131         }
132         // The ownership of the task belongs to epoll, and the task cannot be accessed any more.
133         FFRT_LOGI("epoll_ctl add err:efd:=%d, fd=%d errorno = %d", m_epFd, fd, errno);
134         return false;
135     });
136 }
137 
PollOnce(int timeout)138 void IOPoller::PollOnce(int timeout) noexcept
139 {
140     int ndfs = epoll_wait(m_epFd, m_events.data(), m_events.size(), timeout);
141     if (ndfs <= 0) {
142         if (errno != EINTR) {
143             FFRT_LOGE("epoll_wait error: efd = %d, errorno= %d", m_epFd, errno);
144         }
145         return;
146     }
147 
148     for (unsigned int i = 0; i < static_cast<unsigned int>(ndfs); ++i) {
149         struct WakeData *data = reinterpret_cast<struct WakeData *>(m_events[i].data.ptr);
150 
151         if (data->fd == m_wakeData.fd) {
152             uint64_t one = 1;
153             ssize_t n = ::read(m_wakeData.fd, &one, sizeof one);
154             FFRT_ASSERT(n == sizeof one);
155             continue;
156         }
157 
158         if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, data->fd, nullptr) != 0) {
159             FFRT_LOGI("epoll_ctl fd = %d errorno = %d", data->fd, errno);
160             continue;
161         }
162 
163         auto task = reinterpret_cast<CPUEUTask *>(data->data);
164         if (ThreadNotifyMode(task)) {
165             std::unique_lock<std::mutex> lck(task->mutex_);
166             if (BlockThread(task)) {
167                 task->blockType = BlockType::BLOCK_COROUTINE;
168             }
169             reinterpret_cast<SCPUEUTask*>(task)->waitCond_.notify_one();
170         } else {
171             CoRoutineFactory::CoWakeFunc(task, false);
172         }
173     }
174 }
175 }
176 
ffrt_wait_fd(int fd)177 void ffrt_wait_fd(int fd)
178 {
179     ffrt::FFRTFacade::GetIoPPInstance().WaitFdEvent(fd);
180 }