1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 use crate::service_impl::error::SyncError; 17 use crate::service_impl::types::{ 18 Database, RawValueBucket, Value, CREATE_FIELD, GID_FIELD, MODIFY_FIELD, 19 }; 20 use crate::{ipc_conn, SyncResult}; 21 use std::collections::HashMap; 22 23 const LOCK_EXPIRE: i32 = 60 * 60; 24 25 /// Struct of CloudDatabase 26 pub struct CloudDatabase { 27 database_ipc: ipc_conn::DatabaseStub, 28 database: Database, 29 lock_session_id: Option<i32>, 30 } 31 32 impl CloudDatabase { 33 /// Initialize a CloudDatabase with user id and database. This function will send IPC request 34 /// with user id to get relating cloud database information. new(user_id: i32, bundle_name: &str, database: Database) -> SyncResult<CloudDatabase>35 pub fn new(user_id: i32, bundle_name: &str, database: Database) -> SyncResult<CloudDatabase> { 36 let db = ipc_conn::Database::try_from(&database)?; 37 let database_ipc = ipc_conn::DatabaseStub::new(user_id, bundle_name, &db)?; 38 Ok(CloudDatabase { 39 database_ipc, 40 database, 41 lock_session_id: None, 42 }) 43 } 44 45 /// Execute a sql on the cloud database. Unsupported currently. execute( &mut self, _table: &str, _sql: &str, _extend: &RawValueBucket, ) -> SyncResult<()>46 pub fn execute( 47 &mut self, 48 _table: &str, 49 _sql: &str, 50 _extend: &RawValueBucket, 51 ) -> SyncResult<()> { 52 Err(SyncError::Unsupported) 53 } 54 55 /// Insert a batch of value buckets into the cloud database, with specific target table name. 56 /// 57 /// Values and extends are all value buckets from the users, but for the convenience, split 58 /// them in parameters. Extends will also be changed to store information and results from 59 /// the insertion action. batch_insert( &mut self, table: &str, values: &[RawValueBucket], extends: &mut Vec<RawValueBucket>, ) -> SyncResult<Vec<RawValueBucket>>60 pub fn batch_insert( 61 &mut self, 62 table: &str, 63 values: &[RawValueBucket], 64 extends: &mut Vec<RawValueBucket>, 65 ) -> SyncResult<Vec<RawValueBucket>> { 66 let ids = self.database_ipc.generate_ids(values.len() as u32)?; 67 68 for (i, id) in ids.iter().enumerate() { 69 if i < extends.len() { 70 extends[i].insert(GID_FIELD.to_string(), Value::String(id.to_string())); 71 } else { 72 let mut vb = HashMap::new(); 73 vb.insert(GID_FIELD.to_string(), Value::String(id.to_string())); 74 extends.push(vb); 75 } 76 } 77 78 self.upload( 79 table, 80 values, 81 extends, 82 true, 83 true, 84 ipc_conn::DatabaseStub::insert, 85 ) 86 } 87 88 /// Update a batch of value buckets in the cloud database, with specific target table name. 89 /// 90 /// Values and extends are all value buckets from the users, but for the convenience, split 91 /// them in parameters. Extends will also be changed to store information and results from 92 /// the insertion action. batch_update( &mut self, table: &str, values: &[RawValueBucket], extends: &mut [RawValueBucket], ) -> SyncResult<Vec<RawValueBucket>>93 pub fn batch_update( 94 &mut self, 95 table: &str, 96 values: &[RawValueBucket], 97 extends: &mut [RawValueBucket], 98 ) -> SyncResult<Vec<RawValueBucket>> { 99 self.upload( 100 table, 101 values, 102 extends, 103 false, 104 false, 105 ipc_conn::DatabaseStub::update, 106 ) 107 } 108 109 /// Delete a batch of value buckets from the cloud database, with specific target table name. 110 /// 111 /// Values and extends are all value buckets from the users, but for the convenience, split 112 /// them in parameters. Extends will also be changed to store information and results from 113 /// the insertion action. batch_delete( &mut self, table: &str, extends: &[RawValueBucket], ) -> SyncResult<Vec<RawValueBucket>>114 pub fn batch_delete( 115 &mut self, 116 table: &str, 117 extends: &[RawValueBucket], 118 ) -> SyncResult<Vec<RawValueBucket>> { 119 match self.database.tables.get(table) { 120 None => Err(SyncError::NoSuchTableInDb), 121 Some(t) => { 122 let name = &t.name; 123 let mut records = vec![]; 124 for ext in extends.iter() { 125 match ext.get(GID_FIELD) { 126 Some(Value::String(gid)) => { 127 let mut extend_data = HashMap::new(); 128 let modify_field = match ext.get(MODIFY_FIELD) { 129 Some(Value::Int(i)) => *i, 130 _ => 0, 131 }; 132 133 extend_data 134 .insert("id".to_string(), ipc_conn::FieldRaw::Text(gid.clone())); 135 extend_data 136 .insert("operation".to_string(), ipc_conn::FieldRaw::Number(2_i64)); 137 extend_data.insert( 138 "modifyTime".to_string(), 139 ipc_conn::FieldRaw::Number(modify_field), 140 ); 141 142 records.push(ipc_conn::ValueBucket(extend_data)); 143 } 144 _ => continue, 145 } 146 } 147 let extends = ipc_conn::ValueBuckets(records); 148 let ret = self.database_ipc.delete(name, &extends)?; 149 150 let mut results = vec![]; 151 for value_bucket in ret.into_iter().flatten() { 152 results.push(value_bucket.into()); 153 } 154 Ok(results) 155 } 156 } 157 } 158 159 /// Query value buckets from the cloud database, with specific target table name. Return with 160 /// the corresponding Cursor and ValueBuckets. 161 /// 162 /// Values and extends are all value buckets from the users, but for the convenience, split 163 /// them in parameters. Extends will also be changed to store information and results from 164 /// the insertion action. batch_query(&mut self, table: &str, cursor: &str) -> SyncResult<CloudDbData>165 pub fn batch_query(&mut self, table: &str, cursor: &str) -> SyncResult<CloudDbData> { 166 const QUERY_LIMIT: usize = 10; 167 168 match self.database.tables.get(table) { 169 None => Err(SyncError::NoSuchTableInDb), 170 Some(t) => { 171 let name = &t.name; 172 let mut fields = vec![]; 173 for field in &t.fields { 174 fields.push(field.col_name.clone()); 175 } 176 let ret = 177 self.database_ipc 178 .query_values(name, &fields, QUERY_LIMIT as i32, cursor)?; 179 Ok(ret.into()) 180 } 181 } 182 } 183 184 /// Send lock request to the other end through IPC message, so that users can get exclusive 185 /// access to the database. 186 /// 187 /// Return err if that fails. lock(&mut self) -> SyncResult<i32>188 pub fn lock(&mut self) -> SyncResult<i32> { 189 if self.lock_session_id.is_some() { 190 return Ok(0); 191 } 192 let int = self.database_ipc.lock(LOCK_EXPIRE)?; 193 self.lock_session_id = Some(int.session_id); 194 Ok(int.interval) 195 } 196 197 /// Send unlock request to the other end through IPC message, so that users can release exclusive 198 /// access to the database, and others can use it. 199 /// 200 /// Return err if that fails. unlock(&mut self) -> SyncResult<()>201 pub fn unlock(&mut self) -> SyncResult<()> { 202 match self.lock_session_id { 203 Some(i) => { 204 self.database_ipc.unlock(i)?; 205 self.lock_session_id = None; 206 Ok(()) 207 } 208 None => Err(SyncError::SessionUnlocked), 209 } 210 } 211 212 /// Send heartbeat to the other end through IPC message, so that the users can prolong exclusive 213 /// access to the database. If no action is taken, this access will expire after some time (by 214 /// default, 60 min). heartbeat(&mut self) -> SyncResult<()>215 pub fn heartbeat(&mut self) -> SyncResult<()> { 216 match self.lock_session_id { 217 Some(i) => { 218 self.database_ipc.heartbeat(i)?; 219 Ok(()) 220 } 221 None => Err(SyncError::SessionUnlocked), 222 } 223 } 224 upload<F>( &mut self, table: &str, values: &[RawValueBucket], extends: &mut [RawValueBucket], is_new: bool, is_backfill: bool, mut f: F, ) -> SyncResult<Vec<RawValueBucket>> where F: FnMut( &mut ipc_conn::DatabaseStub, &str, &ipc_conn::ValueBuckets, &ipc_conn::ValueBuckets, ) -> Result<Vec<Option<ipc_conn::ValueBucket>>, ipc_conn::Error>,225 fn upload<F>( 226 &mut self, 227 table: &str, 228 values: &[RawValueBucket], 229 extends: &mut [RawValueBucket], 230 is_new: bool, 231 is_backfill: bool, 232 mut f: F, 233 ) -> SyncResult<Vec<RawValueBucket>> 234 where 235 F: FnMut( 236 &mut ipc_conn::DatabaseStub, 237 &str, 238 &ipc_conn::ValueBuckets, 239 &ipc_conn::ValueBuckets, 240 ) -> Result<Vec<Option<ipc_conn::ValueBucket>>, ipc_conn::Error>, 241 { 242 let mut asset_keys = vec![]; 243 let mut extend_ids = vec![]; 244 245 match self.database.tables.get(table) { 246 None => Err(SyncError::NoSuchTableInDb), 247 Some(t) => { 248 let name = &t.name; 249 let mut values_records = vec![]; 250 let mut extends_records = vec![]; 251 for (bucket, ext) in values.iter().zip(extends.iter_mut()) { 252 match ext.get(GID_FIELD) { 253 Some(Value::String(gid)) => { 254 let mut extend_data = HashMap::new(); 255 let create_field = match ext.get(CREATE_FIELD) { 256 Some(Value::Int(i)) => *i, 257 _ => 0, 258 }; 259 260 let modify_field = match ext.get(MODIFY_FIELD) { 261 Some(Value::Int(i)) => *i, 262 _ => 0, 263 }; 264 265 extend_ids.push(gid.clone()); 266 extend_data 267 .insert("id".to_string(), ipc_conn::FieldRaw::Text(gid.clone())); 268 if is_new { 269 extend_data.insert( 270 "operation".to_string(), 271 ipc_conn::FieldRaw::Number(0_i64), 272 ); 273 } else { 274 extend_data.insert( 275 "operation".to_string(), 276 ipc_conn::FieldRaw::Number(1_i64), 277 ); 278 } 279 extend_data.insert( 280 "createTime".to_string(), 281 ipc_conn::FieldRaw::Number(create_field), 282 ); 283 extend_data.insert( 284 "modifyTime".to_string(), 285 ipc_conn::FieldRaw::Number(modify_field), 286 ); 287 288 let mut record_data = HashMap::new(); 289 for (key, value) in bucket { 290 if value.has_asset() { 291 asset_keys.push(key.to_string()); 292 ext.insert(key.to_string(), value.clone()); 293 } 294 let field = ipc_conn::FieldRaw::from(value); 295 record_data.insert(key.clone(), field); 296 } 297 298 values_records.push(ipc_conn::ValueBucket(record_data)); 299 extends_records.push(ipc_conn::ValueBucket(extend_data)); 300 } 301 _ => continue, 302 } 303 } 304 let value_raw_vb = ipc_conn::ValueBuckets(values_records); 305 let extend_raw_vb = ipc_conn::ValueBuckets(extends_records); 306 307 let mut results: Vec<RawValueBucket> = vec![]; 308 let ret = f(&mut self.database_ipc, name, &value_raw_vb, &extend_raw_vb)?; 309 for value_bucket in ret.into_iter().flatten() { 310 results.push(value_bucket.into()); 311 } 312 313 if is_backfill { 314 let mut result_ids = vec![]; 315 for result in &results { 316 if let Some(Value::String(id)) = result.get("id") { 317 result_ids.push(id.to_string()); 318 } 319 } 320 321 for (result_idx, result_id) in result_ids.iter().enumerate() { 322 for (ext_idx, ext_id) in extend_ids.iter().enumerate() { 323 if result_id == ext_id { 324 for asset_key in &asset_keys { 325 *extends[ext_idx].get_mut(asset_key).unwrap() = 326 results[result_idx].get(asset_key).unwrap().clone(); 327 } 328 } 329 } 330 } 331 } 332 333 Ok(results) 334 } 335 } 336 } 337 } 338 339 /// Struct of CloudDbData. 340 #[derive(Clone, Debug)] 341 pub struct CloudDbData { 342 pub(crate) next_cursor: String, 343 pub(crate) has_more: bool, 344 pub(crate) values: Vec<ipc_conn::ValueBucket>, 345 } 346 347 impl From<ipc_conn::CloudData> for CloudDbData { from(value: ipc_conn::CloudData) -> Self348 fn from(value: ipc_conn::CloudData) -> Self { 349 let mut vec = vec![]; 350 for v in value.values.0 { 351 vec.push(v); 352 } 353 CloudDbData { 354 next_cursor: value.next_cursor, 355 has_more: value.has_more, 356 values: vec, 357 } 358 } 359 } 360 361 impl CloudDbData { 362 /// Get next cursor from CloudDbData instance. next_cursor(&self) -> &str363 pub fn next_cursor(&self) -> &str { 364 &self.next_cursor 365 } 366 367 /// Check whether CloudDbData instance has more value buckets. has_more(&self) -> bool368 pub fn has_more(&self) -> bool { 369 self.has_more 370 } 371 372 /// Get values from CloudDbData instance. values(&self) -> &[ipc_conn::ValueBucket]373 pub fn values(&self) -> &[ipc_conn::ValueBucket] { 374 &self.values 375 } 376 } 377