1 /*
2  * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "pipeline.h"
16 #include "audit.h"
17 #include "file_util.h"
18 #include "hiview_logger.h"
19 #include "thread_util.h"
20 #include "time_util.h"
21 namespace OHOS {
22 namespace HiviewDFX {
23 DEFINE_LOG_TAG("HiView-Pipeline");
OnRepack()24 void PipelineEvent::OnRepack()
25 {
26     startDeliver_ = false;
27     ResetTimestamp();
28 }
29 
OnPending()30 void PipelineEvent::OnPending()
31 {
32     hasPending_ = true;
33 }
34 
OnContinue()35 bool PipelineEvent::OnContinue()
36 {
37     if ((!hasFinish_) && processors_.empty()) {
38         return OnFinish();
39     }
40 
41     // once the event start delivering
42     // the call OnContinue means one has done the processing of the event
43     // this may be called by upstream event processor or the framework
44     if (Audit::IsEnabled() && startDeliver_) {
45         Audit::WriteAuditEvent(Audit::StatsEvent::PIPELINE_EVENT_HANDLE_OUT,
46             createTime_, std::to_string(Thread::GetTid()));
47     }
48 
49     // the framework will call OnContinue when the event is assigned to a pipeline
50     if (!startDeliver_) {
51         startDeliver_ = true;
52     }
53 
54     std::weak_ptr<Plugin> plugin = processors_.front();
55     processors_.pop_front();
56     if (auto pluginPtr = plugin.lock()) {
57         if (!pluginPtr->CanProcessMoreEvents()) {
58             if (handler_ != nullptr) {
59                 handler_->PauseDispatch(plugin);
60             }
61         }
62 
63         if (Audit::IsEnabled()) {
64             Audit::WriteAuditEvent(Audit::StatsEvent::PIPELINE_EVENT_HANDLE_IN, createTime_,
65                                    pluginPtr->GetHandlerInfo());
66         }
67 
68         if (!pluginPtr->IsInterestedPipelineEvent(shared_from_this())) {
69             if ((!HasFinish() && !HasPending())) {
70                 return OnContinue();
71             }
72             return true;
73         }
74 
75         if (auto workLoop = pluginPtr->GetWorkLoop()) {
76             workLoop->AddEvent(pluginPtr, shared_from_this());
77         } else {
78             pluginPtr->OnEventProxy(shared_from_this());
79         }
80     } else {
81         return OnContinue();
82     }
83     return true;
84 }
85 
OnFinish()86 bool PipelineEvent::OnFinish()
87 {
88     {
89         uint64_t nowTime = TimeUtil::GenerateTimestamp();
90         processTime_ = nowTime > createTime_ ? (nowTime - createTime_) : 0;
91     }
92     if (handler_ != nullptr) {
93         handler_->Recycle(this);
94     }
95 
96     hasFinish_ = true;
97     if (Audit::IsEnabled()) {
98         Audit::WriteAuditEvent(Audit::StatsEvent::PIPELINE_EVENT_HANDLE_OUT,
99             createTime_, std::to_string(Thread::GetTid()));
100         Audit::WriteAuditEvent(Audit::StatsEvent::PIPELINE_EVENT_DONE, createTime_, pipelineName_);
101     }
102     return true;
103 }
104 
GetPendingProcessorSize()105 uint32_t PipelineEvent::GetPendingProcessorSize()
106 {
107     return processors_.size();
108 }
109 
SetPipelineInfo(const std::string & pipelineName,std::list<std::weak_ptr<Plugin>> & processors)110 void PipelineEvent::SetPipelineInfo(const std::string& pipelineName, std::list<std::weak_ptr<Plugin>>& processors)
111 {
112     pipelineName_ = pipelineName;
113     processors_ = processors;
114 }
115 
GetPipelineInfo()116 std::string PipelineEvent::GetPipelineInfo()
117 {
118     return pipelineName_;
119 }
120 
FillPipelineInfo(std::shared_ptr<Plugin> caller,const std::string & pipelineName,std::shared_ptr<PipelineEvent> event,bool deliverFromCurrent)121 void PipelineEvent::FillPipelineInfo(std::shared_ptr<Plugin> caller, const std::string& pipelineName,
122                                      std::shared_ptr<PipelineEvent> event, bool deliverFromCurrent)
123 {
124     if (caller == nullptr || event == nullptr || caller->GetHiviewContext() == nullptr) {
125         return;
126     }
127 
128     auto seq = caller->GetHiviewContext()->GetPipelineSequenceByName(pipelineName);
129     if (deliverFromCurrent) {
130         while (!seq.empty()) {
131             auto& plugin = seq.front();
132             if (auto pluginPtr = plugin.lock()) {
133                 if (pluginPtr->GetName() == caller->GetName()) {
134                     break;
135                 }
136             }
137             seq.pop_front();
138         }
139     }
140     event->SetPipelineInfo(pipelineName, seq);
141 }
142 
CanProcessEvent(std::shared_ptr<PipelineEvent> event)143 bool Pipeline::CanProcessEvent(std::shared_ptr<PipelineEvent> event)
144 {
145     if (processors_.empty()) {
146         HIVIEW_LOGI("no processor in this pipeline.");
147         return false;
148     }
149 
150     std::weak_ptr<Plugin> plugin = processors_.front();
151     if (auto pluginPtr = plugin.lock()) {
152         return pluginPtr->CanProcessEvent(std::dynamic_pointer_cast<Event>(event));
153     }
154     return false;
155 }
156 
ProcessEvent(std::shared_ptr<PipelineEvent> event)157 void Pipeline::ProcessEvent(std::shared_ptr<PipelineEvent> event)
158 {
159     event->SetPipelineInfo(name_, processors_);
160     event->OnContinue();
161 }
162 
AppendProcessor(std::weak_ptr<Plugin> plugin)163 void Pipeline::AppendProcessor(std::weak_ptr<Plugin> plugin)
164 {
165     processors_.push_back(plugin);
166 }
167 
RemoveProcessor(std::weak_ptr<Plugin> plugin)168 void Pipeline::RemoveProcessor(std::weak_ptr<Plugin> plugin)
169 {
170     processors_.remove_if([plugin](std::weak_ptr<Plugin> wp) {
171         std::shared_ptr<Plugin> cur = plugin.lock();
172         std::shared_ptr<Plugin> sp = wp.lock();
173         if (cur != nullptr && sp != nullptr) {
174             return cur == sp;
175         }
176         return false;
177     });
178 }
179 } // namespace HiviewDFX
180 } // namespace OHOS
181