1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "worker_monitor.h"
#include <array>
#include <cstring>
#include <iostream>
#include <fstream>
#include <sstream>
#include <regex>
#ifdef FFRT_OH_TRACE_ENABLE
#include "backtrace_local.h"
#endif

#include "dfx/sysevent/sysevent.h"
#include "eu/execute_unit.h"
#include "eu/worker_manager.h"
#include "eu/co_routine_factory.h"
#include "internal_inc/osal.h"
#include "sched/scheduler.h"
#include "util/ffrt_facade.h"
#include "dfx/bbox/bbox.h"
34 
namespace {
// A timeout report (FFRT_SEND_EVENT builds) is emitted when a task has held a worker for exactly this many seconds.
constexpr int HISYSEVENT_TIMEOUT_SEC = 60;
constexpr int PROCESS_NAME_BUFFER_LENGTH = 1024;
// Worker status is sampled once per MONITOR_SAMPLING_CYCLE_US (500 ms), i.e. SAMPLING_TIMES_PER_SEC samples per second.
constexpr int MONITOR_SAMPLING_CYCLE_US = 500 * 1000;
constexpr int SAMPLING_TIMES_PER_SEC = 1000 * 1000 / MONITOR_SAMPLING_CYCLE_US;
// Delay between runs of the coroutine memory-release task (60 s).
constexpr uint64_t TIMEOUT_MEMSHRINK_CYCLE_US = 60ULL * 1000 * 1000;
// Tasks that have occupied a worker for at least this many seconds also get matching transaction_proc lines logged.
constexpr int RECORD_IPC_INFO_TIME_THRESHOLD = 600;
constexpr char IPC_STACK_NAME[] = "libipc_common";
constexpr char TRANSACTION_PATH[] = "/proc/transaction_proc";
constexpr char CONF_FILEPATH[] = "/etc/ffrt/worker_monitor.conf";
// Reporting back-off schedule in seconds: a long-running task is reported whenever its execution time is a
// multiple of the current entry; the entry advances after each report and stays on the last one.
// constexpr std::array (rather than a global std::vector) needs no dynamic initialisation or destruction.
constexpr std::array<int, 8> TIMEOUT_RECORD_CYCLE_LIST = { 1, 3, 5, 10, 30, 60, 10 * 60, 30 * 60 };
}
47 
48 namespace ffrt {
WorkerMonitor()49 WorkerMonitor::WorkerMonitor()
50 {
51     // 获取当前进程名称
52     char processName[PROCESS_NAME_BUFFER_LENGTH];
53     GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
54 
55     // 从配置文件读取黑名单比对
56     std::string skipProcess;
57     std::ifstream file(CONF_FILEPATH);
58     if (file.is_open()) {
59         while (std::getline(file, skipProcess)) {
60             if (strstr(processName, skipProcess.c_str()) != nullptr) {
61                 skipSampling_ = true;
62                 return;
63             }
64         }
65     } else {
66         FFRT_LOGW("worker_monitor.conf does not exist or file permission denied");
67     }
68 }
69 
~WorkerMonitor()70 WorkerMonitor::~WorkerMonitor()
71 {
72     std::lock_guard lock(mutex_);
73     skipSampling_ = true;
74 }
75 
GetInstance()76 WorkerMonitor& WorkerMonitor::GetInstance()
77 {
78     static WorkerMonitor instance;
79     return instance;
80 }
81 
SubmitTask()82 void WorkerMonitor::SubmitTask()
83 {
84     if (skipSampling_) {
85         return;
86     }
87 
88     std::lock_guard submitTaskLock(submitTaskMutex_);
89     if (samplingTaskExit_) {
90         SubmitSamplingTask();
91         samplingTaskExit_ = false;
92     }
93     if (memReleaseTaskExit_) {
94         SubmitMemReleaseTask();
95         memReleaseTaskExit_ = false;
96     }
97 }
98 
SubmitSamplingTask()99 void WorkerMonitor::SubmitSamplingTask()
100 {
101     watchdogWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(MONITOR_SAMPLING_CYCLE_US);
102     watchdogWaitEntry_.cb = ([this](WaitEntry* we) { CheckWorkerStatus(); });
103     if (!DelayedWakeup(watchdogWaitEntry_.tp, &watchdogWaitEntry_, watchdogWaitEntry_.cb)) {
104         FFRT_LOGW("Set delayed worker failed.");
105     }
106 }
107 
SubmitMemReleaseTask()108 void WorkerMonitor::SubmitMemReleaseTask()
109 {
110     memReleaseWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(TIMEOUT_MEMSHRINK_CYCLE_US);
111     memReleaseWaitEntry_.cb = ([this](WaitEntry* we) {
112         std::lock_guard lock(mutex_);
113         if (skipSampling_) {
114             return;
115         }
116 
117         WorkerGroupCtl* workerGroup = FFRTFacade::GetEUInstance().GetGroupCtl();
118         {
119             bool noWorkerThreads = true;
120             std::lock_guard submitTaskLock(submitTaskMutex_);
121             for (int i = 0; i < QoS::MaxNum(); i++) {
122                 std::shared_lock<std::shared_mutex> lck(workerGroup[i].tgMutex);
123                 if (!workerGroup[i].threads.empty()) {
124                     noWorkerThreads = false;
125                     break;
126                 }
127             }
128             if (noWorkerThreads) {
129                 CoRoutineReleaseMem();
130                 samplingTaskExit_ = true;
131                 return;
132             }
133         }
134 
135         CoRoutineReleaseMem();
136         SubmitMemReleaseTask();
137     });
138     if (!DelayedWakeup(memReleaseWaitEntry_.tp, &memReleaseWaitEntry_, memReleaseWaitEntry_.cb)) {
139         FFRT_LOGW("Set delayed worker failed.");
140     }
141 }
142 
CheckWorkerStatus()143 void WorkerMonitor::CheckWorkerStatus()
144 {
145     std::lock_guard lock(mutex_);
146     if (skipSampling_) {
147         return;
148     }
149 
150     WorkerGroupCtl* workerGroup = FFRTFacade::GetEUInstance().GetGroupCtl();
151     {
152         bool noWorkerThreads = true;
153         std::lock_guard submitTaskLock(submitTaskMutex_);
154         for (int i = 0; i < QoS::MaxNum(); i++) {
155             std::shared_lock<std::shared_mutex> lck(workerGroup[i].tgMutex);
156             if (!workerGroup[i].threads.empty()) {
157                 noWorkerThreads = false;
158                 break;
159             }
160         }
161         if (noWorkerThreads) {
162             samplingTaskExit_ = true;
163             return;
164         }
165     }
166 
167     std::vector<TimeoutFunctionInfo> timeoutFunctions;
168     for (int i = 0; i < QoS::MaxNum(); i++) {
169         int executionNum = FFRTFacade::GetEUInstance().GetCPUMonitor()->WakedWorkerNum(i);
170         int sleepingWorkerNum = FFRTFacade::GetEUInstance().GetCPUMonitor()->SleepingWorkerNum(i);
171 
172         std::shared_lock<std::shared_mutex> lck(workerGroup[i].tgMutex);
173         CoWorkerInfo coWorkerInfo(i, workerGroup[i].threads.size(), executionNum, sleepingWorkerNum);
174         for (auto& thread : workerGroup[i].threads) {
175             WorkerThread* worker = thread.first;
176             CPUEUTask* workerTask = worker->curTask;
177             if (workerTask == nullptr) {
178                 workerStatus_.erase(worker);
179                 continue;
180             }
181 
182             RecordTimeoutFunctionInfo(coWorkerInfo, worker, workerTask, timeoutFunctions);
183         }
184     }
185 
186     for (const auto& timeoutFunction : timeoutFunctions) {
187         RecordSymbolAndBacktrace(timeoutFunction);
188     }
189 
190     SubmitSamplingTask();
191 }
192 
RecordTimeoutFunctionInfo(const CoWorkerInfo & coWorkerInfo,WorkerThread * worker,CPUEUTask * workerTask,std::vector<TimeoutFunctionInfo> & timeoutFunctions)193 void WorkerMonitor::RecordTimeoutFunctionInfo(const CoWorkerInfo& coWorkerInfo, WorkerThread* worker,
194     CPUEUTask* workerTask, std::vector<TimeoutFunctionInfo>& timeoutFunctions)
195 {
196     auto workerIter = workerStatus_.find(worker);
197     if (workerIter == workerStatus_.end()) {
198         workerStatus_[worker] = TaskTimeoutInfo(workerTask);
199         return;
200     }
201 
202     TaskTimeoutInfo& taskInfo = workerIter->second;
203     if (taskInfo.task_ == workerTask) {
204         if (++taskInfo.sampledTimes_ < SAMPLING_TIMES_PER_SEC) {
205             return;
206         }
207 
208         taskInfo.sampledTimes_ = 0;
209         if (++taskInfo.executionTime_ % TIMEOUT_RECORD_CYCLE_LIST[taskInfo.recordLevel_] == 0) {
210             WorkerInfo workerInfo(worker->Id(), worker->curTaskGid_, worker->curTaskType_, worker->curTaskLabel_);
211             timeoutFunctions.emplace_back(coWorkerInfo, workerInfo, taskInfo.executionTime_);
212             if (taskInfo.recordLevel_ < static_cast<int>(TIMEOUT_RECORD_CYCLE_LIST.size()) - 1) {
213                 taskInfo.recordLevel_++;
214             }
215         }
216 
217         return;
218     }
219 
220     if (taskInfo.executionTime_ > 0) {
221         FFRT_LOGI("Tid[%d] function is executed, which occupies worker for [%d]s.",
222             worker->Id(), taskInfo.executionTime_);
223     }
224     workerIter->second = TaskTimeoutInfo(workerTask);
225 }
226 
RecordSymbolAndBacktrace(const TimeoutFunctionInfo & timeoutFunction)227 void WorkerMonitor::RecordSymbolAndBacktrace(const TimeoutFunctionInfo& timeoutFunction)
228 {
229     std::stringstream ss;
230     char processName[PROCESS_NAME_BUFFER_LENGTH];
231     GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
232     ss << "Task_Sch_Timeout: process name:[" << processName << "], Tid:[" << timeoutFunction.workerInfo_.tid_ <<
233         "], Worker QoS Level:[" << timeoutFunction.coWorkerInfo_.qosLevel_ << "], Concurrent Worker Count:[" <<
234         timeoutFunction.coWorkerInfo_.coWorkerCount_ << "], Execution Worker Number:[" <<
235         timeoutFunction.coWorkerInfo_.executionNum_ << "], Sleeping Worker Number:[" <<
236         timeoutFunction.coWorkerInfo_.sleepingWorkerNum_ << "], Task Type:[" <<
237         timeoutFunction.workerInfo_.workerTaskType_ << "], ";
238 
239 #ifdef WORKER_CACHE_TASKNAMEID
240     if (timeoutFunction.workerInfo_.workerTaskType_ == ffrt_normal_task ||
241         timeoutFunction.workerInfo_.workerTaskType_ == ffrt_queue_task) {
242         ss << "Task Name:[" << timeoutFunction.workerInfo_.label_ <<
243             "], Task Id:[" << timeoutFunction.workerInfo_.gid_ << "], ";
244     }
245 #endif
246 
247     ss << "occupies worker for more than [" << timeoutFunction.executionTime_ << "]s";
248     FFRT_LOGW("%s", ss.str().c_str());
249 
250 #ifdef FFRT_OH_TRACE_ENABLE
251     std::string dumpInfo;
252     if (OHOS::HiviewDFX::GetBacktraceStringByTid(dumpInfo, timeoutFunction.workerInfo_.tid_, 0, false)) {
253         FFRT_LOGW("Backtrace:\n%s", dumpInfo.c_str());
254         if (timeoutFunction.executionTime_ >= RECORD_IPC_INFO_TIME_THRESHOLD) {
255             RecordIpcInfo(dumpInfo, timeoutFunction.workerInfo_.tid_);
256         }
257     }
258 
259     RecordKeyInfo(dumpInfo);
260 #endif
261 #ifdef FFRT_SEND_EVENT
262     if (timeoutFunction.executionTime_ == HISYSEVENT_TIMEOUT_SEC) {
263         std::string processNameStr = std::string(processName);
264         std::string senarioName = "Task_Sch_Timeout";
265         TaskTimeoutReport(ss, processNameStr, senarioName);
266     }
267 #endif
268 }
269 
RecordIpcInfo(const std::string & dumpInfo,int tid)270 void WorkerMonitor::RecordIpcInfo(const std::string& dumpInfo, int tid)
271 {
272     if (dumpInfo.find(IPC_STACK_NAME) == std::string::npos) {
273         return;
274     }
275 
276     std::ifstream transactionFile(TRANSACTION_PATH);
277     FFRT_COND_DO_ERR(!transactionFile.is_open(), return, "open transaction_proc failed");
278 
279     FFRT_LOGW("transaction_proc:");
280     std::string line;
281     std::string regexStr = ".*" + std::to_string(tid) + ".*to.*code.*";
282     while (getline(transactionFile, line)) {
283         if (std::regex_match(line, std::regex(regexStr))) {
284             FFRT_LOGW("%s", line.c_str());
285         }
286     }
287 
288     transactionFile.close();
289 }
290 
RecordKeyInfo(const std::string & dumpInfo)291 void WorkerMonitor::RecordKeyInfo(const std::string& dumpInfo)
292 {
293     if (dumpInfo.find(IPC_STACK_NAME) == std::string::npos || dumpInfo.find("libpower") == std::string::npos) {
294         return;
295     }
296 
297 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
298     std::string keyInfo = SaveKeyInfo();
299     FFRT_LOGW("%s", keyInfo.c_str());
300 #endif
301 }
302 }