1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "eventhandler_adapter_queue.h"
17 #include <securec.h>
18 #include <sstream>
19 #include "dfx/log/ffrt_log_api.h"
20 #include "util/time_format.h"
21 
22 namespace {
// Upper bound on the number of pending tasks printed by a single dump.
constexpr int MAX_DUMP_SIZE = 500;
// Number of slots in the history buffer. The value minus one is also used as a wrap-around
// index mask, which only works when the value is a power of two.
constexpr uint8_t HISTORY_TASK_NUM_POWER = 32;
static_assert(HISTORY_TASK_NUM_POWER != 0 && (HISTORY_TASK_NUM_POWER & (HISTORY_TASK_NUM_POWER - 1)) == 0,
    "HISTORY_TASK_NUM_POWER must be a power of two because it is used as an index mask");
25 
DumpRunningTaskInfo(const char * tag,const ffrt::HistoryTask & currentRunningTask,std::ostringstream & oss)26 void DumpRunningTaskInfo(const char* tag, const ffrt::HistoryTask& currentRunningTask, std::ostringstream& oss)
27 {
28     oss << tag << " Current Running: ";
29     if (currentRunningTask.beginTime_ == std::numeric_limits<uint64_t>::max()) {
30         oss << "{}";
31     } else {
32         oss << "start at " << ffrt::FormatDateString4SteadyClock(currentRunningTask.beginTime_) << ", ";
33         oss << "Event { ";
34         oss << "send thread = " << currentRunningTask.senderKernelThreadId_;
35         oss << ", send time = " << ffrt::FormatDateString4SteadyClock(currentRunningTask.sendTime_);
36         oss << ", handle time = " << ffrt::FormatDateString4SteadyClock(currentRunningTask.handleTime_);
37         oss << ", task name = " << currentRunningTask.taskName_;
38         oss << " }\n";
39     }
40 }
41 
DumpHistoryTaskInfo(const char * tag,const std::vector<ffrt::HistoryTask> & historyTasks,std::ostringstream & oss)42 void DumpHistoryTaskInfo(const char* tag, const std::vector<ffrt::HistoryTask>& historyTasks, std::ostringstream& oss)
43 {
44     oss << tag << " History event queue information:\n";
45     for (uint8_t i = 0; i < HISTORY_TASK_NUM_POWER; i++) {
46         auto historyTask = historyTasks[i];
47         if (historyTask.senderKernelThreadId_ == 0) {
48             continue;
49         }
50 
51         oss << tag << " No. " << (i + 1) << " : Event { ";
52         oss << "send thread = " << historyTask.senderKernelThreadId_;
53         oss << ", send time = " << ffrt::FormatDateString4SteadyClock(historyTask.sendTime_);
54         oss << ", handle time = " << ffrt::FormatDateString4SteadyClock(historyTask.handleTime_);
55         oss << ", trigger time = " << ffrt::FormatDateString4SteadyClock(historyTask.triggerTime_);
56         oss << ", complete time = " << ffrt::FormatDateString4SteadyClock(historyTask.completeTime_);
57         oss << ", task name = " << historyTask.taskName_;
58         oss << " }\n";
59     }
60 }
61 
DumpUnexecutedTaskInfo(const char * tag,const std::multimap<uint64_t,ffrt::QueueTask * > & whenMap,std::ostringstream & oss)62 void DumpUnexecutedTaskInfo(const char* tag,
63     const std::multimap<uint64_t, ffrt::QueueTask*>& whenMap, std::ostringstream& oss)
64 {
65     static std::pair<ffrt_inner_queue_priority_t, std::string> priorityPairArr[] = {
66         {ffrt_inner_queue_priority_immediate, "Immediate"}, {ffrt_inner_queue_priority_high, "High"},
67         {ffrt_inner_queue_priority_low, "Low"}, {ffrt_inner_queue_priority_idle, "Idle"},
68         {ffrt_inner_queue_priority_vip, "Vip"}
69     };
70     uint32_t total = 0;
71     uint32_t dumpSize = MAX_DUMP_SIZE;
72 
73     std::multimap<ffrt_inner_queue_priority_t, ffrt::QueueTask*> priorityMap;
74     for (auto it = whenMap.begin(); it != whenMap.end(); it++) {
75         priorityMap.insert({static_cast<ffrt_inner_queue_priority_t>(it->second->GetPriority()), it->second});
76     }
77 
78     auto taskDumpFun = [&](int n, ffrt::QueueTask* task) {
79         oss << tag << " No. " << n << " : Event { ";
80         oss << "send thread = " << task->fromTid;
81         oss << ", send time = " << ffrt::FormatDateString4SteadyClock(task->GetUptime() - task->GetDelay());
82         oss << ", handle time = " << ffrt::FormatDateString4SteadyClock(task->GetUptime());
83         oss << ", task name = " << task->label;
84         oss << " }\n";
85         dumpSize--;
86     };
87 
88     for (auto pair : priorityPairArr) {
89         auto range = priorityMap.equal_range(pair.first);
90         oss << tag << " " << pair.second << " priority event queue information:\n";
91         int n = 0;
92         for (auto it = range.first; it != range.second; ++it) {
93             total++;
94             n++;
95             if (dumpSize > 0) {
96                 taskDumpFun(n, it->second);
97             }
98         }
99         oss << tag << " Total size of " << pair.second << " events : " << n << "\n";
100     }
101     oss << tag << " Total event size : " << total << "\n";
102 }
103 }
104 
105 namespace ffrt {
EventHandlerAdapterQueue()106 EventHandlerAdapterQueue::EventHandlerAdapterQueue() : EventHandlerInteractiveQueue()
107 {
108     dequeFunc_ = QueueStrategy<QueueTask>::DequeSingleAgainstStarvation;
109     historyTasks_ = std::vector<HistoryTask>(HISTORY_TASK_NUM_POWER);
110     pulledTaskCount_ = std::vector<int>(ffrt_inner_queue_priority_idle + 1, 0);
111 }
112 
~EventHandlerAdapterQueue()113 EventHandlerAdapterQueue::~EventHandlerAdapterQueue()
114 {
115     FFRT_LOGI("destruct eventhandler adapter queueId=%u leave", queueId_);
116 }
117 
Push(QueueTask * task)118 int EventHandlerAdapterQueue::Push(QueueTask* task)
119 {
120     std::unique_lock lock(mutex_);
121     FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot push task, [queueId=%u] is exiting", queueId_);
122 
123     if (!isActiveState_.load()) {
124         pulledTaskCount_[task->GetPriority()]++;
125         isActiveState_.store(true);
126         return INACTIVE;
127     }
128 
129     if (task->InsertHead()) {
130         std::multimap<uint64_t, QueueTask*> tmpWhenMap {{0, task}};
131         tmpWhenMap.insert(whenMap_.begin(), whenMap_.end());
132         whenMap_.swap(tmpWhenMap);
133     } else {
134         whenMap_.insert({task->GetUptime(), task});
135     }
136     if (task == whenMap_.begin()->second) {
137         cond_.notify_one();
138     }
139 
140     return SUCC;
141 }
142 
Pull()143 QueueTask* EventHandlerAdapterQueue::Pull()
144 {
145     std::unique_lock lock(mutex_);
146     // wait for delay task
147     uint64_t now = GetNow();
148     while (!whenMap_.empty() && now < whenMap_.begin()->first && !isExit_) {
149         uint64_t diff = whenMap_.begin()->first - now;
150         FFRT_LOGD("[queueId=%u] stuck in %llu us wait", queueId_, diff);
151         cond_.wait_for(lock, std::chrono::microseconds(diff));
152         FFRT_LOGD("[queueId=%u] wakeup from wait", queueId_);
153         now = GetNow();
154     }
155 
156     // abort dequeue in abnormal scenarios
157     if (whenMap_.empty()) {
158         FFRT_LOGD("[queueId=%u] switch into inactive", queueId_);
159         isActiveState_.store(false);
160         return nullptr;
161     }
162     FFRT_COND_DO_ERR(isExit_, return nullptr, "cannot pull task, [queueId=%u] is exiting", queueId_);
163 
164     // dequeue due tasks in batch
165     return dequeFunc_(queueId_, now, whenMap_, &pulledTaskCount_);
166 }
167 
IsIdle()168 bool EventHandlerAdapterQueue::IsIdle()
169 {
170     std::unique_lock lock(mutex_);
171     int nonIdleNum = std::count_if(whenMap_.cbegin(), whenMap_.cend(),
172         [](const auto& pair) { return pair.second->GetPriority() <= ffrt_queue_priority_idle; });
173     return nonIdleNum == 0;
174 }
175 
Dump(const char * tag,char * buf,uint32_t len,bool historyInfo)176 int EventHandlerAdapterQueue::Dump(const char* tag, char* buf, uint32_t len, bool historyInfo)
177 {
178     std::unique_lock lock(mutex_);
179     std::ostringstream oss;
180     if (historyInfo) {
181         DumpRunningTaskInfo(tag, currentRunningTask_, oss);
182         DumpHistoryTaskInfo(tag, historyTasks_, oss);
183     }
184     DumpUnexecutedTaskInfo(tag, whenMap_, oss);
185     return snprintf_s(buf, len, len - 1, "%s", oss.str().c_str());
186 }
187 
DumpSize(ffrt_inner_queue_priority_t priority)188 int EventHandlerAdapterQueue::DumpSize(ffrt_inner_queue_priority_t priority)
189 {
190     std::unique_lock lock(mutex_);
191     return std::count_if(whenMap_.begin(), whenMap_.end(), [=](const auto& pair) {
192         return static_cast<ffrt_inner_queue_priority_t>(pair.second->GetPriority()) == priority;
193     });
194 }
195 
SetCurrentRunningTask(QueueTask * task)196 void EventHandlerAdapterQueue::SetCurrentRunningTask(QueueTask* task)
197 {
198     currentRunningTask_ = HistoryTask(GetNow(), task);
199 }
200 
PushHistoryTask(QueueTask * task,uint64_t triggerTime,uint64_t completeTime)201 void EventHandlerAdapterQueue::PushHistoryTask(QueueTask* task, uint64_t triggerTime, uint64_t completeTime)
202 {
203     HistoryTask historyTask;
204     historyTask.senderKernelThreadId_ = task->fromTid;
205     historyTask.taskName_ = task->label;
206     historyTask.sendTime_ = task->GetUptime() - task->GetDelay();
207     historyTask.handleTime_ = task->GetUptime();
208     historyTask.triggerTime_ = triggerTime;
209     historyTask.completeTime_ = completeTime;
210     historyTasks_[historyTaskIndex_.fetch_add(1) & (HISTORY_TASK_NUM_POWER - 1)] = historyTask;
211 }
212 
CreateEventHandlerAdapterQueue(const ffrt_queue_attr_t * attr)213 std::unique_ptr<BaseQueue> CreateEventHandlerAdapterQueue(const ffrt_queue_attr_t* attr)
214 {
215     (void)attr;
216     return std::make_unique<EventHandlerAdapterQueue>();
217 }
218 } // namespace ffrt
219