1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef DFSU_ACTOR_H
17 #define DFSU_ACTOR_H
18 
19 #include <algorithm>
20 #include <future>
21 #include <list>
22 #include <stdexcept>
23 
24 #include "dfsu_cmd.h"
25 #include "dfsu_thread_safe_queue.h"
26 
27 namespace OHOS {
28 namespace Storage {
29 namespace DistributedFile {
30 constexpr int RETRY_SLEEP_TIME = 1500;
31 /**
32  * @brief  The Actor Model
33  *
34  * @tparam Ctx Context for Commands
35  */
36 template<typename Ctx>
37 class DfsuActor {
38 public:
39     explicit DfsuActor(Ctx *ctx, uint32_t startCmdTryTimes = 1) : ctx_(ctx), retryTimes_(startCmdTryTimes) {}
~DfsuActor()40     virtual ~DfsuActor() {}
41 
StartActor()42     void StartActor()
43     {
44         // Always insert cmds b4 starting an actor
45         StartCtx();
46         loop_ = std::thread([this]() { this->Main(); });
47     }
48 
StopActor()49     void StopActor()
50     {
51         // Always insert cmds b4 starting an actor
52         StopCtx();
53         pendingCmds_.Halt();
54 
55         loop_.join();
56     }
57 
Recv(std::unique_ptr<VirtualCmd<Ctx>> pcmd)58     void Recv(std::unique_ptr<VirtualCmd<Ctx>> pcmd)
59     {
60         pendingCmds_.Push(std::move(pcmd));
61     }
62 
63 protected:
64     DfsuThreadSafeQueue<VirtualCmd<Ctx>> pendingCmds_;
65 
66     Ctx *ctx_ {nullptr};
67     uint32_t retryTimes_ {1};
68     std::thread loop_;
69 
70     std::list<std::future<void>> retryTasks;
71 
72 private:
DelayRetry(std::unique_ptr<VirtualCmd<Ctx>> cmd)73     void DelayRetry(std::unique_ptr<VirtualCmd<Ctx>> cmd)
74     {
75         using namespace std::literals::chrono_literals;
76         std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_SLEEP_TIME));
77         pendingCmds_.Push(std::move(cmd));
78     }
79 
Retry(std::unique_ptr<VirtualCmd<Ctx>> cmd)80     void Retry(std::unique_ptr<VirtualCmd<Ctx>> cmd)
81     {
82         pendingCmds_.PushFront(std::move(cmd));
83         using namespace std::literals::chrono_literals;
84         std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_SLEEP_TIME));
85     }
86 
StartCtx()87     void StartCtx()
88     {
89         auto startCmd = std::make_unique<DfsuCmd<Ctx>>(&Ctx::Start);
90         startCmd->UpdateOption({.importance_ = CmdImportance::SUBVITAL, .tryTimes_ = retryTimes_});
91         pendingCmds_.Push(std::move(startCmd));
92     }
93 
StopCtx()94     void StopCtx()
95     {
96         auto cmd = std::make_unique<DfsuCmd<Ctx>>(&Ctx::Stop);
97         cmd->UpdateOption({.importance_ = CmdImportance::VITAL, .tryTimes_ = 1});
98         pendingCmds_.Push(std::move(cmd));
99     }
100 
IsExistStopTask()101     bool IsExistStopTask()
102     {
103         bool result = false;
104         pendingCmds_.ForEach([&result](const std::unique_ptr<VirtualCmd<Ctx>> &item) {
105             if (item->option_.importance_ == CmdImportance::VITAL) {
106                 result = true;
107             }
108         });
109         return result;
110     }
111 
ExceptionHandler(const DfsuException & e,std::unique_ptr<VirtualCmd<Ctx>> & currentCmd)112     bool ExceptionHandler(const DfsuException &e, std::unique_ptr<VirtualCmd<Ctx>> &currentCmd)
113     {
114         if (e.code() == ERR_UTILS_ACTOR_QUEUE_STOP) {
115             return false;
116         }
117 
118         const CmdOptions &op = currentCmd->option_;
119 
120         if (IsExistStopTask() && (op.tryTimes_ > 0)) {
121             return true; // exist stop Task, stop retry
122         }
123 
124         if (op.importance_ == CmdImportance::TRIVIAL) {
125             if (op.tryTimes_) {
126                 retryTasks.emplace_back(
127                     std::async(std::launch::async, &DfsuActor<Ctx>::DelayRetry, this, std::move(currentCmd)));
128                 return true;
129             }
130         } else {
131             if (op.tryTimes_) {
132                 Retry(std::move(currentCmd));
133                 return true;
134             }
135             if (op.importance_ == CmdImportance::VITAL) {
136                 return false;
137             }
138             if (op.importance_ == CmdImportance::NORMAL) {
139                 StopCtx();
140                 StartCtx();
141                 return true;
142             }
143         }
144         return true;
145     }
146 
Main()147     void Main()
148     {
149         while (true) {
150             std::unique_ptr<VirtualCmd<Ctx>> currentCmd;
151             try {
152                 while (true) {
153                     currentCmd = pendingCmds_.WaitAndPop();
154                     (*currentCmd)(ctx_);
155                     currentCmd.release();
156                 }
157             } catch (const DfsuException &e) {
158                 if (ExceptionHandler(e, currentCmd)) {
159                     continue;
160                 } else {
161                     break;
162                 }
163             } catch (const std::exception &e) {
164                 LOGE("Unexpected Low Level exception");
165             }
166         }
167     }
168 };
169 } // namespace DistributedFile
170 } // namespace Storage
171 } // namespace OHOS
172 #endif // DFSU_ACTOR_H
173