1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "queue_monitor.h"
16 #include <sstream>
17 #include "dfx/log/ffrt_log_api.h"
18 #include "util/slab.h"
19 #include "sync/sync.h"
20 #include "c/ffrt_dump.h"
21 #include "dfx/sysevent/sysevent.h"
22 #include "internal_inc/osal.h"
23 
namespace {
// Size in bytes of the stack buffer handed to GetProcessName() when building a timeout report.
constexpr int PROCESS_NAME_BUFFER_LENGTH = 1024;
// Task id stored in a queue slot that currently has no running task.
constexpr uint32_t INVALID_TASK_ID = 0;
// Factor applied to ffrt_task_timeout_get_threshold() to obtain microseconds
// (presumably the threshold is expressed in milliseconds -- confirm against c/ffrt_dump.h).
// Kept 64-bit on purpose: multiplying a 32-bit threshold by a 32-bit factor would wrap
// at 2^32 before the result is widened to uint64_t.
constexpr uint64_t TIME_CONVERT_UNIT = 1000;
// Number of queue slots reserved up front in the monitor's bookkeeping vectors.
constexpr uint64_t QUEUE_INFO_INITIAL_CAPACITY = 64;
// Tolerance, in microseconds, used both when comparing task timestamps against the
// timeout threshold and as the retry delay when a wake-up timestamp has already passed.
constexpr uint64_t ALLOW_TIME_ACC_ERROR_US = 500;
// Smallest watchdog timeout, in microseconds, that the monitor accepts.
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US = 1000;

// Returns the steady-clock time point that lies delayUs microseconds after "now".
inline std::chrono::steady_clock::time_point GetDelayedTimeStamp(uint64_t delayUs)
{
    return std::chrono::steady_clock::now() + std::chrono::microseconds(delayUs);
}
}
37 
38 namespace ffrt {
QueueMonitor()39 QueueMonitor::QueueMonitor()
40 {
41     FFRT_LOGI("queue monitor ctor enter");
42     queuesRunningInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
43     queuesStructInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
44     lastReportedTask_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
45     we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
46     uint64_t timeout = ffrt_task_timeout_get_threshold() * TIME_CONVERT_UNIT;
47     if (timeout < MIN_TIMEOUT_THRESHOLD_US) {
48         timeoutUs_ = 0;
49         FFRT_LOGE("failed to setup watchdog because [%llu] us less than precision threshold", timeout);
50         return;
51     }
52     timeoutUs_ = timeout;
53     FFRT_LOGI("queue monitor ctor leave, watchdog timeout %llu us", timeoutUs_);
54 }
55 
~QueueMonitor()56 QueueMonitor::~QueueMonitor()
57 {
58     FFRT_LOGI("destruction of QueueMonitor");
59     SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
60 }
61 
GetInstance()62 QueueMonitor& QueueMonitor::GetInstance()
63 {
64     static QueueMonitor instance;
65     return instance;
66 }
67 
RegisterQueueId(uint32_t queueId,QueueHandler * queueStruct)68 void QueueMonitor::RegisterQueueId(uint32_t queueId, QueueHandler* queueStruct)
69 {
70     std::unique_lock lock(mutex_);
71     if (queueId == queuesRunningInfo_.size()) {
72         queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
73         queuesStructInfo_.emplace_back(queueStruct);
74         lastReportedTask_.emplace_back(INVALID_TASK_ID);
75         FFRT_LOGD("queue registration in monitor gid=%u in turn succ", queueId);
76         return;
77     }
78 
79     // only need to ensure that the corresponding info index has been initialized after constructed.
80     if (queueId > queuesRunningInfo_.size()) {
81         for (uint32_t i = queuesRunningInfo_.size(); i <= queueId; ++i) {
82             queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
83             queuesStructInfo_.emplace_back(nullptr);
84             lastReportedTask_.emplace_back(INVALID_TASK_ID);
85         }
86         queuesStructInfo_[queueId] = queueStruct;
87     }
88     if (queuesStructInfo_[queueId] == nullptr) {
89         queuesStructInfo_[queueId] = queueStruct;
90     }
91     FFRT_LOGD("queue registration in monitor gid=%u by skip succ", queueId);
92 }
93 
ResetQueueInfo(uint32_t queueId)94 void QueueMonitor::ResetQueueInfo(uint32_t queueId)
95 {
96     std::shared_lock lock(mutex_);
97     FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return,
98         "ResetQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
99     queuesRunningInfo_[queueId].first = INVALID_TASK_ID;
100     lastReportedTask_[queueId] = INVALID_TASK_ID;
101 }
102 
ResetQueueStruct(uint32_t queueId)103 void QueueMonitor::ResetQueueStruct(uint32_t queueId)
104 {
105     std::shared_lock lock(mutex_);
106     FFRT_COND_DO_ERR((queuesStructInfo_.size() <= queueId), return,
107         "ResetQueueStruct queueId=%u access violation, StructInfo_.size=%u", queueId, queuesStructInfo_.size());
108     queuesStructInfo_[queueId] = nullptr;
109 }
110 
UpdateQueueInfo(uint32_t queueId,const uint64_t & taskId)111 void QueueMonitor::UpdateQueueInfo(uint32_t queueId, const uint64_t &taskId)
112 {
113     std::shared_lock lock(mutex_);
114     FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return,
115         "UpdateQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
116     TimePoint now = std::chrono::steady_clock::now();
117     queuesRunningInfo_[queueId] = {taskId, now};
118     if (exit_.exchange(false)) {
119         SendDelayedWorker(now + std::chrono::microseconds(timeoutUs_));
120     }
121 }
122 
QueryQueueStatus(uint32_t queueId)123 uint64_t QueueMonitor::QueryQueueStatus(uint32_t queueId)
124 {
125     std::shared_lock lock(mutex_);
126     FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return INVALID_TASK_ID,
127         "QueryQueueStatus queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
128     return queuesRunningInfo_[queueId].first;
129 }
130 
SendDelayedWorker(TimePoint delay)131 void QueueMonitor::SendDelayedWorker(TimePoint delay)
132 {
133     we_->tp = delay;
134     we_->cb = ([this](WaitEntry* we_) { CheckQueuesStatus(); });
135 
136     bool result = DelayedWakeup(we_->tp, we_, we_->cb);
137     // insurance mechanism, generally does not fail
138     while (!result) {
139         FFRT_LOGW("failed to set delayedworker because the given timestamp has passed");
140         we_->tp = GetDelayedTimeStamp(ALLOW_TIME_ACC_ERROR_US);
141         result = DelayedWakeup(we_->tp, we_, we_->cb);
142     }
143 }
144 
ResetTaskTimestampAfterWarning(uint32_t queueId,const uint64_t & taskId)145 void QueueMonitor::ResetTaskTimestampAfterWarning(uint32_t queueId, const uint64_t &taskId)
146 {
147     std::unique_lock lock(mutex_);
148     if (queuesRunningInfo_[queueId].first == taskId) {
149         queuesRunningInfo_[queueId].second += std::chrono::microseconds(timeoutUs_);
150     }
151 }
152 
CheckQueuesStatus()153 void QueueMonitor::CheckQueuesStatus()
154 {
155     {
156         std::unique_lock lock(mutex_);
157         auto iter = std::find_if(queuesRunningInfo_.cbegin(), queuesRunningInfo_.cend(),
158             [](const auto& pair) { return pair.first != INVALID_TASK_ID; });
159         if (iter == queuesRunningInfo_.cend()) {
160             exit_ = true;
161             return;
162         }
163     }
164 
165     TimePoint oldestStartedTime = std::chrono::steady_clock::now();
166     TimePoint startThreshold = oldestStartedTime - std::chrono::microseconds(timeoutUs_ - ALLOW_TIME_ACC_ERROR_US);
167     uint64_t taskId = 0;
168     uint32_t queueRunningInfoSize = 0;
169     TimePoint taskTimestamp = oldestStartedTime;
170     {
171         std::shared_lock lock(mutex_);
172         queueRunningInfoSize = queuesRunningInfo_.size();
173     }
174     for (uint32_t i = 0; i < queueRunningInfoSize; ++i) {
175         {
176             std::unique_lock lock(mutex_);
177             taskId = queuesRunningInfo_[i].first;
178             taskTimestamp = queuesRunningInfo_[i].second;
179         }
180 
181         if (taskId == INVALID_TASK_ID) {
182             continue;
183         }
184 
185         if (taskTimestamp < startThreshold) {
186             std::stringstream ss;
187             char processName[PROCESS_NAME_BUFFER_LENGTH];
188             GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
189             ss << "Serial_Queue_Timeout, process name:[" << processName << "], serial queue qid:[" << i
190                 << "], serial task gid:[" << taskId << "], execution:[" << timeoutUs_ << "] us.";
191             if (queuesStructInfo_[i] != nullptr) {
192                 ss << queuesStructInfo_[i]->GetDfxInfo();
193             }
194             FFRT_LOGE("%s", ss.str().c_str());
195 #ifdef FFRT_SEND_EVENT
196             if (lastReportedTask_[i] != taskId) {
197                 lastReportedTask_[i] = taskId;
198                 std::string processNameStr = std::string(processName);
199                 std::string senarioName = "Serial_Queue_Timeout";
200                 TaskTimeoutReport(ss, processNameStr, senarioName);
201             }
202 #endif
203             ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb();
204             if (func) {
205                 func(taskId, ss.str().c_str(), ss.str().size());
206             }
207             // reset timeout task timestamp for next warning
208             ResetTaskTimestampAfterWarning(i, taskId);
209             continue;
210         }
211 
212         if (taskTimestamp < oldestStartedTime) {
213             oldestStartedTime = taskTimestamp;
214         }
215     }
216 
217     SendDelayedWorker(oldestStartedTime + std::chrono::microseconds(timeoutUs_));
218     FFRT_LOGD("global watchdog completed queue status check and scheduled the next");
219 }
220 
HasQueueActive()221 bool QueueMonitor::HasQueueActive()
222 {
223     std::unique_lock lock(mutex_);
224     for (uint32_t i = 0; i < queuesRunningInfo_.size(); ++i) {
225         if (queuesRunningInfo_[i].first != INVALID_TASK_ID) {
226             return true;
227         }
228     }
229     return false;
230 }
231 } // namespace ffrt
232