1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "concurrent_queue.h"
17 #include <climits>
18 #include "dfx/log/ffrt_log_api.h"
19 #include "tm/queue_task.h"
20 #include "eu/loop.h"
21 
22 namespace ffrt {
DelayTaskCb(void * task)23 static void DelayTaskCb(void* task)
24 {
25     static_cast<QueueTask*>(task)->Execute();
26 }
27 
~ConcurrentQueue()28 ConcurrentQueue::~ConcurrentQueue()
29 {
30     FFRT_LOGI("destruct concurrent queueId=%u leave", queueId_);
31 }
32 
Push(QueueTask * task)33 int ConcurrentQueue::Push(QueueTask* task)
34 {
35     std::unique_lock lock(mutex_);
36     FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot push task, [queueId=%u] is exiting", queueId_);
37     if (task->GetPriority() > ffrt_queue_priority_idle) {
38         task->SetPriority(ffrt_queue_priority_low);
39     }
40 
41     if (loop_ != nullptr) {
42         if (task->GetDelay() == 0) {
43             whenMap_.insert({task->GetUptime(), task});
44             loop_->WakeUp();
45             return SUCC;
46         }
47         return PushDelayTaskToTimer(task);
48     }
49     FFRT_COND_DO_ERR(IsOnLoop(), return FAILED, "cannot push task, [queueId=%u] loop empty", queueId_);
50 
51     if (concurrency_.load() < maxConcurrency_) {
52         int oldValue = concurrency_.fetch_add(1);
53         FFRT_LOGD("task [gid=%llu] concurrency[%u] + 1 [queueId=%u]", task->gid, oldValue, queueId_);
54 
55         if (task->GetDelay() > 0) {
56             whenMap_.insert({task->GetUptime(), task});
57         }
58 
59         return CONCURRENT;
60     }
61 
62     whenMap_.insert({task->GetUptime(), task});
63     if (task == whenMap_.begin()->second) {
64         cond_.notify_all();
65     }
66 
67     return SUCC;
68 }
69 
Pull()70 QueueTask* ConcurrentQueue::Pull()
71 {
72     std::unique_lock lock(mutex_);
73     // wait for delay task
74     uint64_t now = GetNow();
75     if (loop_ != nullptr) {
76         if (!whenMap_.empty() && now >= whenMap_.begin()->first && !isExit_) {
77             return dequeFunc_(queueId_, now, whenMap_, nullptr);
78         }
79         return nullptr;
80     }
81 
82     while (!whenMap_.empty() && now < whenMap_.begin()->first && !isExit_) {
83         uint64_t diff = whenMap_.begin()->first - now;
84         FFRT_LOGD("[queueId=%u] stuck in %llu us wait", queueId_, diff);
85         cond_.wait_for(lock, std::chrono::microseconds(diff));
86         FFRT_LOGD("[queueId=%u] wakeup from wait", queueId_);
87         now = GetNow();
88     }
89 
90     // abort dequeue in abnormal scenarios
91     if (whenMap_.empty()) {
92         uint32_t queueId = queueId_;
93         int oldValue = concurrency_.fetch_sub(1); // 取不到后继的task,当前这个task正式退出
94         FFRT_LOGD("concurrency[%d] - 1 [queueId=%u] switch into inactive", oldValue, queueId);
95         return nullptr;
96     }
97     FFRT_COND_DO_ERR(isExit_, return nullptr, "cannot pull task, [queueId=%u] is exiting", queueId_);
98 
99     // dequeue next expired task by priority
100     return dequeFunc_(queueId_, now, whenMap_, nullptr);
101 }
102 
Stop()103 void ConcurrentQueue::Stop()
104 {
105     std::unique_lock lock(mutex_);
106     isExit_ = true;
107 
108     for (auto it = whenMap_.begin(); it != whenMap_.end(); it++) {
109         if (it->second) {
110             it->second->Notify();
111             it->second->Destroy();
112         }
113     }
114     whenMap_.clear();
115     if (loop_ == nullptr) {
116         cond_.notify_all();
117     }
118 
119     FFRT_LOGI("clear [queueId=%u] succ", queueId_);
120 }
121 
SetLoop(Loop * loop)122 bool ConcurrentQueue::SetLoop(Loop* loop)
123 {
124     if (loop == nullptr || loop_ != nullptr) {
125         FFRT_LOGE("queueId %s should bind to loop invalid", queueId_);
126         return false;
127     }
128 
129     loop_ = loop;
130     isOnLoop_.store(true);
131     return true;
132 }
133 
PushDelayTaskToTimer(QueueTask * task)134 int ConcurrentQueue::PushDelayTaskToTimer(QueueTask* task)
135 {
136     uint64_t delayMs = (task->GetDelay() - 1) / 1000 + 1;
137     int timeout = delayMs > INT_MAX ? INT_MAX : delayMs;
138     if (loop_->TimerStart(timeout, task, DelayTaskCb, false) < 0) {
139         FFRT_LOGE("push delay queue task to timer fail");
140         return FAILED;
141     }
142     return SUCC;
143 }
144 
CreateConcurrentQueue(const ffrt_queue_attr_t * attr)145 std::unique_ptr<BaseQueue> CreateConcurrentQueue(const ffrt_queue_attr_t* attr)
146 {
147     int maxConcurrency = ffrt_queue_attr_get_max_concurrency(attr) <= 0 ? 1 : ffrt_queue_attr_get_max_concurrency(attr);
148     return std::make_unique<ConcurrentQueue>(maxConcurrency);
149 }
150 } // namespace ffrt
151