1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef FFRT_WORKER_THREAD_HPP 17 #define FFRT_WORKER_THREAD_HPP 18 19 #include <atomic> 20 #include <unistd.h> 21 #ifdef FFRT_PTHREAD_ENABLE 22 #include <pthread.h> 23 #endif 24 #include <thread> 25 #ifdef OHOS_THREAD_STACK_DUMP 26 #include <sstream> 27 #include "dfx_dump_catcher.h" 28 #endif 29 30 #include "qos.h" 31 #include "tm/cpu_task.h" 32 #include "dfx/log/ffrt_log_api.h" 33 34 namespace ffrt { 35 constexpr int PTHREAD_CREATE_NO_MEM_CODE = 11; 36 constexpr int FFRT_RETRY_MAX_COUNT = 12; 37 const std::vector<uint64_t> FFRT_RETRY_CYCLE_LIST = { 38 10 * 1000, 50 * 1000, 100 * 1000, 200 * 1000, 500 * 1000, 1000 * 1000, 2 * 1000 * 1000, 39 5 * 1000 * 1000, 10 * 1000 * 1000, 50 * 1000 * 1000, 100 * 1000 * 1000, 500 * 1000 * 1000 40 }; 41 42 class WorkerThread { 43 public: 44 CPUEUTask* curTask = nullptr; 45 46 uintptr_t curTaskType_ = ffrt_invalid_task; 47 std::string curTaskLabel_ = ""; // 需要打开宏WORKER_CAHCE_NAMEID才会赋值 48 uint64_t curTaskGid_ = UINT64_MAX; 49 explicit WorkerThread(const QoS& qos); 50 ~WorkerThread()51 virtual ~WorkerThread() 52 { 53 if (!exited) { 54 #ifdef OHOS_THREAD_STACK_DUMP 55 FFRT_LOGW("WorkerThread enter destruction but not exited"); 56 OHOS::HiviewDFX::DfxDumpCatcher dumplog; 57 std::string msg = ""; 58 bool result = dumplog.DumpCatch(getpid(), gettid(), msg); 59 if (result) { 60 std::vector<std::string> out; 61 std::stringstream ss(msg); 62 std::string s; 63 while (std::getline(ss, s, '\n')) { 64 out.push_back(s); 65 } 66 for (auto const& line: out) { 67 FFRT_LOGE("ffrt callstack %s", line.c_str()); 68 } 69 } 70 #endif 71 } 72 FFRT_LOGW("to exit, qos[%d]", qos()); 73 Detach(); 74 } 75 Idle()76 bool Idle() const 77 { 78 return idle; 79 } 80 SetIdle(bool var)81 void SetIdle(bool var) 82 { 83 this->idle = var; 84 } 85 Exited()86 bool Exited() const 87 { 88 return exited; 89 } 90 SetExited(bool var)91 void SetExited(bool var) 92 { 93 this->exited = var; 94 } 95 Id()96 pid_t Id() const 97 { 98 while (!exited && tid < 0) { 99 } 100 return tid; 101 } 102 GetQos()103 const QoS& GetQos() const 104 { 105 return qos; 106 } 107 #ifdef FFRT_WORKERS_DYNAMIC_SCALING GetDomainId()108 unsigned int GetDomainId() const 109 { 110 return domain_id; 111 } 112 #endif 113 114 #ifdef FFRT_PTHREAD_ENABLE Start(void * (* ThreadFunc)(void *),void * args)115 void Start(void*(*ThreadFunc)(void*), void* args) 116 { 117 int ret = pthread_create(&thread_, &attr_, ThreadFunc, args); 118 if (ret == PTHREAD_CREATE_NO_MEM_CODE) { 119 int count = 0; 120 while (ret == PTHREAD_CREATE_NO_MEM_CODE && count < FFRT_RETRY_MAX_COUNT) { 121 usleep(FFRT_RETRY_CYCLE_LIST[count]); 122 count++; 123 FFRT_LOGW("pthread_create failed due to shortage of system memory, FFRT retry %d times...", count); 124 ret = pthread_create(&thread_, &attr_, ThreadFunc, args); 125 } 126 } 127 if (ret != 0) { 128 FFRT_LOGE("pthread_create failed, ret = %d", ret); 129 exited = true; 130 } 131 pthread_attr_destroy(&attr_); 132 } 133 Join()134 void Join() 135 { 136 if (tid > 0) { 137 pthread_join(thread_, nullptr); 138 } 139 tid = -1; 140 } 141 Detach()142 void Detach() 143 { 144 if (tid > 0) { 145 pthread_detach(thread_); 146 } else { 147 FFRT_LOGD("qos %d thread not joinable.", qos()); 148 } 149 tid = -1; 150 } 151 GetThread()152 pthread_t& GetThread() 153 { 154 return this->thread_; 155 } 156 #else 157 template <typename F, typename... Args> Start(F && f,Args &&...args)158 void Start(F&& f, Args&&... args) 159 { 160 auto wrap = [&](Args&&... args) { 161 NativeConfig(); 162 return f(args...); 163 }; 164 thread = std::thread(wrap, args...); 165 } 166 Join()167 void Join() 168 { 169 if (thread.joinable()) { 170 thread.join(); 171 } 172 tid = -1; 173 } 174 Detach()175 void Detach() 176 { 177 if (thread.joinable()) { 178 thread.detach(); 179 } else { 180 FFRT_LOGD("qos %d thread not joinable\n", qos()); 181 } 182 tid = -1; 183 } 184 GetThread()185 pthread_t GetThread() 186 { 187 return this->thread.native_handle(); 188 } 189 #endif 190 191 void WorkerSetup(WorkerThread* wthread); 192 void NativeConfig(); 193 void* worker_mgr; 194 195 private: 196 std::atomic_bool exited; 197 std::atomic_bool idle; 198 199 std::atomic<pid_t> tid; 200 201 QoS qos; 202 #ifdef FFRT_PTHREAD_ENABLE 203 pthread_t thread_{0}; 204 pthread_attr_t attr_; 205 #else 206 std::thread thread; 207 #endif 208 #ifdef FFRT_WORKERS_DYNAMIC_SCALING 209 unsigned int domain_id; 210 #endif 211 }; 212 void SetThreadAttr(WorkerThread* thread, const QoS& qos); 213 } // namespace ffrt 214 #endif 215