A countdown event is a less commonly seen, but quite useful, synchronization primitive. It is available in C# as of .NET 4. Here we look at how to roll your own countdown event, with a PlayStation 3 implementation, as that is the most interesting case (other platforms are simpler). The basic idea of a countdown event is that there is a counter of outstanding work items, which is decremented as they complete. Waiting on a countdown event will block a thread until the count has returned to zero.

A generic use case example is,

 
    while there is work to be scheduled
        increment countdown event
        schedule work for another thread

    do something else while worker threads do their thing

    wait on the countdown event returning to zero

Here we are incrementing the counter before each work item is scheduled. Once a work item completes, it decrements the counter. Incrementing the counter before the work item is scheduled prevents a race condition where the counter could temporarily become negative.

If you are familiar with the C# CountdownEvent, there is an important difference between its implementation and ours. The C# CountdownEvent becomes signaled once the counter reaches zero, and it is an error to increment the counter while it is signaled. Our implementation does not have this restriction, so there is no need for the extra increment and decrement on the main thread that the C# version requires.

To implement a countdown event, we need atomic integer operations, and another synchronization object that can be used if the waiting thread needs to block. For GNU/Linux a semaphore works well, and Windows can use an Event. For registered PlayStation 3 developers, there are other synchronization primitives that work well, but here we are using GNU/Linux installed as Other OS, so semaphores it shall be.

Pseudo Code

The basic interface is quite simple,

class countdown_event{
public:

  /**Increment counter
   * @param num     value to add to counter (default 1)
   */
  inline void inc(unsigned num=1);

  /**Decrement counter, waking any waiter if count goes to zero*/
  inline void dec();

  /**Wait till counter has returned to zero*/
  inline void wait();

private:
  volatile uint32_t count;
  enum { WAITER = 0x80000000 };
};
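
To make the interface concrete before diving into the implementations, here is a small usage sketch following the scheduling pattern from the start of the article. It is illustrative only: schedule_work() and do_other_work() are hypothetical placeholders for whatever job-scheduling mechanism you actually have, and each worker is assumed to call dec() on the same countdown_event when its job finishes.

static countdown_event g_ce;

void run_jobs(unsigned num_jobs){

  for(unsigned i=0;i<num_jobs;++i){
    // Increment before the job can possibly run, so the counter never goes
    // negative.
    g_ce.inc();

    // Hypothetical scheduling call; the worker calls g_ce.dec() when done.
    schedule_work(i);
  }

  // Hypothetical; the main thread gets on with something else in the meantime.
  do_other_work();

  // Block until every scheduled job has decremented the counter back to zero.
  g_ce.wait();
}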

First up, pseudo code implementations of these functions.

inc() is about as trivial as you can get.

inline void countdown_event::inc(unsigned num){
  atomic_add(&count,num);
}

wait() is a bit more meaty. We atomically set the most significant bit of the counter (WAITER flag), with the atomic operation returning the previous value before the bitwise or. If the previous value was zero, then all the dec()s have already occurred, so we do not block. If the counter was non-zero, then we need to wait to be signaled by another thread.

inline void countdown_event::wait(){
  uint32_t prev=atomic_or(&count,WAITER);
  if(prev){
    wait_to_be_woken_up();
  }
  count=0;
}

We can optimize wait() a little to skip the atomic_or() when the count has already reached zero. The idea here is that we want the non-blocking case to be as fast as possible. Atomic operations are reasonably costly, so this optimization is worthwhile.

inline void countdown_event::wait(){
  if(count){
    uint32_t prev=atomic_or(&count,WAITER);
    if(prev){
      wait_to_be_woken_up();
    }
    count=0;
  }
}

dec() works in a race against wait(). If the decrement of the counter takes it to zero before the WAITER flag has been set, then nothing special needs to be done. But if the WAITER flag has already been set, then the waiting thread needs to be woken up.

inline void countdown_event::dec(){
  uint32_t prev=atomic_dec(&count);
  if(prev==WAITER+1){
    wake_waiting_thread();
  }
}

WARNING: From this design, we can see there are some limitations. inc() must never be called after wait(). If there were multiple inc()s after the wait(), then the waiting thread could be woken up too early by a dec(). Also, when a waiting thread is woken, it simply writes zero back to the counter, which would interfere with any inc()s. Generally you just want to call inc() and wait() from the same thread, so this is not an issue.
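
Before moving on to the PlayStation 3 version, it is worth noting how directly the pseudo code maps onto an ordinary GNU/Linux box. The following is only a sketch of that mapping, using GCC's __sync atomic builtins and a POSIX semaphore with error handling omitted; it mirrors the pseudo code above rather than the PlayStation 3 code that follows.

#include <semaphore.h>
#include <stdint.h>

class countdown_event{
public:
  inline void init()    { sem_init(&sem,0,0); count=0; }
  inline void destroy() { sem_destroy(&sem); }

  inline void inc(unsigned num=1){
    // Atomic add; the previous value is not needed here
    __sync_fetch_and_add(&count,num);
  }

  inline void dec(){
    // The previous value tells us if we were the last decrementer with a waiter
    uint32_t prev=__sync_fetch_and_sub(&count,1);
    if(prev==WAITER+1){
      sem_post(&sem);
    }
  }

  inline void wait(){
    // Fast path, as in the optimized pseudo code
    if(count){
      uint32_t prev=__sync_fetch_and_or(&count,WAITER);
      if(prev){
        sem_wait(&sem);
      }
      count=0;
    }
  }

private:
  volatile uint32_t count;
  enum { WAITER = 0x80000000 };
  sem_t sem;
};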

PlayStation 3 Implementation

For most platforms, turning the above pseudo code into a fully fledged implementation is very straightforward. But things get a bit trickier on the PlayStation 3 if we let SPUs also call dec(). We need a mechanism for code running on an SPU to wake up a PPU thread. Here we are going with mailbox interrupts to send a message to the PPU that it needs to wake a thread. Stop-and-signal instructions would also work, but a mailbox interrupt is cheaper from the SPU point of view (and much of a muchness on the PPU side).

Here is our PS3 GNU/Linux implementation,

countdown_event.h++

#ifndef INCLUDED_COUNTDOWN_EVENT_H
#define INCLUDED_COUNTDOWN_EVENT_H

#include "ensure.h++"
#include "mailbox_cmds.h++"
#include "static_assert.h++"
#include <stddef.h>
#include <stdint.h>

#ifdef __PPU__
# include <ppu_intrinsics.h>
# include <semaphore.h>
#elif defined __SPU__
# include <spu_intrinsics.h>
# include <spu_mfcio.h>
#endif

////////////////////////////////////////////////////////////////////////////////
class countdown_event{
public:

#  ifdef __PPU__

    /**Initialize countdown_event
     * @note  Purposely not done in constructor to
     *        simplify use as global variable
     */
    inline void init();

    /**Free resources*/
    inline void destroy();

    /**Increment counter
     * @param num     value to add to counter (default 1)
     */
    inline void inc(unsigned num=1);

    /**Decrement counter, waking any waiter if count goes to zero*/
    inline void dec();

    /**Wait till counter has returned to zero*/
    inline void wait();

    /**PPU specific function for handling interrupts from an SPU*/
    inline void ppu_signal();

# elif defined __SPU__

    /**Decrement counter, waking any waiter if count goes to zero
     * @warning     interrupts must be disabled if they are being used
     * @param ea    effective address of countdown_event
     */
    static inline void dec(uint32_t ea);

# endif

private:
  volatile uint32_t count;
  enum { WAITER = 0x80000000 };

# ifndef __SPU__
    // Semaphore not available on spu, so sizeof(countdown_event) invalid.
    sem_t sem;
# endif
}
__attribute__((__aligned__(128)));


#ifdef __PPU__ /////////////////////////////////////////////////////////////////

  //////////////////////////////////////////////////////////////////////////////
  inline void countdown_event::init(){
    ENSURE(sem_init(&sem,0,0)==0);
    count=0;
  }

  //////////////////////////////////////////////////////////////////////////////
  inline void countdown_event::destroy(){
    ENSURE(sem_destroy(&sem)==0);
  }

  //////////////////////////////////////////////////////////////////////////////
  inline void countdown_event::inc(unsigned num){

    // Atomically increment counter
    uint32_t prev;
    do{
      prev=__lwarx(&count);
    }while(__builtin_expect(!__stwcx(&count,prev+num),0));
  }

  //////////////////////////////////////////////////////////////////////////////
  inline void countdown_event::dec(){

    // Atomically decrement counter
    uint32_t prev;
    do{
      prev=__lwarx(&count);
    }while(__builtin_expect(!__stwcx(&count,prev-1),0));

    // If we are the last decrementer, and there is a waiter, wake it up
    if(__builtin_expect(prev==WAITER+1,0)){
      ENSURE(sem_post(&sem)==0);
    }
  }

  //////////////////////////////////////////////////////////////////////////////
  inline void countdown_event::wait(){

    // Fast path don't wait if counter already zero.  This test is just an
    // optimization, it is not required for correctness.
    if(__builtin_expect(count,0)){

      // Atomically or in the waiter flag
      uint32_t prev;
      do{
        prev=__lwarx(&count);
      }while(__builtin_expect(!__stwcx(&count,prev|WAITER),0));

      // If we set the waiter flag before the last decrement, then wait.  It is
      // important that we do not wait if the counter was already zero.  The
      // optimization check at the top of the function is not sufficient, we
      // need to check the previous value from the atomic or.
      if(prev){
        ENSURE(sem_wait(&sem)==0);
      }

      // Clear waiter flag
      count=0;
    }
  }

  //////////////////////////////////////////////////////////////////////////////
  inline void countdown_event::ppu_signal(){
    ENSURE(sem_post(&sem)==0);
  }

#elif defined __SPU__ //////////////////////////////////////////////////////////

  //////////////////////////////////////////////////////////////////////////////
  /*static*/ inline void countdown_event::dec(uint32_t ea){
    static volatile uint32_t ls[32] __attribute__((__aligned__(128)));
    uint32_t prev;
    do{
      // Get and reserve cache line
      spu_writech(MFC_EAL,ea);
      spu_writech(MFC_LSA,(uint32_t)ls);
      spu_writech(MFC_Cmd,MFC_GETLLAR_CMD);
      spu_readch(MFC_RdAtomicStat);

      // Decrement and put conditional
      STATIC_ASSERT(offsetof(countdown_event,count)==0);
      prev=*ls;
      *ls=prev-1;
      spu_writech(MFC_EAL,ea);
      spu_writech(MFC_LSA,(uint32_t)ls);
      spu_writech(MFC_Cmd,MFC_PUTLLC_CMD);
    }while(__builtin_expect(spu_readch(MFC_RdAtomicStat),0));

    // Wake PPU if it is waiting
    if(__builtin_expect(prev==WAITER+1,0)){
      spu_writech(SPU_WrOutMbox,ea);
      spu_writech(SPU_WrOutIntrMbox,MAILBOX_CMD_COUNTDOWN_EVENT_SIGNAL);
    }
  }

#endif

#endif // INCLUDED_COUNTDOWN_EVENT_H

Notice we have added a new member function, ppu_signal(). This is called by the PPU interrupt mailbox handler. The SPU implementation of dec() writes the effective address of the countdown_event object to the outbound non-interrupt mailbox, then writes a command number to the outbound interrupt mailbox. Using a command number like this allows the single outbound interrupt mailbox to be multiplexed for different purposes.

The PPU interrupt mailbox handler is the only remaining piece of the PlayStation 3 countdown event implementation. It is handled inside the function event_handler_thread_func() (main.ppu.c++). Other than that, the remaining code is all just a test setup for the countdown event.

Despite just being “test” code, I think it is still interesting enough to post here in full. Since this runs on GNU/Linux rather than the lv2 OS, the method for controlling the SPEs from the PPU may look unfamiliar: it uses the libspe2 interface.

The main loop inside of main() pushes job descriptor structs into a queue that are then executed by the SPUs. Each job is a simple delay (with a random time specified by the PPU), then a decrement on the countdown event.

main.ppu.c++

#include "countdown_event.h++"
 
  #include "ensure.h++"
 
  #include "mailbox_cmds.h++"
 
  #include "static_assert.h++"
 
  #include "stop_cmds.h++"
 
  #include "test_job.h++"
 
  #include <libspe2.h>
 
  #include <pthread.h>
 
  #include <semaphore.h>
 
  #include <stdint.h>
 
  #include <stdio.h>
 
  #include <stdlib.h>
 
  #include <unistd.h>
 
   
 
  #define NUM_SPES 6
 
   
 
  // SPU ELF is converted to this program handle in Makefile
 
  extern spe_program_handle_t g_spu_code;
 
   
 
  // Data per SPE
 
  struct spe_data{
 
    spe_context_ptr_t ctx;
 
    spe_event_unit_t  event;
 
    pthread_t         run_thread;
 
    uint32_t          id;
 
  };
 
   
 
  // Global variables
 
  static spe_event_handler_ptr_t g_event_handler;
 
  static volatile bool g_stop_event_handler_thread/*=false*/;
 
  static volatile test_job_data_queue g_queue;
 
  static sem_t g_spus_started_semaphore;
 
   
 
  ////////////////////////////////////////////////////////////////////////////////
 
  // Thread function to handle SPE events.  One thread handles all SPEs.  The only
 
  // event we look for here is the outbound interrupt mailbox.  This is where we
 
  // perform the signaling of a countdown event to wake a PPU thread.
 
  static void* event_handler_thread_func(void*){
 
   
 
    // Loop until a flag has been set to tell us to exit.
 
    while(__builtin_expect(!g_stop_event_handler_thread,1)){
 
   
 
      // Get the next SPE event.  Use a timeout so that we can poll the exit flag.
 
      spe_event_unit_t e;
 
      const int max_events=1;
 
      const int timeout_ms=100;
 
      int num;
 
      ENSURE((num=spe_event_wait(g_event_handler,&e,max_events,timeout_ms))!=-1);
 
   
 
      // If we got an event, handle it.
 
      if(__builtin_expect(num>0,1)){
 
   
 
        // Switch on the event type
 
        switch(e.events){
 
   
 
          // Outbound interrupt mailbox
 
          case SPE_EVENT_OUT_INTR_MBOX:{
 
   
 
            // The 32-bit value passed through the mail box is treated as a
 
            // command code
 
            uint32_t cmd;
 
            ENSURE(spe_out_intr_mbox_read(e.spe,&cmd,1,SPE_MBOX_ALL_BLOCKING)==1);
 
            switch(cmd){
 
   
 
              // Notification that the SPU has started executing
 
              case MAILBOX_CMD_SPU_STARTED:
 
                ENSURE(sem_post(&g_spus_started_semaphore)==0);
 
                break;
 
   
 
              // Command to wake a PPU thread that was waiting on a countdown
 
              // event
 
              case MAILBOX_CMD_COUNTDOWN_EVENT_SIGNAL:{
 
   
 
                // Read the effective address of the countdown event from the
 
                // non-interrupt outbound mailbox
 
                uint32_t countdown;
 
                ENSURE(spe_out_mbox_read(e.spe,&countdown,1)==1);
 
                STATIC_ASSERT(sizeof(void*)==4);
 
   
 
                // Signal countdown event to wake sleeping PPU thread
 
                ((countdown_event*)countdown)->ppu_signal();
 
                break;
 
              }
 
            }
 
            break;
 
          }
 
        }
 
      }
 
    }
 
    return NULL;
 
  }
 
   
 
  ////////////////////////////////////////////////////////////////////////////////
 
  // Thread function to handle running of SPEs.  One thread per SPE.
 
  static void* spu_run_thread_func(void* user_data){
 
    spe_data* spe=(spe_data*)user_data;
 
    spe_context_ptr_t ctx=spe->ctx;
 
   
 
    // Initial arguments for SPU execution.  The three qwords in argp are passed
 
    // to the SPU in registers $3, $4 and $5.
 
    unsigned npc=SPE_DEFAULT_ENTRY;
 
    unsigned runflags=SPE_RUN_USER_REGS|SPE_NO_CALLBACKS;
 
    STATIC_ASSERT(sizeof(void*)==4);
 
    uint32_t argp[12] __attribute__((__aligned__(16)))={
 
     spe->id,0,0,0, (uint32_t)&g_queue,0,0,0, 0,0,0,0};
 
    void* const envp=NULL;
 
   
 
    for(;;){
 
   
 
      spe_stop_info_t info;
 
      ENSURE(spe_context_run(ctx,&npc,runflags,argp,envp,&info)>=0);
 
   
 
      // Only expecting the SPE to stop execution via a STOP or STOPD instruction
 
      ENSURE((info.spu_status&0xffff)==2);
 
   
 
      // Use stop code as a command identifier
 
      const uint32_t stop_code=info.spu_status>>16;
 
      switch(stop_code){
 
   
 
        // SPU program code exitted
 
        case STOP_CMD_EXIT:
 
          ENSURE(spe_event_handler_deregister(g_event_handler,&spe->event)==0);
 
          return NULL;
 
      }
 
   
 
      runflags&=~SPE_RUN_USER_REGS;
 
    }
 
  }
 
   
 
  ////////////////////////////////////////////////////////////////////////////////
 
  int main(){
 
   
 
    // Setup SPE event handler thread
 
    g_event_handler=spe_event_handler_create();
 
    pthread_attr_t thread_attr;
 
    ENSURE(pthread_attr_init(&thread_attr)==0);
 
    ENSURE(pthread_attr_setdetachstate(&thread_attr,PTHREAD_CREATE_JOINABLE)==0);
 
    pthread_t event_handler_thread;
 
    ENSURE(pthread_create(&event_handler_thread,&thread_attr,
 
     event_handler_thread_func,NULL)==0);
 
   
 
    // Initialize job queue to be run.  The lwsync ensures queue initialization
 
    // goes to memory before SPUs start.
 
    g_queue.next=0;
 
    g_queue.end=0;
 
    __lwsync();
 
   
 
    // Start SPEs
 
    spe_data spe[NUM_SPES];
 
    const unsigned create_flags=SPE_EVENTS_ENABLE;
 
    const spe_gang_context_ptr_t gang=NULL;
 
    ENSURE(sem_init(&g_spus_started_semaphore,0,0)==0);
 
    for(unsigned i=0;i<NUM_SPES;++i){
 
   
 
      spe[i].id=i;
 
   
 
      // Create SPE context
 
      ENSURE((spe[i].ctx=spe_context_create(create_flags,gang))!=NULL);
 
   
 
      // Register context with event handler
 
      spe[i].event.events=SPE_EVENT_OUT_INTR_MBOX;
 
      spe[i].event.spe=spe[i].ctx;
 
      spe[i].event.data.u64=0;
 
      ENSURE(spe_event_handler_register(g_event_handler,&spe[i].event)==0);
 
   
 
      // Load code and create thread to run SPU
 
      ENSURE(spe_program_load(spe[i].ctx,&g_spu_code)==0);
 
      ENSURE(pthread_create(&spe[i].run_thread,&thread_attr,spu_run_thread_func,
 
       spe+i)==0);
 
    }
 
   
 
    // Wait until all SPUs have started running.  This prevents problems where the
 
    // PPU thread executing spu_run_thread_func() is not scheduled.
 
    for(unsigned i=0;i<NUM_SPES;++i){
 
      ENSURE(sem_wait(&g_spus_started_semaphore)==0);
 
    }
 
    ENSURE(sem_destroy(&g_spus_started_semaphore)==0);
 
   
 
    // Run countdown event tests
 
    srand(0xdecafbad);
 
    static countdown_event ce;
 
    ce.init();
 
    for(unsigned i=0;i<128;++i){
 
   
 
      // Increment countdown event, and kick SPU jobs that will decrement it.
 
      for(unsigned j=0;j<test_job_data_queue::MAX_JOBS;++j){
 
   
 
        // Increment countdown event.  To prevent the SPUs ever decrementing the
 
        // counter to negative, it is important that this is done before the job
 
        // is made executable.
 
        ce.inc();
 
   
 
        // Fill in job structure for SPU code.
 
        volatile test_job_data* job=g_queue.jobs+j;
 
        STATIC_ASSERT(sizeof(void*)==4);
 
        job->counter_ea=(uint32_t)&ce;
 
        job->delay=rand()&0xfffff;
 
   
 
        // Ensure counter has been incremented and job fully writen to memory
 
        // before we make the job executable.
 
        __lwsync();
 
   
 
        // Allow an SPU to run this job.
 
        g_queue.end=j+1;
 
      }
 
   
 
      // Wait a random amount of time before we wait on the countdown event
 
      usleep(rand()&0xffff);
 
   
 
      // Wait until all the SPU jobs have completed and decremented the counter
 
      // back to zero.
 
      ce.wait();
 
   
 
      // Reset the job queue.  The lwsync ensures that end is reset before next,
 
      // preventing SPUs from re-running old jobs.
 
      g_queue.end=0;
 
      __lwsync();
 
      g_queue.next=0;
 
    }
 
    ce.destroy();
 
   
 
    // Queue jobs with a countdown event effective address of zero.  This is used
 
    // to tell each SPU that we are done, and it should now stop.
 
    for(unsigned i=0;i<NUM_SPES;++i){
 
      volatile test_job_data* job=g_queue.jobs+i;
 
      job->counter_ea=0;
 
      job->delay=0;
 
      ce.inc();
 
      __lwsync();
 
      g_queue.end=i+1;
 
    }
 
   
 
    // Clean up
 
    void* thread_ret;
 
    for(unsigned i=0;i<NUM_SPES;++i){
 
      ENSURE(pthread_join(spe[i].run_thread,&thread_ret)==0);
 
      ENSURE(spe_context_destroy(spe[i].ctx)==0);
 
    }
 
    g_stop_event_handler_thread=true;
 
    ENSURE(pthread_join(event_handler_thread,&thread_ret)==0);
 
    ENSURE(spe_event_handler_destroy(g_event_handler)==0);
 
   
 
    return 0;
 
  }

The data structure for the jobs is pretty simple,

test_job.h++

#ifndef INCLUDED_TEST_JOB_H
#define INCLUDED_TEST_JOB_H

#include <stdint.h>

struct test_job_data{
  uint32_t counter_ea;
  uint32_t delay;
}
__attribute__((__aligned__(16)));

struct test_job_data_queue{
  enum{MAX_JOBS=8};
  uint32_t      next;
  uint32_t      end;
  test_job_data jobs[MAX_JOBS];
}
__attribute__((__aligned__(128)));

#endif // INCLUDED_TEST_JOB_H

The SPU-side code is a loop that continuously fetches a new job to execute.

test_job.spu.c++

#include "test_job.h++"
 
  #include "countdown_event.h++"
 
  #include <spu_intrinsics.h>
 
  #include <spu_mfcio.h>
 
   
 
  #define DECREMENTER_EVENT_MASK  0x20
 
   
 
  ////////////////////////////////////////////////////////////////////////////////
 
  extern "C" void job_entry_point(uint32_t,uint32_t);
 
  void job_entry_point(uint32_t spu_id,uint32_t queue_ea){
 
    (void) spu_id;
 
   
 
    // Enable decrementer event
 
    spu_writech(SPU_WrDec,0);
 
    spu_writech(SPU_WrEventMask,DECREMENTER_EVENT_MASK);
 
   
 
    for(;;){
 
   
 
      // Get the job queue indices, and loop till there is a queued job
 
      static char buf[128] __attribute__((__aligned__(128)));
 
      spu_writech(MFC_LSA,(uint32_t)buf);
 
      spu_writech(MFC_EAL,queue_ea);
 
      spu_writech(MFC_Cmd,MFC_GETLLAR_CMD);
 
      spu_readch(MFC_RdAtomicStat);
 
      volatile test_job_data_queue* queue=(test_job_data_queue*)buf;
 
      uint32_t next=queue->next;
 
      uint32_t end=queue->end;
 
      if(next>=end)continue;
 
   
 
      // Atomically attempt to claim job
 
      queue->next=next+1;
 
      spu_writech(MFC_LSA,(uint32_t)buf);
 
      spu_writech(MFC_EAL,queue_ea);
 
      spu_writech(MFC_Cmd,MFC_PUTLLC_CMD);
 
      if(spu_readch(MFC_RdAtomicStat)!=0)continue;
 
   
 
      // Get the job
 
      static volatile test_job_data job;
 
      const uint32_t tag=0;
 
      spu_writech(MFC_LSA,(uint32_t)&job);
 
      spu_writech(MFC_EAL,
 
       (uint32_t)(((test_job_data_queue*)queue_ea)->jobs+next));
 
      spu_writech(MFC_Size,sizeof(job));
 
      spu_writech(MFC_TagID,tag);
 
      spu_writech(MFC_Cmd,MFC_GET_CMD);
 
   
 
      // Wait for dma completion
 
      spu_writech(MFC_WrTagMask,1<<tag);
 
      spu_writech(MFC_WrTagUpdate,MFC_TAG_UPDATE_ANY);
 
      spu_readch(MFC_RdTagStat);
 
   
 
      // Zero counter ea indicates end of tests
 
      const uint32_t counter=job.counter_ea;
 
      if(!counter)return;
 
   
 
      // Delay specified number of cycles
 
      if(spu_readchcnt(SPU_RdEventStat)){
 
        spu_readch(SPU_RdEventStat);
 
        spu_writech(SPU_WrEventAck,DECREMENTER_EVENT_MASK);
 
      }
 
      spu_writech(SPU_WrDec,job.delay);
 
      spu_readch(SPU_RdEventStat);
 
      spu_writech(SPU_WrEventAck,DECREMENTER_EVENT_MASK);
 
   
 
      // Decrement countdown event
 
      countdown_event::dec(counter);
 
    }
 
  }

With a small assembly language wrapper for the entry point,

entry_point.spu.s

#       include     "mailbox_cmds.h++"
#       include     "stop_cmds.h++"

        .extern     job_entry_point

        .section    .entry_point,"ax",@progbits

        // input
        // $3        spu id
        // $4        test_job_data_queue ea
        .global     entry_point
        .type       entry_point,@function
entry_point:

        // Tell the PPU we have started
        ilhu        $6,MAILBOX_CMD_SPU_STARTED>>16
        iohl        $6,MAILBOX_CMD_SPU_STARTED&0xffff
        wrch        $SPU_WrOutIntrMbox,$6

        // Initialize 16KB stack
        il          $0,0
        il          $1,-16
        stqd        $0,0($1)
        il          $2,0x3ff0
        fsmbi       $6,0x0fff
        selb        $1,$1,$2,$6

        // Call to C++ code
        brsl        $0,job_entry_point

        // Done
        stop        STOP_CMD_EXIT

        .size       entry_point,.-entry_point

And the last few bits and pieces,

ensure.h++

#ifndef INCLUDED_ENSURE_H
#define INCLUDED_ENSURE_H

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>

// Like an assert, but still functional in release builds
#define ENSURE(EXP)                                                            \
do{                                                                            \
  if(__builtin_expect(!(EXP),0)){                                              \
    fprintf(stderr,"FAILED: "#EXP"\nerrno 0x%08x\n",errno);                    \
    abort();                                                                   \
  }                                                                            \
}while(0)

#endif // INCLUDED_ENSURE_H

static_assert.h++

#ifndef INCLUDED_STATIC_ASSERT_H
#define INCLUDED_STATIC_ASSERT_H

// Macro for compile time assert
template<bool B> struct static_assert_struct;
template<> struct static_assert_struct<true>{};
#define STATIC_ASSERT(EXP) (void)sizeof(static_assert_struct<(EXP)>)

#endif // INCLUDED_STATIC_ASSERT_H

mailbox_cmds.h++

#ifndef INCLUDED_MAILBOX_CMDS_H
#define INCLUDED_MAILBOX_CMDS_H

// Global list of commands that may be sent through the interrupt mailboxes

// Sent when the SPU code first starts executing.  This is used for blocking the
// main PPU thread until all SPUs are running, otherwise PPU thread scheduling
// may leave an SPU idle.
#define MAILBOX_CMD_SPU_STARTED                     0xffff0000

// Send when an SPU decrements a countdown event to zero, and there is a waiting
// PPU thread.  This allows the PPU thread to be woken.
#define MAILBOX_CMD_COUNTDOWN_EVENT_SIGNAL          0xffff0001

#endif // INCLUDED_MAILBOX_CMDS_H

stop_cmds.h++

#ifndef INCLUDED_STOP_CMDS_H
#define INCLUDED_STOP_CMDS_H

// Global list of commands that may be sent through stop and signal instructions

// When a SPU has finished executing.
#define STOP_CMD_EXIT       0

#endif // INCLUDED_STOP_CMDS_H

Makefile

.SILENT :

target          := countdown_event_test.elf

ppu_srcs        := $(wildcard *.ppu.c++)
ppu_objs        := $(addsuffix .o,$(basename $(ppu_srcs)))
ppu_deps        := $(addsuffix .d,$(basename $(ppu_srcs)))

spu_cxxsrcs     := $(wildcard *.spu.c++)
spu_ssrcs       := $(wildcard *.spu.s)
spu_cxxobjs     := $(addsuffix .o,$(basename $(spu_cxxsrcs)))
spu_sobjs       := $(addsuffix .o,$(basename $(spu_ssrcs)))
spu_objs        := $(spu_cxxobjs) $(spu_sobjs)
spu_deps        := $(addsuffix .d,$(basename $(spu_objs)))
spu_elf         := spu.elf
spu_bin         := spu.bin
spu_bin_ppu_obj := spu.bin.ppu.o
spu_link_ld     := spu.ld

all : $(target)

-include $(ppu_deps) $(spu_deps)

$(ppu_objs) : %.o : %.c++
    ppu-g++ -c -O3 -m32 -ffunction-sections -fdata-sections -fno-exceptions    \
        -fno-rtti -ggdb -MD -MP -pthread -std=gnu++98 -Wall -Werror -Wextra    \
        -Winline -Wshadow $< -o $@

$(spu_cxxobjs) : %.o : %.c++
    spu-g++ -c -O3 -ffunction-sections -fdata-sections -fno-exceptions         \
        -fno-rtti -g -std=gnu++98 -MD -MP -Wall -Werror -Wextra -Wshadow       \
        -Wno-invalid-offsetof $< -o $@

$(spu_sobjs) : %.o : %.s
    spu-gcc -c -MD -g -xassembler-with-cpp $< -o $@

$(spu_elf) : $(spu_objs) $(spu_link_ld)
    spu-ld --fatal-warnings --gc-sections -T $(spu_link_ld) $(spu_objs) -o $@

$(spu_bin_ppu_obj) : $(spu_elf)
    ppu-embedspu -m32 g_spu_code $< $@

$(target) : $(ppu_objs) $(spu_bin_ppu_obj)
    ppu-g++ -Wl,--fatal-warnings -Wl,--gc-sections -m32 -lspe2 -pthread $+ -o $@

.PHONY : clean
clean :
    rm -f $(ppu_objs) $(ppu_deps) $(spu_objs) $(spu_deps) $(spu_elf)           \
        $(spu_bin) $(spu_bin_ppu_obj)

.PHONY : run
run : $(target)
    scp $< ps3linux:/tmp/$(notdir $<)
    ssh ps3linux /tmp/$(notdir $<)

spu.ld

SECTIONS
{
  ENTRY(entry_point)
  . = 0;
  .text   : { *(.entry_point) *(.text) *(.text.*) }
  .rodata : { *(.rodata) *(.rodata.*) }
  .data   : { *(.data) *(.data.*) }
  .bss    : { *(.bss) *(.bss.*) }
}


That’s it. If you just skimmed this article, don’t let the large amount of code scare you off. Most of it is test code, which contains some interesting PPU/SPU communication, but it is not part of the countdown event implementation itself.

A countdown event is quite a handy synchronization primitive to have in your bag of tricks; I hope you find it useful.