1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef DFSU_ACTOR_H 17 #define DFSU_ACTOR_H 18 19 #include <algorithm> 20 #include <future> 21 #include <list> 22 #include <stdexcept> 23 24 #include "dfsu_cmd.h" 25 #include "dfsu_thread_safe_queue.h" 26 27 namespace OHOS { 28 namespace Storage { 29 namespace DistributedFile { 30 constexpr int RETRY_SLEEP_TIME = 1500; 31 /** 32 * @brief The Actor Model 33 * 34 * @tparam Ctx Context for Commands 35 */ 36 template<typename Ctx> 37 class DfsuActor { 38 public: 39 explicit DfsuActor(Ctx *ctx, uint32_t startCmdTryTimes = 1) : ctx_(ctx), retryTimes_(startCmdTryTimes) {} ~DfsuActor()40 virtual ~DfsuActor() {} 41 StartActor()42 void StartActor() 43 { 44 // Always insert cmds b4 starting an actor 45 StartCtx(); 46 loop_ = std::thread([this]() { this->Main(); }); 47 } 48 StopActor()49 void StopActor() 50 { 51 // Always insert cmds b4 starting an actor 52 StopCtx(); 53 pendingCmds_.Halt(); 54 55 loop_.join(); 56 } 57 Recv(std::unique_ptr<VirtualCmd<Ctx>> pcmd)58 void Recv(std::unique_ptr<VirtualCmd<Ctx>> pcmd) 59 { 60 pendingCmds_.Push(std::move(pcmd)); 61 } 62 63 protected: 64 DfsuThreadSafeQueue<VirtualCmd<Ctx>> pendingCmds_; 65 66 Ctx *ctx_ {nullptr}; 67 uint32_t retryTimes_ {1}; 68 std::thread loop_; 69 70 std::list<std::future<void>> retryTasks; 71 72 private: DelayRetry(std::unique_ptr<VirtualCmd<Ctx>> cmd)73 void DelayRetry(std::unique_ptr<VirtualCmd<Ctx>> cmd) 74 { 75 using namespace std::literals::chrono_literals; 76 std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_SLEEP_TIME)); 77 pendingCmds_.Push(std::move(cmd)); 78 } 79 Retry(std::unique_ptr<VirtualCmd<Ctx>> cmd)80 void Retry(std::unique_ptr<VirtualCmd<Ctx>> cmd) 81 { 82 pendingCmds_.PushFront(std::move(cmd)); 83 using namespace std::literals::chrono_literals; 84 std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_SLEEP_TIME)); 85 } 86 StartCtx()87 void StartCtx() 88 { 89 auto startCmd = std::make_unique<DfsuCmd<Ctx>>(&Ctx::Start); 90 startCmd->UpdateOption({.importance_ = CmdImportance::SUBVITAL, .tryTimes_ = retryTimes_}); 91 pendingCmds_.Push(std::move(startCmd)); 92 } 93 StopCtx()94 void StopCtx() 95 { 96 auto cmd = std::make_unique<DfsuCmd<Ctx>>(&Ctx::Stop); 97 cmd->UpdateOption({.importance_ = CmdImportance::VITAL, .tryTimes_ = 1}); 98 pendingCmds_.Push(std::move(cmd)); 99 } 100 IsExistStopTask()101 bool IsExistStopTask() 102 { 103 bool result = false; 104 pendingCmds_.ForEach([&result](const std::unique_ptr<VirtualCmd<Ctx>> &item) { 105 if (item->option_.importance_ == CmdImportance::VITAL) { 106 result = true; 107 } 108 }); 109 return result; 110 } 111 ExceptionHandler(const DfsuException & e,std::unique_ptr<VirtualCmd<Ctx>> & currentCmd)112 bool ExceptionHandler(const DfsuException &e, std::unique_ptr<VirtualCmd<Ctx>> ¤tCmd) 113 { 114 if (e.code() == ERR_UTILS_ACTOR_QUEUE_STOP) { 115 return false; 116 } 117 118 const CmdOptions &op = currentCmd->option_; 119 120 if (IsExistStopTask() && (op.tryTimes_ > 0)) { 121 return true; // exist stop Task, stop retry 122 } 123 124 if (op.importance_ == CmdImportance::TRIVIAL) { 125 if (op.tryTimes_) { 126 retryTasks.emplace_back( 127 std::async(std::launch::async, &DfsuActor<Ctx>::DelayRetry, this, std::move(currentCmd))); 128 return true; 129 } 130 } else { 131 if (op.tryTimes_) { 132 Retry(std::move(currentCmd)); 133 return true; 134 } 135 if (op.importance_ == CmdImportance::VITAL) { 136 return false; 137 } 138 if (op.importance_ == CmdImportance::NORMAL) { 139 StopCtx(); 140 StartCtx(); 141 return true; 142 } 143 } 144 return true; 145 } 146 Main()147 void Main() 148 { 149 while (true) { 150 std::unique_ptr<VirtualCmd<Ctx>> currentCmd; 151 try { 152 while (true) { 153 currentCmd = pendingCmds_.WaitAndPop(); 154 (*currentCmd)(ctx_); 155 currentCmd.release(); 156 } 157 } catch (const DfsuException &e) { 158 if (ExceptionHandler(e, currentCmd)) { 159 continue; 160 } else { 161 break; 162 } 163 } catch (const std::exception &e) { 164 LOGE("Unexpected Low Level exception"); 165 } 166 } 167 } 168 }; 169 } // namespace DistributedFile 170 } // namespace Storage 171 } // namespace OHOS 172 #endif // DFSU_ACTOR_H 173