1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "hardware/imagecodec/msg_handle_loop.h"
17 #include <chrono>
18 #include <cinttypes>
19 #include "qos.h"
20 #include "hardware/imagecodec/image_codec_log.h"
21 
22 namespace OHOS::ImagePlugin {
23 using namespace std;
24 
MsgHandleLoop()25 MsgHandleLoop::MsgHandleLoop()
26 {
27     m_thread = thread([this] {
28         this->MainLoop();
29     });
30 }
31 
~MsgHandleLoop()32 MsgHandleLoop::~MsgHandleLoop()
33 {
34     Stop();
35 }
36 
Stop()37 void MsgHandleLoop::Stop()
38 {
39     {
40         lock_guard<mutex> lock(m_mtx);
41         m_threadNeedStop = true;
42         m_threadCond.notify_all();
43     }
44 
45     if (m_thread.joinable()) {
46         m_thread.join();
47     }
48 }
49 
SendAsyncMsg(MsgType type,const ParamSP & msg,uint32_t delayUs)50 void MsgHandleLoop::SendAsyncMsg(MsgType type, const ParamSP &msg, uint32_t delayUs)
51 {
52     lock_guard<mutex> lock(m_mtx);
53     TimeUs nowUs = GetNowUs();
54     TimeUs msgProcessTime = (delayUs > INT64_MAX - nowUs) ? INT64_MAX : (nowUs + delayUs);
55     if (m_msgQueue.find(msgProcessTime) != m_msgQueue.end()) {
56         LOGW("DUPLICATIVE MSG TIMESTAMP!!!");
57         msgProcessTime++;
58     }
59     m_msgQueue[msgProcessTime] = MsgInfo {type, ASYNC_MSG_ID, msg};
60     m_threadCond.notify_all();
61 }
62 
SendSyncMsg(MsgType type,const ParamSP & msg,ParamSP & reply,uint32_t waitMs)63 bool MsgHandleLoop::SendSyncMsg(MsgType type, const ParamSP &msg, ParamSP &reply, uint32_t waitMs)
64 {
65     MsgId id = GenerateMsgId();
66     {
67         lock_guard<mutex> lock(m_mtx);
68         TimeUs time = GetNowUs();
69         if (m_msgQueue.find(time) != m_msgQueue.end()) {
70             LOGW("DUPLICATIVE MSG TIMESTAMP!!!");
71             time++;
72         }
73         m_msgQueue[time] = MsgInfo {type, id, msg};
74         m_threadCond.notify_all();
75     }
76 
77     unique_lock<mutex> lock(m_replyMtx);
78     const auto pred = [this, id]() {
79         return m_replies.find(id) != m_replies.end();
80     };
81     if (waitMs == 0) {
82         m_replyCond.wait(lock, pred);
83     } else {
84         if (!m_replyCond.wait_for(lock, chrono::milliseconds(waitMs), pred)) {
85             LOGE("type=%{public}u wait reply timeout", type);
86             return false;
87         }
88     }
89     reply = m_replies[id];
90     m_replies.erase(id);
91     return true;
92 }
93 
PostReply(MsgId id,const ParamSP & reply)94 void MsgHandleLoop::PostReply(MsgId id, const ParamSP &reply)
95 {
96     if (id == ASYNC_MSG_ID) {
97         return;
98     }
99     lock_guard<mutex> lock(m_replyMtx);
100     m_replies[id] = reply;
101     m_replyCond.notify_all();
102 }
103 
GenerateMsgId()104 MsgId MsgHandleLoop::GenerateMsgId()
105 {
106     lock_guard<mutex> lock(m_mtx);
107     m_lastMsgId++;
108     if (m_lastMsgId == ASYNC_MSG_ID) {
109         m_lastMsgId++;
110     }
111     return m_lastMsgId;
112 }
113 
MainLoop()114 void MsgHandleLoop::MainLoop()
115 {
116     LOGD("increase thread priority");
117     pthread_setname_np(pthread_self(), "OS_ImageCodecLoop");
118     OHOS::QOS::SetThreadQos(OHOS::QOS::QosLevel::QOS_USER_INTERACTIVE);
119     while (true) {
120         MsgInfo info;
121         {
122             unique_lock<mutex> lock(m_mtx);
123             m_threadCond.wait(lock, [this] {
124                 return m_threadNeedStop || !m_msgQueue.empty();
125             });
126             if (m_threadNeedStop) {
127                 LOGD("stopped, remain %{public}zu msg unprocessed", m_msgQueue.size());
128                 break;
129             }
130             TimeUs processUs = m_msgQueue.begin()->first;
131             TimeUs nowUs = GetNowUs();
132             if (processUs > nowUs) {
133                 m_threadCond.wait_for(lock, chrono::microseconds(processUs - nowUs));
134                 continue;
135             }
136             info = m_msgQueue.begin()->second;
137             m_msgQueue.erase(m_msgQueue.begin());
138         }
139         OnMsgReceived(info);
140     }
141 }
142 
GetNowUs()143 MsgHandleLoop::TimeUs MsgHandleLoop::GetNowUs()
144 {
145     auto now = chrono::steady_clock::now();
146     return chrono::duration_cast<chrono::microseconds>(now.time_since_epoch()).count();
147 }
148 } // namespace OHOS::ImagePlugin
149