1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "rs_thread_looper_impl.h"
17 
18 #include <algorithm>
19 
20 namespace OHOS {
21 namespace Rosen {
22 static thread_local std::unique_ptr<ThreadLooperImpl> g_looperInstance = nullptr;
23 
CreateThreadInstance()24 ThreadLooperImpl* ThreadLooperImpl::CreateThreadInstance()
25 {
26     if (!g_looperInstance) {
27         g_looperInstance = std::make_unique<ThreadLooperImpl>();
28     }
29     return g_looperInstance.get();
30 }
31 
GetThreadInstance()32 ThreadLooperImpl* ThreadLooperImpl::GetThreadInstance()
33 {
34     return g_looperInstance.get();
35 }
36 
HaveDelayedMessageToProcess()37 bool ThreadLooperImpl::HaveDelayedMessageToProcess()
38 {
39     if (delayedQueue_.empty()) {
40         return false;
41     }
42     auto message = *delayedQueue_.begin();
43     return std::get<time_t>(message) <= clock_t::now();
44 }
45 
WaitForMessage(int timeoutMillis)46 void ThreadLooperImpl::WaitForMessage(int timeoutMillis)
47 {
48     std::unique_lock<std::mutex> lock(mutex_);
49     if (queue_.empty() && !HaveDelayedMessageToProcess()) {
50         wakeup_ = false;
51         auto pred = [this]() { return !queue_.empty() || HaveDelayedMessageToProcess() || wakeup_; };
52         if (timeoutMillis < 0) {
53             cv_.wait(lock, pred);
54         } else {
55             auto time = clock_t::now() + std::chrono::milliseconds(timeoutMillis);
56             if (!delayedQueue_.empty()) {
57                 auto message = *delayedQueue_.begin();
58                 auto msgTime = std::get<time_t>(message);
59                 time = std::min(time, msgTime);
60             }
61             cv_.wait_until(lock, time, pred);
62         }
63     }
64 }
65 
ProcessOneMessageInternal()66 bool ThreadLooperImpl::ProcessOneMessageInternal()
67 {
68     message_t task = nullptr;
69     int param = 0;
70     {
71         std::unique_lock<std::mutex> lock(mutex_);
72         if (!queue_.empty()) {
73             auto top = queue_.begin();
74             auto message = *top;
75             queue_.erase(top);
76             task = std::get<message_t>(message);
77             param = std::get<int>(message);
78         } else if (HaveDelayedMessageToProcess()) {
79             auto top = delayedQueue_.begin();
80             auto message = *top;
81             delayedQueue_.erase(top);
82             task = std::get<message_t>(message);
83             param = std::get<int>(message);
84         }
85     }
86     if (!task) {
87         return false;
88     }
89     task->Process(param);
90     return true;
91 }
92 
ProcessOneMessage(int timeoutMillis)93 void ThreadLooperImpl::ProcessOneMessage(int timeoutMillis)
94 {
95     WaitForMessage(timeoutMillis);
96     ProcessOneMessageInternal();
97 }
98 
ProcessAllMessages(int timeoutMillis)99 void ThreadLooperImpl::ProcessAllMessages(int timeoutMillis)
100 {
101     if (timeoutMillis < 0) {
102         WaitForMessage(timeoutMillis);
103         while (ProcessOneMessageInternal()) {
104             // Do nothing, just process messages while we can
105         }
106     } else {
107         auto waitUntil = clock_t::now() + std::chrono::milliseconds(timeoutMillis);
108         std::chrono::milliseconds::rep timeLeft = timeoutMillis;
109         while (timeLeft > 0) {
110             ProcessOneMessage(timeoutMillis);
111             timeLeft = std::chrono::duration_cast<std::chrono::milliseconds>(waitUntil - clock_t::now()).count();
112         }
113     }
114 }
115 
WakeUp()116 void ThreadLooperImpl::WakeUp()
117 {
118     std::lock_guard<std::mutex> lock(mutex_);
119     wakeup_ = true;
120     cv_.notify_all();
121 }
122 
PostMessage(const std::shared_ptr<ThreadLooperMessage> & message,int param)123 void ThreadLooperImpl::PostMessage(const std::shared_ptr<ThreadLooperMessage>& message, int param)
124 {
125     std::lock_guard<std::mutex> lock(mutex_);
126     queue_.emplace_back(message, param);
127     cv_.notify_all();
128 }
129 
PostMessage(int64_t delay,const std::shared_ptr<ThreadLooperMessage> & message,int param)130 void ThreadLooperImpl::PostMessage(int64_t delay, const std::shared_ptr<ThreadLooperMessage>& message, int param)
131 {
132     std::lock_guard<std::mutex> lock(mutex_);
133     auto time = clock_t::now() + std::chrono::nanoseconds(delay);
134     auto castTime = std::chrono::time_point_cast<clock_t::duration>(time);
135     delayedQueue_.emplace(castTime, message, param);
136     cv_.notify_all();
137 }
138 
RemoveMessages(const std::shared_ptr<ThreadLooperMessage> & message)139 void ThreadLooperImpl::RemoveMessages(const std::shared_ptr<ThreadLooperMessage>& message)
140 {
141     std::lock_guard<std::mutex> lock(mutex_);
142     for (auto pos = std::begin(queue_); pos != std::end(queue_);) {
143         if (std::get<message_t>(*pos) == message) {
144             pos = queue_.erase(pos);
145         } else {
146             ++pos;
147         }
148     }
149     for (auto pos = std::begin(delayedQueue_); pos != std::end(delayedQueue_);) {
150         if (std::get<message_t>(*pos) == message) {
151             pos = delayedQueue_.erase(pos);
152         } else {
153             ++pos;
154         }
155     }
156 }
157 } // namespace Rosen
158 } // namespace OHOS
159