1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef FFRT_CPUWORKER_MANAGER_HPP
17 #define FFRT_CPUWORKER_MANAGER_HPP
18 
19 #include "eu/worker_manager.h"
20 #include "eu/cpu_worker.h"
21 #include "eu/cpu_monitor.h"
22 #include "eu/cpu_manager_strategy.h"
23 #include "sync/poller.h"
24 #include "util/spmc_queue.h"
25 #include "tm/cpu_task.h"
26 
27 namespace ffrt {
28 struct WorkerSleepCtl {
29     std::mutex mutex;
30     std::condition_variable cv;
31 };
32 
33 class CPUWorkerManager : public WorkerManager {
34 public:
35     CPUWorkerManager();
36 
~CPUWorkerManager()37     ~CPUWorkerManager() override
38     {
39     }
40 
41     void NotifyTaskAdded(const QoS& qos) override;
42     void NotifyLocalTaskAdded(const QoS& qos) override;
43     void NotifyWorkers(const QoS& qos, int number) override;
44 
GetSleepCtl(int qos)45     std::mutex* GetSleepCtl(int qos) override
46     {
47         return &sleepCtl[qos].mutex;
48     }
49 
AddStealingWorker(const QoS & qos)50     void AddStealingWorker(const QoS& qos)
51     {
52         stealWorkers[qos].fetch_add(1);
53     }
54 
SubStealingWorker(const QoS & qos)55     void SubStealingWorker(const QoS& qos)
56     {
57         while (1) {
58             uint64_t stealWorkersNum = stealWorkers[qos].load();
59             if (stealWorkersNum == 0) {
60                 return;
61             }
62             if (atomic_compare_exchange_weak(&stealWorkers[qos], &stealWorkersNum, stealWorkersNum - 1)) return;
63         }
64     }
65 
GetStealingWorkers(const QoS & qos)66     uint64_t GetStealingWorkers(const QoS& qos)
67     {
68         return stealWorkers[qos].load(std::memory_order_relaxed);
69     }
GetCPUMonitor()70     CPUMonitor* GetCPUMonitor() override
71     {
72         return monitor;
73     }
74 
75     virtual void WorkerPrepare(WorkerThread* thread) = 0;
76     virtual void WakeupWorkers(const QoS& qos) = 0;
77     bool IncWorker(const QoS& qos) override;
78     int GetTaskCount(const QoS& qos);
79     int GetWorkerCount(const QoS& qos);
80     void WorkerJoinTg(const QoS& qos, pid_t pid);
81 
82     CPUMonitor* monitor = nullptr;
83     bool tearDown = false;
84     WorkerSleepCtl sleepCtl[QoS::MaxNum()];
85     void WorkerLeaveTg(const QoS& qos, pid_t pid);
86     uint8_t polling_[QoS::MaxNum()] = {0};
87     fast_mutex pollersMtx[QoS::MaxNum()];
88     void WorkerRetired(WorkerThread* thread);
89 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
90     bool IsExceedRunningThreshold(const WorkerThread* thread);
91     bool IsBlockAwareInit(void);
92 #endif
93 
94     bool WorkerTearDown();
DecWorker()95     bool DecWorker() override
96     {return false;}
97     virtual void WorkerRetiredSimplified(WorkerThread* thread) = 0;
98     void NotifyTaskPicked(const WorkerThread* thread);
99     /* strategy options for task pick up */
100     CPUEUTask* PickUpTaskFromGlobalQueue(WorkerThread* thread);
101     CPUEUTask* PickUpTaskFromLocalQueue(WorkerThread* thread);
102 
103     /* strategy options for worker wait action */
104     virtual WorkerAction WorkerIdleAction(const WorkerThread* thread) = 0;
105 
106     void WorkerSetup(WorkerThread* thread);
107     PollerRet TryPoll(const WorkerThread* thread, int timeout = -1);
108     unsigned int StealTaskBatch(WorkerThread* thread);
109     CPUEUTask* PickUpTaskBatch(WorkerThread* thread);
110     std::atomic_uint64_t stealWorkers[QoS::MaxNum()] = {0};
111     friend class CPUManagerStrategy;
112 };
113 } // namespace ffrt
114 #endif
115