1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "queue_monitor.h"
16 #include <sstream>
17 #include "dfx/log/ffrt_log_api.h"
18 #include "util/slab.h"
19 #include "sync/sync.h"
20 #include "c/ffrt_dump.h"
21 #include "dfx/sysevent/sysevent.h"
22 #include "internal_inc/osal.h"
23
namespace {
// Capacity of the stack buffer that receives the process name when a timeout is reported.
constexpr int PROCESS_NAME_BUFFER_LENGTH = 1024;
// Sentinel task id meaning "no task recorded" in a queue slot.
constexpr uint32_t INVALID_TASK_ID = 0;
// Factor applied to the configured timeout threshold to express it in microseconds.
constexpr uint32_t TIME_CONVERT_UNIT = 1000;
// Initial reserve() capacity of the per-queue tracking vectors.
constexpr uint64_t QUEUE_INFO_INITIAL_CAPACITY = 64;
// Tolerated timer inaccuracy in microseconds (retry delay and slack on the timeout threshold).
constexpr uint64_t ALLOW_TIME_ACC_ERROR_US = 500;
// Timeouts (in microseconds) below this value leave the watchdog disabled.
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US = 1000;

// Returns the steady-clock instant lying delayUs microseconds after "now".
inline std::chrono::steady_clock::time_point GetDelayedTimeStamp(uint64_t delayUs)
{
    const auto offset = std::chrono::microseconds(delayUs);
    return std::chrono::steady_clock::now() + offset;
}
} // namespace
37
38 namespace ffrt {
QueueMonitor()39 QueueMonitor::QueueMonitor()
40 {
41 FFRT_LOGI("queue monitor ctor enter");
42 queuesRunningInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
43 queuesStructInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
44 lastReportedTask_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
45 we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
46 uint64_t timeout = ffrt_task_timeout_get_threshold() * TIME_CONVERT_UNIT;
47 if (timeout < MIN_TIMEOUT_THRESHOLD_US) {
48 timeoutUs_ = 0;
49 FFRT_LOGE("failed to setup watchdog because [%llu] us less than precision threshold", timeout);
50 return;
51 }
52 timeoutUs_ = timeout;
53 FFRT_LOGI("queue monitor ctor leave, watchdog timeout %llu us", timeoutUs_);
54 }
55
~QueueMonitor()56 QueueMonitor::~QueueMonitor()
57 {
58 FFRT_LOGI("destruction of QueueMonitor");
59 SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
60 }
61
GetInstance()62 QueueMonitor& QueueMonitor::GetInstance()
63 {
64 static QueueMonitor instance;
65 return instance;
66 }
67
RegisterQueueId(uint32_t queueId,QueueHandler * queueStruct)68 void QueueMonitor::RegisterQueueId(uint32_t queueId, QueueHandler* queueStruct)
69 {
70 std::unique_lock lock(mutex_);
71 if (queueId == queuesRunningInfo_.size()) {
72 queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
73 queuesStructInfo_.emplace_back(queueStruct);
74 lastReportedTask_.emplace_back(INVALID_TASK_ID);
75 FFRT_LOGD("queue registration in monitor gid=%u in turn succ", queueId);
76 return;
77 }
78
79 // only need to ensure that the corresponding info index has been initialized after constructed.
80 if (queueId > queuesRunningInfo_.size()) {
81 for (uint32_t i = queuesRunningInfo_.size(); i <= queueId; ++i) {
82 queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
83 queuesStructInfo_.emplace_back(nullptr);
84 lastReportedTask_.emplace_back(INVALID_TASK_ID);
85 }
86 queuesStructInfo_[queueId] = queueStruct;
87 }
88 if (queuesStructInfo_[queueId] == nullptr) {
89 queuesStructInfo_[queueId] = queueStruct;
90 }
91 FFRT_LOGD("queue registration in monitor gid=%u by skip succ", queueId);
92 }
93
ResetQueueInfo(uint32_t queueId)94 void QueueMonitor::ResetQueueInfo(uint32_t queueId)
95 {
96 std::shared_lock lock(mutex_);
97 FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return,
98 "ResetQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
99 queuesRunningInfo_[queueId].first = INVALID_TASK_ID;
100 lastReportedTask_[queueId] = INVALID_TASK_ID;
101 }
102
ResetQueueStruct(uint32_t queueId)103 void QueueMonitor::ResetQueueStruct(uint32_t queueId)
104 {
105 std::shared_lock lock(mutex_);
106 FFRT_COND_DO_ERR((queuesStructInfo_.size() <= queueId), return,
107 "ResetQueueStruct queueId=%u access violation, StructInfo_.size=%u", queueId, queuesStructInfo_.size());
108 queuesStructInfo_[queueId] = nullptr;
109 }
110
UpdateQueueInfo(uint32_t queueId,const uint64_t & taskId)111 void QueueMonitor::UpdateQueueInfo(uint32_t queueId, const uint64_t &taskId)
112 {
113 std::shared_lock lock(mutex_);
114 FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return,
115 "UpdateQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
116 TimePoint now = std::chrono::steady_clock::now();
117 queuesRunningInfo_[queueId] = {taskId, now};
118 if (exit_.exchange(false)) {
119 SendDelayedWorker(now + std::chrono::microseconds(timeoutUs_));
120 }
121 }
122
QueryQueueStatus(uint32_t queueId)123 uint64_t QueueMonitor::QueryQueueStatus(uint32_t queueId)
124 {
125 std::shared_lock lock(mutex_);
126 FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return INVALID_TASK_ID,
127 "QueryQueueStatus queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
128 return queuesRunningInfo_[queueId].first;
129 }
130
SendDelayedWorker(TimePoint delay)131 void QueueMonitor::SendDelayedWorker(TimePoint delay)
132 {
133 we_->tp = delay;
134 we_->cb = ([this](WaitEntry* we_) { CheckQueuesStatus(); });
135
136 bool result = DelayedWakeup(we_->tp, we_, we_->cb);
137 // insurance mechanism, generally does not fail
138 while (!result) {
139 FFRT_LOGW("failed to set delayedworker because the given timestamp has passed");
140 we_->tp = GetDelayedTimeStamp(ALLOW_TIME_ACC_ERROR_US);
141 result = DelayedWakeup(we_->tp, we_, we_->cb);
142 }
143 }
144
ResetTaskTimestampAfterWarning(uint32_t queueId,const uint64_t & taskId)145 void QueueMonitor::ResetTaskTimestampAfterWarning(uint32_t queueId, const uint64_t &taskId)
146 {
147 std::unique_lock lock(mutex_);
148 if (queuesRunningInfo_[queueId].first == taskId) {
149 queuesRunningInfo_[queueId].second += std::chrono::microseconds(timeoutUs_);
150 }
151 }
152
CheckQueuesStatus()153 void QueueMonitor::CheckQueuesStatus()
154 {
155 {
156 std::unique_lock lock(mutex_);
157 auto iter = std::find_if(queuesRunningInfo_.cbegin(), queuesRunningInfo_.cend(),
158 [](const auto& pair) { return pair.first != INVALID_TASK_ID; });
159 if (iter == queuesRunningInfo_.cend()) {
160 exit_ = true;
161 return;
162 }
163 }
164
165 TimePoint oldestStartedTime = std::chrono::steady_clock::now();
166 TimePoint startThreshold = oldestStartedTime - std::chrono::microseconds(timeoutUs_ - ALLOW_TIME_ACC_ERROR_US);
167 uint64_t taskId = 0;
168 uint32_t queueRunningInfoSize = 0;
169 TimePoint taskTimestamp = oldestStartedTime;
170 {
171 std::shared_lock lock(mutex_);
172 queueRunningInfoSize = queuesRunningInfo_.size();
173 }
174 for (uint32_t i = 0; i < queueRunningInfoSize; ++i) {
175 {
176 std::unique_lock lock(mutex_);
177 taskId = queuesRunningInfo_[i].first;
178 taskTimestamp = queuesRunningInfo_[i].second;
179 }
180
181 if (taskId == INVALID_TASK_ID) {
182 continue;
183 }
184
185 if (taskTimestamp < startThreshold) {
186 std::stringstream ss;
187 char processName[PROCESS_NAME_BUFFER_LENGTH];
188 GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
189 ss << "Serial_Queue_Timeout, process name:[" << processName << "], serial queue qid:[" << i
190 << "], serial task gid:[" << taskId << "], execution:[" << timeoutUs_ << "] us.";
191 if (queuesStructInfo_[i] != nullptr) {
192 ss << queuesStructInfo_[i]->GetDfxInfo();
193 }
194 FFRT_LOGE("%s", ss.str().c_str());
195 #ifdef FFRT_SEND_EVENT
196 if (lastReportedTask_[i] != taskId) {
197 lastReportedTask_[i] = taskId;
198 std::string processNameStr = std::string(processName);
199 std::string senarioName = "Serial_Queue_Timeout";
200 TaskTimeoutReport(ss, processNameStr, senarioName);
201 }
202 #endif
203 ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb();
204 if (func) {
205 func(taskId, ss.str().c_str(), ss.str().size());
206 }
207 // reset timeout task timestamp for next warning
208 ResetTaskTimestampAfterWarning(i, taskId);
209 continue;
210 }
211
212 if (taskTimestamp < oldestStartedTime) {
213 oldestStartedTime = taskTimestamp;
214 }
215 }
216
217 SendDelayedWorker(oldestStartedTime + std::chrono::microseconds(timeoutUs_));
218 FFRT_LOGD("global watchdog completed queue status check and scheduled the next");
219 }
220
HasQueueActive()221 bool QueueMonitor::HasQueueActive()
222 {
223 std::unique_lock lock(mutex_);
224 for (uint32_t i = 0; i < queuesRunningInfo_.size(); ++i) {
225 if (queuesRunningInfo_[i].first != INVALID_TASK_ID) {
226 return true;
227 }
228 }
229 return false;
230 }
231 } // namespace ffrt
232