1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "ObjectAssetLoader"
16 
17 #include "object_asset_loader.h"
18 #include "block_data.h"
19 #include "cloud_sync_asset_manager.h"
20 #include "log_print.h"
21 #include "object_common.h"
22 #include "utils/anonymous.h"
23 #include "object_radar_reporter.h"
24 #include "distributed_file_daemon_manager.h"
25 namespace OHOS::DistributedObject {
26 using namespace OHOS::FileManagement::CloudSync;
GetInstance()27 ObjectAssetLoader *ObjectAssetLoader::GetInstance()
28 {
29     static ObjectAssetLoader *loader = new ObjectAssetLoader();
30     return loader;
31 }
32 
SetThreadPool(std::shared_ptr<ExecutorPool> executors)33 void ObjectAssetLoader::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
34 {
35     executors_ = executors;
36 }
37 
Transfer(const int32_t userId,const std::string & bundleName,const std::string & deviceId,const DistributedData::Asset & asset)38 bool ObjectAssetLoader::Transfer(const int32_t userId, const std::string& bundleName, const std::string& deviceId,
39     const DistributedData::Asset& asset)
40 {
41     AssetInfo assetInfo;
42     assetInfo.uri = asset.uri;
43     assetInfo.assetName = asset.name;
44     ZLOGI("Start transfer, bundleName: %{public}s, deviceId: %{public}s, assetName: %{public}s", bundleName.c_str(),
45         DistributedData::Anonymous::Change(deviceId).c_str(), assetInfo.assetName.c_str());
46     auto block = std::make_shared<BlockData<std::tuple<bool, int32_t>>>(WAIT_TIME, std::tuple{ true, OBJECT_SUCCESS });
47     auto res = CloudSyncAssetManager::GetInstance().DownloadFile(userId, bundleName, deviceId, assetInfo,
48         [block](const std::string& uri, int32_t status) {
49             block->SetValue({ false, status });
50         });
51     if (res != OBJECT_SUCCESS) {
52         ZLOGE("fail, res: %{public}d, name: %{public}s, deviceId: %{public}s, bundleName: %{public}s", res,
53             asset.name.c_str(), DistributedData::Anonymous::Change(deviceId).c_str(), bundleName.c_str());
54         return false;
55     }
56     auto [timeout, status] = block->GetValue();
57     if (timeout || status != OBJECT_SUCCESS) {
58         ZLOGE("fail, timeout: %{public}d, status: %{public}d, name: %{public}s, deviceId: %{public}s ", timeout,
59             status, asset.name.c_str(), DistributedData::Anonymous::Change(deviceId).c_str());
60         return false;
61     }
62     ZLOGD("Transfer end, bundleName: %{public}s, deviceId: %{public}s, assetName: %{public}s", bundleName.c_str(),
63         DistributedData::Anonymous::Change(deviceId).c_str(), assetInfo.assetName.c_str());
64     return true;
65 }
66 
TransferAssetsAsync(const int32_t userId,const std::string & bundleName,const std::string & deviceId,const std::vector<DistributedData::Asset> & assets,const TransferFunc & callback)67 void ObjectAssetLoader::TransferAssetsAsync(const int32_t userId, const std::string& bundleName,
68     const std::string& deviceId, const std::vector<DistributedData::Asset>& assets, const TransferFunc& callback)
69 {
70     if (executors_ == nullptr) {
71         ZLOGE("executors is null, bundleName: %{public}s, deviceId: %{public}s, userId: %{public}d",
72             bundleName.c_str(), DistributedData::Anonymous::Change(deviceId).c_str(), userId);
73         callback(false);
74         return;
75     }
76     TransferTask task = { .callback = callback };
77     DistributedData::Assets downloadAssets;
78     for (auto& asset : assets) {
79         if (IsDownloaded(asset)) {
80             continue;
81         }
82         task.downloadAssets.insert(asset.uri);
83         downloadAssets.emplace_back(asset);
84     }
85     if (task.downloadAssets.empty()) {
86         callback(true);
87     }
88     tasks_.ComputeIfAbsent(++taskSeq_, [task](const uint32_t key) {
89         return task;
90     });
91     executors_->Execute([this, userId, bundleName, deviceId, downloadAssets]() {
92         bool result = true;
93         for (const auto& asset : downloadAssets) {
94             if (IsDownloaded(asset)) {
95                 FinishTask(asset.uri, result);
96                 continue;
97             }
98             if (IsDownloading(asset)) {
99                 continue;
100             }
101             downloading_.ComputeIfAbsent(asset.uri, [asset](const std::string& key) {
102                 return asset.hash;
103             });
104             auto success = Transfer(userId, bundleName, deviceId, asset);
105             if (success) {
106                 std::lock_guard<std::mutex> lock(mutex_);
107                 downloading_.Erase(asset.uri);
108                 UpdateDownloaded(asset);
109             } else {
110                 downloading_.Erase(asset.uri);
111             }
112             result &= success;
113             FinishTask(asset.uri, result);
114         }
115     });
116 }
117 
FinishTask(const std::string & uri,bool result)118 void ObjectAssetLoader::FinishTask(const std::string& uri, bool result)
119 {
120     std::vector<uint32_t> finishedTasks;
121     tasks_.ForEach([&uri, &finishedTasks, result](auto& seq, auto& task) {
122         task.downloadAssets.erase(uri);
123         if (task.downloadAssets.size() == 0 && task.callback != nullptr) {
124             task.callback(result);
125             finishedTasks.emplace_back(seq);
126         }
127         return false;
128     });
129     for (auto taskId : finishedTasks) {
130         tasks_.Erase(taskId);
131     }
132 }
133 
UpdateDownloaded(const DistributedData::Asset & asset)134 void ObjectAssetLoader::UpdateDownloaded(const DistributedData::Asset& asset)
135 {
136     downloaded_.ComputeIfAbsent(asset.uri, [asset](const std::string& key) {
137         return asset.hash;
138     });
139     assetQueue_.push(asset.uri);
140     if (assetQueue_.size() > LAST_DOWNLOAD_ASSET_SIZE) {
141         auto oldAsset = assetQueue_.front();
142         assetQueue_.pop();
143         downloaded_.Erase(oldAsset);
144     }
145 }
146 
IsDownloading(const DistributedData::Asset & asset)147 bool ObjectAssetLoader::IsDownloading(const DistributedData::Asset& asset)
148 {
149     auto [success, hash] = downloading_.Find(asset.uri);
150     if (success && hash == asset.hash) {
151         ZLOGD("asset is downloading. assetName:%{public}s", asset.name.c_str());
152         return true;
153     }
154     return false;
155 }
156 
IsDownloaded(const DistributedData::Asset & asset)157 bool ObjectAssetLoader::IsDownloaded(const DistributedData::Asset& asset)
158 {
159     auto [success, hash] = downloaded_.Find(asset.uri);
160     if (success && hash == asset.hash) {
161         ZLOGD("asset is downloaded. assetName:%{public}s", asset.name.c_str());
162         return true;
163     }
164     return false;
165 }
166 
PushAsset(int32_t userId,const sptr<AssetObj> & assetObj,const sptr<ObjectAssetsSendListener> & sendCallback)167 int32_t ObjectAssetLoader::PushAsset(int32_t userId, const sptr<AssetObj> &assetObj,
168     const sptr<ObjectAssetsSendListener> &sendCallback)
169 {
170     ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::SAVE,
171         ObjectStore::PUSH_ASSETS, ObjectStore::IDLE);
172     ZLOGI("PushAsset start, asset size:%{public}zu, bundleName:%{public}s, sessionId:%{public}s",
173         assetObj->uris_.size(), assetObj->dstBundleName_.c_str(), assetObj->sessionId_.c_str());
174     auto status = Storage::DistributedFile::DistributedFileDaemonManager::GetInstance().PushAsset(userId, assetObj,
175         sendCallback);
176     if (status != OBJECT_SUCCESS) {
177         ZLOGE("PushAsset err status: %{public}d, asset size:%{public}zu, bundleName:%{public}s, sessionId:%{public}s",
178             status, assetObj->uris_.size(), assetObj->dstBundleName_.c_str(), assetObj->sessionId_.c_str());
179         ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
180             ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_FAILED, status, ObjectStore::FINISHED);
181     }
182     return status;
183 }
184 
OnSendResult(const sptr<AssetObj> & assetObj,int32_t result)185 int32_t ObjectAssetsSendListener::OnSendResult(const sptr<AssetObj> &assetObj, int32_t result)
186 {
187     if (assetObj == nullptr) {
188         ZLOGE("OnSendResult error! status:%{public}d", result);
189         ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
190             ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
191         return result;
192     }
193     ZLOGI("OnSendResult, status:%{public}d, asset size:%{public}zu", result, assetObj->uris_.size());
194     if (result == OBJECT_SUCCESS) {
195         ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::SAVE,
196             ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_SUCCESS);
197     } else {
198         ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
199             ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
200     }
201     return result;
202 }
203 } // namespace OHOS::DistributedObject