Pipeline =========== 概述 -------------- Pipeline是一种处理音频流数据的统一框架组件,将音频流处理过程拆解成串行模式,提供了一种可灵活定义数据处理步骤的方法。上层应用可以按照不同的需求,依序预设好数据处理操作即可。 功能特性 -------------- - 支持上层应用灵活定义数据处理流程 - 提供处理流程统一的processor结构体,便于统一管理 流程图 -------------- - 使用pipeline时,每个处理步骤都作为独立的处理工序,以流水线方式运行。 - 前一个处理工序的输出内容,作为下一个处理工序的输入内容。 .. mermaid:: :align: center :caption: Pipeline stream运行流程图 flowchart LR A(processor A) --> buf1[/Pipeline buffer/] --> B(processor B) B(processor B) --> buf2[/Pipeline buffer/] --> C(processor C) subgraph stream A --> B --> C end 运行原理 -------------- - Pipeline里的processor模块表示各功能模块,提供了主要的数据处理接口。struct pipeline_processor结构为各功能模块需实现的统一接口,stream通过struct pipeline_processor结构对各processor进行管理。 - Pipeline procedure的数据处理都是由对应的processor实现,procedure保存processor针对特定stream生成的handler和配置。同一个pipeline stream的procedure采用链表方式记录,执行到某个procedure时,根据procedure->processor找到对应的processor,执行具体的数据处理。 - Pipeline buff即plb表示数据帧,plb采用链表形式,每个plb都相当于数据帧的header部分,记录了该数据帧的相关信息,数据帧存储在plb->buf里。需要注意的是plb里的procedure指针,只有当该数据帧被某个处理工序挂起时才会给plb->procedure赋值,标明当前挂起该数据帧的是哪个处理工序。 - 数据流处理基于stream管理,stream通过stream_program数组创建,提供了WQ_PIPELINE_STREAM_SCHEDULE_START、WQ_PIPELINE_STREAM_SCHEDULE_STEP、WQ_PIPELINE_STREAM_SCHEDULE_END宏定义来统一定义stream的流程数组。 - Pipeline stream在创建的时候,需要发送create command,依次执行到stream_program数组里对应的processor probe接口,生成procedure,并加入到stream procedure链表中。 - Pipeline里处理command的函数是wq_pipeline_stream_control,该函数作为接收command的统一入口。 1. 接收到create command:阻塞执行,create完成后才会返回。command->param里会记录当前stream的类型,为WQ_PIPELINE_STREAM_CTX_OWN_TASK类型时需要创建独立task运行。 2. 接收到update command:异步执行,将command加入stream command list表中,通过wq_pipeline_stream_trigger抛消息到对应的task中进行处理。 3. 接收到destory command:执行方式默认异步执行,若需要阻塞方式,则需要实现command的callback函数。stream发送destory command后等待event,而callback函数在destory完成后才会设置event,以此来实现阻塞方式。 - 如果存在某个procedure输出多个数据帧的情况,stream会将除第一个输出plb挂起到下一个prcedure,然后取第一个输出plb作为下一个prcedure的输入plb。Stream挂起的数据帧也是按照链表方式组织,当已经有挂起的plb时,后挂起的plb会插入到挂起plb list的头部。 - Pipeline stream在运行的时候,先检查是否有挂起的数据帧,如果存在则根据plb中记录的procedure开始执行;如果没有挂起的数据帧,则从stream的第一个procedure开始执行。Pipeline stream运行时按照链表顺序执行procedure,依次执行对应processor->run函数。每一个procedure的输出plb,作为下一个procedure的输入plb。 - Pipeline stream通过wq_pipeline_stream_trigger启动stream运行,stream会查找plb list中是否有数据,读到数据帧时会依次执行processor,直到plb list中不再有数据帧。 - Pipeline shaper模块是用于数据帧长度转换。Pipeline shaper模块会申请一块内存作为数据池,将输入的plb都拷贝到数据池里,然后从数据池中取出特定长度的数据帧作为输出数据帧。 - 当有多路数据输入,需要混合成一路输出时,则需要用到pipeline gather模块。使用Pipeline gather模块时,需要配置gather config,gather模块初始化时需要根据config信息设置当前gather handler信息。若有多个输出需要gather,则根据gather handler里的gather id来区分。当某个gather id的gather handler已经创建时,仅需要attach到已经创建的gather client。Gather users记录了需要混合的数据路数。 - Pipeline mirror模块用于拷贝上一个proceduer的输出。当pipeline stream里某个procedure的输出需要作为多个proceduer的输入时,则需要在此procedure后加上一个pipeline mirror。Pipeline stream配置可参考用法流程里sub stream配置。 - Pipeline breaker用在stream_program END之前,用于释放上一个procedure输出的数据帧。 用法流程 -------------- 1. 数据流处理流程模块需实现本模块的processor结构体 :cpp:type:`pipeline_processor` 2. 初始化数据流,按照pipeline提供的统一框架预设处理流程 - 不存在sub stream情况,即不需要mirror的情况 .. mermaid:: flowchart LR A(A) --> B(B) --> C(C) .. code:: c :number-lines: /* Demo stream run schedule, should end with NULL processor */ /* Run schedule: processor A -> B -> C */ #define STREAM_SCHEDULE_DEMO \ WQ_PIPELINE_SCHEDULE_START \ WQ_PIPELINE_SCHEDULE_STEP(processor_A, &A_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_B, &B_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_C, &C_config) \ WQ_PIPELINE_SCHEDULE_END void demo_stream_init(void) { wq_pipeline_stream_schedule_t demo_stream_schedule[] = STREAM_SCHEDULE_DEMO; ... } - 存在sub stream情况,即需要mirror的情况 .. mermaid:: flowchart LR A(A) --> B(B) --> C(C) --> D(D) --> M1(Mirror1) M1(Mirror1) --> E(E) --> M2(Mirror2) M2(Mirror2) --> F(F) --> G(G) --> H(H) M1 --> I(I) --> B(B) M2 --> J(J) --> G(G) .. code:: c :number-lines: #define STREAM_SCHEDULE_DEMO \ WQ_PIPELINE_SCHEDULE_START \ /* primary stream */ \ WQ_PIPELINE_SCHEDULE_STEP(processor_A, &A_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_B, &B_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_C, &B_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_D, &D_config) \ WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_1_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_E, &E_config) \ WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_2_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_F, &F_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_G, &G_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_H, &G_config) \ /* sub stream 1 from mirror 1 */ \ WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_1_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_I, &I_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_B, &B_config) \ /* sub stream 2 from mirror 2 */ \ WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_2_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_J, &J_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_G, &G_config) \ WQ_PIPELINE_SCHEDULE_END void demo_stream_init(void) { wq_pipeline_stream_schedule_t demo_stream_schedule[] = STREAM_SCHEDULE_DEMO; ... } - 也可以将sub stream配置在独立的stream里 .. code:: c :number-lines: #define STREAM_SCHEDULE_DEMO \ WQ_PIPELINE_SCHEDULE_START \ /* primary stream */ \ WQ_PIPELINE_SCHEDULE_STEP(processor_A, &A_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_B, &B_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_C, &B_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_D, &D_config) \ WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_1_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_E, &E_config) \ WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_2_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_F, &F_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_G, &G_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_H, &G_config) \ WQ_PIPELINE_SCHEDULE_END #define STREAM_SCHEDULE_DEMO_SUB_STREAM_1 \ WQ_PIPELINE_SCHEDULE_START \ /* sub stream 1 from mirror 1 */ \ WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_1_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_I, &I_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_B, &B_config) \ WQ_PIPELINE_SCHEDULE_END #define STREAM_SCHEDULE_DEMO_SUB_STREAM_2 \ WQ_PIPELINE_SCHEDULE_START \ /* sub stream 2 from mirror 2 */ \ WQ_PIPELINE_SCHEDULE_STEP(wq_pipeline_mirror, &mirror_2_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_J, &J_config) \ WQ_PIPELINE_SCHEDULE_STEP(processor_G, &G_config) \ WQ_PIPELINE_SCHEDULE_END 3. 创建pipeline数据流 - 配置好demo数据流的task信息,发送create命令创建demo数据流。 - 创建stream的时候需要配置stream所需的音频流实体runner,runner除了需要设置stream类型、配置和处理流程,还可以配置该stream中是否有所有处理流程都可以共同使用的内存,即scratch point和scratch size。Scratch memory可以是在pipeline stream create的时候创建,也可以是在外部预分配好内存后,配置到runner中。如果在pipeline stream create的时候判断到scratch point为NULL或者scratch size为0时,会去申请scratch memory,反之则不申请。 - 需要注意的是,scratch memory是该stream中所有processor共享的内存,需要保证某个processor在执行完成后不需要再使用存放在scratch memory中的数据。 - 不需要配置scratch memory时,需要关闭CONFIG_SCRATCH_MEM_ENABLE;如果打开了CONFIG_SCRATCH_MEM_ENABLE,则pipeline stream在创建时会遍历stream中的所有processor,取最大的scratch memory size来申请内存。 .. code:: c :number-lines: /* Demo stream stask info */ static char demo_task_name[] = "stream_demo"; static wq_pipeline_stream_task_cfg_t demo_task_cfg = { .prio = 7, .stack_size = 1024, .name = demo_task_name, }; void demo_stream_start(void) { /* Define demo stream runner */ wq_pipeline_stream_runner_t demo_runner; memset(&demo_runner, 0, sizeof(wq_pipeline_stream_runner_t)); demo_runner.type = WQ_PIPELINE_STREAM_CTX_OWN_TASK; demo_runner.config = &demo_task_cfg; demo_runner.schedule = demo_schedule; /* Create demo stream */ wq_pipeline_stream_command_t command; memset(&command, 0, sizeof(wq_pipeline_stream_command_t)); command.type = WQ_PIPELINE_STREAM_COMMAND_CREATE; /* Create command */ command.stream_id = STREAM_ID_DEMO; /* Demo stream id */ command.init_sample_rate = demo_sample_rate; /* Demo stream sample rate */ command.param = &demo_runner; wq_pipeline_stream_control(&command); } - 在外部预分配scratch memory时,可以参考如下代码创建pipeline stream。 .. code:: c :number-lines: /* Demo stream stask info */ static char demo_task_name[] = "stream_demo"; static wq_pipeline_stream_task_cfg_t demo_task_cfg = { .prio = 7, .stack_size = 1024, .name = demo_task_name, }; void demo_stream_start(void) { /* Define demo stream runner */ wq_pipeline_stream_runner_t demo_runner; memset(&demo_runner, 0, sizeof(wq_pipeline_stream_runner_t)); demo_runner.type = WQ_PIPELINE_STREAM_CTX_OWN_TASK; demo_runner.config = &demo_task_cfg; demo_runner.schedule = demo_schedule; void *demo_scratch; demo_scratch = wq_heap_caps_malloc(sizeof(uint8_t), MALLOC_CAP_NONE); assert(demo_scratch); memset(demo_scratch, 0, sizeof(uint8_t)); demo_runner.scratch = demo_scratch; demo_runner.scratch = sizeof(uint8_t); /* Create demo stream */ wq_pipeline_stream_command_t command; memset(&command, 0, sizeof(wq_pipeline_stream_command_t)); command.type = WQ_PIPELINE_STREAM_COMMAND_CREATE; /* Create command */ command.stream_id = STREAM_ID_DEMO; /* Demo stream id */ command.init_sample_rate = demo_sample_rate; /* Demo stream sample rate */ command.param = &demo_runner; wq_pipeline_stream_control(&command); } 4. 启动pipeline数据流运行 :cpp:func:`wq_pipeline_stream_trigger` 5. 更新pipeline数据流中某些数据 - pipeline stream运行过程中,某些时刻可能需要更新stream中使用到的特定数据,此时可以通过发送update command来实现。 - Update command的param参数含义可以参考下述示例代码注释。 .. code:: c :number-lines: /* When update command malloc memory,should free memory in callback function */ static void demo_update_callback(void *arg) { uint32_t *demo_update_param = (uint32_t *)arg; wq_heap_caps_free(demo_update_param); } void demo_stream_update(void) { /* Init demo update param */ uint32_t *demo_update_param; demo_update_param = wq_heap_caps_malloc(sizeof(uint8_t), MALLOC_CAP_NONE); assert(demo_update_param); memset(demo_update_param, 0, sizeof(uint8_t)); /* Update demo stream param */ wq_pipeline_stream_command_t command; memset(&command, 0, sizeof(wq_pipeline_stream_command_t)); command.type = WQ_PIPELINE_STREAM_COMMAND_UPDATE_PROCEDURE; command.stream_id = STREAM_ID_DEMO; /* Which processor to handle the command */ command.processor = demo_processor; /* Procedure index of stream,reserved,can set 0 */ command.procedure_index = 0; /* The command is depend stream or not */ command.stream_depend = false; /* Command param */ command.param = demo_update_param; /* Callback handler,do something when command completed */ command.callback = demo_update_callback; /* The param for callback handler, usually the memory to be freed*/ command.callback_param = demo_update_param; if (wq_pipeline_stream_control(&command) != WQ_RET_OK) { wq_heap_caps_free(demo_update_param); } } 6. 销毁pipeline数据流 - 如果不要求销毁pipeline数据流的时序,直接发送destory command即可。 .. code:: c :number-lines: void demo_stream_stop(void) { /* Destory demo stream */ wq_pipeline_stream_command_t command; memset(&command, 0, sizeof(wq_pipeline_stream_command_t)); command.type = WQ_PIPELINE_STREAM_COMMAND_DESTROY; command.stream_id = STREAM_ID_DEMO; wq_pipeline_stream_control(&command); } - 如果要求销毁pipeline数据流的时序,则需实现销毁动作的回调函数。 :cpp:type:`wq_pipeline_stream_command_t` .. code:: c :number-lines: /* When stream destory finished will call this callback */ static void demo_stream_destory_callback(void *arg) { os_set_event(arg, BIT(0)); } void demo_stream_stop(void) { /* Destory demo stream */ wq_pipeline_stream_command_t command; memset(&command, 0, sizeof(wq_pipeline_stream_command_t)); command.type = WQ_PIPELINE_STREAM_COMMAND_DESTROY; /* Destory command */ command.stream_id = STREAM_ID_DEMO; /* Demo stream id */ command.callback = demo_stream_destory_callback; /* Destory callback */ uint32_t events; os_event_h p_event; p_event = os_create_event(WQ_PIPELINE_MID); if (!p_event) { assert(0); } command.callback_param = p_event; wq_pipeline_stream_control(&command); /* Do not return until the event is received */ bool ret = os_wait_event(p_event, MAX_TIME, &events); if (ret) { /* Ret TRUE means stream destoryed successfully*/ } os_delete_event(p_event); 7. Pipeline shaper模块使用介绍 - Pipeline shaper模块是用于实现pcm数据长度的转换,例如将10ms的数据帧转换成7.5ms的数据帧。Pipeline shaper将输入的数据拷贝至预分配的缓存buffer中,再根据wq_pipeline_shaper_config_t的配置取出特定长度的数据帧输出。 - wq_pipeline_shaper_config_t中size为0时,表明使用默认的缓存buffer长度,该长度根据out_sp、bit_width和channel_num计算得出。参数out_sp是要输出的采样点数,比如需要7.5ms的16K采样率的数据,out_sp即为120。参数bit_width表示数据帧的位宽,channel_num表示数据的channel数。 .. code:: c :number-lines: wq_pipeline_shaper_config_t demo_shaper_config = { .size = 0, /* use default size */ .out_sp = 120, /* output sampling points, ex.16k sample rate, 7.5ms */ .bit_width = 16, /* bit wdith */ .channel_num = 3, /* channel num */ }; 8. Pipeline gather模块使用介绍 - Pipeline gather模块是一个多输入单输出的模块,主要是用于实现收集多个procedure的输出完成混音mix运算,如果需要其他子模块处理,则可以在wq_pipeline_gather_config_t中sub_processor配置。 - wq_pipeline_gather_config_t中stream_id是当前所属的stream的id,gather_id是预分配id,相同gather_id的procedure会被关联在一起,这些被关联在一起的procedure被称为gather client。当只有一个gather client时,gather processor不做处理,直接将输入数据输出。 - wq_pipeline_gather_config_t中的primary用于表示是否是输出流。Pipeline gather client只有一个primary client,只有primary gather procedure执行时才会进行gather操作,输出数据。非primary的gather client只会将输入plb缓存在队列里,输出plb为NULL。 - wq_pipeline_gather_config_t中match_ts用于表示是否需要时间戳ts(pipeline里的时间戳即采样点数)对齐。参数use_sec_ts用于表示是否要使用tone通路的时间戳ts来对齐。 .. code:: c :number-lines: wq_pipeline_gather_config_t demo_gather_config = { .stream_id = STREAM_ID_DEMO, .gather_id = STREAM_ID_GATHER, .bit_width = 16, .depth = 0, .primary = true, .match_ts = true, .use_sec_ts = true, .sub_processor = NULL, .sub_processor_config = NULL, }; 9. Pipeline mirror模块使用介绍 - Pipeline mirror模块是用于实现对多个procedure的数据分发,并且根据需要去trigger对应的stream。Pipeline mirror通常用于创建数据流旁路或者回路,或者用于在不同的stream之间传递数据。Pipeline mirror需要配置wq_pipeline_mirror_config_t,来表明数据需要分发到哪个stream。 - wq_pipeline_mirror_config_t中stream_id是当前所属的stream的id,mirror_id是预分配的id,相同的mirror_id的procedure都关联到同一个mirror handler,每一个mirror procedure都有一个mirror client,mirror processor将每个mirror client的输入数据复制分发到每个mirror client。 - wq_pipeline_mirror_config_t中depth表示mirror里tx_list的深度,当mirror client tx_list里的plb长度超过depth时,会有“process too slow”的日志打印,提示当前算法执行速度过慢。参数depth设置为0时,使用默认深度4。 - wq_pipeline_mirror_config_t中copy_mode表示是否需要复制数据,copy_mode为false时表示只需要clone plb的配置,数据不需要进行复制。 - wq_pipeline_mirror_config_t中redirect_mode表示是否需要丢弃数据,bypass_id_map表明了需要丢弃数据的stream所对应的map值。 .. code:: c :number-lines: wq_pipeline_mirror_config_t demo_mirror_config = { .stream_id = STREAM_ID_DEMO, .mirror_id = STREAM_ID_MIRROR, .depth = 0, .copy_mode = false, .redirect_mode = true, .bypass_id_map = BIT(STREAM_ID_BYPASS), }; 参考示例 ----------- :: examples/pipeline_demo API 介绍 ----------- .. doxygenfile:: pipeline_breaker.h .. doxygenfile:: pipeline_buff.h .. doxygenfile:: pipeline_gather.h .. doxygenfile:: pipeline_mirror.h .. doxygenfile:: pipeline_processor.h .. doxygenfile:: pipeline_shaper.h .. doxygenfile:: pipeline_stream.h