1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "wait_queue.h"
17 #include "sched/execute_ctx.h"
18 #include "eu/co_routine.h"
19 #include "dfx/log/ffrt_log_api.h"
20 #include "ffrt_trace.h"
21 #include "sync/mutex_private.h"
22 #include "tm/cpu_task.h"
23 
24 namespace ffrt {
TaskWithNode()25 TaskWithNode::TaskWithNode()
26 {
27     auto ctx = ExecuteCtx::Cur();
28     task = ctx->task;
29 }
30 
ThreadWait(WaitUntilEntry * wn,mutexPrivate * lk,bool legacyMode,CPUEUTask * task)31 void WaitQueue::ThreadWait(WaitUntilEntry* wn, mutexPrivate* lk, bool legacyMode, CPUEUTask* task)
32 {
33     wqlock.lock();
34     if (legacyMode) {
35         task->blockType = BlockType::BLOCK_THREAD;
36         wn->task = task;
37     }
38     push_back(wn);
39     wqlock.unlock();
40     {
41         std::unique_lock<std::mutex> nl(wn->wl);
42         lk->unlock();
43         wn->cv.wait(nl);
44     }
45     lk->lock();
46 }
47 
ThreadWaitUntil(WaitUntilEntry * wn,mutexPrivate * lk,const TimePoint & tp,bool legacyMode,CPUEUTask * task)48 bool WaitQueue::ThreadWaitUntil(WaitUntilEntry* wn, mutexPrivate* lk,
49     const TimePoint& tp, bool legacyMode, CPUEUTask* task)
50 {
51     bool ret = false;
52     wqlock.lock();
53     wn->status.store(we_status::INIT, std::memory_order_release);
54     if (legacyMode) {
55         task->blockType = BlockType::BLOCK_THREAD;
56         wn->task = task;
57     }
58     push_back(wn);
59     wqlock.unlock();
60     {
61         std::unique_lock<std::mutex> nl(wn->wl);
62         lk->unlock();
63         if (wn->cv.wait_until(nl, tp) == std::cv_status::timeout) {
64             ret = true;
65         }
66     }
67 
68     // notify scenarios wn is already pooped
69     // in addition, condition variables may be spurious woken up
70     // in this case, wn needs to be removed from the linked list
71     if (ret || wn->status.load(std::memory_order_acquire) != we_status::NOTIFING) {
72         wqlock.lock();
73         remove(wn);
74         wqlock.unlock();
75     }
76     lk->lock();
77     return ret;
78 }
79 
SuspendAndWait(mutexPrivate * lk)80 void WaitQueue::SuspendAndWait(mutexPrivate* lk)
81 {
82     ExecuteCtx* ctx = ExecuteCtx::Cur();
83     CPUEUTask* task = ctx->task;
84     if (ThreadWaitMode(task)) {
85         ThreadWait(&ctx->wn, lk, LegacyMode(task), task);
86         return;
87     }
88     task->wue = new (std::nothrow) WaitUntilEntry(task);
89     FFRT_COND_RETURN_VOID(task->wue == nullptr, "new WaitUntilEntry failed");
90     FFRT_BLOCK_TRACER(task->gid, cnd);
91     CoWait([&](CPUEUTask* task) -> bool {
92         wqlock.lock();
93         push_back(task->wue);
94         lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake
95         wqlock.unlock();
96         // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed any more.
97         return true;
98     });
99     delete task->wue;
100     task->wue = nullptr;
101     lk->lock();
102 }
103 
WeTimeoutProc(WaitQueue * wq,WaitUntilEntry * wue)104 bool WeTimeoutProc(WaitQueue* wq, WaitUntilEntry* wue)
105 {
106     wq->wqlock.lock();
107     bool toWake = true;
108 
109     // two kinds: 1) notify was not called, timeout grabbed the lock first;
110     if (wue->status.load(std::memory_order_acquire) == we_status::INIT) {
111         // timeout processes wue first, cv will not be processed again. timeout is responsible for destroying wue.
112         wq->remove(wue);
113         delete wue;
114         wue = nullptr;
115     } else {
116         // 2) notify enters the critical section, first writes the notify status, and then releases the lock
117         // notify is responsible for destroying wue.
118         wue->status.store(we_status::TIMEOUT_DONE, std::memory_order_release);
119         toWake = false;
120     }
121     wq->wqlock.unlock();
122     return toWake;
123 }
124 
SuspendAndWaitUntil(mutexPrivate * lk,const TimePoint & tp)125 bool WaitQueue::SuspendAndWaitUntil(mutexPrivate* lk, const TimePoint& tp) noexcept
126 {
127     bool ret = false;
128     ExecuteCtx* ctx = ExecuteCtx::Cur();
129     CPUEUTask* task = ctx->task;
130     if (ThreadWaitMode(task)) {
131         return ThreadWaitUntil(&ctx->wn, lk, tp, LegacyMode(task), task);
132     }
133     task->wue = new WaitUntilEntry(task);
134     task->wue->hasWaitTime = true;
135     task->wue->tp = tp;
136     task->wue->cb = ([&](WaitEntry* we) {
137         WaitUntilEntry* wue = static_cast<WaitUntilEntry*>(we);
138         ffrt::CPUEUTask* task = wue->task;
139         if (!WeTimeoutProc(this, wue)) {
140             return;
141         }
142         FFRT_LOGD("task(%d) time is up", task->gid);
143         CoRoutineFactory::CoWakeFunc(task, true);
144     });
145     FFRT_BLOCK_TRACER(task->gid, cnt);
146     CoWait([&](CPUEUTask* task) -> bool {
147         WaitUntilEntry* we = task->wue;
148         wqlock.lock();
149         push_back(we);
150         lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake
151         // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed any more.
152         if (DelayedWakeup(we->tp, we, we->cb)) {
153             wqlock.unlock();
154             return true;
155         } else {
156             wqlock.unlock();
157             if (!WeTimeoutProc(this, we)) {
158                 return true;
159             }
160             task->wakeupTimeOut = true;
161             return false;
162         }
163     });
164     ret = task->wakeupTimeOut;
165     task->wue = nullptr;
166     task->wakeupTimeOut = false;
167     lk->lock();
168     return ret;
169 }
170 
WeNotifyProc(WaitUntilEntry * we)171 bool WaitQueue::WeNotifyProc(WaitUntilEntry* we)
172 {
173     if (!we->hasWaitTime) {
174         // For wait task without timeout, we will be deleted after the wait task wakes up.
175         return true;
176     }
177 
178     WaitEntry* dwe = static_cast<WaitEntry*>(we);
179     if (!DelayedRemove(we->tp, dwe)) {
180         // Deletion of timer failed during the notify process, indicating that timer cb has been executed at this time
181         // waiting for cb execution to complete, and marking notify as being processed.
182         we->status.store(we_status::NOTIFING, std::memory_order_release);
183         wqlock.unlock();
184         while (we->status.load(std::memory_order_acquire) != we_status::TIMEOUT_DONE) {
185         }
186         wqlock.lock();
187     }
188 
189     delete we;
190     return true;
191 }
192 
Notify(bool one)193 void WaitQueue::Notify(bool one) noexcept
194 {
195     // the caller should assure the WaitQueue life time.
196     // this function should assure the WaitQueue do not be access after the wqlock is empty(),
197     // that mean the last wait thread/co may destory the WaitQueue.
198     // all the break out should assure the wqlock is in unlock state.
199     // the continue should assure the wqlock is in lock state.
200     wqlock.lock();
201     for (; ;) {
202         if (empty()) {
203             wqlock.unlock();
204             break;
205         }
206         WaitUntilEntry* we = pop_front();
207         if (we == nullptr) {
208             wqlock.unlock();
209             break;
210         }
211         bool isEmpty = empty();
212         CPUEUTask* task = we->task;
213         if (ThreadNotifyMode(task) || we->weType == 2) {
214             std::unique_lock<std::mutex> lk(we->wl);
215             we->status.store(we_status::NOTIFING, std::memory_order_release);
216             if (BlockThread(task)) {
217                 task->blockType = BlockType::BLOCK_COROUTINE;
218                 we->task = nullptr;
219             }
220             wqlock.unlock();
221             we->cv.notify_one();
222         } else {
223             if (!WeNotifyProc(we)) {
224                 continue;
225             }
226             wqlock.unlock();
227             CoRoutineFactory::CoWakeFunc(task, false);
228         }
229         if (isEmpty || one) {
230             break;
231         }
232         wqlock.lock();
233     }
234 }
235 
236 } // namespace ffrt
237