程式庫:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
# -*- coding: utf-8 -*-
import os
import sys
import mmap
import numpy
import struct


class SharedMem:
    """File-backed shared memory divided into fixed-size pages.

    One process calls Server() to create and zero-fill the backing file and
    map it read/write; other processes call Client() to map the same file
    read-only.  Pages are addressed by index; page ``i`` occupies bytes
    ``[i * pageSize, (i + 1) * pageSize)`` of the mapping.
    """

    # Class-level defaults; __init__ overrides memPage/pageSize per instance.
    memPage = 1                 # number of pages in the mapping
    pageSize = mmap.PAGESIZE    # bytes per page
    memF = None                 # OS file descriptor of the backing file
    memV = None                 # mmap object, or None when not attached

    def __init__(self, memPage, pageSize):
        """Remember the geometry: memPage pages of pageSize bytes each."""
        self.memPage = memPage
        self.pageSize = pageSize

    def _Attach(self, memFile, openFlags, prot, zeroFill):
        """Open memFile, optionally zero-fill it, and map it.

        Returns True on success.  On any failure the descriptor is closed,
        memF/memV are reset to None and False is returned.
        """
        # Drop any mapping left from an earlier Server()/Client() call so
        # that re-attaching does not leak the old descriptor.
        self.Done()
        size = self.memPage * self.pageSize
        fd = None
        try:
            fd = os.open(memFile, openFlags)
            if zeroFill:
                # Must be a bytes literal: os.write() rejects text on
                # Python 3 (on Python 2 b'' is the same type as '').
                if os.write(fd, b'\x00' * size) != size:
                    raise OSError("short write while sizing %r" % (memFile,))
            view = mmap.mmap(fd, size, mmap.MAP_SHARED, prot)
        except (OSError, mmap.error, ValueError,
                RuntimeError, TypeError, NameError):
            # os.open/os.write raise OSError; mmap raises mmap.error or
            # ValueError (e.g. file shorter than the requested length).
            if fd is not None:
                os.close(fd)
            self.memF = None
            self.memV = None
            return False
        self.memF = fd
        self.memV = view
        return True

    def Server(self, memFile):
        """Create (or truncate) memFile, zero-fill it and map it read/write.

        Returns True on success, False if the file could not be created,
        sized or mapped.
        """
        return self._Attach(memFile,
                            os.O_CREAT | os.O_TRUNC | os.O_RDWR,
                            mmap.PROT_READ | mmap.PROT_WRITE,
                            True)

    def Client(self, memFile):
        """Map an existing memFile read-only.

        Returns True on success, False if the file is missing, unreadable
        or smaller than memPage * pageSize bytes.
        """
        return self._Attach(memFile, os.O_RDONLY, mmap.PROT_READ, False)

    def Done(self):
        """Release the mapping and the file descriptor; safe to call twice."""
        if self.memV is not None:
            self.memV.close()
            # Reset so later Set()/Get() calls report "not attached"
            # instead of touching a closed mmap.
            self.memV = None
        if self.memF is not None:
            os.close(self.memF)
            self.memF = None

    def Set(self, posPage, data):
        """Overwrite page posPage with data and flush the mapping.

        data must be exactly pageSize bytes long; the mmap slice assignment
        fails for any other length.  Returns False when not attached or
        when posPage is outside [0, memPage), True otherwise.
        """
        if self.memV is None:
            return False
        if not 0 <= posPage < self.memPage:
            return False
        pos = posPage * self.pageSize
        self.memV[pos:pos + self.pageSize] = data
        self.memV.flush()
        return True

    def Get(self, posPage):
        """Return the pageSize bytes of page posPage.

        Returns None when not attached or when posPage is outside
        [0, memPage).
        """
        if self.memV is None:
            return None
        if not 0 <= posPage < self.memPage:
            return None
        pos = posPage * self.pageSize
        return self.memV[pos:pos + self.pageSize]


class Screen:
    """Minimal ANSI-terminal helper that lays pages out in two columns."""

    def Clear(self):
        """Save the cursor (ESC 7) and erase the whole display (CSI 2 J)."""
        sys.stdout.write("\x1b7\x1b[2J")
        sys.stdout.flush()

    def GetXY(self, seq, memPage):
        """Map page index seq to a (row, col) screen cell.

        The first half of the pages goes to column 1 with row == seq; the
        second half goes to column 40 with rows restarting at 1.  Rows are
        always integers.
        """
        # Floor division keeps rows integral on Python 3, where "/" would
        # produce floats (on Python 2 the two operators agree for ints).
        half = memPage // 2
        # With fewer than two pages there is no second column; the guard
        # also avoids a modulo by zero.
        if half == 0 or seq < half:
            return (seq, 1)
        return (seq % half + 1, 40)

    def Print(self, row, col, text, attr=30):
        """Write text at (row, col) using SGR attribute attr, then reset."""
        sys.stdout.write("\33[%d;%df\33[%dm%s\33[0m" % (row, col, attr, text))
        sys.stdout.flush()
資料發佈:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
# -*- coding: utf-8 -*-
import sys
from __init__ import SharedMem, Screen

import numpy
import struct
import time


def Clear():
    """Save the cursor (ESC 7) and erase the whole display (CSI 2 J)."""
    sys.stdout.write("\x1b7\x1b[2J")
    sys.stdout.flush()


def Print(x, y, text):
    """Write text at row x, column y, then restore the saved cursor."""
    sys.stdout.write("\x1b7\x1b[%d;%df%s\x1b8" % (x, y, text))
    sys.stdout.flush()


# Publisher: once per second, store a random non-zero 32-bit value in a
# random page of the shared file and echo it on screen.
if __name__ == '__main__':
    shmFile = '/tmp/SharedMem.dat'
    if len(sys.argv) > 1:
        shmFile = sys.argv[1]

    memPage = 48
    pageSize = 4    # one '=I' (unsigned 32-bit) value per page

    worker = SharedMem(memPage, pageSize)
    scrn = Screen()

    if not worker.Server(shmFile):
        # Report the failure instead of exiting silently with status 0.
        sys.stderr.write("cannot create shared memory file %s\n" % shmFile)
        sys.exit(1)

    scrn.Clear()
    try:
        while True:
            # Page 0 is never written: randint's upper bound is exclusive,
            # so posPage is in [1, memPage).
            posPage = int(numpy.random.randint(1, memPage))
            pos = posPage * pageSize
            # Plain int so struct.pack and '%d' never see a numpy scalar.
            val = int(numpy.random.randint(1, pos + 1))
            worker.Set(posPage, struct.pack('=I', val))

            row, col = scrn.GetXY(posPage, memPage)
            scrn.Print(row, col, '%3d' % (posPage))
            scrn.Print(row, col + 7, '%4d' % (val), 31)
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the publisher.
        pass
    finally:
        # Previously unreachable after the endless loop; always release
        # the mapping now.
        worker.Done()
資料訂閱:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
# -*- coding: utf-8 -*-
import sys
from __init__ import SharedMem, Screen

import struct
import time

# Subscriber: repeatedly sweep every page of the shared file and display
# each page that holds a non-zero 32-bit value.
if __name__ == '__main__':
    shmFile = '/tmp/SharedMem.dat'
    if len(sys.argv) > 1:
        shmFile = sys.argv[1]

    memPage = 48
    pageSize = 4    # one '=I' (unsigned 32-bit) value per page

    worker = SharedMem(memPage, pageSize)
    scrn = Screen()

    if not worker.Client(shmFile):
        # Report the failure instead of exiting silently with status 0.
        sys.stderr.write("cannot open shared memory file %s\n" % shmFile)
        sys.exit(1)

    scrn.Clear()
    posPage = 0
    try:
        while True:
            data = worker.Get(posPage)
            # struct.unpack always returns a tuple; take the single field
            # explicitly rather than formatting the tuple itself.
            (val,) = struct.unpack('=I', data)
            if val != 0:
                # Zero means the page has not been written yet.
                row, col = scrn.GetXY(posPage, memPage)
                scrn.Print(row, col, '%3d' % (posPage))
                scrn.Print(row, col + 7, '%4d' % (val), 34)

            posPage += 1
            if posPage == memPage:
                # Finished one sweep over all pages: wrap around and pause
                # before scanning again.
                posPage = 0
                time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the subscriber.
        pass
    finally:
        # Previously unreachable after the endless loop; always release
        # the mapping now.
        worker.Done()