博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python-RabbitMQ-RPC(非阻塞版)
阅读量:6254 次
发布时间:2019-06-22

本文共 2692 字,大约阅读时间需要 8 分钟。

服务器端:rpc_server.py

 

import pika,timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n):    if n == 0:        return 0    elif n == 1:        return 1    else:        return fib(n-1) + fib(n -2)def on_request(ch, method, props, body):    n = int(body)    print("[.] fib(%s)" % n)    response = fib(n)#斐波那契的执行结果赋值给reponse    #再把得到的消息发回给客户端    ch.basic_publish(exchange='',                     routing_key=props.reply_to,                     properties = pika.BasicProperties(                         correlation_id= \                            props.correlation_id                     ),                     body =str(response))    ch.basic_ack(delivery_tag=method.delivery_tag)#确保消息被消费,代表任务完成#channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request,                      queue='rpc_queue')print(" [x] Awaiting RPC request")channel.start_consuming()

 

客户端:rpc_client.py

import pika,sys,uuidimport timeclass FibonacciRpcClient(object):    def __init__(self):        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))        self.channel = self.connection.channel()        result = self.channel.queue_declare(exclusive=True)        self.callback_queue = result.method.queue#获取queue名字        self.channel.basic_consume(self.on_response,#只要收到就调用on_response()                                   no_ack=True,                                   queue=self.callback_queue                                   )    def on_response(self, ch, method, props, body):        if self.corr_id == props.correlation_id:#判断服务器端corr_id和本地corr_id相等,才往下走            self.response = body#response收到body的消息表示response不为空    def call(self, n):        self.response = None        self.corr_id = str(uuid.uuid4())        self.channel.basic_publish(exchange='',                                   routing_key='rpc_queue',                                   properties=pika.BasicProperties(                                       reply_to=self.callback_queue,#指定返回到那个queue                                       correlation_id=self.corr_id,                                   ),                                   body=str(n))#传字符串,把30传进来        while self.response is None:            #收到消息,就会触发on_response(),没消息就继续往下走循环            self.connection.process_data_events()#非阻塞版的start_consuming            print("no msg...")#只要走到这,就相当于没消息            time.sleep(0.5)        return int(self.response)fibonacci_rpc = FibonacciRpcClient()print(" [x] Requesting fib(30)")response = fibonacci_rpc.call(8)print(" [.] Got %r" % response)

 

转载于:https://www.cnblogs.com/fuyuteng/p/9263764.html

你可能感兴趣的文章
java类可选,类层次结构中的Java可选接口
查看>>
php 二维数组分页效率,PHP二维数组分页排序分页_PHP数组分页
查看>>
php网站开发开题报告,基于PHP的网上租车租赁网站设计与实现开题报告
查看>>
用matlab怎样表示褶积,信号的时域表示以及卷积运算(MATLAB)
查看>>
mysql 命令 字符集,MySQL字符集的设置
查看>>
php登录半透明,WordPress透明OAuth 1.0使用PHP登录
查看>>
php开发信息发布平台思路,基于PHP的大学信息发布平台设计与实现.doc
查看>>
php使用163使用465端口吗,在CentOS 7系统里使用465端口发送邮件
查看>>
java关联vss 80020009,80020009: Invalid password[src=SourceSafe,guid=null]
查看>>
java复制文件到指定文件夹下,java:把一个文件夹中的所有文件复制到指定文件夹下...
查看>>
matlab足球赛排名问题程序,足球队排名问题及解决方法.doc
查看>>
ubuntu php5-imap,在Ubuntu 11上安装具有IMAP / Kerberos支持的PHP的问题
查看>>
php圣经 源码,基于PHP的圣经读者用剑模块和diatheke
查看>>
php中的$this-%3efetch,Zend DB fetchAll(): where子句數組帶有IN操作符
查看>>
李思琼php,nginx单机1w并发优化
查看>>
怎么手动设置oracle,手把手设置win7系统手动启动Oracle服务的设置方法
查看>>
oracle fk作用,oracle pk&fk
查看>>
oracle裂块是什么意思,Oracle索引块分裂split信息汇总
查看>>
php构造函数创建对象,7.10 构造函数来创建对象
查看>>
oracle解密后台包,oracle9i加密解密包用法
查看>>