服务器端:rpc_server.py
# RPC server (rpc_server.py): consumes integers from the 'rpc_queue' queue and
# publishes the matching Fibonacci number to each request's reply queue.
import time

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')


def fib(n):
    """Return the n-th Fibonacci number, with fib(0) == 0 and fib(1) == 1.

    Computed iteratively, so the cost is linear in ``n`` and no recursion
    limit applies.

    Raises:
        ValueError: if ``n`` is negative.
    """
    if n < 0:
        raise ValueError("fib() is not defined for negative n: %r" % (n,))
    previous, current = 0, 1
    for _ in range(n):
        previous, current = current, previous + current
    return previous


def on_request(ch, method, props, body):
    """Handle one RPC request taken from 'rpc_queue'.

    Args:
        ch: channel the request was delivered on; also used for the reply.
        method: delivery metadata; ``method.delivery_tag`` is acknowledged.
        props: request properties; ``props.reply_to`` names the queue the
            result is published to and ``props.correlation_id`` is echoed
            back unchanged.
        body: request payload, parsed with ``int()``.
    """
    # NOTE(review): a non-numeric or negative payload raises ValueError here
    # and stops the consumer before the message is acknowledged; decide how
    # such requests should be answered before handling it.
    n = int(body)
    print("[.] fib(%s)" % n)
    # The Fibonacci result is what gets sent back to the client.
    response = fib(n)

    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id,
        ),
        body=str(response),
    )
    # Acknowledge the request so the broker treats the task as completed.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Left disabled, as in the original script:
# channel.basic_qos(prefetch_count=1)

# NOTE(review): this is the pika 0.x call form (callback first). pika >= 1.0
# appears to expect basic_consume(queue=..., on_message_callback=...) --
# confirm against the installed pika version.
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC request")
channel.start_consuming()
客户端:rpc_client.py
# RPC client (rpc_client.py): publishes a number to 'rpc_queue' and waits on a
# private callback queue for the Fibonacci result.
import sys
import time
import uuid

import pika


class FibonacciRpcClient(object):
    """Blocking RPC client for the Fibonacci server listening on 'rpc_queue'."""

    def __init__(self):
        """Connect to the broker on localhost and subscribe to a reply queue."""
        # Set up front so on_response() never reads a missing attribute.
        self.response = None
        self.corr_id = None

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()

        # The declared queue's name is read back and used as the reply_to
        # address for every request.
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        # Every message arriving on the callback queue goes to on_response().
        # NOTE(review): pika 0.x call form (callback first, no_ack); pika >= 1.0
        # appears to use on_message_callback= / auto_ack= and an explicit
        # queue='' in queue_declare -- confirm against the installed version.
        self.channel.basic_consume(self.on_response,
                                   no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        """Store ``body`` as the response if it answers the pending request.

        A message is accepted only when its correlation id equals the id
        generated by the most recent call(); anything else is ignored.
        """
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        """Send ``n`` to the server and block until the result arrives.

        Args:
            n: value to request; sent as ``str(n)``.

        Returns:
            The server's reply converted with ``int()``.
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,  # queue the server replies to
                correlation_id=self.corr_id,
            ),
            body=str(n),
        )
        while self.response is None:
            # Dispatches pending deliveries, which may invoke on_response().
            self.connection.process_data_events()
            # Report and back off only when the reply is really still missing;
            # otherwise return immediately instead of sleeping once more.
            if self.response is None:
                print("no msg...")
                time.sleep(0.5)
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()
try:
    # The announcement is built from the same value that is sent, so the
    # message cannot disagree with the actual request.
    requested_n = 8
    print(" [x] Requesting fib(%s)" % requested_n)
    response = fibonacci_rpc.call(requested_n)
    print(" [.] Got %r" % response)
finally:
    # Release the broker connection even if the call fails.
    fibonacci_rpc.connection.close()