1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "safe_block_queue.h"
16 
17 #include <array>
18 #include <future>
19 #include <gtest/gtest.h>
20 #include <thread>
21 #include <iostream>
22 #include <chrono> // std::chrono::seconds
23 using namespace testing::ext;
24 using namespace std;
25 
26 namespace OHOS {
27 namespace {
28 class UtilsSafeBlockQueueTracking : public testing::Test {
29 };
30 
31 const unsigned int QUEUE_SLOTS = 10;
32 const unsigned int THREAD_NUM = QUEUE_SLOTS + 1;
33 
34 class DemoThreadData {
35 public:
DemoThreadData()36     DemoThreadData()
37     {
38         putStatus = false;
39         getStatus = false;
40         joinStatus = false;
41     }
42 
43     bool putStatus;
44     bool getStatus;
45     static SafeBlockQueueTracking<int> shareQueue;
46     static bool joinStatus;
47 
Get()48     void Get()
49     {
50         shareQueue.Pop();
51         getStatus = true;
52     }
53 
Put(int j)54     void Put(int j)
55     {
56         shareQueue.Push(j);
57         putStatus = true;
58     }
59 
Join()60     void Join()
61     {
62         shareQueue.Join();
63         joinStatus = true;
64     }
65 
GetAndOneTaskDone()66     void GetAndOneTaskDone()
67     {
68         shareQueue.Pop();
69         getStatus = true;
70         shareQueue.OneTaskDone();
71     }
72 };
73 
74 SafeBlockQueueTracking<int> DemoThreadData::shareQueue(QUEUE_SLOTS);
75 bool DemoThreadData::joinStatus = false;
76 
PutHandleThreadData(DemoThreadData & q,int i)77 void PutHandleThreadData(DemoThreadData& q, int i)
78 {
79     q.Put(i);
80 }
81 
GetThreadDatePushedStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & pushedIn,unsigned int & unpushedIn)82 void GetThreadDatePushedStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas,
83                                              unsigned int& pushedIn, unsigned int& unpushedIn)
84 {
85     pushedIn = 0;
86     unpushedIn = 0;
87     for (auto& t : demoDatas) {
88         if (t.putStatus) {
89             pushedIn++;
90         } else {
91             unpushedIn++;
92         }
93     }
94 }
95 
GetThreadDateGetedStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & getedOut,unsigned int & ungetedOut)96 void GetThreadDateGetedStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas,
97                                            unsigned int& getedOut, unsigned int& ungetedOut)
98 {
99     getedOut = 0;
100     ungetedOut = 0;
101     for (auto& t : demoDatas) {
102         if (t.getStatus) {
103             getedOut++;
104         } else {
105             ungetedOut++;
106         }
107     }
108 }
109 
PutHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)110 void PutHandleThreadDataTime(DemoThreadData& q, int i, std::chrono::system_clock::time_point absTime)
111 {
112     std::this_thread::sleep_until(absTime);
113 
114     q.Put(i);
115 }
116 
GetAndOneTaskDoneHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)117 void GetAndOneTaskDoneHandleThreadDataTime(DemoThreadData& q, int i, std::chrono::system_clock::time_point absTime)
118 {
119     std::this_thread::sleep_until(absTime);
120 
121     q.GetAndOneTaskDone();
122 }
123 
124 /*
125  * @tc.name: testPut001
126  * @tc.desc: Single-threaded call put and get to determine that the normal scenario is working properly
127  */
128 HWTEST_F(UtilsSafeBlockQueueTracking, testPut001, TestSize.Level0)
129 {
130     SafeBlockQueueTracking<int> qi(10);
131     int i = 1;
132     qi.Push(i);
133     EXPECT_EQ(static_cast<unsigned>(1), qi.Size());
134     ASSERT_GT(qi.GetUnfinishTaskNum(), 0);
135 }
136 
137 /*
138  * @tc.name: testGet001
139  * @tc.desc: Single-threaded call put and get to determine that the normal scenario is working properly
140  */
141 HWTEST_F(UtilsSafeBlockQueueTracking, testGet001, TestSize.Level0)
142 {
143     SafeBlockQueueTracking<int> qi(10);
144     for (int i = 0; i < 3; i++) {
145         qi.Push(i);
146     }
147     EXPECT_EQ(static_cast<unsigned>(3), qi.Size());
148     int t = qi.Pop();
149     ASSERT_EQ(t, 0);
150     ASSERT_GT(qi.GetUnfinishTaskNum(), 0);
151 }
152 
153 /*
154  * @tc.name: testMutilthreadPutAndBlock001
155  * @tc.desc: Multiple threads put until blocking runs, one thread gets, all threads finish running normally
156  */
157 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadPutAndBlock001, TestSize.Level0)
158 {
159     std::thread threads[THREAD_NUM];
160 
161     std::array<DemoThreadData, THREAD_NUM> demoDatas;
162     demoDatas.fill(DemoThreadData());
163     demoDatas[0].Put(1);
164     ASSERT_FALSE(demoDatas[0].joinStatus);
165 
166     // start thread to join
167     DemoThreadData tmp = DemoThreadData();
168     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
169     ASSERT_FALSE(demoDatas[0].joinStatus);
170 
171     for (unsigned int i = 0; i < THREAD_NUM; i++) {
172         threads[i] = std::thread(PutHandleThreadData, std::ref(demoDatas[i]), i);
173     }
174     ASSERT_FALSE(demoDatas[0].joinStatus);
175     // 1. queue is full and some threads is blocked
176     std::this_thread::sleep_for(std::chrono::seconds(2));
177     ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
178 
179     ASSERT_FALSE(demoDatas[0].joinStatus);
180     // 2.  get one out  and wait some put in
181     for (unsigned int i = 0; i < THREAD_NUM; i++) {
182         demoDatas[0].GetAndOneTaskDone();
183         ASSERT_FALSE(demoDatas[0].joinStatus);
184     }
185     ASSERT_FALSE(demoDatas[0].joinStatus);
186     demoDatas[0].GetAndOneTaskDone();
187     std::this_thread::sleep_for(std::chrono::seconds(1));
188 
189     ASSERT_TRUE(demoDatas[0].joinStatus);
190 
191     // recover state
192     for (auto& t : threads) {
193         t.join();
194     }
195     joinThread.join();
196 
197     while (!DemoThreadData::shareQueue.IsEmpty()) {
198         demoDatas[0].GetAndOneTaskDone();
199     }
200     demoDatas[0].joinStatus = false;
201 }
202 
CheckFullQueueStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & pushedIn,unsigned int & unpushedIn,unsigned int & getedOut,unsigned int & ungetedOut)203 static void CheckFullQueueStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas, unsigned int& pushedIn,
204                                  unsigned int& unpushedIn, unsigned int& getedOut, unsigned int& ungetedOut)
205 {
206     ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
207     GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
208     GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
209     ASSERT_EQ(pushedIn, THREAD_NUM);
210     ASSERT_EQ(getedOut, THREAD_NUM - QUEUE_SLOTS);
211 }
212 
GetTimeAddTwoSeconds()213 static std::time_t GetTimeAddTwoSeconds()
214 {
215     using std::chrono::system_clock;
216     std::time_t timeT = system_clock::to_time_t(system_clock::now());
217     const int twoSec = 2;
218     timeT += twoSec;
219     return timeT;
220 }
221 
222 /*
223  * @tc.name: testMutilthreadConcurrentPutAndBlockInblankqueue001
224  * @tc.desc: Multi-threaded put() on the empty queue. When n threads are waiting to reach a certain
225  * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
226  */
227 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentPutAndBlockInblankqueue001, TestSize.Level0)
228 {
229     // 1. prepare
230     std::thread threads[THREAD_NUM];
231     std::array<DemoThreadData, THREAD_NUM> demoDatas;
232     demoDatas.fill(DemoThreadData());
233 
234     using std::chrono::system_clock;
235 
236     std::time_t timeT = GetTimeAddTwoSeconds();
237 
238     // 2. start thread
239     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
240     for (unsigned int i = 0; i < THREAD_NUM; i++) {
241         threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
242     }
243 
244     // 3. queue is full and some threads is blocked
245     std::this_thread::sleep_for(std::chrono::seconds(4));
246 
247     // start thread to join
248     DemoThreadData tmp = DemoThreadData();
249     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
250     ASSERT_FALSE(demoDatas[0].joinStatus);
251 
252     ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
253     unsigned int pushedIn = 0;
254     unsigned int unpushedIn = 0;
255     unsigned int getedOut = 0;
256     unsigned int ungetedOut = 0;
257     GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
258 
259     ASSERT_EQ(pushedIn, QUEUE_SLOTS);
260     ASSERT_EQ(unpushedIn, THREAD_NUM - QUEUE_SLOTS);
261 
262     // get one out and wait some put in
263     for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
264         demoDatas[0].GetAndOneTaskDone();
265         ASSERT_FALSE(demoDatas[0].joinStatus);
266     }
267 
268     std::this_thread::sleep_for(std::chrono::seconds(2));
269     // queue is full and some threads is blocked and is not joined
270     CheckFullQueueStatus(demoDatas, pushedIn, unpushedIn, getedOut, ungetedOut);
271 
272     for (auto& t : threads) {
273         t.join();
274     }
275 
276     while (!DemoThreadData::shareQueue.IsEmpty()) {
277         demoDatas[0].GetAndOneTaskDone();
278     }
279 
280     std::this_thread::sleep_for(std::chrono::seconds(1));
281     ASSERT_TRUE(demoDatas[0].joinStatus);
282     demoDatas[0].joinStatus = false;
283     joinThread.join();
284 }
285 
QueuePushInfull()286 static void QueuePushInfull()
287 {
288     for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
289         int t = i;
290         DemoThreadData::shareQueue.Push(t);
291     }
292 }
293 
QueuePushInnotfull(const unsigned int remain)294 static void QueuePushInnotfull(const unsigned int remain)
295 {
296     for (unsigned int i = 0; i < QUEUE_SLOTS - remain; i++) {
297         int t = i;
298         DemoThreadData::shareQueue.Push(t);
299     }
300 }
301 
302 /*
303  * @tc.name: testMutilthreadConcurrentPutAndBlockInfullqueue001
304  * @tc.desc: Multi-threaded put() on the full queue. When n threads are waiting to reach a certain
305  * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
306  */
307 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentPutAndBlockInfullqueue001, TestSize.Level0)
308 {
309     // 1. prepare
310     std::thread threads[THREAD_NUM];
311     std::array<DemoThreadData, THREAD_NUM> demoDatas;
312     demoDatas.fill(DemoThreadData());
313 
314     using std::chrono::system_clock;
315 
316     std::time_t timeT = GetTimeAddTwoSeconds();
317     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
318     QueuePushInfull();
319     ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
320 
321     // 2. start thread put in full queue
322     for (unsigned int i = 0; i < THREAD_NUM; i++) {
323         threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
324     }
325 
326     std::this_thread::sleep_for(std::chrono::seconds(3));
327 
328     // 3. now thread is running and all is blocked
329     DemoThreadData tmp = DemoThreadData();
330     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
331     ASSERT_FALSE(demoDatas[0].joinStatus);
332 
333     unsigned int pushedIn = 0;
334     unsigned int unpushedIn = 0;
335     GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
336     ASSERT_EQ(pushedIn, static_cast<unsigned int>(0));
337     ASSERT_EQ(unpushedIn, THREAD_NUM);
338     ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
339     for (unsigned int i = 0; i < THREAD_NUM; i++) {
340         DemoThreadData::shareQueue.Pop();
341         DemoThreadData::shareQueue.OneTaskDone();
342 
343         std::this_thread::sleep_for(std::chrono::seconds(1));
344         ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
345         GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
346         ASSERT_EQ(pushedIn, i + 1);
347         ASSERT_EQ(unpushedIn, THREAD_NUM - (i + 1));
348     }
349 
350     for (auto& t : threads) {
351         t.join();
352     }
353 
354     while (!DemoThreadData::shareQueue.IsEmpty()) {
355         demoDatas[0].GetAndOneTaskDone();
356     }
357     demoDatas[0].joinStatus = false;
358     joinThread.join();
359 }
360 
361 /*
362  * @tc.name: testMutilthreadConcurrentGetAndBlockInblankqueue001
363  * @tc.desc: Multi-threaded get() on the empty queue. When n threads are waiting to reach a certain
364  * time-point, everyone gets concurrent to see the status of the queue and the state of the thread.
365  */
366 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndBlockInblankqueue001, TestSize.Level0)
367 {
368     // 1. prepare
369     std::thread threads[THREAD_NUM];
370     std::array<DemoThreadData, THREAD_NUM> demoDatas;
371     demoDatas.fill(DemoThreadData());
372 
373     using std::chrono::system_clock;
374 
375     std::time_t timeT = GetTimeAddTwoSeconds();
376     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
377 
378     // 2. start thread put in empty queue
379     for (unsigned int i = 0; i < THREAD_NUM; i++) {
380         threads[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
381             std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
382     }
383     std::this_thread::sleep_for(std::chrono::seconds(3));
384 
385     // 3. now thread is running and all is blocked
386     unsigned int getedOut = 0;
387     unsigned int ungetedOut = 0;
388     GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
389     ASSERT_EQ(getedOut, static_cast<unsigned int>(0));
390     ASSERT_EQ(ungetedOut, THREAD_NUM);
391     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
392 
393     // start thread to join
394     DemoThreadData tmp = DemoThreadData();
395     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
396     ASSERT_FALSE(demoDatas[0].joinStatus);
397 
398     int value = 1;
399     for (unsigned int i = 0; i < THREAD_NUM; i++) {
400         DemoThreadData::shareQueue.Push(value);
401         std::this_thread::sleep_for(std::chrono::seconds(1));
402         ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
403         GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
404         ASSERT_EQ(getedOut, i + 1);
405         ASSERT_EQ(ungetedOut, THREAD_NUM - (i + 1));
406     }
407 
408     for (auto& t : threads) {
409         t.join();
410     }
411 
412     while (!DemoThreadData::shareQueue.IsEmpty()) {
413         demoDatas[0].GetAndOneTaskDone();
414     }
415 
416     ASSERT_TRUE(demoDatas[0].joinStatus);
417     demoDatas[0].joinStatus = false;
418     joinThread.join();
419 }
420 
QueuePushFullEquivalent(const int equivalent)421 static void QueuePushFullEquivalent(const int equivalent)
422 {
423     for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
424         DemoThreadData::shareQueue.Push(equivalent);
425     }
426     ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
427 }
428 
429 /*
430  * @tc.name: testMutilthreadConcurrentGetAndBlockInfullqueue001
431  * @tc.desc: Multi-threaded get() on the full queue. When n threads are waiting to reach a certain
432  * time-point, everyone gets concurrent to see the status of the queue and the state of the thread.
433  */
434 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndBlockInfullqueue001, TestSize.Level0)
435 {
436     // 1. prepare
437     std::thread threads[THREAD_NUM];
438     std::array<DemoThreadData, THREAD_NUM> demoDatas;
439     demoDatas.fill(DemoThreadData());
440 
441     using std::chrono::system_clock;
442 
443     std::time_t timeT = GetTimeAddTwoSeconds();
444     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
445     int t = 1;
446     QueuePushFullEquivalent(t);
447 
448     // start thread to join
449     demoDatas[0].joinStatus = false;
450     DemoThreadData tmp = DemoThreadData();
451     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
452     ASSERT_FALSE(demoDatas[0].joinStatus);
453 
454     // 2. start thread get in full queue
455     for (unsigned int i = 0; i < THREAD_NUM; i++) {
456         threads[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
457             std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
458     }
459 
460     std::this_thread::sleep_for(std::chrono::seconds(4));
461 
462     // 3. start judge
463     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
464     ASSERT_TRUE(demoDatas[0].joinStatus);
465 
466     unsigned int getedOut = 0, ungetedOut = 0;
467     GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
468 
469     ASSERT_EQ(getedOut, QUEUE_SLOTS);
470     ASSERT_EQ(ungetedOut, THREAD_NUM - QUEUE_SLOTS);
471 
472     // put one in and wait some get out
473     for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
474         demoDatas[0].Put(t);
475     }
476 
477     demoDatas[0].joinStatus = false;
478     DemoThreadData tmp2 = DemoThreadData();
479     std::thread joinThread2 = std::thread(&DemoThreadData::Join, tmp2);
480 
481     std::this_thread::sleep_for(std::chrono::seconds(2));
482 
483     // queue is full and some threads is blocked and is not joined
484     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
485     ASSERT_TRUE(demoDatas[0].joinStatus);
486 
487     GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
488     ASSERT_EQ(getedOut, THREAD_NUM);
489     ASSERT_EQ(ungetedOut, static_cast<unsigned int>(0));
490 
491     for (auto& thd : threads) {
492         thd.join();
493     }
494 
495     while (!DemoThreadData::shareQueue.IsEmpty()) {
496         demoDatas[0].GetAndOneTaskDone();
497     }
498 
499     demoDatas[0].joinStatus = false;
500     joinThread.join();
501     joinThread2.join();
502 }
503 
504 /*
505  * @tc.name: testMutilthreadConcurrentGetAndBlockInnotfullqueue001
506  * @tc.desc: Multi-threaded get() on the notfull queue. When n threads are waiting to reach a certain
507  * time-point, everyone get concurrent to see the status of the queue and the state of the thread.
508  */
509 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndBlockInnotfullqueue001, TestSize.Level0)
510 {
511     // 1. prepare
512     std::thread threads[THREAD_NUM];
513     std::array<DemoThreadData, THREAD_NUM> demoDatas;
514     demoDatas.fill(DemoThreadData());
515 
516     using std::chrono::system_clock;
517 
518     const unsigned int REMAIN_SLOTS = 5;
519     std::time_t timeT = GetTimeAddTwoSeconds();
520     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
521     QueuePushInnotfull(REMAIN_SLOTS);
522 
523     // start thread to join
524     demoDatas[0].joinStatus = false;
525     DemoThreadData tmp = DemoThreadData();
526     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
527     ASSERT_FALSE(demoDatas[0].joinStatus);
528 
529     // 2. start thread put in not full queue
530     for (unsigned int i = 0; i < THREAD_NUM; i++) {
531         threads[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
532             std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
533     }
534 
535     std::this_thread::sleep_for(std::chrono::seconds(3));
536 
537     unsigned int getedOut = 0;
538     unsigned int ungetedOut = 0;
539     GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
540     ASSERT_EQ(getedOut, QUEUE_SLOTS - REMAIN_SLOTS);
541     ASSERT_EQ(ungetedOut, THREAD_NUM - (QUEUE_SLOTS - REMAIN_SLOTS));
542     ASSERT_TRUE(demoDatas[0].joinStatus);
543     // 3. put ungetedOut
544     for (unsigned int i = 0; i < ungetedOut; i++) {
545         int t = i;
546         DemoThreadData::shareQueue.Push(t);
547     }
548     demoDatas[0].joinStatus = false;
549     DemoThreadData tmp2 = DemoThreadData();
550     std::thread joinThread2 = std::thread(&DemoThreadData::Join, tmp2);
551 
552     std::this_thread::sleep_for(std::chrono::seconds(1));
553     GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
554     ASSERT_EQ(getedOut, THREAD_NUM);
555     ASSERT_EQ(ungetedOut, static_cast<unsigned int>(0));
556 
557     for (auto& t : threads) {
558         t.join();
559     }
560 
561     ASSERT_TRUE(demoDatas[0].joinStatus);
562     demoDatas[0].joinStatus = false;
563     joinThread.join();
564     joinThread2.join();
565 }
566 
567 /*
568  * @tc.name: testMutilthreadConcurrentPutAndBlockInnotfullqueue001
569  * @tc.desc: Multi-threaded put() on the not full queue. When n threads are waiting to reach a certain
570  * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
571  */
572 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentPutAndBlockInnotfullqueue001, TestSize.Level0)
573 {
574     // 1. prepare
575     std::array<DemoThreadData, THREAD_NUM> demoDatas;
576     std::thread threads[THREAD_NUM];
577     demoDatas.fill(DemoThreadData());
578 
579     using std::chrono::system_clock;
580 
581     const unsigned int REMAIN_SLOTS = 5;
582     std::time_t timeT = GetTimeAddTwoSeconds();
583     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
584     QueuePushInnotfull(REMAIN_SLOTS);
585 
586     // start thread to join
587     demoDatas[0].joinStatus = false;
588     DemoThreadData tmp = DemoThreadData();
589     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
590     ASSERT_FALSE(demoDatas[0].joinStatus);
591 
592     // 2. start thread put in not full queue
593     for (unsigned int i = 0; i < THREAD_NUM; i++) {
594         threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
595     }
596     ASSERT_FALSE(demoDatas[0].joinStatus);
597     std::this_thread::sleep_for(std::chrono::seconds(3));
598     ASSERT_FALSE(demoDatas[0].joinStatus);
599     unsigned int putedin = 0;
600     unsigned int unputedin = 0;
601     GetThreadDatePushedStatus(demoDatas, putedin, unputedin);
602     ASSERT_EQ(putedin, REMAIN_SLOTS);
603     ASSERT_EQ(unputedin, THREAD_NUM - REMAIN_SLOTS);
604 
605     // 3. put ungetedOut
606     for (unsigned int i = 0; i < unputedin; i++) {
607         DemoThreadData::shareQueue.Pop();
608         DemoThreadData::shareQueue.OneTaskDone();
609     }
610 
611     std::this_thread::sleep_for(std::chrono::seconds(1));
612     GetThreadDatePushedStatus(demoDatas, putedin, unputedin);
613     ASSERT_EQ(putedin, THREAD_NUM);
614     ASSERT_EQ(unputedin, static_cast<unsigned int>(0));
615 
616     ASSERT_FALSE(demoDatas[0].joinStatus);
617 
618     for (auto& t : threads) {
619         t.join();
620     }
621 
622     while (!DemoThreadData::shareQueue.IsEmpty()) {
623         demoDatas[0].GetAndOneTaskDone();
624     }
625 
626     std::this_thread::sleep_for(std::chrono::seconds(1));
627     ASSERT_TRUE(demoDatas[0].joinStatus);
628     joinThread.join();
629     demoDatas[0].joinStatus = false;
630 }
631 
632 /*
633  * @tc.name: testMutilthreadConcurrentGetAndPopInfullqueue001
634  * @tc.desc: Multi-threaded put() and Multi-threaded get() on the full queue. When all threads are waiting to reach a
635  * certain time-point, everyone run concurrently to see the status of the queue and the state of the thread.
636  */
637 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndPopInfullqueue001, TestSize.Level0)
638 {
639     // 1. prepare
640     std::thread threadsout[THREAD_NUM];
641     std::array<DemoThreadData, THREAD_NUM> demoDatas;
642     demoDatas.fill(DemoThreadData());
643 
644     std::thread threadsin[THREAD_NUM];
645 
646     using std::chrono::system_clock;
647 
648     std::time_t timeT = GetTimeAddTwoSeconds();
649     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
650     int t = 1;
651     QueuePushFullEquivalent(t);
652 
653     // start thread to join
654     demoDatas[0].joinStatus = false;
655     DemoThreadData tmp = DemoThreadData();
656     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
657     ASSERT_FALSE(demoDatas[0].joinStatus);
658 
659     // 2. start thread put in not full queue
660     for (unsigned int i = 0; i < THREAD_NUM; i++) {
661         threadsin[i] = std::thread(PutHandleThreadDataTime,
662             std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
663     }
664 
665     for (unsigned int i = 0; i < THREAD_NUM; i++) {
666         threadsout[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
667             std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
668     }
669 
670     std::this_thread::sleep_for(std::chrono::seconds(3));
671 
672     ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
673     unsigned int getedOut = 0, ungetedOut = 0, pushedIn = 0, unpushedIn = 0;
674     GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
675     GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
676 
677     ASSERT_EQ(pushedIn, THREAD_NUM);
678     ASSERT_EQ(getedOut, THREAD_NUM);
679 
680     ASSERT_FALSE(demoDatas[0].joinStatus);
681     demoDatas[0].joinStatus = false;
682 
683     for (auto& thdout : threadsout) {
684         thdout.join();
685     }
686 
687     for (auto& thdin : threadsin) {
688         thdin.join();
689     }
690 
691     while (!DemoThreadData::shareQueue.IsEmpty()) {
692         demoDatas[0].GetAndOneTaskDone();
693     }
694     joinThread.join();
695     ASSERT_TRUE(demoDatas[0].joinStatus);
696     demoDatas[0].joinStatus = false;
697 }
698 }  // namespace
699 }  // namespace OHOS