1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "capture_message.h"
16
17 namespace OHOS::Camera {
ICaptureMessage(int32_t streamId,int32_t captureId,uint64_t time,uint32_t count)18 ICaptureMessage::ICaptureMessage(int32_t streamId, int32_t captureId, uint64_t time, uint32_t count)
19 {
20 peerMessageCount_ = count;
21 timestamp_ = time;
22 streamId_ = streamId;
23 captureId_ = captureId;
24 }
25
GetPeerMessageCount() const26 uint32_t ICaptureMessage::GetPeerMessageCount() const
27 {
28 return peerMessageCount_;
29 }
30
GetTimestamp() const31 uint64_t ICaptureMessage::GetTimestamp() const
32 {
33 return timestamp_;
34 }
35
GetStreamId() const36 int32_t ICaptureMessage::GetStreamId() const
37 {
38 return streamId_;
39 }
40
GetCaptureId() const41 int32_t ICaptureMessage::GetCaptureId() const
42 {
43 return captureId_;
44 }
45
GetMessageType() const46 CaptureMessageType ICaptureMessage::GetMessageType() const
47 {
48 return CAPTURE_MESSAGE_TYPE_INVALID;
49 }
50
CaptureMessageOperator(MessageOperatorFunc f)51 CaptureMessageOperator::CaptureMessageOperator(MessageOperatorFunc f)
52 {
53 messageOperator_ = f;
54 }
55
~CaptureMessageOperator()56 CaptureMessageOperator::~CaptureMessageOperator()
57 {
58 {
59 std::unique_lock<std::mutex> l(lock_);
60 running_ = false;
61 cv_.notify_one();
62 }
63
64 if (messageHandler_ != nullptr) {
65 messageHandler_->join();
66 }
67 messageBox_.clear();
68 }
69
SendMessage(std::shared_ptr<ICaptureMessage> & message)70 void CaptureMessageOperator::SendMessage(std::shared_ptr<ICaptureMessage>& message)
71 {
72 if (message == nullptr) {
73 CAMERA_LOGE("message is nullptr");
74 return;
75 }
76
77 CAMERA_LOGV("%{public}s, %{public}d, enter", __FUNCTION__, __LINE__);
78 std::unique_lock<std::mutex> l(lock_);
79 CAMERA_LOGV("%{public}s, %{public}d, enter", __FUNCTION__, __LINE__);
80 auto it = messageBox_.find(message->GetMessageType());
81 if (it == messageBox_.end()) {
82 messageBox_[message->GetMessageType()] = {{message}};
83 if (message->GetPeerMessageCount() == 1) {
84 wakeup_ = true;
85 cv_.notify_one();
86 }
87 } else {
88 bool isPeerMessage = false;
89 for (auto& mit : it->second) {
90 if (mit.empty()) {
91 continue;
92 }
93 if (mit[0] == nullptr) {
94 CAMERA_LOGE("unknown error, message is null");
95 continue;
96 }
97 if (message->GetTimestamp() == mit[0]->GetTimestamp() &&
98 message->GetMessageType() == mit[0]->GetMessageType()) {
99 mit.emplace_back(message);
100 if (mit.size() == mit[0]->GetPeerMessageCount()) {
101 wakeup_ = true;
102 cv_.notify_one();
103 }
104 isPeerMessage = true;
105 break;
106 }
107 }
108 if (!isPeerMessage) {
109 MessageGroup mg = {message};
110 it->second.emplace_back(mg);
111 if (message->GetPeerMessageCount() == 1) {
112 wakeup_ = true;
113 cv_.notify_one();
114 }
115 }
116 }
117 CAMERA_LOGV("%{public}s, %{public}d, enter", __FUNCTION__, __LINE__);
118
119 return;
120 }
121
StartProcess()122 RetCode CaptureMessageOperator::StartProcess()
123 {
124 {
125 std::unique_lock<std::mutex> l(lock_);
126 running_ = true;
127 }
128
129 messageHandler_ = std::make_unique<std::thread>([this]() {
130 prctl(PR_SET_NAME, "MessageOperator");
131 while (true) {
132 {
133 std::unique_lock<std::mutex> l(lock_);
134 if (!running_) {
135 break;
136 }
137 }
138 HandleMessage();
139 }
140 });
141 if (messageHandler_ == nullptr) {
142 return RC_ERROR;
143 }
144 return RC_OK;
145 }
146
HandleMessage()147 void CaptureMessageOperator::HandleMessage()
148 {
149 {
150 std::unique_lock<std::mutex> l(lock_);
151 cv_.wait(l, [this] { return !running_ || wakeup_; });
152 wakeup_ = false;
153 if (!running_) {
154 return;
155 }
156 }
157
158 std::list<MessageGroup> messages = {};
159 {
160 CAMERA_LOGV("%{public}s, %{public}d, enter", __FUNCTION__, __LINE__);
161 std::unique_lock<std::mutex> l(lock_);
162 for (auto& lit : messageBox_) {
163 for (auto& vit : lit.second) {
164 if (vit.empty()) {
165 continue;
166 }
167
168 if (vit.size() == vit[0]->GetPeerMessageCount()) {
169 messages.emplace_back(vit);
170 vit.clear();
171 }
172 }
173 for (auto it = lit.second.begin(); it != lit.second.end();) {
174 if (it->empty()) {
175 it = lit.second.erase(it);
176 continue;
177 }
178 it++;
179 }
180 }
181 CAMERA_LOGV("%{public}s, %{public}d, enter", __FUNCTION__, __LINE__);
182 }
183 for (auto it = messages.begin(); it != messages.end();) {
184 messageOperator_(*it);
185 it = messages.erase(it);
186 }
187 return;
188 }
189 }
190 // namespace OHOS::Camera
191