1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "result_entries_window.h"
17
18 #include <limits>
19 #include "db_constant.h"
20 #include "db_errno.h"
21
22 using std::vector;
23 using std::move;
24
25 namespace DistributedDB {
ResultEntriesWindow()26 ResultEntriesWindow::ResultEntriesWindow()
27 : rawCursor_(nullptr),
28 windowSize_(0),
29 totalCount_(0),
30 begin_(0),
31 currentPosition_(0) {}
32
~ResultEntriesWindow()33 ResultEntriesWindow::~ResultEntriesWindow()
34 {
35 if (rawCursor_ != nullptr) {
36 (void)(rawCursor_->Close());
37 rawCursor_ = nullptr;
38 }
39 }
40
Init(IKvDBRawCursor * rawCursor,int64_t windowSize,double scale)41 int ResultEntriesWindow::Init(IKvDBRawCursor *rawCursor, int64_t windowSize, double scale)
42 {
43 if (rawCursor == nullptr ||
44 (windowSize <= 0 || windowSize > MAX_WINDOW_SIZE) ||
45 (scale <= std::numeric_limits<double>::epsilon() || scale > 1)) {
46 return -E_INVALID_ARGS;
47 }
48 int errCode = rawCursor->Open();
49 if (errCode != E_OK) {
50 return errCode;
51 }
52
53 rawCursor_ = rawCursor;
54 windowSize_ = static_cast<int64_t>(static_cast<double>(windowSize) * scale);
55 totalCount_ = rawCursor_->GetCount();
56 return E_OK;
57 }
58
// Returns the total entry count recorded from the raw cursor by Init()
// (zero if Init() has not succeeded yet).
int ResultEntriesWindow::GetTotalCount() const
{
    return totalCount_;
}
63
// Returns the position the window currently points at. It starts at zero and
// is updated by MoveToPosition() / SetCursor() and reset by ResetWindow().
int ResultEntriesWindow::GetCurrentPosition() const
{
    return currentPosition_;
}
68
MoveToPosition(int position)69 bool ResultEntriesWindow::MoveToPosition(int position)
70 {
71 if ((rawCursor_ == nullptr && buffer_.empty()) || (position < 0 || position >= totalCount_)) {
72 return false;
73 }
74 if (buffer_.empty()) {
75 if (SetCursor(0, position) != E_OK) {
76 return false;
77 }
78 return true;
79 }
80 int last = begin_ + buffer_.size() - 1;
81 if (position > last) {
82 buffer_.clear();
83 buffer_.shrink_to_fit();
84 if (SetCursor(last + 1, position) != E_OK) {
85 return false;
86 }
87 return true;
88 } else if (position < begin_) {
89 if (rawCursor_ == nullptr) {
90 return false;
91 }
92 buffer_.clear();
93 buffer_.shrink_to_fit();
94 if (rawCursor_->Reload() != E_OK) {
95 ResetWindow();
96 return false;
97 }
98 if (SetCursor(0, position) != E_OK) {
99 return false;
100 }
101 return true;
102 } else {
103 currentPosition_ = position;
104 }
105 return true;
106 }
107
GetEntry(Entry & entry) const108 int ResultEntriesWindow::GetEntry(Entry &entry) const
109 {
110 if (rawCursor_ == nullptr && buffer_.empty()) {
111 return -E_NOT_INIT;
112 }
113 if (totalCount_ == 0) {
114 return -E_NOT_FOUND;
115 }
116 if (buffer_.empty()) {
117 int errCode = LoadData(0, currentPosition_);
118 if (errCode != E_OK) {
119 return errCode;
120 }
121 }
122 entry = buffer_[currentPosition_ - begin_];
123 return E_OK;
124 }
125
ResetWindow()126 void ResultEntriesWindow::ResetWindow()
127 {
128 buffer_.clear();
129 buffer_.shrink_to_fit();
130 if (rawCursor_ != nullptr) {
131 (void)(rawCursor_->Reload());
132 }
133 begin_ = 0;
134 currentPosition_ = 0;
135 }
136
SetCursor(int begin,int target)137 int ResultEntriesWindow::SetCursor(int begin, int target)
138 {
139 int errCode = LoadData(begin, target);
140 if (errCode == E_OK) {
141 begin_ = target;
142 currentPosition_ = target;
143 } else {
144 ResetWindow();
145 }
146 return errCode;
147 }
148
LoadData(int begin,int target) const149 int ResultEntriesWindow::LoadData(int begin, int target) const
150 {
151 if (rawCursor_ == nullptr) {
152 return -E_NOT_INIT;
153 }
154 if (target < begin || target >= totalCount_) {
155 return -E_INVALID_ARGS;
156 }
157 int cursor = begin;
158 int errCode = E_OK;
159 for (; cursor < target; cursor++) {
160 Entry next;
161 errCode = rawCursor_->GetNext(next, false);
162 if (errCode != E_OK) {
163 return errCode;
164 }
165 }
166 int64_t bufferSize = 0;
167 for (; cursor < totalCount_ && bufferSize < windowSize_; cursor++) {
168 Entry next;
169 errCode = rawCursor_->GetNext(next, true);
170 if (errCode != E_OK) {
171 return errCode;
172 }
173 // filter the abnormal data.
174 if (next.key.size() > DBConstant::MAX_KEY_SIZE || next.value.size() > DBConstant::MAX_VALUE_SIZE) {
175 continue;
176 }
177 bufferSize += next.key.size() + next.value.size();
178 buffer_.push_back(std::move(next));
179 }
180 if (buffer_.size() == static_cast<size_t>(totalCount_)) {
181 (void)(rawCursor_->Close());
182 rawCursor_ = nullptr;
183 }
184 return E_OK;
185 }
186 }
187