1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "ObjectAssetLoader"
16
17 #include "object_asset_loader.h"
18 #include "block_data.h"
19 #include "cloud_sync_asset_manager.h"
20 #include "log_print.h"
21 #include "object_common.h"
22 #include "utils/anonymous.h"
23 #include "object_radar_reporter.h"
24 #include "distributed_file_daemon_manager.h"
25 namespace OHOS::DistributedObject {
26 using namespace OHOS::FileManagement::CloudSync;
GetInstance()27 ObjectAssetLoader *ObjectAssetLoader::GetInstance()
28 {
29 static ObjectAssetLoader *loader = new ObjectAssetLoader();
30 return loader;
31 }
32
SetThreadPool(std::shared_ptr<ExecutorPool> executors)33 void ObjectAssetLoader::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
34 {
35 executors_ = executors;
36 }
37
Transfer(const int32_t userId,const std::string & bundleName,const std::string & deviceId,const DistributedData::Asset & asset)38 bool ObjectAssetLoader::Transfer(const int32_t userId, const std::string& bundleName, const std::string& deviceId,
39 const DistributedData::Asset& asset)
40 {
41 AssetInfo assetInfo;
42 assetInfo.uri = asset.uri;
43 assetInfo.assetName = asset.name;
44 ZLOGI("Start transfer, bundleName: %{public}s, deviceId: %{public}s, assetName: %{public}s", bundleName.c_str(),
45 DistributedData::Anonymous::Change(deviceId).c_str(), assetInfo.assetName.c_str());
46 auto block = std::make_shared<BlockData<std::tuple<bool, int32_t>>>(WAIT_TIME, std::tuple{ true, OBJECT_SUCCESS });
47 auto res = CloudSyncAssetManager::GetInstance().DownloadFile(userId, bundleName, deviceId, assetInfo,
48 [block](const std::string& uri, int32_t status) {
49 block->SetValue({ false, status });
50 });
51 if (res != OBJECT_SUCCESS) {
52 ZLOGE("fail, res: %{public}d, name: %{public}s, deviceId: %{public}s, bundleName: %{public}s", res,
53 asset.name.c_str(), DistributedData::Anonymous::Change(deviceId).c_str(), bundleName.c_str());
54 return false;
55 }
56 auto [timeout, status] = block->GetValue();
57 if (timeout || status != OBJECT_SUCCESS) {
58 ZLOGE("fail, timeout: %{public}d, status: %{public}d, name: %{public}s, deviceId: %{public}s ", timeout,
59 status, asset.name.c_str(), DistributedData::Anonymous::Change(deviceId).c_str());
60 return false;
61 }
62 ZLOGD("Transfer end, bundleName: %{public}s, deviceId: %{public}s, assetName: %{public}s", bundleName.c_str(),
63 DistributedData::Anonymous::Change(deviceId).c_str(), assetInfo.assetName.c_str());
64 return true;
65 }
66
TransferAssetsAsync(const int32_t userId,const std::string & bundleName,const std::string & deviceId,const std::vector<DistributedData::Asset> & assets,const TransferFunc & callback)67 void ObjectAssetLoader::TransferAssetsAsync(const int32_t userId, const std::string& bundleName,
68 const std::string& deviceId, const std::vector<DistributedData::Asset>& assets, const TransferFunc& callback)
69 {
70 if (executors_ == nullptr) {
71 ZLOGE("executors is null, bundleName: %{public}s, deviceId: %{public}s, userId: %{public}d",
72 bundleName.c_str(), DistributedData::Anonymous::Change(deviceId).c_str(), userId);
73 callback(false);
74 return;
75 }
76 TransferTask task = { .callback = callback };
77 DistributedData::Assets downloadAssets;
78 for (auto& asset : assets) {
79 if (IsDownloaded(asset)) {
80 continue;
81 }
82 task.downloadAssets.insert(asset.uri);
83 downloadAssets.emplace_back(asset);
84 }
85 if (task.downloadAssets.empty()) {
86 callback(true);
87 }
88 tasks_.ComputeIfAbsent(++taskSeq_, [task](const uint32_t key) {
89 return task;
90 });
91 executors_->Execute([this, userId, bundleName, deviceId, downloadAssets]() {
92 bool result = true;
93 for (const auto& asset : downloadAssets) {
94 if (IsDownloaded(asset)) {
95 FinishTask(asset.uri, result);
96 continue;
97 }
98 if (IsDownloading(asset)) {
99 continue;
100 }
101 downloading_.ComputeIfAbsent(asset.uri, [asset](const std::string& key) {
102 return asset.hash;
103 });
104 auto success = Transfer(userId, bundleName, deviceId, asset);
105 if (success) {
106 std::lock_guard<std::mutex> lock(mutex_);
107 downloading_.Erase(asset.uri);
108 UpdateDownloaded(asset);
109 } else {
110 downloading_.Erase(asset.uri);
111 }
112 result &= success;
113 FinishTask(asset.uri, result);
114 }
115 });
116 }
117
FinishTask(const std::string & uri,bool result)118 void ObjectAssetLoader::FinishTask(const std::string& uri, bool result)
119 {
120 std::vector<uint32_t> finishedTasks;
121 tasks_.ForEach([&uri, &finishedTasks, result](auto& seq, auto& task) {
122 task.downloadAssets.erase(uri);
123 if (task.downloadAssets.size() == 0 && task.callback != nullptr) {
124 task.callback(result);
125 finishedTasks.emplace_back(seq);
126 }
127 return false;
128 });
129 for (auto taskId : finishedTasks) {
130 tasks_.Erase(taskId);
131 }
132 }
133
UpdateDownloaded(const DistributedData::Asset & asset)134 void ObjectAssetLoader::UpdateDownloaded(const DistributedData::Asset& asset)
135 {
136 downloaded_.ComputeIfAbsent(asset.uri, [asset](const std::string& key) {
137 return asset.hash;
138 });
139 assetQueue_.push(asset.uri);
140 if (assetQueue_.size() > LAST_DOWNLOAD_ASSET_SIZE) {
141 auto oldAsset = assetQueue_.front();
142 assetQueue_.pop();
143 downloaded_.Erase(oldAsset);
144 }
145 }
146
IsDownloading(const DistributedData::Asset & asset)147 bool ObjectAssetLoader::IsDownloading(const DistributedData::Asset& asset)
148 {
149 auto [success, hash] = downloading_.Find(asset.uri);
150 if (success && hash == asset.hash) {
151 ZLOGD("asset is downloading. assetName:%{public}s", asset.name.c_str());
152 return true;
153 }
154 return false;
155 }
156
IsDownloaded(const DistributedData::Asset & asset)157 bool ObjectAssetLoader::IsDownloaded(const DistributedData::Asset& asset)
158 {
159 auto [success, hash] = downloaded_.Find(asset.uri);
160 if (success && hash == asset.hash) {
161 ZLOGD("asset is downloaded. assetName:%{public}s", asset.name.c_str());
162 return true;
163 }
164 return false;
165 }
166
PushAsset(int32_t userId,const sptr<AssetObj> & assetObj,const sptr<ObjectAssetsSendListener> & sendCallback)167 int32_t ObjectAssetLoader::PushAsset(int32_t userId, const sptr<AssetObj> &assetObj,
168 const sptr<ObjectAssetsSendListener> &sendCallback)
169 {
170 ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::SAVE,
171 ObjectStore::PUSH_ASSETS, ObjectStore::IDLE);
172 ZLOGI("PushAsset start, asset size:%{public}zu, bundleName:%{public}s, sessionId:%{public}s",
173 assetObj->uris_.size(), assetObj->dstBundleName_.c_str(), assetObj->sessionId_.c_str());
174 auto status = Storage::DistributedFile::DistributedFileDaemonManager::GetInstance().PushAsset(userId, assetObj,
175 sendCallback);
176 if (status != OBJECT_SUCCESS) {
177 ZLOGE("PushAsset err status: %{public}d, asset size:%{public}zu, bundleName:%{public}s, sessionId:%{public}s",
178 status, assetObj->uris_.size(), assetObj->dstBundleName_.c_str(), assetObj->sessionId_.c_str());
179 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
180 ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_FAILED, status, ObjectStore::FINISHED);
181 }
182 return status;
183 }
184
OnSendResult(const sptr<AssetObj> & assetObj,int32_t result)185 int32_t ObjectAssetsSendListener::OnSendResult(const sptr<AssetObj> &assetObj, int32_t result)
186 {
187 if (assetObj == nullptr) {
188 ZLOGE("OnSendResult error! status:%{public}d", result);
189 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
190 ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
191 return result;
192 }
193 ZLOGI("OnSendResult, status:%{public}d, asset size:%{public}zu", result, assetObj->uris_.size());
194 if (result == OBJECT_SUCCESS) {
195 ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::SAVE,
196 ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_SUCCESS);
197 } else {
198 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
199 ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
200 }
201 return result;
202 }
203 } // namespace OHOS::DistributedObject