ARPO Codes (Python)
import sys
import random
from operator import attrgetter
from QPsolver import QPsolver
import math
from AssetCache import AssetCache
from Output import Output
from ElapsedTime import ElapsedTime
from Solution import Solution
class ARPO:
# 0. Instance fields and class constructor */
def __init__(self, someInputs, someTest):
self.test = someTest
self.inputs = someInputs
# self.outputs = Outputs()
self.qpSolver = QPsolver()
self.cache = AssetCache()
def solve(self, minReturn, uefRisk):
currentSol = Solution(self.inputs.getNAssets())
detSol = Solution(self.inputs.getNAssets())
newSol = Solution(self.inputs.getNAssets())
stochSols = []
finalStochSols = []
ElapsedT = ElapsedTime()
elapsedTime = 0.0
startTime = ElapsedT.systemTime()
currentSol = self.generateDummy(minReturn, currentSol)
elapsedTime = ElapsedTime.calcElapsed(startTime, ElapsedTime.systemTime())
if currentSol == None:
print("Dummy not found!!")
return None
currentSol.setTime(elapsedTime)
currentSol.simReturn(minReturn, self.inputs, self.test, False)
currentSol.simRisk(self.inputs, self.test, False)
stochSols = self.updateList(currentSol, stochSols)
currentSol.copyInto(detSol)
'''
print("Portfolio length:", len(currentSol.getPortfolio()))
print("risk:", currentSol.getCurrentRisk())
print("return", currentSol.getCurrentReturn())
print("Reliability", currentSol.getReliability())
print("Expected risk", currentSol.getExpectedRisk())
'''
stopTime = self.test.getTimeLoop()
k = 1
count=0
while elapsedTime < stopTime:
count+=1
currentSol.copyInto(newSol)
newSol = self.shake(newSol, k)
newSol.getPortfolio().sort(key=attrgetter('ri'), reverse=True)
maxReturnAsset = newSol.getPortfolio()[0]
if maxReturnAsset.getRi() < minReturn:
random.shuffle(newSol.getDiscarded())
for inAsset in newSol.getDiscarded():
if inAsset.getRi() >= minReturn:
pos = random.randint(0, len(newSol.getPortfolio()) - 1)
outAsset = newSol.getPortfolio()[pos]
newSol.getPortfolio().remove(outAsset)
newSol.getDiscarded().remove(inAsset)
newSol.getPortfolio().append(inAsset)
newSol.getDiscarded().append(outAsset)
break
if self.cache.isCached(newSol):
self.cache.loadFromCache(newSol)
else:
key = AssetCache.getSolPermutation(newSol)
self.qpSolver.solve(newSol, minReturn, self.inputs)
ARPO.discardEmptyAssets(newSol)
self.cache.cache1(key, newSol)
newSol = self.swap(newSol)
newSol.getPortfolio().sort(key=attrgetter('ri'), reverse=True)
maxReturnAsset = newSol.getPortfolio()[0]
if maxReturnAsset.getRi() < minReturn:
random.shuffle(newSol.getDiscarded())
for inAsset in newSol.getDiscarded():
if inAsset.getRi() >= minReturn:
pos = random.randint(0, len(newSol.getPortfolio()) - 1)
outAsset = newSol.getPortfolio()[pos]
newSol.getPortfolio().remove(outAsset)
newSol.getDiscarded().remove(inAsset)
newSol.getPortfolio().append(inAsset)
newSol.getDiscarded().append(outAsset)
break
if self.cache.isCached(newSol):
self.cache.loadFromCache(newSol)
else:
key = AssetCache.getSolPermutation(newSol)
self.qpSolver.solve(newSol, minReturn, self.inputs)
ARPO.discardEmptyAssets(newSol)
self.cache.cache1(key, newSol)
elapsedTime = ElapsedTime.calcElapsed(startTime, ElapsedTime.systemTime())
newSol.setTime(elapsedTime)
if self.verifySolution(newSol):
delta = newSol.getCurrentRisk() - currentSol.getCurrentRisk()
if delta < 0.0:
newSol.simReturn(minReturn, self.inputs, self.test, False)
if newSol.getReliability() >= (self.test.getProbability() - 0.01):
newSol.simRisk(self.inputs, self.test, False)
newSol.copyInto(currentSol)
stochSols = self.updateList(newSol, stochSols)
k = 1
if newSol.getCurrentRisk() < detSol.getCurrentRisk():
newSol.copyInto(detSol)
else:
newSol.simReturn(minReturn, self.inputs, self.test, False)
if newSol.getReliability() >= (self.test.getProbability() - 0.01):
if k < 3:
k += 1
else:
k = 1
for s in stochSols: s.simReturn(minReturn, self.inputs, self.test, True)
for s in stochSols:
if s.getReliability() >= self.test.getProbability():
s.simRisk(self.inputs, self.test, True)
finalStochSols.append(s)
detSol.simReturn(minReturn, self.inputs, self.test, True)
detSol.simRisk(self.inputs, self.test, True)
if len(finalStochSols) == 0:
print("No feasible solutions found.")
return None
finalStochSols.sort(key=attrgetter('currentRisk'))
output = Output(detSol, finalStochSols, self.test, uefRisk)
print('VNS loop numbers:', count)
return output
@staticmethod
def getPositionsArrayGD(beta1, beta2, size):
auxArray = [i for i in range(size)]
posArray = []
beta = beta1 + random.random() * (beta2 - beta1)
for i in range(size):
index = int(math.log(random.random()) / math.log(1 - beta))
index = index % (size - i)
posArray.append(auxArray[index])
auxArray.pop(index)
return posArray
def generateDummy(self, minReturn, baseSol):
ElapsedT = ElapsedTime()
start = ElapsedT.systemTime()
elapsed = 0.0
assetcopy = sorted(self.inputs.getAssets(), key=attrgetter('ri'), reverse=True)
for i in range(self.inputs.getNAssets()): # create a portfolio of "n" highest return assets
if i < self.inputs.getAssetsKMax():
baseSol.getPortfolio().append(assetcopy[i])
else:
baseSol.getDiscarded().append(assetcopy[i])
self.qpSolver.solve(baseSol, minReturn, self.inputs)
ARPO.discardEmptyAssets(baseSol)
if self.verifySolution(baseSol):
baseSol.simReturn(minReturn, self.inputs, self.test, False)
if baseSol.getReliability() >= self.test.getProbability(): return baseSol
found = False
nAssets = self.inputs.getNAssets()
aux = 0
while found != True and elapsed < self.test.getTimeInit():
baseSol = Solution(self.inputs.getNAssets())
indexarray = self.getPositionsArrayGD(self.test.getBeta1(), self.test.getBeta2(), nAssets)
j = random.randint(1, self.inputs.getAssetsKMax())
j = max(self.inputs.getAssetsKMin(), j)
for i in range(j):
aux = indexarray[i]
nextAsset = assetcopy[aux]
baseSol.getPortfolio().append(nextAsset)
for i in range(j, nAssets):
aux = indexarray[i]
nextAsset = assetcopy[aux]
baseSol.getDiscarded().append(nextAsset)
newReturn = 0
totalFrac = 0
for k in range(len(baseSol.getPortfolio())):
kAsset = baseSol.getPortfolio()[k]
baseSol.setCurrentLevel(kAsset.getId(), kAsset.getQmin())
totalFrac += kAsset.getQmin()
indexarray1 = self.getPositionsArrayGD(self.test.getBeta1(), self.test.getBeta2(),
len(baseSol.getPortfolio()))
for i in range(len(baseSol.getPortfolio())):
aux = indexarray1[i]
asset = baseSol.getPortfolio()[aux]
part = min(asset.getQmax(), 1.0 - totalFrac + baseSol.getCurrentLevel(asset.getId()))
totalFrac += part - baseSol.getCurrentLevel(asset.getId())
baseSol.setCurrentLevel(asset.getId(), part)
newReturn += part * asset.getRi()
baseSol.setCurrentReturn(newReturn)
baseSol.computeCurrentRisk(self.inputs)
ARPO.discardEmptyAssets(baseSol)
baseSol.simReturn(minReturn, self.inputs, self.test, False)
found = ((newReturn >= minReturn) and (baseSol.getReliability() >= (self.test.getProbability() - 0.01)))
elapsed = ElapsedTime.calcElapsed(start, ElapsedTime.systemTime())
if found:
return baseSol
else:
return None
@staticmethod
def discardEmptyAssets(sol): # remove empty assets in the portfolio
for iAsset in sol.getPortfolio():
if sol.getCurrentLevel(iAsset.getId()) == 0:
sol.getDiscarded().append(iAsset)
for iAsset in sol.getDiscarded():
if sol.getCurrentLevel(iAsset.getId()) == 0 and iAsset in sol.getPortfolio():
sol.getPortfolio().remove(iAsset)
def verifySolution(self, sol):
if len(sol.getPortfolio()) < self.inputs.getAssetsKMin(): return False
if len(sol.getPortfolio()) > self.inputs.getAssetsKMax(): return False
for inAsset in sol.getPortfolio():
epsilon = inAsset.getQmin()
delta = inAsset.getQmax()
level = sol.getCurrentLevel(inAsset.getId())
if level > 0.0 and level < epsilon: return False
if level > delta: return False
if sol.sumX() < 0.999 : return False
return True
def updateList(self, sol, stochSols):
inStore = False
numSol = self.test.getNumSol()
i = 0
asset = sol.getPortfolio()
asset.sort(key=attrgetter('id'))
while i < len(stochSols) and inStore == False:
if stochSols[i] == sol: inStore = True
i += 1
if inStore == False:
if len(stochSols) < numSol:
aux = Solution(self.inputs.getNAssets())
sol.copyInto(aux)
stochSols.append(aux)
elif sol.getExpectedRisk() < stochSols[numSol - 1].getExpectedRisk():
stochSols.pop(numSol - 1)
aux = Solution(self.inputs.getNAssets())
sol.copyInto(aux)
stochSols.append(aux)
stochSols.sort(key=attrgetter('expectedRisk'))
return stochSols
def shake(self, newSol, k):
nAssets = int(round(1.0 * self.test.getK(k) / 100 * len(newSol.getPortfolio())))
for i in range(nAssets):
pos = random.randint(0, len(newSol.getPortfolio()) - 1)
anAsset = newSol.getPortfolio()[pos]
newSol.getPortfolio().remove(anAsset)
newSol.getDiscarded().append(anAsset)
while len(newSol.getPortfolio()) < self.inputs.getAssetsKMax():
pos = random.randint(0, len(newSol.getDiscarded()) - 1)
anAsset = newSol.getDiscarded()[pos]
newSol.getDiscarded().remove(anAsset)
newSol.getPortfolio().append(anAsset)
return newSol
def swap(self, newSol):
auxSol = Solution(self.inputs.getNAssets())
newSol.copyInto(auxSol)
anAsset = auxSol.getMinAsset()
xAsset = auxSol.getCurrentLevel(anAsset.getId())
auxSol.getPortfolio().remove(anAsset)
auxSol.setCurrentLevel(anAsset.getId(), 0)
auxSol.getDiscarded().append(anAsset)
minOF = auxSol.getCurrentRisk()
ord1 = self.getPositionsArrayGD(self.test.getBeta1(), self.test.getBeta2(), len(auxSol.getDiscarded()))
auxSol.getDiscarded().sort(key=attrgetter('avCov'))
for i in range(len(auxSol.getDiscarded())):
auxSol1 = Solution(self.inputs.getNAssets())
auxSol.copyInto(auxSol1)
otherAsset = auxSol1.getDiscarded()[ord1[i]]
if otherAsset.getQmin() <= xAsset and otherAsset.getQmax() >= xAsset:
auxSol1.getPortfolio().append(otherAsset)
auxSol1.getDiscarded().remove(otherAsset)
auxSol1.setCurrentLevel(otherAsset.getId(), xAsset)
auxSol1.computeCurrentRisk(self.inputs)
if auxSol1.getCurrentRisk() < minOF:
return auxSol1
return newSol