1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "single_ver_data_message_schedule.h"
16 
17 #include "db_common.h"
18 #include "log_print.h"
19 #include "single_ver_data_sync.h"
20 #include "version.h"
21 
22 namespace DistributedDB {
~SingleVerDataMessageSchedule()23 SingleVerDataMessageSchedule::~SingleVerDataMessageSchedule()
24 {
25     LOGD("~SingleVerDataMessageSchedule");
26     ClearMsg();
27 }
28 
Initialize(const std::string & label,const std::string & deviceId)29 void SingleVerDataMessageSchedule::Initialize(const std::string &label, const std::string &deviceId)
30 {
31     label_ = label;
32     deviceId_ = deviceId;
33 }
34 
PutMsg(Message * inMsg)35 void SingleVerDataMessageSchedule::PutMsg(Message *inMsg)
36 {
37     if (inMsg == nullptr) {
38         return;
39     }
40     std::lock_guard<std::mutex> lock(queueLock_);
41     msgQueue_.push(inMsg);
42     isNeedReload_ = true;
43 }
44 
IsNeedReloadQueue()45 bool SingleVerDataMessageSchedule::IsNeedReloadQueue()
46 {
47     std::lock_guard<std::mutex> lock(queueLock_);
48     return isNeedReload_;
49 }
50 
MoveNextMsg(SingleVerSyncTaskContext * context,bool & isNeedHandle,bool & isNeedContinue)51 Message *SingleVerDataMessageSchedule::MoveNextMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle,
52     bool &isNeedContinue)
53 {
54     {
55         std::lock_guard<std::mutex> lock(workingLock_);
56         if (isWorking_) {
57             isNeedContinue = false;
58             return nullptr;
59         }
60         isWorking_ = true;
61     }
62     uint32_t remoteVersion = context->GetRemoteSoftwareVersion();
63     if (remoteVersion < SOFTWARE_VERSION_RELEASE_3_0) {
64         // just get last msg when remote version is < 103 or >=103 but just open db now
65         return GetLastMsgFromQueue();
66     }
67     ResetTimer(context);
68     UpdateMsgMap();
69     Message *msg = GetMsgFromMap(isNeedHandle);
70     isNeedContinue = true;
71     if (msg == nullptr) {
72         StopTimer();
73         std::lock_guard<std::mutex> lock(workingLock_);
74         isWorking_ = false;
75         return nullptr;
76     }
77     return msg;
78 }
79 
ScheduleInfoHandle(bool isNeedHandleStatus,bool isNeedClearMap,const Message * inMsg)80 void SingleVerDataMessageSchedule::ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap,
81     const Message *inMsg)
82 {
83     if (isNeedHandleStatus) {
84         uint64_t curPacketId = 0;
85         if (GetPacketId(inMsg, curPacketId) != E_OK) {
86             LOGE("[DataMsgSchedule] packet is nullptr");
87             return;
88         }
89         {
90             std::lock_guard<std::mutex> lock(lock_);
91             finishedPacketId_ = curPacketId;
92             if (isNeedClearMap) {
93                 ClearMsgMapWithNoLock();
94                 expectedSequenceId_ = 1;
95             } else {
96                 LOGI("[DataMsgSchedule] DealMsg seqId=%" PRIu32 " finishedPacketId=%" PRIu64 " ok,label=%s,dev=%s",
97                     expectedSequenceId_, finishedPacketId_, label_.c_str(), STR_MASK(deviceId_));
98                 expectedSequenceId_++;
99             }
100         }
101     }
102     std::lock_guard<std::mutex> lock(workingLock_);
103     isWorking_ = false;
104 }
105 
ClearMsg()106 void SingleVerDataMessageSchedule::ClearMsg()
107 {
108     StopTimer();
109     ClearMsgQueue();
110     ClearMsgMap();
111 }
112 
UpdateMsgMap()113 void SingleVerDataMessageSchedule::UpdateMsgMap()
114 {
115     std::queue<Message *> msgTmpQueue;
116     {
117         std::lock_guard<std::mutex> lock(queueLock_);
118         while (!msgQueue_.empty()) {
119             msgTmpQueue.push(msgQueue_.front());
120             msgQueue_.pop();
121         }
122         isNeedReload_ = false;
123     }
124     UpdateMsgMapInner(msgTmpQueue);
125 }
126 
UpdateMsgMapInner(std::queue<Message * > & msgTmpQueue)127 void SingleVerDataMessageSchedule::UpdateMsgMapInner(std::queue<Message *> &msgTmpQueue)
128 {
129     // update msg map
130     std::lock_guard<std::mutex> lock(lock_);
131     while (!msgTmpQueue.empty()) {
132         Message *msg = msgTmpQueue.front();
133         msgTmpQueue.pop();
134         // insert new msg into map and delete old msg
135         int errCode = UpdateMsgMapIfNeed(msg);
136         if (errCode != E_OK) {
137             delete msg;
138         }
139     }
140 }
141 
GetMsgFromMap(bool & isNeedHandle)142 Message *SingleVerDataMessageSchedule::GetMsgFromMap(bool &isNeedHandle)
143 {
144     isNeedHandle = true;
145     std::lock_guard<std::mutex> lock(lock_);
146     while (!messageMap_.empty()) {
147         auto iter = messageMap_.begin();
148         Message *msg = iter->second;
149         messageMap_.erase(iter);
150         uint64_t packetId = 0;
151         if (GetPacketId(msg, packetId) != E_OK) {
152             LOGE("[DataMsgSchedule] expected error");
153             delete msg;
154             continue;
155         }
156         uint32_t sequenceId = msg->GetSequenceId();
157         if (sequenceId < expectedSequenceId_) {
158             uint64_t revisePacketId = finishedPacketId_ - (expectedSequenceId_ - 1 - sequenceId);
159             LOGI("[DataMsgSchedule] drop msg because seqId less than exSeqId");
160             if (packetId < revisePacketId) {
161                 delete msg;
162                 continue;
163             }
164             // means already handle the msg, and just send E_OK ack in dataSync
165             isNeedHandle = false;
166             return msg;
167         }
168         if (sequenceId == expectedSequenceId_) {
169             if (packetId < finishedPacketId_) {
170                 LOGI("[DataMsgSchedule] drop msg because packetId less than finishedPacketId");
171                 delete msg;
172                 continue;
173             }
174             // if packetId == finishedPacketId_ need handle
175             // it will happened while watermark/need_abilitySync when last ack is missing
176             return msg;
177         }
178         // sequenceId > expectedSequenceId_, not need handle, put into map again
179         messageMap_[sequenceId] = msg;
180         return nullptr;
181     }
182     return nullptr;
183 }
184 
GetLastMsgFromQueue()185 Message *SingleVerDataMessageSchedule::GetLastMsgFromQueue()
186 {
187     std::lock_guard<std::mutex> lock(queueLock_);
188     isNeedReload_ = false;
189     while (!msgQueue_.empty()) {
190         Message *msg = msgQueue_.front();
191         msgQueue_.pop();
192         if (msgQueue_.empty()) { // means last msg
193             return msg;
194         }
195         delete msg;
196     }
197     return nullptr;
198 }
199 
ClearMsgMap()200 void SingleVerDataMessageSchedule::ClearMsgMap()
201 {
202     std::lock_guard<std::mutex> lock(lock_);
203     ClearMsgMapWithNoLock();
204 }
205 
ClearMsgMapWithNoLock()206 void SingleVerDataMessageSchedule::ClearMsgMapWithNoLock()
207 {
208     LOGD("[DataMsgSchedule] begin to ClearMsgMapWithNoLock");
209     for (auto &iter : messageMap_) {
210         delete iter.second;
211         iter.second = nullptr;
212     }
213     messageMap_.clear();
214 }
215 
ClearMsgQueue()216 void SingleVerDataMessageSchedule::ClearMsgQueue()
217 {
218     std::lock_guard<std::mutex> lock(queueLock_);
219     while (!msgQueue_.empty()) {
220         Message *msg = msgQueue_.front();
221         msgQueue_.pop();
222         delete msg;
223     }
224 }
225 
StartTimer(SingleVerSyncTaskContext * context)226 void SingleVerDataMessageSchedule::StartTimer(SingleVerSyncTaskContext *context)
227 {
228     std::lock_guard<std::mutex> lock(lock_);
229     TimerId timerId = 0;
230     RefObject::IncObjRef(context);
231     TimerAction timeOutCallback = [this](TimerId timerId) { return TimeOut(timerId); };
232     int errCode = RuntimeContext::GetInstance()->SetTimer(IDLE_TIME_OUT, timeOutCallback,
233         [context]() {
234             int errCode = RuntimeContext::GetInstance()->ScheduleTask([context]() {
235                 RefObject::DecObjRef(context);
236             });
237             if (errCode != E_OK) {
238                 LOGE("[DataMsgSchedule] timer finalizer ScheduleTask,errCode=%d", errCode);
239             }
240         }, timerId);
241     if (errCode != E_OK) {
242         RefObject::DecObjRef(context);
243         LOGE("[DataMsgSchedule] timer ScheduleTask, errCode=%d", errCode);
244         return;
245     }
246     timerId_ = timerId;
247     LOGD("[DataMsgSchedule] StartTimer,TimerId=%" PRIu64, timerId_);
248 }
249 
StopTimer()250 void SingleVerDataMessageSchedule::StopTimer()
251 {
252     TimerId timerId;
253     {
254         std::lock_guard<std::mutex> lock(lock_);
255         LOGD("[DataMsgSchedule] StopTimer,remove TimerId=%" PRIu64, timerId_);
256         if (timerId_ == 0) {
257             return;
258         }
259         timerId = timerId_;
260         timerId_ = 0;
261     }
262     RuntimeContext::GetInstance()->RemoveTimer(timerId);
263 }
264 
ResetTimer(SingleVerSyncTaskContext * context)265 void SingleVerDataMessageSchedule::ResetTimer(SingleVerSyncTaskContext *context)
266 {
267     StopTimer();
268     StartTimer(context);
269 }
270 
TimeOut(TimerId timerId)271 int SingleVerDataMessageSchedule::TimeOut(TimerId timerId)
272 {
273     if (IsNeedReloadQueue()) {
274         LOGI("[DataMsgSchedule] new msg exists, no need to timeout handle");
275         return E_OK;
276     }
277     {
278         std::lock_guard<std::mutex> lock(workingLock_);
279         if (isWorking_) {
280             LOGI("[DataMsgSchedule] other thread is handle msg, no need to timeout handle");
281             return E_OK;
282         }
283     }
284     {
285         std::lock_guard<std::mutex> lock(lock_);
286         LOGI("[DataMsgSchedule] timeout handling, stop timerId_[%" PRIu64 "]", timerId);
287         if (timerId == timerId_) {
288             ClearMsgMapWithNoLock();
289             timerId_ = 0;
290         }
291     }
292     RuntimeContext::GetInstance()->RemoveTimer(timerId);
293     return E_OK;
294 }
295 
UpdateMsgMapIfNeed(Message * msg)296 int SingleVerDataMessageSchedule::UpdateMsgMapIfNeed(Message *msg)
297 {
298     if (msg == nullptr) {
299         return -E_INVALID_ARGS;
300     }
301     uint64_t packetId = 0;
302     if (GetPacketId(msg, packetId) != E_OK) {
303         return -E_INVALID_ARGS;
304     }
305     uint32_t sessionId = msg->GetSessionId();
306     uint32_t sequenceId = msg->GetSequenceId();
307     if (prevSessionId_ != 0 && sessionId == prevSessionId_) {
308         LOGD("[DataMsgSchedule] recv prev sessionId msg, drop msg, label=%s, dev=%s", label_.c_str(),
309             STR_MASK(deviceId_));
310         return -E_INVALID_ARGS;
311     }
312     if (sessionId != currentSessionId_) {
313         // make sure all msg sessionId is same in msgMap
314         ClearMsgMapWithNoLock();
315         prevSessionId_ = currentSessionId_;
316         currentSessionId_ = sessionId;
317         finishedPacketId_ = 0;
318         expectedSequenceId_ = 1;
319     }
320     if (messageMap_.count(sequenceId) > 0) {
321         const auto *cachePacket = messageMap_[sequenceId]->GetObject<DataRequestPacket>();
322         if (cachePacket != nullptr) {
323             uint64_t cachePacketId;
324             if ((GetPacketId(messageMap_[sequenceId], cachePacketId) == E_OK) &&
325                 (packetId != 0) && (packetId < cachePacketId)) {
326                 LOGD("[DataMsgSchedule] drop msg packetId=%" PRIu64 ", cachePacketId=%" PRIu64 ", label=%s, dev=%s",
327                     packetId, cachePacketId, label_.c_str(), STR_MASK(deviceId_));
328                 return -E_INVALID_ARGS;
329             }
330         }
331         delete messageMap_[sequenceId];
332         messageMap_[sequenceId] = nullptr;
333     }
334     messageMap_[sequenceId] = msg;
335     LOGD("[DataMsgSchedule] put into msgMap seqId=%" PRIu32 ", packetId=%" PRIu64 ", label=%s, dev=%s", sequenceId,
336         packetId, label_.c_str(), STR_MASK(deviceId_));
337     return E_OK;
338 }
339 
GetPacketId(const Message * msg,uint64_t & packetId)340 int SingleVerDataMessageSchedule::GetPacketId(const Message *msg, uint64_t &packetId)
341 {
342     const DataRequestPacket *packet = msg->GetObject<DataRequestPacket>();
343     if (packet == nullptr) {
344         return -E_INVALID_ARGS;
345     }
346     packetId = packet->GetPacketId();
347     return E_OK;
348 }
349 }