1 /*
2  * Copyright (C) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef IM_RENDER_THREAD_H
17 #define IM_RENDER_THREAD_H
18 
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>

#include "render_work_itf.h"
#include "render_queue_itf.h"
#include "render_fifo_queue.h"
#include "render_task_itf.h"
26 
// Polling interval used by Start()/Stop() while waiting for the worker thread to change state.
// The value is passed to std::chrono::microseconds.
static constexpr int TIME_FOR_STOP = 1000;
// Longest time Run() waits for a task before it invokes the idle callback.
// The value is passed to std::chrono::milliseconds.
static constexpr int TIME_FOR_WAITING_TASK = 2500;
29 
30 template <typename QUEUE = RenderFifoQueue<RenderTaskPtr<void>>>
31 class RenderThread : public RenderWorkerItf<typename QUEUE::DataType> {
32 public:
33     typedef typename QUEUE::DataType LocalTaskType;
34     static_assert(std::is_base_of<RenderQueueItf<LocalTaskType>, QUEUE>::value,
35         "QUEUE should be derived from RenderQueueItf");
36 
37     explicit RenderThread(
38         size_t, std::function<void()> idleTask = []() {});
39     virtual ~RenderThread();
40     virtual void AddTask(const LocalTaskType &, bool overwrite = false) override;
41     virtual void ClearTask() override;
42     virtual void Start() override;
43     virtual void Stop() override;
44 
45 protected:
46     virtual void Run() override;
47 
48     QUEUE *m_localMsgQueue = nullptr;
49     volatile bool m_isWorking = false;
50     volatile bool m_isStopped = true;
51 
52     std::mutex cvMutex;
53     std::condition_variable cvFull;
54     std::condition_variable cvEmpty;
55     std::function<void()> idleTask;
56 
57     std::thread *t{ nullptr };
58     size_t qSize;
59 };
60 
61 template <typename QUEUE>
RenderThread(size_t queueSize,std::function<void ()> idleTask)62 RenderThread<QUEUE>::RenderThread(size_t queueSize, std::function<void()> idleTask) : idleTask(idleTask),
63     qSize(queueSize)
64 {
65     m_localMsgQueue = new QUEUE();
66 }
67 
~RenderThread()68 template <typename QUEUE> RenderThread<QUEUE>::~RenderThread()
69 {
70     Stop();
71     t->join();
72     delete t;
73     delete m_localMsgQueue;
74 }
75 
AddTask(const LocalTaskType & task,bool overwrite)76 template <typename QUEUE> void RenderThread<QUEUE>::AddTask(const LocalTaskType &task, bool overwrite)
77 {
78     std::unique_lock<std::mutex> lk(cvMutex);
79     cvFull.wait(lk, [this]() { return (m_localMsgQueue->GetSize() < this->qSize) || (!m_isWorking); });
80     if (m_isWorking) {
81         if (overwrite) {
82             m_localMsgQueue->Remove([&task](LocalTaskType &t) { return GetTag(task) == GetTag(t); });
83         }
84         m_localMsgQueue->Push(task);
85         lk.unlock();
86         cvEmpty.notify_one();
87     }
88 }
89 
ClearTask()90 template <typename QUEUE> void RenderThread<QUEUE>::ClearTask()
91 {
92     std::unique_lock<std::mutex> lk(cvMutex);
93     while (m_localMsgQueue->GetSize() > 0) {
94         m_localMsgQueue->RemoveAll();
95     }
96     lk.unlock();
97 }
98 
Start()99 template <typename QUEUE> void RenderThread<QUEUE>::Start()
100 {
101     if (m_isStopped) {
102         m_isWorking = true;
103         t = new std::thread([this]() {
104             this->m_isStopped = false;
105             this->Run();
106             this->m_isStopped = true;
107         });
108         while (m_isStopped) {
109             std::this_thread::sleep_for(std::chrono::microseconds(TIME_FOR_STOP));
110         }
111     }
112 }
113 
Stop()114 template <typename QUEUE> void RenderThread<QUEUE>::Stop()
115 {
116     m_isWorking = false;
117     cvEmpty.notify_all();
118     while (!m_isStopped) {
119         std::this_thread::sleep_for(std::chrono::microseconds(TIME_FOR_STOP));
120     }
121 }
122 
Run()123 template <typename QUEUE> void RenderThread<QUEUE>::Run()
124 {
125     while (m_isWorking) {
126         std::unique_lock<std::mutex> lk(cvMutex);
127         bool cvRet = cvEmpty.wait_for(lk, std::chrono::milliseconds(TIME_FOR_WAITING_TASK),
128             [this]() { return (m_localMsgQueue->GetSize() > 0) || (!m_isWorking); });
129         if (cvRet) {
130             LocalTaskType task;
131             bool ret = m_localMsgQueue->Pop(task);
132             lk.unlock();
133             cvFull.notify_one();
134             if (ret) {
135                 task->Run();
136             }
137         } else {
138             lk.unlock();
139             idleTask();
140         }
141     }
142 };
143 #endif // IM_RENDER_THREAD_H