/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "wait_queue.h"
#include "sched/execute_ctx.h"
#include "eu/co_routine.h"
#include "dfx/log/ffrt_log_api.h"
#include "ffrt_trace.h"
#include "sync/mutex_private.h"
#include "tm/cpu_task.h"

namespace ffrt {
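// Bind the wait node to the task of the current execution context.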
TaskWithNode::TaskWithNode()
{
    auto ctx = ExecuteCtx::Cur();
    task = ctx->task;
}

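// Block the calling thread on wn->cv: push wn onto the wait queue, release the
// user lock lk, and sleep until Notify() signals the entry; lk is reacquired
// before returning. In legacy mode the task is additionally marked as
// BLOCK_THREAD so that Notify() treats it as a thread-style waiter.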
void WaitQueue::ThreadWait(WaitUntilEntry* wn, mutexPrivate* lk, bool legacyMode, CPUEUTask* task)
{
    wqlock.lock();
    if (legacyMode) {
        task->blockType = BlockType::BLOCK_THREAD;
        wn->task = task;
    }
    push_back(wn);
    wqlock.unlock();
    {
        std::unique_lock<std::mutex> nl(wn->wl);
        lk->unlock();
        wn->cv.wait(nl);
    }
    lk->lock();
}

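// Deadline-based variant of ThreadWait(). Returns true if the wait timed out.
// On timeout, or on a spurious wakeup (status is not NOTIFING), the entry is
// still linked in the queue and must be removed here before returning.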
bool WaitQueue::ThreadWaitUntil(WaitUntilEntry* wn, mutexPrivate* lk,
    const TimePoint& tp, bool legacyMode, CPUEUTask* task)
{
    bool ret = false;
    wqlock.lock();
    wn->status.store(we_status::INIT, std::memory_order_release);
    if (legacyMode) {
        task->blockType = BlockType::BLOCK_THREAD;
        wn->task = task;
    }
    push_back(wn);
    wqlock.unlock();
    {
        std::unique_lock<std::mutex> nl(wn->wl);
        lk->unlock();
        if (wn->cv.wait_until(nl, tp) == std::cv_status::timeout) {
            ret = true;
        }
    }

    // In the notify scenario wn has already been popped from the queue.
    // The condition variable may also be woken up spuriously; in that case,
    // as on timeout, wn still has to be removed from the linked list here.
    if (ret || wn->status.load(std::memory_order_acquire) != we_status::NOTIFING) {
        wqlock.lock();
        remove(wn);
        wqlock.unlock();
    }
    lk->lock();
    return ret;
}

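// Coroutine-aware wait without a deadline. Thread-style waiters fall back to
// ThreadWait(); otherwise a WaitUntilEntry is enqueued inside the CoWait()
// callback and the coroutine is suspended until Notify() resumes it via
// CoWakeFunc().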
void WaitQueue::SuspendAndWait(mutexPrivate* lk)
{
    ExecuteCtx* ctx = ExecuteCtx::Cur();
    CPUEUTask* task = ctx->task;
    if (ThreadWaitMode(task)) {
        ThreadWait(&ctx->wn, lk, LegacyMode(task), task);
        return;
    }
    task->wue = new (std::nothrow) WaitUntilEntry(task);
    FFRT_COND_RETURN_VOID(task->wue == nullptr, "new WaitUntilEntry failed");
    FFRT_BLOCK_TRACER(task->gid, cnd);
    CoWait([&](CPUEUTask* task) -> bool {
        wqlock.lock();
        push_back(task->wue);
        lk->unlock(); // The unlock must stay under wqlock protection so it is guaranteed to run before lk->lock() after CoWake.
        wqlock.unlock();
        // Ownership of the task now belongs to the WaitQueue list; the task must not be accessed any more.
        return true;
    });
    delete task->wue;
    task->wue = nullptr;
    lk->lock();
}

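// Resolve the race between the delayed-wakeup (timeout) callback and Notify()
// for a timed coroutine wait. Returns true when the timeout side wins: the entry
// is still INIT, so it is removed from the queue and destroyed here and the
// caller must wake the task. Returns false when Notify() claimed the entry
// first: TIMEOUT_DONE is published so the notifier can finish, and the notifier
// both wakes the task and destroys the entry.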
bool WeTimeoutProc(WaitQueue* wq, WaitUntilEntry* wue)
{
    wq->wqlock.lock();
    bool toWake = true;

    // Two cases: 1) notify was not called and the timeout grabbed the lock first:
    if (wue->status.load(std::memory_order_acquire) == we_status::INIT) {
        // the timeout processes wue first, cv will not process it again; the timeout is responsible for destroying wue.
        wq->remove(wue);
        delete wue;
        wue = nullptr;
    } else {
        // 2) notify entered the critical section first, wrote the notify status, and then released the lock:
        // notify is responsible for destroying wue.
        wue->status.store(we_status::TIMEOUT_DONE, std::memory_order_release);
        toWake = false;
    }
    wq->wqlock.unlock();
    return toWake;
}

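// Coroutine-aware wait with a deadline tp. Thread-style waiters fall back to
// ThreadWaitUntil(). Otherwise a delayed-wakeup timer is registered whose
// callback runs WeTimeoutProc() and, when the timeout wins, resumes the
// coroutine with the timeout flag set. Returns true if the wait ended because
// of the timeout.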
bool WaitQueue::SuspendAndWaitUntil(mutexPrivate* lk, const TimePoint& tp) noexcept
{
    bool ret = false;
    ExecuteCtx* ctx = ExecuteCtx::Cur();
    CPUEUTask* task = ctx->task;
    if (ThreadWaitMode(task)) {
        return ThreadWaitUntil(&ctx->wn, lk, tp, LegacyMode(task), task);
    }
    task->wue = new WaitUntilEntry(task);
    task->wue->hasWaitTime = true;
    task->wue->tp = tp;
    task->wue->cb = ([&](WaitEntry* we) {
        WaitUntilEntry* wue = static_cast<WaitUntilEntry*>(we);
        ffrt::CPUEUTask* task = wue->task;
        if (!WeTimeoutProc(this, wue)) {
            return;
        }
        FFRT_LOGD("task(%d) time is up", task->gid);
        CoRoutineFactory::CoWakeFunc(task, true);
    });
    FFRT_BLOCK_TRACER(task->gid, cnt);
    CoWait([&](CPUEUTask* task) -> bool {
        WaitUntilEntry* we = task->wue;
        wqlock.lock();
        push_back(we);
        lk->unlock(); // The unlock must stay under wqlock protection so it is guaranteed to run before lk->lock() after CoWake.
        // Ownership of the task now belongs to the WaitQueue list; the task must not be accessed any more.
        if (DelayedWakeup(we->tp, we, we->cb)) {
            wqlock.unlock();
            return true;
        } else {
            wqlock.unlock();
            if (!WeTimeoutProc(this, we)) {
                return true;
            }
            task->wakeupTimeOut = true;
            return false;
        }
    });
    ret = task->wakeupTimeOut;
    task->wue = nullptr;
    task->wakeupTimeOut = false;
    lk->lock();
    return ret;
}

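// Notify-side cleanup for an entry popped from the queue. For a timed entry,
// try to cancel its delayed-wakeup timer; if cancellation fails, the timer
// callback is already running, so publish NOTIFING, drop wqlock, and spin until
// the callback stores TIMEOUT_DONE before deleting the entry. The return value
// tells the caller whether to wake the task itself (always true in this version).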
bool WaitQueue::WeNotifyProc(WaitUntilEntry* we)
{
    if (!we->hasWaitTime) {
        // For a wait task without a timeout, we is deleted by the waiter after it wakes up.
        return true;
    }

    WaitEntry* dwe = static_cast<WaitEntry*>(we);
    if (!DelayedRemove(we->tp, dwe)) {
        // Removing the timer failed during notify, which means the timer callback is already executing.
        // Mark the notify as in progress and wait for the callback to finish before freeing the entry.
        we->status.store(we_status::NOTIFING, std::memory_order_release);
        wqlock.unlock();
        while (we->status.load(std::memory_order_acquire) != we_status::TIMEOUT_DONE) {
        }
        wqlock.lock();
    }

    delete we;
    return true;
}

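// Wake one waiter (one == true) or all waiters. Thread-style waiters are
// signalled through their entry's condition variable; coroutine waiters are
// cleaned up via WeNotifyProc() and resumed with CoWakeFunc().
//
// A minimal usage sketch (illustrative only; wq and mtx are hypothetical names
// for a WaitQueue paired with a mutexPrivate, as a condition-variable-style
// user of this queue might drive them):
//   waiter:   mtx.lock(); wq.SuspendAndWait(&mtx); /* re-check predicate */ mtx.unlock();
//   notifier: wq.Notify(true);   // or Notify(false) to wake every waiter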
void WaitQueue::Notify(bool one) noexcept
{
    // The caller must ensure the WaitQueue outlives this call.
    // This function must not access the WaitQueue once the queue is empty(),
    // because the last waiting thread/coroutine may destroy the WaitQueue.
    // Every break out of the loop must leave wqlock unlocked;
    // every continue must leave wqlock locked.
    wqlock.lock();
    for (; ;) {
        if (empty()) {
            wqlock.unlock();
            break;
        }
        WaitUntilEntry* we = pop_front();
        if (we == nullptr) {
            wqlock.unlock();
            break;
        }
        bool isEmpty = empty();
        CPUEUTask* task = we->task;
        if (ThreadNotifyMode(task) || we->weType == 2) {
            std::unique_lock<std::mutex> lk(we->wl);
            we->status.store(we_status::NOTIFING, std::memory_order_release);
            if (BlockThread(task)) {
                task->blockType = BlockType::BLOCK_COROUTINE;
                we->task = nullptr;
            }
            wqlock.unlock();
            we->cv.notify_one();
        } else {
            if (!WeNotifyProc(we)) {
                continue;
            }
            wqlock.unlock();
            CoRoutineFactory::CoWakeFunc(task, false);
        }
        if (isEmpty || one) {
            break;
        }
        wqlock.lock();
    }
}

} // namespace ffrt