1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "spmc_queue.h"
17 #include <cstdlib>
18 #include "dfx/log/ffrt_log_api.h"
19
20 namespace ffrt {
~SpmcQueue()21 SpmcQueue::~SpmcQueue()
22 {
23 if (buf_ != nullptr) {
24 free(buf_);
25 buf_ = nullptr;
26 }
27 }
28
Init(unsigned int capacity)29 int SpmcQueue::Init(unsigned int capacity)
30 {
31 if (capacity == 0) {
32 return -1;
33 }
34
35 buf_ = reinterpret_cast<void**>(malloc(capacity * sizeof(void*)));
36 if (buf_ == nullptr) {
37 FFRT_LOGE("Queue malloc failed, size: %u", capacity * sizeof(void*));
38 return -1;
39 }
40
41 capacity_ = capacity;
42 return 0;
43 }
44
GetLength() const45 unsigned int SpmcQueue::GetLength() const
46 {
47 return tail_.load() - head_.load();
48 }
49
GetCapacity() const50 unsigned int SpmcQueue::GetCapacity() const
51 {
52 return capacity_;
53 }
54
PopHead()55 void* SpmcQueue::PopHead()
56 {
57 if (buf_ == nullptr) {
58 return nullptr;
59 }
60
61 while (true) {
62 unsigned int head = head_.load();
63 unsigned int tail = tail_.load();
64 if (tail == head) {
65 return nullptr;
66 }
67
68 void* res = buf_[head % capacity_];
69 if (atomic_compare_exchange_weak(&head_, &head, head + 1)) {
70 return res;
71 }
72 }
73 }
74
PushTail(void * object)75 int SpmcQueue::PushTail(void* object)
76 {
77 if (buf_ == nullptr) {
78 return -1;
79 }
80
81 unsigned int head = head_.load();
82 unsigned int tail = tail_.load();
83 if ((tail - head) < capacity_) {
84 buf_[tail % capacity_] = object;
85 tail_.store(tail + 1);
86 return 0;
87 }
88
89 return -1;
90 }
91
PopHeadToAnotherQueue(SpmcQueue & dstQueue,unsigned int elementNum,int qos,PushFunc func)92 unsigned int SpmcQueue::PopHeadToAnotherQueue(SpmcQueue& dstQueue, unsigned int elementNum, int qos, PushFunc func)
93 {
94 if (elementNum == 0) {
95 return 0;
96 }
97
98 unsigned int pushCount = 0;
99 while ((dstQueue.GetLength() < dstQueue.GetCapacity()) && (head_.load() != tail_.load())) {
100 void* element = PopHead();
101 if (element == nullptr) {
102 break;
103 }
104
105 int ret = dstQueue.PushTail(element);
106 if (ret != 0) {
107 func(element, qos);
108 return pushCount;
109 }
110
111 if (++pushCount == elementNum) {
112 break;
113 }
114 }
115
116 return pushCount;
117 }
118 }