Pipeline

概述

Pipeline是一种处理音频流数据的统一框架组件,将音频流处理过程拆解成串行模式,提供了一种可灵活定义数据处理步骤的方法。上层应用可以按照不同的需求,依序预设好数据处理操作即可。

功能特性

  • 支持上层应用灵活定义数据处理流程

  • 提供处理流程统一的processor结构体,便于统一管理

流程图

  • 使用pipeline时,每个处理步骤都作为独立的处理工序,以流水线方式运行。

  • 前一个处理工序的输出内容,作为下一个处理工序的输入内容。

        flowchart LR

A(processor A) --> buf1[/Pipeline buffer/] --> B(processor B)
B(processor B) --> buf2[/Pipeline buffer/] --> C(processor C)
subgraph stream
A --> B --> C
end
    

Pipeline stream运行流程图

运行原理

  • Pipeline里的processor模块表示各功能模块,提供了主要的数据处理接口。struct pipeline_processor结构为各功能模块需实现的统一接口,stream通过struct pipeline_processor结构对各processor进行管理。

  • Pipeline procedure的数据处理都是由对应的processor实现,procedure保存processor针对特定stream生成的handler和配置。同一个pipeline stream的procedure采用链表方式记录,执行到某个procedure时,根据procedure->processor找到对应的processor,执行具体的数据处理。

  • Pipeline buff即plb表示数据帧,plb采用链表形式,每个plb都相当于数据帧的header部分,记录了该数据帧的相关信息,数据帧存储在plb->buf里。需要注意的是plb里的procedure指针,只有当该数据帧被某个处理工序挂起时才会给plb->procedure赋值,标明当前挂起该数据帧的是哪个处理工序。

  • 数据流处理基于stream管理,stream通过stream_program数组创建,提供了WQ_PIPELINE_STREAM_SCHEDULE_START、WQ_PIPELINE_STREAM_SCHEDULE_STEP、WQ_PIPELINE_STREAM_SCHEDULE_END宏定义来统一定义stream的流程数组。

  • Pipeline stream在创建的时候,需要发送create command,依次执行到stream_program数组里对应的processor probe接口,生成procedure,并加入到stream procedure链表中。

  • Pipeline里处理command的函数是wq_pipeline_stream_control,该函数作为接收command的统一入口。

    1. 接收到create command:阻塞执行,create完成后才会返回。command->param里会记录当前stream的类型,为WQ_PIPELINE_STREAM_CTX_OWN_TASK类型时需要创建独立task运行。

    2. 接收到update command:异步执行,将command加入stream command list表中,通过wq_pipeline_stream_trigger抛消息到对应的task中进行处理。

    3. 接收到destory command:执行方式默认异步执行,若需要阻塞方式,则需要实现command的callback函数。stream发送destory command后等待event,而callback函数在destory完成后才会设置event,以此来实现阻塞方式。

  • 如果存在某个procedure输出多个数据帧的情况,stream会将除第一个输出plb挂起到下一个prcedure,然后取第一个输出plb作为下一个prcedure的输入plb。Stream挂起的数据帧也是按照链表方式组织,当已经有挂起的plb时,后挂起的plb会插入到挂起plb list的头部。

  • Pipeline stream在运行的时候,先检查是否有挂起的数据帧,如果存在则根据plb中记录的procedure开始执行;如果没有挂起的数据帧,则从stream的第一个procedure开始执行。Pipeline stream运行时按照链表顺序执行procedure,依次执行对应processor->run函数。每一个procedure的输出plb,作为下一个procedure的输入plb。

  • Pipeline stream通过wq_pipeline_stream_trigger启动stream运行,stream会查找plb list中是否有数据,读到数据帧时会依次执行processor,直到plb list中不再有数据帧。

  • Pipeline shaper模块是用于数据帧长度转换。Pipeline shaper模块会申请一块内存作为数据池,将输入的plb都拷贝到数据池里,然后从数据池中取出特定长度的数据帧作为输出数据帧。

  • 当有多路数据输入,需要混合成一路输出时,则需要用到pipeline gather模块。使用Pipeline gather模块时,需要配置gather config,gather模块初始化时需要根据config信息设置当前gather handler信息。若有多个输出需要gather,则根据gather handler里的gather id来区分。当某个gather id的gather handler已经创建时,仅需要attach到已经创建的gather client。Gather users记录了需要混合的数据路数。

  • Pipeline mirror模块用于拷贝上一个proceduer的输出。当pipeline stream里某个procedure的输出需要作为多个proceduer的输入时,则需要在此procedure后加上一个pipeline mirror。Pipeline stream配置可参考用法流程里sub stream配置。

  • Pipeline breaker用在stream_program END之前,用于释放上一个procedure输出的数据帧。

用法流程

  1. 数据流处理流程模块需实现本模块的processor结构体

pipeline_processor

  1. 初始化数据流,按照pipeline提供的统一框架预设处理流程

  • 不存在sub stream情况,即不需要mirror的情况

        flowchart LR
A(A) --> B(B) --> C(C)
    
 1/* Demo stream run schedule, should end with NULL processor */
 2/* Run schedule: processor A -> B -> C */
 3#define STREAM_SCHEDULE_DEMO                         \
 4   WQ_PIPELINE_SCHEDULE_START                        \
 5   WQ_PIPELINE_SCHEDULE_STEP(processor_A, &A_config) \
 6   WQ_PIPELINE_SCHEDULE_STEP(processor_B, &B_config) \
 7   WQ_PIPELINE_SCHEDULE_STEP(processor_C, &C_config) \
 8   WQ_PIPELINE_SCHEDULE_END
 9
10void demo_stream_init(void)
11{
12    wq_pipeline_stream_schedule_t demo_stream_schedule[] = STREAM_SCHEDULE_DEMO;
13   ...
14}
  • 存在sub stream情况,即需要mirror的情况

        flowchart LR
A(A) --> B(B) --> C(C) --> D(D) --> M1(Mirror1)
M1(Mirror1) --> E(E) --> M2(Mirror2)
M2(Mirror2) --> F(F) --> G(G) --> H(H)

M1 --> I(I) --> B(B)
M2 --> J(J) --> G(G)
    
 1#define STREAM_SCHEDULE_DEMO                                               \
 2    WQ_PIPELINE_SCHEDULE_START                                             \
 3    /* primary stream */                                                   \
 4    WQ_PIPELINE_SCHEDULE_STEP(processor_A, &A_config)                      \
 5    WQ_PIPELINE_SCHEDULE_STEP(processor_B, &B_config)                      \
 6    WQ_PIPELINE_SCHEDULE_STEP(processor_C, &B_config)                      \
 7    WQ_PIPELINE_SCHEDULE_STEP(processor_D, &D_config)                      \
 8    WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_1_config)        \
 9    WQ_PIPELINE_SCHEDULE_STEP(processor_E, &E_config)                      \
10    WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_2_config)        \
11    WQ_PIPELINE_SCHEDULE_STEP(processor_F, &F_config)                      \
12    WQ_PIPELINE_SCHEDULE_STEP(processor_G, &G_config)                      \
13    WQ_PIPELINE_SCHEDULE_STEP(processor_H, &G_config)                      \
14    /* sub stream 1 from mirror 1 */                                       \
15    WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_1_config)        \
16    WQ_PIPELINE_SCHEDULE_STEP(processor_I, &I_config)                      \
17    WQ_PIPELINE_SCHEDULE_STEP(processor_B, &B_config)                      \
18    /* sub stream 2 from mirror 2 */                                       \
19    WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_2_config)        \
20    WQ_PIPELINE_SCHEDULE_STEP(processor_J, &J_config)                      \
21    WQ_PIPELINE_SCHEDULE_STEP(processor_G, &G_config)                      \
22    WQ_PIPELINE_SCHEDULE_END
23
24void demo_stream_init(void)
25{
26    wq_pipeline_stream_schedule_t demo_stream_schedule[] = STREAM_SCHEDULE_DEMO;
27   ...
28}
  • 也可以将sub stream配置在独立的stream里

 1#define STREAM_SCHEDULE_DEMO                                               \
 2    WQ_PIPELINE_SCHEDULE_START                                             \
 3    /* primary stream */                                                   \
 4    WQ_PIPELINE_SCHEDULE_STEP(processor_A, &A_config)                      \
 5    WQ_PIPELINE_SCHEDULE_STEP(processor_B, &B_config)                      \
 6    WQ_PIPELINE_SCHEDULE_STEP(processor_C, &B_config)                      \
 7    WQ_PIPELINE_SCHEDULE_STEP(processor_D, &D_config)                      \
 8    WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_1_config)        \
 9    WQ_PIPELINE_SCHEDULE_STEP(processor_E, &E_config)                      \
10    WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_2_config)        \
11    WQ_PIPELINE_SCHEDULE_STEP(processor_F, &F_config)                      \
12    WQ_PIPELINE_SCHEDULE_STEP(processor_G, &G_config)                      \
13    WQ_PIPELINE_SCHEDULE_STEP(processor_H, &G_config)                      \
14    WQ_PIPELINE_SCHEDULE_END
15
16#define STREAM_SCHEDULE_DEMO_SUB_STREAM_1                                  \
17    WQ_PIPELINE_SCHEDULE_START                                             \
18    /* sub stream 1 from mirror 1 */                                       \
19    WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_1_config)        \
20    WQ_PIPELINE_SCHEDULE_STEP(processor_I, &I_config)                      \
21    WQ_PIPELINE_SCHEDULE_STEP(processor_B, &B_config)                      \
22    WQ_PIPELINE_SCHEDULE_END
23
24#define STREAM_SCHEDULE_DEMO_SUB_STREAM_2                                  \
25    WQ_PIPELINE_SCHEDULE_START                                             \
26    /* sub stream 2 from mirror 2 */                                       \
27    WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_2_config)        \
28    WQ_PIPELINE_SCHEDULE_STEP(processor_J, &J_config)                      \
29    WQ_PIPELINE_SCHEDULE_STEP(processor_G, &G_config)                      \
30    WQ_PIPELINE_SCHEDULE_END
  1. 创建pipeline数据流

  • 配置好demo数据流的task信息,发送create命令创建demo数据流。

  • 创建stream的时候需要配置stream所需的音频流实体runner,runner除了需要设置stream类型、配置和处理流程,还可以配置该stream中是否有所有处理流程都可以共同使用的内存,即scratch point和scratch size。Scratch memory可以是在pipeline stream create的时候创建,也可以是在外部预分配好内存后,配置到runner中。如果在pipeline stream create的时候判断到scratch point为NULL或者scratch size为0时,会去申请scratch memory,反之则不申请。

  • 需要注意的是,scratch memory是该stream中所有processor共享的内存,需要保证某个processor在执行完成后不需要再使用存放在scratch memory中的数据。

  • 不需要配置scratch memory时,需要关闭CONFIG_SCRATCH_MEM_ENABLE;如果打开了CONFIG_SCRATCH_MEM_ENABLE,则pipeline stream在创建时会遍历stream中的所有processor,取最大的scratch memory size来申请内存。

 1/* Demo stream stask info */
 2static char demo_task_name[] = "stream_demo";
 3static wq_pipeline_stream_task_cfg_t demo_task_cfg = {
 4    .prio = 7,
 5    .stack_size = 1024,
 6    .name = demo_task_name,
 7};
 8
 9void demo_stream_start(void)
10{
11    /* Define demo stream runner */
12    wq_pipeline_stream_runner_t demo_runner;
13    memset(&demo_runner, 0, sizeof(wq_pipeline_stream_runner_t));
14    demo_runner.type = WQ_PIPELINE_STREAM_CTX_OWN_TASK;
15    demo_runner.config = &demo_task_cfg;
16    demo_runner.schedule = demo_schedule;
17
18    /* Create demo stream */
19    wq_pipeline_stream_command_t command;
20    memset(&command, 0, sizeof(wq_pipeline_stream_command_t));
21    command.type = WQ_PIPELINE_STREAM_COMMAND_CREATE;  /* Create command */
22    command.stream_id = STREAM_ID_DEMO;                /* Demo stream id */
23    command.init_sample_rate = demo_sample_rate;       /* Demo stream sample rate */
24    command.param = &demo_runner;
25    wq_pipeline_stream_control(&command);
26}
  • 在外部预分配scratch memory时,可以参考如下代码创建pipeline stream。

 1/* Demo stream stask info */
 2static char demo_task_name[] = "stream_demo";
 3static wq_pipeline_stream_task_cfg_t demo_task_cfg = {
 4    .prio = 7,
 5    .stack_size = 1024,
 6    .name = demo_task_name,
 7};
 8
 9void demo_stream_start(void)
10{
11    /* Define demo stream runner */
12    wq_pipeline_stream_runner_t demo_runner;
13    memset(&demo_runner, 0, sizeof(wq_pipeline_stream_runner_t));
14    demo_runner.type = WQ_PIPELINE_STREAM_CTX_OWN_TASK;
15    demo_runner.config = &demo_task_cfg;
16    demo_runner.schedule = demo_schedule;
17
18    void *demo_scratch;
19    demo_scratch = wq_heap_caps_malloc(sizeof(uint8_t), MALLOC_CAP_NONE);
20    assert(demo_scratch);
21    memset(demo_scratch, 0, sizeof(uint8_t));
22    demo_runner.scratch = demo_scratch;
23    demo_runner.scratch = sizeof(uint8_t);
24
25    /* Create demo stream */
26    wq_pipeline_stream_command_t command;
27    memset(&command, 0, sizeof(wq_pipeline_stream_command_t));
28    command.type = WQ_PIPELINE_STREAM_COMMAND_CREATE;  /* Create command */
29    command.stream_id = STREAM_ID_DEMO;                /* Demo stream id */
30    command.init_sample_rate = demo_sample_rate;       /* Demo stream sample rate */
31    command.param = &demo_runner;
32    wq_pipeline_stream_control(&command);
33}
  1. 启动pipeline数据流运行

wq_pipeline_stream_trigger()

  1. 更新pipeline数据流中某些数据

  • pipeline stream运行过程中,某些时刻可能需要更新stream中使用到的特定数据,此时可以通过发送update command来实现。

  • Update command的param参数含义可以参考下述示例代码注释。

 1/* When update command malloc memory,should free memory in callback function */
 2static void demo_update_callback(void *arg)
 3{
 4    uint32_t *demo_update_param = (uint32_t *)arg;
 5    wq_heap_caps_free(demo_update_param);
 6}
 7
 8void demo_stream_update(void)
 9{
10    /* Init demo update param */
11    uint32_t *demo_update_param;
12    demo_update_param = wq_heap_caps_malloc(sizeof(uint8_t), MALLOC_CAP_NONE);
13    assert(demo_update_param);
14    memset(demo_update_param, 0, sizeof(uint8_t));
15
16    /* Update demo stream param */
17    wq_pipeline_stream_command_t command;
18    memset(&command, 0, sizeof(wq_pipeline_stream_command_t));
19    command.type = WQ_PIPELINE_STREAM_COMMAND_UPDATE_PROCEDURE;
20    command.stream_id = STREAM_ID_DEMO;
21    /* Which processor to handle the command */
22    command.processor = demo_processor;
23    /* Procedure index of stream,reserved,can set 0 */
24    command.procedure_index = 0;
25    /* The command is depend stream or not */
26    command.stream_depend = false;
27    /* Command param */
28    command.param = demo_update_param;
29    /* Callback handler,do something when command completed */
30    command.callback = demo_update_callback;
31    /* The param for callback handler,  usually the memory to be freed*/
32    command.callback_param = demo_update_param;
33    if (wq_pipeline_stream_control(&command) != WQ_RET_OK) {
34        wq_heap_caps_free(demo_update_param);
35    }
36}
  1. 销毁pipeline数据流

  • 如果不要求销毁pipeline数据流的时序,直接发送destory command即可。

1void demo_stream_stop(void)
2{
3    /* Destory demo stream */
4    wq_pipeline_stream_command_t command;
5    memset(&command, 0, sizeof(wq_pipeline_stream_command_t));
6    command.type = WQ_PIPELINE_STREAM_COMMAND_DESTROY;
7    command.stream_id = STREAM_ID_DEMO;
8    wq_pipeline_stream_control(&command);
9}
  • 如果要求销毁pipeline数据流的时序,则需实现销毁动作的回调函数。

wq_pipeline_stream_command_t

 1/* When stream destory finished will call this callback */
 2static void demo_stream_destory_callback(void *arg)
 3{
 4    os_set_event(arg, BIT(0));
 5}
 6
 7void demo_stream_stop(void)
 8{
 9    /* Destory demo stream */
10    wq_pipeline_stream_command_t command;
11    memset(&command, 0, sizeof(wq_pipeline_stream_command_t));
12    command.type = WQ_PIPELINE_STREAM_COMMAND_DESTROY;  /* Destory command */
13    command.stream_id = STREAM_ID_DEMO;                 /* Demo stream id */
14    command.callback = demo_stream_destory_callback;    /* Destory callback */
15
16    uint32_t events;
17    os_event_h p_event;
18    p_event = os_create_event(WQ_PIPELINE_MID);
19    if (!p_event) {
20        assert(0);
21    }
22    command.callback_param = p_event;
23    wq_pipeline_stream_control(&command);
24    /* Do not return until the event is received */
25    bool ret = os_wait_event(p_event, MAX_TIME, &events);
26    if (ret) {
27        /* Ret TRUE means stream destoryed successfully*/
28    }
29    os_delete_event(p_event);
  1. Pipeline shaper模块使用介绍

  • Pipeline shaper模块是用于实现pcm数据长度的转换,例如将10ms的数据帧转换成7.5ms的数据帧。Pipeline shaper将输入的数据拷贝至预分配的缓存buffer中,再根据wq_pipeline_shaper_config_t的配置取出特定长度的数据帧输出。

  • wq_pipeline_shaper_config_t中size为0时,表明使用默认的缓存buffer长度,该长度根据out_sp、bit_width和channel_num计算得出。参数out_sp是要输出的采样点数,比如需要7.5ms的16K采样率的数据,out_sp即为120。参数bit_width表示数据帧的位宽,channel_num表示数据的channel数。

1wq_pipeline_shaper_config_t demo_shaper_config = {
2    .size = 0,        /* use default size */
3    .out_sp = 120,    /* output sampling points, ex.16k sample rate, 7.5ms */
4    .bit_width = 16,  /* bit wdith */
5    .channel_num = 3, /* channel num */
6};
  1. Pipeline gather模块使用介绍

  • Pipeline gather模块是一个多输入单输出的模块,主要是用于实现收集多个procedure的输出完成混音mix运算,如果需要其他子模块处理,则可以在wq_pipeline_gather_config_t中sub_processor配置。

  • wq_pipeline_gather_config_t中stream_id是当前所属的stream的id,gather_id是预分配id,相同gather_id的procedure会被关联在一起,这些被关联在一起的procedure被称为gather client。当只有一个gather client时,gather processor不做处理,直接将输入数据输出。

  • wq_pipeline_gather_config_t中的primary用于表示是否是输出流。Pipeline gather client只有一个primary client,只有primary gather procedure执行时才会进行gather操作,输出数据。非primary的gather client只会将输入plb缓存在队列里,输出plb为NULL。

  • wq_pipeline_gather_config_t中match_ts用于表示是否需要时间戳ts(pipeline里的时间戳即采样点数)对齐。参数use_sec_ts用于表示是否要使用tone通路的时间戳ts来对齐。

 1wq_pipeline_gather_config_t demo_gather_config = {
 2    .stream_id = STREAM_ID_DEMO,
 3    .gather_id = STREAM_ID_GATHER,
 4    .bit_width = 16,
 5    .depth = 0,
 6    .primary = true,
 7    .match_ts = true,
 8    .use_sec_ts = true,
 9    .sub_processor = NULL,
10    .sub_processor_config = NULL,
11};
  1. Pipeline mirror模块使用介绍

  • Pipeline mirror模块是用于实现对多个procedure的数据分发,并且根据需要去trigger对应的stream。Pipeline mirror通常用于创建数据流旁路或者回路,或者用于在不同的stream之间传递数据。Pipeline mirror需要配置wq_pipeline_mirror_config_t,来表明数据需要分发到哪个stream。

  • wq_pipeline_mirror_config_t中stream_id是当前所属的stream的id,mirror_id是预分配的id,相同的mirror_id的procedure都关联到同一个mirror handler,每一个mirror procedure都有一个mirror client,mirror processor将每个mirror client的输入数据复制分发到每个mirror client。

  • wq_pipeline_mirror_config_t中depth表示mirror里tx_list的深度,当mirror client tx_list里的plb长度超过depth时,会有“process too slow”的日志打印,提示当前算法执行速度过慢。参数depth设置为0时,使用默认深度4。

  • wq_pipeline_mirror_config_t中copy_mode表示是否需要复制数据,copy_mode为false时表示只需要clone plb的配置,数据不需要进行复制。

  • wq_pipeline_mirror_config_t中redirect_mode表示是否需要丢弃数据,bypass_id_map表明了需要丢弃数据的stream所对应的map值。

1wq_pipeline_mirror_config_t demo_mirror_config = {
2    .stream_id = STREAM_ID_DEMO,
3    .mirror_id = STREAM_ID_MIRROR,
4    .depth = 0,
5    .copy_mode = false,
6    .redirect_mode = true,
7    .bypass_id_map = BIT(STREAM_ID_BYPASS),
8};

参考示例

examples/pipeline_demo

API 介绍

Defines

WQ_PIPELINE_BUFF_DEFAULT_PAD_SIZE

Pad size

WQ_PIPELINE_BUFF_FLAG_ZERO_DATA

Zero data flag of pipeline buff

WQ_PIPELINE_BUFF_FLAG_PB_DATA

PB data flag of pipeline buff

WQ_PIPELINE_BUFF_FLAG_FORCE_STEREO_DATA

Force stereo data flag of pipeline buff

WQ_PIPELINE_BUFF_FLAG_TAIL_DATA

Tail data flag of pipeline buff

WQ_PIPELINE_BUFF_FLAG_MIX_DATA

Mix data flag of pipeline buff

WQ_PIPELINE_BUFF_FLAG_REF_DATA

Mic ref valid flag of pipeline buff

WQ_PIPELINE_BUFF_FLAG_READ_ONLY

Read only data flag of pipeline buff

WQ_PIPELINE_DATA_FORMAT_INVALID

Invalid format of pipeline buff data

WQ_PIPELINE_DATA_FORMAT_PCM

PCM format of pipeline buff data

WQ_PIPELINE_DATA_FORMAT_SBC

SBC format of pipeline buff data

WQ_PIPELINE_DATA_FORMAT_AAC

AAC format of pipeline buff data

WQ_PIPELINE_DATA_FORMAT_LDAC

LDAC format of pipeline buff data

WQ_PIPELINE_DATA_FORMAT_LHDC

LHDC format of pipeline buff data

WQ_PIPELINE_DATA_FORMAT_LC3

LC3 format of pipeline buff data

WQ_PIPELINE_DATA_FORMAT_MSBC

MSBC format of pipeline buff data

WQ_PIPELINE_DATA_FORMAT_CVSD

CVSD format of pipeline buff data

WQ_PIPELINE_DATA_FORMAT_OPUS

OPUS format of pipeline buff data

WQ_PIPELINE_DATA_FORMAT_COMPLEX

COMPLEX format of pipeline buff data

WQ_PIPELINE_CHANNEL_NUM_MAX

Max channel num

WQ_PIPELINE_CHANNEL_INVALID

Invalid channel num

WQ_PIPELINE_CHANNEL_DECODE_MASK

Invalid channel num

WQ_PIPELINE_CHANNEL_LEFT

Left channel flag

WQ_PIPELINE_CHANNEL_RIGHT

Right channel flag

WQ_PIPELINE_CHANNEL_MIX

Mix channel flag

WQ_PIPELINE_CHANNEL_DUAL

Dual channel flag

WQ_PIPELINE_CHANNEL_SINGLE

Single channel flag

WQ_PIPELINE_CHANNEL_MIC0

Mic0 channel flag

WQ_PIPELINE_CHANNEL_MIC1

Mic1 channel flag

WQ_PIPELINE_CHANNEL_MIC2

Mic2 channel flag

WQ_PIPELINE_CHANNEL_MIC3

Mic3 channel flag

WQ_PIPELINE_CHANNEL_MIC4

Mic4 channel flag

WQ_PIPELINE_CHANNEL_MIC5

Mic5 channel flag

WQ_PIPELINE_CHANNEL_MIC6

Mic6 channel flag

WQ_PIPELINE_CHANNEL_MIC7

Mic7 channel flag

WQ_PIPELINE_CHANNEL_STEREO

Stereo channel flag

WQ_PIPELINE_CHANNEL_LEFTDUAL

Left dual channel flag

WQ_PIPELINE_CHANNEL_RIGHTDUAL

Right dual channel flag

WQ_PIPELINE_CHANNEL_MIXDUAL

Mix dual channel flag

TWS_LEFT_DEV_TYPE

Tws left dev form

TWS_RIGHT_DEV_TYPE

Tws right dev form

AUD_STEREO_DEV_TYPE

Audio stereo dev form

AUD_MIXER_DEV_TYPE

Audio mixer dev form

WQ_PIPELINE_SAMPLE_WIDTH_16BIT

16 bit width of pipeline buff data

WQ_PIPELINE_SAMPLE_WIDTH_32BIT

32 bit width of pipeline buff data

Typedefs

typedef struct wq_plb wq_plb_t
typedef void (*plb_dump_hook)(wq_plb_t *plb)

Typedef for hook function to dump plb buff.

备注

This function is intended to be called for dump plb buff.

Param plb:

The point of plb buff

Functions

static inline void wq_plb_list_init(wq_plb_list_t *list)

Init pipeline buff list head.

参数:

list -- The pipeline buff list head

static inline bool wq_plb_list_is_empty(const wq_plb_list_t *list)

Check if a queue is empty.

参数:

list -- The pipeline buff list head

返回:

True if the list is empty

static inline uint32_t wq_plb_list_get_len(const wq_plb_list_t *list)

Get pipeline buff list length.

参数:

list -- The pipeline buff list head

返回:

Queue len

static inline bool wq_plb_is_list_first(const wq_plb_list_t *list, const wq_plb_t *adb)

Check if an pipeline buff is the first entry in the list.

参数:
  • list -- The pipeline buff list head

  • adb -- Pipeline buff is current packet

返回:

True if the pipeline buff is the first entry

static inline bool wq_plb_is_list_last(const wq_plb_list_t *list, const wq_plb_t *adb)

Check if an pipeline buff is the last entry in the list.

参数:
  • list -- The pipeline buff list head

  • adb -- Pipeline buff is current packet

返回:

True if the pipeline buff is the last entry

static inline wq_plb_t *wq_plb_peek_head(const wq_plb_list_t *list)

Find the first pipeline buff in the list, without dequeue.

参数:

list -- The pipeline buff list head

返回:

First pipeline buff in the list

static inline wq_plb_t *wq_plb_peek_next(wq_plb_t *adb, const wq_plb_list_t *list)

Find the next pipeline buff in the list, without dequeue.

参数:
  • adb -- Pipeline buff is the current packet

  • list -- The pipeline buff list head

返回:

Next pipeline buff in the list

static inline wq_plb_t *wq_plb_peek_tail(const wq_plb_list_t *list)

Find the last pipeline buff in the list, without dequeue.

参数:

list -- The pipeline buff list head

返回:

Last pipeline buff in the list

static inline void wq_plb_insert(wq_plb_t *newst, wq_plb_t *prev, wq_plb_t *next, wq_plb_list_t *list)

Insert new pipeline buff between two old pipeline buffs.

备注

This function must be called atomically

参数:
  • newst -- New pipeline buff to be inserted

  • prev -- Prev pipeline buff

  • next -- Next pipeline buff

  • list -- The pipeline buff list head

static inline void wq_plb_queue_head(wq_plb_list_t *list, wq_plb_t *newst)

Queue new packet in the head of list.

参数:
  • list -- The pipeline buff list head

  • newst -- New pipeline buff to be inserted

static inline void wq_plb_queue_tail(wq_plb_list_t *list, wq_plb_t *newst)

Queue new packet in the tail of list.

参数:
  • list -- The pipeline buff list head

  • newst -- New pipeline buff to be inserted

static inline void wq_plb_list_splice_head(const wq_plb_list_t *list, wq_plb_list_t *head)

Join first list into the head of second list.

参数:
  • list -- The pipeline buff list consumed

  • head -- The pipeline buff list modified

static inline void wq_plb_list_splice_tail(const wq_plb_list_t *list, wq_plb_list_t *head)

Join first list into the tail of second list.

参数:
  • list -- The pipeline buff list consumed

  • head -- The pipeline buff list modified

static inline void wq_plb_unlink(wq_plb_t *adb, wq_plb_list_t *list)

Remove a pipeline buff from list.

备注

This function must be called atomically

参数:
  • adb -- Pipeline buff to be unlinked

  • list -- The pipeline buff list head

static inline wq_plb_t *wq_plb_dequeue_head(wq_plb_list_t *list)

Find & dequeue the first pipeline buff from list.

参数:

list -- The pipeline buff list head

返回:

First pipeline buff in the list

static inline wq_plb_t *wq_plb_dequeue_tail(wq_plb_list_t *list)

Find & dequeue the last pipeline buff from list.

参数:

list -- The pipeline buff list head

返回:

Last pipeline buff in the list

static inline wq_plb_t *wq_plb_dequeue_all(wq_plb_list_t *list)

Dequeue all the pipeline buff from list.

参数:

list -- The pipeline buff list head

返回:

First pipeline buff in the list

static inline void wq_plb_get(wq_plb_t *adb)

Add users for the pipeline buff.

参数:

adb -- The current pipeline buff

static inline uint32_t wq_plb_get_users(wq_plb_t *adb)

Get number of users belonged to pipeline buff.

参数:

adb -- The current pipeline buff

返回:

Number of users belonged to this pipeline buff

static inline bool wq_plb_is_shared(wq_plb_t *adb)

Check if the pipeline buff is shared by users.

参数:

adb -- The current pipeline buff

返回:

True if the pipeline buff is shared

static inline uint32_t wq_plb_get_channels(wq_plb_t *adb)

Get number of data channels in pipeline buff.

参数:

adb -- The current packet

返回:

Number of data channels in pipeline buff

wq_plb_t *wq_plb_alloc_header(void)

Alloc a pipeline buff head without buffer.

返回:

Alloced pipeline buff head

wq_plb_t *wq_plb_alloc(uint32_t size, uint32_t pad)

Alloc a pipeline buff head with buffer.

参数:
  • size -- The request buffer size

  • pad -- The preset pad size before data

返回:

Alloced pipeline buff

wq_plb_t *wq_plb_clone(wq_plb_t *adb)

Alloc a pipeline buff head and point it to the same data in old pipeline buff.

参数:

adb -- The pipeline buff to be cloned

返回:

Pointer of cloned pipeline buff

wq_plb_t *wq_plb_copy(wq_plb_t *adb)

Alloc a pipeline buff and copy data from old pipeline buff to it.

参数:

adb -- The pipeline buff to be copied

返回:

Pointer of copied pipeline buff

void wq_plb_free(wq_plb_t *adb)

Free a pipeline buff.

参数:

adb -- The pipeline buff to be free

void wq_plb_free_list(wq_plb_list_t *list)

Free a pipeline buff list.

参数:

list -- The pipeline buff list to be free

struct wq_plb

Public Members

struct wq_plb *next

Next plb point

struct wq_plb *prev

Prev plb point

uint8_t *buf

Start pointer of data buffer belonged to this pipeline buff NULL for no buffer alloced

uint8_t *end

End pointer of data buffer, NULL for data length should be hold

uint8_t *data

Pointer of data start

uint32_t sp

Sample point in this pipeline buff

uint32_t len

Length (in bytes) of one channel data for multi-channel packet

uint32_t channel_gap

Gap (in bytes) from one channel data to another channel

uint32_t ts

Main path time stamp of pipeline data

uint32_t ts_sec

Tone path time stamp of pipeline data

uint32_t sample_rate

Data sample rate

uint32_t flags

Flags of this pipeline buff

uint8_t format

Format of pipeline buff data, e.g., pcm, aac, msbc

uint8_t channel

Channel bitmap for pipeline buff data

uint8_t dev_form

Audio production form

uint8_t bit_width

Sample bit width for pipeline buff data

uint8_t reserved

Reserved param

uint32_t users

Users of this pipeline buff

void *procedure

Reserved for pipeline pipeline, pointer of pend procedure

void (*free_cb)(struct wq_plb*, void*)

When this pipeline buff is free, this callback function is called

void *free_cb_param

Parameter used by free callback function

uint16_t plb_id

Plb data which is associated to processor id and stream id consist by (process id << 8) | stream id

uint32_t param

User-defined parameter

struct wq_plb_list_t

Public Members

struct wq_plb *next

Next plb struct point

struct wq_plb *prev

Prev plb struct point

uint32_t qlen

Plb list lenght

Defines

WQ_PIPELINE_GATHER_MAX_NUM

Max gather number in system

WQ_PIPELINE_GATHER_MAX_INPUT

Max output stream number for gather processor

struct wq_pipeline_gather_config_t
#include <pipeline_gather.h>

Gather processor handler multi-input data, the data should have same sample bit-width and same sample rate.

Public Members

uint16_t stream_id

Id of stream that gather belonged

uint16_t bit_width

Sample bit width, only support 16bit/32bit

uint16_t depth

Depth of hold stb queue, set 0 to use default value

uint8_t gather_id

Id of used gather

bool primary

True to set this stream as gather primary stream

bool match_ts

True to match ts before run gather operation

bool use_sec_ts

True to use second timestamp instead of main timestamp

wq_pipeline_processor_t *sub_processor

Sub processor called by gather processor

void *sub_processor_config

Sub processor configure

Defines

WQ_PIPELINE_MIRROR_MAX_NUM

Max mirror number in system

WQ_PIPELINE_MIRROR_MAX_OUTPUT

Max output stream number for mirror processor

struct wq_pipeline_mirror_config_t

Public Members

uint16_t stream_id

Id of stream which called the mirror processor

uint16_t mirror_id

Id of used mirror

uint16_t depth

Depth of hold stb queue, set 0 to use default value

bool copy_mode

True to copy stb to dispatch

bool redirect_mode

True to drop output stb for current procedure after mirror

uint32_t bypass_id_map

Bypass stream id bitmap

bool dummy_trigger

Typedefs

typedef struct pipeline_processor_info wq_pipeline_processor_info_t
typedef struct pipeline_processor_preset wq_pipeline_processor_preset_t
typedef struct pipeline_processor wq_pipeline_processor_t
struct pipeline_processor_info

Public Members

uint32_t mips

Cpu mips of the processor

uint32_t persistent_size

Persistent buffer size of the processor

uint32_t scratch_size

Scratch buffer size of the processor

uint32_t stack_size

Stack size of the processor

void *processor_info

The point customized by processor

struct pipeline_processor_preset

Public Members

void *scratch

Scratch buffer point

uint16_t stream_id

Stream id

wq_pipeline_processor_info_t *info

Processor info point

struct pipeline_processor

Public Members

char *name

Processor name

WQ_RET (*query)(void*, struct pipeline_processor_info*)

Query processor & return processor info

void *(*probe)(void*, struct pipeline_processor_preset*)

Probe processor & return handler

void (*remove)(void*)

Remove processor handler

void (*reset)(void*)

Reset processor handler

WQ_RET (*update)(void*, void*)

Update processor handler with new parameter

WQ_RET (*run)(void*, struct wq_plb*, struct wq_plb**)

Run processor

struct wq_pipeline_shaper_config_t

Public Members

uint16_t size

Pool size in bytes, zero for default value (4 output packet size)

uint16_t out_sp

Output sample point number per packet

uint16_t bit_width

Sample bit width, only support 16bit/32bit

uint16_t channel_num

Channel number of shaper

Defines

CONFIG_PIPELINE_STREAM_MAX_NUM

Max pipeline stream num

CONFIG_PIPELINE_PRESET_TASK_MAX_NUM

Max pipeline preset task num

WQ_PIPELINE_STREAM_SCHEDULE_START

Pipeline stream framework. processor means struct pipeline_processor point, config means config info point for processor init. when processor param is NULL, means the stream schedule is end.

schedule end must be {NULL, NULL}.

WQ_PIPELINE_STREAM_SCHEDULE_STEP(processor, config)
WQ_PIPELINE_STREAM_SCHEDULE_END

Typedefs

typedef struct stream_schedule wq_pipeline_stream_schedule_t

Enums

enum WQ_PIPELINE_STREAM_CTX

Values:

enumerator WQ_PIPELINE_STREAM_CTX_OWN_TASK

Stream has own task, can be triggered

enumerator WQ_PIPELINE_STREAM_CTX_PRESET_TASK

Reserved. Stream belongs to preset task, can be triggered

enumerator WQ_PIPELINE_STREAM_CTX_IRQ_HANDLER

Reserved. Stream work can be triggered by irq handler

enumerator WQ_PIPELINE_STREAM_CTX_TIMER_HANDLER

Reserved. Stream work can be triggered by timer handler

enumerator WQ_PIPELINE_STREAM_CTX_HIGH_SHARE_TASK

Reserved. Stream belongs to high share task, can be triggered

enumerator WQ_PIPELINE_STREAM_CTX_LOW_SHARE_TASK

Reserved. Stream belongs to low share task, can be triggered

enumerator WQ_PIPELINE_STREAM_CTX_CALLED

Stream work can only be called, can not be triggered

enum WQ_PIPELINE_STREAM_COMMAND

Values:

enumerator WQ_PIPELINE_STREAM_COMMAND_INVALID

Invalid command flag

enumerator WQ_PIPELINE_STREAM_COMMAND_CREATE

Command to create a stream

enumerator WQ_PIPELINE_STREAM_COMMAND_DESTROY

Command to destroy a stream

enumerator WQ_PIPELINE_STREAM_COMMAND_PEND

Command to pend a stream

enumerator WQ_PIPELINE_STREAM_COMMAND_RESUME

Command to resume a stream

enumerator WQ_PIPELINE_STREAM_COMMAND_QUERY

Command to query a stream

enumerator WQ_PIPELINE_STREAM_COMMAND_RESET_PROCEDURE

Command to reset a procedure in stream

enumerator WQ_PIPELINE_STREAM_COMMAND_BYPASS_PROCEDURE

Command to bypass a procedure in stream

enumerator WQ_PIPELINE_STREAM_COMMAND_UPDATE_PROCEDURE

Command to update runtime parameter of a procedure in stream

enumerator WQ_PIPELINE_STREAM_COMMAND_DEBUG_PROCEDURE

Command to enable log dump for a procedure in stream

enumerator WQ_PIPELINE_STREAM_COMMAND_DUMP_PROCEDURE_INPUT

Command to enable input plb dump for a procedure in stream

enumerator WQ_PIPELINE_STREAM_COMMAND_DUMP_PROCEDURE_OUTPUT

Command to enable output plb dump for a procedure in stream

enumerator WQ_PIPELINE_STREAM_COMMAND_MAX

Command max flag

Functions

WQ_RET wq_pipeline_stream_control(wq_pipeline_stream_command_t *command)

Send control command to pipeline stream.

参数:

command -- The command to be send to pipeline stream.

返回:

  • WQ_RET_OK success.

  • WQ_RET_INVAL command is invalid.

WQ_RET wq_pipeline_stream_work(uint16_t stream_id)

Run pipeline stream work.

参数:

stream_id -- is stream id.

返回:

  • WQ_RET_OK success.

  • WQ_RET_NOT_EXIST stream is not exist.

WQ_RET wq_pipeline_stream_trigger(uint16_t stream_id)

Trigger pipeline stream to run .

参数:

stream_id -- is stream id.

返回:

  • WQ_RET_OK success.

  • WQ_RET_NOT_EXIST stream is not exist.

struct stream_schedule

Public Members

const wq_pipeline_processor_t *processor

Processor point

void *config

Pipeline stream config info point

struct wq_pipeline_stream_task_cfg_t

Public Members

uint16_t prio

Stream own task priority

uint16_t stack_size

Stream own task stack size

char *name

Stream own task name

struct wq_pipeline_stream_preset_task_t

Public Members

uint16_t preset_task_id

Preset task id

bool preset_task_perpetuate

Preset task perpetuate attribute

wq_pipeline_stream_task_cfg_t task_cfg

Preset task config

struct wq_pipeline_stream_task_handle_t

Public Members

uint32_t ref

Reference for stream task used

void *task_handle

Stream task handle

uint16_t prio

Stream task priority

struct wq_pipeline_stream_runner_t

Public Members

WQ_PIPELINE_STREAM_CTX type

Stream ctx type

void *config

Stream task config point

struct stream_schedule *schedule

Stream schedule

void *scratch

Stream scratch memory point

uint32_t scratch_size

Stream scratch memory size

uint32_t run_times

Stream tigger run times, will decrease one when each run loop

struct wq_pipeline_stream_command_t

Public Members

struct list_head node

Command list node

WQ_PIPELINE_STREAM_COMMAND type

Command type

wq_pipeline_processor_t *processor

Which processor to handle the command

uint16_t procedure_index

Procedure index of stream

uint8_t stream_id

Stream id of the stream to handle the command

bool stream_depend

The command is depend stream

uint32_t init_sample_rate

Init sample rate for create command

void *callback_param

The param for callback handler

void (*callback)(void *callback_param)

Callback handler

void *param

Command param