1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef FFRT_CPUWORKER_MANAGER_HPP 17 #define FFRT_CPUWORKER_MANAGER_HPP 18 19 #include "eu/worker_manager.h" 20 #include "eu/cpu_worker.h" 21 #include "eu/cpu_monitor.h" 22 #include "eu/cpu_manager_strategy.h" 23 #include "sync/poller.h" 24 #include "util/spmc_queue.h" 25 #include "tm/cpu_task.h" 26 27 namespace ffrt { 28 struct WorkerSleepCtl { 29 std::mutex mutex; 30 std::condition_variable cv; 31 }; 32 33 class CPUWorkerManager : public WorkerManager { 34 public: 35 CPUWorkerManager(); 36 ~CPUWorkerManager()37 ~CPUWorkerManager() override 38 { 39 } 40 41 void NotifyTaskAdded(const QoS& qos) override; 42 void NotifyLocalTaskAdded(const QoS& qos) override; 43 void NotifyWorkers(const QoS& qos, int number) override; 44 GetSleepCtl(int qos)45 std::mutex* GetSleepCtl(int qos) override 46 { 47 return &sleepCtl[qos].mutex; 48 } 49 AddStealingWorker(const QoS & qos)50 void AddStealingWorker(const QoS& qos) 51 { 52 stealWorkers[qos].fetch_add(1); 53 } 54 SubStealingWorker(const QoS & qos)55 void SubStealingWorker(const QoS& qos) 56 { 57 while (1) { 58 uint64_t stealWorkersNum = stealWorkers[qos].load(); 59 if (stealWorkersNum == 0) { 60 return; 61 } 62 if (atomic_compare_exchange_weak(&stealWorkers[qos], &stealWorkersNum, stealWorkersNum - 1)) return; 63 } 64 } 65 GetStealingWorkers(const QoS & qos)66 uint64_t GetStealingWorkers(const QoS& qos) 67 { 68 return stealWorkers[qos].load(std::memory_order_relaxed); 69 } GetCPUMonitor()70 CPUMonitor* GetCPUMonitor() override 71 { 72 return monitor; 73 } 74 75 virtual void WorkerPrepare(WorkerThread* thread) = 0; 76 virtual void WakeupWorkers(const QoS& qos) = 0; 77 bool IncWorker(const QoS& qos) override; 78 int GetTaskCount(const QoS& qos); 79 int GetWorkerCount(const QoS& qos); 80 void WorkerJoinTg(const QoS& qos, pid_t pid); 81 82 CPUMonitor* monitor = nullptr; 83 bool tearDown = false; 84 WorkerSleepCtl sleepCtl[QoS::MaxNum()]; 85 void WorkerLeaveTg(const QoS& qos, pid_t pid); 86 uint8_t polling_[QoS::MaxNum()] = {0}; 87 fast_mutex pollersMtx[QoS::MaxNum()]; 88 void WorkerRetired(WorkerThread* thread); 89 #ifdef FFRT_WORKERS_DYNAMIC_SCALING 90 bool IsExceedRunningThreshold(const WorkerThread* thread); 91 bool IsBlockAwareInit(void); 92 #endif 93 94 bool WorkerTearDown(); DecWorker()95 bool DecWorker() override 96 {return false;} 97 virtual void WorkerRetiredSimplified(WorkerThread* thread) = 0; 98 void NotifyTaskPicked(const WorkerThread* thread); 99 /* strategy options for task pick up */ 100 CPUEUTask* PickUpTaskFromGlobalQueue(WorkerThread* thread); 101 CPUEUTask* PickUpTaskFromLocalQueue(WorkerThread* thread); 102 103 /* strategy options for worker wait action */ 104 virtual WorkerAction WorkerIdleAction(const WorkerThread* thread) = 0; 105 106 void WorkerSetup(WorkerThread* thread); 107 PollerRet TryPoll(const WorkerThread* thread, int timeout = -1); 108 unsigned int StealTaskBatch(WorkerThread* thread); 109 CPUEUTask* PickUpTaskBatch(WorkerThread* thread); 110 std::atomic_uint64_t stealWorkers[QoS::MaxNum()] = {0}; 111 friend class CPUManagerStrategy; 112 }; 113 } // namespace ffrt 114 #endif 115