The best of the best, or the lowest of the low ?
Given n items, finding the “best” k of them is a common problem in games. This could be the k closest enemies, the k most important textures to be streamed next, the k least important entities to unload, or many other problems. Here we look at a neat algorithm for doing this in O(n·log(k)) time, and O(k) memory, the “Limited Heap”. O(n·log(k)) time is good, though not necessarily the best possible, but the O(k) storage is better than just “ok”, its very good.
Evgeniy Gabrilovich and Alex Gontmakher, first presented this algorithm in the Dr Dobb’s Journal article, “Heap Ltd.”, June 2003 (// Build heap
for each item in input_items
if not heap full
add item to heap
else if max item in heap > item
pop heap
push item into heap
// Convert heap into sorted array
k = num items in heap
for i=1; i<k; ++i
output[k-i-1] = pop heap
Thats the crux of it. Simple, effective, elegant.
In More Detail
While the previous pseudo code can be implemented using the standard heap operations, we can get a good optimization by creating custom functions. Won’t go into all the details of how a standard binary heap words, as there is lots of really good explanations elsewhere (// elem: element to be inserted into heap
// array: array containing heap (zero based indexing)
// size: previous number of elements in heap
max_heap_insert(elem,array,size){
// Move pointer so array can be indexed starting from one
--array;
// Loop till invariant satisfied
child=size+1;
parent=child>>1;
while(child>1){
// Invariant satisfied ?
if(array[parent]>=elem)break;
// Swap with parent
array[child]=array[parent];
child=parent;
parent>>=1;
}
// Store new element
array[child]=elem;
}
To remove the top item of the heap, we take the last item in the array and overwrite the first, reducing the size of the array by one. We then move the item from the start of the array towards the end until the heap invariant is restored.
// array: array containing heap (zero based indexing)
// size: previous number of elements in heap
max_heap_pop(array,size){
// Reduce the size of the heap
--size;
// Element to replace popped element, then pushed down the heap
elem=array[size];
// Move pointer so array can be indexed starting from one
--array;
// Loop while both children valid
parent=1;
child=2;
while(child<size){
// Select the max of the two children
child+=array[child+1]>array[child];
// Invariant satisfied ?
if(elem>=array[child]){
array[parent]=elem;
return;
}
// Swap with the max child
array[parent]=array[child];
parent=child;
child<<=1;
}
// Check final child if any
if(child==size){
if(array[child]>elem){
array[parent]=array[child];
parent=child;
}
}
// Store element that was moved from the end
array[parent]=elem;
}
For implementing a limited heap, rather than doing a pop followed by an insert, we can merge the two operations together. A pop followed by an insert, reduces the heap size by one, fixes up the heap, then increases the size by one, and fixes up the heap again. The key observation here, is that instead we can simply replace the current top of the heap with the new item, and fix up the just heap once.
// elem: element to be inserted into heap
// array: array containing heap (zero based indexing)
// size: number of elements in heap
max_heap_pop_insert(elem,array,size){
// Move pointer so array can be indexed starting from one
--array;
// Loop while both children valid
parent=1;
child=2;
while(child<size){
// Select the max of the two children
child+=array[child+1]>array[child];
// Invariant satisfied ?
if(elem>=array[child]){
array[parent]=elem;
return;
}
// Swap with the max child
array[parent]=array[child];
parent=child;
child<<=1;
}
// Check final child if any
if(child==size){
if(array[child]>elem){
array[parent]=array[child];
parent=child;
}
}
// Store new element
array[parent]=elem;
}
You may have noticed that unlike most psuedo code for binary heaps, we have stored the element that is being moved up or down the heap in a separate temporary variable, as opposed to storing it directly in the array. Only at the end of the functions, is it finally written into the correct spot. While this is only pseudo code, it is also good practice to implement it this way, as it avoids the dreaded load-hit-store problem on architectures that don’t have store-to-load forwarding (eg. the current generation consoles).
A Full Implementation
Finally, some real code. Here is a binary heap implementation, including limited heap functions.
heap.h++
#ifndef INCLUDED_HEAP_H
#define INCLUDED_HEAP_H
#include <assert.h>
////////////////////////////////////////////////////////////////////////////////
template<class TYPE>
struct cmp_lt{
inline bool operator()(const TYPE& lhs,const TYPE& rhs)const{
return lhs<rhs;
}
};
////////////////////////////////////////////////////////////////////////////////
template<class TYPE>
struct cmp_gt{
inline bool operator()(const TYPE& lhs,const TYPE& rhs)const{
return lhs>rhs;
}
};
////////////////////////////////////////////////////////////////////////////////
// Helper to reverse the arguments of a comparitor type.
template<class CMP,class TYPE>
struct reverse_cmp{
inline bool operator()(const TYPE& lhs,const TYPE& rhs)const{
return CMP()(rhs,lhs);
}
};
////////////////////////////////////////////////////////////////////////////////
// Check if invariant satisfied for sub heap rooted at index.
// Indices start from 1.
template<class CMP,class TYPE>
bool heap_is_valid(unsigned index,const TYPE* array,unsigned size){
const TYPE* const rebased_array=array-1;
unsigned parent=index;
unsigned child=parent<<1;
for(unsigned i=0;i<2;++i){
if(child>size)return true;
if(CMP()(rebased_array[child],rebased_array[parent]))return false;
if(!heap_is_valid<CMP>(child,array,size))return false;
++child;
}
return true;
}
////////////////////////////////////////////////////////////////////////////////
// Check if invariant satisfied for entire heap.
template<class CMP,class TYPE>
bool heap_is_valid(const TYPE* array,unsigned size){
--array;
for(unsigned child=size;child>2;--child){
const unsigned parent=child>>1;
if(CMP()(array[child],array[parent]))return false;
}
return true;
}
////////////////////////////////////////////////////////////////////////////////
// Move position at index down towards the bottom of the heap until invariant
// satisfied, and then store elem there.
// Indices start from 1.
template<class CMP,class TYPE>
void heap_sift_down(TYPE elem,unsigned index,TYPE* array,unsigned size){
// Move pointer so array can be indexed starting from one
--array;
// Loop while all children valid
unsigned parent=index;
unsigned child=parent<<1;
while(child<size){
// Select the most important of the two children
child+=CMP()(array[child+1],array[child]);
// Invariant satisfied ?
if(!CMP()(array[child],elem)){
array[parent]=elem;
return;
}
// Swap with the most important child
array[parent]=array[child];
parent=child;
child<<=1;
}
// Check final child if any. Note that this is impossible if size=2^n-1.
if(child<=size){
if(CMP()(array[child],elem)){
array[parent]=array[child];
parent=child;
}
}
// Store new element
array[parent]=elem;
}
////////////////////////////////////////////////////////////////////////////////
// Move position at index up towards the top of the heap until invariant
// satisfied, and then store elem there.
// Indices start from 1.
template<class CMP,class TYPE>
void heap_sift_up(TYPE elem,unsigned index,TYPE* array){
// Move pointer so array can be indexed starting from one
--array;
// Loop till invariant satisfied
unsigned child=index;
unsigned parent=child>>1;
while(child>1){
// Invariant satisfied ?
if(CMP()(array[parent],elem))break;
// Swap with parent
array[child]=array[parent];
child=parent;
parent>>=1;
}
// Store element
array[child]=elem;
}
////////////////////////////////////////////////////////////////////////////////
// Add elem to heap.
template<class CMP,class TYPE>
void heap_insert(const TYPE& __restrict__ elem,TYPE* __restrict__ array,
unsigned size){
// Add item to bottom of heap, and move upwards until invariant is satisfied
++size;
heap_sift_up<CMP>(elem,size,array);
assert((heap_is_valid<CMP>(array,size)));
}
////////////////////////////////////////////////////////////////////////////////
// Access current top of heap.
template<class TYPE>
inline const TYPE& heap_top(const TYPE* array){
return *array;
}
////////////////////////////////////////////////////////////////////////////////
// Remove the current top of the heap.
template<class CMP,class TYPE>
void heap_pop(TYPE* array,unsigned size){
// Move last element to top of heap, then move downwards until invariant is
// satisfied
assert(size);
--size;
heap_sift_down<CMP>(array[size],1,array,size);
assert((heap_is_valid<CMP>(array,size)));
}
////////////////////////////////////////////////////////////////////////////////
// Remove the current top of the heap, and then add elem.
template<class CMP,class TYPE>
void heap_pop_insert(const TYPE& __restrict__ elem,TYPE* __restrict__ array,
unsigned size){
heap_sift_down<CMP>(elem,1,array,size);
assert((heap_is_valid<CMP>(array,size)));
}
////////////////////////////////////////////////////////////////////////////////
// Convert array into a valid heap.
template<class CMP,class TYPE>
void heap_make(TYPE* array,unsigned size){
const unsigned root=1;
for(unsigned p=size;p>=root;--p){
heap_sift_down<CMP>(array[p],p,array,size);
}
assert((heap_is_valid<CMP>(array,size)));
}
////////////////////////////////////////////////////////////////////////////////
// Convert heap into a sorted array.
// Order of array is the reverse of the heap ordering.
template<class CMP,class TYPE>
void heap_reverse_sort_into_array(TYPE* array,unsigned size){
assert((heap_is_valid<CMP>(array,size)));
while(size>1){
const TYPE t=heap_top(array);
heap_pop<CMP>(array,size);
--size;
array[size]=t;
}
}
////////////////////////////////////////////////////////////////////////////////
// Sort elements of array.
template<class CMP,class TYPE>
void heap_sort(TYPE* array,unsigned size){
typedef reverse_cmp<CMP,TYPE> rcmp;
heap_make<rcmp>(array,size);
heap_reverse_sort_into_array<rcmp>(array,size);
}
////////////////////////////////////////////////////////////////////////////////
// Insert part of limited heap algorithm.
// Use same comparator direction as you would use to sort (ie, reverse of the
// internal heap direction).
// Returns new heap size.
template<class CMP,class TYPE>
unsigned limited_heap_insert(const TYPE& __restrict__ elem,
TYPE* __restrict__ array,unsigned curr_size,unsigned max_size){
typedef reverse_cmp<CMP,TYPE> rcmp;
// If heap not yet full, insert
if(curr_size<max_size){
heap_insert<rcmp>(elem,array,curr_size);
return curr_size+1;
}
// Is the new element more important than the current heap top ?
if(CMP()(elem,heap_top(array))){
heap_pop_insert<rcmp>(elem,array,max_size);
}
return max_size;
}
////////////////////////////////////////////////////////////////////////////////
// Convert limited heap into array.
template<class CMP,class TYPE>
void limited_heap_reverse_sort_into_array(TYPE* array,unsigned size){
typedef reverse_cmp<CMP,TYPE> rcmp;
heap_reverse_sort_into_array<rcmp>(array,size);
}
#endif // INCLUDED_HEAP_H
The code follows fairly closely the previous pseudo code. The major difference being that it is now templated on the type stored in the heap, and a comparitor for comparing elements in the heap. Another difference is that the upward movement from heap_insert() and the downward movement from heap_pop() have been factored out into the functions heap_sift_up() and heap_sift_down() respectively.
Pointer-Key Pairs
One real problem with the above code though, is that if used naively, it is likely that the objects of which we want to find the k “best” of are likely to be stored directly. This can be both a waste of memory, and make comparisons more expensive. Instead, it is better to store pointer-key pairs. But if the code above is directly used for that, it will lead to template bloat. So here is a set of wrapper functions to help dealing with pointer-key pairs, and avoid instantiating the code for each pointer type.
pkheap.h++
#ifndef INCLUDED_PKHEAP_H
#define INCLUDED_PKHEAP_H
#include "heap.h++"
////////////////////////////////////////////////////////////////////////////////
// User visible type.
template<class PTYPE,class KTYPE>
struct pointer_key{
PTYPE* p;
KTYPE k;
};
////////////////////////////////////////////////////////////////////////////////
// Internal type used to stop template bloat.
template<class KTYPE>
struct vpointer_key{
void* p;
KTYPE k;
};
////////////////////////////////////////////////////////////////////////////////
template<class KCMP,class KTYPE>
struct vpointer_key_cmp{
inline bool operator()(const vpointer_key<KTYPE>& lhs,
const vpointer_key<KTYPE>& rhs)const{
return KCMP()(lhs.k,rhs.k);
}
};
////////////////////////////////////////////////////////////////////////////////
template<class KCMP,class PTYPE,class KTYPE>
bool pkheap_is_valid(unsigned index,const pointer_key<PTYPE,KTYPE>* array,
unsigned size){
typedef vpointer_key_cmp<KCMP,KTYPE> cmp;
return heap_is_valid<cmp>(index,(const vpointer_key<KTYPE>*)array,size);
}
////////////////////////////////////////////////////////////////////////////////
template<class KCMP,class PTYPE,class KTYPE>
bool pkheap_is_valid(const pointer_key<PTYPE,KTYPE>* array,unsigned size){
typedef vpointer_key_cmp<KCMP,KTYPE> cmp;
return heap_is_valid<cmp>((const vpointer_key<KTYPE>*)array,size);
}
////////////////////////////////////////////////////////////////////////////////
template<class KCMP,class PTYPE,class KTYPE>
void pkheap_insert(PTYPE* ptr,const KTYPE& __restrict__ key,
pointer_key<PTYPE,KTYPE>* __restrict__ array,unsigned size){
typedef vpointer_key_cmp<KCMP,KTYPE> cmp;
vpointer_key<KTYPE> e={(void*)ptr,key};
heap_insert<cmp>(e,(vpointer_key<KTYPE>*)array,size);
}
////////////////////////////////////////////////////////////////////////////////
template<class PTYPE,class KTYPE>
inline const pointer_key<PTYPE,KTYPE>& pkheap_top(
const pointer_key<PTYPE,KTYPE>* array){
return *array;
}
////////////////////////////////////////////////////////////////////////////////
template<class KCMP,class PTYPE,class KTYPE>
void pkheap_pop(pointer_key<PTYPE,KTYPE>* array,unsigned size){
typedef vpointer_key_cmp<KCMP,KTYPE> cmp;
heap_pop<cmp>((vpointer_key<KTYPE>*)array,size);
}
////////////////////////////////////////////////////////////////////////////////
template<class KCMP,class PTYPE,class KTYPE>
void pkheap_pop_insert(PTYPE* ptr,const KTYPE& __restrict__ key,
pointer_key<PTYPE,KTYPE>* __restrict__ array,unsigned size){
typedef vpointer_key_cmp<KCMP,KTYPE> cmp;
vpointer_key<KTYPE> e={(void*)ptr,key};
heap_pop_insert<cmp>(e,(vpointer_key<KTYPE>*)array,size);
}
////////////////////////////////////////////////////////////////////////////////
template<class KCMP,class PTYPE,class KTYPE>
void pkheap_make(pointer_key<PTYPE,KTYPE>* array,unsigned size){
typedef vpointer_key_cmp<KCMP,KTYPE> cmp;
heap_make<cmp>((vpointer_key<KTYPE>*)array,size);
}
////////////////////////////////////////////////////////////////////////////////
template<class KCMP,class PTYPE,class KTYPE>
void pkheap_reverse_sort_into_array(pointer_key<PTYPE,KTYPE>* array,
unsigned size){
typedef vpointer_key_cmp<KCMP,KTYPE> cmp;
heap_reverse_sort_into_array<cmp>((vpointer_key<KTYPE>*)array,size);
}
////////////////////////////////////////////////////////////////////////////////
template<class KCMP,class PTYPE,class KTYPE>
void pkheap_sort(pointer_key<PTYPE,KTYPE>* array,unsigned size){
typedef vpointer_key_cmp<KCMP,KTYPE> cmp;
heap_sort<cmp>((vpointer_key<KTYPE>*)array,size);
}
////////////////////////////////////////////////////////////////////////////////
template<class KCMP,class PTYPE,class KTYPE>
unsigned limited_pkheap_insert(PTYPE* ptr,const KTYPE& __restrict__ key,
pointer_key<PTYPE,KTYPE>* __restrict__ array,unsigned curr_size,
unsigned max_size){
typedef vpointer_key_cmp<KCMP,KTYPE> cmp;
vpointer_key<KTYPE> e={(void*)ptr,key};
return limited_heap_insert<cmp>(e,(vpointer_key<KTYPE>*)array,curr_size,
max_size);
}
////////////////////////////////////////////////////////////////////////////////
template<class KCMP,class PTYPE,class KTYPE>
void limited_pkheap_reverse_sort_into_array(pointer_key<PTYPE,KTYPE>* array,
unsigned size){
typedef vpointer_key_cmp<KCMP,KTYPE> cmp;
limited_heap_reverse_sort_into_array<cmp>((vpointer_key<KTYPE>*)array,size);
}
#endif // INCLUDED_PKHEAP_H
Floating Point Keys
While the wrapper functions in pkheap.h++ encourage better usage than storing entire objects in the heap, there is still one big elephant in the room. The interface now kinda encourages float keys. This is pretty terrible for the current generation consoles, where a branch on a float comparison will cause a pipeline flush. A solution to this is to use uint32_ts. If the float is garunteed to be a finite number >= 0.f, and not -0.f, then the simple solution is to reinterpret the bits as a uint32_t using something like,
template<class TO,class FROM> TO bit_cast(const FROM& from){
TO to;
memcpy(&to,&from,sizeof(TO));
return to;
}
The memcpy() may look horrifically slow, but any sensible compiler will simply use register moves via memory when the size is a compile time constant. memcpy() is probably the only standards compliant way to do this without violating aliasing rules (template<class TO,class FROM> TO bit_cast(const FROM& from){
union{FROM f;TO t;} u;
u.f=from;
return u.t;
}
But now we have just replaced lots of pipeline flushes, with less (but still quite a few) load-hit-stores. A sensible solution to this is to calculate and store several float keys, then load them as uint32_ts and insert them into the limited heap.
If the keys may also be negative, then it is possible to convert them to integers that have the same orderring, and -0.f and 0.f both map to the same value. Have a look at Bruce Dawson’s excellent article at
A slightly tighter upper bound could be found by taking into acount the fact that the heap is not initially full or that it is popped to empty, but for our purposes here, its not really worth the extra effort.