# rabbitmq队列详解 # 转载:http://www.cnblogs.com/alex3714/articles/5248247.html # 安装python rabbitMQ module # pip install pika # or # easy_install pika # or # 源码 # https://pypi.python.org/pypi/pika ################################################################################# # 实现最简单的队列通信 # send端 # !/usr/bin/env python # import pika # connection = pika.BlockingConnection(pika.ConnectionParameters( # 'localhost')) # channel = connection.channel() # # 声明queue # channel.queue_declare(queue='hello') # # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. # channel.basic_publish(exchange='', # routing_key='hello', # body='Hello World!') # print(" [x] Sent 'Hello World!'") # connection.close() ################################################################################# # receive端 # _*_coding:utf-8_*_ # __author__ = 'Alex Li' # import pika # connection = pika.BlockingConnection(pika.ConnectionParameters( # 'localhost')) # channel = connection.channel() # # You may ask why we declare the queue again ‒ we have already declared it in our previous code. # # We could avoid that if we were sure that the queue already exists. For example if send.py program # # was run before. But we're not yet sure which program to run first. In such cases it's a good # # practice to repeat declaring the queue in both programs. # channel.queue_declare(queue='hello') # def callback(ch, method, properties, body): # print(" [x] Received %r" % body) # channel.basic_consume(callback, # queue='hello', # no_ack=True) # print(' [*] Waiting for messages. To exit press CTRL+C') # channel.start_consuming() ################################################################################# # 远程连接rabbitmq server的话,需要配置权限 # 首先在rabbitmq server上创建一个用户 # sudo rabbitmqctl add_user alex alex3714 # 同时还要配置权限,允许从外面访问 # sudo rabbitmqctl set_permissions -p / alex ".*" ".*" ".*" # 客户端连接的时候需要配置认证参数 # credentials = pika.PlainCredentials('alex', 'alex3714') # connection = pika.BlockingConnection(pika.ConnectionParameters( # '10.211.55.5', 5672, '/', credentials)) # channel = connection.channel() ################################################################################# # 消息持久化 # channel.queue_declare(queue='hello', durable=True) # channel.queue_declare(queue='task_queue', durable=True) # channel.basic_publish(exchange='', # routing_key="task_queue", # body=message, # properties=pika.BasicProperties( # delivery_mode = 2, # make message persistent # )) ################################################################################# # 消息公平分发 # 如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完, # 同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1, # 意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。 ################################################################################# # Publish\Subscribe(消息发布\订阅) # 之前的例子都基本都是1对1的消息发送和接收, # 即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了 # 表达式符号说明:#代表一个或多个字符,*代表任何字符 # 例:#.a会匹配a.a,aa.a,aaa.a等 # *.a会匹配a.a,b.a,c.a等 # 注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout # headers: 通过headers 来决定把消息发给哪些queue ################################################################################# # 有选择的接收消息(exchange type=direct) # RabbitMQ还支持根据关键字发送,即:队列绑定关键字, # 发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。 ################################################################################# # Remote procedure call (RPC) # fibonacci_rpc = FibonacciRpcClient() # result = fibonacci_rpc.call(4) # print("fib(4) is %r" % result) #################################################################################