1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifdef HAVE_CONFIG_H
17 #include <config.h>
18 #endif
19 
20 #ifndef LOG_TAG
21 #define LOG_TAG "ModuleInnerCapturerSink"
22 #endif
23 
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <errno.h>
27 #include <unistd.h>
28 
29 #include <pulse/rtclock.h>
30 #include <pulse/timeval.h>
31 #include <pulse/util.h>
32 #include <pulse/xmalloc.h>
33 
34 #include <pulsecore/i18n.h>
35 #include <pulsecore/macro.h>
36 #include <pulsecore/sink.h>
37 #include <pulsecore/module.h>
38 #include <pulsecore/core-util.h>
39 #include <pulsecore/modargs.h>
40 #include <pulsecore/log.h>
41 #include <pulsecore/thread.h>
42 #include <pulsecore/thread-mq.h>
43 #include <pulsecore/rtpoll.h>
44 
45 #include "securec.h"
46 #include "audio_common_log.h"
47 #include "audio_utils_c.h"
48 #include "audio_volume_c.h"
49 
50 PA_MODULE_AUTHOR("OpenHarmony");
51 PA_MODULE_DESCRIPTION(_("Inner Capturer Sink"));
52 PA_MODULE_VERSION(PACKAGE_VERSION);
53 PA_MODULE_LOAD_ONCE(false);
54 PA_MODULE_USAGE(
55         "sink_name=<name of sink> "
56         "sink_properties=<properties for the sink> "
57         "format=<sample format> "
58         "rate=<sample rate> "
59         "channels=<number of channels> "
60         "channel_map=<channel map>"
61         "buffer_size=<custom buffer size>"
62         "formats=<semi-colon separated sink formats>");
63 
64 #define DEFAULT_SINK_NAME "InnerCapturer"
65 #define DEFAULT_BUFFER_SIZE 8192  // same as HDI Sink
66 #define PA_ERR (-1)
67 const char *SINK_NAME_INNER_CAPTURER = "InnerCapturerSink";
68 
69 struct userdata {
70     pa_core *core;
71     pa_module *module;
72     pa_sink *sink;
73 
74     pa_thread *thread;
75     pa_thread_mq thread_mq;
76     pa_rtpoll *rtpoll;
77 
78     uint32_t buffer_size;
79     pa_usec_t block_usec;
80     pa_usec_t timestamp;
81 
82     pa_idxset *formats;
83 };
84 
85 static const char * const VALID_MODARGS[] = {
86     "sink_name",
87     "sink_properties",
88     "format",
89     "rate",
90     "channels",
91     "channel_map",
92     "buffer_size",
93     "formats",
94     NULL
95 };
96 
SinkProcessMsg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)97 static int SinkProcessMsg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk)
98 {
99     switch (code) {
100         case PA_SINK_MESSAGE_GET_LATENCY:
101             *((int64_t*) data) = 0;
102             return 0;
103         default:
104             break;
105     }
106 
107     return pa_sink_process_msg(o, code, data, offset, chunk);
108 }
109 
110 /* Called from the IO thread. */
SinkSetStateInIoThreadCb(pa_sink * s,pa_sink_state_t new_state,pa_suspend_cause_t new_suspend_cause)111 static int SinkSetStateInIoThreadCb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause)
112 {
113     struct userdata *u;
114 
115     pa_assert(s);
116     pa_assert_se(u = s->userdata);
117 
118     if (s->thread_info.state == PA_SINK_SUSPENDED || s->thread_info.state == PA_SINK_INIT) {
119         if (PA_SINK_IS_OPENED(new_state)) {
120             u->timestamp = pa_rtclock_now();
121         }
122     }
123 
124     return 0;
125 }
126 
SinkUpdateRequestedLatencyCb(pa_sink * s)127 static void SinkUpdateRequestedLatencyCb(pa_sink *s)
128 {
129     struct userdata *u;
130     size_t nbytes;
131 
132     pa_sink_assert_ref(s);
133     pa_assert_se(u = s->userdata);
134 
135     u->block_usec = pa_sink_get_requested_latency_within_thread(s);
136 
137     if (u->block_usec == (pa_usec_t) -1) {
138         u->block_usec = s->thread_info.max_latency;
139     }
140 
141     nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec);
142     pa_sink_set_max_rewind_within_thread(s, nbytes);
143     pa_sink_set_max_request_within_thread(s, nbytes);
144 }
145 
SinkReconfigureCb(pa_sink * s,pa_sample_spec * spec,bool passthrough)146 static void SinkReconfigureCb(pa_sink *s, pa_sample_spec *spec, bool passthrough)
147 {
148     s->sample_spec = *spec;
149 }
150 
SinkSetFormatsCb(pa_sink * s,pa_idxset * formats)151 static bool SinkSetFormatsCb(pa_sink *s, pa_idxset *formats)
152 {
153     struct userdata *u = s->userdata;
154 
155     pa_assert(u);
156 
157     pa_idxset_free(u->formats, (pa_free_cb_t) pa_format_info_free);
158     u->formats = pa_idxset_copy(formats, (pa_copy_func_t) pa_format_info_copy);
159 
160     return true;
161 }
162 
SinkGetFormatsCb(pa_sink * s)163 static pa_idxset* SinkGetFormatsCb(pa_sink *s)
164 {
165     struct userdata *u = s->userdata;
166 
167     pa_assert(u);
168 
169     return pa_idxset_copy(u->formats, (pa_copy_func_t) pa_format_info_copy);
170 }
171 
ProcessRewind(struct userdata * u,pa_usec_t now)172 static void ProcessRewind(struct userdata *u, pa_usec_t now)
173 {
174     size_t rewindNbytes;
175     size_t inBuffer;
176     pa_usec_t delay;
177 
178     pa_assert(u);
179 
180     rewindNbytes = u->sink->thread_info.rewind_nbytes;
181     if (!PA_SINK_IS_OPENED(u->sink->thread_info.state) || rewindNbytes <= 0) {
182         goto do_nothing;
183     }
184     AUDIO_DEBUG_LOG("Requested to rewind %lu bytes.", (unsigned long) rewindNbytes);
185 
186     if (u->timestamp <= now) {
187         goto do_nothing;
188     }
189 
190     delay = u->timestamp - now;
191     inBuffer = pa_usec_to_bytes(delay, &u->sink->sample_spec);
192     if (inBuffer <= 0) {
193         goto do_nothing;
194     }
195 
196     if (rewindNbytes > inBuffer) {
197         rewindNbytes = inBuffer;
198     }
199 
200     pa_sink_process_rewind(u->sink, rewindNbytes);
201     u->timestamp -= pa_bytes_to_usec(rewindNbytes, &u->sink->sample_spec);
202 
203     AUDIO_DEBUG_LOG("Rewound %lu bytes.", (unsigned long) rewindNbytes);
204     return;
205 
206 do_nothing:
207     pa_sink_process_rewind(u->sink, 0);
208 }
209 
SafeProplistGets(const pa_proplist * p,const char * key,const char * defstr)210 static const char *SafeProplistGets(const pa_proplist *p, const char *key, const char *defstr)
211 {
212     const char *res = pa_proplist_gets(p, key);
213     if (res == NULL) {
214         return defstr;
215     }
216     return res;
217 }
218 
SetSinkVolumeBySinkName(pa_sink * s)219 static void SetSinkVolumeBySinkName(pa_sink *s)
220 {
221     pa_assert(s);
222     void *state = NULL;
223     pa_sink_input *input;
224     while ((input = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL))) {
225         pa_sink_input_assert_ref(input);
226         if (input->thread_info.state != PA_SINK_INPUT_RUNNING) {
227             continue;
228         }
229         const char *streamType = SafeProplistGets(input->proplist, "stream.type", "NULL");
230         const char *sessionIDStr = SafeProplistGets(input->proplist, "stream.sessionID", "NULL");
231         uint32_t sessionID = sessionIDStr != NULL ? atoi(sessionIDStr) : 0;
232         float volumeFloat = 1.0f;
233         if (!strcmp(s->name, SINK_NAME_INNER_CAPTURER)) { // inner capturer only stream volume
234             volumeFloat = GetStreamVolume(sessionID);
235         } else {
236             volumeFloat = GetCurVolume(sessionID, streamType, s->name);
237         }
238         uint32_t volume = pa_sw_volume_from_linear(volumeFloat);
239         pa_cvolume_set(&input->thread_info.soft_volume, input->thread_info.soft_volume.channels, volume);
240     }
241 }
242 
UnsetSinkVolume(pa_sink * s)243 static void UnsetSinkVolume(pa_sink *s)
244 {
245     pa_assert(s);
246     void *state = NULL;
247     pa_sink_input *input;
248     while ((input = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL))) {
249         pa_sink_input_assert_ref(input);
250         if (input->thread_info.state != PA_SINK_INPUT_RUNNING) {
251             continue;
252         }
253         uint32_t volume = pa_sw_volume_from_linear(1.0f);
254         pa_cvolume_set(&input->thread_info.soft_volume, input->thread_info.soft_volume.channels, volume);
255     }
256 }
257 
ProcessRender(struct userdata * u,pa_usec_t now)258 static void ProcessRender(struct userdata *u, pa_usec_t now)
259 {
260     size_t ate = 0;
261 
262     pa_assert(u);
263 
264     // update use volume
265     SetSinkVolumeBySinkName(u->sink);
266 
267     /* This is the configured latency. Sink inputs connected to us
268     might not have a single frame more than the maxrequest value
269     queued. Hence: at maximum read this many bytes from the sink
270     inputs. */
271 
272     /* Fill the buffer up the latency size */
273     while (u->timestamp < now + u->block_usec) {
274         pa_memchunk chunk;
275 
276         pa_sink_render(u->sink, u->sink->thread_info.max_request, &chunk);
277         AUTO_CTRACE("inner_capturer_sink: ProcessRender len %zu, max_request %zu", chunk.length,
278             u->sink->thread_info.max_request);
279         pa_memblock_unref(chunk.memblock);
280 
281         u->timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec);
282 
283         ate += chunk.length;
284         if (ate >= u->sink->thread_info.max_request) {
285             break;
286         }
287     }
288 
289     UnsetSinkVolume(u->sink);
290 }
291 
ThreadFunc(void * userdata)292 static void ThreadFunc(void *userdata)
293 {
294     struct userdata *u = userdata;
295 
296     pa_assert(u);
297 
298     AUDIO_DEBUG_LOG("Thread starting up");
299     if (u->core->realtime_scheduling) {
300         pa_thread_make_realtime(u->core->realtime_priority);
301     }
302 
303     pa_thread_mq_install(&u->thread_mq);
304 
305     u->timestamp = pa_rtclock_now();
306 
307     for (;;) {
308         pa_usec_t now = 0;
309         int ret;
310 
311         if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
312             now = pa_rtclock_now();
313         }
314 
315         if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) {
316             ProcessRewind(u, now);
317         }
318 
319         /* Render some data and drop it immediately */
320         if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
321             if (u->timestamp <= now) {
322                 ProcessRender(u, now);
323             }
324 
325             pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp);
326         } else {
327             pa_rtpoll_set_timer_disabled(u->rtpoll);
328         }
329 
330         /* Hmm, nothing to do. Let's sleep */
331         if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) {
332             goto fail;
333         }
334 
335         if (ret == 0) {
336             goto finish;
337         }
338     }
339 
340 fail:
341     /* If this was no regular exit from the loop we have to continue
342      * processing messages until we received PA_MESSAGE_SHUTDOWN */
343     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core),
344         PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
345     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
346 
347 finish:
348     AUDIO_DEBUG_LOG("Thread shutting down");
349 }
350 
InitFailed(pa_module * m,pa_modargs * ma)351 int InitFailed(pa_module *m, pa_modargs *ma)
352 {
353     AUDIO_ERR_LOG("Inner Capturer Sink Init Failed");
354     if (ma)
355         pa_modargs_free(ma);
356 
357     pa__done(m);
358 
359     return PA_ERR;
360 }
361 
CreateSink(pa_module * m,pa_modargs * ma,struct userdata * u)362 int CreateSink(pa_module *m, pa_modargs *ma, struct userdata *u)
363 {
364     pa_sample_spec ss;
365     pa_channel_map map;
366     pa_sink_new_data data;
367     pa_format_info *format;
368 
369     pa_assert(m);
370 
371     ss = m->core->default_sample_spec;
372     map = m->core->default_channel_map;
373     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
374         AUDIO_ERR_LOG("Invalid sample format specification or channel map");
375         return PA_ERR;
376     }
377 
378     pa_sink_new_data_init(&data);
379     data.driver = __FILE__;
380     data.module = m;
381     pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
382     pa_sink_new_data_set_sample_spec(&data, &ss);
383     pa_sink_new_data_set_channel_map(&data, &map);
384     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, _("Null Output"));
385     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_CLASS, "capturer");
386     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, "innercapturer");
387 
388     u->formats = pa_idxset_new(NULL, NULL);
389     format = pa_format_info_new();
390     format->encoding = PA_ENCODING_PCM;
391     pa_idxset_put(u->formats, format, NULL);
392 
393     if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
394         AUDIO_ERR_LOG("Invalid properties");
395         pa_sink_new_data_done(&data);
396         return PA_ERR;
397     }
398 
399     u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY);
400     pa_sink_new_data_done(&data);
401 
402     if (!u->sink) {
403         AUDIO_ERR_LOG("Failed to create sink.");
404         return PA_ERR;
405     }
406 
407     u->sink->parent.process_msg = SinkProcessMsg;
408     u->sink->set_state_in_io_thread = SinkSetStateInIoThreadCb;
409     u->sink->update_requested_latency = SinkUpdateRequestedLatencyCb;
410     u->sink->reconfigure = SinkReconfigureCb;
411     u->sink->get_formats = SinkGetFormatsCb;
412     u->sink->set_formats = SinkSetFormatsCb;
413     u->sink->userdata = u;
414 
415     return 0;
416 }
417 
pa__init(pa_module * m)418 int pa__init(pa_module *m)
419 {
420     struct userdata *u = NULL;
421     pa_modargs *ma = NULL;
422     size_t nbytes;
423     int mq;
424     int mg;
425 
426     pa_assert(m);
427 
428     ma = pa_modargs_new(m->argument, VALID_MODARGS);
429     CHECK_AND_RETURN_RET_LOG(ma != NULL, InitFailed(m, ma), "Failed to parse module arguments:%{public}s", m->argument);
430 
431     m->userdata = u = pa_xnew0(struct userdata, 1);
432     u->core = m->core;
433     u->module = m;
434     u->rtpoll = pa_rtpoll_new();
435 
436     mq = pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
437     CHECK_AND_RETURN_RET_LOG(mq >=0, InitFailed(m, ma), "pa_thread_mq_init() failed.");
438 
439     if (CreateSink(m, ma, u) != 0) {
440         return InitFailed(m, ma);
441     }
442 
443     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
444     pa_sink_set_rtpoll(u->sink, u->rtpoll);
445 
446     u->buffer_size = DEFAULT_BUFFER_SIZE;
447 
448     mg = pa_modargs_get_value_u32(ma, "buffer_size", &u->buffer_size);
449     CHECK_AND_RETURN_RET_LOG(mg >= 0, InitFailed(m, ma),
450         "Failed to parse buffer_size arg in capturer sink");
451 
452     u->block_usec = pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec);
453     nbytes = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec);
454 
455     pa_sink_set_max_rewind(u->sink, nbytes);
456 
457     pa_sink_set_max_request(u->sink, u->buffer_size);
458 
459     if (!(u->thread = pa_thread_new("OS_InnerCap", ThreadFunc, u))) {
460         AUDIO_ERR_LOG("Failed to create thread.");
461         return InitFailed(m, ma);
462     }
463     pa_sink_set_latency_range(u->sink, 0, u->block_usec);
464 
465     pa_sink_put(u->sink);
466 
467     pa_modargs_free(ma);
468 
469     return 0;
470 }
471 
pa__get_n_used(pa_module * m)472 int pa__get_n_used(pa_module *m)
473 {
474     struct userdata *u;
475 
476     pa_assert(m);
477     pa_assert_se(u = m->userdata);
478 
479     return pa_sink_linked_by(u->sink);
480 }
481 
pa__done(pa_module * m)482 void pa__done(pa_module*m)
483 {
484     struct userdata *u;
485 
486     pa_assert(m);
487 
488     if (!(u = m->userdata)) {
489         return;
490     }
491 
492     if (u->sink) {
493         pa_sink_unlink(u->sink);
494     }
495 
496     if (u->thread) {
497         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
498         pa_thread_free(u->thread);
499     }
500 
501     pa_thread_mq_done(&u->thread_mq);
502 
503     if (u->sink) {
504         pa_sink_unref(u->sink);
505     }
506 
507     if (u->rtpoll) {
508         pa_rtpoll_free(u->rtpoll);
509     }
510 
511     if (u->formats) {
512         pa_idxset_free(u->formats, (pa_free_cb_t) pa_format_info_free);
513     }
514 
515     pa_xfree(u);
516     m->userdata = NULL;
517 }
518