1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "concurrent_queue.h"
17 #include <climits>
18 #include "dfx/log/ffrt_log_api.h"
19 #include "tm/queue_task.h"
20 #include "eu/loop.h"
21
22 namespace ffrt {
DelayTaskCb(void * task)23 static void DelayTaskCb(void* task)
24 {
25 static_cast<QueueTask*>(task)->Execute();
26 }
27
~ConcurrentQueue()28 ConcurrentQueue::~ConcurrentQueue()
29 {
30 FFRT_LOGI("destruct concurrent queueId=%u leave", queueId_);
31 }
32
Push(QueueTask * task)33 int ConcurrentQueue::Push(QueueTask* task)
34 {
35 std::unique_lock lock(mutex_);
36 FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot push task, [queueId=%u] is exiting", queueId_);
37 if (task->GetPriority() > ffrt_queue_priority_idle) {
38 task->SetPriority(ffrt_queue_priority_low);
39 }
40
41 if (loop_ != nullptr) {
42 if (task->GetDelay() == 0) {
43 whenMap_.insert({task->GetUptime(), task});
44 loop_->WakeUp();
45 return SUCC;
46 }
47 return PushDelayTaskToTimer(task);
48 }
49 FFRT_COND_DO_ERR(IsOnLoop(), return FAILED, "cannot push task, [queueId=%u] loop empty", queueId_);
50
51 if (concurrency_.load() < maxConcurrency_) {
52 int oldValue = concurrency_.fetch_add(1);
53 FFRT_LOGD("task [gid=%llu] concurrency[%u] + 1 [queueId=%u]", task->gid, oldValue, queueId_);
54
55 if (task->GetDelay() > 0) {
56 whenMap_.insert({task->GetUptime(), task});
57 }
58
59 return CONCURRENT;
60 }
61
62 whenMap_.insert({task->GetUptime(), task});
63 if (task == whenMap_.begin()->second) {
64 cond_.notify_all();
65 }
66
67 return SUCC;
68 }
69
Pull()70 QueueTask* ConcurrentQueue::Pull()
71 {
72 std::unique_lock lock(mutex_);
73 // wait for delay task
74 uint64_t now = GetNow();
75 if (loop_ != nullptr) {
76 if (!whenMap_.empty() && now >= whenMap_.begin()->first && !isExit_) {
77 return dequeFunc_(queueId_, now, whenMap_, nullptr);
78 }
79 return nullptr;
80 }
81
82 while (!whenMap_.empty() && now < whenMap_.begin()->first && !isExit_) {
83 uint64_t diff = whenMap_.begin()->first - now;
84 FFRT_LOGD("[queueId=%u] stuck in %llu us wait", queueId_, diff);
85 cond_.wait_for(lock, std::chrono::microseconds(diff));
86 FFRT_LOGD("[queueId=%u] wakeup from wait", queueId_);
87 now = GetNow();
88 }
89
90 // abort dequeue in abnormal scenarios
91 if (whenMap_.empty()) {
92 uint32_t queueId = queueId_;
93 int oldValue = concurrency_.fetch_sub(1); // 取不到后继的task,当前这个task正式退出
94 FFRT_LOGD("concurrency[%d] - 1 [queueId=%u] switch into inactive", oldValue, queueId);
95 return nullptr;
96 }
97 FFRT_COND_DO_ERR(isExit_, return nullptr, "cannot pull task, [queueId=%u] is exiting", queueId_);
98
99 // dequeue next expired task by priority
100 return dequeFunc_(queueId_, now, whenMap_, nullptr);
101 }
102
Stop()103 void ConcurrentQueue::Stop()
104 {
105 std::unique_lock lock(mutex_);
106 isExit_ = true;
107
108 for (auto it = whenMap_.begin(); it != whenMap_.end(); it++) {
109 if (it->second) {
110 it->second->Notify();
111 it->second->Destroy();
112 }
113 }
114 whenMap_.clear();
115 if (loop_ == nullptr) {
116 cond_.notify_all();
117 }
118
119 FFRT_LOGI("clear [queueId=%u] succ", queueId_);
120 }
121
SetLoop(Loop * loop)122 bool ConcurrentQueue::SetLoop(Loop* loop)
123 {
124 if (loop == nullptr || loop_ != nullptr) {
125 FFRT_LOGE("queueId %s should bind to loop invalid", queueId_);
126 return false;
127 }
128
129 loop_ = loop;
130 isOnLoop_.store(true);
131 return true;
132 }
133
PushDelayTaskToTimer(QueueTask * task)134 int ConcurrentQueue::PushDelayTaskToTimer(QueueTask* task)
135 {
136 uint64_t delayMs = (task->GetDelay() - 1) / 1000 + 1;
137 int timeout = delayMs > INT_MAX ? INT_MAX : delayMs;
138 if (loop_->TimerStart(timeout, task, DelayTaskCb, false) < 0) {
139 FFRT_LOGE("push delay queue task to timer fail");
140 return FAILED;
141 }
142 return SUCC;
143 }
144
CreateConcurrentQueue(const ffrt_queue_attr_t * attr)145 std::unique_ptr<BaseQueue> CreateConcurrentQueue(const ffrt_queue_attr_t* attr)
146 {
147 int maxConcurrency = ffrt_queue_attr_get_max_concurrency(attr) <= 0 ? 1 : ffrt_queue_attr_get_max_concurrency(attr);
148 return std::make_unique<ConcurrentQueue>(maxConcurrency);
149 }
150 } // namespace ffrt
151