1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "capture_message.h"
16 
17 namespace OHOS::Camera {
ICaptureMessage(int32_t streamId,int32_t captureId,uint64_t time,uint32_t count)18 ICaptureMessage::ICaptureMessage(int32_t streamId, int32_t captureId, uint64_t time, uint32_t count)
19 {
20     peerMessageCount_ = count;
21     timestamp_ = time;
22     streamId_ = streamId;
23     captureId_ = captureId;
24 }
25 
GetPeerMessageCount() const26 uint32_t ICaptureMessage::GetPeerMessageCount() const
27 {
28     return peerMessageCount_;
29 }
30 
GetTimestamp() const31 uint64_t ICaptureMessage::GetTimestamp() const
32 {
33     return timestamp_;
34 }
35 
GetStreamId() const36 int32_t ICaptureMessage::GetStreamId() const
37 {
38     return streamId_;
39 }
40 
GetCaptureId() const41 int32_t ICaptureMessage::GetCaptureId() const
42 {
43     return captureId_;
44 }
45 
GetMessageType() const46 CaptureMessageType ICaptureMessage::GetMessageType() const
47 {
48     return CAPTURE_MESSAGE_TYPE_INVALID;
49 }
50 
CaptureMessageOperator(MessageOperatorFunc f)51 CaptureMessageOperator::CaptureMessageOperator(MessageOperatorFunc f)
52 {
53     messageOperator_ = f;
54 }
55 
~CaptureMessageOperator()56 CaptureMessageOperator::~CaptureMessageOperator()
57 {
58     {
59         std::unique_lock<std::mutex> l(lock_);
60         running_ = false;
61         cv_.notify_one();
62     }
63 
64     if (messageHandler_ != nullptr) {
65         messageHandler_->join();
66     }
67     messageBox_.clear();
68 }
69 
SendMessage(std::shared_ptr<ICaptureMessage> & message)70 void CaptureMessageOperator::SendMessage(std::shared_ptr<ICaptureMessage>& message)
71 {
72     if (message == nullptr) {
73         CAMERA_LOGE("message is nullptr");
74         return;
75     }
76 
77     CAMERA_LOGV("%{public}s, %{public}d, enter", __FUNCTION__, __LINE__);
78     std::unique_lock<std::mutex> l(lock_);
79     CAMERA_LOGV("%{public}s, %{public}d, enter", __FUNCTION__, __LINE__);
80     auto it = messageBox_.find(message->GetMessageType());
81     if (it == messageBox_.end()) {
82         messageBox_[message->GetMessageType()] = {{message}};
83         if (message->GetPeerMessageCount() == 1) {
84             wakeup_ = true;
85             cv_.notify_one();
86         }
87     } else {
88         bool isPeerMessage = false;
89         for (auto& mit : it->second) {
90             if (mit.empty()) {
91                 continue;
92             }
93             if (mit[0] == nullptr) {
94                 CAMERA_LOGE("unknown error, message is null");
95                 continue;
96             }
97             if (message->GetTimestamp() == mit[0]->GetTimestamp() &&
98                 message->GetMessageType() == mit[0]->GetMessageType()) {
99                 mit.emplace_back(message);
100                 if (mit.size() == mit[0]->GetPeerMessageCount()) {
101                     wakeup_ = true;
102                     cv_.notify_one();
103                 }
104                 isPeerMessage = true;
105                 break;
106             }
107         }
108         if (!isPeerMessage) {
109             MessageGroup mg = {message};
110             it->second.emplace_back(mg);
111             if (message->GetPeerMessageCount() == 1) {
112                 wakeup_ = true;
113                 cv_.notify_one();
114             }
115         }
116     }
117     CAMERA_LOGV("%{public}s, %{public}d, enter", __FUNCTION__, __LINE__);
118 
119     return;
120 }
121 
StartProcess()122 RetCode CaptureMessageOperator::StartProcess()
123 {
124     {
125         std::unique_lock<std::mutex> l(lock_);
126         running_ = true;
127     }
128 
129     messageHandler_ = std::make_unique<std::thread>([this]() {
130         prctl(PR_SET_NAME, "MessageOperator");
131         while (true) {
132             {
133                 std::unique_lock<std::mutex> l(lock_);
134                 if (!running_) {
135                     break;
136                 }
137             }
138             HandleMessage();
139         }
140     });
141     if (messageHandler_ == nullptr) {
142         return RC_ERROR;
143     }
144     return RC_OK;
145 }
146 
HandleMessage()147 void CaptureMessageOperator::HandleMessage()
148 {
149     {
150         std::unique_lock<std::mutex> l(lock_);
151         cv_.wait(l, [this] { return !running_ || wakeup_; });
152         wakeup_ = false;
153         if (!running_) {
154             return;
155         }
156     }
157 
158     std::list<MessageGroup> messages = {};
159     {
160         CAMERA_LOGV("%{public}s, %{public}d, enter", __FUNCTION__, __LINE__);
161         std::unique_lock<std::mutex> l(lock_);
162         for (auto& lit : messageBox_) {
163             for (auto& vit : lit.second) {
164                 if (vit.empty()) {
165                     continue;
166                 }
167 
168                 if (vit.size() == vit[0]->GetPeerMessageCount()) {
169                     messages.emplace_back(vit);
170                     vit.clear();
171                 }
172             }
173             for (auto it = lit.second.begin(); it != lit.second.end();) {
174                 if (it->empty()) {
175                     it = lit.second.erase(it);
176                     continue;
177                 }
178                 it++;
179             }
180         }
181         CAMERA_LOGV("%{public}s, %{public}d, enter", __FUNCTION__, __LINE__);
182     }
183     for (auto it = messages.begin(); it != messages.end();) {
184         messageOperator_(*it);
185         it = messages.erase(it);
186     }
187     return;
188 }
189 }
190 // namespace OHOS::Camera
191