1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <iostream>
16 #include <cstring>
17 #include <string>
18 #include <thread>
19
20 #include "log.h"
21 #include "securec.h"
22 #include "ring_buffer.h"
23
24 namespace Updater {
~RingBuffer()25 RingBuffer::~RingBuffer()
26 {
27 Release();
28 }
29
Init(uint32_t singleSize,uint32_t num)30 bool RingBuffer::Init(uint32_t singleSize, uint32_t num)
31 {
32 if (singleSize == 0 || num == 0 || (num & (num - 1)) != 0) { // power of 2
33 LOG(ERROR) << "singleSize:" << singleSize << " num:" << num << " error";
34 return false;
35 }
36 bufArray_ = new (std::nothrow) uint8_t* [num] {};
37 lenArray_ = new (std::nothrow) uint32_t [num] {};
38 if (bufArray_ == nullptr || lenArray_ == nullptr) {
39 LOG(ERROR) << "new buf or len " << num << " error";
40 return false;
41 }
42 for (uint32_t i = 0; i < num; i++) {
43 bufArray_[i] = new (std::nothrow) uint8_t [singleSize] {};
44 if (bufArray_[i] == nullptr) {
45 LOG(ERROR) << "new buf " << i << " size " << singleSize << " error";
46 return false;
47 }
48 }
49
50 writeIndex_ = 0;
51 readIndex_ = 0;
52 num_ = num;
53 singleSize_ = singleSize;
54 return true;
55 }
56
Reset()57 void RingBuffer::Reset()
58 {
59 isStop_ = false;
60 writeIndex_ = 0;
61 readIndex_ = 0;
62 for (uint32_t i = 0; i < num_; ++i) {
63 lenArray_[i] = 0;
64 }
65 }
66
IsFull()67 bool RingBuffer::IsFull()
68 {
69 // writeIndex readIndex real size: 0 ~ num_ -1, logic size: 0 ~ 2num_ - 1
70 // when writeIndex_ - readIndex_ == n means full
71 return writeIndex_ == (readIndex_ ^ num_);
72 }
73
IsEmpty()74 bool RingBuffer::IsEmpty()
75 {
76 // writeIndex readIndex real size: 0 ~ num_ -1, logic size: 0 ~ 2num_ - 1
77 // when same means empty
78 return writeIndex_ == readIndex_;
79 }
80
Push(uint8_t * buf,uint32_t len)81 bool RingBuffer::Push(uint8_t *buf, uint32_t len)
82 {
83 if (buf == nullptr || len == 0 || len > singleSize_) {
84 LOG(ERROR) << "RingBuffer push error, len:" << len << " singleSize:" << singleSize_;
85 return false;
86 }
87 if (IsFull()) {
88 std::unique_lock<std::mutex> pushLock(notifyMtx_);
89 while (IsFull()) {
90 if (isStop_) {
91 LOG(WARNING) << "RingBuffer push stopped";
92 return false;
93 }
94 LOG(DEBUG) << "RingBuffer full, wait !!!";
95 notFull_.wait(pushLock);
96 }
97 }
98
99 uint32_t index = writeIndex_ & (num_ - 1);
100 if (memcpy_s(bufArray_[index], singleSize_, buf, len) != EOK) {
101 LOG(ERROR) << "memcpy error, len:" << len;
102 return false;
103 }
104 lenArray_[index] = len;
105 writeIndex_ = (writeIndex_ + 1) & (2 * num_ - 1); // 2: logic buffer size
106
107 std::unique_lock<std::mutex> popLock(notifyMtx_);
108 notEmpty_.notify_all();
109 return true;
110 }
111
Pop(uint8_t * buf,uint32_t maxLen,uint32_t & len)112 bool RingBuffer::Pop(uint8_t *buf, uint32_t maxLen, uint32_t &len)
113 {
114 if (buf == nullptr) {
115 LOG(ERROR) << "RingBuffer pop para error";
116 return false;
117 }
118 if (IsEmpty()) {
119 std::unique_lock<std::mutex> popLock(notifyMtx_);
120 while (IsEmpty()) {
121 if (isStop_) {
122 LOG(WARNING) << "RingBuffer pop stopped";
123 return false;
124 }
125 LOG(DEBUG) << "RingBuffer empty, wait !!!";
126 notEmpty_.wait(popLock);
127 }
128 }
129
130 uint32_t index = readIndex_ & (num_ - 1);
131 if (memcpy_s(buf, maxLen, bufArray_[index], lenArray_[index]) != EOK) {
132 LOG(ERROR) << "memcpy error, len:" << lenArray_[index];
133 return false;
134 }
135 len = lenArray_[index];
136 readIndex_ = (readIndex_ + 1) & (2 * num_ - 1); // 2: logic buffer size
137
138 std::unique_lock<std::mutex> popLock(notifyMtx_);
139 notFull_.notify_all();
140 return true;
141 }
142
Stop()143 void RingBuffer::Stop()
144 {
145 isStop_ = true;
146 notFull_.notify_all();
147 notEmpty_.notify_all();
148 }
149
StopPush()150 void RingBuffer::StopPush()
151 {
152 {
153 std::unique_lock<std::mutex> pushLock(notifyMtx_);
154 isStop_ = true;
155 }
156 notFull_.notify_all();
157 }
158
StopPop()159 void RingBuffer::StopPop()
160 {
161 {
162 std::unique_lock<std::mutex> popLock(notifyMtx_);
163 isStop_ = true;
164 }
165 notEmpty_.notify_all();
166 }
167
Release()168 void RingBuffer::Release()
169 {
170 if (lenArray_ != nullptr) {
171 delete[] lenArray_;
172 lenArray_ = nullptr;
173 }
174
175 if (bufArray_ != nullptr) {
176 for (uint32_t i = 0; i < num_ && bufArray_[i] != nullptr; i++) {
177 delete[] bufArray_[i];
178 bufArray_[i] = nullptr;
179 }
180 delete[] bufArray_;
181 bufArray_ = nullptr;
182 }
183 }
184 } // namespace Updater
185