ESP32×FreeRTOS:MessageBuffer入門|可変長フレーム受け渡し

FreeRTOS

この記事でわかること

  • MessageBuffer(メッセージバッファ)で何ができるか
  • Queue / StreamBuffer との違い
  • ESP32(ESP-IDF)で動く最小サンプル
  • 主要API(Create / Send / Receive / FromISR / 状態確認系)の引数・戻り値
  • 設計のコツ(サイズ設計・1 writer/1 reader・詰まり対策・ハマりどころ)

こんな方におすすめ

  • UART/Socket/MQTTなどで「可変長データ(フレーム)」を別タスクに渡したい
  • Queueで「固定長struct」か「1バイトずつ送る」設計になって辛い
  • MessageBufferの xMessageBufferReceive() の引数(受信バッファ長)で混乱している
  • ISR(割り込み)→ タスクへ“安全に受け渡し”したい

1. MessageBufferとは

FreeRTOSの MessageBuffer は、タスク間で 「可変長メッセージ」を安全に受け渡しする仕組みです。

  • 送信側:好きな長さの「メッセージ(バイト列)」を送る
  • 受信側:1回のReceiveで“1メッセージ分”を取り出す

MessageBuffer/StreamBufferは基本設計が 1 writer / 1 reader 前提です。
複数writerにしたい場合は、外側で Mutex等で送信APIを排他すれば運用は可能です。
ただし xMessageBufferSend() がブロックすると Mutex保持のまま待つため、送信側全体が詰まりやすくなります。実務では以下2点を推奨します。

  • 送信は1本の集約タスクに寄せる
  • どうしても複数writerなら 有限タイムアウト+失敗ログ(例:100ms待って送れなければドロップ/再送)

2. Queue / StreamBuffer / MessageBufferの違い(設計判断が速くなる表)

手段何が得意?境界サイズ典型用途
Queue固定サイズの“要素”をFIFOあり(要素単位)固定(item size)コマンド/構造体/小さな値
StreamBufferバイト“ストリーム”なし可変(読み出し側次第)UARTの生バイト、連続データ
MessageBuffer可変長“メッセージ”あり可変(メッセージ単位)フレーム(JSON/行/ヘッダ付き)

Queueは本来 item sizeを任意に設定できます。
ただし可変長を扱うときに、送信データの最大長のstructを作る必要があり、データ長が一定でない場合は無駄な領域が発生します。

MessageBufferは バッファ全体は固定容量だが、Queueのように「1要素=最大長固定」の箱を並べない。
そのため可変長データでも、メッセージごとに必要な分だけ詰めて格納でき、無駄が出にくい。


3. MessageBufferの内部イメージ(サイズ設計のために重要)

MessageBufferはメッセージを積むとき、各メッセージに管理用ヘッダ(メッセージ長)が付くため、単純に「データ合計=バッファ容量」ではありません。

  • 各メッセージごとに sizeof(size_t) バイトのオーバーヘッドが必要です。
    例:ESP32(32-bit環境)だと sizeof(size_t)=4 のケースが多いので、1メッセージあたり +4 byte(目安)

つまり容量設計は:

必要容量 ≒(最大メッセージ長 + sizeof(size_t))× ある程度の並び量 + 余裕


4. 【最小サンプル】可変長メッセージを送って、1回で受け取る(ESP32 + ESP-IDF)

やりたいこと

  • ProducerTask:可変長の文字列メッセージを送る
  • ConsumerTask:1回のReceiveで“1メッセージ”として受け取って表示
#include <cstdio>
#include <cstring>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/message_buffer.h"

static MessageBufferHandle_t mbuf;

static void ProducerTask(void *pv)
{
    const char *msgs[] = {
        "OK",
        "TEMP=24.5,HUM=41",
        "{\"type\":\"sensor\",\"v\":12345}"
    };

    for (;;) {
        for (int i = 0; i < 3; i++) {
            const char *m = msgs[i];
            size_t len = strlen(m); // '\0' は送らない(バイト列として送る)

            // 送信:成功すると「書き込めたバイト数」が返る
            size_t sent = xMessageBufferSend(mbuf, m, len, pdMS_TO_TICKS(1000));
            if (sent != len) {
                printf("[Producer] send failed (sent=%u)\n", (unsigned)sent);
            } else {
                printf("[Producer] sent: %s (len=%u)\n", m, (unsigned)len);
            }
            vTaskDelay(pdMS_TO_TICKS(500));
        }
    }
}

static void ConsumerTask(void *pv)
{
    uint8_t rx[64];

    for (;;) {
        // 受信:1メッセージ分を取り出す
        size_t n = xMessageBufferReceive(mbuf, rx, sizeof(rx), portMAX_DELAY);

        if (n > 0) {
            // 表示のため終端を付ける(MessageBuffer自体は '\0' を付けない)
            size_t m = (n < sizeof(rx) - 1) ? n : (sizeof(rx) - 1);
            rx[m] = '\0';
            printf("[Consumer] got: %s (n=%u)\n", rx, (unsigned)n);
        }
    }
}

extern "C" void app_main(void)
{
    // 例:容量256バイト(ユースケースに合わせて調整)
    mbuf = xMessageBufferCreate(256);
    if (mbuf == NULL) {
        printf("xMessageBufferCreate failed\n");
        return;
    }

    xTaskCreate(ProducerTask, "ProducerTask", 2048, NULL, 5, NULL);
    xTaskCreate(ConsumerTask, "ConsumerTask", 2048, NULL, 5, NULL);
}

動きのポイント

  • Producerは 長さの違うメッセージを送っている
  • Consumerは xMessageBufferReceive() 1回で1メッセージ受け取る
  • 終端 '\0' は自前で付ける(バイト列なので)

5. 主要APIを厚めに理解(WP版:引数・戻り値・設計の意図まで)

MessageBufferは「使うだけ」なら簡単ですが、引数設計をミスると取りこぼし/詰まりが起きます。


xMessageBufferCreate()(作成する)

MessageBufferHandle_t mb = xMessageBufferCreate(1024);
  • 役割:MessageBuffer を作成する(メッセージ境界ありのバッファ)
  • 引数
    • 第1引数xBufferSizeBytes(確保するバッファ容量[byte])
      • 例:1024
  • 戻り値
    • 成功:MessageBufferHandle_t(バッファのハンドル)
    • 失敗:NULL(メモリ不足など)

xMessageBufferCreateStatic()(静的確保で作成する)

StaticMessageBuffer_t mbStruct;
uint8_t storage[1024];

MessageBufferHandle_t mb =
    xMessageBufferCreateStatic(sizeof(storage), storage, &mbStruct);
  • 役割:動的メモリを使わず、静的領域で MessageBuffer を作成する
  • 引数
    • 第1引数xBufferSizeBytes(バッファ容量[byte])
    • 第2引数pucBufferStorageArea(実データ領域のポインタ)
    • 第3引数pxStaticMessageBuffer(管理構造体 StaticMessageBuffer_t のポインタ)
  • 戻り値
    • 成功:MessageBufferHandle_t
    • 失敗:NULL

xMessageBufferSend()(送信する)

size_t sent = xMessageBufferSend(mb, tx, txLen, portMAX_DELAY);
  • 役割:MessageBuffer に「1メッセージ」として書き込む(境界あり)
  • 引数
    • 第1引数:対象の MessageBuffer ハンドル
    • 第2引数:送信データ先頭ポインタ
    • 第3引数:送信バイト数(メッセージ長)
    • 第4引数:待ち時間(Tick)
      • 0:待たない(空きがなければ失敗)
      • portMAX_DELAY:空きができるまで待つ(ブロック)
      • pdMS_TO_TICKS(100):最大100ms待つ、など
  • 戻り値
    • 成功:送れたバイト数(通常は txLen
    • 失敗:0(タイムアウト、空き不足など)

xMessageBufferReceive()(受信する)

size_t rxLen = xMessageBufferReceive(mb, rx, sizeof(rx), portMAX_DELAY);
  • 役割:MessageBuffer から「1メッセージ」を読み出す(境界あり)
  • 引数
    • 第1引数:対象の MessageBuffer ハンドル
    • 第2引数:受信バッファ先頭ポインタ
    • 第3引数:受信バッファサイズ(最大で受け取れるサイズ)
    • 第4引数:待ち時間(Tick)
      • 0:待たない(未到着なら失敗)
      • portMAX_DELAY:メッセージ到着まで待つ(ブロック)
  • 戻り値
    • 成功:受信したバイト数(メッセージ長)
    • 失敗:0(タイムアウト等)
    • 注意:受信バッファが小さいと メッセージは受信されず(基本 0 で戻る)
      ※「取りこぼし/欠損」ではなく「受け取れない」扱いになる点が重要

xMessageBufferSendFromISR()(ISRから送信する)

BaseType_t hpw = pdFALSE;
size_t sent = xMessageBufferSendFromISR(mb, tx, txLen, &hpw);
if (hpw) portYIELD_FROM_ISR();
  • 役割:割り込み(ISR)から MessageBuffer に送信する
  • 引数
    • 第1引数:対象の MessageBuffer ハンドル
    • 第2引数:送信データ先頭ポインタ
    • 第3引数:送信バイト数
    • 第4引数pxHigherPriorityTaskWoken
      • ISR内で通知により高優先度タスクが起床したかを返す
  • 戻り値
    • 成功:送れたバイト数(通常 txLen
    • 失敗:0(空き不足など)
  • 注意
    • ISRでは待てない(ブロック不可)
    • hpwpdTRUE のとき portYIELD_FROM_ISR() で即時切替を促す

xMessageBufferReceiveFromISR()(ISRから受信する)

size_t rxLen = xMessageBufferReceiveFromISR(mb, rx, sizeof(rx));
  • 役割:ISRから MessageBuffer を読み出す(用途は限定的。基本はタスクで受信が推奨)
  • 引数
    • 第1引数:対象の MessageBuffer ハンドル
    • 第2引数:受信バッファ先頭ポインタ
    • 第3引数:受信バッファサイズ
  • 戻り値
    • 成功:受信したバイト数
    • 失敗:0(未到着等)
  • 注意
    • ISRでの受信は「設計上やむを得ない場合」に限定するのが無難

xMessageBufferBytesAvailable()(溜まっている総量を確認する)

size_t avail = xMessageBufferBytesAvailable(mb);
  • 役割:バッファ内に溜まっている「総バイト量」を返す(デバッグ/監視向き)
  • 引数
    • 第1引数:対象の MessageBuffer ハンドル
  • 戻り値
    • size_t:現在溜まっているバイト数

xMessageBufferSpacesAvailable()(空き容量を確認する)

size_t freeBytes = xMessageBufferSpacesAvailable(mb);
  • 役割:バッファの空き容量(書き込める余裕)を返す
  • 引数
    • 第1引数:対象の MessageBuffer ハンドル
  • 戻り値
    • size_t:空きバイト数

vMessageBufferDelete()(削除する)

vMessageBufferDelete(mb);
  • 役割:MessageBuffer を削除する(動的確保した場合の解放)
  • 引数
    • 第1引数:対象の MessageBuffer ハンドル
  • 戻り値
    • なし

6. 重要な注意点

注意1:基本は「1 writer / 1 reader」前提

MessageBuffer/StreamBufferは、複数writer/readerに対して安全ではない前提があります。
複数writerにする場合、推奨としては「クリティカルセクションで書き込みAPIを包み、ブロック時間0」などの制約が出ます。

実務的には:

  • 基本は 1本の受信タスク → 1本の処理タスクで設計する
  • 複数送信元があるなら「送信を集約するタスク」を1つ噛ませる
  • セマフォを使用し、同時読み書きが発生しない設計とする

注意2:受信バッファ不足は“詰まり”を誘発する

受信バッファが小さいと「先頭の巨大メッセージ」が残り続けます。

注意3:容量設計は「最大長+ヘッダ+並び量」

各メッセージに sizeof(size_t) のオーバーヘッドがある点を踏まえて設計します。


7. よくある設計パターン

パターンA:UART受信タスク → パーサタスク

  • UART側で「1フレーム分」を組み立ててMessageBufferへ送る
  • パーサ側は Receive 1回でフレームを処理でき、設計が綺麗

パターンB:ISRは“通知だけ”・コピーはタスクで

  • ISRでは重い処理をせず、Task Notificationで起床
  • 起床したタスクがMessageBufferへまとめて送る
    (ISRでメモリコピー時間を食いにくい)

8. まとめ

  • MessageBufferは 可変長メッセージ(境界あり) を渡す仕組み
  • Queueで可変長を扱うと辛い問題を、設計ごと単純化できる
  • 重要ポイントは 受信バッファ長1 writer / 1 reader前提

コメント

タイトルとURLをコピーしました