1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "single_ver_data_message_schedule.h"
16
17 #include "db_common.h"
18 #include "log_print.h"
19 #include "single_ver_data_sync.h"
20 #include "version.h"
21
22 namespace DistributedDB {
~SingleVerDataMessageSchedule()23 SingleVerDataMessageSchedule::~SingleVerDataMessageSchedule()
24 {
25 LOGD("~SingleVerDataMessageSchedule");
26 ClearMsg();
27 }
28
Initialize(const std::string & label,const std::string & deviceId)29 void SingleVerDataMessageSchedule::Initialize(const std::string &label, const std::string &deviceId)
30 {
31 label_ = label;
32 deviceId_ = deviceId;
33 }
34
PutMsg(Message * inMsg)35 void SingleVerDataMessageSchedule::PutMsg(Message *inMsg)
36 {
37 if (inMsg == nullptr) {
38 return;
39 }
40 std::lock_guard<std::mutex> lock(queueLock_);
41 msgQueue_.push(inMsg);
42 isNeedReload_ = true;
43 }
44
IsNeedReloadQueue()45 bool SingleVerDataMessageSchedule::IsNeedReloadQueue()
46 {
47 std::lock_guard<std::mutex> lock(queueLock_);
48 return isNeedReload_;
49 }
50
MoveNextMsg(SingleVerSyncTaskContext * context,bool & isNeedHandle,bool & isNeedContinue)51 Message *SingleVerDataMessageSchedule::MoveNextMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle,
52 bool &isNeedContinue)
53 {
54 {
55 std::lock_guard<std::mutex> lock(workingLock_);
56 if (isWorking_) {
57 isNeedContinue = false;
58 return nullptr;
59 }
60 isWorking_ = true;
61 }
62 uint32_t remoteVersion = context->GetRemoteSoftwareVersion();
63 if (remoteVersion < SOFTWARE_VERSION_RELEASE_3_0) {
64 // just get last msg when remote version is < 103 or >=103 but just open db now
65 return GetLastMsgFromQueue();
66 }
67 ResetTimer(context);
68 UpdateMsgMap();
69 Message *msg = GetMsgFromMap(isNeedHandle);
70 isNeedContinue = true;
71 if (msg == nullptr) {
72 StopTimer();
73 std::lock_guard<std::mutex> lock(workingLock_);
74 isWorking_ = false;
75 return nullptr;
76 }
77 return msg;
78 }
79
ScheduleInfoHandle(bool isNeedHandleStatus,bool isNeedClearMap,const Message * inMsg)80 void SingleVerDataMessageSchedule::ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap,
81 const Message *inMsg)
82 {
83 if (isNeedHandleStatus) {
84 uint64_t curPacketId = 0;
85 if (GetPacketId(inMsg, curPacketId) != E_OK) {
86 LOGE("[DataMsgSchedule] packet is nullptr");
87 return;
88 }
89 {
90 std::lock_guard<std::mutex> lock(lock_);
91 finishedPacketId_ = curPacketId;
92 if (isNeedClearMap) {
93 ClearMsgMapWithNoLock();
94 expectedSequenceId_ = 1;
95 } else {
96 LOGI("[DataMsgSchedule] DealMsg seqId=%" PRIu32 " finishedPacketId=%" PRIu64 " ok,label=%s,dev=%s",
97 expectedSequenceId_, finishedPacketId_, label_.c_str(), STR_MASK(deviceId_));
98 expectedSequenceId_++;
99 }
100 }
101 }
102 std::lock_guard<std::mutex> lock(workingLock_);
103 isWorking_ = false;
104 }
105
ClearMsg()106 void SingleVerDataMessageSchedule::ClearMsg()
107 {
108 StopTimer();
109 ClearMsgQueue();
110 ClearMsgMap();
111 }
112
UpdateMsgMap()113 void SingleVerDataMessageSchedule::UpdateMsgMap()
114 {
115 std::queue<Message *> msgTmpQueue;
116 {
117 std::lock_guard<std::mutex> lock(queueLock_);
118 while (!msgQueue_.empty()) {
119 msgTmpQueue.push(msgQueue_.front());
120 msgQueue_.pop();
121 }
122 isNeedReload_ = false;
123 }
124 UpdateMsgMapInner(msgTmpQueue);
125 }
126
UpdateMsgMapInner(std::queue<Message * > & msgTmpQueue)127 void SingleVerDataMessageSchedule::UpdateMsgMapInner(std::queue<Message *> &msgTmpQueue)
128 {
129 // update msg map
130 std::lock_guard<std::mutex> lock(lock_);
131 while (!msgTmpQueue.empty()) {
132 Message *msg = msgTmpQueue.front();
133 msgTmpQueue.pop();
134 // insert new msg into map and delete old msg
135 int errCode = UpdateMsgMapIfNeed(msg);
136 if (errCode != E_OK) {
137 delete msg;
138 }
139 }
140 }
141
GetMsgFromMap(bool & isNeedHandle)142 Message *SingleVerDataMessageSchedule::GetMsgFromMap(bool &isNeedHandle)
143 {
144 isNeedHandle = true;
145 std::lock_guard<std::mutex> lock(lock_);
146 while (!messageMap_.empty()) {
147 auto iter = messageMap_.begin();
148 Message *msg = iter->second;
149 messageMap_.erase(iter);
150 uint64_t packetId = 0;
151 if (GetPacketId(msg, packetId) != E_OK) {
152 LOGE("[DataMsgSchedule] expected error");
153 delete msg;
154 continue;
155 }
156 uint32_t sequenceId = msg->GetSequenceId();
157 if (sequenceId < expectedSequenceId_) {
158 uint64_t revisePacketId = finishedPacketId_ - (expectedSequenceId_ - 1 - sequenceId);
159 LOGI("[DataMsgSchedule] drop msg because seqId less than exSeqId");
160 if (packetId < revisePacketId) {
161 delete msg;
162 continue;
163 }
164 // means already handle the msg, and just send E_OK ack in dataSync
165 isNeedHandle = false;
166 return msg;
167 }
168 if (sequenceId == expectedSequenceId_) {
169 if (packetId < finishedPacketId_) {
170 LOGI("[DataMsgSchedule] drop msg because packetId less than finishedPacketId");
171 delete msg;
172 continue;
173 }
174 // if packetId == finishedPacketId_ need handle
175 // it will happened while watermark/need_abilitySync when last ack is missing
176 return msg;
177 }
178 // sequenceId > expectedSequenceId_, not need handle, put into map again
179 messageMap_[sequenceId] = msg;
180 return nullptr;
181 }
182 return nullptr;
183 }
184
GetLastMsgFromQueue()185 Message *SingleVerDataMessageSchedule::GetLastMsgFromQueue()
186 {
187 std::lock_guard<std::mutex> lock(queueLock_);
188 isNeedReload_ = false;
189 while (!msgQueue_.empty()) {
190 Message *msg = msgQueue_.front();
191 msgQueue_.pop();
192 if (msgQueue_.empty()) { // means last msg
193 return msg;
194 }
195 delete msg;
196 }
197 return nullptr;
198 }
199
ClearMsgMap()200 void SingleVerDataMessageSchedule::ClearMsgMap()
201 {
202 std::lock_guard<std::mutex> lock(lock_);
203 ClearMsgMapWithNoLock();
204 }
205
ClearMsgMapWithNoLock()206 void SingleVerDataMessageSchedule::ClearMsgMapWithNoLock()
207 {
208 LOGD("[DataMsgSchedule] begin to ClearMsgMapWithNoLock");
209 for (auto &iter : messageMap_) {
210 delete iter.second;
211 iter.second = nullptr;
212 }
213 messageMap_.clear();
214 }
215
ClearMsgQueue()216 void SingleVerDataMessageSchedule::ClearMsgQueue()
217 {
218 std::lock_guard<std::mutex> lock(queueLock_);
219 while (!msgQueue_.empty()) {
220 Message *msg = msgQueue_.front();
221 msgQueue_.pop();
222 delete msg;
223 }
224 }
225
StartTimer(SingleVerSyncTaskContext * context)226 void SingleVerDataMessageSchedule::StartTimer(SingleVerSyncTaskContext *context)
227 {
228 std::lock_guard<std::mutex> lock(lock_);
229 TimerId timerId = 0;
230 RefObject::IncObjRef(context);
231 TimerAction timeOutCallback = [this](TimerId timerId) { return TimeOut(timerId); };
232 int errCode = RuntimeContext::GetInstance()->SetTimer(IDLE_TIME_OUT, timeOutCallback,
233 [context]() {
234 int errCode = RuntimeContext::GetInstance()->ScheduleTask([context]() {
235 RefObject::DecObjRef(context);
236 });
237 if (errCode != E_OK) {
238 LOGE("[DataMsgSchedule] timer finalizer ScheduleTask,errCode=%d", errCode);
239 }
240 }, timerId);
241 if (errCode != E_OK) {
242 RefObject::DecObjRef(context);
243 LOGE("[DataMsgSchedule] timer ScheduleTask, errCode=%d", errCode);
244 return;
245 }
246 timerId_ = timerId;
247 LOGD("[DataMsgSchedule] StartTimer,TimerId=%" PRIu64, timerId_);
248 }
249
StopTimer()250 void SingleVerDataMessageSchedule::StopTimer()
251 {
252 TimerId timerId;
253 {
254 std::lock_guard<std::mutex> lock(lock_);
255 LOGD("[DataMsgSchedule] StopTimer,remove TimerId=%" PRIu64, timerId_);
256 if (timerId_ == 0) {
257 return;
258 }
259 timerId = timerId_;
260 timerId_ = 0;
261 }
262 RuntimeContext::GetInstance()->RemoveTimer(timerId);
263 }
264
ResetTimer(SingleVerSyncTaskContext * context)265 void SingleVerDataMessageSchedule::ResetTimer(SingleVerSyncTaskContext *context)
266 {
267 StopTimer();
268 StartTimer(context);
269 }
270
TimeOut(TimerId timerId)271 int SingleVerDataMessageSchedule::TimeOut(TimerId timerId)
272 {
273 if (IsNeedReloadQueue()) {
274 LOGI("[DataMsgSchedule] new msg exists, no need to timeout handle");
275 return E_OK;
276 }
277 {
278 std::lock_guard<std::mutex> lock(workingLock_);
279 if (isWorking_) {
280 LOGI("[DataMsgSchedule] other thread is handle msg, no need to timeout handle");
281 return E_OK;
282 }
283 }
284 {
285 std::lock_guard<std::mutex> lock(lock_);
286 LOGI("[DataMsgSchedule] timeout handling, stop timerId_[%" PRIu64 "]", timerId);
287 if (timerId == timerId_) {
288 ClearMsgMapWithNoLock();
289 timerId_ = 0;
290 }
291 }
292 RuntimeContext::GetInstance()->RemoveTimer(timerId);
293 return E_OK;
294 }
295
UpdateMsgMapIfNeed(Message * msg)296 int SingleVerDataMessageSchedule::UpdateMsgMapIfNeed(Message *msg)
297 {
298 if (msg == nullptr) {
299 return -E_INVALID_ARGS;
300 }
301 uint64_t packetId = 0;
302 if (GetPacketId(msg, packetId) != E_OK) {
303 return -E_INVALID_ARGS;
304 }
305 uint32_t sessionId = msg->GetSessionId();
306 uint32_t sequenceId = msg->GetSequenceId();
307 if (prevSessionId_ != 0 && sessionId == prevSessionId_) {
308 LOGD("[DataMsgSchedule] recv prev sessionId msg, drop msg, label=%s, dev=%s", label_.c_str(),
309 STR_MASK(deviceId_));
310 return -E_INVALID_ARGS;
311 }
312 if (sessionId != currentSessionId_) {
313 // make sure all msg sessionId is same in msgMap
314 ClearMsgMapWithNoLock();
315 prevSessionId_ = currentSessionId_;
316 currentSessionId_ = sessionId;
317 finishedPacketId_ = 0;
318 expectedSequenceId_ = 1;
319 }
320 if (messageMap_.count(sequenceId) > 0) {
321 const auto *cachePacket = messageMap_[sequenceId]->GetObject<DataRequestPacket>();
322 if (cachePacket != nullptr) {
323 uint64_t cachePacketId;
324 if ((GetPacketId(messageMap_[sequenceId], cachePacketId) == E_OK) &&
325 (packetId != 0) && (packetId < cachePacketId)) {
326 LOGD("[DataMsgSchedule] drop msg packetId=%" PRIu64 ", cachePacketId=%" PRIu64 ", label=%s, dev=%s",
327 packetId, cachePacketId, label_.c_str(), STR_MASK(deviceId_));
328 return -E_INVALID_ARGS;
329 }
330 }
331 delete messageMap_[sequenceId];
332 messageMap_[sequenceId] = nullptr;
333 }
334 messageMap_[sequenceId] = msg;
335 LOGD("[DataMsgSchedule] put into msgMap seqId=%" PRIu32 ", packetId=%" PRIu64 ", label=%s, dev=%s", sequenceId,
336 packetId, label_.c_str(), STR_MASK(deviceId_));
337 return E_OK;
338 }
339
GetPacketId(const Message * msg,uint64_t & packetId)340 int SingleVerDataMessageSchedule::GetPacketId(const Message *msg, uint64_t &packetId)
341 {
342 const DataRequestPacket *packet = msg->GetObject<DataRequestPacket>();
343 if (packet == nullptr) {
344 return -E_INVALID_ARGS;
345 }
346 packetId = packet->GetPacketId();
347 return E_OK;
348 }
349 }