1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "delayed_worker.h"
17 
18 #include <array>
19 #include <unistd.h>
20 #include <sstream>
21 #include <sys/prctl.h>
22 #include <sys/timerfd.h>
23 #include <thread>
24 #include <pthread.h>
25 #include "eu/blockaware.h"
26 #include "eu/execute_unit.h"
27 #include "dfx/log/ffrt_log_api.h"
28 #include "internal_inc/assert.h"
29 #include "util/name_manager.h"
30 #include "sched/scheduler.h"
namespace {
    // Tag stored in the delayed-worker thread's thread-specific slot (checked by IsDelayerWorkerThread).
    constexpr int FFRT_DELAY_WORKER_MAGICNUM = 0x5aa5;
    // Relative idle timer (seconds) armed when the map is empty in a non-whitelisted process.
    constexpr int FFRT_DELAY_WORKER_IDLE_TIMEOUT_SECONDS = 3 * 60;
    // Upper bound (milliseconds) for one epoll_wait call; a timeout triggers DumpMap().
    constexpr int EPOLL_WAIT_TIMEOUT__MILISECONDS = 3 * 60 * 1000;
    // Nanoseconds per second, used to split a deadline into an itimerspec.
    constexpr int NS_PER_SEC = 1000 * 1000 * 1000;
    // errno value for which a failing epoll_wait is not logged (presumably EINTR, which is 4 on Linux).
    constexpr int FAKE_WAKE_UP_ERROR = 4;
    // Maximum number of events fetched by a single epoll_wait call.
    constexpr int WAIT_EVENT_SIZE = 5;
    // Duration (milliseconds) above which CheckTimeInterval() logs a warning.
    constexpr int64_t EXECUTION_TIMEOUT_MILISECONDS = 500;
    // Maximum number of pending deadlines printed by DumpMap().
    constexpr int DUMP_MAP_MAX_COUNT = 3;
    // Size of the buffer that receives the current process name.
    constexpr int PROCESS_NAME_BUFFER_LENGTH = 1024;
}
42 
43 namespace ffrt {
// Thread-specific key whose per-thread value tags the delayed-worker thread.
pthread_key_t g_ffrtDelayWorkerFlagKey;
// One-time guard for creating g_ffrtDelayWorkerFlagKey.
pthread_once_t g_ffrtDelayWorkerThreadKeyOnce = PTHREAD_ONCE_INIT;

/*
 * Creates g_ffrtDelayWorkerFlagKey. Intended to be run through pthread_once so the key is
 * created only once per process.
 */
void FFRTDelayWorkeEnvKeyCreate()
{
    // No destructor is registered: the stored per-thread value is a plain tag, not an allocation.
    static_cast<void>(pthread_key_create(&g_ffrtDelayWorkerFlagKey, nullptr));
}
50 
ThreadEnvCreate()51 void DelayedWorker::ThreadEnvCreate()
52 {
53     pthread_once(&g_ffrtDelayWorkerThreadKeyOnce, FFRTDelayWorkeEnvKeyCreate);
54 }
55 
IsDelayerWorkerThread()56 bool DelayedWorker::IsDelayerWorkerThread()
57 {
58     bool isDelayerWorkerFlag = false;
59     void* flag = pthread_getspecific(g_ffrtDelayWorkerFlagKey);
60     if ((flag != nullptr) && (reinterpret_cast<uintptr_t>(flag) == FFRT_DELAY_WORKER_MAGICNUM)) {
61         isDelayerWorkerFlag = true;
62     }
63     return isDelayerWorkerFlag;
64 }
65 
IsDelayedWorkerPreserved()66 bool IsDelayedWorkerPreserved()
67 {
68     std::unordered_set<std::string> whitelist = { "foundation", "com.ohos.sceneboard" };
69     char processName[PROCESS_NAME_BUFFER_LENGTH];
70     GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
71     if (whitelist.find(processName) != whitelist.end()) {
72         return true;
73     }
74 
75     return false;
76 }
77 
DumpMap()78 void DelayedWorker::DumpMap()
79 {
80     lock.lock();
81     if (map.empty()) {
82         lock.unlock();
83         return;
84     }
85 
86     TimePoint now = std::chrono::steady_clock::now();
87     if (now < map.begin()->first) {
88         lock.unlock();
89         return;
90     }
91 
92     int count = 0;
93     std::stringstream ss;
94     int printCount = map.size() < DUMP_MAP_MAX_COUNT ? map.size() : DUMP_MAP_MAX_COUNT;
95     for (auto it = map.begin(); it != map.end() && count < DUMP_MAP_MAX_COUNT; ++it, ++count) {
96         ss << it->first.time_since_epoch().count();
97         if (count < printCount - 1) {
98             ss << ",";
99         }
100     }
101     lock.unlock();
102     FFRT_LOGW("DumpMap:now=%lu,%s", now.time_since_epoch().count(), ss.str().c_str());
103 }
104 
ThreadInit()105 void DelayedWorker::ThreadInit()
106 {
107     if (delayWorker != nullptr && delayWorker->joinable()) {
108         delayWorker->join();
109     }
110     delayWorker = std::make_unique<std::thread>([this]() {
111         struct sched_param param;
112         param.sched_priority = 1;
113         int ret = pthread_setschedparam(pthread_self(), SCHED_RR, &param);
114         if (ret != 0) {
115             FFRT_LOGW("[%d] set priority warn ret[%d] eno[%d]\n", pthread_self(), ret, errno);
116         } else {
117             FFRT_LOGW("delayedWorker init");
118         }
119         prctl(PR_SET_NAME, DELAYED_WORKER_NAME);
120         pthread_setspecific(g_ffrtDelayWorkerFlagKey, reinterpret_cast<void*>(FFRT_DELAY_WORKER_MAGICNUM));
121         std::array<epoll_event, WAIT_EVENT_SIZE> waitedEvents;
122         static bool preserved = IsDelayedWorkerPreserved();
123         for (;;) {
124             std::unique_lock lk(lock);
125             if (toExit) {
126                 exited_ = true;
127                 FFRT_LOGW("delayedWorker exit");
128                 break;
129             }
130             int result = HandleWork();
131             if (toExit) {
132                 exited_ = true;
133                 FFRT_LOGW("delayedWorker exit");
134                 break;
135             }
136             if (result == 0) {
137                 uint64_t ns = map.begin()->first.time_since_epoch().count();
138                 itimerspec its = { {0, 0}, {static_cast<long>(ns / NS_PER_SEC), static_cast<long>(ns % NS_PER_SEC)} };
139                 int ret = timerfd_settime(timerfd_, TFD_TIMER_ABSTIME, &its, nullptr);
140                 if (ret != 0) {
141                     FFRT_LOGE("timerfd_settime error,ns=%lu,ret= %d.", ns, ret);
142                 }
143             } else if ((result == 1) && (!preserved)) {
144                 if (++noTaskDelayCount_ > 1) {
145                     exited_ = true;
146                     FFRT_LOGW("delayedWorker exit");
147                     break;
148                 }
149                 itimerspec its = { {0, 0}, {FFRT_DELAY_WORKER_IDLE_TIMEOUT_SECONDS, 0} };
150                 int ret = timerfd_settime(timerfd_, 0, &its, nullptr);
151                 if (ret != 0) {
152                     FFRT_LOGE("timerfd_settime error, ret= %d.", ret);
153                 }
154             }
155             lk.unlock();
156             FFRT_TRACE_BEGIN("epoll");
157             int nfds = epoll_wait(epollfd_, waitedEvents.data(), waitedEvents.size(),
158                 EPOLL_WAIT_TIMEOUT__MILISECONDS);
159             if (nfds == 0) {
160                 DumpMap();
161             }
162             FFRT_TRACE_END();
163 
164             if (nfds < 0) {
165                 if (errno != FAKE_WAKE_UP_ERROR) {
166                     FFRT_LOGW("epoll_wait error, errorno= %d.", errno);
167                 }
168                 continue;
169             }
170 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
171             for (int i = 0; i < nfds; i++) {
172                 if (waitedEvents[i].data.fd == monitorfd_) {
173                     char buffer;
174                     size_t n = ::read(monitorfd_, &buffer, sizeof buffer);
175                     if (n == 1) {
176                         FFRT_TRACE_BEGIN("monitor");
177                         monitor->MonitorMain();
178                         FFRT_TRACE_END();
179                     } else {
180                         FFRT_LOGE("monitor read fail:%d, %s", n, errno);
181                     }
182                     break;
183                 }
184             }
185 #endif
186         }
187     });
188 }
189 
DelayedWorker()190 DelayedWorker::DelayedWorker(): epollfd_ { ::epoll_create1(EPOLL_CLOEXEC) },
191     timerfd_ { ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC) }
192 {
193     FFRT_ASSERT(epollfd_ >= 0);
194     FFRT_ASSERT(timerfd_ >= 0);
195 
196     epoll_event timer_event { .events = EPOLLIN | EPOLLET, .data = { .fd = timerfd_ } };
197     if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, timerfd_, &timer_event) < 0) {
198         FFRT_LOGE("epoll_ctl add tfd error: efd=%d, fd=%d, errorno=%d", epollfd_, timerfd_, errno);
199         std::terminate();
200     }
201 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
202     monitor = ExecuteUnit::Instance().GetCPUMonitor();
203     monitorfd_ = BlockawareMonitorfd(-1, monitor->WakeupCond());
204     FFRT_ASSERT(monitorfd_ >= 0);
205     FFRT_LOGI("timerfd:%d, monitorfd:%d", timerfd_, monitorfd_);
206     /* monitorfd does not support 'CLOEXEC', and current kernel does not inherit monitorfd after 'fork'.
207      * 1. if user calls 'exec' directly after 'fork' and does not use ffrt, it's ok.
208      * 2. if user calls 'exec' directly, the original process cannot close monitorfd automatically, and
209      * it will be fail when new program use ffrt to create monitorfd.
210      */
211     epoll_event monitor_event {.events = EPOLLIN, .data = {.fd = monitorfd_}};
212     int ret = epoll_ctl(epollfd_, EPOLL_CTL_ADD, monitorfd_, &monitor_event);
213     if (ret < 0) {
214         FFRT_LOGE("monitor:%d add fail, ret:%d, errno:%d, %s", monitorfd_, ret, errno, strerror(errno));
215     }
216 #endif
217 }
218 
~DelayedWorker()219 DelayedWorker::~DelayedWorker()
220 {
221     lock.lock();
222     toExit = true;
223     lock.unlock();
224     itimerspec its = { {0, 0}, {0, 1} };
225     timerfd_settime(timerfd_, 0, &its, nullptr);
226     if (delayWorker != nullptr && delayWorker->joinable()) {
227         delayWorker->join();
228     }
229 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
230     ::close(monitorfd_);
231 #endif
232     ::close(timerfd_);
233 }
234 
GetInstance()235 DelayedWorker& DelayedWorker::GetInstance()
236 {
237     static DelayedWorker instance;
238     return instance;
239 }
240 
CheckTimeInterval(const TimePoint & startTp,const TimePoint & endTp)241 void CheckTimeInterval(const TimePoint& startTp, const TimePoint& endTp)
242 {
243     auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(endTp - startTp);
244     int64_t durationMs = duration.count();
245     if (durationMs > EXECUTION_TIMEOUT_MILISECONDS) {
246         FFRT_LOGW("handle work more than [%lld]ms", durationMs);
247     }
248 }
249 
HandleWork()250 int DelayedWorker::HandleWork()
251 {
252     if (!map.empty()) {
253         noTaskDelayCount_ = 0;
254         TimePoint startTp = std::chrono::steady_clock::now();
255         do {
256             auto cur = map.begin();
257             if (!toExit && cur != map.end() && cur->first <= startTp) {
258                 DelayedWork w = cur->second;
259                 map.erase(cur);
260                 lock.unlock();
261                 std::function<void(WaitEntry*)> workCb(move(*w.cb));
262                 (workCb)(w.we);
263                 lock.lock();
264                 FFRT_COND_DO_ERR(toExit, return -1, "HandleWork exit, map size:%d", map.size());
265                 TimePoint endTp = std::chrono::steady_clock::now();
266                 CheckTimeInterval(startTp, endTp);
267                 startTp = std::move(endTp);
268             } else {
269                 return 0;
270             }
271         } while (!map.empty());
272     }
273     return 1;
274 }
275 
276 // There is no requirement that to be less than now
dispatch(const TimePoint & to,WaitEntry * we,const std::function<void (WaitEntry *)> & wakeup)277 bool DelayedWorker::dispatch(const TimePoint& to, WaitEntry* we, const std::function<void(WaitEntry*)>& wakeup)
278 {
279     bool w = false;
280     lock.lock();
281     if (toExit) {
282         lock.unlock();
283         FFRT_LOGE("DelayedWorker destroy, dispatch failed\n");
284         return false;
285     }
286 
287     TimePoint now = std::chrono::steady_clock::now();
288     if (to <= now) {
289         lock.unlock();
290         return false;
291     }
292 
293     if (exited_) {
294         ThreadInit();
295         exited_ = false;
296     }
297 
298     if (map.empty() || to < map.begin()->first) {
299         w = true;
300     }
301     map.emplace(to, DelayedWork {we, &wakeup});
302     if (w) {
303         uint64_t ns = to.time_since_epoch().count();
304         itimerspec its = { {0, 0}, {static_cast<long>(ns / NS_PER_SEC), static_cast<long>(ns % NS_PER_SEC)} };
305         int ret = timerfd_settime(timerfd_, TFD_TIMER_ABSTIME, &its, nullptr);
306         if (ret != 0) {
307             FFRT_LOGE("timerfd_settime error, ns=%lu, ret= %d.", ns, ret);
308         }
309     }
310     lock.unlock();
311     return true;
312 }
313 
remove(const TimePoint & to,WaitEntry * we)314 bool DelayedWorker::remove(const TimePoint& to, WaitEntry* we)
315 {
316     std::lock_guard<decltype(lock)> l(lock);
317 
318     auto range = map.equal_range(to);
319     for (auto it = range.first; it != range.second; ++it) {
320         if (it->second.we == we) {
321             map.erase(it);
322             return true;
323         }
324     }
325 
326     return false;
327 }
328 } // namespace ffrt