1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15  * Description: implementation of message queue thread
16  */
17 
18 #include "msg_handle_loop.h"
19 #include <chrono>
20 #include <cinttypes>
21 #include "qos.h"
22 #include "hcodec_log.h"
23 
24 using namespace std;
25 
MsgHandleLoop()26 MsgHandleLoop::MsgHandleLoop()
27 {
28     m_thread = thread(&MsgHandleLoop::MainLoop, this);
29 }
30 
~MsgHandleLoop()31 MsgHandleLoop::~MsgHandleLoop()
32 {
33     Stop();
34 }
35 
Stop()36 void MsgHandleLoop::Stop()
37 {
38     {
39         lock_guard<mutex> lock(m_mtx);
40         m_threadNeedStop = true;
41         m_threadCond.notify_all();
42     }
43 
44     if (m_thread.joinable()) {
45         m_thread.join();
46     }
47 }
48 
SendAsyncMsg(MsgType type,const ParamSP & msg,uint32_t delayUs)49 void MsgHandleLoop::SendAsyncMsg(MsgType type, const ParamSP &msg, uint32_t delayUs)
50 {
51     lock_guard<mutex> lock(m_mtx);
52     TimeUs nowUs = GetNowUs();
53     TimeUs msgProcessTime = (delayUs > INT64_MAX - nowUs) ? INT64_MAX : (nowUs + static_cast<int64_t>(delayUs));
54     if (m_msgQueue.find(msgProcessTime) != m_msgQueue.end()) {
55         LOGW("DUPLICATIVE MSG TIMESTAMP!!!");
56         msgProcessTime++;
57     }
58     m_msgQueue[msgProcessTime] = MsgInfo {type, ASYNC_MSG_ID, msg};
59     m_threadCond.notify_all();
60 }
61 
SendSyncMsg(MsgType type,const ParamSP & msg,ParamSP & reply,uint32_t waitMs)62 bool MsgHandleLoop::SendSyncMsg(MsgType type, const ParamSP &msg, ParamSP &reply, uint32_t waitMs)
63 {
64     MsgId id = GenerateMsgId();
65     {
66         lock_guard<mutex> lock(m_mtx);
67         TimeUs time = GetNowUs();
68         if (m_msgQueue.find(time) != m_msgQueue.end()) {
69             LOGW("DUPLICATIVE MSG TIMESTAMP!!!");
70             time++;
71         }
72         m_msgQueue[time] = MsgInfo {type, id, msg};
73         m_threadCond.notify_all();
74     }
75 
76     unique_lock<mutex> lock(m_replyMtx);
77     const auto pred = [this, id]() {
78         return m_replies.find(id) != m_replies.end();
79     };
80     if (waitMs == 0) {
81         m_replyCond.wait(lock, pred);
82     } else {
83         if (!m_replyCond.wait_for(lock, chrono::milliseconds(waitMs), pred)) {
84             LOGE("type=%u wait reply timeout", type);
85             return false;
86         }
87     }
88     reply = m_replies[id];
89     m_replies.erase(id);
90     return true;
91 }
92 
PostReply(MsgId id,const ParamSP & reply)93 void MsgHandleLoop::PostReply(MsgId id, const ParamSP &reply)
94 {
95     if (id == ASYNC_MSG_ID) {
96         return;
97     }
98     lock_guard<mutex> lock(m_replyMtx);
99     m_replies[id] = reply;
100     m_replyCond.notify_all();
101 }
102 
GenerateMsgId()103 MsgId MsgHandleLoop::GenerateMsgId()
104 {
105     lock_guard<mutex> lock(m_mtx);
106     m_lastMsgId++;
107     if (m_lastMsgId == ASYNC_MSG_ID) {
108         m_lastMsgId++;
109     }
110     return m_lastMsgId;
111 }
112 
MainLoop()113 void MsgHandleLoop::MainLoop()
114 {
115     pthread_setname_np(pthread_self(), "OS_HCodecLoop");
116     OHOS::QOS::SetThreadQos(OHOS::QOS::QosLevel::QOS_USER_INTERACTIVE);
117     while (true) {
118         MsgInfo info;
119         {
120             unique_lock<mutex> lock(m_mtx);
121             m_threadCond.wait(lock, [this] {
122                 return m_threadNeedStop || !m_msgQueue.empty();
123             });
124             if (m_threadNeedStop) {
125                 LOGI("stopped, remain %zu msg unprocessed", m_msgQueue.size());
126                 break;
127             }
128             TimeUs processUs = m_msgQueue.begin()->first;
129             TimeUs nowUs = GetNowUs();
130             if (processUs > nowUs) {
131                 m_threadCond.wait_for(lock, chrono::microseconds(processUs - nowUs));
132                 continue;
133             }
134             info = m_msgQueue.begin()->second;
135             m_msgQueue.erase(m_msgQueue.begin());
136         }
137         OnMsgReceived(info);
138     }
139 }
140 
GetNowUs()141 MsgHandleLoop::TimeUs MsgHandleLoop::GetNowUs()
142 {
143     auto now = chrono::steady_clock::now();
144     return chrono::duration_cast<chrono::microseconds>(now.time_since_epoch()).count();
145 }
146 
147