MPSC Queue in C

Non-intrusive MPSC node-based queue

上記 URL は multi producers and single consumer 向けの queue の実装で、akka の actor の実装でもこのアルゴリズムが利用されています1。この queue の良い点は記事内で説明されているのでそちらを読んでください。

で、上記 URL に記載されている実装は volatile に memory barrier を期待している点が良くなくて、こうした方が良いという修正を施した実装を以下に参考に載せておきます。このコードは コンパイラ拡張を使っていますが、C11 が使えるなら C11 で入った <stdatomic.h> の機能を使ってコードを書いた方が良いです。

#include <stdio.h>

#if defined(__GNUC__)
#define XCHG(ptr, val)           __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define ATOMIC_LOAD_N(ptr)       __atomic_load_n((ptr), __ATOMIC_ACQUIRE)
#define ATOMIC_STORE_N(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_RELEASE)
#elif defined(__clang__)
#define XCHG(ptr, val)           __c11_atomic_exchange((ptr), (val), __ATOMIC_SEQ_CST)
#define ATOMIC_LOAD_N(ptr)       __c11_atomic_load((ptr), __ATOMIC_ACQUIRE)
#define ATOMIC_STORE_N(ptr, val) __c11_atomic_store((ptr), (val), __ATOMIC_RELEASE)
#endif

typedef struct mpscq_node_t 
{ 
  struct mpscq_node_t* next; 
  void*                state; 
} mpscq_node_t; 

typedef struct mpscq_t 
{ 
  mpscq_node_t* head; 
  mpscq_node_t* tail; 
} mpscq_t;

static mpscq_node_t STUB = { NULL, NULL };

void mpscq_create(mpscq_t* self, mpscq_node_t* stub) 
{ 
  stub->next = NULL; 
  self->head = stub; 
  self->tail = stub; 
} 

void mpscq_push(mpscq_t* self, mpscq_node_t* n) 
{ 
  n->next = NULL; 
  mpscq_node_t* prev = XCHG(&self->head, n); // serialization-point wrt producers, acquire-release
  ATOMIC_STORE_N(&prev->next, n); // serialization-point wrt consumer, release
} 

mpscq_node_t* mpscq_pop(mpscq_t* self) 
{ 
  mpscq_node_t* tail = self->tail; 
  mpscq_node_t* next = ATOMIC_LOAD_N(&tail->next); // serialization-point wrt producers, acquire
  if (next) 
    { 
      self->tail = next; 
      tail->state = next->state; 
      return tail; 
    } 
  return 0; 
}

/** example usage */
typedef struct item_t { int value; } item_t;

#define GetQNodeState(node) (node)->state
#define GetItemValue(node) ((item_t*)GetQNodeState(node))->value

int main(int argc, char** argv)
{
  mpscq_t q;
  item_t item1 = { 1 };
  item_t item2 = { 2 };
  mpscq_node_t n1 = { NULL, (void*)&item1 };
  mpscq_node_t n2 = { NULL, (void*)&item2 };

  mpscq_create(&q, &STUB);
  mpscq_push(&q, &n1);
  mpscq_push(&q, &n2);

  for (int i = 0; i < 5; ++i) {
    mpscq_node_t *ret = mpscq_pop(&q);

    if (ret) {
      printf("%d\n", GetItemValue(ret));
    }
  }

  return 0;
}

Comments

comments powered by Disqus