1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef TASK_SCHEDULER_H
17 #define TASK_SCHEDULER_H
18 
19 #include <cinttypes>
20 #include <functional>
21 #include <future>
22 #include <memory>
23 #include <mutex>
24 #include <queue>
25 
26 #include "i_task_scheduler.h"
27 #include "id_factory.h"
28 #include "include/util.h"
29 
30 namespace OHOS {
31 namespace Msdp {
32 namespace DeviceStatus {
33 class TaskScheduler final : public ITaskScheduler,
34                             public IdFactory<int32_t> {
35 public:
36     struct TaskData {
37         uint64_t tid { 0 };
38         int32_t taskId { 0 };
39     };
40     class Task : public std::enable_shared_from_this<Task> {
41     public:
42         using Promise = std::promise<int32_t>;
43         using Future = std::future<int32_t>;
44         using TaskPtr = std::shared_ptr<TaskScheduler::Task>;
45         Task(int32_t id, DTaskCallback fun, Promise *promise = nullptr)
46             : id_(id), fun_(fun), promise_(promise) {}
47         ~Task() = default;
48 
GetSharedPtr()49         TaskPtr GetSharedPtr()
50         {
51             return shared_from_this();
52         }
GetId()53         int32_t GetId() const
54         {
55             return id_;
56         }
SetWaited()57         void SetWaited()
58         {
59             hasWaited_ = true;
60         }
61         void ProcessTask();
62 
63     private:
64         int32_t id_ { 0 };
65         std::atomic_bool hasWaited_ { false };
66         DTaskCallback fun_ { nullptr };
67         Promise* promise_ { nullptr };
68     };
69     using TaskPtr = Task::TaskPtr;
70     using Promise = Task::Promise;
71     using Future = Task::Future;
72 
73 public:
74     TaskScheduler() = default;
75     ~TaskScheduler();
76 
77     bool Init();
78     void ProcessTasks();
79     int32_t PostSyncTask(DTaskCallback cb) override;
80     int32_t PostAsyncTask(DTaskCallback callback) override;
81 
GetReadFd()82     int32_t GetReadFd() const
83     {
84         return fds_[0];
85     }
SetWorkerThreadId(uint64_t tid)86     void SetWorkerThreadId(uint64_t tid)
87     {
88         workerThreadId_ = tid;
89     }
IsCallFromWorkerThread()90     bool IsCallFromWorkerThread() const
91     {
92         return (GetThisThreadId() == workerThreadId_);
93     }
94 
95 private:
96     void PopPendingTaskList(std::vector<TaskPtr> &tasks);
97     TaskPtr PostTask(DTaskCallback callback, Promise *promise = nullptr);
98 
99 private:
100     std::atomic<uint64_t> workerThreadId_ { 0 };
101     int32_t fds_[2] {};
102     std::mutex mux_;
103     std::queue<TaskPtr> tasks_;
104 };
105 } // namespace DeviceStatus
106 } // namespace Msdp
107 } // namespace OHOS
108 #endif // TASK_SCHEDULER_H
109