1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OHOS_DSCHED_CONTINUE_MANAGER_H
17 #define OHOS_DSCHED_CONTINUE_MANAGER_H
18 
19 #include <map>
20 #include <string>
21 #include <atomic>
22 
23 #include "dsched_data_buffer.h"
24 #include "dsched_continue.h"
25 #include "idata_listener.h"
26 #include "iremote_object.h"
27 #include "single_instance.h"
28 #include "want.h"
29 
30 namespace OHOS {
31 namespace DistributedSchedule {
32 namespace {
33 constexpr int32_t MAX_CONCURRENT_SINK = 1;
34 constexpr int32_t MAX_CONCURRENT_SOURCE = 1;
35 constexpr int32_t CONTINUE_TIMEOUT = 10000;
36 }
37 class DSchedContinueManager {
38 DECLARE_SINGLE_INSTANCE_BASE(DSchedContinueManager);
39 public:
40     explicit DSchedContinueManager();
41     ~DSchedContinueManager();
42     int32_t ContinueMission(const std::string &srcDeviceId, const std::string &dstDeviceId,
43         int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams &wantParams);
44     int32_t ContinueMission(const DSchedContinueInfo& continueInfo, const sptr<IRemoteObject> &callback,
45         const OHOS::AAFwk::WantParams &wantParams);
46     int32_t StartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId, int32_t callerUid, int32_t status,
47         uint32_t accessToken);
48     int32_t NotifyCompleteContinuation(const std::u16string& devId, int32_t sessionId, bool isSuccess,
49         const std::string &callerBundleName);
50     int32_t OnContinueEnd(const DSchedContinueInfo& info);
51 
52     void Init();
53     void UnInit();
54     void NotifyAllConnectDecision(std::string peerDeviceId, bool isSupport);
55     void OnDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer);
56     void OnShutdown(int32_t socket, bool isSelfCalled);
57 
58     int32_t GetContinueInfo(std::string &srcDeviceId, std::string &dstDeviceId);
59     std::shared_ptr<DSchedContinue> GetDSchedContinueByWant(const OHOS::AAFwk::Want& want, int32_t missionId);
60     std::shared_ptr<DSchedContinue> GetDSchedContinueByDevId(const std::u16string& devId, int32_t missionId);
61     void NotifyTerminateContinuation(const int32_t missionId);
62 
63 private:
64     void StartEvent();
65     void HandleContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId, int32_t missionId,
66         const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams);
67     void HandleContinueMission(const DSchedContinueInfo& continueInfo,
68         const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams);
69     bool GetFirstBundleName(DSchedContinueInfo &info, std::string &firstBundleNamme, std::string bundleName,
70         std::string deviceId);
71     void HandleContinueMissionWithBundleName(DSchedContinueInfo &info, const sptr<IRemoteObject> &callback,
72         const OHOS::AAFwk::WantParams &wantParams);
73     void HandleStartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId, int32_t callerUid,
74         int32_t status, uint32_t accessToken);
75     void HandleNotifyCompleteContinuation(const std::u16string& devId, int32_t missionId, bool isSuccess,
76         const std::string &callerBundleName);
77     void HandleContinueEnd(const DSchedContinueInfo& info);
78     void HandleDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer);
79     void NotifyContinueDataRecv(int32_t sessionId, int32_t command, const std::string& jsonStr,
80         std::shared_ptr<DSchedDataBuffer> dataBuffer);
81     int32_t CheckContinuationLimit(const std::string& srcDeviceId, const std::string& dstDeviceId, int32_t &direction);
82     void WaitAllConnectDecision(int32_t direction, const DSchedContinueInfo &info, int32_t timeout);
83     void SetTimeOut(const DSchedContinueInfo& info, int32_t timeout);
84     void RemoveTimeout(const DSchedContinueInfo& info);
85 
86     class SoftbusListener : public IDataListener {
87         void OnBind(int32_t socket, PeerSocketInfo info);
88         void OnShutdown(int32_t socket, bool isSelfCalled);
89         void OnDataRecv(int32_t socket, std::shared_ptr<DSchedDataBuffer> dataBuffer);
90     };
91 
92 private:
93 #ifdef DMSFWK_ALL_CONNECT_MGR
94     static constexpr int32_t CONNECT_DECISION_WAIT_S = 60;
95 #endif
96 
97     std::thread eventThread_;
98     std::condition_variable eventCon_;
99     std::mutex eventMutex_;
100     std::shared_ptr<OHOS::AppExecFwk::EventHandler> eventHandler_;
101     std::shared_ptr<DSchedContinueManager::SoftbusListener> softbusListener_;
102 
103     std::map<DSchedContinueInfo, std::shared_ptr<DSchedContinue>> continues_;
104     std::mutex continueMutex_;
105 
106 #ifdef DMSFWK_ALL_CONNECT_MGR
107     std::mutex connectDecisionMutex_;
108     std::condition_variable connectDecisionCond_;
109     std::map<std::string, std::atomic<bool>> peerConnectDecision_;
110 #endif
111 
112     std::atomic<int32_t> cntSink_ {0};
113     std::atomic<int32_t> cntSource_ {0};
114 };
115 }  // namespace DistributedSchedule
116 }  // namespace OHOS
117 #endif  // OHOS_DSCHED_CONTINUE_MANAGER_H
118