1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "spmc_queue.h"
17 #include <cstdlib>
18 #include "dfx/log/ffrt_log_api.h"
19 
20 namespace ffrt {
~SpmcQueue()21 SpmcQueue::~SpmcQueue()
22 {
23     if (buf_ != nullptr) {
24         free(buf_);
25         buf_ = nullptr;
26     }
27 }
28 
Init(unsigned int capacity)29 int SpmcQueue::Init(unsigned int capacity)
30 {
31     if (capacity == 0) {
32         return -1;
33     }
34 
35     buf_ = reinterpret_cast<void**>(malloc(capacity * sizeof(void*)));
36     if (buf_ == nullptr) {
37         FFRT_LOGE("Queue malloc failed, size: %u", capacity * sizeof(void*));
38         return -1;
39     }
40 
41     capacity_ = capacity;
42     return 0;
43 }
44 
GetLength() const45 unsigned int SpmcQueue::GetLength() const
46 {
47     return tail_.load() - head_.load();
48 }
49 
GetCapacity() const50 unsigned int SpmcQueue::GetCapacity() const
51 {
52     return capacity_;
53 }
54 
PopHead()55 void* SpmcQueue::PopHead()
56 {
57     if (buf_ == nullptr) {
58         return nullptr;
59     }
60 
61     while (true) {
62         unsigned int head = head_.load();
63         unsigned int tail = tail_.load();
64         if (tail == head) {
65             return nullptr;
66         }
67 
68         void* res = buf_[head % capacity_];
69         if (atomic_compare_exchange_weak(&head_, &head, head + 1)) {
70             return res;
71         }
72     }
73 }
74 
PushTail(void * object)75 int SpmcQueue::PushTail(void* object)
76 {
77     if (buf_ == nullptr) {
78         return -1;
79     }
80 
81     unsigned int head = head_.load();
82     unsigned int tail = tail_.load();
83     if ((tail - head) < capacity_) {
84         buf_[tail % capacity_] = object;
85         tail_.store(tail + 1);
86         return 0;
87     }
88 
89     return -1;
90 }
91 
PopHeadToAnotherQueue(SpmcQueue & dstQueue,unsigned int elementNum,int qos,PushFunc func)92 unsigned int SpmcQueue::PopHeadToAnotherQueue(SpmcQueue& dstQueue, unsigned int elementNum, int qos, PushFunc func)
93 {
94     if (elementNum == 0) {
95         return 0;
96     }
97 
98     unsigned int pushCount = 0;
99     while ((dstQueue.GetLength() < dstQueue.GetCapacity()) && (head_.load() != tail_.load())) {
100         void* element = PopHead();
101         if (element == nullptr) {
102             break;
103         }
104 
105         int ret = dstQueue.PushTail(element);
106         if (ret != 0) {
107             func(element, qos);
108             return pushCount;
109         }
110 
111         if (++pushCount == elementNum) {
112             break;
113         }
114     }
115 
116     return pushCount;
117 }
118 }