We describe a priority queue data structure that allows item removal via key or removal via (lowest) priority. As a bonus, there is code that shows how to implement a wrapper for a decorator class that enables per-method parameters and has access to class variables.

Random Access Priority Queue

The RandomAccessPriorityQueue class pairs a heap with a dictionary and provides O(log(n)) insertion and removal.

The heap is implemented with a Python list, where the left and right children are computed as a function of the index: for an item at index $i$, the left child has index $2i + 1$; the right child, index $2i + 2$.

The dictionary maintains a mapping from the key to the index location in the array.

Items are inserted at, and removed from, the last position in the array. On insertion, the new item is then ‘bubbled up,’ i.e. swapped with its parent until the priority of the new node is no longer less than the priority of its parent—the priority invariant.

On removal, the item to be removed is recursively swapped with the child having lower priority until reaching a leaf node. Then, the item to be removed is swapped with the last item in the list and returned. The previously last item in the list is then ‘bubbled up’ at its new location until the priority invariant is satisfied.


## Python implementation of a random access priority queue

from MyItem import MyItem
import PriorityItem
from PydotBeforeAfter import PydotBeforeAfterWrapper as pydot_decorator

import pydot
import itertools
import string
import random

class RandomAccessPriorityQueue(object):
    """Min-heap priority queue that also supports removal by key.

    Items live in a list-backed binary heap (``_data``); a dictionary
    (``_lookup``) maps each item's ``key()`` to its current index in that
    list, so an arbitrary item can be located in O(1) and removed in
    O(log n).  Items are ordered with ``<`` only; the smallest item sits at
    index 0.

    NOTE(review): this module imports ``pydot_decorator`` but no method here
    is decorated with it, so setting ``PYDOT_DEBUG`` on this class has no
    effect as written.  Presumably ``insert`` and ``pop`` were meant to be
    decorated -- confirm against the original source.
    """

    def __init__(self, cls, fixed_size=None):
        """Create an empty queue.

        cls        -- the item class; must be a subclass of PriorityItem.
        fixed_size -- optional maximum number of stored items (None means
                      unbounded).

        Raises TypeError if ``cls`` is not a PriorityItem subclass.
        """
        if not issubclass(cls, PriorityItem.PriorityItem):
            raise TypeError("%s must be a subclass of PriorityItem" % (cls.__name__))
        self._cls = cls
        self._data = []         # the heap, stored as an implicit binary tree
        self._lookup = {}       # item key -> index of that item in _data
        self._fixed_size = fixed_size

    # insert a new item
    def insert(self, item):
        """Add ``item`` to the queue.

        When the queue is at ``fixed_size``, the new item replaces the
        current lowest-priority item if that item compares less than the new
        one; otherwise the new item is silently dropped.

        Raises TypeError if ``item`` is not exactly of the queue's item
        class, and ValueError if an item with the same key is already stored.
        """
        # Exact type match (not isinstance): all items must share one
        # declared subclass so that '<' is consistent across the heap.
        if type(item) is not self._cls:
            err_msg = "insert requires type {0}".format(self._cls.__name__)
            raise TypeError(err_msg)

        key = item.key()
        if key in self._lookup:
            # A second item under the same key would overwrite the index
            # entry and leave the first item unreachable by key.
            raise ValueError("duplicate key: {0!r}".format(key))

        if self._fixed_size is not None and len(self._data) >= self._fixed_size:
            # Full (or zero capacity): keep the new item only if it outranks
            # the current minimum, which is then evicted to make room.
            if not self._data or not (self._data[0] < item):
                return
            self.pop()

        self._data.append(item)
        ilast = len(self._data) - 1
        self._lookup[key] = ilast
        self._fixup(ilast)

    # return the lowest priority node (or the one requested)
    def pop(self, name=None):
        """Remove and return an item.

        name -- key of the item to remove; None removes the lowest-priority
                item.

        Returns the removed item, or None when the queue is empty.
        Raises KeyError if ``name`` is given but is not a stored key.
        """
        if name is None:
            istart = 0
        else:
            istart = self._lookup[name]

        if 0 <= istart < len(self._data):
            return self._fixdown(istart)
        return None

    # invariant:  move child up the tree until higher priority than parent
    def _fixup(self, i):
        """Bubble the item at index ``i`` up until it is not less than its parent."""
        while i > 0:
            ip = (i - 1) // 2   # floor division: '/' yields a float on Python 3
            if not (self._data[i] < self._data[ip]):
                break
            self._swap(i, ip)
            i = ip

    # invariant:  move down the tree promoting the lesser of the two children
    def _fixdown(self, i):
        """Remove and return the item at index ``i``.

        The item is pushed down to a leaf by repeatedly swapping it with its
        lesser child, swapped with the last array element, and removed from
        the end.  The element that took its place is then bubbled up.
        """
        sz = len(self._data)

        # Sink the doomed item to a leaf.  The array is filled left to right,
        # so a missing left child means there is no right child either.
        while True:
            il = 2 * i + 1
            ir = 2 * i + 2
            if il >= sz:
                break
            if ir < sz and not (self._data[il] < self._data[ir]):
                child = ir
            else:
                child = il
            self._swap(i, child)
            i = child

        # Move the doomed item to the end of the array so it can be removed
        # without shifting any other element.
        ilast = sz - 1
        if i < ilast:
            self._swap(i, ilast)

        res = self._data.pop()
        del self._lookup[res.key()]

        # The former last item now sits at leaf i and may be smaller than its
        # new ancestors.
        if i < ilast:
            self._fixup(i)

        return res

    # keep the lookup table and data arrays synced
    def _swap(self, i, j):
        """Exchange the items at indices ``i`` and ``j`` and update the key index."""
        data = self._data
        data[i], data[j] = data[j], data[i]
        self._lookup[data[i].key()] = i
        self._lookup[data[j].key()] = j

    # Visualization via pydot
    def dot(self):
        """Build and return a pydot graph of the current heap."""
        self._graph = pydot.Dot(graph_type='graph')
        self._dot_visit(0)
        return self._graph

    def _dot_visit(self, i):
        """Add the node at index ``i``, its child edges, and its subtree to the graph."""
        sz = len(self._data)
        if 0 <= i < sz:
            p = self._data[i]
            self._graph.add_node( pydot.Node( p.key(), label=str(p) ) )

            for ic in (2*i + 1, 2*i + 2):
                if ic < sz:
                    c = self._data[ic]
                    self._graph.add_edge( pydot.Edge( p.key(), c.key() ) )
                    self._dot_visit(ic)

if __name__ == "__main__":

    # Enable step-by-step figures (comment this line out to disable them)
    RandomAccessPriorityQueue.PYDOT_DEBUG = True

    rapq = RandomAccessPriorityQueue(MyItem, fixed_size = 5)

    # Construct a simple example: N single-letter keys, each with a distinct
    # random priority in [0, N)
    N = 10
    nodes = string.ascii_lowercase[0:N]
    priorities = random.sample(range(0,N), N)
    for n, p in zip(nodes, priorities):
        rapq.insert(MyItem(n, p))

    # Remove a few items by key; a key that is not currently stored is
    # reported rather than treated as an error.
    for key in ['e', 'a', 'b']:
        try:
            rapq.pop(key)
        except KeyError:
            print("'%s' not a stored key" % (key))


An Example

Add ‘A’ with priority 9: after

Add ‘B’ with priority 3: after

Add ‘C’ with priority 0: after

Add ‘D’ with priority 6: after

Add ‘E’ with priority 5: after

Add ‘F’ with priority 2: after

Add ‘G’ with priority 4: after

Add ‘H’ with priority 7: after

Add ‘I’ with priority 1: after

Add ‘J’ with priority 8: after

Remove the lowest priority: after

Remove ‘E’: after

Remove ‘A’: after

Remove ‘B’: after

Remove the lowest priority: after

Remove the lowest priority: after

Remove the lowest priority: after

Remove the lowest priority: after

PriorityItem Abstract Class

RandomAccessPriorityQueue allows insertion of objects subclassed from the abstract PriorityItem class. However, in order to maintain consistency across less-than comparators, all items must be of the same declared subclass.


class PriorityItem(object):
    """Abstract base for items stored in a priority queue.

    Every method here raises NotImplementedError; a concrete subclass must
    override all three.
    """

    def key(self):
        """Return the key that identifies this item."""
        message = "derived classes must implement key(self)"
        raise NotImplementedError(message)

    def __lt__(self, other):
        """Return whether this item orders before ``other``."""
        message = "derived classes must implement __lt__(self, other)"
        raise NotImplementedError(message)

    def __repr__(self):
        """Return a printable representation of this item."""
        message = "derived classes must implement __repr__(self)"
        raise NotImplementedError(message)

An example, non-abstract subclass:


import PriorityItem

class MyItem(PriorityItem.PriorityItem):
    """A concrete PriorityItem holding a key and a priority."""

    def __init__(self, key, priority):
        """Store the identifying ``key`` and the ``priority`` used for ordering."""
        self._priority = priority
        self._key = key

    def __lt__(self, other):
        """Return True when this item's priority is lower than ``other``'s."""
        # Return the comparison result directly so the "not less" case yields
        # False instead of falling off the end of the method (None).
        return self._priority < other._priority

    def __repr__(self):
        """Return ``"<key> : <priority>"``; the priority is formatted with %d."""
        return "%s : %d" % ( self._key, self._priority )

    def key(self):
        """Return the key given at construction."""
        return self._key

The Pydot Decorator Class

This is perhaps the most interesting piece of this article. We want to allow the user to set the PYDOT_DEBUG class variable of the RandomAccessPriorityQueue class. Setting this class variable is trivial. However, accessing the class variable from a decorator requires a decorator class which must be callable, i.e. it must implement the __call__ method.

However, the decorator class should also implement the __get__ method in order to set up proper bindings. This latter task is done through a lambda function which calls the PydotBeforeAfter instance passing ‘extra’ parameters, in our case the owner. The newly defined lambda function is then returned to the caller with the appropriate bindings and the expected parameter definitions. When the lambda function is called, it recovers the context of the closure and is able to pass the owner variable to the PydotBeforeAfter __call__ method.

Finally, we write a wrapper class that allows us to pass an extra tag, specific to each method decorated, so as to keep the output figures organized.


# A decorator class for RandomAccessPriorityQueue to create 
# before and after figures

class PydotBeforeAfter(object):
    """Method decorator that writes a figure before and after each call.

    An instance replaces the decorated function in the class body and acts
    as a descriptor: attribute access on an instance goes through __get__,
    which returns a bound method that forwards the owning class to __call__.
    That is how __call__ can read the owner's PYDOT_DEBUG class variable.

    Figures are produced with ``instance.dot().write_png(name)`` and are only
    written while the owner's PYDOT_DEBUG attribute is set to a true value.
    """

    def __init__(self, func, tag):
        """Wrap ``func``; ``tag`` is embedded in every figure file name."""
        self.tag = tag
        self.func = func
        # Number of calls so far; shared by all instances of the owner class
        # because the descriptor itself lives on the class.
        self.fig_id = 0

    def __get__(self, instance, owner):
        """Return a bound method for ``instance``, or the decorator itself
        when accessed on the class."""
        if instance is None:
            return self
        decorator = self

        # Build a plain function and bind it to the instance, so callers see
        # an ordinary bound method while __call__ also receives the owner.
        def bound(inst, *args, **kwargs):
            return decorator(owner, inst, *args, **kwargs)

        bound.__name__ = self.func.__name__
        return bound.__get__(instance, owner)

    def __call__(self, owner, instance, *args, **kwargs):
        """Call the wrapped function on ``instance`` and return its result,
        writing an "a" figure before and a "b" figure after when enabled."""
        self.fig_id = self.fig_id + 1
        # Remember this call's number: a nested call through the same
        # decorator would otherwise change the name of the "after" figure.
        fig_id = self.fig_id

        self._snapshot(owner, instance, "a", fig_id)
        val = self.func(instance, *args, **kwargs)
        self._snapshot(owner, instance, "b", fig_id)
        return val

    def _snapshot(self, owner, instance, suffix, fig_id):
        """Write one figure of ``instance`` if the owner has debugging on."""
        # Test the flag's value, not merely its presence, so that
        # PYDOT_DEBUG = False really disables the figures.
        if not getattr(owner, "PYDOT_DEBUG", False):
            return
        fig_name = self._fig_name(suffix, fig_id)
        print("output: %s" % (fig_name))
        instance.dot().write_png( fig_name )

    def _fig_name(self, suffix, fig_id=None):
        """Return the figure path for ``suffix``; ``fig_id`` defaults to the
        current call counter."""
        if fig_id is None:
            fig_id = self.fig_id
        return "figs/fig_{2}_{0:>02}{1}.png".format(fig_id, suffix, self.tag)

# The wrapper class allows us to pass parameters to the main decorator.  In particular,
# this allows tagging of each function decorated.
class PydotBeforeAfterWrapper(object):
    """Decorator factory that forwards extra arguments to PydotBeforeAfter.

    Calling the class with arguments yields an object which, when applied to
    a function, returns ``PydotBeforeAfter(func, *arguments)``.
    """

    def __init__(self, *decorator_args):
        """Remember the positional arguments to hand to the real decorator."""
        self.decorator_args = decorator_args

    def __call__(self, func):
        """Wrap ``func`` in a PydotBeforeAfter built with the stored arguments."""
        extra = self.decorator_args
        return PydotBeforeAfter(func, *extra)