1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_continue_token.h"
17 #include "sqlite_utils.h"
18 
19 namespace DistributedDB {
SQLiteSingleVerRelationalContinueToken(const SyncTimeRange & timeRange,const QueryObject & queryObject)20 SQLiteSingleVerRelationalContinueToken::SQLiteSingleVerRelationalContinueToken(
21     const SyncTimeRange &timeRange, const QueryObject &queryObject)
22     : isGettingDeletedData_(false), queryObj_(queryObject), tableName_(queryObj_.GetTableName()), timeRange_(timeRange)
23 {}
24 
CheckValid() const25 bool SQLiteSingleVerRelationalContinueToken::CheckValid() const
26 {
27     bool isValid = (magicBegin_ == MAGIC_BEGIN && magicEnd_ == MAGIC_END);
28     if (!isValid) {
29         LOGE("Invalid continue token.");
30     }
31     return isValid;
32 }
33 
GetStatement(sqlite3 * db,sqlite3_stmt * & queryStmt,sqlite3_stmt * & fullStmt,bool & isGettingDeletedData)34 int SQLiteSingleVerRelationalContinueToken::GetStatement(sqlite3 *db, sqlite3_stmt *&queryStmt, sqlite3_stmt *&fullStmt,
35     bool &isGettingDeletedData)
36 {
37     isGettingDeletedData = isGettingDeletedData_;
38     if (isGettingDeletedData) {
39         return GetDeletedDataStmt(db, queryStmt);
40     }
41 
42     int errCode = GetQuerySyncStatement(db, queryStmt);
43     if (errCode != E_OK) {
44         return errCode;
45     }
46 
47     // if lastQueryTime equals 0, that means never sync before, need not to send miss query data.
48     // if queryObj is empty, that means to send all data now, need not to send miss query data.
49     if (timeRange_.lastQueryTime != 0 && !queryObj_.Empty()) {
50         errCode = GetMissQueryStatement(db, fullStmt);
51     }
52     if (errCode != E_OK) {
53         SQLiteUtils::ResetStatement(queryStmt, true, errCode);
54     }
55     return errCode;
56 }
57 
SetNextBeginTime(const DataItem & theLastItem)58 void SQLiteSingleVerRelationalContinueToken::SetNextBeginTime(const DataItem &theLastItem)
59 {
60     Timestamp nextBeginTime = theLastItem.timestamp + 1;
61     if (nextBeginTime > INT64_MAX) {
62         nextBeginTime = INT64_MAX;
63     }
64     if (!isGettingDeletedData_) {
65         timeRange_.beginTime = nextBeginTime;
66         timeRange_.lastQueryTime = std::max(nextBeginTime, timeRange_.lastQueryTime);
67         return;
68     }
69     if ((theLastItem.flag & DataItem::DELETE_FLAG) != 0) {  // The last one could be non-deleted.
70         timeRange_.deleteBeginTime = nextBeginTime;
71     }
72 }
73 
FinishGetData()74 void SQLiteSingleVerRelationalContinueToken::FinishGetData()
75 {
76     if (isGettingDeletedData_) {
77         timeRange_.deleteEndTime = 0;
78         return;
79     }
80     isGettingDeletedData_ = true;
81     timeRange_.endTime = 0;
82     return;
83 }
84 
IsGetAllDataFinished() const85 bool SQLiteSingleVerRelationalContinueToken::IsGetAllDataFinished() const
86 {
87     return timeRange_.beginTime >= timeRange_.endTime && timeRange_.deleteBeginTime >= timeRange_.deleteEndTime;
88 }
89 
GetQuerySyncStatement(sqlite3 * db,sqlite3_stmt * & stmt)90 int SQLiteSingleVerRelationalContinueToken::GetQuerySyncStatement(sqlite3 *db, sqlite3_stmt *&stmt)
91 {
92     int errCode = E_OK;
93     SqliteQueryHelper helper = queryObj_.GetQueryHelper(errCode);
94     if (errCode != E_OK) {
95         return errCode;
96     }
97     if (fieldNames_.empty()) {
98         LOGE("field names cannot be empty.");
99         return -E_INTERNAL_ERROR;
100     }
101     return helper.GetRelationalQueryStatement(db, timeRange_.beginTime, timeRange_.endTime, fieldNames_, stmt);
102 }
103 
GetMissQueryStatement(sqlite3 * db,sqlite3_stmt * & stmt)104 int SQLiteSingleVerRelationalContinueToken::GetMissQueryStatement(sqlite3 *db, sqlite3_stmt *&stmt)
105 {
106     int errCode = E_OK;
107     SqliteQueryHelper helper = queryObj_.GetQueryHelper(errCode);
108     if (errCode != E_OK) {
109         return errCode;
110     }
111     return helper.GetRelationalMissQueryStatement(db, timeRange_.lastQueryTime + 1, INT64_MAX, fieldNames_, stmt);
112 }
113 
GetDeletedDataStmt(sqlite3 * db,sqlite3_stmt * & stmt) const114 int SQLiteSingleVerRelationalContinueToken::GetDeletedDataStmt(sqlite3 *db, sqlite3_stmt *&stmt) const
115 {
116     // get stmt
117     const std::string sql = GetDeletedDataSQL();
118     int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
119     if (errCode != E_OK) {
120         return errCode;
121     }
122 
123     // bind stmt
124     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, timeRange_.deleteBeginTime); // 1 means begin time
125     if (errCode != E_OK) {
126         goto ERROR;
127     }
128     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, timeRange_.deleteEndTime); // 2 means end time
129     if (errCode != E_OK) {
130         goto ERROR;
131     }
132     return errCode;
133 
134 ERROR:
135     SQLiteUtils::ResetStatement(stmt, true, errCode);
136     return errCode;
137 }
138 
GetQuery() const139 const QueryObject &SQLiteSingleVerRelationalContinueToken::GetQuery() const
140 {
141     return queryObj_;
142 }
143 
GetDeletedDataSQL() const144 std::string SQLiteSingleVerRelationalContinueToken::GetDeletedDataSQL() const
145 {
146     std::string tableName = DBConstant::RELATIONAL_PREFIX + tableName_ + "_log";
147     return "SELECT * FROM " + tableName +
148         " WHERE timestamp >= ? AND timestamp < ? AND (flag&0x03 = 0x03) ORDER BY timestamp ASC;";
149 }
150 
SetFieldNames(const std::vector<std::string> & fieldNames)151 void SQLiteSingleVerRelationalContinueToken::SetFieldNames(const std::vector<std::string> &fieldNames)
152 {
153     fieldNames_ = fieldNames;
154 }
155 
UpdateNextSyncOffset(int addOffset)156 void SQLiteSingleVerRelationalContinueToken::UpdateNextSyncOffset(int addOffset)
157 {
158     if (!queryObj_.HasLimit() || queryObj_.HasOrderBy()) {
159         return;
160     }
161     int limit;
162     int offset;
163     queryObj_.GetLimitVal(limit, offset);
164     if (limit < addOffset) {
165         LOGW("Sync data is over limit.");
166         return;
167     }
168     queryObj_.SetLimit(limit - addOffset, offset + addOffset);
169 }
170 
SetCloudTableSchema(const TableSchema & schema)171 void SQLiteSingleVerRelationalContinueToken::SetCloudTableSchema(const TableSchema &schema)
172 {
173     tableSchema_ = schema;
174 }
175 
GetCloudStatement(sqlite3 * db,CloudSyncData & cloudDataResult,sqlite3_stmt * & queryStmt,bool & isFirstTime)176 int SQLiteSingleVerRelationalContinueToken::GetCloudStatement(sqlite3 *db, CloudSyncData &cloudDataResult,
177     sqlite3_stmt *&queryStmt, bool &isFirstTime)
178 {
179     if (queryStmt_ != nullptr) {
180         queryStmt = queryStmt_;
181         isFirstTime = false;
182         return E_OK;
183     }
184     int errCode;
185     SqliteQueryHelper helper = queryObj_.GetQueryHelper(errCode);
186     if (errCode != E_OK) {
187         return errCode;
188     }
189     std::string sql = helper.GetRelationalCloudQuerySql(tableSchema_.fields, cloudDataResult.isCloudForcePushStrategy,
190         cloudDataResult.isCompensatedTask, cloudDataResult.mode);
191     errCode = helper.GetCloudQueryStatement(true, db, timeRange_.beginTime, sql, queryStmt_);
192     if (errCode == E_OK) {
193         queryStmt = queryStmt_;
194     }
195     isFirstTime = true;
196     return errCode;
197 }
198 
GetCloudTableSchema(TableSchema & tableSchema) const199 void SQLiteSingleVerRelationalContinueToken::GetCloudTableSchema(TableSchema &tableSchema) const
200 {
201     tableSchema = tableSchema_;
202 }
203 
ReleaseCloudStatement()204 int SQLiteSingleVerRelationalContinueToken::ReleaseCloudStatement()
205 {
206     if (queryStmt_ == nullptr) {
207         return E_OK;
208     }
209     int errCode = E_OK;
210     SQLiteUtils::ResetStatement(queryStmt_, true, errCode);
211     queryStmt_ = nullptr;
212     return errCode;
213 }
214 }  // namespace DistributedDB
215 #endif