與 RabbitMQ 整合

下載軟體套件

  1. ERLANG
  2. RabbitMQ

安裝 RabbitMQ

  1. 設定 ERLANG_HOME 環境變數
  2. 執行服務《sbin/rabbitmq-server.bat》
  3. 加載管理介面 《sbin/rabbitmq-plugins enable rabbitmq_management》
  4. 管理介面《http://localhost:15672》
  5. 設定訊息保存佇列《host2》

訊息接收程式

import sys
import pika


mqConnection = None
mqChannel = None


def OpenMQ(mqHost, mqName, mqDurable):
    global mqConnection, mqChannel

    mqConnection = pika.BlockingConnection(pika.ConnectionParameters(host=mqHost))
    mqChannel = mqConnection.channel()
    mqChannel.queue_declare(queue=mqName, durable=mqDurable)


def TaskMQ(ch, method, properties, body):
    TaskMsg(body)


def TaskMQ2(ch, method, properties, body):
    TaskMsg(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


def ListenMQ(mqName):
    global mqChannel
    mqChannel.basic_consume(TaskMQ, queue=mqName, no_ack=True)


def ListenMQ2(mqName):
    global mqChannel
    mqChannel.basic_qos(prefetch_count=1)
    mqChannel.basic_consume(TaskMQ2, queue=mqName)


def StartMQ():
    global mqChannel
    mqChannel.start_consuming()


def TaskMsg(bodyMsg):
    print "%s" % (bodyMsg)


def Main(mqHost, mqName, mqDurable):
    OpenMQ(mqHost, mqName, mqDurable)

    if mqDurable:
        ListenMQ2(mqName)
    else:
        ListenMQ(mqName)

    StartMQ()


if __name__ == "__main__":
    mqHost = 'localhost'
    mqName = 'hello'
    mqDurable = False

    if len(sys.argv) == 2:
        if sys.argv[1] == '1':
            mqDurable = True
            mqName = '%s2' % (mqName)

    if len(sys.argv) > 2:
        mqName = sys.argv[2]
        if sys.argv[2] == '1':
            mqDurable = True
            mqName = '%s2' % (mqName)

    if len(sys.argv) > 3:
        mqHost = sys.argv[3]

    Main(mqHost, mqName, mqDurable)

訊息發送程式

import sys
import datetime
import pika


mqConnection = None
mqChannel = None


def OpenMQ(mqHost, mqName, mqDurable):
    global mqConnection, mqChannel

    mqConnection = pika.BlockingConnection(pika.ConnectionParameters(host=mqHost))
    mqChannel = mqConnection.channel()
    mqChannel.queue_declare(queue=mqName, durable=mqDurable)


def SendMQ(mqName, mqMsg, mqDurable):
    global mqChannel

    if mqDurable:
        mqChannel.basic_publish(exchange='', routing_key=mqName, body=mqMsg, properties=pika.BasicProperties(delivery_mode=2, ))
    else:
        mqChannel.basic_publish(exchange='', routing_key=mqName, body=mqMsg)


def CloseMQ():
    global mqConnection

    mqConnection.close()


def Main(mqHost, mqName, mqDurable, mqMsgHead):
    OpenMQ(mqHost, mqName, mqDurable)

    for i in range(1, 10):
        mqMsg = '%s\t(%02d)' % (mqMsgHead, i)
        SendMQ(mqName, mqMsg, mqDurable)
        print (mqMsg)

    CloseMQ()


if __name__ == "__main__":
    mqHost = 'localhost'
    mqName = 'hello'
    mqDurable = False
    mqMsgHead = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')

    if len(sys.argv) > 1:
        mqMsgHead = sys.argv[1]

    if len(sys.argv) == 3:
        if sys.argv[2] == '1':
            mqDurable = True
            mqName = '%s2' % (mqName)

    if len(sys.argv) > 3:
        mqName = sys.argv[3]
        if sys.argv[2] == '1':
            mqDurable = True
            mqName = '%s2' % (mqName)

    if len(sys.argv) > 4:
        mqHost = sys.argv[4]

    Main(mqHost, mqName, mqDurable, mqMsgHead)
ċ
mqReceiver.py
(1k)
李智,
2015年3月8日 下午8:25
ċ
mqSend.py
(2k)
李智,
2015年3月8日 下午8:25