1 /*
2  * Copyright (C) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "message_queue.h"
17 #include <cinttypes>
18 #include <sys/time.h>
19 #include <thread>
20 #include "wifi_log.h"
21 
22 #undef LOG_TAG
23 #define LOG_TAG "OHWIFI_MESSAGE_QUEUE"
24 
25 namespace OHOS {
26 namespace Wifi {
MessageQueue()27 MessageQueue::MessageQueue() : pMessageQueue(nullptr), mIsBlocked(false), mNeedQuit(false)
28 {
29     LOGI("MessageQueue");
30 }
31 
~MessageQueue()32 MessageQueue::~MessageQueue()
33 {
34     LOGI("~MessageQueue");
35     /* Releasing Messages in a Queue */
36     std::unique_lock<std::mutex> lock(mMtxQueue);
37     InternalMessagePtr current = pMessageQueue;
38     InternalMessagePtr next = nullptr;
39     while (current != nullptr) {
40         next = current->GetNextMsg();
41         current = nullptr;
42         current = next;
43     }
44     return;
45 }
46 
AddMessageToQueue(InternalMessagePtr message,int64_t handleTime)47 bool MessageQueue::AddMessageToQueue(InternalMessagePtr message, int64_t handleTime)
48 {
49     if (message == nullptr) {
50         LOGE("message is null.");
51         return false;
52     }
53 
54     LOGD("AddMessageToQueue, msg: %{public}d, timestamp:%" PRId64 "\n",
55         message->GetMessageName(), handleTime);
56 
57     if (mNeedQuit) {
58         MessageManage::GetInstance().ReclaimMsg(message);
59         LOGE("Already quit the message queue.");
60         return false;
61     }
62 
63     message->SetHandleTime(handleTime);
64     bool mNeedWakeup = false;
65     /*
66      * If the queue is empty, the current message needs to be executed
67      * immediately, or the execution time is earlier than the queue header, the
68      * message is placed in the queue header and is woken up when the queue is
69      * blocked.
70      */
71     {
72         std::unique_lock<std::mutex> lck(mMtxQueue);
73         InternalMessagePtr pTop = pMessageQueue;
74         if (pTop == nullptr || handleTime == 0 || handleTime <= pTop->GetHandleTime()) {
75             LOGD("Add the message in the head of queue.");
76             message->SetNextMsg(pTop);
77             pMessageQueue = message;
78             mNeedWakeup = mIsBlocked;
79         } else {
80             LOGD("Insert the message in the middle of the queue.");
81             InternalMessagePtr pPrev = nullptr;
82             InternalMessagePtr pCurrent = pTop;
83             /* Inserts messages in the middle of the queue based on the execution time. */
84             while (pCurrent != nullptr) {
85                 pPrev = pCurrent;
86                 pCurrent = pCurrent->GetNextMsg();
87                 if (pCurrent == nullptr || handleTime < pCurrent->GetHandleTime()) {
88                     message->SetNextMsg(pCurrent);
89                     pPrev->SetNextMsg(message);
90                     break;
91                 }
92             }
93         }
94     }
95 
96     LOGD("Add message needWakeup: %{public}d", static_cast<int>(mNeedWakeup));
97     if (mNeedWakeup) {
98         mIsBlocked = false;
99     }
100     /* Wake up the process. */
101     mCvQueue.notify_one();
102     return true;
103 }
104 
DeleteMessageFromQueue(int messageName)105 bool MessageQueue::DeleteMessageFromQueue(int messageName)
106 {
107     std::unique_lock<std::mutex> lck(mMtxQueue);
108     InternalMessagePtr pTop = pMessageQueue;
109     if (pTop == nullptr) {
110         return true;
111     }
112 
113     InternalMessagePtr pCurrent = pTop;
114     while (pCurrent != nullptr) {
115         InternalMessagePtr pPrev = pCurrent;
116         pCurrent = pCurrent->GetNextMsg();
117         if ((pCurrent != nullptr) && (pCurrent->GetMessageName() == messageName)) {
118             InternalMessagePtr pNextMsg = pCurrent->GetNextMsg();
119             pPrev->SetNextMsg(pNextMsg);
120             MessageManage::GetInstance().ReclaimMsg(pCurrent);
121             pCurrent = pNextMsg;
122         }
123     }
124 
125     if (pTop->GetMessageName() == messageName) {
126         pMessageQueue = pTop->GetNextMsg();
127         MessageManage::GetInstance().ReclaimMsg(pTop);
128     }
129     return true;
130 }
131 
GetNextMessage()132 InternalMessagePtr MessageQueue::GetNextMessage()
133 {
134     LOGD("GetNextMessage");
135     int nextBlockTime = 0;
136 
137     while (!mNeedQuit) {
138         /* Obtains the current time, accurate to milliseconds. */
139         struct timespec curTime = {0, 0};
140         if (clock_gettime(CLOCK_MONOTONIC, &curTime) != 0) {
141             LOGE("clock_gettime failed.");
142             return nullptr;
143         }
144 
145         int64_t nowTime = static_cast<int64_t>(curTime.tv_sec) * TIME_USEC_1000 +
146             curTime.tv_nsec / (TIME_USEC_1000 * TIME_USEC_1000);
147         {
148             std::unique_lock<std::mutex> lck(mMtxQueue); // Data queue lock
149             InternalMessagePtr curMsg = pMessageQueue;
150             mIsBlocked = true;
151             if (curMsg != nullptr) {
152                 LOGD("Message queue is not empty.");
153                 if (nowTime < curMsg->GetHandleTime()) {
154                     /* The execution time of the first message is not reached.
155                         The remaining time is blocked here. */
156                     nextBlockTime = curMsg->GetHandleTime() - nowTime;
157                 } else {
158                     /* Get the message of queue header. */
159                     mIsBlocked = false;
160                     pMessageQueue = curMsg->GetNextMsg();
161                     curMsg->SetNextMsg(nullptr);
162                     LOGD("Get queue message: %{public}d", curMsg->GetMessageName());
163                     return curMsg;
164                 }
165             } else {
166                 /* If there's no message, check it every 30 seconds. */
167                 nextBlockTime = WIFI_TIME_INTERVAL;
168             }
169         }
170 
171         if (mIsBlocked && (!mNeedQuit)) {
172             std::mutex mtxBlock;
173             std::unique_lock<std::mutex> lck(mtxBlock); // mCvQueue lock
174             LOGD("mCvQueue wait_for: %{public}d", nextBlockTime);
175             if (mCvQueue.wait_for(lck, std::chrono::milliseconds(nextBlockTime)) == std::cv_status::timeout) {
176                 LOGD("mCvQueue wake up, reason: cv_status::timeout: %{public}d", nextBlockTime);
177             } else {
178                 LOGD("mCvQueue is wake up.");
179             }
180         }
181         mIsBlocked = false;
182     }
183     LOGE("Already quit the message queue.");
184     return nullptr;
185 }
186 
StopQueueLoop()187 void MessageQueue::StopQueueLoop()
188 {
189     LOGI("Start stop queue loop.");
190     mNeedQuit = true;
191     if (mIsBlocked) {
192         mIsBlocked = false;
193     }
194     mCvQueue.notify_one();
195     LOGI("Queue loop has stopped.");
196 }
197 }  // namespace Wifi
198 }  // namespace OHOS
199