1 /* 2 * Copyright (C) 2021 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.car.telemetry.databroker; 18 19 import android.car.telemetry.MetricsConfigKey; 20 import android.content.ComponentName; 21 import android.content.Context; 22 import android.content.Intent; 23 import android.content.ServiceConnection; 24 import android.os.Handler; 25 import android.os.HandlerThread; 26 import android.os.IBinder; 27 import android.os.Looper; 28 import android.os.Message; 29 import android.os.ParcelFileDescriptor; 30 import android.os.PersistableBundle; 31 import android.os.RemoteException; 32 import android.os.UserHandle; 33 import android.util.ArrayMap; 34 35 import com.android.car.CarLog; 36 import com.android.car.CarServiceUtils; 37 import com.android.car.telemetry.CarTelemetryService; 38 import com.android.car.telemetry.ResultStore; 39 import com.android.car.telemetry.TelemetryProto; 40 import com.android.car.telemetry.TelemetryProto.MetricsConfig; 41 import com.android.car.telemetry.publisher.AbstractPublisher; 42 import com.android.car.telemetry.publisher.PublisherFactory; 43 import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutor; 44 import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutorListener; 45 import com.android.internal.annotations.VisibleForTesting; 46 import com.android.server.utils.Slogf; 47 48 import java.io.Closeable; 49 import java.io.IOException; 50 import java.io.OutputStream; 51 import java.lang.ref.WeakReference; 52 import java.util.ArrayList; 53 import java.util.List; 54 import java.util.Map; 55 import java.util.concurrent.PriorityBlockingQueue; 56 57 /** 58 * Implementation of the data path component of CarTelemetryService. Forwards the published data 59 * from publishers to consumers subject to the Controller's decision. 60 * All methods should be called from the telemetry thread unless otherwise specified as thread-safe. 61 */ 62 public class DataBrokerImpl implements DataBroker { 63 64 private static final int MSG_HANDLE_TASK = 1; 65 private static final int MSG_BIND_TO_SCRIPT_EXECUTOR = 2; 66 67 /** Bind to script executor 5 times before entering disabled state. */ 68 private static final int MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS = 5; 69 70 private static final String SCRIPT_EXECUTOR_PACKAGE = "com.android.car.scriptexecutor"; 71 private static final String SCRIPT_EXECUTOR_CLASS = 72 "com.android.car.scriptexecutor.ScriptExecutor"; 73 74 private final Context mContext; 75 private final PublisherFactory mPublisherFactory; 76 private final ResultStore mResultStore; 77 private final ScriptExecutorListener mScriptExecutorListener; 78 private final HandlerThread mTelemetryThread = CarServiceUtils.getHandlerThread( 79 CarTelemetryService.class.getSimpleName()); 80 private final Handler mTelemetryHandler = new TaskHandler(mTelemetryThread.getLooper()); 81 82 /** Thread-safe priority queue for scheduling tasks. */ 83 private final PriorityBlockingQueue<ScriptExecutionTask> mTaskQueue = 84 new PriorityBlockingQueue<>(); 85 86 /** 87 * Maps MetricsConfig's unique identifier to its subscriptions. This map is useful when 88 * removing a MetricsConfig. 89 */ 90 private final Map<MetricsConfigKey, List<DataSubscriber>> mSubscriptionMap = new ArrayMap<>(); 91 92 /** 93 * If something irrecoverable happened, DataBroker should enter into a disabled state to prevent 94 * doing futile work. 95 */ 96 private boolean mDisabled = false; 97 98 /** Current number of attempts to bind to ScriptExecutor. */ 99 private int mBindScriptExecutorAttempts = 0; 100 101 /** Priority of current system to determine if a {@link ScriptExecutionTask} can run. */ 102 private int mPriority = 1; 103 104 /** Waiting period between attempts to bind script executor. Can be shortened for tests. */ 105 @VisibleForTesting long mBindScriptExecutorDelayMillis = 3_000L; 106 107 /** 108 * {@link MetricsConfigKey} that uniquely identifies the current running {@link MetricsConfig}. 109 * A non-null value indicates ScriptExecutor is currently running this config, which means 110 * DataBroker should not make another ScriptExecutor binder call. 111 */ 112 private MetricsConfigKey mCurrentMetricsConfigKey; 113 private IScriptExecutor mScriptExecutor; 114 private ScriptFinishedCallback mScriptFinishedCallback; 115 116 private final ServiceConnection mServiceConnection = new ServiceConnection() { 117 @Override 118 public void onServiceConnected(ComponentName name, IBinder service) { 119 mTelemetryHandler.post(() -> { 120 mScriptExecutor = IScriptExecutor.Stub.asInterface(service); 121 scheduleNextTask(); 122 }); 123 } 124 125 @Override 126 public void onServiceDisconnected(ComponentName name) { 127 // TODO(b/198684473): clean up the state after script executor disconnects 128 mTelemetryHandler.post(() -> { 129 mScriptExecutor = null; 130 unbindScriptExecutor(); 131 }); 132 } 133 }; 134 DataBrokerImpl( Context context, PublisherFactory publisherFactory, ResultStore resultStore)135 public DataBrokerImpl( 136 Context context, PublisherFactory publisherFactory, ResultStore resultStore) { 137 mContext = context; 138 mPublisherFactory = publisherFactory; 139 mResultStore = resultStore; 140 mScriptExecutorListener = new ScriptExecutorListener(this); 141 mPublisherFactory.setFailureListener(this::onPublisherFailure); 142 } 143 onPublisherFailure( AbstractPublisher publisher, List<TelemetryProto.MetricsConfig> affectedConfigs, Throwable error)144 private void onPublisherFailure( 145 AbstractPublisher publisher, List<TelemetryProto.MetricsConfig> affectedConfigs, 146 Throwable error) { 147 // TODO(b/193680465): disable MetricsConfig and log the error 148 Slogf.w(CarLog.TAG_TELEMETRY, "publisher failed", error); 149 } 150 bindScriptExecutor()151 private void bindScriptExecutor() { 152 // do not re-bind if broker is in a disabled state or if script executor is nonnull 153 if (mDisabled || mScriptExecutor != null) { 154 return; 155 } 156 Intent intent = new Intent(); 157 intent.setComponent(new ComponentName(SCRIPT_EXECUTOR_PACKAGE, SCRIPT_EXECUTOR_CLASS)); 158 boolean success = mContext.bindServiceAsUser( 159 intent, 160 mServiceConnection, 161 Context.BIND_AUTO_CREATE, 162 UserHandle.SYSTEM); 163 if (success) { 164 mBindScriptExecutorAttempts = 0; // reset 165 return; 166 } 167 unbindScriptExecutor(); 168 mBindScriptExecutorAttempts++; 169 if (mBindScriptExecutorAttempts < MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS) { 170 Slogf.w(CarLog.TAG_TELEMETRY, 171 "failed to get valid connection to ScriptExecutor, retrying in " 172 + mBindScriptExecutorDelayMillis + "ms."); 173 mTelemetryHandler.sendEmptyMessageDelayed(MSG_BIND_TO_SCRIPT_EXECUTOR, 174 mBindScriptExecutorDelayMillis); 175 } else { 176 Slogf.w(CarLog.TAG_TELEMETRY, "failed to get valid connection to ScriptExecutor, " 177 + "disabling DataBroker"); 178 disableBroker(); 179 } 180 } 181 182 /** 183 * Unbinds {@link ScriptExecutor} to release the connection. This method should be called from 184 * the telemetry thread. 185 */ unbindScriptExecutor()186 private void unbindScriptExecutor() { 187 // TODO(b/198648763): unbind from script executor when there is no work to do 188 mCurrentMetricsConfigKey = null; 189 try { 190 mContext.unbindService(mServiceConnection); 191 } catch (IllegalArgumentException e) { 192 // If ScriptExecutor is gone before unbinding, it will throw this exception 193 Slogf.w(CarLog.TAG_TELEMETRY, "Failed to unbind from ScriptExecutor", e); 194 } 195 } 196 197 /** 198 * Enters into a disabled state because something irrecoverable happened. 199 * TODO(b/200841260): expose the state to the caller. 200 */ disableBroker()201 private void disableBroker() { 202 mDisabled = true; 203 // remove all MetricConfigs, disable all publishers, stop receiving data 204 for (MetricsConfigKey key : mSubscriptionMap.keySet()) { 205 // get the metrics config from the DataSubscriber and remove the metrics config 206 if (mSubscriptionMap.get(key).size() != 0) { 207 removeMetricsConfig(key); 208 } 209 } 210 mSubscriptionMap.clear(); 211 } 212 213 @Override addMetricsConfig(MetricsConfigKey key, MetricsConfig metricsConfig)214 public void addMetricsConfig(MetricsConfigKey key, MetricsConfig metricsConfig) { 215 // TODO(b/187743369): pass status back to caller 216 // if broker is disabled or metricsConfig already exists, do nothing 217 if (mDisabled || mSubscriptionMap.containsKey(key)) { 218 return; 219 } 220 // Create the subscribers for this metrics configuration 221 List<DataSubscriber> dataSubscribers = new ArrayList<>( 222 metricsConfig.getSubscribersList().size()); 223 for (TelemetryProto.Subscriber subscriber : metricsConfig.getSubscribersList()) { 224 // protobuf publisher to a concrete Publisher 225 AbstractPublisher publisher = mPublisherFactory.getPublisher( 226 subscriber.getPublisher().getPublisherCase()); 227 // create DataSubscriber from TelemetryProto.Subscriber 228 DataSubscriber dataSubscriber = new DataSubscriber( 229 this, 230 metricsConfig, 231 subscriber); 232 dataSubscribers.add(dataSubscriber); 233 234 try { 235 // The publisher will start sending data to the subscriber. 236 // TODO(b/191378559): handle bad configs 237 publisher.addDataSubscriber(dataSubscriber); 238 } catch (IllegalArgumentException e) { 239 Slogf.w(CarLog.TAG_TELEMETRY, "Invalid config", e); 240 return; 241 } 242 } 243 mSubscriptionMap.put(key, dataSubscribers); 244 } 245 246 @Override removeMetricsConfig(MetricsConfigKey key)247 public void removeMetricsConfig(MetricsConfigKey key) { 248 // TODO(b/187743369): pass status back to caller 249 if (!mSubscriptionMap.containsKey(key)) { 250 return; 251 } 252 // get the subscriptions associated with this MetricsConfig, remove it from the map 253 List<DataSubscriber> dataSubscribers = mSubscriptionMap.remove(key); 254 // for each subscriber, remove it from publishers 255 for (DataSubscriber subscriber : dataSubscribers) { 256 AbstractPublisher publisher = mPublisherFactory.getPublisher( 257 subscriber.getPublisherParam().getPublisherCase()); 258 try { 259 publisher.removeDataSubscriber(subscriber); 260 } catch (IllegalArgumentException e) { 261 // It shouldn't happen, but if happens, let's just log it. 262 Slogf.w(CarLog.TAG_TELEMETRY, "Failed to remove subscriber from publisher", e); 263 } 264 } 265 // Remove all the tasks associated with this metrics config. The underlying impl uses the 266 // weakly consistent iterator, which is thread-safe but does not freeze the collection while 267 // iterating, so it may or may not reflect any updates since the iterator was created. 268 // But since adding & polling from queue should happen in the same thread, the task queue 269 // should not be changed while tasks are being iterated and removed. 270 mTaskQueue.removeIf(task -> task.isAssociatedWithMetricsConfig(key)); 271 } 272 273 @Override removeAllMetricsConfigs()274 public void removeAllMetricsConfigs() { 275 mPublisherFactory.removeAllDataSubscribers(); 276 mSubscriptionMap.clear(); 277 mTaskQueue.clear(); 278 } 279 280 @Override addTaskToQueue(ScriptExecutionTask task)281 public void addTaskToQueue(ScriptExecutionTask task) { 282 if (mDisabled) { 283 return; 284 } 285 mTaskQueue.add(task); 286 scheduleNextTask(); 287 } 288 289 /** 290 * This method can be called from any thread. 291 * It is possible for this method to be invoked from different threads at the same time, but 292 * it is not possible to schedule the same task twice, because the handler handles message 293 * in the order they come in, this means the task will be polled sequentially instead of 294 * concurrently. Every task that is scheduled and run will be distinct. 295 * TODO(b/187743369): If the threading behavior in DataSubscriber changes, ScriptExecutionTask 296 * will also have different threading behavior. Update javadoc when the behavior is decided. 297 */ 298 @Override scheduleNextTask()299 public void scheduleNextTask() { 300 if (mDisabled || mTelemetryHandler.hasMessages(MSG_HANDLE_TASK)) { 301 return; 302 } 303 mTelemetryHandler.sendEmptyMessage(MSG_HANDLE_TASK); 304 } 305 306 @Override setOnScriptFinishedCallback(ScriptFinishedCallback callback)307 public void setOnScriptFinishedCallback(ScriptFinishedCallback callback) { 308 if (mDisabled) { 309 return; 310 } 311 mScriptFinishedCallback = callback; 312 } 313 314 @Override setTaskExecutionPriority(int priority)315 public void setTaskExecutionPriority(int priority) { 316 if (mDisabled) { 317 return; 318 } 319 mPriority = priority; 320 scheduleNextTask(); // when priority updates, schedule a task which checks task queue 321 } 322 323 @VisibleForTesting getSubscriptionMap()324 Map<MetricsConfigKey, List<DataSubscriber>> getSubscriptionMap() { 325 return new ArrayMap<>((ArrayMap<MetricsConfigKey, List<DataSubscriber>>) mSubscriptionMap); 326 } 327 328 @VisibleForTesting getTelemetryHandler()329 Handler getTelemetryHandler() { 330 return mTelemetryHandler; 331 } 332 333 @VisibleForTesting getTaskQueue()334 PriorityBlockingQueue<ScriptExecutionTask> getTaskQueue() { 335 return mTaskQueue; 336 } 337 338 /** 339 * Polls and runs a task from the head of the priority queue if the queue is nonempty and the 340 * head of the queue has priority higher than or equal to the current priority. A higher 341 * priority is denoted by a lower priority number, so head of the queue should have equal or 342 * lower priority number to be polled. 343 */ pollAndExecuteTask()344 private void pollAndExecuteTask() { 345 // check databroker state is ready to run script 346 if (mDisabled || mCurrentMetricsConfigKey != null) { 347 return; 348 } 349 // check task is valid and ready to be run 350 ScriptExecutionTask task = mTaskQueue.peek(); 351 if (task == null || task.getPriority() > mPriority) { 352 return; 353 } 354 // if script executor is null, bind service 355 if (mScriptExecutor == null) { 356 Slogf.w(CarLog.TAG_TELEMETRY, "script executor is null, binding to script executor"); 357 // upon successful binding, a task will be scheduled to run if there are any 358 mTelemetryHandler.sendEmptyMessage(MSG_BIND_TO_SCRIPT_EXECUTOR); 359 return; 360 } 361 mTaskQueue.poll(); // remove task from queue 362 // update current config key because a script is currently running 363 mCurrentMetricsConfigKey = new MetricsConfigKey(task.getMetricsConfig().getName(), 364 task.getMetricsConfig().getVersion()); 365 try { 366 if (task.isLargeData()) { 367 Slogf.d(CarLog.TAG_TELEMETRY, "invoking script executor for large input"); 368 invokeScriptForLargeInput(task); 369 } else { 370 Slogf.d(CarLog.TAG_TELEMETRY, "invoking script executor"); 371 mScriptExecutor.invokeScript( 372 task.getMetricsConfig().getScript(), 373 task.getHandlerName(), 374 task.getData(), 375 mResultStore.getInterimResult(mCurrentMetricsConfigKey.getName()), 376 mScriptExecutorListener); 377 } 378 } catch (RemoteException e) { 379 Slogf.w(CarLog.TAG_TELEMETRY, "remote exception occurred invoking script", e); 380 unbindScriptExecutor(); 381 addTaskToQueue(task); // will trigger scheduleNextTask() and re-binding scriptexecutor 382 } catch (IOException e) { 383 Slogf.w(CarLog.TAG_TELEMETRY, "Either unable to create pipe or failed to pipe data" 384 + " to ScriptExecutor. Skipping the published data", e); 385 mCurrentMetricsConfigKey = null; 386 scheduleNextTask(); // drop this task and schedule the next one 387 } 388 } 389 390 /** 391 * Sets up pipes, invokes ScriptExecutor#invokeScriptForLargeInput() API, and writes the 392 * script input to the pipe. 393 * 394 * @param task containing all the necessary parameters for ScriptExecutor API. 395 * @throws IOException if cannot create pipe or cannot write the bundle to pipe. 396 * @throws RemoteException if ScriptExecutor failed. 397 */ invokeScriptForLargeInput(ScriptExecutionTask task)398 private void invokeScriptForLargeInput(ScriptExecutionTask task) 399 throws IOException, RemoteException { 400 ParcelFileDescriptor[] fds = ParcelFileDescriptor.createPipe(); 401 ParcelFileDescriptor readFd = fds[0]; 402 ParcelFileDescriptor writeFd = fds[1]; 403 try { 404 mScriptExecutor.invokeScriptForLargeInput( 405 task.getMetricsConfig().getScript(), 406 task.getHandlerName(), 407 readFd, 408 mResultStore.getInterimResult(mCurrentMetricsConfigKey.getName()), 409 mScriptExecutorListener); 410 } catch (RemoteException e) { 411 closeQuietly(readFd); 412 closeQuietly(writeFd); 413 throw e; 414 } 415 closeQuietly(readFd); 416 417 Slogf.d(CarLog.TAG_TELEMETRY, "writing large script data to pipe"); 418 try (OutputStream outputStream = new ParcelFileDescriptor.AutoCloseOutputStream(writeFd)) { 419 task.getData().writeToStream(outputStream); 420 } 421 } 422 423 /** Quietly closes Java Closeables, ignoring IOException. */ closeQuietly(Closeable closeable)424 private void closeQuietly(Closeable closeable) { 425 try { 426 closeable.close(); 427 } catch (IOException e) { 428 // Ignore 429 } 430 } 431 432 /** Stores final metrics and schedules the next task. */ onScriptFinished(PersistableBundle result)433 private void onScriptFinished(PersistableBundle result) { 434 mTelemetryHandler.post(() -> { 435 mResultStore.putFinalResult(mCurrentMetricsConfigKey.getName(), result); 436 mScriptFinishedCallback.onScriptFinished(mCurrentMetricsConfigKey); 437 mCurrentMetricsConfigKey = null; 438 scheduleNextTask(); 439 }); 440 } 441 442 /** Stores interim metrics and schedules the next task. */ onScriptSuccess(PersistableBundle stateToPersist)443 private void onScriptSuccess(PersistableBundle stateToPersist) { 444 mTelemetryHandler.post(() -> { 445 mResultStore.putInterimResult(mCurrentMetricsConfigKey.getName(), stateToPersist); 446 mCurrentMetricsConfigKey = null; 447 scheduleNextTask(); 448 }); 449 } 450 451 /** Stores telemetry error and schedules the next task. */ onScriptError(int errorType, String message, String stackTrace)452 private void onScriptError(int errorType, String message, String stackTrace) { 453 mTelemetryHandler.post(() -> { 454 TelemetryProto.TelemetryError.Builder error = TelemetryProto.TelemetryError.newBuilder() 455 .setErrorType(TelemetryProto.TelemetryError.ErrorType.forNumber(errorType)) 456 .setMessage(message); 457 if (stackTrace != null) { 458 error.setStackTrace(stackTrace); 459 } 460 mResultStore.putError(mCurrentMetricsConfigKey.getName(), error.build()); 461 mCurrentMetricsConfigKey = null; 462 scheduleNextTask(); 463 }); 464 } 465 466 /** Listens for script execution status. Methods are called on the binder thread. */ 467 private static final class ScriptExecutorListener extends IScriptExecutorListener.Stub { 468 private final WeakReference<DataBrokerImpl> mWeakDataBroker; 469 ScriptExecutorListener(DataBrokerImpl dataBroker)470 private ScriptExecutorListener(DataBrokerImpl dataBroker) { 471 mWeakDataBroker = new WeakReference<>(dataBroker); 472 } 473 474 @Override onScriptFinished(PersistableBundle result)475 public void onScriptFinished(PersistableBundle result) { 476 DataBrokerImpl dataBroker = mWeakDataBroker.get(); 477 if (dataBroker == null) { 478 return; 479 } 480 dataBroker.onScriptFinished(result); 481 } 482 483 @Override onSuccess(PersistableBundle stateToPersist)484 public void onSuccess(PersistableBundle stateToPersist) { 485 DataBrokerImpl dataBroker = mWeakDataBroker.get(); 486 if (dataBroker == null) { 487 return; 488 } 489 dataBroker.onScriptSuccess(stateToPersist); 490 } 491 492 @Override onError(int errorType, String message, String stackTrace)493 public void onError(int errorType, String message, String stackTrace) { 494 DataBrokerImpl dataBroker = mWeakDataBroker.get(); 495 if (dataBroker == null) { 496 return; 497 } 498 dataBroker.onScriptError(errorType, message, stackTrace); 499 } 500 } 501 502 /** Callback handler to handle scheduling and rescheduling of {@link ScriptExecutionTask}s. */ 503 class TaskHandler extends Handler { TaskHandler(Looper looper)504 TaskHandler(Looper looper) { 505 super(looper); 506 } 507 508 /** 509 * Handles a message depending on the message ID. 510 * If the msg ID is MSG_HANDLE_TASK, it polls a task from the priority queue and executing a 511 * {@link ScriptExecutionTask}. There are multiple places where this message is sent: when 512 * priority updates, when a new task is added to the priority queue, and when a task 513 * finishes running. 514 */ 515 @Override handleMessage(Message msg)516 public void handleMessage(Message msg) { 517 switch (msg.what) { 518 case MSG_HANDLE_TASK: 519 pollAndExecuteTask(); // run the next task 520 break; 521 case MSG_BIND_TO_SCRIPT_EXECUTOR: 522 bindScriptExecutor(); 523 break; 524 default: 525 Slogf.w(CarLog.TAG_TELEMETRY, "TaskHandler received unknown message."); 526 } 527 } 528 } 529 } 530