1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "rs_thread_looper_impl.h"
17
18 #include <algorithm>
19
20 namespace OHOS {
21 namespace Rosen {
22 static thread_local std::unique_ptr<ThreadLooperImpl> g_looperInstance = nullptr;
23
CreateThreadInstance()24 ThreadLooperImpl* ThreadLooperImpl::CreateThreadInstance()
25 {
26 if (!g_looperInstance) {
27 g_looperInstance = std::make_unique<ThreadLooperImpl>();
28 }
29 return g_looperInstance.get();
30 }
31
GetThreadInstance()32 ThreadLooperImpl* ThreadLooperImpl::GetThreadInstance()
33 {
34 return g_looperInstance.get();
35 }
36
HaveDelayedMessageToProcess()37 bool ThreadLooperImpl::HaveDelayedMessageToProcess()
38 {
39 if (delayedQueue_.empty()) {
40 return false;
41 }
42 auto message = *delayedQueue_.begin();
43 return std::get<time_t>(message) <= clock_t::now();
44 }
45
WaitForMessage(int timeoutMillis)46 void ThreadLooperImpl::WaitForMessage(int timeoutMillis)
47 {
48 std::unique_lock<std::mutex> lock(mutex_);
49 if (queue_.empty() && !HaveDelayedMessageToProcess()) {
50 wakeup_ = false;
51 auto pred = [this]() { return !queue_.empty() || HaveDelayedMessageToProcess() || wakeup_; };
52 if (timeoutMillis < 0) {
53 cv_.wait(lock, pred);
54 } else {
55 auto time = clock_t::now() + std::chrono::milliseconds(timeoutMillis);
56 if (!delayedQueue_.empty()) {
57 auto message = *delayedQueue_.begin();
58 auto msgTime = std::get<time_t>(message);
59 time = std::min(time, msgTime);
60 }
61 cv_.wait_until(lock, time, pred);
62 }
63 }
64 }
65
ProcessOneMessageInternal()66 bool ThreadLooperImpl::ProcessOneMessageInternal()
67 {
68 message_t task = nullptr;
69 int param = 0;
70 {
71 std::unique_lock<std::mutex> lock(mutex_);
72 if (!queue_.empty()) {
73 auto top = queue_.begin();
74 auto message = *top;
75 queue_.erase(top);
76 task = std::get<message_t>(message);
77 param = std::get<int>(message);
78 } else if (HaveDelayedMessageToProcess()) {
79 auto top = delayedQueue_.begin();
80 auto message = *top;
81 delayedQueue_.erase(top);
82 task = std::get<message_t>(message);
83 param = std::get<int>(message);
84 }
85 }
86 if (!task) {
87 return false;
88 }
89 task->Process(param);
90 return true;
91 }
92
ProcessOneMessage(int timeoutMillis)93 void ThreadLooperImpl::ProcessOneMessage(int timeoutMillis)
94 {
95 WaitForMessage(timeoutMillis);
96 ProcessOneMessageInternal();
97 }
98
ProcessAllMessages(int timeoutMillis)99 void ThreadLooperImpl::ProcessAllMessages(int timeoutMillis)
100 {
101 if (timeoutMillis < 0) {
102 WaitForMessage(timeoutMillis);
103 while (ProcessOneMessageInternal()) {
104 // Do nothing, just process messages while we can
105 }
106 } else {
107 auto waitUntil = clock_t::now() + std::chrono::milliseconds(timeoutMillis);
108 std::chrono::milliseconds::rep timeLeft = timeoutMillis;
109 while (timeLeft > 0) {
110 ProcessOneMessage(timeoutMillis);
111 timeLeft = std::chrono::duration_cast<std::chrono::milliseconds>(waitUntil - clock_t::now()).count();
112 }
113 }
114 }
115
WakeUp()116 void ThreadLooperImpl::WakeUp()
117 {
118 std::lock_guard<std::mutex> lock(mutex_);
119 wakeup_ = true;
120 cv_.notify_all();
121 }
122
PostMessage(const std::shared_ptr<ThreadLooperMessage> & message,int param)123 void ThreadLooperImpl::PostMessage(const std::shared_ptr<ThreadLooperMessage>& message, int param)
124 {
125 std::lock_guard<std::mutex> lock(mutex_);
126 queue_.emplace_back(message, param);
127 cv_.notify_all();
128 }
129
PostMessage(int64_t delay,const std::shared_ptr<ThreadLooperMessage> & message,int param)130 void ThreadLooperImpl::PostMessage(int64_t delay, const std::shared_ptr<ThreadLooperMessage>& message, int param)
131 {
132 std::lock_guard<std::mutex> lock(mutex_);
133 auto time = clock_t::now() + std::chrono::nanoseconds(delay);
134 auto castTime = std::chrono::time_point_cast<clock_t::duration>(time);
135 delayedQueue_.emplace(castTime, message, param);
136 cv_.notify_all();
137 }
138
RemoveMessages(const std::shared_ptr<ThreadLooperMessage> & message)139 void ThreadLooperImpl::RemoveMessages(const std::shared_ptr<ThreadLooperMessage>& message)
140 {
141 std::lock_guard<std::mutex> lock(mutex_);
142 for (auto pos = std::begin(queue_); pos != std::end(queue_);) {
143 if (std::get<message_t>(*pos) == message) {
144 pos = queue_.erase(pos);
145 } else {
146 ++pos;
147 }
148 }
149 for (auto pos = std::begin(delayedQueue_); pos != std::end(delayedQueue_);) {
150 if (std::get<message_t>(*pos) == message) {
151 pos = delayedQueue_.erase(pos);
152 } else {
153 ++pos;
154 }
155 }
156 }
157 } // namespace Rosen
158 } // namespace OHOS
159