1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.car.telemetry.databroker;
18 
19 import android.car.telemetry.MetricsConfigKey;
20 import android.content.ComponentName;
21 import android.content.Context;
22 import android.content.Intent;
23 import android.content.ServiceConnection;
24 import android.os.Handler;
25 import android.os.HandlerThread;
26 import android.os.IBinder;
27 import android.os.Looper;
28 import android.os.Message;
29 import android.os.ParcelFileDescriptor;
30 import android.os.PersistableBundle;
31 import android.os.RemoteException;
32 import android.os.UserHandle;
33 import android.util.ArrayMap;
34 
35 import com.android.car.CarLog;
36 import com.android.car.CarServiceUtils;
37 import com.android.car.telemetry.CarTelemetryService;
38 import com.android.car.telemetry.ResultStore;
39 import com.android.car.telemetry.TelemetryProto;
40 import com.android.car.telemetry.TelemetryProto.MetricsConfig;
41 import com.android.car.telemetry.publisher.AbstractPublisher;
42 import com.android.car.telemetry.publisher.PublisherFactory;
43 import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutor;
44 import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutorListener;
45 import com.android.internal.annotations.VisibleForTesting;
46 import com.android.server.utils.Slogf;
47 
48 import java.io.Closeable;
49 import java.io.IOException;
50 import java.io.OutputStream;
51 import java.lang.ref.WeakReference;
52 import java.util.ArrayList;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.concurrent.PriorityBlockingQueue;
56 
57 /**
58  * Implementation of the data path component of CarTelemetryService. Forwards the published data
59  * from publishers to consumers subject to the Controller's decision.
60  * All methods should be called from the telemetry thread unless otherwise specified as thread-safe.
61  */
62 public class DataBrokerImpl implements DataBroker {
63 
    /** Message ID that makes {@link TaskHandler} poll the task queue and run the next task. */
    private static final int MSG_HANDLE_TASK = 1;
    /** Message ID that makes {@link TaskHandler} attempt to bind to ScriptExecutor. */
    private static final int MSG_BIND_TO_SCRIPT_EXECUTOR = 2;

    /** Bind to script executor 5 times before entering disabled state. */
    private static final int MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS = 5;

    // Package and class name of the ScriptExecutor component this broker binds to.
    private static final String SCRIPT_EXECUTOR_PACKAGE = "com.android.car.scriptexecutor";
    private static final String SCRIPT_EXECUTOR_CLASS =
            "com.android.car.scriptexecutor.ScriptExecutor";

    /** Context used to bind to and unbind from ScriptExecutor. */
    private final Context mContext;
    /** Resolves the publisher for each subscriber and reports publisher failures to this class. */
    private final PublisherFactory mPublisherFactory;
    /** Supplies interim results to scripts and stores interim results, final results and errors. */
    private final ResultStore mResultStore;
    /** Listener handed to ScriptExecutor with every script invocation. */
    private final ScriptExecutorListener mScriptExecutorListener;
    /** Handler thread looked up by the simple class name of {@link CarTelemetryService}. */
    private final HandlerThread mTelemetryThread = CarServiceUtils.getHandlerThread(
            CarTelemetryService.class.getSimpleName());
    /** Handler on {@link #mTelemetryThread}'s looper; must be initialized after the thread. */
    private final Handler mTelemetryHandler = new TaskHandler(mTelemetryThread.getLooper());

    /** Thread-safe priority queue for scheduling tasks. */
    private final PriorityBlockingQueue<ScriptExecutionTask> mTaskQueue =
            new PriorityBlockingQueue<>();

    /**
     * Maps MetricsConfig's unique identifier to its subscriptions. This map is useful when
     * removing a MetricsConfig.
     */
    private final Map<MetricsConfigKey, List<DataSubscriber>> mSubscriptionMap = new ArrayMap<>();

    /**
     * If something irrecoverable happened, DataBroker should enter into a disabled state to prevent
     * doing futile work.
     */
    private boolean mDisabled = false;

    /** Current number of attempts to bind to ScriptExecutor. */
    private int mBindScriptExecutorAttempts = 0;

    /** Priority of current system to determine if a {@link ScriptExecutionTask} can run. */
    private int mPriority = 1;

    /** Waiting period between attempts to bind script executor. Can be shortened for tests. */
    @VisibleForTesting long mBindScriptExecutorDelayMillis = 3_000L;

    /**
     * {@link MetricsConfigKey} that uniquely identifies the current running {@link MetricsConfig}.
     * A non-null value indicates ScriptExecutor is currently running this config, which means
     * DataBroker should not make another ScriptExecutor binder call.
     */
    private MetricsConfigKey mCurrentMetricsConfigKey;
    /** ScriptExecutor interface; assigned when the service connects, cleared on disconnect. */
    private IScriptExecutor mScriptExecutor;
    /** Callback notified after a script reports its final result; null until one is set. */
    private ScriptFinishedCallback mScriptFinishedCallback;
115 
116     private final ServiceConnection mServiceConnection = new ServiceConnection() {
117         @Override
118         public void onServiceConnected(ComponentName name, IBinder service) {
119             mTelemetryHandler.post(() -> {
120                 mScriptExecutor = IScriptExecutor.Stub.asInterface(service);
121                 scheduleNextTask();
122             });
123         }
124 
125         @Override
126         public void onServiceDisconnected(ComponentName name) {
127             // TODO(b/198684473): clean up the state after script executor disconnects
128             mTelemetryHandler.post(() -> {
129                 mScriptExecutor = null;
130                 unbindScriptExecutor();
131             });
132         }
133     };
134 
DataBrokerImpl( Context context, PublisherFactory publisherFactory, ResultStore resultStore)135     public DataBrokerImpl(
136             Context context, PublisherFactory publisherFactory, ResultStore resultStore) {
137         mContext = context;
138         mPublisherFactory = publisherFactory;
139         mResultStore = resultStore;
140         mScriptExecutorListener = new ScriptExecutorListener(this);
141         mPublisherFactory.setFailureListener(this::onPublisherFailure);
142     }
143 
onPublisherFailure( AbstractPublisher publisher, List<TelemetryProto.MetricsConfig> affectedConfigs, Throwable error)144     private void onPublisherFailure(
145             AbstractPublisher publisher, List<TelemetryProto.MetricsConfig> affectedConfigs,
146             Throwable error) {
147         // TODO(b/193680465): disable MetricsConfig and log the error
148         Slogf.w(CarLog.TAG_TELEMETRY, "publisher failed", error);
149     }
150 
bindScriptExecutor()151     private void bindScriptExecutor() {
152         // do not re-bind if broker is in a disabled state or if script executor is nonnull
153         if (mDisabled || mScriptExecutor != null) {
154             return;
155         }
156         Intent intent = new Intent();
157         intent.setComponent(new ComponentName(SCRIPT_EXECUTOR_PACKAGE, SCRIPT_EXECUTOR_CLASS));
158         boolean success = mContext.bindServiceAsUser(
159                 intent,
160                 mServiceConnection,
161                 Context.BIND_AUTO_CREATE,
162                 UserHandle.SYSTEM);
163         if (success) {
164             mBindScriptExecutorAttempts = 0; // reset
165             return;
166         }
167         unbindScriptExecutor();
168         mBindScriptExecutorAttempts++;
169         if (mBindScriptExecutorAttempts < MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS) {
170             Slogf.w(CarLog.TAG_TELEMETRY,
171                     "failed to get valid connection to ScriptExecutor, retrying in "
172                             + mBindScriptExecutorDelayMillis + "ms.");
173             mTelemetryHandler.sendEmptyMessageDelayed(MSG_BIND_TO_SCRIPT_EXECUTOR,
174                     mBindScriptExecutorDelayMillis);
175         } else {
176             Slogf.w(CarLog.TAG_TELEMETRY, "failed to get valid connection to ScriptExecutor, "
177                     + "disabling DataBroker");
178             disableBroker();
179         }
180     }
181 
182     /**
183      * Unbinds {@link ScriptExecutor} to release the connection. This method should be called from
184      * the telemetry thread.
185      */
unbindScriptExecutor()186     private void unbindScriptExecutor() {
187         // TODO(b/198648763): unbind from script executor when there is no work to do
188         mCurrentMetricsConfigKey = null;
189         try {
190             mContext.unbindService(mServiceConnection);
191         } catch (IllegalArgumentException e) {
192             // If ScriptExecutor is gone before unbinding, it will throw this exception
193             Slogf.w(CarLog.TAG_TELEMETRY, "Failed to unbind from ScriptExecutor", e);
194         }
195     }
196 
197     /**
198      * Enters into a disabled state because something irrecoverable happened.
199      * TODO(b/200841260): expose the state to the caller.
200      */
disableBroker()201     private void disableBroker() {
202         mDisabled = true;
203         // remove all MetricConfigs, disable all publishers, stop receiving data
204         for (MetricsConfigKey key : mSubscriptionMap.keySet()) {
205             // get the metrics config from the DataSubscriber and remove the metrics config
206             if (mSubscriptionMap.get(key).size() != 0) {
207                 removeMetricsConfig(key);
208             }
209         }
210         mSubscriptionMap.clear();
211     }
212 
213     @Override
addMetricsConfig(MetricsConfigKey key, MetricsConfig metricsConfig)214     public void addMetricsConfig(MetricsConfigKey key, MetricsConfig metricsConfig) {
215         // TODO(b/187743369): pass status back to caller
216         // if broker is disabled or metricsConfig already exists, do nothing
217         if (mDisabled || mSubscriptionMap.containsKey(key)) {
218             return;
219         }
220         // Create the subscribers for this metrics configuration
221         List<DataSubscriber> dataSubscribers = new ArrayList<>(
222                 metricsConfig.getSubscribersList().size());
223         for (TelemetryProto.Subscriber subscriber : metricsConfig.getSubscribersList()) {
224             // protobuf publisher to a concrete Publisher
225             AbstractPublisher publisher = mPublisherFactory.getPublisher(
226                     subscriber.getPublisher().getPublisherCase());
227             // create DataSubscriber from TelemetryProto.Subscriber
228             DataSubscriber dataSubscriber = new DataSubscriber(
229                     this,
230                     metricsConfig,
231                     subscriber);
232             dataSubscribers.add(dataSubscriber);
233 
234             try {
235                 // The publisher will start sending data to the subscriber.
236                 // TODO(b/191378559): handle bad configs
237                 publisher.addDataSubscriber(dataSubscriber);
238             } catch (IllegalArgumentException e) {
239                 Slogf.w(CarLog.TAG_TELEMETRY, "Invalid config", e);
240                 return;
241             }
242         }
243         mSubscriptionMap.put(key, dataSubscribers);
244     }
245 
246     @Override
removeMetricsConfig(MetricsConfigKey key)247     public void removeMetricsConfig(MetricsConfigKey key) {
248         // TODO(b/187743369): pass status back to caller
249         if (!mSubscriptionMap.containsKey(key)) {
250             return;
251         }
252         // get the subscriptions associated with this MetricsConfig, remove it from the map
253         List<DataSubscriber> dataSubscribers = mSubscriptionMap.remove(key);
254         // for each subscriber, remove it from publishers
255         for (DataSubscriber subscriber : dataSubscribers) {
256             AbstractPublisher publisher = mPublisherFactory.getPublisher(
257                     subscriber.getPublisherParam().getPublisherCase());
258             try {
259                 publisher.removeDataSubscriber(subscriber);
260             } catch (IllegalArgumentException e) {
261                 // It shouldn't happen, but if happens, let's just log it.
262                 Slogf.w(CarLog.TAG_TELEMETRY, "Failed to remove subscriber from publisher", e);
263             }
264         }
265         // Remove all the tasks associated with this metrics config. The underlying impl uses the
266         // weakly consistent iterator, which is thread-safe but does not freeze the collection while
267         // iterating, so it may or may not reflect any updates since the iterator was created.
268         // But since adding & polling from queue should happen in the same thread, the task queue
269         // should not be changed while tasks are being iterated and removed.
270         mTaskQueue.removeIf(task -> task.isAssociatedWithMetricsConfig(key));
271     }
272 
273     @Override
removeAllMetricsConfigs()274     public void removeAllMetricsConfigs() {
275         mPublisherFactory.removeAllDataSubscribers();
276         mSubscriptionMap.clear();
277         mTaskQueue.clear();
278     }
279 
280     @Override
addTaskToQueue(ScriptExecutionTask task)281     public void addTaskToQueue(ScriptExecutionTask task) {
282         if (mDisabled) {
283             return;
284         }
285         mTaskQueue.add(task);
286         scheduleNextTask();
287     }
288 
289     /**
290      * This method can be called from any thread.
291      * It is possible for this method to be invoked from different threads at the same time, but
292      * it is not possible to schedule the same task twice, because the handler handles message
293      * in the order they come in, this means the task will be polled sequentially instead of
294      * concurrently. Every task that is scheduled and run will be distinct.
295      * TODO(b/187743369): If the threading behavior in DataSubscriber changes, ScriptExecutionTask
296      *  will also have different threading behavior. Update javadoc when the behavior is decided.
297      */
298     @Override
scheduleNextTask()299     public void scheduleNextTask() {
300         if (mDisabled || mTelemetryHandler.hasMessages(MSG_HANDLE_TASK)) {
301             return;
302         }
303         mTelemetryHandler.sendEmptyMessage(MSG_HANDLE_TASK);
304     }
305 
306     @Override
setOnScriptFinishedCallback(ScriptFinishedCallback callback)307     public void setOnScriptFinishedCallback(ScriptFinishedCallback callback) {
308         if (mDisabled) {
309             return;
310         }
311         mScriptFinishedCallback = callback;
312     }
313 
314     @Override
setTaskExecutionPriority(int priority)315     public void setTaskExecutionPriority(int priority) {
316         if (mDisabled) {
317             return;
318         }
319         mPriority = priority;
320         scheduleNextTask(); // when priority updates, schedule a task which checks task queue
321     }
322 
323     @VisibleForTesting
getSubscriptionMap()324     Map<MetricsConfigKey, List<DataSubscriber>> getSubscriptionMap() {
325         return new ArrayMap<>((ArrayMap<MetricsConfigKey, List<DataSubscriber>>) mSubscriptionMap);
326     }
327 
328     @VisibleForTesting
getTelemetryHandler()329     Handler getTelemetryHandler() {
330         return mTelemetryHandler;
331     }
332 
333     @VisibleForTesting
getTaskQueue()334     PriorityBlockingQueue<ScriptExecutionTask> getTaskQueue() {
335         return mTaskQueue;
336     }
337 
338     /**
339      * Polls and runs a task from the head of the priority queue if the queue is nonempty and the
340      * head of the queue has priority higher than or equal to the current priority. A higher
341      * priority is denoted by a lower priority number, so head of the queue should have equal or
342      * lower priority number to be polled.
343      */
pollAndExecuteTask()344     private void pollAndExecuteTask() {
345         // check databroker state is ready to run script
346         if (mDisabled || mCurrentMetricsConfigKey != null) {
347             return;
348         }
349         // check task is valid and ready to be run
350         ScriptExecutionTask task = mTaskQueue.peek();
351         if (task == null || task.getPriority() > mPriority) {
352             return;
353         }
354         // if script executor is null, bind service
355         if (mScriptExecutor == null) {
356             Slogf.w(CarLog.TAG_TELEMETRY, "script executor is null, binding to script executor");
357             // upon successful binding, a task will be scheduled to run if there are any
358             mTelemetryHandler.sendEmptyMessage(MSG_BIND_TO_SCRIPT_EXECUTOR);
359             return;
360         }
361         mTaskQueue.poll(); // remove task from queue
362         // update current config key because a script is currently running
363         mCurrentMetricsConfigKey = new MetricsConfigKey(task.getMetricsConfig().getName(),
364                 task.getMetricsConfig().getVersion());
365         try {
366             if (task.isLargeData()) {
367                 Slogf.d(CarLog.TAG_TELEMETRY, "invoking script executor for large input");
368                 invokeScriptForLargeInput(task);
369             } else {
370                 Slogf.d(CarLog.TAG_TELEMETRY, "invoking script executor");
371                 mScriptExecutor.invokeScript(
372                         task.getMetricsConfig().getScript(),
373                         task.getHandlerName(),
374                         task.getData(),
375                         mResultStore.getInterimResult(mCurrentMetricsConfigKey.getName()),
376                         mScriptExecutorListener);
377             }
378         } catch (RemoteException e) {
379             Slogf.w(CarLog.TAG_TELEMETRY, "remote exception occurred invoking script", e);
380             unbindScriptExecutor();
381             addTaskToQueue(task); // will trigger scheduleNextTask() and re-binding scriptexecutor
382         } catch (IOException e) {
383             Slogf.w(CarLog.TAG_TELEMETRY, "Either unable to create pipe or failed to pipe data"
384                     + " to ScriptExecutor. Skipping the published data", e);
385             mCurrentMetricsConfigKey = null;
386             scheduleNextTask(); // drop this task and schedule the next one
387         }
388     }
389 
390     /**
391      * Sets up pipes, invokes ScriptExecutor#invokeScriptForLargeInput() API, and writes the
392      * script input to the pipe.
393      *
394      * @param task containing all the necessary parameters for ScriptExecutor API.
395      * @throws IOException if cannot create pipe or cannot write the bundle to pipe.
396      * @throws RemoteException if ScriptExecutor failed.
397      */
invokeScriptForLargeInput(ScriptExecutionTask task)398     private void invokeScriptForLargeInput(ScriptExecutionTask task)
399             throws IOException, RemoteException {
400         ParcelFileDescriptor[] fds = ParcelFileDescriptor.createPipe();
401         ParcelFileDescriptor readFd = fds[0];
402         ParcelFileDescriptor writeFd = fds[1];
403         try {
404             mScriptExecutor.invokeScriptForLargeInput(
405                     task.getMetricsConfig().getScript(),
406                     task.getHandlerName(),
407                     readFd,
408                     mResultStore.getInterimResult(mCurrentMetricsConfigKey.getName()),
409                     mScriptExecutorListener);
410         } catch (RemoteException e) {
411             closeQuietly(readFd);
412             closeQuietly(writeFd);
413             throw e;
414         }
415         closeQuietly(readFd);
416 
417         Slogf.d(CarLog.TAG_TELEMETRY, "writing large script data to pipe");
418         try (OutputStream outputStream = new ParcelFileDescriptor.AutoCloseOutputStream(writeFd)) {
419             task.getData().writeToStream(outputStream);
420         }
421     }
422 
423     /** Quietly closes Java Closeables, ignoring IOException. */
closeQuietly(Closeable closeable)424     private void closeQuietly(Closeable closeable) {
425         try {
426             closeable.close();
427         } catch (IOException e) {
428             // Ignore
429         }
430     }
431 
432     /** Stores final metrics and schedules the next task. */
onScriptFinished(PersistableBundle result)433     private void onScriptFinished(PersistableBundle result) {
434         mTelemetryHandler.post(() -> {
435             mResultStore.putFinalResult(mCurrentMetricsConfigKey.getName(), result);
436             mScriptFinishedCallback.onScriptFinished(mCurrentMetricsConfigKey);
437             mCurrentMetricsConfigKey = null;
438             scheduleNextTask();
439         });
440     }
441 
442     /** Stores interim metrics and schedules the next task. */
onScriptSuccess(PersistableBundle stateToPersist)443     private void onScriptSuccess(PersistableBundle stateToPersist) {
444         mTelemetryHandler.post(() -> {
445             mResultStore.putInterimResult(mCurrentMetricsConfigKey.getName(), stateToPersist);
446             mCurrentMetricsConfigKey = null;
447             scheduleNextTask();
448         });
449     }
450 
451     /** Stores telemetry error and schedules the next task. */
onScriptError(int errorType, String message, String stackTrace)452     private void onScriptError(int errorType, String message, String stackTrace) {
453         mTelemetryHandler.post(() -> {
454             TelemetryProto.TelemetryError.Builder error = TelemetryProto.TelemetryError.newBuilder()
455                     .setErrorType(TelemetryProto.TelemetryError.ErrorType.forNumber(errorType))
456                     .setMessage(message);
457             if (stackTrace != null) {
458                 error.setStackTrace(stackTrace);
459             }
460             mResultStore.putError(mCurrentMetricsConfigKey.getName(), error.build());
461             mCurrentMetricsConfigKey = null;
462             scheduleNextTask();
463         });
464     }
465 
466     /** Listens for script execution status. Methods are called on the binder thread. */
467     private static final class ScriptExecutorListener extends IScriptExecutorListener.Stub {
468         private final WeakReference<DataBrokerImpl> mWeakDataBroker;
469 
ScriptExecutorListener(DataBrokerImpl dataBroker)470         private ScriptExecutorListener(DataBrokerImpl dataBroker) {
471             mWeakDataBroker = new WeakReference<>(dataBroker);
472         }
473 
474         @Override
onScriptFinished(PersistableBundle result)475         public void onScriptFinished(PersistableBundle result) {
476             DataBrokerImpl dataBroker = mWeakDataBroker.get();
477             if (dataBroker == null) {
478                 return;
479             }
480             dataBroker.onScriptFinished(result);
481         }
482 
483         @Override
onSuccess(PersistableBundle stateToPersist)484         public void onSuccess(PersistableBundle stateToPersist) {
485             DataBrokerImpl dataBroker = mWeakDataBroker.get();
486             if (dataBroker == null) {
487                 return;
488             }
489             dataBroker.onScriptSuccess(stateToPersist);
490         }
491 
492         @Override
onError(int errorType, String message, String stackTrace)493         public void onError(int errorType, String message, String stackTrace) {
494             DataBrokerImpl dataBroker = mWeakDataBroker.get();
495             if (dataBroker == null) {
496                 return;
497             }
498             dataBroker.onScriptError(errorType, message, stackTrace);
499         }
500     }
501 
502     /** Callback handler to handle scheduling and rescheduling of {@link ScriptExecutionTask}s. */
503     class TaskHandler extends Handler {
TaskHandler(Looper looper)504         TaskHandler(Looper looper) {
505             super(looper);
506         }
507 
508         /**
509          * Handles a message depending on the message ID.
510          * If the msg ID is MSG_HANDLE_TASK, it polls a task from the priority queue and executing a
511          * {@link ScriptExecutionTask}. There are multiple places where this message is sent: when
512          * priority updates, when a new task is added to the priority queue, and when a task
513          * finishes running.
514          */
515         @Override
handleMessage(Message msg)516         public void handleMessage(Message msg) {
517             switch (msg.what) {
518                 case MSG_HANDLE_TASK:
519                     pollAndExecuteTask(); // run the next task
520                     break;
521                 case MSG_BIND_TO_SCRIPT_EXECUTOR:
522                     bindScriptExecutor();
523                     break;
524                 default:
525                     Slogf.w(CarLog.TAG_TELEMETRY, "TaskHandler received unknown message.");
526             }
527         }
528     }
529 }
530