Zephyr RTOS 中FIFO(先进先出队列)接口介绍

发布时间:2026/6/2 6:32:49

Zephyr RTOS 中FIFO(先进先出队列)接口介绍 目录概述1 FIFO 函数介绍1.1 核心 FIFO 函数列表1.2 FIFO 的核心代码概念与工作原理1.3 FIFO 与其他 IPC 机制的对比2 核心函数详解与使用2.1 初始化 FIFO2.2 向 FIFO 添加数据k_fifo_put()2.3 从 FIFO 获取数据k_fifo_get()2.4 取消等待k_fifo_cancel_wait()3 完整应用示例3.1 一般用法3.1.1 多生产者-单消费者模式3.1.2 中断到线程的通信3.2 FIFO 的高级用法与模式3.2.1 优先级队列模式3.2.2 超时与错误处理3.2.3 批处理模式3.2.4 内存管理4 应用总结4.1 常见问题与解决方案4.2 特性总结概述在 Zephyr RTOS 中FIFO先进先出队列是用于在线程间传递数据的基本内核对象之一。与 LIFO 的后进先出顺序不同FIFO 保证了数据按照被添加的顺序被取出这对于许多通信和任务处理场景至关重要。通过合理使用 FIFO可以在 Zephyr 系统中构建高效、松耦合的多线程架构。对于简单的生产者-消费者模式FIFO 通常是比消息队列或邮箱更轻量、更灵活的选择。1 FIFO 函数介绍1.1 核心 FIFO 函数列表函数功能描述主要用途k_fifo_init()动态初始化 FIFO运行时初始化 FIFO 队列k_fifo_put()向 FIFO 添加数据项在队列尾部添加数据k_fifo_get()从 FIFO 获取数据项从队列头部取出数据k_fifo_cancel_wait()取消等待中的线程强制唤醒等待的线程K_FIFO_DEFINE()静态定义 FIFO编译时定义并初始化 FIFO1.2 FIFO 的核心代码概念与工作原理1 FIFO 的核心概念FIFO 是一个队列数据结构遵循先进先出原则。它可以看作一个管道数据从一端进入从另一端按进入顺序离开FIFO 工作原理 入队顺序: A → B → C → D 出队顺序: A → B → C → D 先进入的先被取出 队列结构 ┌───┬───┬───┬───┐ │ A │ B │ C │ D │ ← 尾部新数据加入 └───┴───┴───┴───┘ 头部数据被取出→2关键特性存储的是指针(void *)不是数据副本线程安全可在多线程和中断中并发使用支持阻塞等待和非阻塞获取无固定容量限制受内存约束1.3 FIFO 与其他 IPC 机制的对比特性FIFO (k_fifo)消息队列 (k_msgq)管道 (k_pipe)邮箱 (k_mbox)数据模型指针队列固定大小消息字节流数据块顺序保证先进先出先进先出先进先出取决于实现内存效率高仅指针中等复制开销高字节缓冲区中等数据类型任意通过指针固定结构字节数据块阻塞行为可阻塞获取可阻塞发送/接收可阻塞读写可阻塞中断安全是(k_fifo_put)有限制有限制有限制2 核心函数详解与使用2.1 初始化 FIFO1静态初始化推荐#include zephyr/kernel.h /* 静态定义和初始化 FIFO */ K_FIFO_DEFINE(my_fifo); void example_static_init(void) { printk(FIFO 已静态初始化\n); }2动态初始化struct k_fifo my_dynamic_fifo; void init_dynamic_fifo(void) { k_fifo_init(my_dynamic_fifo); printk(FIFO 已动态初始化\n); }2.2 向 FIFO 添加数据k_fifo_put()1 函数原型void k_fifo_put(struct k_fifo *fifo, void *data);2重要特性非阻塞立即返回线程安全可在中断上下文调用数据在队列中保持顺序3示例生产者线程/* 定义要传递的数据结构 */ struct sensor_packet { uint32_t timestamp; int16_t temperature; int16_t humidity; struct k_fifo *reply_fifo; /* 用于请求-响应模式 */ }; /* 生产者向 FIFO 添加数据 */ void producer_thread(void *arg1, void *arg2, void *arg3) { struct k_fifo *fifo (struct k_fifo *)arg1; uint32_t sequence 0; while (1) { /* 方法1传递静态/全局数据指针 */ static struct sensor_packet packet; packet.timestamp k_uptime_get(); packet.temperature read_temperature(); packet.humidity read_humidity(); k_fifo_put(fifo, packet); sequence; /* 方法2传递动态分配的数据 */ struct sensor_packet *dynamic_packet k_malloc(sizeof(struct sensor_packet)); if (dynamic_packet ! NULL) { dynamic_packet-timestamp k_uptime_get(); dynamic_packet-temperature 25; dynamic_packet-humidity 60; /* 设置回复 FIFO请求-响应模式*/ K_FIFO_DEFINE(reply_fifo); dynamic_packet-reply_fifo reply_fifo; k_fifo_put(fifo, dynamic_packet); /* 等待回复可选*/ // void *reply k_fifo_get(reply_fifo, K_MSEC(100)); } k_sleep(K_MSEC(1000)); } }2.3 从 FIFO 获取数据k_fifo_get()1 函数原型void *k_fifo_get(struct k_fifo *fifo, k_timeout_t timeout);2参数说明timeout等待超时可以是K_NO_WAIT非阻塞立即返回K_FOREVER永久阻塞直到有数据具体时间如K_MSEC(100)3示例消费者线程/* 消费者从 FIFO 获取并处理数据 */ void consumer_thread(void *arg1, void *arg2, void *arg3) { struct k_fifo *fifo (struct k_fifo *)arg1; struct sensor_packet *packet; printk(消费者线程启动\n); while (1) { /* 方法1阻塞等待数据 */ packet k_fifo_get(fifo, K_FOREVER); if (packet ! NULL) { printk([%u] 温度: %d°C, 湿度: %d%%\n, packet-timestamp, packet-temperature, packet-humidity); /* 如果是请求-响应模式发送回复 */ if (packet-reply_fifo ! NULL) { static int response 1; k_fifo_put(packet-reply_fifo, response); } /* 动态分配的数据需要释放 */ // k_free(packet); } /* 方法2非阻塞获取轮询模式*/ // packet k_fifo_get(fifo, K_NO_WAIT); // if (packet NULL) { // k_sleep(K_MSEC(10)); /* 无数据时短暂休眠 */ // } } }2.4 取消等待k_fifo_cancel_wait()1 函数原型void k_fifo_cancel_wait(struct k_fifo *fifo);作用强制唤醒所有正在k_fifo_get()中阻塞等待的线程让它们返回NULL。使用场景系统关闭或重启时错误恢复需要清空等待队列超时管理2 使用示例/* 示例优雅关闭系统 */ void graceful_shutdown(struct k_fifo *fifo) { printk(系统关闭中...\n); /* 1. 取消所有等待的线程 */ k_fifo_cancel_wait(fifo); /* 2. 清空 FIFO 中剩余的数据 */ void *data; while ((data k_fifo_get(fifo, K_NO_WAIT)) ! NULL) { if (needs_freeing(data)) { k_free(data); } } /* 3. 等待处理线程退出 */ k_sleep(K_MSEC(100)); printk(系统已关闭\n); }3 完整应用示例3.1 一般用法3.1.1 多生产者-单消费者模式#include zephyr/kernel.h #include zephyr/sys/printk.h /* 定义任务数据结构 */ struct work_task { void (*handler)(void *); void *arg; uint32_t priority; }; /* 静态定义任务队列 */ K_FIFO_DEFINE(task_queue); /* 任务处理线程消费者 */ void task_processor(void *arg1, void *arg2, void *arg3) { struct work_task *task; printk(任务处理器启动\n); while (1) { /* 获取任务永久等待 */ task k_fifo_get(task_queue, K_FOREVER); if (task ! NULL task-handler ! NULL) { printk(执行任务优先级: %u\n, task-priority); /* 执行任务处理函数 */ task-handler(task-arg); /* 释放任务内存如果是动态分配的 */ k_free(task); } } } /* 示例任务处理函数 */ void sample_task_handler(void *arg) { int *value (int *)arg; printk(任务执行: 值 %d\n, *value); k_sleep(K_MSEC(50)); /* 模拟处理时间 */ } /* 生产者提交任务到队列 */ void submit_task(void (*handler)(void *), void *arg, uint32_t priority) { struct work_task *task k_malloc(sizeof(struct work_task)); if (task ! NULL) { task-handler handler; task-arg arg; task-priority priority; /* 将任务添加到 FIFO */ k_fifo_put(task_queue, task); printk(任务已提交优先级: %u\n, priority); } } /* 多个生产者线程 */ void producer_high_priority(void *arg1, void *arg2, void *arg3) { int count 0; while (1) { int *data k_malloc(sizeof(int)); *data count; /* 提交高优先级任务 */ submit_task(sample_task_handler, data, 1); k_sleep(K_MSEC(200)); } } void producer_low_priority(void *arg1, void *arg2, void *arg3) { int count 1000; while (1) { int *data k_malloc(sizeof(int)); *data count; /* 提交低优先级任务 */ submit_task(sample_task_handler, data, 10); k_sleep(K_MSEC(500)); } } /* 主函数 */ int main(void) { printk( 多生产者-单消费者示例 \n); /* 启动任务处理器 */ k_thread_create(processor_tid, processor_stack, K_THREAD_STACK_SIZEOF(processor_stack), task_processor, NULL, NULL, NULL, 5, 0, K_NO_WAIT); /* 启动多个生产者 */ k_thread_create(high_pri_producer_tid, high_pri_stack, K_THREAD_STACK_SIZEOF(high_pri_stack), producer_high_priority, NULL, NULL, NULL, 6, 0, K_NO_WAIT); k_thread_create(low_pri_producer_tid, low_pri_stack, K_THREAD_STACK_SIZEOF(low_pri_stack), producer_low_priority, NULL, NULL, NULL, 7, 0, K_NO_WAIT); k_sleep(K_FOREVER); return 0; }3.1.2 中断到线程的通信#include zephyr/kernel.h #include zephyr/device.h #include zephyr/drivers/uart.h /* 定义数据包和 FIFO */ struct uart_packet { uint8_t data[64]; size_t length; uint32_t timestamp; }; K_FIFO_DEFINE(uart_rx_fifo); /* UART 接收中断处理 */ void uart_isr(const struct device *uart_dev, void *user_data) { static uint8_t rx_buffer[128]; static size_t buffer_pos 0; uint8_t byte; /* 读取所有可用字节 */ while (uart_irq_rx_ready(uart_dev)) { uart_fifo_read(uart_dev, byte, 1); /* 简单协议以换行符结束一个包 */ if (byte \n || buffer_pos sizeof(rx_buffer) - 1) { if (buffer_pos 0) { /* 创建数据包 */ struct uart_packet *packet k_malloc(sizeof(struct uart_packet)); if (packet ! NULL) { packet-length buffer_pos; packet-timestamp k_cycle_get_32(); memcpy(packet-data, rx_buffer, buffer_pos); /* 将包放入 FIFO中断安全 */ k_fifo_put(uart_rx_fifo, packet); } buffer_pos 0; } } else { rx_buffer[buffer_pos] byte; } } } /* 数据处理线程 */ void uart_data_handler(void *arg1, void *arg2, void *arg3) { struct uart_packet *packet; printk(UART 数据处理线程启动\n); while (1) { /* 等待数据包 */ packet k_fifo_get(uart_rx_fifo, K_FOREVER); if (packet ! NULL) { /* 处理数据包 */ printk(收到 UART 数据包[%u字节]: , packet-length); for (size_t i 0; i packet-length; i) { printk(%c, packet-data[i]); } printk(\n); /* 释放数据包内存 */ k_free(packet); } } }3.2 FIFO 的高级用法与模式3.2.1 优先级队列模式/* 扩展 FIFO 支持优先级 */ struct priority_item { void *data; uint8_t priority; /* 0 最高优先级 */ sys_snode_t node; }; /* 使用多个 FIFO 实现优先级 */ #define NUM_PRIORITIES 4 struct k_fifo priority_queues[NUM_PRIORITIES]; void init_priority_queues(void) { for (int i 0; i NUM_PRIORITIES; i) { k_fifo_init(priority_queues[i]); } } /* 按优先级提交项目 */ void priority_put(void *data, uint8_t priority) { if (priority NUM_PRIORITIES) { priority NUM_PRIORITIES - 1; } struct priority_item *item k_malloc(sizeof(struct priority_item)); if (item ! NULL) { item-data data; item-priority priority; k_fifo_put(priority_queues[priority], item); } } /* 按优先级获取项目高优先级先出 */ void *priority_get(k_timeout_t timeout) { struct priority_item *item NULL; /* 从高到低检查所有优先级队列 */ for (int prio 0; prio NUM_PRIORITIES; prio) { item k_fifo_get(priority_queues[prio], K_NO_WAIT); if (item ! NULL) { void *data item-data; k_free(item); return data; } } /* 如果都没有数据等待最高优先级的队列 */ return k_fifo_get(priority_queues[0], timeout); }3.2.2 超时与错误处理/* 增强的 FIFO 操作包装 */ #define FIFO_OPERATION_TIMEOUT K_MSEC(500) #define MAX_RETRY_ATTEMPTS 3 struct fifo_operation_result { void *data; int status; uint32_t attempts; }; /* 带重试的获取 */ struct fifo_operation_result robust_fifo_get(struct k_fifo *fifo, k_timeout_t base_timeout, int max_retries) { struct fifo_operation_result result {0}; k_timeout_t timeout base_timeout; for (result.attempts 0; result.attempts max_retries; result.attempts) { result.data k_fifo_get(fifo, timeout); if (result.data ! NULL) { result.status 0; /* 成功 */ return result; } /* 指数退避 */ int64_t ms k_timeout_to_ms(timeout); timeout K_MSEC(ms * 2); printk(FIFO 获取尝试 %u 失败等待 %lld ms\n, result.attempts 1, ms); } result.status -ETIMEDOUT; return result; } /* 监控 FIFO 状态 */ void monitor_fifo_stats(struct k_fifo *fifo, const char *name) { #ifdef CONFIG_DEBUG static int64_t last_report 0; int64_t now k_uptime_get(); if (now - last_report 5000) { /* 每5秒报告一次 */ /* 这里可以添加自定义统计逻辑 */ printk([%s] FIFO 监控时间: %lld\n, name, now); last_report now; } #endif }3.2.3 批处理模式/* 批处理 FIFO 操作 */ #define BATCH_SIZE 10 struct batch_processor { struct k_fifo *input_fifo; struct k_fifo *output_fifo; void *batch_buffer[BATCH_SIZE]; size_t batch_count; }; void batch_processor_init(struct batch_processor *bp, struct k_fifo *input, struct k_fifo *output) { bp-input_fifo input; bp-output_fifo output; bp-batch_count 0; } /* 收集一批数据 */ int collect_batch(struct batch_processor *bp, k_timeout_t timeout) { int64_t end_time k_uptime_get() k_timeout_to_ms(timeout); while (bp-batch_count BATCH_SIZE) { int64_t remaining end_time - k_uptime_get(); if (remaining 0) { break; /* 超时 */ } void *data k_fifo_get(bp-input_fifo, K_MSEC(remaining)); if (data ! NULL) { bp-batch_buffer[bp-batch_count] data; } else { break; /* 没有更多数据 */ } } return bp-batch_count; } /* 处理并提交批次 */ void process_and_submit_batch(struct batch_processor *bp) { if (bp-batch_count 0) { /* 处理批次数据 */ printk(处理批次: %zu 个项目\n, bp-batch_count); /* 将处理结果放入输出 FIFO */ for (size_t i 0; i bp-batch_count; i) { /* 这里可以进行数据处理 */ k_fifo_put(bp-output_fifo, bp-batch_buffer[i]); } bp-batch_count 0; } }3.2.4 内存管理/* 使用内存池避免碎片 */ K_MEM_POOL_DEFINE(fifo_pool, 64, 256, 8, 8); void *fifo_alloc(size_t size) { return k_mem_pool_alloc(fifo_pool, size, K_MSEC(100)); } void fifo_free(void *ptr) { k_mem_pool_free(fifo_pool, ptr); } /* 对象池模式 */ #define OBJECT_POOL_SIZE 20 struct data_object { /* 数据字段 */ sys_snode_t node; }; static struct data_object object_pool[OBJECT_POOL_SIZE]; static sys_slist_t free_objects; void init_object_pool(void) { sys_slist_init(free_objects); for (int i 0; i OBJECT_POOL_SIZE; i) { sys_slist_append(free_objects, object_pool[i].node); } } struct data_object *acquire_object(void) { sys_snode_t *node sys_slist_get(free_objects); return node ? CONTAINER_OF(node, struct data_object, node) : NULL; } void release_object(struct data_object *obj) { if (obj ! NULL) { sys_slist_append(free_objects, obj-node); } }4 应用总结4.1 常见问题与解决方案问题可能原因解决方案内存泄漏动态数据未释放消费者释放获取的数据死锁循环等待 FIFO 数据使用k_fifo_cancel_wait()数据竞争生产者过早重用数据使用请求-响应模式或复制数据队列饥饿高优先级数据过多实现公平调度或优先级队列性能瓶颈频繁小数据传递使用批处理模式4.2 特性总结1 Zephyr 的 FIFO 是一个简单而强大的线程间通信机制特别适合任务队列生产者提交任务消费者处理事件传递中断向线程传递事件数据缓冲平滑生产者和消费者速率差异流水线处理多个处理阶段间的数据传递2关键优势顺序保证先进先出保证公平性零复制仅传递指针效率高中断安全可在 ISR 中安全添加数据灵活性支持任意数据类型3注意事项内存管理FIFO 不管理数据内存需应用层负责阻塞控制合理设置超时避免永久阻塞并发安全虽然 FIFO 操作本身是原子的但数据内容的访问可能需要同步

相关新闻