1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SINGLE_VER_DATA_MESSAGE_SCHEDULE_H 17 #define SINGLE_VER_DATA_MESSAGE_SCHEDULE_H 18 #include <map> 19 #include <memory> 20 #include <mutex> 21 #include <queue> 22 23 #include "message.h" 24 #include "runtime_context.h" 25 #include "single_ver_sync_task_context.h" 26 27 namespace DistributedDB { 28 class SingleVerDataMessageSchedule { 29 public: 30 SingleVerDataMessageSchedule() = default; 31 ~SingleVerDataMessageSchedule(); 32 void Initialize(const std::string &label, const std::string &deviceId); 33 void PutMsg(Message *inMsg); 34 bool IsNeedReloadQueue(); 35 Message *MoveNextMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle, bool &isNeedContinue); 36 void ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *inMsg); 37 void ClearMsg(); 38 private: 39 void UpdateMsgMap(); 40 void UpdateMsgMapInner(std::queue<Message *> &msgTmpQueue); 41 int UpdateMsgMapIfNeed(Message *msg); 42 Message *GetMsgFromMap(bool &isNeedHandle); 43 Message *GetLastMsgFromQueue(); 44 void ClearMsgMap(); 45 void ClearMsgMapWithNoLock(); 46 void ClearMsgQueue(); 47 void StartTimer(SingleVerSyncTaskContext *context); 48 void StopTimer(); 49 void ResetTimer(SingleVerSyncTaskContext *context); 50 // when timeout queue size is 0 because thread can move queue msg to map if isNeedReload which is 51 // activated when queue has new msg is true 52 // so only need clear map msg 53 int TimeOut(TimerId timerId); 54 55 int GetPacketId(const Message *msg, uint64_t &packetId); 56 57 static constexpr int IDLE_TIME_OUT = 5 * 60 * 1000; // 5min 58 std::mutex queueLock_; 59 std::queue<Message *> msgQueue_; 60 bool isNeedReload_ = false; 61 // only one thread is deal msg 62 std::mutex workingLock_; 63 bool isWorking_ = false; 64 // first:sequenceId second:Message*, deal msg from low sequenceId to high sequenceId 65 std::mutex lock_; 66 std::map<uint32_t, Message *> messageMap_; 67 uint32_t prevSessionId_ = 0; // drop the msg if msg sessionId is prev sessionId. 68 uint32_t currentSessionId_ = 0; 69 uint64_t finishedPacketId_ = 0; // next msg packetId should larger than it 70 uint32_t expectedSequenceId_ = 0; // handle next msg which sequenceId is equal to it 71 TimerId timerId_ = 0; 72 73 std::string label_; 74 std::string deviceId_; 75 }; 76 } 77 #endif // SINGLE_VER_DATA_MESSAGE_SCHEDULE_H