1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 use crate::service_impl::error::SyncError;
17 use crate::service_impl::types::{
18     Database, RawValueBucket, Value, CREATE_FIELD, GID_FIELD, MODIFY_FIELD,
19 };
20 use crate::{ipc_conn, SyncResult};
21 use std::collections::HashMap;
22 
// Expiry value passed with every lock request sent over IPC.
// NOTE(review): 60 * 60 is presumably seconds (the heartbeat docs mention a 60 min default) —
// confirm against the IPC contract.
const LOCK_EXPIRE: i32 = 60 * 60;
24 
/// Struct of CloudDatabase.
///
/// Bundles the IPC stub used to reach the cloud database, the local database description used
/// to look tables up by name, and the id of the lock session currently held (if any).
pub struct CloudDatabase {
    // Stub through which every request (insert, update, delete, query, lock, ...) is sent.
    database_ipc: ipc_conn::DatabaseStub,
    // Database description supplied at construction; its `tables` map is consulted by name.
    database: Database,
    // Session id stored by a successful `lock`; reset to `None` by a successful `unlock`.
    lock_session_id: Option<i32>,
}
31 
32 impl CloudDatabase {
33     /// Initialize a CloudDatabase with user id and database. This function will send IPC request
34     /// with user id to get relating cloud database information.
new(user_id: i32, bundle_name: &str, database: Database) -> SyncResult<CloudDatabase>35     pub fn new(user_id: i32, bundle_name: &str, database: Database) -> SyncResult<CloudDatabase> {
36         let db = ipc_conn::Database::try_from(&database)?;
37         let database_ipc = ipc_conn::DatabaseStub::new(user_id, bundle_name, &db)?;
38         Ok(CloudDatabase {
39             database_ipc,
40             database,
41             lock_session_id: None,
42         })
43     }
44 
    /// Execute a sql on the cloud database. Unsupported currently.
    ///
    /// All parameters are ignored.
    ///
    /// # Errors
    ///
    /// Always returns `SyncError::Unsupported`.
    pub fn execute(
        &mut self,
        _table: &str,
        _sql: &str,
        _extend: &RawValueBucket,
    ) -> SyncResult<()> {
        Err(SyncError::Unsupported)
    }
54 
55     /// Insert a batch of value buckets into the cloud database, with specific target table name.
56     ///
57     /// Values and extends are all value buckets from the users, but for the convenience, split
58     /// them in parameters. Extends will also be changed to store information and results from
59     /// the insertion action.
batch_insert( &mut self, table: &str, values: &[RawValueBucket], extends: &mut Vec<RawValueBucket>, ) -> SyncResult<Vec<RawValueBucket>>60     pub fn batch_insert(
61         &mut self,
62         table: &str,
63         values: &[RawValueBucket],
64         extends: &mut Vec<RawValueBucket>,
65     ) -> SyncResult<Vec<RawValueBucket>> {
66         let ids = self.database_ipc.generate_ids(values.len() as u32)?;
67 
68         for (i, id) in ids.iter().enumerate() {
69             if i < extends.len() {
70                 extends[i].insert(GID_FIELD.to_string(), Value::String(id.to_string()));
71             } else {
72                 let mut vb = HashMap::new();
73                 vb.insert(GID_FIELD.to_string(), Value::String(id.to_string()));
74                 extends.push(vb);
75             }
76         }
77 
78         self.upload(
79             table,
80             values,
81             extends,
82             true,
83             true,
84             ipc_conn::DatabaseStub::insert,
85         )
86     }
87 
88     /// Update a batch of value buckets in the cloud database, with specific target table name.
89     ///
90     /// Values and extends are all value buckets from the users, but for the convenience, split
91     /// them in parameters. Extends will also be changed to store information and results from
92     /// the insertion action.
batch_update( &mut self, table: &str, values: &[RawValueBucket], extends: &mut [RawValueBucket], ) -> SyncResult<Vec<RawValueBucket>>93     pub fn batch_update(
94         &mut self,
95         table: &str,
96         values: &[RawValueBucket],
97         extends: &mut [RawValueBucket],
98     ) -> SyncResult<Vec<RawValueBucket>> {
99         self.upload(
100             table,
101             values,
102             extends,
103             false,
104             false,
105             ipc_conn::DatabaseStub::update,
106         )
107     }
108 
109     /// Delete a batch of value buckets from the cloud database, with specific target table name.
110     ///
111     /// Values and extends are all value buckets from the users, but for the convenience, split
112     /// them in parameters. Extends will also be changed to store information and results from
113     /// the insertion action.
batch_delete( &mut self, table: &str, extends: &[RawValueBucket], ) -> SyncResult<Vec<RawValueBucket>>114     pub fn batch_delete(
115         &mut self,
116         table: &str,
117         extends: &[RawValueBucket],
118     ) -> SyncResult<Vec<RawValueBucket>> {
119         match self.database.tables.get(table) {
120             None => Err(SyncError::NoSuchTableInDb),
121             Some(t) => {
122                 let name = &t.name;
123                 let mut records = vec![];
124                 for ext in extends.iter() {
125                     match ext.get(GID_FIELD) {
126                         Some(Value::String(gid)) => {
127                             let mut extend_data = HashMap::new();
128                             let modify_field = match ext.get(MODIFY_FIELD) {
129                                 Some(Value::Int(i)) => *i,
130                                 _ => 0,
131                             };
132 
133                             extend_data
134                                 .insert("id".to_string(), ipc_conn::FieldRaw::Text(gid.clone()));
135                             extend_data
136                                 .insert("operation".to_string(), ipc_conn::FieldRaw::Number(2_i64));
137                             extend_data.insert(
138                                 "modifyTime".to_string(),
139                                 ipc_conn::FieldRaw::Number(modify_field),
140                             );
141 
142                             records.push(ipc_conn::ValueBucket(extend_data));
143                         }
144                         _ => continue,
145                     }
146                 }
147                 let extends = ipc_conn::ValueBuckets(records);
148                 let ret = self.database_ipc.delete(name, &extends)?;
149 
150                 let mut results = vec![];
151                 for value_bucket in ret.into_iter().flatten() {
152                     results.push(value_bucket.into());
153                 }
154                 Ok(results)
155             }
156         }
157     }
158 
159     /// Query value buckets from the cloud database, with specific target table name. Return with
160     /// the corresponding Cursor and ValueBuckets.
161     ///
162     /// Values and extends are all value buckets from the users, but for the convenience, split
163     /// them in parameters. Extends will also be changed to store information and results from
164     /// the insertion action.
batch_query(&mut self, table: &str, cursor: &str) -> SyncResult<CloudDbData>165     pub fn batch_query(&mut self, table: &str, cursor: &str) -> SyncResult<CloudDbData> {
166         const QUERY_LIMIT: usize = 10;
167 
168         match self.database.tables.get(table) {
169             None => Err(SyncError::NoSuchTableInDb),
170             Some(t) => {
171                 let name = &t.name;
172                 let mut fields = vec![];
173                 for field in &t.fields {
174                     fields.push(field.col_name.clone());
175                 }
176                 let ret =
177                     self.database_ipc
178                         .query_values(name, &fields, QUERY_LIMIT as i32, cursor)?;
179                 Ok(ret.into())
180             }
181         }
182     }
183 
184     /// Send lock request to the other end through IPC message, so that users can get exclusive
185     /// access to the database.
186     ///
187     /// Return err if that fails.
lock(&mut self) -> SyncResult<i32>188     pub fn lock(&mut self) -> SyncResult<i32> {
189         if self.lock_session_id.is_some() {
190             return Ok(0);
191         }
192         let int = self.database_ipc.lock(LOCK_EXPIRE)?;
193         self.lock_session_id = Some(int.session_id);
194         Ok(int.interval)
195     }
196 
197     /// Send unlock request to the other end through IPC message, so that users can release exclusive
198     /// access to the database, and others can use it.
199     ///
200     /// Return err if that fails.
unlock(&mut self) -> SyncResult<()>201     pub fn unlock(&mut self) -> SyncResult<()> {
202         match self.lock_session_id {
203             Some(i) => {
204                 self.database_ipc.unlock(i)?;
205                 self.lock_session_id = None;
206                 Ok(())
207             }
208             None => Err(SyncError::SessionUnlocked),
209         }
210     }
211 
212     /// Send heartbeat to the other end through IPC message, so that the users can prolong exclusive
213     /// access to the database. If no action is taken, this access will expire after some time (by
214     /// default, 60 min).
heartbeat(&mut self) -> SyncResult<()>215     pub fn heartbeat(&mut self) -> SyncResult<()> {
216         match self.lock_session_id {
217             Some(i) => {
218                 self.database_ipc.heartbeat(i)?;
219                 Ok(())
220             }
221             None => Err(SyncError::SessionUnlocked),
222         }
223     }
224 
upload<F>( &mut self, table: &str, values: &[RawValueBucket], extends: &mut [RawValueBucket], is_new: bool, is_backfill: bool, mut f: F, ) -> SyncResult<Vec<RawValueBucket>> where F: FnMut( &mut ipc_conn::DatabaseStub, &str, &ipc_conn::ValueBuckets, &ipc_conn::ValueBuckets, ) -> Result<Vec<Option<ipc_conn::ValueBucket>>, ipc_conn::Error>,225     fn upload<F>(
226         &mut self,
227         table: &str,
228         values: &[RawValueBucket],
229         extends: &mut [RawValueBucket],
230         is_new: bool,
231         is_backfill: bool,
232         mut f: F,
233     ) -> SyncResult<Vec<RawValueBucket>>
234     where
235         F: FnMut(
236             &mut ipc_conn::DatabaseStub,
237             &str,
238             &ipc_conn::ValueBuckets,
239             &ipc_conn::ValueBuckets,
240         ) -> Result<Vec<Option<ipc_conn::ValueBucket>>, ipc_conn::Error>,
241     {
242         let mut asset_keys = vec![];
243         let mut extend_ids = vec![];
244 
245         match self.database.tables.get(table) {
246             None => Err(SyncError::NoSuchTableInDb),
247             Some(t) => {
248                 let name = &t.name;
249                 let mut values_records = vec![];
250                 let mut extends_records = vec![];
251                 for (bucket, ext) in values.iter().zip(extends.iter_mut()) {
252                     match ext.get(GID_FIELD) {
253                         Some(Value::String(gid)) => {
254                             let mut extend_data = HashMap::new();
255                             let create_field = match ext.get(CREATE_FIELD) {
256                                 Some(Value::Int(i)) => *i,
257                                 _ => 0,
258                             };
259 
260                             let modify_field = match ext.get(MODIFY_FIELD) {
261                                 Some(Value::Int(i)) => *i,
262                                 _ => 0,
263                             };
264 
265                             extend_ids.push(gid.clone());
266                             extend_data
267                                 .insert("id".to_string(), ipc_conn::FieldRaw::Text(gid.clone()));
268                             if is_new {
269                                 extend_data.insert(
270                                     "operation".to_string(),
271                                     ipc_conn::FieldRaw::Number(0_i64),
272                                 );
273                             } else {
274                                 extend_data.insert(
275                                     "operation".to_string(),
276                                     ipc_conn::FieldRaw::Number(1_i64),
277                                 );
278                             }
279                             extend_data.insert(
280                                 "createTime".to_string(),
281                                 ipc_conn::FieldRaw::Number(create_field),
282                             );
283                             extend_data.insert(
284                                 "modifyTime".to_string(),
285                                 ipc_conn::FieldRaw::Number(modify_field),
286                             );
287 
288                             let mut record_data = HashMap::new();
289                             for (key, value) in bucket {
290                                 if value.has_asset() {
291                                     asset_keys.push(key.to_string());
292                                     ext.insert(key.to_string(), value.clone());
293                                 }
294                                 let field = ipc_conn::FieldRaw::from(value);
295                                 record_data.insert(key.clone(), field);
296                             }
297 
298                             values_records.push(ipc_conn::ValueBucket(record_data));
299                             extends_records.push(ipc_conn::ValueBucket(extend_data));
300                         }
301                         _ => continue,
302                     }
303                 }
304                 let value_raw_vb = ipc_conn::ValueBuckets(values_records);
305                 let extend_raw_vb = ipc_conn::ValueBuckets(extends_records);
306 
307                 let mut results: Vec<RawValueBucket> = vec![];
308                 let ret = f(&mut self.database_ipc, name, &value_raw_vb, &extend_raw_vb)?;
309                 for value_bucket in ret.into_iter().flatten() {
310                     results.push(value_bucket.into());
311                 }
312 
313                 if is_backfill {
314                     let mut result_ids = vec![];
315                     for result in &results {
316                         if let Some(Value::String(id)) = result.get("id") {
317                             result_ids.push(id.to_string());
318                         }
319                     }
320 
321                     for (result_idx, result_id) in result_ids.iter().enumerate() {
322                         for (ext_idx, ext_id) in extend_ids.iter().enumerate() {
323                             if result_id == ext_id {
324                                 for asset_key in &asset_keys {
325                                     *extends[ext_idx].get_mut(asset_key).unwrap() =
326                                         results[result_idx].get(asset_key).unwrap().clone();
327                                 }
328                             }
329                         }
330                     }
331                 }
332 
333                 Ok(results)
334             }
335         }
336     }
337 }
338 
/// Struct of CloudDbData.
///
/// Result of a query on the cloud database: the returned value buckets together with the
/// cursor and "has more" flag reported by the IPC response.
#[derive(Clone, Debug)]
pub struct CloudDbData {
    // Cursor copied from the IPC response; exposed through `next_cursor()`.
    pub(crate) next_cursor: String,
    // Flag copied from the IPC response; exposed through `has_more()`.
    pub(crate) has_more: bool,
    // Value buckets copied from the IPC response; exposed through `values()`.
    pub(crate) values: Vec<ipc_conn::ValueBucket>,
}
346 
347 impl From<ipc_conn::CloudData> for CloudDbData {
from(value: ipc_conn::CloudData) -> Self348     fn from(value: ipc_conn::CloudData) -> Self {
349         let mut vec = vec![];
350         for v in value.values.0 {
351             vec.push(v);
352         }
353         CloudDbData {
354             next_cursor: value.next_cursor,
355             has_more: value.has_more,
356             values: vec,
357         }
358     }
359 }
360 
361 impl CloudDbData {
362     /// Get next cursor from CloudDbData instance.
next_cursor(&self) -> &str363     pub fn next_cursor(&self) -> &str {
364         &self.next_cursor
365     }
366 
367     /// Check whether CloudDbData instance has more value buckets.
has_more(&self) -> bool368     pub fn has_more(&self) -> bool {
369         self.has_more
370     }
371 
372     /// Get values from CloudDbData instance.
values(&self) -> &[ipc_conn::ValueBucket]373     pub fn values(&self) -> &[ipc_conn::ValueBucket] {
374         &self.values
375     }
376 }
377