1 /*
2 * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "video_job_repository.h"
17
18 namespace OHOS {
19 namespace CameraStandard {
20 namespace DeferredProcessing {
VideoJobRepository(const int32_t userId)21 VideoJobRepository::VideoJobRepository(const int32_t userId) : userId_(userId)
22 {
23 DP_DEBUG_LOG("entered, userid: %{public}d", userId_);
24 jobQueue_ = std::make_shared<VideoJobQueue>([] (DeferredVideoJobPtr a, DeferredVideoJobPtr b) {return *a > *b;});
25 }
26
~VideoJobRepository()27 VideoJobRepository::~VideoJobRepository()
28 {
29 DP_DEBUG_LOG("entered, userid: %{public}d", userId_);
30 ClearCatch();
31 }
32
AddVideoJob(const std::string & videoId,const sptr<IPCFileDescriptor> & srcFd,const sptr<IPCFileDescriptor> & dstFd)33 void VideoJobRepository::AddVideoJob(const std::string& videoId,
34 const sptr<IPCFileDescriptor>& srcFd, const sptr<IPCFileDescriptor>& dstFd)
35 {
36 std::lock_guard<std::recursive_mutex> lock(mutex_);
37 DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
38 DP_CHECK_RETURN_LOG(jobPtrFind != nullptr, "already existed, videoId: %{public}s", videoId.c_str());
39
40 DeferredVideoJobPtr jobPtr = std::make_shared<DeferredVideoJob>(videoId, srcFd, dstFd);
41 jobPtr->SetJobStatus(VideoJobStatus::PENDING);
42 jobMap_.emplace(videoId, jobPtr);
43 jobQueue_->Push(jobPtr);
44 DP_INFO_LOG("add video job size: %{public}d, videoId: %{public}s, srcFd: %{public}d",
45 static_cast<int>(jobQueue_->GetSize()), videoId.c_str(), srcFd->GetFd());
46 }
47
RemoveVideoJob(const std::string & videoId,bool restorable)48 bool VideoJobRepository::RemoveVideoJob(const std::string& videoId, bool restorable)
49 {
50 DP_INFO_LOG("entered, videoId: %{public}s, restorable: %{public}d", videoId.c_str(), restorable);
51 std::lock_guard<std::recursive_mutex> lock(mutex_);
52 DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
53 bool isNeedStop = false;
54 DP_CHECK_RETURN_RET_LOG(jobPtrFind == nullptr, isNeedStop,
55 "does not existed, videoId: %{public}s", videoId.c_str());
56
57 isNeedStop = jobPtrFind->GetCurStatus() == VideoJobStatus::RUNNING;
58 if (!restorable) {
59 DP_INFO_LOG("remove video job size: %{public}d, videoId: %{public}s",
60 static_cast<int>(jobQueue_->GetSize()), videoId.c_str());
61 jobMap_.erase(videoId);
62 jobQueue_->Remove(jobPtrFind);
63 }
64 bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::DELETED);
65 UpdateRunningCountUnLocked(statusChanged, jobPtrFind);
66 return isNeedStop;
67 }
68
RestoreVideoJob(const std::string & videoId)69 void VideoJobRepository::RestoreVideoJob(const std::string& videoId)
70 {
71 DP_INFO_LOG("entered, videoId: %{public}s", videoId.c_str());
72 std::lock_guard<std::recursive_mutex> lock(mutex_);
73 DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
74 DP_CHECK_RETURN_LOG(jobPtrFind == nullptr, "does not existed, videoId: %{public}s", videoId.c_str());
75
76 bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::PENDING);
77 DP_CHECK_EXECUTE(statusChanged, jobQueue_->Update(jobPtrFind));
78 }
79
SetJobPending(const std::string & videoId)80 void VideoJobRepository::SetJobPending(const std::string& videoId)
81 {
82 DP_INFO_LOG("entered, videoId: %{public}s", videoId.c_str());
83 std::lock_guard<std::recursive_mutex> lock(mutex_);
84 DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
85 DP_CHECK_RETURN_LOG(jobPtrFind == nullptr, "does not existed, videoId: %{public}s", videoId.c_str());
86
87 bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::PENDING);
88 UpdateRunningCountUnLocked(statusChanged, jobPtrFind);
89 NotifyJobChangedUnLocked(statusChanged, jobPtrFind);
90 }
91
SetJobRunning(const std::string & videoId)92 void VideoJobRepository::SetJobRunning(const std::string& videoId)
93 {
94 DP_INFO_LOG("entered, videoId: %{public}s", videoId.c_str());
95 std::lock_guard<std::recursive_mutex> lock(mutex_);
96 DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
97 DP_CHECK_RETURN_LOG(jobPtrFind == nullptr, "does not existed, videoId: %{public}s", videoId.c_str());
98
99 bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::RUNNING);
100 UpdateRunningCountUnLocked(statusChanged, jobPtrFind);
101 }
102
SetJobCompleted(const std::string & videoId)103 void VideoJobRepository::SetJobCompleted(const std::string& videoId)
104 {
105 DP_INFO_LOG("entered, videoId: %{public}s", videoId.c_str());
106 std::lock_guard<std::recursive_mutex> lock(mutex_);
107 DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
108 DP_CHECK_RETURN_LOG(jobPtrFind == nullptr, "does not existed, videoId: %{public}s", videoId.c_str());
109
110 bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::COMPLETED);
111 UpdateRunningCountUnLocked(statusChanged, jobPtrFind);
112 NotifyJobChangedUnLocked(statusChanged, jobPtrFind);
113 }
114
SetJobFailed(const std::string & videoId)115 void VideoJobRepository::SetJobFailed(const std::string& videoId)
116 {
117 DP_INFO_LOG("entered, videoId: %{public}s", videoId.c_str());
118 std::lock_guard<std::recursive_mutex> lock(mutex_);
119 DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
120 DP_CHECK_RETURN_LOG(jobPtrFind == nullptr, "does not existed, videoId: %{public}s", videoId.c_str());
121
122 bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::FAILED);
123 UpdateRunningCountUnLocked(statusChanged, jobPtrFind);
124 NotifyJobChangedUnLocked(statusChanged, jobPtrFind);
125 }
126
SetJobPause(const std::string & videoId)127 void VideoJobRepository::SetJobPause(const std::string& videoId)
128 {
129 DP_INFO_LOG("entered, videoId: %{public}s", videoId.c_str());
130 std::lock_guard<std::recursive_mutex> lock(mutex_);
131 DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
132 DP_CHECK_RETURN_LOG(jobPtrFind == nullptr, "does not existed, videoId: %{public}s", videoId.c_str());
133
134 bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::PAUSE);
135 UpdateRunningCountUnLocked(statusChanged, jobPtrFind);
136 }
137
SetJobError(const std::string & videoId)138 void VideoJobRepository::SetJobError(const std::string& videoId)
139 {
140 DP_INFO_LOG("entered, videoId: %{public}s", videoId.c_str());
141 std::lock_guard<std::recursive_mutex> lock(mutex_);
142 DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
143 DP_CHECK_RETURN_LOG(jobPtrFind == nullptr, "does not existed, videoId: %{public}s", videoId.c_str());
144
145 bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::ERROR);
146 UpdateRunningCountUnLocked(statusChanged, jobPtrFind);
147 }
148
GetJob()149 DeferredVideoJobPtr VideoJobRepository::GetJob()
150 {
151 DP_INFO_LOG("entered, video job size: %{public}d, running num: %{public}d",
152 jobQueue_->GetSize(), static_cast<int32_t>(runningSet_.size()));
153 std::lock_guard<std::recursive_mutex> lock(mutex_);
154 auto jobPtr = jobQueue_->Peek();
155 DP_CHECK_RETURN_RET(jobPtr == nullptr || jobPtr->GetCurStatus() == VideoJobStatus::COMPLETED ||
156 jobPtr->GetCurStatus() == VideoJobStatus::ERROR, nullptr);
157
158 if (jobPtr->GetCurStatus() == VideoJobStatus::FAILED) {
159 jobPtr->SetJobStatus(VideoJobStatus::PENDING);
160 jobQueue_->Update(jobPtr);
161 }
162 return jobPtr;
163 }
164
165
GetRunningJobCounts()166 int32_t VideoJobRepository::GetRunningJobCounts()
167 {
168 std::lock_guard<std::recursive_mutex> lock(mutex_);
169 DP_DEBUG_LOG("video running jobs num: %{public}d", static_cast<int32_t>(runningSet_.size()));
170 return static_cast<int32_t>(runningSet_.size());
171 }
172
GetRunningJobList(std::vector<std::string> & list)173 void VideoJobRepository::GetRunningJobList(std::vector<std::string>& list)
174 {
175 std::lock_guard<std::recursive_mutex> lock(mutex_);
176 DP_DEBUG_LOG("video running jobs num: %{public}d", static_cast<int32_t>(runningSet_.size()));
177 list.clear();
178 list.reserve(runningSet_.size());
179 for (auto& item : runningSet_) {
180 list.emplace_back(item);
181 }
182 }
183
RegisterJobListener(const std::weak_ptr<IVideoJobRepositoryListener> & listener)184 void VideoJobRepository::RegisterJobListener(const std::weak_ptr<IVideoJobRepositoryListener>& listener)
185 {
186 DP_INFO_LOG("entered");
187 jobListener_ = listener;
188 }
189
GetJobUnLocked(const std::string & videoId)190 DeferredVideoJobPtr VideoJobRepository::GetJobUnLocked(const std::string& videoId)
191 {
192 DeferredVideoJobPtr jobPtr = nullptr;
193 if (jobMap_.count(videoId) == 1) {
194 DP_DEBUG_LOG("video job, videoId: %{public}s", videoId.c_str());
195 jobPtr = jobMap_.find(videoId)->second;
196 }
197 return jobPtr;
198 }
199
NotifyJobChangedUnLocked(bool statusChanged,DeferredVideoJobPtr jobPtr)200 void VideoJobRepository::NotifyJobChangedUnLocked(bool statusChanged, DeferredVideoJobPtr jobPtr)
201 {
202 DP_DEBUG_LOG("entered, statusChanged: %{public}d, videoId: %{public}s",
203 statusChanged, jobPtr->GetVideoId().c_str());
204 if (auto listenerSptr = jobListener_.lock()) {
205 listenerSptr->OnVideoJobChanged(jobPtr);
206 }
207 }
208
UpdateRunningCountUnLocked(bool statusChanged,const DeferredVideoJobPtr & jobPtr)209 void VideoJobRepository::UpdateRunningCountUnLocked(bool statusChanged, const DeferredVideoJobPtr& jobPtr)
210 {
211 DP_CHECK_EXECUTE(statusChanged, jobQueue_->Update(jobPtr));
212
213 if (statusChanged && (jobPtr->GetPreStatus() == VideoJobStatus::RUNNING)) {
214 runningSet_.erase(jobPtr->GetVideoId());
215 }
216 if (statusChanged && (jobPtr->GetCurStatus() == VideoJobStatus::RUNNING)) {
217 runningSet_.emplace(jobPtr->GetVideoId());
218 }
219 DP_INFO_LOG("video running jobs num: %{public}d, videoId: %{public}s",
220 static_cast<int32_t>(runningSet_.size()), jobPtr->GetVideoId().c_str());
221 }
222
ClearCatch()223 void VideoJobRepository::ClearCatch()
224 {
225 jobQueue_->Clear();
226 jobMap_.clear();
227 runningSet_.clear();
228 }
229 } // namespace DeferredProcessing
230 } // namespace CameraStandard
231 } // namespace OHOS