# Class Store

class Store(Buffer):
    """Models buffers for processes coupled by putting/getting distinguishable
    items.

    Blocks a process when a put would cause buffer overflow or a get would
    cause buffer underflow.
    Default queuing discipline for blocked processes is priority FIFO.
    """

    def getnrBuffered(self):
        """Return the number of items currently held in the buffer."""
        return len(self.theBuffer)
    nrBuffered = property(getnrBuffered)

    def getbuffered(self):
        """Return the list holding the buffered items (the list itself,
        not a copy)."""
        return self.theBuffer
    buffered = property(getbuffered)

    def __init__(self, **pars):
        """Set up the store and validate its parameters.

        All keyword arguments are forwarded unchanged to Buffer.__init__.
        Afterwards this method reads self.name, self.capacity,
        self.initialBuffered, self.monitored and self.bufferMon, which are
        presumably set by Buffer.__init__ (not visible here -- confirm).

        Raises FatalSimerror if the capacity is not a positive integer, if
        initialBuffered is neither None nor a list, or if initialBuffered
        holds more items than the capacity allows.
        """
        Buffer.__init__(self, **pars)
        self.theBuffer = []
        if self.name is None:
            self.name = "a_store"  # default name
        if type(self.capacity) != type(1) or self.capacity <= 0:
            # Report the offending capacity (the message used to show
            # initialBuffered by mistake).
            raise FatalSimerror(
                "Store: capacity parameter not a positive integer > 0: %s"
                % (self.capacity,))
        if self.initialBuffered is not None:
            if not isinstance(self.initialBuffered, list):
                raise FatalSimerror("Store: initialBuffered not a list")
            if len(self.initialBuffered) > self.capacity:
                raise FatalSimerror("initialBuffered exceeds capacity")
            # Copy the items so the caller's list is not aliased by the buffer.
            self.theBuffer[:] = self.initialBuffered
        if self.monitored:
            self.bufferMon.observe(y=self.nrBuffered, t=now())
        self._sort = None  # no buffer sorting until addSort() is called

    def addSort(self, sortFunc):
        """Adds buffer sorting to this instance of Store. It maintains
        theBuffer sorted by the sortAttr attribute of the objects in the
        buffer.
        The user-provided 'sortFunc' must look like this:

        def mySort(self,par):
            tmplist=[(x.sortAttr,x) for x in par]
            tmplist.sort()
            return [x for (key,x) in tmplist]

        The function is bound to this instance, so it receives the store as
        'self' and the current buffer list as 'par'; its return value
        replaces theBuffer. The buffer is re-sorted immediately.
        """
        # Local import: types.MethodType binds the function to this instance
        # and, unlike the 'new' module, exists in both Python 2 and Python 3.
        import types
        self._sort = types.MethodType(sortFunc, self)
        self.theBuffer = self._sort(self.theBuffer)

    def _recordBufferLevel(self):
        """Record the current buffer level in bufferMon when monitored."""
        if self.monitored:
            self.bufferMon.observe(y=self.nrBuffered, t=now())

    def _resortBuffer(self):
        """Re-apply the user sort function (if any) to the buffer."""
        if self._sort is not None:
            self.theBuffer = self._sort(self.theBuffer)

    def _servePutQ(self):
        """Move items of blocked put requestors into the buffer while they fit.

        Requestors are served in queue order: the loop stops at the first
        queued requestor whose items do not fit, so the second in the queue
        is never served before the first.
        """
        while len(self.putQ):
            proc = self.putQ[0]
            if len(proc._whatToPut) + self.nrBuffered > self.capacity:
                break
            self.theBuffer.extend(proc._whatToPut)  # move items to buffer
            self._resortBuffer()
            self._recordBufferLevel()
            self.putQ.takeout(proc)  # dequeue requestor's record
            # continue a blocked put requestor
            _e._post(_Action(who=proc, generator=proc._nextpoint), at=_t)

    def _serveGetQ(self):
        """Hand buffered items to blocked get requestors, in queue order.

        A requestor whose _nrToGet is a function is served when that function
        returns a non-empty selection from the buffer. A requestor with a
        numerical _nrToGet is served when enough items are buffered; once one
        numerical request cannot be met, all later numerical requests are
        blocked too, to prevent starvation of larger gets by smaller ones.
        """
        block = False
        # Iterate over a snapshot: takeout() is expected to remove the served
        # requestor from getQ, and removing from a list while iterating it
        # would skip the requestor that follows each served one.
        for proc in list(self.getQ):
            if not (self.nrBuffered > 0 and len(self.getQ)):
                break  # out of items in buffer or out of getters in getQ
            if inspect.isfunction(proc._nrToGet):
                # predicate parameter: the filter selects the items to take
                movCand = proc._nrToGet(self.theBuffer)
                if movCand:
                    proc.got = movCand[:]
                    for item in movCand:
                        self.theBuffer.remove(item)
                    self.getQ.takeout(proc)
                    self._recordBufferLevel()
                    # continue a blocked get requestor
                    _e._post(_Action(who=proc, generator=proc._nextpoint),
                             at=_t)
            elif not block and proc._nrToGet <= self.nrBuffered:
                # numerical parameter: take that many items from the front
                nrToGet = proc._nrToGet
                proc.got = self.theBuffer[:nrToGet]
                del self.theBuffer[:nrToGet]
                self._recordBufferLevel()
                # take this get requestor's record out of queue:
                self.getQ.takeout(proc)
                # continue a blocked get requestor
                _e._post(_Action(who=proc, generator=proc._nextpoint),
                         at=_t)
            else:
                # block subsequent numerically specified get's in getQ
                # to prevent starvation of larger gets by smaller ones
                block = True

    def _put(self, arg):
        """Handles put requests for Store instances.

        arg[0] is the yield tuple, either
        (put, self, buff, whattoput) or (put, self, buff, whattoput, priority);
        arg[1].who is the requesting process.

        If the items do not fit, the requestor is passivated and queued in
        putQ together with its items. Otherwise the items are added to the
        buffer, the requestor is rescheduled, and waiting getters are served.

        Raises FatalSimerror if the item list is missing or is not a list.
        """
        obj = arg[1].who
        request = arg[0]
        if len(request) == 5:        # yield put,self,buff,whattoput,priority
            obj._putpriority[self] = request[4]
        elif len(request) == 4:      # yield put,self,buff,whattoput
            obj._putpriority[self] = Buffer.priorityDefault  # default
        else:                        # error, whattoput missing
            raise FatalSimerror("Item to put missing in yield put stmt")
        whatToPut = request[3]
        if not isinstance(whatToPut, list):
            raise FatalSimerror("put parameter is not a list")

        if len(whatToPut) + self.nrBuffered > self.capacity:
            obj._nextTime = None       # passivate put requestor
            obj._whatToPut = whatToPut
            self.putQ.enterPut(obj)    # and queue, with items to put
            return

        self.theBuffer.extend(whatToPut)
        self._resortBuffer()
        self._recordBufferLevel()
        # continue the put requestor
        _e._post(_Action(who=obj, generator=obj._nextpoint),
                 at=_t, prior=1)
        # service any waiting getters
        self._serveGetQ()

    def _get(self, arg):
        """Handles get requests.

        arg[0] is the yield tuple: (get, self, buff),
        (get, self, buff, whattoget) or (get, self, buff, whattoget, priority);
        arg[1].who is the requesting process. 'whattoget' is either a number
        of items (default 1) or a filter function that is called with the
        buffer list and returns the items to retrieve.

        On success the retrieved items are placed in the requestor's 'got'
        list, the requestor is rescheduled, and blocked put requestors for
        which space is now available are served. Otherwise the requestor is
        passivated and queued in getQ.

        Raises FatalSimerror if a negative number of items is requested.
        """
        filtfunc = None
        nrToGet = 1                  # default for: yield get,self,buff
        obj = arg[1].who
        obj.got = []                 # the list of items retrieved by 'get'
        request = arg[0]
        if len(request) == 5:        # yield get,self,buff,whattoget,priority
            obj._getpriority[self] = request[4]
        else:                        # no priority given: use the default
            obj._getpriority[self] = Buffer.priorityDefault
        if len(request) in (4, 5):
            if inspect.isfunction(request[3]):
                filtfunc = request[3]
            else:
                nrToGet = request[3]

        if filtfunc is None:  # number specifies nr items to get
            if nrToGet < 0:
                raise FatalSimerror(
                    "Store: get parameter not positive number: %s"
                    % (nrToGet,))
            if self.nrBuffered < nrToGet:
                obj._nrToGet = nrToGet
                self.getQ.enterGet(obj)
                # passivate/block queuing 'get' process
                obj._nextTime = None
            else:
                # move items from the front of the buffer to the requestor
                obj.got[:] = self.theBuffer[:nrToGet]
                del self.theBuffer[:nrToGet]
                self._recordBufferLevel()
                _e._post(_Action(who=obj, generator=obj._nextpoint),
                         at=_t, prior=1)
                # reactivate any put requestors for which space is now
                # available
                self._servePutQ()
        else:  # items to get determined by filtfunc
            movCand = filtfunc(self.theBuffer)
            if movCand:  # get succeeded
                _e._post(_Action(who=obj, generator=obj._nextpoint),
                         at=_t, prior=1)
                obj.got = movCand[:]
                for item in movCand:
                    self.theBuffer.remove(item)
                self._recordBufferLevel()
                # reactivate any put requestors for which space is now
                # available
                self._servePutQ()
            else:  # get did not succeed, block
                obj._nrToGet = filtfunc
                self.getQ.enterGet(obj)
                # passivate/block queuing 'get' process
                obj._nextTime = None

class makeStoreTestcase(unittest.TestCase):
    """Test cases for Store: construction, put/get coupling, queue
    priorities, buffer sorting and filtered gets."""

    def testStatic(self):
        """Store: tests initialization of Store instances
        """
        # Defaults of a Store created without any parameters.
        a = Store()
        assert a.capacity == sys.maxint, "wrong capacity:%s" % a
        assert a.nrBuffered == 0, "wrong buffer content: %s" % a
        assert a.name == "a_store", "wrong name: %s" % a
        assert not a.monitored, "should not be monitored: %s" % a
        assert a.putQMon is None, "should not have putQMon: %s" % a
        assert a.getQMon is None, "should not have getQMon: %s" % a
        assert a.bufferMon is None, "should not have bufferMon: %s" % a
        bothFifo = (a.putQType.__name__ == "FIFO"
                    and a.getQType.__name__ == "FIFO")
        assert bothFifo, "putQType and getQType should be FIFO: %s" % a

        # Explicit parameters; the list holds ten references to one Widget.
        stored = [Widget(weight=5)] * 10
        b = Store(name="b", initialBuffered=stored, monitored=True,
                  capacity=12, putQType=PriorityQ)
        assert b.capacity == 12, "wrong capacity:%s" % b
        assert b.nrBuffered == 10, "wrong buffer content: %s" % b
        assert b.name == "b", "wrong name: %s" % b
        assert b.monitored, "should be monitored: %s" % b
        assert not (b.putQMon is None), "should have putQMon: %s" % b
        assert not (b.getQMon is None), "should have getQMon: %s" % b
        assert not (b.bufferMon is None), "should have bufferMon: %s" % b
        assert b.putQType.__name__ == "PriorityQ", (
            "putQType should be PriorityQ: %s" % b)
        # Only putQType was overridden, so getQType must still be FIFO
        # (the message used to claim PriorityQ).
        assert b.getQType.__name__ == "FIFO", (
            "getQType should be FIFO: %s" % b)

    def testConProdPrinciple(self):
        """Store: tests basic Producer/Consumer principles:
        -   Consumers must not be waiting while items in Store buffer,
        -   Producers must not be waiting while space available in Store buffer
        """
        # No assertion here; any checks are presumably made inside
        # ConsumerPrincS / ProducerPrincS (defined elsewhere -- confirm).
        bufferSize = 1
        productionTime = 1
        consumptionTime = 5
        endtime = 50

        initialize()
        store = Store(capacity=bufferSize)
        consumer = ConsumerPrincS()
        activate(consumer, consumer.consume(store, consumptionTime))
        producer = ProducerPrincS()
        activate(producer, producer.produce(store, productionTime))
        simulate(until=endtime)

    def testConProd1(self):
        """Store: tests put/get in 1 Producer/ 1 Consumer scenario"""
        initialize()
        store = Store(initialBuffered=[])
        # Reset the class-level counters, as testConProdM does, so the tally
        # below does not depend on which tests ran before this one.
        ProducerWidget.produced = 0
        ConsumerWidget.consumed = 0
        p = ProducerWidget()
        activate(p, p.produce(store))
        c = ConsumerWidget()
        activate(c, c.consume(store))
        simulate(until=100)
        balance = ProducerWidget.produced - ConsumerWidget.consumed
        assert balance == store.nrBuffered, (
            "items produced/consumed/buffered do not tally: %s %s %s"
            % (ProducerWidget.produced, ConsumerWidget.consumed,
               store.nrBuffered))

    def testConProdM(self):
        """Store: tests put/get in multiple Producer/Consumer scenario"""
        initialize()
        store = Store(initialBuffered=[])
        ProducerWidget.produced = 0
        ConsumerWidget.consumed = 0
        for i in range(2):
            c = ConsumerWidget()
            activate(c, c.consume(store))
        for i in range(3):
            p = ProducerWidget()
            activate(p, p.produce(store))
        simulate(until=10)
        balance = ProducerWidget.produced - ConsumerWidget.consumed
        assert balance == store.nrBuffered, (
            "items produced/consumed/buffered do not tally: %s %s %s"
            % (ProducerWidget.produced, ConsumerWidget.consumed,
               store.nrBuffered))

    def testConProdPriorM(self):
        """Store: Tests put/get in multiple Producer/Consumer scenario,
        with Producers having different priorities.
        How: Producers forced to queue; all after first should be done in
        priority order
        """
        # doneList is a module-level list; presumably the producer processes
        # append their identifiers to it when their put completes.
        global doneList
        doneList = []
        initialize()
        store = Store(capacity=7, putQType=PriorityQ, monitored=True)
        for i in range(4):
            p = ProducerWidget(i)
            pPriority = i
            activate(p, p.producePriority(buffer=store, priority=pPriority))
        c = ConsumerWidget()
        activate(c, c.consume1(buffer=store))
        simulate(until=100)
        assert doneList == [0, 3, 2, 1], (
            "puts were not done in priority order: %s" % doneList)

    def testConPriorProdM(self):
        """Tests put/get in multiple Producer/Consumer scenario, with
        Consumers having different priorities.
        How: Consumers forced to queue; all after first should be done in
        priority order
        """
        # doneList is a module-level list; presumably the consumer processes
        # append their names to it when their get completes.
        global doneList
        doneList = []
        initialize()
        store = Store(capacity=7, getQType=PriorityQ, monitored=True)
        for i in range(4):
            c = ConsumerWidget(str(i))
            cPriority = i
            activate(c, c.consumePriority(buffer=store, priority=cPriority))
        p = ProducerWidget()
        activate(p, p.produce1(buffer=store))
        simulate(until=100)
        assert doneList == ["3", "2", "1", "0"], (
            "gets were not done in priority order: %s" % doneList)

    def testBufferSort(self):
        """Tests the optional sorting of theBuffer by applying a user-defined
        sort function."""
        initialize()
        gotten = []  # filled by the consumers via consumeSorted
        sortedStore = Store()
        sortedStore.addSort(mySortFunc)
        p = ProducerWidget()
        activate(p, p.produceUnordered(sortedStore))
        for i in range(9):
            c = ConsumerWidget()
            activate(c, c.consumeSorted(buffer=sortedStore, gotten=gotten),
                     at=1)
        simulate(until=10)
        assert gotten == [1, 2, 3, 4, 5, 6, 7, 8, 9], "sort wrong: %s" % gotten

    def testBufferFilter(self):
        """Tests get from a Store with a filter function
        """
        # No assertion here; any checks are presumably made inside
        # FilterConsumer.getItems (defined elsewhere -- confirm).
        initialize()
        ItClass = FilterConsumer.Widget
        items = [ItClass(1), ItClass(4), ItClass(6), ItClass(12)]
        st = Store(initialBuffered=items)
        fc = FilterConsumer()
        minw = 2
        maxw = 10
        activate(fc, fc.getItems(store=st, a=minw, b=maxw))
        simulate(until=1)

        

def makeStoreSuite():
    """Build and return a unittest.TestSuite holding one makeStoreTestcase
    instance per Store test method, in the order listed below."""
    testNames = (
        "testStatic",
        "testConProdPrinciple",
        "testConProd1",
        "testConProdM",
        "testConProdPriorM",
        "testConPriorProdM",
        "testBufferSort",
        "testBufferFilter",
    )
    suite = unittest.TestSuite()
    suite.addTests([makeStoreTestcase(testName) for testName in testNames])
    return suite