1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "safe_block_queue.h"

#include <array>
#include <atomic>
#include <chrono> // std::chrono::seconds
#include <future>
#include <iostream> // std::cout
#include <thread> // std::thread, std::this_thread::sleep_for

#include <gtest/gtest.h>
29 using namespace testing::ext;
30 using namespace std;
31
32 namespace OHOS {
33 namespace {
34 class UtilsSafeBlockQueue : public testing::Test {
35 };
36
// Value the shared queue is constructed with.
constexpr unsigned int QUEUE_SLOTS = 10U;
// One more worker than QUEUE_SLOTS, so that exactly one worker is left over once the queue is saturated.
constexpr unsigned int THREAD_NUM = QUEUE_SLOTS + 1U;
39
40 class DemoThreadData {
41 public:
DemoThreadData()42 DemoThreadData()
43 {
44 putStatus = false;
45 getStatus = false;
46 }
47 bool putStatus;
48 bool getStatus;
49 static SafeBlockQueue<int> shareQueue;
50
Put(int j)51 void Put(int j)
52 {
53 shareQueue.Push(j);
54 putStatus = true;
55 }
56
Get()57 void Get()
58 {
59 shareQueue.Pop();
60 getStatus = true;
61 }
62 };
63
// Definition of the queue shared by all DemoThreadData instances, constructed with QUEUE_SLOTS
// (presumably the capacity: the tests expect IsFull() to hold after QUEUE_SLOTS pushes).
SafeBlockQueue<int> DemoThreadData::shareQueue(QUEUE_SLOTS);
65
/*
 * Thread entry point for a producer: pushes i into the shared queue through q.
 * q.Put() also marks q.putStatus once the push has returned.
 */
void PutHandleThreadData(DemoThreadData& q, int i)
{
    q.Put(i);
}
70
GetThreadDatePushedStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & pushedIn,unsigned int & unpushedIn)71 void GetThreadDatePushedStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas, unsigned int& pushedIn,
72 unsigned int& unpushedIn)
73 {
74 pushedIn = 0;
75 unpushedIn = 0;
76 for (auto& t : demoDatas) {
77 if (t.putStatus) {
78 pushedIn++;
79 }
80 else {
81 unpushedIn++;
82 }
83 }
84 }
85
GetThreadDateGetedStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & getedOut,unsigned int & ungetedOut)86 void GetThreadDateGetedStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas,
87 unsigned int& getedOut, unsigned int& ungetedOut)
88 {
89 getedOut = 0;
90 ungetedOut = 0;
91 for (auto& t : demoDatas) {
92 if (t.getStatus) {
93 getedOut++;
94 }
95 else {
96 ungetedOut++;
97 }
98 }
99 }
100
PutHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)101 void PutHandleThreadDataTime(DemoThreadData& q, int i, std::chrono::system_clock::time_point absTime)
102 {
103 cout << "thread-" << std::this_thread::get_id() << " run time: "
104 << std::chrono::system_clock::to_time_t(absTime) << endl;
105 std::this_thread::sleep_until(absTime);
106
107 q.Put(i);
108 }
109
GetHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)110 void GetHandleThreadDataTime(DemoThreadData& q, int i, std::chrono::system_clock::time_point absTime)
111 {
112 cout << "thread-" << std::this_thread::get_id() << " run time: "
113 << std::chrono::system_clock::to_time_t(absTime) << endl;
114 std::this_thread::sleep_until(absTime);
115
116 q.Get();
117 }
118
119 /*
120 * @tc.name: testPut001
121 * @tc.desc: Single-threaded call put and get to determine that the normal scenario
122 */
123 HWTEST_F(UtilsSafeBlockQueue, testPut001, TestSize.Level0)
124 {
125 SafeBlockQueue<int> qi(10);
126 int i = 1;
127 qi.Push(i);
128 EXPECT_EQ(static_cast<unsigned>(1), qi.Size());
129 }
130
131 /*
132 * @tc.name: testGet001
133 * @tc.desc: Single-threaded call put and get to determine that the normal scenario
134 */
135 HWTEST_F(UtilsSafeBlockQueue, testGet001, TestSize.Level0)
136 {
137 SafeBlockQueue<int> qi(10);
138 for (int i = 0; i < 3; i++) {
139 qi.Push(i);
140 }
141 EXPECT_EQ(static_cast<unsigned>(3), qi.Size());
142 int t = qi.Pop();
143 ASSERT_EQ(t, 0);
144 }
145
ThreadsJoin(std::thread (& threads)[THREAD_NUM])146 static void ThreadsJoin(std::thread (&threads)[THREAD_NUM])
147 {
148 for (auto& t : threads) {
149 t.join();
150 }
151 }
152
CheckFullQueueStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & pushedIn,unsigned int & unpushedIn,unsigned int & getedOut,unsigned int & ungetedOut)153 static void CheckFullQueueStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas, unsigned int& pushedIn,
154 unsigned int& unpushedIn, unsigned int& getedOut, unsigned int& ungetedOut)
155 {
156 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
157 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
158 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
159 ASSERT_EQ(pushedIn, THREAD_NUM);
160 ASSERT_EQ(getedOut, THREAD_NUM - QUEUE_SLOTS);
161 }
162
163 /*
164 * @tc.name: testMutilthreadPutAndBlock001
165 * @tc.desc: Multiple threads put until blocking runs, one thread gets, all threads finish running normally
166 */
167 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadPutAndBlock001, TestSize.Level0)
168 {
169 std::thread threads[THREAD_NUM];
170
171 std::array<DemoThreadData, THREAD_NUM> demoDatas;
172 demoDatas.fill(DemoThreadData());
173
174 for (unsigned int i = 0; i < THREAD_NUM; i++) {
175 threads[i] = std::thread(PutHandleThreadData, std::ref(demoDatas[i]), i);
176 }
177
178 // 1. queue is full and some threads is blocked
179 std::this_thread::sleep_for(std::chrono::seconds(2));
180 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
181
182 unsigned int pushedIn = 0, unpushedIn = 0, getedOut = 0, ungetedOut = 0;
183 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
184
185 ASSERT_EQ(pushedIn, QUEUE_SLOTS);
186 ASSERT_EQ(unpushedIn, THREAD_NUM - QUEUE_SLOTS);
187
188 // 2. get one out and wait some put in
189 for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
190 demoDatas[0].Get();
191 }
192
193 std::this_thread::sleep_for(std::chrono::seconds(2));
194 // queue is full and some threads is blocked and is not joined
195 CheckFullQueueStatus(demoDatas, pushedIn, unpushedIn, getedOut, ungetedOut);
196 ThreadsJoin(threads);
197
198 while (!DemoThreadData::shareQueue.IsEmpty()) {
199 demoDatas[0].Get();
200 }
201
202 // here means all thread end ok or if some operation blocked and the testcase blocked
203 }
204
/*
 * Returns the current system-clock time as time_t plus two, after logging the current value.
 * Used as the common start time for the timed worker threads.
 */
static std::time_t GetTimeAddTwoSeconds()
{
    const std::time_t startTime = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    std::cout << "start time: " << startTime << std::endl;

    constexpr int delaySeconds = 2;
    return startTime + delaySeconds;
}
214
215 /*
216 * @tc.name: testMutilthreadConcurrentPutAndBlockInblankqueue001
217 * @tc.desc: Multi-threaded put() on the empty queue. When n threads are waiting to reach a certain
218 * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
219 */
220 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentPutAndBlockInblankqueue001, TestSize.Level0)
221 {
222 // 1. prepare
223 std::thread threads[THREAD_NUM];
224 std::array<DemoThreadData, THREAD_NUM> demoDatas;
225 demoDatas.fill(DemoThreadData());
226
227 using std::chrono::system_clock;
228
229 std::time_t timeT = GetTimeAddTwoSeconds();
230 // 2. start thread
231 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
232 for (unsigned int i = 0; i < THREAD_NUM; i++) {
233 threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
234 }
235
236 // 1. queue is full and some threads is blocked
237 std::this_thread::sleep_for(std::chrono::seconds(4));
238 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
239
240 unsigned int pushedIn = 0;
241 unsigned int unpushedIn = 0;
242 unsigned int getedOut = 0;
243 unsigned int ungetedOut = 0;
244 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
245
246 ASSERT_EQ(pushedIn, QUEUE_SLOTS);
247 ASSERT_EQ(unpushedIn, THREAD_NUM - QUEUE_SLOTS);
248
249 // 2. get one out and wait some put in
250 for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
251 demoDatas[0].Get();
252 }
253
254 std::this_thread::sleep_for(std::chrono::seconds(2));
255 // queue is full and some threads is blocked and is not joined
256 CheckFullQueueStatus(demoDatas, pushedIn, unpushedIn, getedOut, ungetedOut);
257 ThreadsJoin(threads);
258
259 while (!DemoThreadData::shareQueue.IsEmpty()) {
260 demoDatas[0].Get();
261 }
262 // here means all thread end ok or if some operation blocked and the testcase blocked
263 }
264
QueuePushInfull()265 static void QueuePushInfull()
266 {
267 for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
268 int t = i;
269 DemoThreadData::shareQueue.Push(t);
270 }
271 }
272
QueuePushInnotfull(const unsigned int remain)273 static void QueuePushInnotfull(const unsigned int remain)
274 {
275 for (unsigned int i = 0; i < QUEUE_SLOTS - remain; i++) {
276 int t = i;
277 DemoThreadData::shareQueue.Push(t);
278 }
279 }
280
281 /*
282 * @tc.name: testMutilthreadConcurrentPutAndBlockInfullqueue001
283 * @tc.desc: Multi-threaded put() on the full queue. When n threads are waiting to reach a certain
284 * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
285 */
286 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentPutAndBlockInfullqueue001, TestSize.Level0)
287 {
288 // 1. prepare
289 std::thread threads[THREAD_NUM];
290 std::array<DemoThreadData, THREAD_NUM> demoDatas;
291 demoDatas.fill(DemoThreadData());
292
293 using std::chrono::system_clock;
294
295 std::time_t timeT = GetTimeAddTwoSeconds();
296 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
297 QueuePushInfull();
298 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
299
300 // 2. start thread put in full queue
301 for (unsigned int i = 0; i < THREAD_NUM; i++) {
302 threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
303 }
304
305 std::this_thread::sleep_for(std::chrono::seconds(3));
306 // 3. now thread is running and all is blocked
307 unsigned int pushedIn = 0;
308 unsigned int unpushedIn = 0;
309 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
310 ASSERT_EQ(pushedIn, static_cast<unsigned int>(0));
311 ASSERT_EQ(unpushedIn, THREAD_NUM);
312 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
313 for (unsigned int i = 0; i < THREAD_NUM; i++) {
314 cout << "get out one and then the queue is full" << endl;
315 DemoThreadData::shareQueue.Pop();
316 std::this_thread::sleep_for(std::chrono::seconds(1));
317 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
318 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
319 ASSERT_EQ(pushedIn, i + 1);
320 ASSERT_EQ(unpushedIn, THREAD_NUM - (i + 1));
321 }
322
323 ThreadsJoin(threads);
324 while (!DemoThreadData::shareQueue.IsEmpty()) {
325 demoDatas[0].Get();
326 }
327 }
328
329 /*
330 * @tc.name: testMutilthreadConcurrentGetAndBlockInblankqueue001
331 * @tc.desc: Multi-threaded get() on the empty queue. When n threads are waiting to reach a certain
332 * time-point, everyone gets concurrent to see the status of the queue and the state of the thread.
333 */
334 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentGetAndBlockInblankqueue001, TestSize.Level0)
335 {
336 // 1. prepare
337 std::thread threads[THREAD_NUM];
338 std::array<DemoThreadData, THREAD_NUM> demoDatas;
339 demoDatas.fill(DemoThreadData());
340
341 using std::chrono::system_clock;
342
343 std::time_t timeT = GetTimeAddTwoSeconds();
344 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
345
346 // 2. start thread put in empty queue
347 for (unsigned int i = 0; i < THREAD_NUM; i++) {
348 threads[i] = std::thread(GetHandleThreadDataTime,
349 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
350 }
351 std::this_thread::sleep_for(std::chrono::seconds(3));
352
353 // 3. now thread is running and all is blocked
354 unsigned int getedOut = 0;
355 unsigned int ungetedOut = 0;
356 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
357 ASSERT_EQ(getedOut, static_cast<unsigned int>(0));
358 ASSERT_EQ(ungetedOut, THREAD_NUM);
359 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
360
361 int value = 1;
362
363 for (unsigned int i = 0; i < THREAD_NUM; i++) {
364 cout << "put in one and then the queue is empty" << endl;
365 DemoThreadData::shareQueue.Push(value);
366 std::this_thread::sleep_for(std::chrono::seconds(1));
367 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
368 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
369 ASSERT_EQ(getedOut, i + 1);
370 ASSERT_EQ(ungetedOut, THREAD_NUM - (i + 1));
371 }
372
373 ThreadsJoin(threads);
374 while (!DemoThreadData::shareQueue.IsEmpty()) {
375 demoDatas[0].Get();
376 }
377 }
378 /*
379 * @tc.name: testMutilthreadConcurrentGetAndBlockInfullqueue001
380 * @tc.desc: Multi-threaded get() on the full queue. When n threads are waiting to reach a certain
381 * time-point, everyone gets concurrent to see the status of the queue and the state of the thread.
382 */
383 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentGetAndBlockInfullqueue001, TestSize.Level0)
384 {
385 // 1. prepare
386 std::thread threads[THREAD_NUM];
387 std::array<DemoThreadData, THREAD_NUM> demoDatas;
388 demoDatas.fill(DemoThreadData());
389
390 using std::chrono::system_clock;
391
392 std::time_t timeT = GetTimeAddTwoSeconds();
393 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
394 int t = 1;
395 for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
396 DemoThreadData::shareQueue.Push(t);
397 }
398 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
399
400 // 2. start thread put in full queue
401 for (unsigned int i = 0; i < THREAD_NUM; i++) {
402 threads[i] = std::thread(GetHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
403 }
404
405 std::this_thread::sleep_for(std::chrono::seconds(4));
406 // 3. start judge
407 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
408
409 unsigned int getedOut = 0;
410 unsigned int ungetedOut = 0;
411 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
412
413 ASSERT_EQ(getedOut, QUEUE_SLOTS);
414 ASSERT_EQ(ungetedOut, THREAD_NUM - QUEUE_SLOTS);
415
416 // 2. put one in and wait some get out
417 for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
418 demoDatas[0].Put(t);
419 }
420
421 std::this_thread::sleep_for(std::chrono::seconds(2));
422 // queue is full and some threads is blocked and is not joined
423 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
424 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
425 ASSERT_EQ(getedOut, THREAD_NUM);
426 ASSERT_EQ(ungetedOut, static_cast<unsigned int>(0));
427
428 ThreadsJoin(threads);
429 while (!DemoThreadData::shareQueue.IsEmpty()) {
430 demoDatas[0].Get();
431 }
432 }
433
434 /*
435 * @tc.name: testMutilthreadConcurrentGetAndBlockInnotfullqueue001
436 * @tc.desc: Multi-threaded get() on the notfull queue. When n threads are waiting to reach a certain
437 * time-point, everyone get concurrent to see the status of the queue and the state of the thread.
438 */
439 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentGetAndBlockInnotfullqueue001, TestSize.Level0)
440 {
441 // 1. prepare
442 std::thread threads[THREAD_NUM];
443 std::array<DemoThreadData, THREAD_NUM> demoDatas;
444 demoDatas.fill(DemoThreadData());
445
446 using std::chrono::system_clock;
447
448 const unsigned int REMAIN_SLOTS = 5;
449 std::time_t timeT = GetTimeAddTwoSeconds();
450 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
451 QueuePushInnotfull(REMAIN_SLOTS);
452
453 // 2. start thread put in not full queue
454 for (unsigned int i = 0; i < THREAD_NUM; i++) {
455 threads[i] = std::thread(GetHandleThreadDataTime,
456 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
457 }
458
459 std::this_thread::sleep_for(std::chrono::seconds(3));
460
461 unsigned int getedOut = 0;
462 unsigned int ungetedOut = 0;
463 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
464 ASSERT_EQ(getedOut, QUEUE_SLOTS - REMAIN_SLOTS);
465 ASSERT_EQ(ungetedOut, THREAD_NUM - (QUEUE_SLOTS - REMAIN_SLOTS));
466
467 // 3. put ungetedOut
468 for (unsigned int i = 0; i < ungetedOut; i++) {
469 int t = i;
470 DemoThreadData::shareQueue.Push(t);
471 }
472
473 std::this_thread::sleep_for(std::chrono::seconds(1));
474 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
475 ASSERT_EQ(getedOut, THREAD_NUM);
476 ASSERT_EQ(ungetedOut, static_cast<unsigned int>(0));
477
478 ThreadsJoin(threads);
479 while (!DemoThreadData::shareQueue.IsEmpty()) {
480 demoDatas[0].Get();
481 }
482 }
483
484 /*
485 * @tc.name: testMutilthreadConcurrentPutAndBlockInnotfullqueue001
486 * @tc.desc: Multi-threaded put() on the not full queue. When n threads are waiting to reach a certain
487 * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
488 */
489 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentPutAndBlockInnotfullqueue001, TestSize.Level0)
490 {
491 // 1. prepare
492 std::thread threads[THREAD_NUM];
493 std::array<DemoThreadData, THREAD_NUM> demoDatas;
494 demoDatas.fill(DemoThreadData());
495
496 using std::chrono::system_clock;
497
498 const unsigned int REMAIN_SLOTS = 5;
499 std::time_t timeT = GetTimeAddTwoSeconds();
500 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
501 QueuePushInnotfull(REMAIN_SLOTS);
502
503 // 2. start thread put in not full queue
504 for (unsigned int i = 0; i < THREAD_NUM; i++) {
505 threads[i] = std::thread(PutHandleThreadDataTime,
506 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
507 }
508
509 std::this_thread::sleep_for(std::chrono::seconds(3));
510
511 unsigned int putedin = 0;
512 unsigned int unputedin = 0;
513 GetThreadDatePushedStatus(demoDatas, putedin, unputedin);
514 ASSERT_EQ(putedin, REMAIN_SLOTS);
515 ASSERT_EQ(unputedin, THREAD_NUM - REMAIN_SLOTS);
516
517 // 3. put ungetedOut
518 for (unsigned int i = 0; i < unputedin; i++) {
519 DemoThreadData::shareQueue.Pop();
520 }
521
522 std::this_thread::sleep_for(std::chrono::seconds(1));
523 GetThreadDatePushedStatus(demoDatas, putedin, unputedin);
524 ASSERT_EQ(putedin, THREAD_NUM);
525 ASSERT_EQ(unputedin, static_cast<unsigned int>(0));
526
527 ThreadsJoin(threads);
528 while (!DemoThreadData::shareQueue.IsEmpty()) {
529 demoDatas[0].Get();
530 }
531 }
532
CheckQueueStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas)533 static void CheckQueueStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas)
534 {
535 unsigned int getedOut = 0;
536 unsigned int ungetedOut = 0;
537 unsigned int pushedIn = 0;
538 unsigned int unpushedIn = 0;
539 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
540 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
541
542 ASSERT_EQ(pushedIn, THREAD_NUM);
543 ASSERT_EQ(getedOut, THREAD_NUM);
544 }
545
546 /*
547 * @tc.name: testMutilthreadConcurrentGetAndPopInblankqueue001
548 * @tc.desc: Multi-threaded put() and Multi-threaded get() on the empty queue. When all threads are waiting to reach
549 * a certain time-point, everyone run concurrently to see the status of the queue and the state of the thread.
550 */
551 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentGetAndPopInblankqueue001, TestSize.Level0)
552 {
553 // 1. prepare
554 std::thread threadsout[THREAD_NUM];
555 std::array<DemoThreadData, THREAD_NUM> demoDatas;
556 demoDatas.fill(DemoThreadData());
557
558 std::thread threadsin[THREAD_NUM];
559
560 using std::chrono::system_clock;
561
562 std::time_t timeT = GetTimeAddTwoSeconds();
563 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
564
565 // 2. start thread put and get in empty queue
566
567 for (unsigned int i = 0; i < THREAD_NUM; i++) {
568 threadsout[i] = std::thread(GetHandleThreadDataTime,
569 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
570 }
571
572 for (unsigned int i = 0; i < THREAD_NUM; i++) {
573 threadsin[i] = std::thread(PutHandleThreadDataTime,
574 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
575 }
576
577 std::this_thread::sleep_for(std::chrono::seconds(3));
578
579 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
580 CheckQueueStatus(demoDatas);
581
582 ThreadsJoin(threadsout);
583 ThreadsJoin(threadsin);
584
585 while (!DemoThreadData::shareQueue.IsEmpty()) {
586 demoDatas[0].Get();
587 }
588 }
589
590 /*
591 * @tc.name: testMutilthreadConcurrentGetAndPopInfullqueue001
592 * @tc.desc: Multi-threaded put() and Multi-threaded get() on the full queue.
593 * When all threads are waiting to reach a certain
594 * time-point, everyone run concurrently to see the status of the queue and the state of the thread.
595 */
596 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentGetAndPopInfullqueue001, TestSize.Level0)
597 {
598 // 1. prepare
599 std::thread threadsout[THREAD_NUM];
600 std::array<DemoThreadData, THREAD_NUM> demoDatas;
601 demoDatas.fill(DemoThreadData());
602
603 std::thread threadsin[THREAD_NUM];
604
605 using std::chrono::system_clock;
606
607 std::time_t timeT = GetTimeAddTwoSeconds();
608 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
609 int t = 1;
610 for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
611 DemoThreadData::shareQueue.Push(t);
612 }
613 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
614 // 2. start thread put in not full queue
615 for (unsigned int i = 0; i < THREAD_NUM; i++) {
616 threadsin[i] = std::thread(PutHandleThreadDataTime,
617 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
618 }
619
620 for (unsigned int i = 0; i < THREAD_NUM; i++) {
621 threadsout[i] = std::thread(GetHandleThreadDataTime,
622 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
623 }
624
625 std::this_thread::sleep_for(std::chrono::seconds(3));
626
627 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
628 CheckQueueStatus(demoDatas);
629
630 ThreadsJoin(threadsout);
631 ThreadsJoin(threadsin);
632
633 while (!DemoThreadData::shareQueue.IsEmpty()) {
634 demoDatas[0].Get();
635 }
636 }
637 } // namespace
638 } // namespace OHOS