1 /*
2 * Copyright (C) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "message_queue.h"
17 #include <cinttypes>
18 #include <sys/time.h>
19 #include <thread>
20 #include "wifi_log.h"
21
22 #undef LOG_TAG
23 #define LOG_TAG "OHWIFI_MESSAGE_QUEUE"
24
25 namespace OHOS {
26 namespace Wifi {
MessageQueue()27 MessageQueue::MessageQueue() : pMessageQueue(nullptr), mIsBlocked(false), mNeedQuit(false)
28 {
29 LOGI("MessageQueue");
30 }
31
~MessageQueue()32 MessageQueue::~MessageQueue()
33 {
34 LOGI("~MessageQueue");
35 /* Releasing Messages in a Queue */
36 std::unique_lock<std::mutex> lock(mMtxQueue);
37 InternalMessagePtr current = pMessageQueue;
38 InternalMessagePtr next = nullptr;
39 while (current != nullptr) {
40 next = current->GetNextMsg();
41 current = nullptr;
42 current = next;
43 }
44 return;
45 }
46
AddMessageToQueue(InternalMessagePtr message,int64_t handleTime)47 bool MessageQueue::AddMessageToQueue(InternalMessagePtr message, int64_t handleTime)
48 {
49 if (message == nullptr) {
50 LOGE("message is null.");
51 return false;
52 }
53
54 LOGD("AddMessageToQueue, msg: %{public}d, timestamp:%" PRId64 "\n",
55 message->GetMessageName(), handleTime);
56
57 if (mNeedQuit) {
58 MessageManage::GetInstance().ReclaimMsg(message);
59 LOGE("Already quit the message queue.");
60 return false;
61 }
62
63 message->SetHandleTime(handleTime);
64 bool mNeedWakeup = false;
65 /*
66 * If the queue is empty, the current message needs to be executed
67 * immediately, or the execution time is earlier than the queue header, the
68 * message is placed in the queue header and is woken up when the queue is
69 * blocked.
70 */
71 {
72 std::unique_lock<std::mutex> lck(mMtxQueue);
73 InternalMessagePtr pTop = pMessageQueue;
74 if (pTop == nullptr || handleTime == 0 || handleTime <= pTop->GetHandleTime()) {
75 LOGD("Add the message in the head of queue.");
76 message->SetNextMsg(pTop);
77 pMessageQueue = message;
78 mNeedWakeup = mIsBlocked;
79 } else {
80 LOGD("Insert the message in the middle of the queue.");
81 InternalMessagePtr pPrev = nullptr;
82 InternalMessagePtr pCurrent = pTop;
83 /* Inserts messages in the middle of the queue based on the execution time. */
84 while (pCurrent != nullptr) {
85 pPrev = pCurrent;
86 pCurrent = pCurrent->GetNextMsg();
87 if (pCurrent == nullptr || handleTime < pCurrent->GetHandleTime()) {
88 message->SetNextMsg(pCurrent);
89 pPrev->SetNextMsg(message);
90 break;
91 }
92 }
93 }
94 }
95
96 LOGD("Add message needWakeup: %{public}d", static_cast<int>(mNeedWakeup));
97 if (mNeedWakeup) {
98 mIsBlocked = false;
99 }
100 /* Wake up the process. */
101 mCvQueue.notify_one();
102 return true;
103 }
104
DeleteMessageFromQueue(int messageName)105 bool MessageQueue::DeleteMessageFromQueue(int messageName)
106 {
107 std::unique_lock<std::mutex> lck(mMtxQueue);
108 InternalMessagePtr pTop = pMessageQueue;
109 if (pTop == nullptr) {
110 return true;
111 }
112
113 InternalMessagePtr pCurrent = pTop;
114 while (pCurrent != nullptr) {
115 InternalMessagePtr pPrev = pCurrent;
116 pCurrent = pCurrent->GetNextMsg();
117 if ((pCurrent != nullptr) && (pCurrent->GetMessageName() == messageName)) {
118 InternalMessagePtr pNextMsg = pCurrent->GetNextMsg();
119 pPrev->SetNextMsg(pNextMsg);
120 MessageManage::GetInstance().ReclaimMsg(pCurrent);
121 pCurrent = pNextMsg;
122 }
123 }
124
125 if (pTop->GetMessageName() == messageName) {
126 pMessageQueue = pTop->GetNextMsg();
127 MessageManage::GetInstance().ReclaimMsg(pTop);
128 }
129 return true;
130 }
131
GetNextMessage()132 InternalMessagePtr MessageQueue::GetNextMessage()
133 {
134 LOGD("GetNextMessage");
135 int nextBlockTime = 0;
136
137 while (!mNeedQuit) {
138 /* Obtains the current time, accurate to milliseconds. */
139 struct timespec curTime = {0, 0};
140 if (clock_gettime(CLOCK_MONOTONIC, &curTime) != 0) {
141 LOGE("clock_gettime failed.");
142 return nullptr;
143 }
144
145 int64_t nowTime = static_cast<int64_t>(curTime.tv_sec) * TIME_USEC_1000 +
146 curTime.tv_nsec / (TIME_USEC_1000 * TIME_USEC_1000);
147 {
148 std::unique_lock<std::mutex> lck(mMtxQueue); // Data queue lock
149 InternalMessagePtr curMsg = pMessageQueue;
150 mIsBlocked = true;
151 if (curMsg != nullptr) {
152 LOGD("Message queue is not empty.");
153 if (nowTime < curMsg->GetHandleTime()) {
154 /* The execution time of the first message is not reached.
155 The remaining time is blocked here. */
156 nextBlockTime = curMsg->GetHandleTime() - nowTime;
157 } else {
158 /* Get the message of queue header. */
159 mIsBlocked = false;
160 pMessageQueue = curMsg->GetNextMsg();
161 curMsg->SetNextMsg(nullptr);
162 LOGD("Get queue message: %{public}d", curMsg->GetMessageName());
163 return curMsg;
164 }
165 } else {
166 /* If there's no message, check it every 30 seconds. */
167 nextBlockTime = WIFI_TIME_INTERVAL;
168 }
169 }
170
171 if (mIsBlocked && (!mNeedQuit)) {
172 std::mutex mtxBlock;
173 std::unique_lock<std::mutex> lck(mtxBlock); // mCvQueue lock
174 LOGD("mCvQueue wait_for: %{public}d", nextBlockTime);
175 if (mCvQueue.wait_for(lck, std::chrono::milliseconds(nextBlockTime)) == std::cv_status::timeout) {
176 LOGD("mCvQueue wake up, reason: cv_status::timeout: %{public}d", nextBlockTime);
177 } else {
178 LOGD("mCvQueue is wake up.");
179 }
180 }
181 mIsBlocked = false;
182 }
183 LOGE("Already quit the message queue.");
184 return nullptr;
185 }
186
StopQueueLoop()187 void MessageQueue::StopQueueLoop()
188 {
189 LOGI("Start stop queue loop.");
190 mNeedQuit = true;
191 if (mIsBlocked) {
192 mIsBlocked = false;
193 }
194 mCvQueue.notify_one();
195 LOGI("Queue loop has stopped.");
196 }
197 } // namespace Wifi
198 } // namespace OHOS
199