1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SINGLE_VER_DATA_MESSAGE_SCHEDULE_H
17 #define SINGLE_VER_DATA_MESSAGE_SCHEDULE_H
18 #include <map>
19 #include <memory>
20 #include <mutex>
21 #include <queue>
22 
23 #include "message.h"
24 #include "runtime_context.h"
25 #include "single_ver_sync_task_context.h"
26 
27 namespace DistributedDB {
28 class SingleVerDataMessageSchedule {
29 public:
30     SingleVerDataMessageSchedule() = default;
31     ~SingleVerDataMessageSchedule();
32     void Initialize(const std::string &label, const std::string &deviceId);
33     void PutMsg(Message *inMsg);
34     bool IsNeedReloadQueue();
35     Message *MoveNextMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle, bool &isNeedContinue);
36     void ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *inMsg);
37     void ClearMsg();
38 private:
39     void UpdateMsgMap();
40     void UpdateMsgMapInner(std::queue<Message *> &msgTmpQueue);
41     int UpdateMsgMapIfNeed(Message *msg);
42     Message *GetMsgFromMap(bool &isNeedHandle);
43     Message *GetLastMsgFromQueue();
44     void ClearMsgMap();
45     void ClearMsgMapWithNoLock();
46     void ClearMsgQueue();
47     void StartTimer(SingleVerSyncTaskContext *context);
48     void StopTimer();
49     void ResetTimer(SingleVerSyncTaskContext *context);
50     // when timeout queue size is 0 because thread can move queue msg to map if isNeedReload which is
51     // activated when queue has new msg is true
52     // so only need clear map msg
53     int TimeOut(TimerId timerId);
54 
55     int GetPacketId(const Message *msg, uint64_t &packetId);
56 
57     static constexpr int IDLE_TIME_OUT = 5 * 60 * 1000; // 5min
58     std::mutex queueLock_;
59     std::queue<Message *> msgQueue_;
60     bool isNeedReload_ = false;
61     // only one thread is deal msg
62     std::mutex workingLock_;
63     bool isWorking_ = false;
64     // first:sequenceId second:Message*, deal msg from low sequenceId to high sequenceId
65     std::mutex lock_;
66     std::map<uint32_t, Message *> messageMap_;
67     uint32_t prevSessionId_ = 0; // drop the msg if msg sessionId is prev sessionId.
68     uint32_t currentSessionId_ = 0;
69     uint64_t finishedPacketId_ = 0; // next msg packetId should larger than it
70     uint32_t expectedSequenceId_ = 0; // handle next msg which sequenceId is equal to it
71     TimerId timerId_ = 0;
72 
73     std::string label_;
74     std::string deviceId_;
75 };
76 }
77 #endif // SINGLE_VER_DATA_MESSAGE_SCHEDULE_H