/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define DEBUG false  // STOPSHIP if true
#include "Log.h"

#include "ValueMetricProducer.h"

#include <limits.h>
#include <stdlib.h>

#include "../guardrail/StatsdStats.h"
#include "../stats_log_util.h"
#include "metrics/parsing_utils/metrics_manager_util.h"

using android::util::FIELD_COUNT_REPEATED;
using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_DOUBLE;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::FIELD_TYPE_STRING;
using android::util::ProtoOutputStream;
using std::map;
using std::shared_ptr;
using std::unordered_map;

namespace android {
namespace os {
namespace statsd {

// for StatsLogReport
const int FIELD_ID_ID = 1;
const int FIELD_ID_VALUE_METRICS = 7;
const int FIELD_ID_TIME_BASE = 9;
const int FIELD_ID_BUCKET_SIZE = 10;
const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
const int FIELD_ID_IS_ACTIVE = 14;
// for ValueMetricDataWrapper
const int FIELD_ID_DATA = 1;
const int FIELD_ID_SKIPPED = 2;
// for SkippedBuckets
const int FIELD_ID_SKIPPED_START_MILLIS = 3;
const int FIELD_ID_SKIPPED_END_MILLIS = 4;
const int FIELD_ID_SKIPPED_DROP_EVENT = 5;
// for DumpEvent Proto
const int FIELD_ID_BUCKET_DROP_REASON = 1;
const int FIELD_ID_DROP_TIME = 2;
// for ValueMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_BUCKET_INFO = 3;
const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
const int FIELD_ID_SLICE_BY_STATE = 6;
// for ValueBucketInfo
const int FIELD_ID_VALUE_INDEX = 1;
const int FIELD_ID_VALUE_LONG = 2;
const int FIELD_ID_VALUE_DOUBLE = 3;
const int FIELD_ID_VALUES = 9;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
const int FIELD_ID_CONDITION_TRUE_NS = 10;

const Value ZERO_LONG((int64_t)0);
const Value ZERO_DOUBLE((double)0);

// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
ValueMetricProducer::ValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const int conditionIndex,
        const vector<ConditionState>& initialConditionCache,
        const sp<ConditionWizard>& conditionWizard, const uint64_t protoHash,
        const int whatMatcherIndex, const sp<EventMatcherWizard>& matcherWizard,
        const int pullTagId, const int64_t timeBaseNs, const int64_t startTimeNs,
        const sp<StatsPullerManager>& pullerManager,
        const unordered_map<int, shared_ptr<Activation>>& eventActivationMap,
        const unordered_map<int, vector<shared_ptr<Activation>>>& eventDeactivationMap,
        const vector<int>& slicedStateAtoms,
        const unordered_map<int, unordered_map<int, int64_t>>& stateGroupMap)
    : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, initialConditionCache,
                     conditionWizard, protoHash, eventActivationMap, eventDeactivationMap,
                     slicedStateAtoms, stateGroupMap),
      mWhatMatcherIndex(whatMatcherIndex),
      mEventMatcherWizard(matcherWizard),
      mPullerManager(pullerManager),
      mPullTagId(pullTagId),
      mIsPulled(pullTagId != -1),
      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
      mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
                                  : StatsdStats::kDimensionKeySizeSoftLimit),
      mDimensionHardLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
                                  : StatsdStats::kDimensionKeySizeHardLimit),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationType(metric.aggregation_type()),
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false),
      mCurrentBucketIsSkipped(false),
      mMaxPullDelayNs(metric.has_max_pull_delay_sec() ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs),
      mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()),
      // Condition timer will be set later within the constructor after pulling events
      mConditionTimer(false, timeBaseNs) {
    int64_t bucketSizeMills = 0;
    if (metric.has_bucket()) {
        bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
    } else {
        bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
    }
    mBucketSizeNs = bucketSizeMills * 1000000;

    translateFieldMatcher(metric.value_field(), &mFieldMatchers);

    if (metric.has_dimensions_in_what()) {
        translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
        mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
        mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what());
    }

    if (metric.links().size() > 0) {
        for (const auto& link : metric.links()) {
            Metric2Condition mc;
            mc.conditionId = link.condition();
            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
            mMetric2ConditionLinks.push_back(mc);
        }
        mConditionSliced = true;
    }

    for (const auto& stateLink : metric.state_link()) {
        Metric2State ms;
        ms.stateAtomId = stateLink.state_atom_id();
        translateFieldMatcher(stateLink.fields_in_what(), &ms.metricFields);
        translateFieldMatcher(stateLink.fields_in_state(), &ms.stateFields);
        mMetric2StateLinks.push_back(ms);
    }

    if (metric.has_threshold()) {
        mUploadThreshold = metric.threshold();
    }

    int64_t numBucketsForward = calcBucketsForwardCount(startTimeNs);
    mCurrentBucketNum += numBucketsForward;

    flushIfNeededLocked(startTimeNs);

    if (mIsPulled) {
        mPullerManager->RegisterReceiver(mPullTagId, mConfigKey, this, getCurrentBucketEndTimeNs(),
                                         mBucketSizeNs);
    }

    // Only do this for partial buckets like first bucket. All other buckets should use
    // flushIfNeeded to adjust start and end to bucket boundaries.
    // Adjust start for partial bucket
    mCurrentBucketStartTimeNs = startTimeNs;
    mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs);

    // Now that activations are processed, start the condition timer if needed.
    mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue,
                                       mCurrentBucketStartTimeNs);

    VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
         (long long)mBucketSizeNs, (long long)mTimeBaseNs);
}
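// Example (illustrative, not from the original source): with a 5-minute bucket,
// TimeUnitToBucketSizeInMillis returns 300000, so
// mBucketSizeNs = 300000 * 1000000 = 300e9 ns. If the config lands at
// startTimeNs = timeBaseNs + 450s, calcBucketsForwardCount returns
// 1 + (450s - 300s) / 300s = 1, the bucket number advances to 1, and the first
// bucket becomes the partial interval [timeBaseNs + 450s, timeBaseNs + 600s).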
ValueMetricProducer::~ValueMetricProducer() {
    VLOG("~ValueMetricProducer() called");
    if (mIsPulled) {
        mPullerManager->UnRegisterReceiver(mPullTagId, mConfigKey, this);
    }
}

bool ValueMetricProducer::onConfigUpdatedLocked(
        const StatsdConfig& config, const int configIndex, const int metricIndex,
        const vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers,
        const unordered_map<int64_t, int>& oldAtomMatchingTrackerMap,
        const unordered_map<int64_t, int>& newAtomMatchingTrackerMap,
        const sp<EventMatcherWizard>& matcherWizard,
        const vector<sp<ConditionTracker>>& allConditionTrackers,
        const unordered_map<int64_t, int>& conditionTrackerMap, const sp<ConditionWizard>& wizard,
        const unordered_map<int64_t, int>& metricToActivationMap,
        unordered_map<int, vector<int>>& trackerToMetricMap,
        unordered_map<int, vector<int>>& conditionToMetricMap,
        unordered_map<int, vector<int>>& activationAtomTrackerToMetricMap,
        unordered_map<int, vector<int>>& deactivationAtomTrackerToMetricMap,
        vector<int>& metricsWithActivation) {
    if (!MetricProducer::onConfigUpdatedLocked(
                config, configIndex, metricIndex, allAtomMatchingTrackers,
                oldAtomMatchingTrackerMap, newAtomMatchingTrackerMap, matcherWizard,
                allConditionTrackers, conditionTrackerMap, wizard, metricToActivationMap,
                trackerToMetricMap, conditionToMetricMap, activationAtomTrackerToMetricMap,
                deactivationAtomTrackerToMetricMap, metricsWithActivation)) {
        return false;
    }

    const ValueMetric& metric = config.value_metric(configIndex);
    // Update appropriate indices: mWhatMatcherIndex, mConditionIndex and MetricsManager maps.
    if (!handleMetricWithAtomMatchingTrackers(metric.what(), metricIndex,
                                              /*enforceOneAtom=*/false, allAtomMatchingTrackers,
                                              newAtomMatchingTrackerMap, trackerToMetricMap,
                                              mWhatMatcherIndex)) {
        return false;
    }

    if (metric.has_condition() &&
        !handleMetricWithConditions(metric.condition(), metricIndex, conditionTrackerMap,
                                    metric.links(), allConditionTrackers, mConditionTrackerIndex,
                                    conditionToMetricMap)) {
        return false;
    }
    sp<EventMatcherWizard> tmpEventWizard = mEventMatcherWizard;
    mEventMatcherWizard = matcherWizard;
    return true;
}

void ValueMetricProducer::onStateChanged(int64_t eventTimeNs, int32_t atomId,
                                         const HashableDimensionKey& primaryKey,
                                         const FieldValue& oldState, const FieldValue& newState) {
    VLOG("ValueMetric %lld onStateChanged time %lld, State %d, key %s, %d -> %d",
         (long long)mMetricId, (long long)eventTimeNs, atomId, primaryKey.toString().c_str(),
         oldState.mValue.int_value, newState.mValue.int_value);

    // If old and new states are in the same StateGroup, then we do not need to
    // pull for this state change.
    FieldValue oldStateCopy = oldState;
    FieldValue newStateCopy = newState;
    mapStateValue(atomId, &oldStateCopy);
    mapStateValue(atomId, &newStateCopy);
    if (oldStateCopy == newStateCopy) {
        return;
    }

    // If condition is not true or metric is not active, we do not need to pull
    // for this state change.
    if (mCondition != ConditionState::kTrue || !mIsActive) {
        return;
    }

    bool isEventLate = eventTimeNs < mCurrentBucketStartTimeNs;
    if (isEventLate) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    mStateChangePrimaryKey.first = atomId;
    mStateChangePrimaryKey.second = primaryKey;
    if (mIsPulled) {
        pullAndMatchEventsLocked(eventTimeNs);
    }
    mStateChangePrimaryKey.first = 0;
    mStateChangePrimaryKey.second = DEFAULT_DIMENSION_KEY;
    flushIfNeededLocked(eventTimeNs);
}
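// Example (illustrative): for a metric sliced by screen state, an OFF -> ON
// transition at time t triggers a pull at t. The pulled values close out the
// intervals tracked under the old state key, so the value accumulated up to t
// is attributed to OFF, and subsequent accumulation starts under ON. If both
// states map to the same StateGroup, the early return above skips the pull.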
void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
                                                           const int64_t eventTime) {
    VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
}

void ValueMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
    StatsdStats::getInstance().noteBucketDropped(mMetricId);

    // The current partial bucket is not flushed and does not require a pull,
    // so the data is still valid.
    flushIfNeededLocked(dropTimeNs);
    clearPastBucketsLocked(dropTimeNs);
}

void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
    mPastBuckets.clear();
    mSkippedBuckets.clear();
}

void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                             const bool include_current_partial_bucket,
                                             const bool erase_data, const DumpLatency dumpLatency,
                                             std::set<string>* str_set,
                                             ProtoOutputStream* protoOutput) {
    VLOG("metric %lld dump report now...", (long long)mMetricId);
    if (include_current_partial_bucket) {
        // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
        // current bucket will have incomplete data and the next will have the wrong snapshot to do
        // a diff against. If the condition is false, we are fine since the base data is reset and
        // we are not tracking anything.
        bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
        if (pullNeeded) {
            switch (dumpLatency) {
                case FAST:
                    invalidateCurrentBucket(dumpTimeNs, BucketDropReason::DUMP_REPORT_REQUESTED);
                    break;
                case NO_TIME_CONSTRAINTS:
                    pullAndMatchEventsLocked(dumpTimeNs);
                    break;
            }
        }
        flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
    }
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());

    if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
        return;
    }
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);

    // Fills the dimension path if not slicing by ALL.
    if (!mSliceByPositionALL) {
        if (!mDimensionsInWhat.empty()) {
            uint64_t dimenPathToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
            writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
            protoOutput->end(dimenPathToken);
        }
    }

    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);

    for (const auto& skippedBucket : mSkippedBuckets) {
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
                           (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
                           (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));
        for (const auto& dropEvent : skippedBucket.dropEvents) {
            uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                                         FIELD_ID_SKIPPED_DROP_EVENT);
            protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME,
                               (long long)(NanoToMillis(dropEvent.dropTimeNs)));
            protoOutput->end(dropEventToken);
        }
        protoOutput->end(wrapperToken);
    }

    for (const auto& pair : mPastBuckets) {
        const MetricDimensionKey& dimensionKey = pair.first;
        VLOG(" dimension key %s", dimensionKey.toString().c_str());
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);

        // First fill dimension.
        if (mSliceByPositionALL) {
            uint64_t dimensionToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
            writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
            protoOutput->end(dimensionToken);
        } else {
            writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(),
                                           FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
        }

        // Then fill slice_by_state.
        for (auto state : dimensionKey.getStateValuesKey().getValues()) {
            uint64_t stateToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                                     FIELD_ID_SLICE_BY_STATE);
            writeStateToProto(state, protoOutput);
            protoOutput->end(stateToken);
        }

        // Then fill bucket_info (ValueBucketInfo).
        for (const auto& bucket : pair.second) {
            uint64_t bucketInfoToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);

            if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketStartNs));
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketEndNs));
            } else {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
                                   (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
            }
            // We only write the condition timer value if the metric has a
            // condition and/or is sliced by state.
            // If the metric is sliced by state, the condition timer value is
            // also sliced by state to reflect time spent in that state.
            if (mConditionTrackerIndex >= 0 || !mSlicedStateAtoms.empty()) {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_TRUE_NS,
                                   (long long)bucket.mConditionTrueNs);
            }
            for (int i = 0; i < (int)bucket.valueIndex.size(); i++) {
                int index = bucket.valueIndex[i];
                const Value& value = bucket.values[i];
                uint64_t valueToken = protoOutput->start(
                        FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
                protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, index);
                if (value.getType() == LONG) {
                    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
                                       (long long)value.long_value);
                    VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs,
                         (long long)bucket.mBucketEndNs, index, (long long)value.long_value);
                } else if (value.getType() == DOUBLE) {
                    protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
                                       value.double_value);
                    VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs,
                         (long long)bucket.mBucketEndNs, index, value.double_value);
                } else {
                    VLOG("Wrong value type for ValueMetric output: %d", value.getType());
                }
                protoOutput->end(valueToken);
            }
            protoOutput->end(bucketInfoToken);
        }
        protoOutput->end(wrapperToken);
    }
    protoOutput->end(protoToken);

    VLOG("metric %lld done with dump report...", (long long)mMetricId);
    if (erase_data) {
        mPastBuckets.clear();
        mSkippedBuckets.clear();
    }
}

void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs,
                                                                  const BucketDropReason reason) {
    if (!mCurrentBucketIsSkipped) {
        // Only report to StatsdStats once per invalid bucket.
        StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
    }

    skipCurrentBucket(dropTimeNs, reason);
}

void ValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
                                                  const BucketDropReason reason) {
    invalidateCurrentBucketWithoutResetBase(dropTimeNs, reason);
    resetBase();
}

void ValueMetricProducer::skipCurrentBucket(const int64_t dropTimeNs,
                                            const BucketDropReason reason) {
    if (!maxDropEventsReached()) {
        mCurrentSkippedBucket.dropEvents.emplace_back(buildDropEvent(dropTimeNs, reason));
    }
    mCurrentBucketIsSkipped = true;
}

void ValueMetricProducer::resetBase() {
    for (auto& slice : mCurrentBaseInfo) {
        for (auto& baseInfo : slice.second.baseInfos) {
            baseInfo.hasBase = false;
        }
    }
    mHasGlobalBase = false;
}
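// Example of the base/diff bookkeeping (illustrative, assuming use_diff): a
// counter atom reports 100 on the first pull; there is no base yet, so the
// pull only seeds base = 100. A later pull reporting 130 emits a diff of 30
// and moves the base to 130. If resetBase() runs in between (e.g. the
// condition turned false), the next pull again only seeds a new base: the
// interim delta is dropped because we cannot tell how much of it accrued
// while the condition was true.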
// Handle active state change. Active state change is treated like a condition change:
// - drop bucket if active state change event arrives too late
// - if condition is true, pull data on active state changes
// - ConditionTimer tracks changes based on AND of condition and active state.
void ValueMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs) {
    bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs;
    if (isEventTooLate) {
        // Drop bucket because event arrived too late, i.e. we are missing data for this bucket.
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
    }

    // Call parent method once we've verified the validity of current bucket.
    MetricProducer::onActiveStateChangedLocked(eventTimeNs);

    if (ConditionState::kTrue != mCondition) {
        return;
    }

    // Pull on active state changes.
    if (!isEventTooLate) {
        if (mIsPulled) {
            pullAndMatchEventsLocked(eventTimeNs);
        }
        // When active state changes from true to false, clear diff base but don't
        // reset other counters as we may accumulate more value in the bucket.
        if (mUseDiff && !mIsActive) {
            resetBase();
        }
    }

    flushIfNeededLocked(eventTimeNs);

    // Let condition timer know of new active state.
    mConditionTimer.onConditionChanged(mIsActive, eventTimeNs);
    updateCurrentSlicedBucketConditionTimers(mIsActive, eventTimeNs);
}

void ValueMetricProducer::onConditionChangedLocked(const bool condition,
                                                   const int64_t eventTimeNs) {
    ConditionState newCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;
    bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs;

    // If the config is not active, skip the event.
    if (!mIsActive) {
        mCondition = isEventTooLate ? ConditionState::kUnknown : newCondition;
        return;
    }

    // If the event arrived late, mark the bucket as invalid and skip the event.
    if (isEventTooLate) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        mCondition = ConditionState::kUnknown;
        mConditionTimer.onConditionChanged(mCondition, eventTimeNs);
        updateCurrentSlicedBucketConditionTimers(mCondition, eventTimeNs);
        return;
    }

    // If the previous condition was unknown, mark the bucket as invalid
    // because the bucket will contain partial data. For example, the condition
    // change might happen close to the end of the bucket and we might miss a
    // lot of data.
    //
    // We still want to pull to set the base.
    if (mCondition == ConditionState::kUnknown) {
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
    }

    // Pull and match for the following condition change cases:
    // unknown/false -> true - condition changed
    // true -> false - condition changed
    // true -> true - old condition was true so we can flush the bucket at the
    // end if needed.
    //
    // We don't need to pull for unknown -> false or false -> false.
    //
    // onConditionChangedLocked might happen on bucket boundaries if this is
    // called before #onDataPulled.
    if (mIsPulled &&
        (newCondition == ConditionState::kTrue || mCondition == ConditionState::kTrue)) {
        pullAndMatchEventsLocked(eventTimeNs);
    }

    // For metrics that use diff, when condition changes from true to false,
    // clear diff base but don't reset other counts because we may accumulate
    // more value in the bucket.
    if (mUseDiff &&
        (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) {
        resetBase();
    }

    // Update condition state after pulling.
    mCondition = newCondition;

    flushIfNeededLocked(eventTimeNs);

    mConditionTimer.onConditionChanged(mCondition, eventTimeNs);
    updateCurrentSlicedBucketConditionTimers(mCondition, eventTimeNs);
}

void ValueMetricProducer::updateCurrentSlicedBucketConditionTimers(bool newCondition,
                                                                   int64_t eventTimeNs) {
    if (mSlicedStateAtoms.empty()) {
        return;
    }

    // Utilize the current state key of each DimensionsInWhat key to determine
    // which condition timers to update.
    //
    // Assumes that the MetricDimensionKey exists in `mCurrentSlicedBucket`.
    bool inPulledData;
    for (const auto& [dimensionInWhatKey, dimensionInWhatInfo] : mCurrentBaseInfo) {
        // If the new condition is true, turn ON the condition timer only if
        // the DimensionInWhat key was present in the pulled data.
        inPulledData = dimensionInWhatInfo.hasCurrentState;
        mCurrentSlicedBucket[MetricDimensionKey(dimensionInWhatKey,
                                                dimensionInWhatInfo.currentState)]
                .conditionTimer.onConditionChanged(newCondition && inPulledData, eventTimeNs);
    }
}
void ValueMetricProducer::prepareFirstBucketLocked() {
    // Kicks off the puller immediately if condition is true and diff based.
    if (mIsActive && mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) {
        pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
    }
}

void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
    vector<shared_ptr<LogEvent>> allData;
    if (!mPullerManager->Pull(mPullTagId, mConfigKey, timestampNs, &allData)) {
        ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
        invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
        return;
    }

    accumulateEvents(allData, timestampNs, timestampNs);
}

int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
    return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
}
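// Example (illustrative): with mTimeBaseNs = 0 and a 300s bucket, a delayed
// scheduled pull arriving at currentTimeNs = 615s snaps back to
// 0 + (615 / 300) * 300 = 600s, the most recent bucket boundary at or before
// the pull. onDataPulled() then uses 600s - 1ns as the effective event time so
// the data closes the [300s, 600s) bucket it was scheduled for.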
// By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
// to be delayed. Other events like condition changes or app upgrade which are not based on
// AlarmManager might have arrived earlier and close the bucket.
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                       bool pullSuccess, int64_t originalPullTimeNs) {
    std::lock_guard<std::mutex> lock(mMutex);
    if (mCondition == ConditionState::kTrue) {
        // If the pull failed, we won't be able to compute a diff.
        if (!pullSuccess) {
            invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
        } else {
            bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
            if (isEventLate) {
                // If the event is late, we are in the middle of a bucket. Just
                // process the data without trying to snap the data to the nearest bucket.
                accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
            } else {
                // For scheduled pulled data, the effective event time is snapped to the nearest
                // bucket end. In the case of waking up from a deep sleep state, we will
                // attribute to the previous bucket end. If the sleep was long but not very
                // long, we will be in the immediate next bucket. Previous bucket may get a
                // larger number as we pull at a later time than real bucket end.
                //
                // If the sleep was very long, we skip more than one bucket before sleep. In
                // this case, the diff base will be cleared and this new data will serve as
                // the new diff base.
                int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                StatsdStats::getInstance().noteBucketBoundaryDelayNs(
                        mMetricId, originalPullTimeNs - bucketEndTime);
                accumulateEvents(allData, originalPullTimeNs, bucketEndTime);
            }
        }
    }

    // We can probably flush the bucket. Since we used bucketEndTime when calling
    // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
    flushIfNeededLocked(originalPullTimeNs);
}

void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                           int64_t originalPullTimeNs,
                                           int64_t eventElapsedTimeNs) {
    bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
    if (isEventLate) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
    const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
        return;
    }

    mMatchedMetricDimensionKeys.clear();
    for (const auto& data : allData) {
        LogEvent localCopy = data->makeCopy();
        if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
            MatchingState::kMatched) {
            localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
        }
    }

    // If a key that is:
    // 1. Tracked in mCurrentSlicedBucket and
    // 2. A superset of the current mStateChangePrimaryKey
    // was not found in the new pulled data (i.e. not in mMatchedMetricDimensionKeys)
    // then we need to reset the base.
    for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
        const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat();
        bool presentInPulledData =
                mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData && whatKey.contains(mStateChangePrimaryKey.second)) {
            auto it = mCurrentBaseInfo.find(whatKey);
            for (auto& baseInfo : it->second.baseInfos) {
                baseInfo.hasBase = false;
            }
            // Set to false when DimensionInWhat key is not present in a pull.
            // Used in onMatchedLogEventInternalLocked() to ensure the condition
            // timer is turned on the next pull when data is present.
            it->second.hasCurrentState = false;
            // Turn OFF condition timer for keys not present in pulled data.
            currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs);
        }
    }
    mMatchedMetricDimensionKeys.clear();
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
        mCurrentSlicedBucket.clear();
    }
}
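// Example (illustrative): with max_pull_delay_sec set to, say, 10, a pull
// requested at originalPullTimeNs that completes 15s later exceeds
// mMaxPullDelayNs, so accumulateEvents() drops the current bucket with
// BucketDropReason::PULL_DELAYED instead of attributing stale data to it.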
void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
    if (mCurrentSlicedBucket.size() == 0) {
        return;
    }

    fprintf(out, "ValueMetric %lld dimension size %lu\n", (long long)mMetricId,
            (unsigned long)mCurrentSlicedBucket.size());
    if (verbose) {
        for (const auto& it : mCurrentSlicedBucket) {
            for (const auto& interval : it.second.intervals) {
                fprintf(out, "\t(what)%s\t(states)%s (value)%s\n",
                        it.first.getDimensionKeyInWhat().toString().c_str(),
                        it.first.getStateValuesKey().toString().c_str(),
                        interval.value.toString().c_str());
            }
        }
    }
}

bool ValueMetricProducer::hasReachedGuardRailLimit() const {
    return mCurrentSlicedBucket.size() >= mDimensionHardLimit;
}

bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
    // ===========GuardRail==============
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
        return false;
    }
    if (mCurrentSlicedBucket.size() > mDimensionSoftLimit - 1) {
        size_t newTupleCount = mCurrentSlicedBucket.size() + 1;
        StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
        if (hasReachedGuardRailLimit()) {
            ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId,
                  newKey.toString().c_str());
            StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
            return true;
        }
    }

    return false;
}

bool ValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) {
    // ===========GuardRail==============
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
        return false;
    }
    if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
        size_t newTupleCount = mCurrentFullBucket.size() + 1;
        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
        if (newTupleCount > mDimensionHardLimit) {
            ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
                  (long long)mMetricId, newKey.toString().c_str());
            return true;
        }
    }

    return false;
}

bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
    for (const FieldValue& value : event.getValues()) {
        if (value.mField.matches(matcher)) {
            switch (value.mValue.type) {
                case INT:
                    ret.setLong(value.mValue.int_value);
                    break;
                case LONG:
                    ret.setLong(value.mValue.long_value);
                    break;
                case FLOAT:
                    ret.setDouble(value.mValue.float_value);
                    break;
                case DOUBLE:
                    ret.setDouble(value.mValue.double_value);
                    break;
                default:
                    return false;
            }
            return true;
        }
    }
    return false;
}
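// Note: getDoubleOrLong() widens INT to LONG and FLOAT to DOUBLE, so the
// aggregation and dump paths only ever deal with the two Value types written
// as FIELD_ID_VALUE_LONG / FIELD_ID_VALUE_DOUBLE. Non-numeric fields return
// false, which callers surface via StatsdStats::noteBadValueType().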
bool ValueMetricProducer::multipleBucketsSkipped(const int64_t numBucketsForward) {
    // Skip buckets if this is a pulled metric or a pushed metric that is diffed.
    return numBucketsForward > 1 && (mIsPulled || mUseDiff);
}

void ValueMetricProducer::onMatchedLogEventInternalLocked(
        const size_t matcherIndex, const MetricDimensionKey& eventKey,
        const ConditionKey& conditionKey, bool condition, const LogEvent& event,
        const map<int, HashableDimensionKey>& statePrimaryKeys) {
    auto whatKey = eventKey.getDimensionKeyInWhat();
    auto stateKey = eventKey.getStateValuesKey();

    // Skip this event if a state change occurred for a different primary key.
    auto it = statePrimaryKeys.find(mStateChangePrimaryKey.first);
    // Check that both the atom id and the primary key are equal.
    if (it != statePrimaryKeys.end() && it->second != mStateChangePrimaryKey.second) {
        VLOG("ValueMetric skip event with primary key %s because state change primary key "
             "is %s",
             it->second.toString().c_str(), mStateChangePrimaryKey.second.toString().c_str());
        return;
    }

    int64_t eventTimeNs = event.GetElapsedTimestampNs();
    if (eventTimeNs < mCurrentBucketStartTimeNs) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        return;
    }
    mMatchedMetricDimensionKeys.insert(whatKey);

    if (!mIsPulled) {
        // We cannot flush without doing a pull first.
        flushIfNeededLocked(eventTimeNs);
    }

    // We should not accumulate the data for pushed metrics when the condition is false.
    bool shouldSkipForPushMetric = !mIsPulled && !condition;
    // For pulled metrics, there are two cases:
    // - to compute diffs, we need to process all the state changes
    // - for non-diff metrics, we should ignore the data if the condition wasn't true. If we have
    //   a state change from
    //     + True -> True: we should process the data, it might be a bucket boundary
    //     + True -> False: we also need to process the data.
    bool shouldSkipForPulledMetric = mIsPulled && !mUseDiff && mCondition != ConditionState::kTrue;
    if (shouldSkipForPushMetric || shouldSkipForPulledMetric) {
        VLOG("ValueMetric skip event because condition is false and we are not using diff (for "
             "pulled metric)");
        return;
    }

    if (hitGuardRailLocked(eventKey)) {
        return;
    }

    const auto& returnVal =
            mCurrentBaseInfo.emplace(whatKey, DimensionsInWhatInfo(getUnknownStateKey()));
    DimensionsInWhatInfo& dimensionsInWhatInfo = returnVal.first->second;
    const HashableDimensionKey oldStateKey = dimensionsInWhatInfo.currentState;
    vector<BaseInfo>& baseInfos = dimensionsInWhatInfo.baseInfos;
    if (baseInfos.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
        baseInfos.resize(mFieldMatchers.size());
    }

    // Ensure we turn on the condition timer in the case where dimensions
    // were missing on a previous pull due to a state change.
    bool stateChange = oldStateKey != stateKey;
    if (!dimensionsInWhatInfo.hasCurrentState) {
        stateChange = true;
        dimensionsInWhatInfo.hasCurrentState = true;
    }

    // We need to get the intervals stored with the previous state key so we can
    // close these value intervals.
    vector<Interval>& intervals =
            mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)].intervals;
    if (intervals.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
        intervals.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;
    dimensionsInWhatInfo.hasCurrentState = true;
    dimensionsInWhatInfo.currentState = stateKey;
    for (int i = 0; i < (int)mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        BaseInfo& baseInfo = baseInfos[i];
        Interval& interval = intervals[i];
        interval.valueIndex = i;
        Value value;
        if (!getDoubleOrLong(event, matcher, value)) {
            VLOG("Failed to get value %d from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return;
        }
        interval.seenNewData = true;

        if (mUseDiff) {
            if (!baseInfo.hasBase) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has global base. This key does not.
                    // Optionally use zero as base.
                    baseInfo.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
                    baseInfo.hasBase = true;
                } else {
                    // no base. just update base and return.
                    baseInfo.base = value;
                    baseInfo.hasBase = true;
                    // If we're missing a base, do not use anomaly detection on incomplete data
                    useAnomalyDetection = false;
                    // Continue (instead of return) here in order to set baseInfo.base and
                    // baseInfo.hasBase for other baseInfos
                    continue;
                }
            }

            Value diff;
            switch (mValueDirection) {
                case ValueMetric::INCREASING:
                    if (value >= baseInfo.base) {
                        diff = value - baseInfo.base;
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected decreasing value");
                        StatsdStats::getInstance().notePullDataError(mPullTagId);
                        baseInfo.base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::DECREASING:
                    if (baseInfo.base >= value) {
                        diff = baseInfo.base - value;
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected increasing value");
                        StatsdStats::getInstance().notePullDataError(mPullTagId);
                        baseInfo.base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::ANY:
                    diff = value - baseInfo.base;
                    break;
                default:
                    break;
            }
            baseInfo.base = value;
            value = diff;
        }

        if (interval.hasValue) {
            switch (mAggregationType) {
                case ValueMetric::SUM:
                    // for AVG, we add up and take average when flushing the bucket
                case ValueMetric::AVG:
                    interval.value += value;
                    break;
                case ValueMetric::MIN:
                    interval.value = std::min(value, interval.value);
                    break;
                case ValueMetric::MAX:
                    interval.value = std::max(value, interval.value);
                    break;
                default:
                    break;
            }
        } else {
            interval.value = value;
            interval.hasValue = true;
        }
        interval.sampleSize += 1;
    }

    // State change.
    if (!mSlicedStateAtoms.empty() && stateChange) {
        // Turn OFF the condition timer for the previous state key.
        mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)]
                .conditionTimer.onConditionChanged(false, eventTimeNs);

        // Turn ON the condition timer for the new state key.
        mCurrentSlicedBucket[MetricDimensionKey(whatKey, stateKey)]
                .conditionTimer.onConditionChanged(true, eventTimeNs);
    }
    // Only trigger the tracker if all intervals are correct and we have not skipped the bucket due
    // to MULTIPLE_BUCKETS_SKIPPED.
    if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) {
        // TODO: propagate proper values downstream when anomaly supports doubles
        long wholeBucketVal = intervals[0].value.long_value;
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
}

// For pulled metrics, we always need to make sure we do a pull before flushing the bucket
// if mCondition is true!
void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
    int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
    if (eventTimeNs < currentBucketEndTimeNs) {
        VLOG("eventTime is %lld, less than current bucket end time %lld", (long long)eventTimeNs,
             (long long)(currentBucketEndTimeNs));
        return;
    }
    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
    int64_t nextBucketStartTimeNs =
            currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
    flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
}

int64_t ValueMetricProducer::calcBucketsForwardCount(const int64_t& eventTimeNs) const {
    int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
    if (eventTimeNs < currentBucketEndTimeNs) {
        return 0;
    }
    return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
}
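// Example (illustrative): with a 300s bucket ending at 600s, an event at
// 1250s gives calcBucketsForwardCount = 1 + (1250s - 600s) / 300s = 3, and
// flushIfNeededLocked computes nextBucketStartTimeNs = 600s + 2 * 300s = 1200s.
// Since more than one bucket was skipped, multipleBucketsSkipped() then
// invalidates the current bucket for pulled or diffed metrics.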
void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
                                                   const int64_t& nextBucketStartTimeNs) {
    if (mCondition == ConditionState::kUnknown) {
        StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
        invalidateCurrentBucketWithoutResetBase(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
    }

    VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
         (int)mCurrentSlicedBucket.size());

    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
    int64_t bucketEndTime = fullBucketEndTimeNs;
    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);

    if (multipleBucketsSkipped(numBucketsForward)) {
        VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
        StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
        // Something went wrong. Maybe the device was sleeping for a long time. It is better
        // to mark the current bucket as invalid. The last pull might have been successful, though.
        invalidateCurrentBucketWithoutResetBase(eventTimeNs,
                                                BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
        // End the bucket at the next bucket start time so the entire interval is skipped.
        bucketEndTime = nextBucketStartTimeNs;
    } else if (eventTimeNs < fullBucketEndTimeNs) {
        bucketEndTime = eventTimeNs;
    }

    // Close the current bucket.
    int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);
    bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
    if (!isBucketLargeEnough) {
        skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);
    }
    if (!mCurrentBucketIsSkipped) {
        bool bucketHasData = false;
        // The current bucket is large enough to keep.
        for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
            PastValueBucket bucket =
                    buildPartialBucket(bucketEndTime, currentValueBucket.intervals);
            if (!mSlicedStateAtoms.empty()) {
                bucket.mConditionTrueNs =
                        currentValueBucket.conditionTimer.newBucketStart(bucketEndTime);
            } else {
                bucket.mConditionTrueNs = conditionTrueDuration;
            }
            // Operator[] will auto-create a new vector of ValueBucketInfo if the key is not found.
            if (bucket.valueIndex.size() > 0) {
                auto& bucketList = mPastBuckets[metricDimensionKey];
                bucketList.push_back(bucket);
                bucketHasData = true;
            }
        }
        if (!bucketHasData) {
            skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
        }
    }

    if (mCurrentBucketIsSkipped) {
        mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
        mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime;
        mSkippedBuckets.emplace_back(mCurrentSkippedBucket);
    }

    // This means that the current bucket was not flushed before a forced bucket split.
    // This can happen if an app update or a dump report with include_current_partial_bucket is
    // requested before we get a chance to flush the bucket due to receiving new data, either from
    // the statsd socket or the StatsPullerManager.
    if (bucketEndTime < nextBucketStartTimeNs) {
        SkippedBucket bucketInGap;
        bucketInGap.bucketStartTimeNs = bucketEndTime;
        bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs;
        bucketInGap.dropEvents.emplace_back(buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA));
        mSkippedBuckets.emplace_back(bucketInGap);
    }

    appendToFullBucket(eventTimeNs > fullBucketEndTimeNs);
    initCurrentSlicedBucket(nextBucketStartTimeNs);
    // Update the condition timer again, in case we skipped buckets.
    mConditionTimer.newBucketStart(nextBucketStartTimeNs);

    // NOTE: Update the condition timers in `mCurrentSlicedBucket` only when slicing
    // by state. Otherwise, the "global" condition timer will be used.
    if (!mSlicedStateAtoms.empty()) {
        for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
            currentValueBucket.conditionTimer.newBucketStart(nextBucketStartTimeNs);
        }
    }
    mCurrentBucketNum += numBucketsForward;
}

PastValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
                                                        const std::vector<Interval>& intervals) {
    PastValueBucket bucket;
    bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
    bucket.mBucketEndNs = bucketEndTime;

    // The first value field acts as a "gatekeeper" - if it does not pass the specified threshold,
    // then all interval values are discarded for this bucket.
    if ((intervals.size() <= 0) || (intervals[0].hasValue && !valuePassesThreshold(intervals[0]))) {
        return bucket;
    }

    for (const Interval& interval : intervals) {
        // Skip the output if the diff is zero
        if (!interval.hasValue || (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero())) {
            continue;
        }
        bucket.valueIndex.push_back(interval.valueIndex);
        bucket.values.push_back(getFinalValue(interval));
    }
    return bucket;
}
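// Example (illustrative): with threshold gt_int = 100 and two value fields, a
// bucket whose first interval aggregated to 42 is returned empty; intervals[0]
// acts as the gatekeeper, so the second value is discarded as well. With
// skip_zero_diff_output set on a diffed metric, an interval whose diffs summed
// to exactly zero is omitted while other intervals are still written.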
void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) {
    StatsdStats::getInstance().noteBucketCount(mMetricId);
    // Cleanup data structure to aggregate values.
    for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
        bool obsolete = true;
        for (auto& interval : it->second.intervals) {
            interval.hasValue = false;
            interval.sampleSize = 0;
            if (interval.seenNewData) {
                obsolete = false;
            }
            interval.seenNewData = false;
        }

        if (obsolete && !mSlicedStateAtoms.empty()) {
            // When slicing by state, only delete the MetricDimensionKey when the
            // state key in the MetricDimensionKey is not the current state key.
            const HashableDimensionKey& dimensionInWhatKey = it->first.getDimensionKeyInWhat();
            const auto& currentBaseInfoItr = mCurrentBaseInfo.find(dimensionInWhatKey);

            if ((currentBaseInfoItr != mCurrentBaseInfo.end()) &&
                (it->first.getStateValuesKey() == currentBaseInfoItr->second.currentState)) {
                obsolete = false;
            }
        }
        if (obsolete) {
            it = mCurrentSlicedBucket.erase(it);
        } else {
            it++;
        }
        // TODO(b/157655103): remove mCurrentBaseInfo entries when obsolete
    }

    mCurrentBucketIsSkipped = false;
    mCurrentSkippedBucket.reset();

    // If we do not have a global base when the condition is true,
    // we will have an incomplete bucket for the next bucket.
    if (mUseDiff && !mHasGlobalBase && mCondition) {
        mCurrentBucketIsSkipped = false;
    }

    mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
         (long long)mCurrentBucketStartTimeNs);
}

void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
    if (mCurrentBucketIsSkipped) {
        if (isFullBucketReached) {
            // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
            mCurrentFullBucket.clear();
        }
        // Current bucket is invalid, we do not add it to the full bucket.
        return;
    }

    if (isFullBucketReached) {  // If full bucket, send to anomaly tracker.
        // Accumulate partial buckets with current value and then send to anomaly tracker.
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& slice : mCurrentSlicedBucket) {
                if (hitFullBucketGuardRailLocked(slice.first) || slice.second.intervals.empty()) {
                    continue;
                }
                // TODO: fix this when anomaly can accept double values
                auto& interval = slice.second.intervals[0];
                if (interval.hasValue) {
                    mCurrentFullBucket[slice.first] += interval.value.long_value;
                }
            }
            for (const auto& slice : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(slice.first, slice.second, mCurrentBucketNum);
                    }
                }
            }
            mCurrentFullBucket.clear();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (const auto& slice : mCurrentSlicedBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr && !slice.second.intervals.empty()) {
                        // TODO: fix this when anomaly can accept double values
                        auto& interval = slice.second.intervals[0];
                        if (interval.hasValue) {
                            tracker->addPastBucket(slice.first, interval.value.long_value,
                                                   mCurrentBucketNum);
                        }
                    }
                }
            }
        }
    } else {
        // Accumulate partial bucket.
        for (const auto& slice : mCurrentSlicedBucket) {
            if (!slice.second.intervals.empty()) {
                // TODO: fix this when anomaly can accept double values
                auto& interval = slice.second.intervals[0];
                if (interval.hasValue) {
                    mCurrentFullBucket[slice.first] += interval.value.long_value;
                }
            }
        }
    }
}

size_t ValueMetricProducer::byteSizeLocked() const {
    size_t totalSize = 0;
    for (const auto& pair : mPastBuckets) {
        totalSize += pair.second.size() * kBucketSize;
    }
    return totalSize;
}
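// Example (illustrative): the comparisons below run on the post-aggregation
// value, so for an AVG metric whose interval summed to 750 over 3 samples,
// getFinalValue() yields 250; a threshold of lte_int = 200 then rejects the
// interval, while gte_int = 200 keeps it.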
bool ValueMetricProducer::valuePassesThreshold(const Interval& interval) {
    if (mUploadThreshold == nullopt) {
        return true;
    }

    Value finalValue = getFinalValue(interval);
    double doubleValue =
            finalValue.type == LONG ? (double)finalValue.long_value : finalValue.double_value;
    switch (mUploadThreshold->value_comparison_case()) {
        case UploadThreshold::kLtInt:
            return doubleValue < (double)mUploadThreshold->lt_int();
        case UploadThreshold::kGtInt:
            return doubleValue > (double)mUploadThreshold->gt_int();
        case UploadThreshold::kLteInt:
            return doubleValue <= (double)mUploadThreshold->lte_int();
        case UploadThreshold::kGteInt:
            return doubleValue >= (double)mUploadThreshold->gte_int();
        case UploadThreshold::kLtFloat:
            return doubleValue < (double)mUploadThreshold->lt_float();
        case UploadThreshold::kGtFloat:
            return doubleValue > (double)mUploadThreshold->gt_float();
        default:
            ALOGE("Value metric no upload threshold type used");
            return false;
    }
}

Value ValueMetricProducer::getFinalValue(const Interval& interval) {
    if (mAggregationType != ValueMetric::AVG) {
        return interval.value;
    } else {
        double sum = interval.value.type == LONG ? (double)interval.value.long_value
                                                 : interval.value.double_value;
        return Value(sum / interval.sampleSize);
    }
}

}  // namespace statsd
}  // namespace os
}  // namespace android