1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
#include "safe_block_queue.h"

#include <array>
#include <atomic>
#include <chrono> // std::chrono::seconds
#include <future>
#include <gtest/gtest.h>
#include <iostream>
#include <thread>
using namespace testing::ext;
using namespace std;
25
26 namespace OHOS {
27 namespace {
28 class UtilsSafeBlockQueueTracking : public testing::Test {
29 };
30
// Capacity handed to the shared queue under test.
constexpr unsigned int QUEUE_SLOTS = 10;
// Number of worker threads per test: one more than the queue has slots.
constexpr unsigned int THREAD_NUM = QUEUE_SLOTS + 1;
33
34 class DemoThreadData {
35 public:
DemoThreadData()36 DemoThreadData()
37 {
38 putStatus = false;
39 getStatus = false;
40 joinStatus = false;
41 }
42
43 bool putStatus;
44 bool getStatus;
45 static SafeBlockQueueTracking<int> shareQueue;
46 static bool joinStatus;
47
Get()48 void Get()
49 {
50 shareQueue.Pop();
51 getStatus = true;
52 }
53
Put(int j)54 void Put(int j)
55 {
56 shareQueue.Push(j);
57 putStatus = true;
58 }
59
Join()60 void Join()
61 {
62 shareQueue.Join();
63 joinStatus = true;
64 }
65
GetAndOneTaskDone()66 void GetAndOneTaskDone()
67 {
68 shareQueue.Pop();
69 getStatus = true;
70 shareQueue.OneTaskDone();
71 }
72 };
73
74 SafeBlockQueueTracking<int> DemoThreadData::shareQueue(QUEUE_SLOTS);
75 bool DemoThreadData::joinStatus = false;
76
PutHandleThreadData(DemoThreadData & q,int i)77 void PutHandleThreadData(DemoThreadData& q, int i)
78 {
79 q.Put(i);
80 }
81
GetThreadDatePushedStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & pushedIn,unsigned int & unpushedIn)82 void GetThreadDatePushedStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas,
83 unsigned int& pushedIn, unsigned int& unpushedIn)
84 {
85 pushedIn = 0;
86 unpushedIn = 0;
87 for (auto& t : demoDatas) {
88 if (t.putStatus) {
89 pushedIn++;
90 } else {
91 unpushedIn++;
92 }
93 }
94 }
95
GetThreadDateGetedStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & getedOut,unsigned int & ungetedOut)96 void GetThreadDateGetedStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas,
97 unsigned int& getedOut, unsigned int& ungetedOut)
98 {
99 getedOut = 0;
100 ungetedOut = 0;
101 for (auto& t : demoDatas) {
102 if (t.getStatus) {
103 getedOut++;
104 } else {
105 ungetedOut++;
106 }
107 }
108 }
109
PutHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)110 void PutHandleThreadDataTime(DemoThreadData& q, int i, std::chrono::system_clock::time_point absTime)
111 {
112 std::this_thread::sleep_until(absTime);
113
114 q.Put(i);
115 }
116
GetAndOneTaskDoneHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)117 void GetAndOneTaskDoneHandleThreadDataTime(DemoThreadData& q, int i, std::chrono::system_clock::time_point absTime)
118 {
119 std::this_thread::sleep_until(absTime);
120
121 q.GetAndOneTaskDone();
122 }
123
124 /*
125 * @tc.name: testPut001
126 * @tc.desc: Single-threaded call put and get to determine that the normal scenario is working properly
127 */
128 HWTEST_F(UtilsSafeBlockQueueTracking, testPut001, TestSize.Level0)
129 {
130 SafeBlockQueueTracking<int> qi(10);
131 int i = 1;
132 qi.Push(i);
133 EXPECT_EQ(static_cast<unsigned>(1), qi.Size());
134 ASSERT_GT(qi.GetUnfinishTaskNum(), 0);
135 }
136
137 /*
138 * @tc.name: testGet001
139 * @tc.desc: Single-threaded call put and get to determine that the normal scenario is working properly
140 */
141 HWTEST_F(UtilsSafeBlockQueueTracking, testGet001, TestSize.Level0)
142 {
143 SafeBlockQueueTracking<int> qi(10);
144 for (int i = 0; i < 3; i++) {
145 qi.Push(i);
146 }
147 EXPECT_EQ(static_cast<unsigned>(3), qi.Size());
148 int t = qi.Pop();
149 ASSERT_EQ(t, 0);
150 ASSERT_GT(qi.GetUnfinishTaskNum(), 0);
151 }
152
153 /*
154 * @tc.name: testMutilthreadPutAndBlock001
155 * @tc.desc: Multiple threads put until blocking runs, one thread gets, all threads finish running normally
156 */
157 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadPutAndBlock001, TestSize.Level0)
158 {
159 std::thread threads[THREAD_NUM];
160
161 std::array<DemoThreadData, THREAD_NUM> demoDatas;
162 demoDatas.fill(DemoThreadData());
163 demoDatas[0].Put(1);
164 ASSERT_FALSE(demoDatas[0].joinStatus);
165
166 // start thread to join
167 DemoThreadData tmp = DemoThreadData();
168 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
169 ASSERT_FALSE(demoDatas[0].joinStatus);
170
171 for (unsigned int i = 0; i < THREAD_NUM; i++) {
172 threads[i] = std::thread(PutHandleThreadData, std::ref(demoDatas[i]), i);
173 }
174 ASSERT_FALSE(demoDatas[0].joinStatus);
175 // 1. queue is full and some threads is blocked
176 std::this_thread::sleep_for(std::chrono::seconds(2));
177 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
178
179 ASSERT_FALSE(demoDatas[0].joinStatus);
180 // 2. get one out and wait some put in
181 for (unsigned int i = 0; i < THREAD_NUM; i++) {
182 demoDatas[0].GetAndOneTaskDone();
183 ASSERT_FALSE(demoDatas[0].joinStatus);
184 }
185 ASSERT_FALSE(demoDatas[0].joinStatus);
186 demoDatas[0].GetAndOneTaskDone();
187 std::this_thread::sleep_for(std::chrono::seconds(1));
188
189 ASSERT_TRUE(demoDatas[0].joinStatus);
190
191 // recover state
192 for (auto& t : threads) {
193 t.join();
194 }
195 joinThread.join();
196
197 while (!DemoThreadData::shareQueue.IsEmpty()) {
198 demoDatas[0].GetAndOneTaskDone();
199 }
200 demoDatas[0].joinStatus = false;
201 }
202
CheckFullQueueStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & pushedIn,unsigned int & unpushedIn,unsigned int & getedOut,unsigned int & ungetedOut)203 static void CheckFullQueueStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas, unsigned int& pushedIn,
204 unsigned int& unpushedIn, unsigned int& getedOut, unsigned int& ungetedOut)
205 {
206 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
207 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
208 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
209 ASSERT_EQ(pushedIn, THREAD_NUM);
210 ASSERT_EQ(getedOut, THREAD_NUM - QUEUE_SLOTS);
211 }
212
// Returns the current system_clock time as a time_t, moved two seconds into the future.
static std::time_t GetTimeAddTwoSeconds()
{
    const int delaySec = 2;
    const auto nowPoint = std::chrono::system_clock::now();
    std::time_t startAt = std::chrono::system_clock::to_time_t(nowPoint);
    startAt += delaySec;
    return startAt;
}
221
222 /*
223 * @tc.name: testMutilthreadConcurrentPutAndBlockInblankqueue001
224 * @tc.desc: Multi-threaded put() on the empty queue. When n threads are waiting to reach a certain
225 * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
226 */
227 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentPutAndBlockInblankqueue001, TestSize.Level0)
228 {
229 // 1. prepare
230 std::thread threads[THREAD_NUM];
231 std::array<DemoThreadData, THREAD_NUM> demoDatas;
232 demoDatas.fill(DemoThreadData());
233
234 using std::chrono::system_clock;
235
236 std::time_t timeT = GetTimeAddTwoSeconds();
237
238 // 2. start thread
239 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
240 for (unsigned int i = 0; i < THREAD_NUM; i++) {
241 threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
242 }
243
244 // 3. queue is full and some threads is blocked
245 std::this_thread::sleep_for(std::chrono::seconds(4));
246
247 // start thread to join
248 DemoThreadData tmp = DemoThreadData();
249 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
250 ASSERT_FALSE(demoDatas[0].joinStatus);
251
252 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
253 unsigned int pushedIn = 0;
254 unsigned int unpushedIn = 0;
255 unsigned int getedOut = 0;
256 unsigned int ungetedOut = 0;
257 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
258
259 ASSERT_EQ(pushedIn, QUEUE_SLOTS);
260 ASSERT_EQ(unpushedIn, THREAD_NUM - QUEUE_SLOTS);
261
262 // get one out and wait some put in
263 for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
264 demoDatas[0].GetAndOneTaskDone();
265 ASSERT_FALSE(demoDatas[0].joinStatus);
266 }
267
268 std::this_thread::sleep_for(std::chrono::seconds(2));
269 // queue is full and some threads is blocked and is not joined
270 CheckFullQueueStatus(demoDatas, pushedIn, unpushedIn, getedOut, ungetedOut);
271
272 for (auto& t : threads) {
273 t.join();
274 }
275
276 while (!DemoThreadData::shareQueue.IsEmpty()) {
277 demoDatas[0].GetAndOneTaskDone();
278 }
279
280 std::this_thread::sleep_for(std::chrono::seconds(1));
281 ASSERT_TRUE(demoDatas[0].joinStatus);
282 demoDatas[0].joinStatus = false;
283 joinThread.join();
284 }
285
QueuePushInfull()286 static void QueuePushInfull()
287 {
288 for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
289 int t = i;
290 DemoThreadData::shareQueue.Push(t);
291 }
292 }
293
QueuePushInnotfull(const unsigned int remain)294 static void QueuePushInnotfull(const unsigned int remain)
295 {
296 for (unsigned int i = 0; i < QUEUE_SLOTS - remain; i++) {
297 int t = i;
298 DemoThreadData::shareQueue.Push(t);
299 }
300 }
301
302 /*
303 * @tc.name: testMutilthreadConcurrentPutAndBlockInfullqueue001
304 * @tc.desc: Multi-threaded put() on the full queue. When n threads are waiting to reach a certain
305 * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
306 */
307 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentPutAndBlockInfullqueue001, TestSize.Level0)
308 {
309 // 1. prepare
310 std::thread threads[THREAD_NUM];
311 std::array<DemoThreadData, THREAD_NUM> demoDatas;
312 demoDatas.fill(DemoThreadData());
313
314 using std::chrono::system_clock;
315
316 std::time_t timeT = GetTimeAddTwoSeconds();
317 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
318 QueuePushInfull();
319 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
320
321 // 2. start thread put in full queue
322 for (unsigned int i = 0; i < THREAD_NUM; i++) {
323 threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
324 }
325
326 std::this_thread::sleep_for(std::chrono::seconds(3));
327
328 // 3. now thread is running and all is blocked
329 DemoThreadData tmp = DemoThreadData();
330 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
331 ASSERT_FALSE(demoDatas[0].joinStatus);
332
333 unsigned int pushedIn = 0;
334 unsigned int unpushedIn = 0;
335 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
336 ASSERT_EQ(pushedIn, static_cast<unsigned int>(0));
337 ASSERT_EQ(unpushedIn, THREAD_NUM);
338 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
339 for (unsigned int i = 0; i < THREAD_NUM; i++) {
340 DemoThreadData::shareQueue.Pop();
341 DemoThreadData::shareQueue.OneTaskDone();
342
343 std::this_thread::sleep_for(std::chrono::seconds(1));
344 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
345 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
346 ASSERT_EQ(pushedIn, i + 1);
347 ASSERT_EQ(unpushedIn, THREAD_NUM - (i + 1));
348 }
349
350 for (auto& t : threads) {
351 t.join();
352 }
353
354 while (!DemoThreadData::shareQueue.IsEmpty()) {
355 demoDatas[0].GetAndOneTaskDone();
356 }
357 demoDatas[0].joinStatus = false;
358 joinThread.join();
359 }
360
361 /*
362 * @tc.name: testMutilthreadConcurrentGetAndBlockInblankqueue001
363 * @tc.desc: Multi-threaded get() on the empty queue. When n threads are waiting to reach a certain
364 * time-point, everyone gets concurrent to see the status of the queue and the state of the thread.
365 */
366 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndBlockInblankqueue001, TestSize.Level0)
367 {
368 // 1. prepare
369 std::thread threads[THREAD_NUM];
370 std::array<DemoThreadData, THREAD_NUM> demoDatas;
371 demoDatas.fill(DemoThreadData());
372
373 using std::chrono::system_clock;
374
375 std::time_t timeT = GetTimeAddTwoSeconds();
376 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
377
378 // 2. start thread put in empty queue
379 for (unsigned int i = 0; i < THREAD_NUM; i++) {
380 threads[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
381 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
382 }
383 std::this_thread::sleep_for(std::chrono::seconds(3));
384
385 // 3. now thread is running and all is blocked
386 unsigned int getedOut = 0;
387 unsigned int ungetedOut = 0;
388 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
389 ASSERT_EQ(getedOut, static_cast<unsigned int>(0));
390 ASSERT_EQ(ungetedOut, THREAD_NUM);
391 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
392
393 // start thread to join
394 DemoThreadData tmp = DemoThreadData();
395 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
396 ASSERT_FALSE(demoDatas[0].joinStatus);
397
398 int value = 1;
399 for (unsigned int i = 0; i < THREAD_NUM; i++) {
400 DemoThreadData::shareQueue.Push(value);
401 std::this_thread::sleep_for(std::chrono::seconds(1));
402 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
403 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
404 ASSERT_EQ(getedOut, i + 1);
405 ASSERT_EQ(ungetedOut, THREAD_NUM - (i + 1));
406 }
407
408 for (auto& t : threads) {
409 t.join();
410 }
411
412 while (!DemoThreadData::shareQueue.IsEmpty()) {
413 demoDatas[0].GetAndOneTaskDone();
414 }
415
416 ASSERT_TRUE(demoDatas[0].joinStatus);
417 demoDatas[0].joinStatus = false;
418 joinThread.join();
419 }
420
QueuePushFullEquivalent(const int equivalent)421 static void QueuePushFullEquivalent(const int equivalent)
422 {
423 for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
424 DemoThreadData::shareQueue.Push(equivalent);
425 }
426 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
427 }
428
429 /*
430 * @tc.name: testMutilthreadConcurrentGetAndBlockInfullqueue001
431 * @tc.desc: Multi-threaded get() on the full queue. When n threads are waiting to reach a certain
432 * time-point, everyone gets concurrent to see the status of the queue and the state of the thread.
433 */
434 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndBlockInfullqueue001, TestSize.Level0)
435 {
436 // 1. prepare
437 std::thread threads[THREAD_NUM];
438 std::array<DemoThreadData, THREAD_NUM> demoDatas;
439 demoDatas.fill(DemoThreadData());
440
441 using std::chrono::system_clock;
442
443 std::time_t timeT = GetTimeAddTwoSeconds();
444 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
445 int t = 1;
446 QueuePushFullEquivalent(t);
447
448 // start thread to join
449 demoDatas[0].joinStatus = false;
450 DemoThreadData tmp = DemoThreadData();
451 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
452 ASSERT_FALSE(demoDatas[0].joinStatus);
453
454 // 2. start thread get in full queue
455 for (unsigned int i = 0; i < THREAD_NUM; i++) {
456 threads[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
457 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
458 }
459
460 std::this_thread::sleep_for(std::chrono::seconds(4));
461
462 // 3. start judge
463 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
464 ASSERT_TRUE(demoDatas[0].joinStatus);
465
466 unsigned int getedOut = 0, ungetedOut = 0;
467 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
468
469 ASSERT_EQ(getedOut, QUEUE_SLOTS);
470 ASSERT_EQ(ungetedOut, THREAD_NUM - QUEUE_SLOTS);
471
472 // put one in and wait some get out
473 for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
474 demoDatas[0].Put(t);
475 }
476
477 demoDatas[0].joinStatus = false;
478 DemoThreadData tmp2 = DemoThreadData();
479 std::thread joinThread2 = std::thread(&DemoThreadData::Join, tmp2);
480
481 std::this_thread::sleep_for(std::chrono::seconds(2));
482
483 // queue is full and some threads is blocked and is not joined
484 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
485 ASSERT_TRUE(demoDatas[0].joinStatus);
486
487 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
488 ASSERT_EQ(getedOut, THREAD_NUM);
489 ASSERT_EQ(ungetedOut, static_cast<unsigned int>(0));
490
491 for (auto& thd : threads) {
492 thd.join();
493 }
494
495 while (!DemoThreadData::shareQueue.IsEmpty()) {
496 demoDatas[0].GetAndOneTaskDone();
497 }
498
499 demoDatas[0].joinStatus = false;
500 joinThread.join();
501 joinThread2.join();
502 }
503
504 /*
505 * @tc.name: testMutilthreadConcurrentGetAndBlockInnotfullqueue001
506 * @tc.desc: Multi-threaded get() on the notfull queue. When n threads are waiting to reach a certain
507 * time-point, everyone get concurrent to see the status of the queue and the state of the thread.
508 */
509 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndBlockInnotfullqueue001, TestSize.Level0)
510 {
511 // 1. prepare
512 std::thread threads[THREAD_NUM];
513 std::array<DemoThreadData, THREAD_NUM> demoDatas;
514 demoDatas.fill(DemoThreadData());
515
516 using std::chrono::system_clock;
517
518 const unsigned int REMAIN_SLOTS = 5;
519 std::time_t timeT = GetTimeAddTwoSeconds();
520 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
521 QueuePushInnotfull(REMAIN_SLOTS);
522
523 // start thread to join
524 demoDatas[0].joinStatus = false;
525 DemoThreadData tmp = DemoThreadData();
526 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
527 ASSERT_FALSE(demoDatas[0].joinStatus);
528
529 // 2. start thread put in not full queue
530 for (unsigned int i = 0; i < THREAD_NUM; i++) {
531 threads[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
532 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
533 }
534
535 std::this_thread::sleep_for(std::chrono::seconds(3));
536
537 unsigned int getedOut = 0;
538 unsigned int ungetedOut = 0;
539 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
540 ASSERT_EQ(getedOut, QUEUE_SLOTS - REMAIN_SLOTS);
541 ASSERT_EQ(ungetedOut, THREAD_NUM - (QUEUE_SLOTS - REMAIN_SLOTS));
542 ASSERT_TRUE(demoDatas[0].joinStatus);
543 // 3. put ungetedOut
544 for (unsigned int i = 0; i < ungetedOut; i++) {
545 int t = i;
546 DemoThreadData::shareQueue.Push(t);
547 }
548 demoDatas[0].joinStatus = false;
549 DemoThreadData tmp2 = DemoThreadData();
550 std::thread joinThread2 = std::thread(&DemoThreadData::Join, tmp2);
551
552 std::this_thread::sleep_for(std::chrono::seconds(1));
553 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
554 ASSERT_EQ(getedOut, THREAD_NUM);
555 ASSERT_EQ(ungetedOut, static_cast<unsigned int>(0));
556
557 for (auto& t : threads) {
558 t.join();
559 }
560
561 ASSERT_TRUE(demoDatas[0].joinStatus);
562 demoDatas[0].joinStatus = false;
563 joinThread.join();
564 joinThread2.join();
565 }
566
567 /*
568 * @tc.name: testMutilthreadConcurrentPutAndBlockInnotfullqueue001
569 * @tc.desc: Multi-threaded put() on the not full queue. When n threads are waiting to reach a certain
570 * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
571 */
572 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentPutAndBlockInnotfullqueue001, TestSize.Level0)
573 {
574 // 1. prepare
575 std::array<DemoThreadData, THREAD_NUM> demoDatas;
576 std::thread threads[THREAD_NUM];
577 demoDatas.fill(DemoThreadData());
578
579 using std::chrono::system_clock;
580
581 const unsigned int REMAIN_SLOTS = 5;
582 std::time_t timeT = GetTimeAddTwoSeconds();
583 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
584 QueuePushInnotfull(REMAIN_SLOTS);
585
586 // start thread to join
587 demoDatas[0].joinStatus = false;
588 DemoThreadData tmp = DemoThreadData();
589 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
590 ASSERT_FALSE(demoDatas[0].joinStatus);
591
592 // 2. start thread put in not full queue
593 for (unsigned int i = 0; i < THREAD_NUM; i++) {
594 threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
595 }
596 ASSERT_FALSE(demoDatas[0].joinStatus);
597 std::this_thread::sleep_for(std::chrono::seconds(3));
598 ASSERT_FALSE(demoDatas[0].joinStatus);
599 unsigned int putedin = 0;
600 unsigned int unputedin = 0;
601 GetThreadDatePushedStatus(demoDatas, putedin, unputedin);
602 ASSERT_EQ(putedin, REMAIN_SLOTS);
603 ASSERT_EQ(unputedin, THREAD_NUM - REMAIN_SLOTS);
604
605 // 3. put ungetedOut
606 for (unsigned int i = 0; i < unputedin; i++) {
607 DemoThreadData::shareQueue.Pop();
608 DemoThreadData::shareQueue.OneTaskDone();
609 }
610
611 std::this_thread::sleep_for(std::chrono::seconds(1));
612 GetThreadDatePushedStatus(demoDatas, putedin, unputedin);
613 ASSERT_EQ(putedin, THREAD_NUM);
614 ASSERT_EQ(unputedin, static_cast<unsigned int>(0));
615
616 ASSERT_FALSE(demoDatas[0].joinStatus);
617
618 for (auto& t : threads) {
619 t.join();
620 }
621
622 while (!DemoThreadData::shareQueue.IsEmpty()) {
623 demoDatas[0].GetAndOneTaskDone();
624 }
625
626 std::this_thread::sleep_for(std::chrono::seconds(1));
627 ASSERT_TRUE(demoDatas[0].joinStatus);
628 joinThread.join();
629 demoDatas[0].joinStatus = false;
630 }
631
632 /*
633 * @tc.name: testMutilthreadConcurrentGetAndPopInfullqueue001
634 * @tc.desc: Multi-threaded put() and Multi-threaded get() on the full queue. When all threads are waiting to reach a
635 * certain time-point, everyone run concurrently to see the status of the queue and the state of the thread.
636 */
637 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndPopInfullqueue001, TestSize.Level0)
638 {
639 // 1. prepare
640 std::thread threadsout[THREAD_NUM];
641 std::array<DemoThreadData, THREAD_NUM> demoDatas;
642 demoDatas.fill(DemoThreadData());
643
644 std::thread threadsin[THREAD_NUM];
645
646 using std::chrono::system_clock;
647
648 std::time_t timeT = GetTimeAddTwoSeconds();
649 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
650 int t = 1;
651 QueuePushFullEquivalent(t);
652
653 // start thread to join
654 demoDatas[0].joinStatus = false;
655 DemoThreadData tmp = DemoThreadData();
656 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
657 ASSERT_FALSE(demoDatas[0].joinStatus);
658
659 // 2. start thread put in not full queue
660 for (unsigned int i = 0; i < THREAD_NUM; i++) {
661 threadsin[i] = std::thread(PutHandleThreadDataTime,
662 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
663 }
664
665 for (unsigned int i = 0; i < THREAD_NUM; i++) {
666 threadsout[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
667 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
668 }
669
670 std::this_thread::sleep_for(std::chrono::seconds(3));
671
672 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
673 unsigned int getedOut = 0, ungetedOut = 0, pushedIn = 0, unpushedIn = 0;
674 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
675 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
676
677 ASSERT_EQ(pushedIn, THREAD_NUM);
678 ASSERT_EQ(getedOut, THREAD_NUM);
679
680 ASSERT_FALSE(demoDatas[0].joinStatus);
681 demoDatas[0].joinStatus = false;
682
683 for (auto& thdout : threadsout) {
684 thdout.join();
685 }
686
687 for (auto& thdin : threadsin) {
688 thdin.join();
689 }
690
691 while (!DemoThreadData::shareQueue.IsEmpty()) {
692 demoDatas[0].GetAndOneTaskDone();
693 }
694 joinThread.join();
695 ASSERT_TRUE(demoDatas[0].joinStatus);
696 demoDatas[0].joinStatus = false;
697 }
698 } // namespace
699 } // namespace OHOS