1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "generic_virtual_device.h"
16
17 #include "kv_store_errno.h"
18 #include "multi_ver_sync_task_context.h"
19 #include "single_ver_kv_sync_task_context.h"
20 #include "single_ver_relational_sync_task_context.h"
21
22 namespace DistributedDB {
GenericVirtualDevice(std::string deviceId)23 GenericVirtualDevice::GenericVirtualDevice(std::string deviceId)
24 : communicateHandle_(nullptr),
25 communicatorAggregator_(nullptr),
26 storage_(nullptr),
27 metadata_(nullptr),
28 deviceId_(std::move(deviceId)),
29 remoteDeviceId_("real_device"),
30 context_(nullptr),
31 onRemoteDataChanged_(nullptr),
32 subManager_(nullptr),
33 executor_(nullptr)
34 {
35 }
36
~GenericVirtualDevice()37 GenericVirtualDevice::~GenericVirtualDevice()
38 {
39 std::mutex cvMutex;
40 std::condition_variable cv;
41 bool finished = false;
42 Offline();
43
44 if (communicateHandle_ != nullptr) {
45 communicateHandle_->RegOnMessageCallback(nullptr, nullptr);
46 communicatorAggregator_->ReleaseCommunicator(communicateHandle_);
47 communicateHandle_ = nullptr;
48 }
49 communicatorAggregator_ = nullptr;
50
51 if (context_ != nullptr) {
52 ISyncInterface *storage = storage_;
53 context_->OnLastRef([storage, &cv, &cvMutex, &finished]() {
54 delete storage;
55 {
56 std::lock_guard<std::mutex> lock(cvMutex);
57 finished = true;
58 }
59 cv.notify_one();
60 });
61 RefObject::KillAndDecObjRef(context_);
62 std::unique_lock<std::mutex> lock(cvMutex);
63 cv.wait(lock, [&finished] { return finished; });
64 } else {
65 delete storage_;
66 }
67 context_ = nullptr;
68 metadata_ = nullptr;
69 storage_ = nullptr;
70 if (executor_ != nullptr) {
71 RefObject::KillAndDecObjRef(executor_);
72 executor_ = nullptr;
73 }
74 }
75
Initialize(VirtualCommunicatorAggregator * comAggregator,ISyncInterface * syncInterface)76 int GenericVirtualDevice::Initialize(VirtualCommunicatorAggregator *comAggregator, ISyncInterface *syncInterface)
77 {
78 if ((comAggregator == nullptr) || (syncInterface == nullptr)) {
79 return -E_INVALID_ARGS;
80 }
81
82 communicatorAggregator_ = comAggregator;
83 int errCode = E_OK;
84 communicateHandle_ = communicatorAggregator_->AllocCommunicator(deviceId_, errCode);
85 if (communicateHandle_ == nullptr) {
86 return errCode;
87 }
88
89 storage_ = syncInterface;
90 metadata_ = std::make_shared<Metadata>();
91 if (metadata_->Initialize(storage_) != E_OK) {
92 LOGE("metadata_ init failed");
93 return -E_NOT_SUPPORT;
94 }
95 if (storage_->GetInterfaceType() == IKvDBSyncInterface::SYNC_SVD) {
96 context_ = new (std::nothrow) SingleVerKvSyncTaskContext;
97 subManager_ = std::make_shared<SubscribeManager>();
98 static_cast<SingleVerSyncTaskContext *>(context_)->SetSubscribeManager(subManager_);
99 } else if (storage_->GetInterfaceType() == IKvDBSyncInterface::SYNC_RELATION) {
100 context_ = new (std::nothrow) SingleVerRelationalSyncTaskContext;
101 } else {
102 #ifndef OMIT_MULTI_VER
103 context_ = new (std::nothrow) MultiVerSyncTaskContext;
104 #else
105 return -E_NOT_SUPPORT;
106 #endif // OMIT_MULTI_VER
107 }
108 if (context_ == nullptr) {
109 return -E_OUT_OF_MEMORY;
110 }
111 communicateHandle_->RegOnMessageCallback(
112 std::bind(&GenericVirtualDevice::MessageCallback, this, std::placeholders::_1, std::placeholders::_2), []() {});
113 context_->Initialize(remoteDeviceId_, storage_, metadata_, communicateHandle_);
114 context_->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
115 context_->RegOnSyncTask(std::bind(&GenericVirtualDevice::StartResponseTask, this));
116
117 executor_ = new (std::nothrow) RemoteExecutor();
118 if (executor_ == nullptr) {
119 return -E_OUT_OF_MEMORY;
120 }
121 executor_->Initialize(syncInterface, communicateHandle_);
122 return E_OK;
123 }
124
SetDeviceId(const std::string & deviceId)125 void GenericVirtualDevice::SetDeviceId(const std::string &deviceId)
126 {
127 deviceId_ = deviceId;
128 }
129
GetDeviceId() const130 std::string GenericVirtualDevice::GetDeviceId() const
131 {
132 return deviceId_;
133 }
134
MessageCallback(const std::string & deviceId,Message * inMsg)135 int GenericVirtualDevice::MessageCallback(const std::string &deviceId, Message *inMsg)
136 {
137 if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
138 if (onRemoteDataChanged_) {
139 onRemoteDataChanged_(deviceId);
140 delete inMsg;
141 inMsg = nullptr;
142 return E_OK;
143 }
144 delete inMsg;
145 inMsg = nullptr;
146 return -E_INVALID_ARGS;
147 }
148
149 LOGD("[GenericVirtualDevice] onMessage, src %s id %u", deviceId.c_str(), inMsg->GetMessageId());
150 if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE && executor_ != nullptr) {
151 RefObject::IncObjRef(executor_);
152 executor_->ReceiveMessage(deviceId, inMsg);
153 RefObject::DecObjRef(executor_);
154 return E_OK;
155 }
156
157 RefObject::IncObjRef(context_);
158 RefObject::IncObjRef(communicateHandle_);
159 SyncTaskContext *context = context_;
160 ICommunicator *communicateHandle = communicateHandle_;
161 std::thread thread([context, communicateHandle, inMsg]() {
162 int errCode = context->ReceiveMessageCallback(inMsg);
163 if (errCode != -E_NOT_NEED_DELETE_MSG) {
164 delete inMsg;
165 }
166 RefObject::DecObjRef(context);
167 RefObject::DecObjRef(communicateHandle);
168 });
169 thread.detach();
170 return E_OK;
171 }
172
OnRemoteDataChanged(const std::function<void (const std::string &)> & callback)173 void GenericVirtualDevice::OnRemoteDataChanged(const std::function<void(const std::string &)> &callback)
174 {
175 onRemoteDataChanged_ = callback;
176 }
177
Online()178 void GenericVirtualDevice::Online()
179 {
180 static_cast<VirtualCommunicator *>(communicateHandle_)->Enable();
181 communicatorAggregator_->OnlineDevice(deviceId_);
182 }
183
Offline()184 void GenericVirtualDevice::Offline()
185 {
186 static_cast<VirtualCommunicator *>(communicateHandle_)->Disable();
187 communicatorAggregator_->OfflineDevice(deviceId_);
188 }
189
StartResponseTask()190 int GenericVirtualDevice::StartResponseTask()
191 {
192 LOGD("[KvVirtualDevice] StartResponseTask");
193 RefObject::AutoLock lockGuard(context_);
194 int status = context_->GetTaskExecStatus();
195 if ((status == SyncTaskContext::RUNNING) || context_->IsKilled()) {
196 LOGD("[KvVirtualDevice] StartResponseTask status:%d", status);
197 return -E_NOT_SUPPORT;
198 }
199 if (context_->IsTargetQueueEmpty()) {
200 LOGD("[KvVirtualDevice] StartResponseTask IsTargetQueueEmpty is empty");
201 return E_OK;
202 }
203 context_->SetTaskExecStatus(ISyncTaskContext::RUNNING);
204 context_->MoveToNextTarget();
205 LOGI("[KvVirtualDevice] machine StartSync");
206 context_->UnlockObj();
207 int errCode = context_->StartStateMachine();
208 context_->LockObj();
209 if (errCode != E_OK) {
210 LOGE("[KvVirtualDevice] machine StartSync failed");
211 context_->SetOperationStatus(SyncOperation::OP_FAILED);
212 }
213 return errCode;
214 }
215
GetLocalTimeOffset() const216 TimeOffset GenericVirtualDevice::GetLocalTimeOffset() const
217 {
218 return metadata_->GetLocalTimeOffset();
219 }
220
Sync(SyncMode mode,bool wait)221 int GenericVirtualDevice::Sync(SyncMode mode, bool wait)
222 {
223 auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, nullptr, wait);
224 if (operation == nullptr) {
225 return -E_OUT_OF_MEMORY;
226 }
227 operation->Initialize();
228 operation->SetOnSyncFinished([operation](int id) {
229 operation->NotifyIfNeed();
230 });
231 context_->AddSyncOperation(operation);
232 operation->WaitIfNeed();
233 RefObject::KillAndDecObjRef(operation);
234 return E_OK;
235 }
236
Sync(SyncMode mode,const Query & query,bool wait)237 int GenericVirtualDevice::Sync(SyncMode mode, const Query &query, bool wait)
238 {
239 return Sync(mode, query, nullptr, wait);
240 }
241
Sync(SyncMode mode,const Query & query,const SyncOperation::UserCallback & callBack,bool wait)242 int GenericVirtualDevice::Sync(SyncMode mode, const Query &query,
243 const SyncOperation::UserCallback &callBack, bool wait)
244 {
245 auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, callBack, wait);
246 if (operation == nullptr) {
247 return -E_OUT_OF_MEMORY;
248 }
249 operation->Initialize();
250 operation->SetOnSyncFinished([operation](int id) {
251 operation->NotifyIfNeed();
252 });
253 QuerySyncObject querySyncObject(query);
254 int errCode = querySyncObject.Init();
255 if (errCode != E_OK) {
256 return errCode;
257 }
258 operation->SetQuery(querySyncObject);
259 context_->AddSyncOperation(operation);
260 operation->WaitIfNeed();
261 RefObject::KillAndDecObjRef(operation);
262 return errCode;
263 }
264
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,std::shared_ptr<ResultSet> & result)265 int GenericVirtualDevice::RemoteQuery(const std::string &device, const RemoteCondition &condition,
266 uint64_t timeout, std::shared_ptr<ResultSet> &result)
267 {
268 if (executor_ == nullptr) {
269 result = nullptr;
270 return TransferDBErrno(-E_BUSY);
271 }
272 int errCode = executor_->RemoteQuery(device, condition, timeout, 1u, result);
273 if (errCode != E_OK) {
274 result = nullptr;
275 }
276 return TransferDBErrno(errCode);
277 }
278
SetClearRemoteStaleData(bool isStaleData)279 void GenericVirtualDevice::SetClearRemoteStaleData(bool isStaleData)
280 {
281 if (context_ != nullptr) {
282 static_cast<SingleVerSyncTaskContext *>(context_)->EnableClearRemoteStaleData(isStaleData);
283 LOGD("set clear remote stale data mark");
284 }
285 }
286 } // DistributedDB
287