1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OMIT_MULTI_VER
17 #include "db_common.h"
18 #include "db_dfx_adapter.h"
19 #include "log_print.h"
20 #include "multi_ver_sync_state_machine.h"
21 #include "multi_ver_sync_target.h"
22 #include "multi_ver_sync_task_context.h"
23 
24 namespace DistributedDB {
// Expands the object-tag facilities for MultiVerSyncTaskContext.
// NOTE(review): the macro is defined outside this file; presumably it emits the
// object-tag definitions declared in the class header -- confirm against the macro.
DEFINE_OBJECT_TAG_FACILITIES(MultiVerSyncTaskContext)
26 
27 MultiVerSyncTaskContext::~MultiVerSyncTaskContext()
28 {
29 }
30 
Initialize(const std::string & deviceId,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)31 int MultiVerSyncTaskContext::Initialize(const std::string &deviceId, ISyncInterface *syncInterface,
32     const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
33 {
34     if (deviceId.empty() || (syncInterface == nullptr) || (communicator == nullptr)) {
35         return -E_INVALID_ARGS;
36     }
37     syncInterface_ = syncInterface;
38     communicator_ = communicator;
39     deviceId_ = deviceId;
40     taskExecStatus_ = INIT;
41     isAutoSync_ = true;
42     timeHelper_ = std::make_unique<TimeHelper>();
43     int errCode = timeHelper_->Initialize(syncInterface, metadata);
44     if (errCode != E_OK) {
45         LOGE("[MultiVerSyncTaskContext] timeHelper Initialize failed, err %d.", errCode);
46         return errCode;
47     }
48 
49     stateMachine_ = new (std::nothrow) MultiVerSyncStateMachine;
50     if (stateMachine_ == nullptr) {
51         return -E_OUT_OF_MEMORY;
52     }
53 
54     errCode = stateMachine_->Initialize(this, syncInterface, metadata, communicator);
55     TimerAction timeOutCallback = [stateMachine = static_cast<MultiVerSyncStateMachine *>(stateMachine_)](
56         TimerId timerId) -> int { return stateMachine->TimeoutCallback(timerId); };
57     SetTimeoutCallback(timeOutCallback);
58     OnKill([this]() { this->KillWait(); });
59     {
60         std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
61         synTaskContextSet_.insert(this);
62     }
63     std::vector<uint8_t> label = syncInterface_->GetIdentifier();
64     label.resize(3); // only show 3 bytes
65     syncActionName_ = DBDfxAdapter::SYNC_ACTION + "_" +
66         DBCommon::VectorToHexString(label) + "_" + deviceId_.c_str();
67     return errCode;
68 }
69 
AddSyncOperation(SyncOperation * operation)70 int MultiVerSyncTaskContext::AddSyncOperation(SyncOperation *operation)
71 {
72     if (operation == nullptr) {
73         return -E_INVALID_ARGS;
74     }
75 
76     if (operation->IsAutoSync() && !IsTargetQueueEmpty()) {
77         LOGI("[MultiVerSyncTaskContext] Exist operation in queue, skip it!");
78         operation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
79         return E_OK;
80     }
81 
82     MultiVerSyncTarget *target = new (std::nothrow) MultiVerSyncTarget;
83     if (target == nullptr) {
84         return -E_OUT_OF_MEMORY;
85     }
86     target->SetSyncOperation(operation);
87     target->SetTaskType(ISyncTarget::REQUEST);
88     AddSyncTarget(target);
89     return E_OK;
90 }
91 
GetCommitIndex() const92 int MultiVerSyncTaskContext::GetCommitIndex() const
93 {
94     return commitsIndex_;
95 }
96 
SetCommitIndex(int index)97 void MultiVerSyncTaskContext::SetCommitIndex(int index)
98 {
99     commitsIndex_ = index;
100 }
101 
GetEntriesIndex() const102 int MultiVerSyncTaskContext::GetEntriesIndex() const
103 {
104     return entriesIndex_;
105 }
106 
SetEntriesIndex(int index)107 void MultiVerSyncTaskContext::SetEntriesIndex(int index)
108 {
109     entriesIndex_ = index;
110 }
111 
GetValueSlicesIndex() const112 int  MultiVerSyncTaskContext::GetValueSlicesIndex() const
113 {
114     return valueSlicesIndex_;
115 }
116 
SetValueSlicesIndex(int index)117 void MultiVerSyncTaskContext::SetValueSlicesIndex(int index)
118 {
119     valueSlicesIndex_ = index;
120 }
121 
GetCommits(std::vector<MultiVerCommitNode> & commits)122 void MultiVerSyncTaskContext::GetCommits(std::vector<MultiVerCommitNode> &commits)
123 {
124     commits = commits_;
125 }
126 
SetCommits(const std::vector<MultiVerCommitNode> & commits)127 void MultiVerSyncTaskContext::SetCommits(const std::vector<MultiVerCommitNode> &commits)
128 {
129     commits_ = commits;
130 }
131 
GetCommit(int index,MultiVerCommitNode & commit) const132 void MultiVerSyncTaskContext::GetCommit(int index, MultiVerCommitNode &commit) const
133 {
134     commit = commits_[index];
135 }
136 
SetCommit(int index,const MultiVerCommitNode & commit)137 void MultiVerSyncTaskContext::SetCommit(int index, const MultiVerCommitNode &commit)
138 {
139     commits_[index] = commit;
140 }
141 
SetEntries(const std::vector<MultiVerKvEntry * > & entries)142 void MultiVerSyncTaskContext::SetEntries(const std::vector<MultiVerKvEntry *> &entries)
143 {
144     entries_ = entries;
145 }
146 
ReleaseEntries(void)147 void MultiVerSyncTaskContext::ReleaseEntries(void)
148 {
149     for (auto &item : entries_) {
150         if (syncInterface_ != nullptr) {
151             static_cast<MultiVerKvDBSyncInterface *>(syncInterface_)->ReleaseKvEntry(item);
152         }
153         item = nullptr;
154     }
155     entries_.clear();
156     entries_.shrink_to_fit();
157 }
158 
GetEntries(std::vector<MultiVerKvEntry * > & entries) const159 void MultiVerSyncTaskContext::GetEntries(std::vector<MultiVerKvEntry *> &entries) const
160 {
161     entries = entries_;
162 }
163 
GetEntry(int index,MultiVerKvEntry * & entry)164 void MultiVerSyncTaskContext::GetEntry(int index, MultiVerKvEntry *&entry)
165 {
166     entry = entries_[index];
167 }
168 
SetCommitsSize(int commitsSize)169 void MultiVerSyncTaskContext::SetCommitsSize(int commitsSize)
170 {
171     commitsSize_ = commitsSize;
172 }
173 
GetCommitsSize() const174 int MultiVerSyncTaskContext::GetCommitsSize() const
175 {
176     return commitsSize_;
177 }
178 
SetEntriesSize(int entriesSize)179 void MultiVerSyncTaskContext::SetEntriesSize(int entriesSize)
180 {
181     entriesSize_ = entriesSize;
182 }
183 
GetEntriesSize() const184 int MultiVerSyncTaskContext::GetEntriesSize() const
185 {
186     return entriesSize_;
187 }
188 
SetValueSlicesSize(int valueSlicesSize)189 void MultiVerSyncTaskContext::SetValueSlicesSize(int valueSlicesSize)
190 {
191     valueSlicesSize_ = valueSlicesSize;
192 }
193 
GetValueSlicesSize() const194 int MultiVerSyncTaskContext::GetValueSlicesSize() const
195 {
196     return valueSlicesSize_;
197 }
198 
GetValueSliceHashNode(int index,ValueSliceHash & hashNode) const199 void MultiVerSyncTaskContext::GetValueSliceHashNode(int index, ValueSliceHash &hashNode) const
200 {
201     hashNode = valueSliceHashNodes_[index];
202 }
203 
SetValueSliceHashNodes(const std::vector<ValueSliceHash> & valueSliceHashNodes)204 void MultiVerSyncTaskContext::SetValueSliceHashNodes(const std::vector<ValueSliceHash> &valueSliceHashNodes)
205 {
206     valueSliceHashNodes_ = valueSliceHashNodes;
207 }
208 
GetValueSliceHashNodes(std::vector<ValueSliceHash> & valueSliceHashNodes) const209 void MultiVerSyncTaskContext::GetValueSliceHashNodes(std::vector<ValueSliceHash> &valueSliceHashNodes) const
210 {
211     valueSliceHashNodes = valueSliceHashNodes_;
212 }
213 
Clear()214 void MultiVerSyncTaskContext::Clear()
215 {
216     commits_.clear();
217     commits_.shrink_to_fit();
218     ReleaseEntries();
219     valueSliceHashNodes_.clear();
220     valueSliceHashNodes_.shrink_to_fit();
221     commitsIndex_ = 0;
222     commitsSize_ = 0;
223     entriesIndex_ = 0;
224     entriesSize_ = 0;
225     valueSlicesIndex_ = 0;
226     valueSlicesSize_ = 0;
227     retryTime_ = 0;
228     isNeedRetry_ = NO_NEED_RETRY;
229     StopTimer();
230     sequenceId_ = 1; // minimum valid ID : 1
231     syncId_ = 0;
232 }
233 
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)234 void MultiVerSyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
235 {
236     SyncTaskContext::CopyTargetData(target, taskParam);
237 }
238 }
239 #endif