1 /*
2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "output_controller.h"
17
18 #include <cstdlib>
19 #include <sys/prctl.h>
20
21 #include "av_trans_constants.h"
22 #include "av_trans_errno.h"
23
24 namespace OHOS {
25 namespace DistributedHardware {
~OutputController()26 OutputController::~OutputController()
27 {
28 ReleaseControl();
29 }
30
PushData(std::shared_ptr<Plugin::Buffer> & data)31 void OutputController::PushData(std::shared_ptr<Plugin::Buffer>& data)
32 {
33 int64_t pushTime = GetCurrentTime();
34 TRUE_RETURN((GetControlStatus() != ControlStatus::START),
35 "Control status wrong, push data failed.");
36 {
37 std::lock_guard<std::mutex> lock(queueMutex_);
38 if (dataQueue_.size() > QUEUE_MAX_SIZE) {
39 AVTRANS_LOGE("DataQueue is greater than QUEUE_MAX_SIZE %{public}zu.", QUEUE_MAX_SIZE);
40 dataQueue_.pop();
41 }
42 CheckSyncInfo(data);
43 if (data == nullptr || data->GetBufferMeta() == nullptr) {
44 AVTRANS_LOGE("data or data getbuffermeta is nullptr");
45 return;
46 }
47 data->GetBufferMeta()->SetMeta(Tag::USER_PUSH_DATA_TIME, pushTime);
48 dataQueue_.push(data);
49 AVTRANS_LOGD("Push Data, dataQueue size is %{public}zu.", dataQueue_.size());
50 }
51 if (GetControlMode() == ControlMode::SYNC) {
52 clockCon_.notify_one();
53 }
54 controlCon_.notify_one();
55 }
56
StartControl()57 OutputController::ControlStatus OutputController::StartControl()
58 {
59 TRUE_RETURN_V_MSG_D((GetControlStatus() == ControlStatus::START), ControlStatus::STARTED,
60 "Control status is started.");
61 SetControlStatus(ControlStatus::START);
62 if (!handlerThread_) {
63 AVTRANS_LOGD("Init handler thread.");
64 handlerThread_ = std::make_unique<std::thread>(&OutputController::LooperHandle, this);
65 std::unique_lock<std::mutex> lock(handlerMutex_);
66 handlerCon_.wait(lock, [this] {
67 return handler_;
68 });
69 }
70 if (!controlThread_) {
71 AVTRANS_LOGD("Init control thread.");
72 controlThread_ = std::make_unique<std::thread>(&OutputController::LooperControl, this);
73 }
74 AVTRANS_LOGI("Start control success.");
75 return ControlStatus::START;
76 }
77
PrepareControl()78 void OutputController::PrepareControl()
79 {
80 if (GetControlMode() == ControlMode::SMOOTH) {
81 PrepareSmooth();
82 } else {
83 PrepareSync();
84 }
85 InitTimeStatistician();
86 AVTRANS_LOGI("Prepare control success.");
87 }
88
StopControl()89 OutputController::ControlStatus OutputController::StopControl()
90 {
91 TRUE_RETURN_V_MSG_D((GetControlStatus() == ControlStatus::STOP), ControlStatus::STOPPED,
92 "Control status is stopped.");
93 SetControlStatus(ControlStatus::STOP);
94 ClearQueue(dataQueue_);
95 AVTRANS_LOGI("Stop control success.");
96 return ControlStatus::STOP;
97 }
98
ReleaseControl()99 OutputController::ControlStatus OutputController::ReleaseControl()
100 {
101 TRUE_RETURN_V_MSG_D((GetControlStatus() == ControlStatus::RELEASE), ControlStatus::RELEASED,
102 "Control status is released.");
103 SetControlStatus(ControlStatus::RELEASE);
104 statistician_ = nullptr;
105 UnregisterListener();
106 controlCon_.notify_one();
107 sleepCon_.notify_one();
108 clockCon_.notify_one();
109 if (handler_ && handler_->GetEventRunner()) {
110 handler_->GetEventRunner()->Stop();
111 }
112 if (handlerThread_) {
113 handlerThread_->join();
114 handlerThread_ = nullptr;
115 }
116 if (controlThread_) {
117 controlThread_->join();
118 controlThread_ = nullptr;
119 }
120 ClearQueue(dataQueue_);
121 paramsMap_.clear();
122 AVTRANS_LOGI("Release control success.");
123 return ControlStatus::RELEASE;
124 }
125
GetParameter(Tag tag,ValueType & value)126 Status OutputController::GetParameter(Tag tag, ValueType& value)
127 {
128 {
129 std::lock_guard<std::mutex> lock(paramMapMutex_);
130 auto iter = paramsMap_.find(tag);
131 if (iter != paramsMap_.end()) {
132 value = iter->second;
133 return Status::OK;
134 }
135 return Status::ERROR_NOT_EXISTED;
136 }
137 }
138
SetParameter(Tag tag,const ValueType & value)139 Status OutputController::SetParameter(Tag tag, const ValueType& value)
140 {
141 {
142 std::lock_guard<std::mutex> lock(paramMapMutex_);
143 switch (tag) {
144 case Tag::USER_AV_SYNC_GROUP_INFO: {
145 std::string jsonStr = Plugin::AnyCast<std::string>(value);
146 AVTRANS_LOGD("Set parameter USER_AV_SYNC_GROUP_INFO: %{public}s", jsonStr.c_str());
147 break;
148 }
149 case Tag::USER_SHARED_MEMORY_FD: {
150 std::string jsonStr = Plugin::AnyCast<std::string>(value);
151 sharedMem_ = UnmarshalSharedMemory(jsonStr);
152 AVTRANS_LOGD("Set parameter USER_SHARED_MEMORY_FD: %{public}s, unmarshal sharedMem fd: %{public}d, "
153 "size: %{public}d, name: %{public}s", jsonStr.c_str(), sharedMem_.fd, sharedMem_.size,
154 sharedMem_.name.c_str());
155 break;
156 }
157 case Tag::USER_TIME_SYNC_RESULT: {
158 std::string jsonStr = Plugin::AnyCast<std::string>(value);
159 int32_t devClockDiff = atoi(jsonStr.c_str());
160 SetDevClockDiff(devClockDiff);
161 AVTRANS_LOGD("Set parameter USER_TIME_SYNC_RESULT: %{public}s, devClockDiff is %{public}d.",
162 jsonStr.c_str(), devClockDiff);
163 break;
164 }
165 default:
166 AVTRANS_LOGE("Invalid tag.");
167 }
168 paramsMap_.insert(std::make_pair(tag, value));
169 return Status::OK;
170 }
171 }
172
PrepareSync()173 void OutputController::PrepareSync()
174 {
175 AVTRANS_LOGI("Prepare sync.");
176 SetProcessDynamicBalanceState(true);
177 SetTimeInitState(false);
178 SetBaselineInitState(false);
179 SetAllowControlState(true);
180 SetBufferTime(0);
181 SetAdjustSleepFactor(0);
182 SetWaitClockThre(0);
183 SetTrackClockThre(0);
184 SetSleepThre(0);
185 SetAudioFrontTime(0);
186 SetAudioBackTime(0);
187 SetVideoFrontTime(0);
188 SetVideoBackTime(0);
189 }
190
PrepareSmooth()191 void OutputController::PrepareSmooth()
192 {
193 AVTRANS_LOGI("Prepare smooth.");
194 SetProcessDynamicBalanceState(false);
195 SetTimeInitState(false);
196 SetBaselineInitState(false);
197 SetAllowControlState(true);
198 SetBufferTime(0);
199 SetAdjustSleepFactor(0);
200 SetWaitClockFactor(0);
201 SetTrackClockFactor(0);
202 SetSleepThre(0);
203 SetDynamicBalanceThre(0);
204 SetAverIntervalDiffThre(0);
205 SetPushOnceDiffThre(0);
206 SetTimeStampOnceDiffThre(0);
207 }
208
LooperHandle()209 void OutputController::LooperHandle()
210 {
211 prctl(PR_SET_NAME, OUTPUT_HANDLE_THREAD_NAME.c_str());
212 auto runner = AppExecFwk::EventRunner::Create(false);
213 {
214 std::lock_guard<std::mutex> lock(handlerMutex_);
215 handler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
216 }
217 handlerCon_.notify_one();
218 runner->Run();
219 }
220
LooperControl()221 void OutputController::LooperControl()
222 {
223 prctl(PR_SET_NAME, LOOPER_CONTROL_THREAD_NAME.c_str());
224 while (GetControlStatus() != ControlStatus::RELEASE) {
225 std::shared_ptr<Plugin::Buffer> data = nullptr;
226 {
227 std::unique_lock<std::mutex> lock(queueMutex_);
228 controlCon_.wait(lock, [this] {
229 return ((!dataQueue_.empty() && GetControlStatus() == ControlStatus::START)
230 || (GetControlStatus() == ControlStatus::RELEASE));
231 });
232 if (GetControlStatus() == ControlStatus::RELEASE) {
233 continue;
234 }
235 data = dataQueue_.front();
236 }
237 HandleControlResult(data, ControlOutput(data));
238 }
239 }
240
ControlOutput(const std::shared_ptr<Plugin::Buffer> & data)241 int32_t OutputController::ControlOutput(const std::shared_ptr<Plugin::Buffer>& data)
242 {
243 if (data == nullptr) {
244 AVTRANS_LOGE("data is nullptr");
245 return OUTPUT_FRAME;
246 }
247 enterTime_ = GetCurrentTime();
248 int64_t timeStamp = data->pts;
249 TRUE_RETURN_V_MSG_D((timeStamp == INVALID_TIMESTAMP || !CheckIsAllowControl()), OUTPUT_FRAME,
250 "Direct output.");
251 if (CheckIsClockInvalid(data) || !CheckIsProcessInDynamicBalance(data)) {
252 CalProcessTime(data);
253 RecordTime(enterTime_, timeStamp);
254 return OUTPUT_FRAME;
255 }
256 CalProcessTime(data);
257 if (!CheckIsTimeInit()) {
258 InitTime(enterTime_, timeStamp);
259 }
260 if (!CheckIsBaselineInit()) {
261 InitBaseline(timeStamp, GetClockTime());
262 }
263 CalSleepTime(timeStamp);
264 SyncClock(data);
265 RecordTime(enterTime_, timeStamp);
266 return OUTPUT_FRAME;
267 }
268
CheckSyncInfo(const std::shared_ptr<Plugin::Buffer> & data)269 void OutputController::CheckSyncInfo(const std::shared_ptr<Plugin::Buffer>& data)
270 {
271 if (data == nullptr || data->GetBufferMeta() == nullptr) {
272 AVTRANS_LOGE("data or getbuffermeta is nullptr");
273 return;
274 }
275 auto bufferMeta = data->GetBufferMeta();
276 bool isAFrameNumberExist = bufferMeta->IsExist(Tag::AUDIO_SAMPLE_PER_FRAME);
277 bool isAPtsExist = bufferMeta->IsExist(Tag::MEDIA_START_TIME);
278 if (GetControlMode() == ControlMode::SYNC && (!isAFrameNumberExist || !isAPtsExist)) {
279 ClearQueue(dataQueue_);
280 sleepCon_.notify_one();
281 clockCon_.notify_one();
282 SetControlMode(ControlMode::SMOOTH);
283 AVTRANS_LOGI("Stop sync and start smooth, aFrameNumberExist: %{public}d, aPtsExist: %{public}d.",
284 isAFrameNumberExist, isAPtsExist);
285 return;
286 }
287 if (GetControlMode() == ControlMode::SMOOTH && isAFrameNumberExist && isAPtsExist) {
288 ClearQueue(dataQueue_);
289 sleepCon_.notify_one();
290 clockCon_.notify_one();
291 SetControlMode(ControlMode::SYNC);
292 AVTRANS_LOGI("Stop smooth and start sync, aFrameNumberExist: %{public}d, aPtsExist: %{public}d.",
293 isAFrameNumberExist, isAPtsExist);
294 }
295 }
296
CalProcessTime(const std::shared_ptr<Plugin::Buffer> & data)297 void OutputController::CalProcessTime(const std::shared_ptr<Plugin::Buffer>& data)
298 {
299 if (statistician_) {
300 statistician_->CalProcessTime(data);
301 }
302 }
303
InitTimeStatistician()304 void OutputController::InitTimeStatistician()
305 {
306 if (!statistician_) {
307 AVTRANS_LOGD("Init time statistician.");
308 statistician_ = std::make_shared<TimeStatistician>();
309 }
310 statistician_->ClearStatistics();
311 AVTRANS_LOGI("Start time statistics.");
312 }
313
RecordTime(const int64_t enterTime,const int64_t timeStamp)314 void OutputController::RecordTime(const int64_t enterTime, const int64_t timeStamp)
315 {
316 lastEnterTime_ = enterTime;
317 lastTimeStamp_ = timeStamp;
318 leaveTime_ = GetCurrentTime();
319 SetTimeInitState(true);
320 }
321
InitTime(const int64_t enterTime,const int64_t timeStamp)322 void OutputController::InitTime(const int64_t enterTime, const int64_t timeStamp)
323 {
324 delta_ = 0;
325 sleep_ = 0;
326 lastEnterTime_ = enterTime;
327 lastTimeStamp_ = timeStamp;
328 leaveTime_ = enterTime;
329 SetTimeInitState(true);
330 }
331
CheckIsClockInvalid(const std::shared_ptr<Plugin::Buffer> & data)332 bool OutputController::CheckIsClockInvalid(const std::shared_ptr<Plugin::Buffer>& data)
333 {
334 if (GetControlMode() == ControlMode::SMOOTH) {
335 SetClockTime(enterTime_);
336 AVTRANS_LOGD("Control mode is smooth, clock is valid.");
337 return false;
338 }
339 int32_t ret = AcquireSyncClockTime(data);
340 if (ret == ERR_DH_AVT_MASTER_NOT_READY) {
341 AVTRANS_LOGD("Master clock not ready, wait reread clock.");
342 return WaitRereadClockFailed(data);
343 }
344 TRUE_RETURN_V_MSG_E((ret != DH_AVT_SUCCESS), true, "Read unit clock is invalid.");
345 return false;
346 }
347
AcquireSyncClockTime(const std::shared_ptr<Plugin::Buffer> & data)348 int32_t OutputController::AcquireSyncClockTime(const std::shared_ptr<Plugin::Buffer>& data)
349 {
350 if (data == nullptr || data->GetBufferMeta() == nullptr) {
351 AVTRANS_LOGE("data or getbuffermeta is nullptr");
352 return ERR_DH_AVT_INVALID_PARAM;
353 }
354 TRUE_RETURN_V_MSG_E(((sharedMem_.fd <= 0) || (sharedMem_.size <= 0) || sharedMem_.name.empty()),
355 ERR_DH_AVT_SHARED_MEMORY_FAILED, "Parameter USER_SHARED_MEMORY_FD info error.");
356 AVTransSharedMemory sharedMem;
357 sharedMem.fd = sharedMem_.fd;
358 sharedMem.size = sharedMem_.size;
359 sharedMem.name = sharedMem_.name;
360 AVSyncClockUnit clockUnit;
361 clockUnit.pts = INVALID_TIMESTAMP;
362 auto bufferMeta = data->GetBufferMeta();
363 clockUnit.frameNum = Plugin::AnyCast<uint32_t>(bufferMeta->GetMeta(Tag::AUDIO_SAMPLE_PER_FRAME));
364 int32_t ret = ReadClockUnitFromMemory(sharedMem, clockUnit);
365 if (ret == DH_AVT_SUCCESS) {
366 TRUE_RETURN_V_MSG_D((clockUnit.pts == INVALID_TIMESTAMP), ERR_DH_AVT_SHARED_MEMORY_FAILED,
367 "Read invalid clock.");
368 clockUnit_.pts = clockUnit.pts;
369 clockUnit_.frameNum = clockUnit.frameNum;
370 SetClockTime(clockUnit_.pts);
371 AVTRANS_LOGD("Acquire sync clock success, pts: %{public}lld.", clockUnit_.pts);
372 }
373 return ret;
374 }
375
WaitRereadClockFailed(const std::shared_ptr<Plugin::Buffer> & data)376 bool OutputController::WaitRereadClockFailed(const std::shared_ptr<Plugin::Buffer>& data)
377 {
378 const uint32_t halfQueueSize = QUEUE_MAX_SIZE / 2;
379 while (GetQueueSize() < QUEUE_MAX_SIZE) {
380 if (GetQueueSize() < halfQueueSize) {
381 {
382 AVTRANS_LOGD("Dataqueue size is less than half size, wait notify.");
383 std::unique_lock<std::mutex> lock(clockMutex_);
384 int64_t rereadTime = LESS_HALF_REREAD_TIME;
385 if (statistician_) {
386 rereadTime = statistician_->GetAverPushInterval() * FACTOR_DOUBLE;
387 }
388 clockCon_.wait_for(lock, std::chrono::nanoseconds(rereadTime),
389 [this] { return (GetControlStatus() != ControlStatus::START); });
390 }
391 } else if (GetQueueSize() >= halfQueueSize) {
392 {
393 AVTRANS_LOGD("Dataqueue size is greater than half size, scheduled %{public}lld query.",
394 GREATER_HALF_REREAD_TIME);
395 std::unique_lock<std::mutex> lock(clockMutex_);
396 clockCon_.wait_for(lock, std::chrono::nanoseconds(GREATER_HALF_REREAD_TIME),
397 [this] { return (GetControlStatus() != ControlStatus::START); });
398 }
399 }
400 int32_t ret = AcquireSyncClockTime(data);
401 TRUE_RETURN_V_MSG_D((ret == DH_AVT_SUCCESS || GetControlMode() == ControlMode::SMOOTH), false,
402 "Wait reread clock success.");
403 }
404 return true;
405 }
406
CheckIsProcessInDynamicBalance(const std::shared_ptr<Plugin::Buffer> & data)407 bool OutputController::CheckIsProcessInDynamicBalance(const std::shared_ptr<Plugin::Buffer>& data)
408 {
409 TRUE_RETURN_V_MSG_D((GetControlMode() == ControlMode::SYNC || GetProcessDynamicBalanceState()), true,
410 "Process in dynamic balance.");
411 if (CheckIsProcessInDynamicBalanceOnce(data)) {
412 dynamicBalanceCount_++;
413 } else {
414 dynamicBalanceCount_ = 0;
415 }
416 if (dynamicBalanceCount_ >= dynamicBalanceThre_) {
417 AVTRANS_LOGD("Process meet dynamic balance condition.");
418 {
419 std::lock_guard<std::mutex> lock(queueMutex_);
420 ClearQueue(dataQueue_);
421 }
422 SetProcessDynamicBalanceState(true);
423 return true;
424 }
425 return false;
426 }
427
CheckIsProcessInDynamicBalanceOnce(const std::shared_ptr<Plugin::Buffer> & data)428 bool OutputController::CheckIsProcessInDynamicBalanceOnce(const std::shared_ptr<Plugin::Buffer>& data)
429 {
430 TRUE_RETURN_V_MSG_E((!statistician_), false, "Statistician is nullptr.");
431 int64_t pushInterval = statistician_->GetPushInterval();
432 int64_t timeStampInterval = statistician_->GetTimeStampInterval();
433 int64_t averPushInterval = statistician_->GetAverPushInterval();
434 int64_t averTimeStamapInterval = statistician_->GetAverTimeStampInterval();
435 int64_t averIntervalDiff = averPushInterval - averTimeStamapInterval;
436 int64_t pushOnceDiff = pushInterval - averPushInterval;
437 int64_t timeStampOnceDiff = timeStampInterval - averTimeStamapInterval;
438 return (averPushInterval != 0) && (averTimeStamapInterval != 0) &&
439 (llabs(averIntervalDiff) < averIntervalDiffThre_) && (llabs(pushOnceDiff) < pushOnceDiffThre_)
440 && (llabs(timeStampOnceDiff) < timeStampOnceDiffThre_);
441 }
442
InitBaseline(const int64_t timeStampBaseline,const int64_t clockBaseline)443 void OutputController::InitBaseline(const int64_t timeStampBaseline, const int64_t clockBaseline)
444 {
445 SetTimeStampBaseline(timeStampBaseline);
446 SetClockBaseline(clockBaseline + bufferTime_);
447 SetBaselineInitState(true);
448 }
449
CalSleepTime(const int64_t timeStamp)450 void OutputController::CalSleepTime(const int64_t timeStamp)
451 {
452 int64_t interval = timeStamp - lastTimeStamp_;
453 int64_t elapse = enterTime_ - leaveTime_;
454 int64_t render = enterTime_ - lastEnterTime_;
455 int64_t delta = render - sleep_ - elapse;
456 delta_ += delta;
457 sleep_ = interval - elapse;
458 AVTRANS_LOGD("Control frame pts: %{public}lld, interval: %{public}lld, elapse: %{public}lld, render: %{public}lld,"
459 " delta: %{public}lld, delat count: %{public}lld, sleep: %{public}lld.",
460 timeStamp, interval, elapse, render, delta, delta_, sleep_);
461 TRUE_RETURN((interval == INVALID_INTERVAL), "Interverl is Invalid.");
462 const int64_t adjustThre = interval * adjustSleepFactor_;
463 if (delta_ > adjustThre && sleep_ > 0) {
464 int64_t sleep = sleep_ - adjustThre;
465 delta_ -= (sleep < 0) ? sleep_ : adjustThre;
466 sleep_ = sleep;
467 AVTRANS_LOGD("Delta greater than thre, adjust sleep to %{public}lld.", sleep_);
468 } else if (delta_ < -adjustThre) {
469 sleep_ += delta_;
470 delta_ = 0;
471 AVTRANS_LOGD("Delta less than negative thre, adjust sleep to %{public}lld.", sleep_);
472 }
473 }
474
SyncClock(const std::shared_ptr<Plugin::Buffer> & data)475 void OutputController::SyncClock(const std::shared_ptr<Plugin::Buffer>& data)
476 {
477 if (GetControlMode() == ControlMode::SMOOTH) {
478 HandleSmoothTime(data);
479 } else {
480 HandleSyncTime(data);
481 }
482 if (sleep_ > sleepThre_) {
483 sleep_ = sleepThre_;
484 AVTRANS_LOGD("Sleep is more than sleepThre %{public}lld, adjust sleep to %{public}lld", sleepThre_, sleep_);
485 }
486 if (sleep_ < 0) {
487 sleep_ = 0;
488 AVTRANS_LOGD("Sleep less than zero, adjust sleep to zero.");
489 }
490 AVTRANS_LOGD("After sync clock, sleep is %{public}lld.", sleep_);
491 {
492 std::unique_lock<std::mutex> lock(sleepMutex_);
493 sleepCon_.wait_for(lock, std::chrono::nanoseconds(sleep_),
494 [this] { return (GetControlStatus() != ControlStatus::START); });
495 }
496 }
497
HandleSmoothTime(const std::shared_ptr<Plugin::Buffer> & data)498 void OutputController::HandleSmoothTime(const std::shared_ptr<Plugin::Buffer>& data)
499 {
500 TRUE_RETURN(!statistician_, "Statistician is nullptr.");
501 int64_t interval = statistician_->GetTimeStampInterval();
502 TRUE_RETURN((interval == INVALID_INTERVAL), "Interval is Invalid.");
503 int64_t averTimeStampInterval = statistician_->GetAverTimeStampInterval();
504 TRUE_RETURN((averTimeStampInterval == INVALID_INTERVAL), "AverTimeStampInterval is Invalid.");
505 int64_t waitClockThre = averTimeStampInterval * waitClockFactor_;
506 int64_t trackClockThre = averTimeStampInterval * trackClockFactor_;
507 int64_t vTimeStamp = data->pts;
508 int64_t vcts = (sleep_ > 0) ? (vTimeStamp - sleep_) : vTimeStamp;
509 int64_t offset = vcts - timeStampBaseline_ - (GetClockTime() - clockBaseline_);
510 AVTRANS_LOGD("Smooth vTimeStamp: %{public}lld, offset: %{public}lld, averTimeStampInterval: %{public}lld,"
511 " waitClockThre: %{public}lld, trackClockThre: %{public}lld.",
512 vTimeStamp, offset, averTimeStampInterval, waitClockThre, trackClockThre);
513 if (offset > waitClockThre || offset < -trackClockThre) {
514 sleep_ += offset;
515 AVTRANS_LOGD("Smooth offset %{public}lld is over than thre, adjust sleep to %{public}lld.",
516 offset, sleep_);
517 }
518 }
519
HandleSyncTime(const std::shared_ptr<Plugin::Buffer> & data)520 void OutputController::HandleSyncTime(const std::shared_ptr<Plugin::Buffer>& data)
521 {
522 if (data == nullptr || data->GetBufferMeta() == nullptr) {
523 AVTRANS_LOGE("data or getbuffermeta is nullptr");
524 return;
525 }
526 auto bufferMeta = data->GetBufferMeta();
527 int64_t vTimeStamp = data->pts;
528 uint32_t vFrameNumber = Plugin::AnyCast<uint32_t>(bufferMeta->GetMeta(Tag::USER_FRAME_NUMBER));
529 int64_t vcts = (sleep_ > 0) ? (vTimeStamp - sleep_) : vTimeStamp;
530 int64_t aTimeStamp = clockUnit_.pts;
531 uint32_t aFrameNumber = clockUnit_.frameNum;
532 int64_t acts = GetClockTime();
533 int64_t ctsDiff = vcts - acts;
534 int64_t offset = (vcts - vFront_ - vBack_) - (acts - aFront_ - aBack_) - GetDevClockDiff();
535 AVTRANS_LOGD("Sync vTimeStamp: %{public}lld, vFrameNumber: %{public}" PRIu32 " vcts: %{public}lld,"
536 " aTimeStamp: %{public}lld, aFrameNumber: %{public}" PRIu32 " acts: %{public}lld, ctsDiff: %{public}lld,"
537 " offset: %{public}lld", vTimeStamp, vFrameNumber, vcts, aTimeStamp, aFrameNumber, acts, ctsDiff, offset);
538 const int64_t append = (trackClockThre_ + waitClockThre_) / 2;
539 if (offset > waitClockThre_) {
540 sleep_ += offset - waitClockThre_ + append;
541 AVTRANS_LOGD("Sync offset %{public}lld is over than wait thre %{public}lld, adjust sleep to %{public}lld.",
542 offset, waitClockThre_, sleep_);
543 } else if (offset < -trackClockThre_) {
544 sleep_ += offset - trackClockThre_ - append;
545 AVTRANS_LOGD("Sync offset %{public}lld is over than track thre %{public}lld, adjust sleep to %{public}lld.",
546 offset, -trackClockThre_, sleep_);
547 }
548 }
549
HandleControlResult(const std::shared_ptr<Plugin::Buffer> & data,int32_t result)550 void OutputController::HandleControlResult(const std::shared_ptr<Plugin::Buffer>& data, int32_t result)
551 {
552 switch (result) {
553 case OUTPUT_FRAME: {
554 int32_t ret = PostOutputEvent(data);
555 TRUE_RETURN((ret == HANDLE_FAILED), "Handle result OUTPUT_FRAME failed.");
556 {
557 std::lock_guard<std::mutex> lock(queueMutex_);
558 if (!dataQueue_.empty() && data == dataQueue_.front()) {
559 dataQueue_.pop();
560 }
561 }
562 break;
563 }
564 case DROP_FRAME: {
565 {
566 std::lock_guard<std::mutex> lock(queueMutex_);
567 if (!dataQueue_.empty() && data == dataQueue_.front()) {
568 dataQueue_.pop();
569 }
570 }
571 break;
572 }
573 case REPEAT_FREAM: {
574 int32_t ret = PostOutputEvent(data);
575 TRUE_RETURN((ret == HANDLE_FAILED), "Handle result REPEAT_FREAM failed.");
576 break;
577 }
578 default:
579 AVTRANS_LOGE("Invalid result.");
580 }
581 }
582
PostOutputEvent(const std::shared_ptr<Plugin::Buffer> & data)583 int32_t OutputController::PostOutputEvent(const std::shared_ptr<Plugin::Buffer>& data)
584 {
585 TRUE_RETURN_V_MSG_E((!handler_), HANDLE_FAILED, "Handler is nullptr.");
586 auto outputFunc = [this, data]() {
587 int32_t ret = NotifyOutput(data);
588 TRUE_RETURN_V_MSG_E((ret == NOTIFY_FAILED), HANDLE_FAILED, "Notify failed.");
589 return HANDLE_SUCCESS;
590 };
591 TRUE_RETURN_V_MSG_E((!handler_->PostTask(outputFunc)), HANDLE_FAILED, "Handler post task failed.");
592 return HANDLE_SUCCESS;
593 }
594
NotifyOutput(const std::shared_ptr<Plugin::Buffer> & data)595 int32_t OutputController::NotifyOutput(const std::shared_ptr<Plugin::Buffer>& data)
596 {
597 TRUE_RETURN_V_MSG_E((!listener_), NOTIFY_FAILED, "Listener is nullptr.");
598 return listener_->OnOutput(data);
599 }
600
CheckIsBaselineInit()601 bool OutputController::CheckIsBaselineInit()
602 {
603 return GetBaselineInitState();
604 }
605
CheckIsTimeInit()606 bool OutputController::CheckIsTimeInit()
607 {
608 return GetTimeInitState();
609 }
610
CheckIsAllowControl()611 bool OutputController::CheckIsAllowControl()
612 {
613 return GetAllowControlState();
614 }
615
RegisterListener(const std::shared_ptr<OutputControllerListener> & listener)616 void OutputController::RegisterListener(const std::shared_ptr<OutputControllerListener>& listener)
617 {
618 listener_ = listener;
619 }
620
UnregisterListener()621 void OutputController::UnregisterListener()
622 {
623 listener_ = nullptr;
624 }
625
SetBufferTime(const uint32_t time)626 void OutputController::SetBufferTime(const uint32_t time)
627 {
628 bufferTime_ = time;
629 }
630
GetBufferTime()631 uint32_t OutputController::GetBufferTime()
632 {
633 return bufferTime_;
634 }
635
SetTimeInitState(const bool state)636 void OutputController::SetTimeInitState(const bool state)
637 {
638 isTimeInit_.store(state);
639 }
640
GetTimeInitState()641 bool OutputController::GetTimeInitState()
642 {
643 return isTimeInit_.load();
644 }
645
SetBaselineInitState(const bool state)646 void OutputController::SetBaselineInitState(const bool state)
647 {
648 isBaselineInit_.store(state);
649 }
650
GetBaselineInitState()651 bool OutputController::GetBaselineInitState()
652 {
653 return isBaselineInit_.load();
654 }
655
SetProcessDynamicBalanceState(const bool state)656 void OutputController::SetProcessDynamicBalanceState(const bool state)
657 {
658 isInDynamicBalance_.store(state);
659 }
660
GetProcessDynamicBalanceState()661 bool OutputController::GetProcessDynamicBalanceState()
662 {
663 return isInDynamicBalance_.load();
664 }
665
SetAllowControlState(const bool state)666 void OutputController::SetAllowControlState(const bool state)
667 {
668 isAllowControl_.store(state);
669 }
670
GetAllowControlState()671 bool OutputController::GetAllowControlState()
672 {
673 return isAllowControl_.load();
674 }
675
SetClockBaseline(const int64_t clockBaseline)676 void OutputController::SetClockBaseline(const int64_t clockBaseline)
677 {
678 clockBaseline_ = clockBaseline;
679 }
680
SetTimeStampBaseline(const int64_t timeStmapBaseline)681 void OutputController::SetTimeStampBaseline(const int64_t timeStmapBaseline)
682 {
683 timeStampBaseline_ = timeStmapBaseline;
684 }
685
SetAverIntervalDiffThre(const uint32_t thre)686 void OutputController::SetAverIntervalDiffThre(const uint32_t thre)
687 {
688 averIntervalDiffThre_ = thre;
689 }
690
SetDynamicBalanceThre(const uint8_t thre)691 void OutputController::SetDynamicBalanceThre(const uint8_t thre)
692 {
693 dynamicBalanceThre_ = thre;
694 }
695
SetPushOnceDiffThre(const uint32_t thre)696 void OutputController::SetPushOnceDiffThre(const uint32_t thre)
697 {
698 pushOnceDiffThre_ = thre;
699 }
700
SetTimeStampOnceDiffThre(const uint32_t thre)701 void OutputController::SetTimeStampOnceDiffThre(const uint32_t thre)
702 {
703 timeStampOnceDiffThre_ = thre;
704 }
705
SetAdjustSleepFactor(const float factor)706 void OutputController::SetAdjustSleepFactor(const float factor)
707 {
708 adjustSleepFactor_ = factor;
709 }
710
SetWaitClockFactor(const float factor)711 void OutputController::SetWaitClockFactor(const float factor)
712 {
713 waitClockFactor_ = factor;
714 }
715
SetTrackClockFactor(const float factor)716 void OutputController::SetTrackClockFactor(const float factor)
717 {
718 trackClockFactor_ = factor;
719 }
720
SetWaitClockThre(const int64_t thre)721 void OutputController::SetWaitClockThre(const int64_t thre)
722 {
723 waitClockThre_ = thre;
724 }
725
SetTrackClockThre(const int64_t thre)726 void OutputController::SetTrackClockThre(const int64_t thre)
727 {
728 trackClockThre_ = thre;
729 }
730
SetSleepThre(const int64_t thre)731 void OutputController::SetSleepThre(const int64_t thre)
732 {
733 sleepThre_ = thre;
734 }
735
SetVideoFrontTime(const int64_t time)736 void OutputController::SetVideoFrontTime(const int64_t time)
737 {
738 vFront_ = time;
739 }
740
SetVideoBackTime(const int64_t time)741 void OutputController::SetVideoBackTime(const int64_t time)
742 {
743 vBack_ = time;
744 }
745
SetAudioFrontTime(const int64_t time)746 void OutputController::SetAudioFrontTime(const int64_t time)
747 {
748 aFront_ = time;
749 }
750
SetAudioBackTime(const int64_t time)751 void OutputController::SetAudioBackTime(const int64_t time)
752 {
753 aBack_ = time;
754 }
755
SetClockTime(const int64_t clockTime)756 void OutputController::SetClockTime(const int64_t clockTime)
757 {
758 clockTime_.store(clockTime);
759 }
760
GetClockTime()761 int64_t OutputController::GetClockTime()
762 {
763 return clockTime_.load();
764 }
765
SetDevClockDiff(int32_t diff)766 void OutputController::SetDevClockDiff(int32_t diff)
767 {
768 devClockDiff_.store(diff);
769 }
770
GetDevClockDiff()771 int32_t OutputController::GetDevClockDiff()
772 {
773 return devClockDiff_.load();
774 }
775
GetControlMode()776 OutputController::ControlMode OutputController::GetControlMode()
777 {
778 return mode_.load();
779 }
780
GetControlStatus()781 OutputController::ControlStatus OutputController::GetControlStatus()
782 {
783 return status_.load();
784 }
785
SetControlMode(ControlMode mode)786 void OutputController::SetControlMode(ControlMode mode)
787 {
788 mode_.store(mode);
789 PrepareControl();
790 }
791
SetControlStatus(ControlStatus status)792 void OutputController::SetControlStatus(ControlStatus status)
793 {
794 status_.store(status);
795 }
796
GetQueueSize()797 size_t OutputController::GetQueueSize()
798 {
799 {
800 std::lock_guard<std::mutex> lock(queueMutex_);
801 return dataQueue_.size();
802 }
803 }
804
ClearQueue(std::queue<std::shared_ptr<Plugin::Buffer>> & queue)805 void OutputController::ClearQueue(std::queue<std::shared_ptr<Plugin::Buffer>>& queue)
806 {
807 std::queue<std::shared_ptr<Plugin::Buffer>> empty;
808 swap(empty, queue);
809 }
810 } // namespace DistributedHardware
811 } // namespace OHOS