# Class_Store
class Store(Buffer):
    """Models buffers for processes coupled by putting/getting distinguishable
    items.

    Blocks a process when a put would cause buffer overflow or a get would
    cause buffer underflow.
    Default queuing discipline for blocked processes is priority FIFO.
    """

    def getnrBuffered(self):
        """Returns the number of items currently in the buffer."""
        return len(self.theBuffer)
    nrBuffered = property(getnrBuffered)

    def getbuffered(self):
        """Returns the buffer list itself (not a copy)."""
        return self.theBuffer
    buffered = property(getbuffered)

    def __init__(self, **pars):
        """Initializes the Store.

        All keyword parameters are handed to Buffer.__init__. Afterwards this
        method reads self.name, self.capacity, self.initialBuffered and
        self.monitored (presumably set up by Buffer.__init__ -- confirm
        against Buffer).

        Raises FatalSimerror if capacity is not an int > 0, if
        initialBuffered is neither None nor a list, or if initialBuffered
        holds more items than capacity.
        """
        Buffer.__init__(self, **pars)
        self.theBuffer = []
        if self.name is None:
            self.name = "a_store"  # default name
        if type(self.capacity) != type(1) or self.capacity <= 0:
            # Report the offending capacity value; wrapped in a tuple so that
            # a tuple-valued capacity cannot break the % formatting.
            raise FatalSimerror(
                "Store: capacity parameter not a positive integer > 0: %s"
                % (self.capacity,))
        initial = self.initialBuffered
        if initial is not None:
            if type(initial) != type([]):
                raise FatalSimerror("Store: initialBuffered not a list")
            if len(initial) > self.capacity:
                raise FatalSimerror("initialBuffered exceeds capacity")
            self.theBuffer[:] = initial  # buffer == list of objects
        if self.monitored:
            self.bufferMon.observe(y=self.nrBuffered, t=now())
        self._sort = None  # no buffer sorting until addSort is called

    def addSort(self, sortFunc):
        """Adds buffer sorting to this instance of Store. It maintains
        theBuffer sorted by the sortAttr attribute of the objects in the
        buffer.
        The user-provided 'sortFunc' must look like this:

        def mySort(self,par):
            tmplist=[(x.sortAttr,x) for x in par]
            tmplist.sort()
            return [x for (key,x) in tmplist]
        """
        # Bind sortFunc to this instance so it is called as self._sort(buffer).
        self._sort = new.instancemethod(sortFunc, self, self.__class__)
        # Sort whatever is already buffered.
        self.theBuffer = self._sort(self.theBuffer)

    def _put(self, arg):
        """Handles put requests for Store instances.

        arg[0] is the tuple from the yield statement
        (put, self, buff, whattoput[, priority]); arg[1].who is the
        requesting process.

        If the items do not fit, the requestor is passivated and queued in
        putQ. Otherwise the items are buffered, the requestor is continued
        and waiting get requestors are served.

        Raises FatalSimerror if the item list is missing or not a list.
        """
        obj = arg[1].who
        request = arg[0]
        if len(request) == 5:    # yield put,self,buff,whattoput,priority
            obj._putpriority[self] = request[4]
        elif len(request) == 4:  # yield put,self,buff,whattoput
            obj._putpriority[self] = Buffer.priorityDefault  # default
        else:                    # error, whattoput missing
            raise FatalSimerror("Item to put missing in yield put stmt")
        whatToPut = request[3]
        if type(whatToPut) != type([]):
            raise FatalSimerror("put parameter is not a list")

        if len(whatToPut) + self.nrBuffered > self.capacity:
            obj._nextTime = None       # passivate put requestor
            obj._whatToPut = whatToPut
            self.putQ.enterPut(obj)    # and queue, with items to put
            return

        self.theBuffer.extend(whatToPut)
        if self._sort is not None:
            self.theBuffer = self._sort(self.theBuffer)
        if self.monitored:
            self.bufferMon.observe(y=self.nrBuffered, t=now())
        _e._post(_Action(who=obj, generator=obj._nextpoint),
                 at=_t, prior=1)  # continue the put requestor

        # Service any waiting getters in queue order: do not serve second in
        # queue before first has been served.
        # Iterate over a snapshot of getQ: takeout() is called on getQ inside
        # the loop, and removing entries from a sequence while iterating over
        # it would skip the getter right behind each served one.
        block = False
        for proc in list(self.getQ):
            if not (self.nrBuffered > 0 and len(self.getQ)):
                # either out of items in buffer or out of getters in getQ
                break
            if inspect.isfunction(proc._nrToGet):
                # predicate parameter: the filter function picks the items
                movCand = proc._nrToGet(self.theBuffer)
                if movCand:
                    proc.got = movCand[:]
                    for item in movCand:
                        self.theBuffer.remove(item)
                    self.getQ.takeout(proc)
                    if self.monitored:
                        self.bufferMon.observe(y=self.nrBuffered, t=now())
                    _e._post(_Action(who=proc, generator=proc._nextpoint),
                             at=_t)  # continue a blocked get requestor
            elif not block and proc._nrToGet <= self.nrBuffered:
                # numerical parameter: hand over the first nrToGet items
                nrToGet = proc._nrToGet
                proc.got = self.theBuffer[:nrToGet]
                self.theBuffer[:] = self.theBuffer[nrToGet:]
                if self.monitored:
                    self.bufferMon.observe(y=self.nrBuffered, t=now())
                # take this get requestor's record out of queue:
                self.getQ.takeout(proc)
                _e._post(_Action(who=proc, generator=proc._nextpoint),
                         at=_t)  # continue a blocked get requestor
            else:
                # block subsequent numerically specified get's in getQ
                # to prevent starvation of larger gets by smaller ones
                block = True

    def _reactivatePutters(self):
        """Reactivates put requestors for which space is now available.

        Serves putQ strictly in queue order: stops at the first requestor
        whose items do not fit, so the second in queue is never served
        before the first.
        """
        while len(self.putQ):
            proc = self.putQ[0]
            if len(proc._whatToPut) + self.nrBuffered > self.capacity:
                break
            self.theBuffer.extend(proc._whatToPut)  # move items to buffer
            if self._sort is not None:
                self.theBuffer = self._sort(self.theBuffer)
            if self.monitored:
                self.bufferMon.observe(y=self.nrBuffered, t=now())
            self.putQ.takeout(proc)  # dequeue requestor's record
            _e._post(_Action(who=proc, generator=proc._nextpoint),
                     at=_t)  # continue a blocked put requestor

    def _get(self, arg):
        """Handles get requests.

        arg[0] is the tuple from the yield statement
        (get, self, buff[, whattoget[, priority]]); arg[1].who is the
        requesting process. whattoget is either a number of items (default 1)
        or a filter function that is called with the buffer list and returns
        the items to take.

        On success the items are placed in the requestor's 'got' list, the
        requestor is continued and blocked put requestors are served.
        Otherwise the requestor is passivated and queued in getQ.

        Raises FatalSimerror if a numerical whattoget is negative.
        """
        filtfunc = None
        obj = arg[1].who
        obj.got = []  # the list of items retrieved by 'get'
        request = arg[0]
        if len(request) in (4, 5):
            if len(request) == 5:  # yield get,self,buff,whattoget,priority
                obj._getpriority[self] = request[4]
            else:                  # yield get,self,buff,whattoget
                obj._getpriority[self] = Buffer.priorityDefault  # default
            whatToGet = request[3]
            if inspect.isfunction(whatToGet):
                filtfunc = whatToGet
            else:
                nrToGet = whatToGet
        else:                      # yield get,self,buff
            obj._getpriority[self] = Buffer.priorityDefault
            nrToGet = 1

        if filtfunc is None:  # number specifies nr items to get
            if nrToGet < 0:
                raise FatalSimerror(
                    "Store: get parameter not positive number: %s" % nrToGet)
            if self.nrBuffered < nrToGet:
                obj._nrToGet = nrToGet
                self.getQ.enterGet(obj)
                # passivate/block queuing 'get' process
                obj._nextTime = None
            else:
                # move items from buffer to requesting process
                for _ in range(nrToGet):
                    obj.got.append(self.theBuffer.pop(0))
                if self.monitored:
                    self.bufferMon.observe(y=self.nrBuffered, t=now())
                _e._post(_Action(who=obj, generator=obj._nextpoint),
                         at=_t, prior=1)
                self._reactivatePutters()
        else:  # items to get determined by filtfunc
            movCand = filtfunc(self.theBuffer)
            if movCand:  # get succeeded
                _e._post(_Action(who=obj, generator=obj._nextpoint),
                         at=_t, prior=1)
                obj.got = movCand[:]
                for item in movCand:
                    self.theBuffer.remove(item)
                if self.monitored:
                    self.bufferMon.observe(y=self.nrBuffered, t=now())
                self._reactivatePutters()
            else:  # get did not succeed, block
                # keep the filter so _put can retry it when items arrive
                obj._nrToGet = filtfunc
                self.getQ.enterGet(obj)
                # passivate/block queuing 'get' process
                obj._nextTime = None
class makeStoreTestcase(unittest.TestCase):
    """Unit tests for Store: construction, producer/consumer scenarios,
    priority queuing of put and get requests, buffer sorting and filtered
    gets.
    """

    def testStatic(self):
        """Store: tests initialization of Store instances
        """
        a = Store()
        assert a.capacity == sys.maxint, "wrong capacity:%s" % a
        assert a.nrBuffered == 0, "wrong buffer content: %s" % a
        assert a.name == "a_store", "wrong name: %s" % a
        assert not a.monitored, "should not be monitored: %s" % a
        assert a.putQMon is None, "should not have putQMon: %s" % a
        assert a.getQMon is None, "should not have getQMon: %s" % a
        assert a.bufferMon is None, "should not have bufferMon: %s" % a
        assert a.putQType.__name__ == "FIFO" and a.getQType.__name__ == "FIFO",\
            "putQType and getQType should be FIFO: %s" % a

        stored = [Widget(weight=5)] * 10
        b = Store(name="b", initialBuffered=stored, monitored=True,
                  capacity=12, putQType=PriorityQ)
        assert b.capacity == 12, "wrong capacity:%s" % b
        assert b.nrBuffered == 10, "wrong buffer content: %s" % b
        assert b.name == "b", "wrong name: %s" % b
        assert b.monitored, "should be monitored: %s" % b
        assert not (b.putQMon is None), "should have putQMon: %s" % b
        assert not (b.getQMon is None), "should have getQMon: %s" % b
        assert not (b.bufferMon is None), "should have bufferMon: %s" % b
        assert b.putQType.__name__ == "PriorityQ",\
            "putQType should be PriorityQ: %s" % b
        # Only putQType was overridden, so getQType must still be FIFO.
        assert b.getQType.__name__ == "FIFO",\
            "getQType should be FIFO: %s" % b

    def testConProdPrinciple(self):
        """Store: tests basic Producer/Consumer principles:
        - Consumers must not be waiting while items in Store buffer,
        - Producers must not be waiting while space available in Store buffer
        """
        bufferSize = 1
        productionTime = 1
        consumptionTime = 5
        endtime = 50
        initialize()
        store = Store(capacity=bufferSize)
        consumer = ConsumerPrincS()
        activate(consumer, consumer.consume(store, consumptionTime))
        producer = ProducerPrincS()
        activate(producer, producer.produce(store, productionTime))
        simulate(until=endtime)

    def testConProd1(self):
        """Store: tests put/get in 1 Producer/ 1 Consumer scenario"""
        initialize()
        store = Store(initialBuffered=[])
        # Reset the class-level counters so the tally below does not depend
        # on which tests ran before this one.
        ProducerWidget.produced = 0
        ConsumerWidget.consumed = 0
        p = ProducerWidget()
        activate(p, p.produce(store))
        c = ConsumerWidget()
        activate(c, c.consume(store))
        simulate(until=100)
        assert \
            ProducerWidget.produced - ConsumerWidget.consumed == store.nrBuffered,\
            "items produced/consumed/buffered do not tally: %s %s %s"\
            % (ProducerWidget.produced, ConsumerWidget.consumed,
               store.nrBuffered)

    def testConProdM(self):
        """Store: tests put/get in multiple Producer/Consumer scenario"""
        initialize()
        store = Store(initialBuffered=[])
        ProducerWidget.produced = 0
        ConsumerWidget.consumed = 0
        for i in range(2):
            c = ConsumerWidget()
            activate(c, c.consume(store))
        for i in range(3):
            p = ProducerWidget()
            activate(p, p.produce(store))
        simulate(until=10)
        assert \
            ProducerWidget.produced - ConsumerWidget.consumed == store.nrBuffered,\
            "items produced/consumed/buffered do not tally: %s %s %s"\
            % (ProducerWidget.produced, ConsumerWidget.consumed,
               store.nrBuffered)

    def testConProdPriorM(self):
        """Store: Tests put/get in multiple Producer/Consumer scenario,
        with Producers having different priorities.
        How; Producers forced to queue; all after first should be done in
        priority order
        """
        global doneList
        doneList = []
        initialize()
        store = Store(capacity=7, putQType=PriorityQ, monitored=True)
        for i in range(4):
            p = ProducerWidget(i)
            pPriority = i
            activate(p, p.producePriority(buffer=store, priority=pPriority))
        c = ConsumerWidget()
        activate(c, c.consume1(buffer=store))
        simulate(until=100)
        assert doneList == [0, 3, 2, 1],\
            "puts were not done in priority order: %s" % doneList

    def testConPriorProdM(self):
        """Store: Tests put/get in multiple Producer/Consumer scenario, with
        Consumers having different priorities.
        How; Consumers forced to queue; all after first should be done in
        priority order
        """
        global doneList
        doneList = []
        initialize()
        store = Store(capacity=7, getQType=PriorityQ, monitored=True)
        for i in range(4):
            c = ConsumerWidget(str(i))
            cPriority = i
            activate(c, c.consumePriority(buffer=store, priority=cPriority))
        p = ProducerWidget()
        activate(p, p.produce1(buffer=store))
        simulate(until=100)
        assert doneList == ["3", "2", "1", "0"],\
            "gets were not done in priority order: %s" % doneList

    def testBufferSort(self):
        """Store: Tests the optional sorting of theBuffer by applying a
        user-defined sort function."""
        initialize()
        gotten = []
        sortedStore = Store()
        sortedStore.addSort(mySortFunc)
        p = ProducerWidget()
        activate(p, p.produceUnordered(sortedStore))
        for i in range(9):
            c = ConsumerWidget()
            activate(c, c.consumeSorted(buffer=sortedStore, gotten=gotten),
                     at=1)
        simulate(until=10)
        assert gotten == [1, 2, 3, 4, 5, 6, 7, 8, 9], "sort wrong: %s" % gotten

    def testBufferFilter(self):
        """Store: Tests get from a Store with a filter function
        """
        initialize()
        ItClass = FilterConsumer.Widget
        items = [ItClass(1), ItClass(4), ItClass(6), ItClass(12)]
        st = Store(initialBuffered=items)
        fc = FilterConsumer()
        minw = 2
        maxw = 10
        activate(fc, fc.getItems(store=st, a=minw, b=maxw))
        simulate(until=1)
def makeStoreSuite():
    """Builds and returns the unittest.TestSuite holding all Store tests,
    in the order in which they are listed below.
    """
    testNames = (
        "testStatic",
        "testConProdPrinciple",
        "testConProd1",
        "testConProdM",
        "testConProdPriorM",
        "testConPriorProdM",
        "testBufferSort",
        "testBufferFilter",
    )
    # One makeStoreTestcase instance per test method name.
    cases = [makeStoreTestcase(testName) for testName in testNames]
    suite = unittest.TestSuite()
    suite.addTests(cases)
    return suite