jack 4 months ago
commit
e4e258cbbf
29 changed files with 912 additions and 0 deletions
  1. 62 0
      .gitignore
  2. 43 0
      rabbitmq/01-路由模式(Routing)/consumer_routing.py
  3. 26 0
      rabbitmq/01-路由模式(Routing)/producer_routing.py
  4. 5 0
      rabbitmq/01-路由模式(Routing)/usage.txt
  5. 35 0
      rabbitmq/02-主题模式(Topic)/consumer_topic.py
  6. 24 0
      rabbitmq/02-主题模式(Topic)/producer_topic.py
  7. 15 0
      rabbitmq/02-主题模式(Topic)/usage.txt
  8. 32 0
      rabbitmq/03-头部模式(Header)/consumer_headers.py
  9. 31 0
      rabbitmq/03-头部模式(Header)/producer_headers.py
  10. 7 0
      rabbitmq/03-头部模式(Header)/usage.txt
  11. 33 0
      rabbitmq/04-任务队列(Work Queue)/consumer.py
  12. 43 0
      rabbitmq/04-任务队列(Work Queue)/consumer_auto_ack.py
  13. 31 0
      rabbitmq/04-任务队列(Work Queue)/producer.py
  14. 28 0
      rabbitmq/04-任务队列(Work Queue)/producer_durable.py
  15. 68 0
      rabbitmq/05-RPC模式/rpc_client.py
  16. 50 0
      rabbitmq/05-RPC模式/rpc_server.py
  17. 29 0
      rabbitmq/06-发布订阅模式(fanout)/consumer_pubsub.py
  18. 14 0
      rabbitmq/06-发布订阅模式(fanout)/producer_pubsub.py
  19. 28 0
      rabbitmq/07-延迟队列(Delayed Message)/consumer_delay.py
  20. 31 0
      rabbitmq/07-延迟队列(Delayed Message)/producer_delay.py
  21. 19 0
      rabbitmq/07-延迟队列(Delayed Message)/usage.txt
  22. 29 0
      rabbitmq/08-优先级队列(Priority Queue)/batch_priority.py
  23. 26 0
      rabbitmq/08-优先级队列(Priority Queue)/consumer_priority.py
  24. 32 0
      rabbitmq/08-优先级队列(Priority Queue)/producer_priority.py
  25. 10 0
      rabbitmq/08-优先级队列(Priority Queue)/usage.txt
  26. 41 0
      rabbitmq/09-死信队列(DLX|DLQ)/consumer_dlq.py
  27. 20 0
      rabbitmq/09-死信队列(DLX|DLQ)/producer_dlq.py
  28. 0 0
      rabbitmq/09-死信队列(DLX|DLQ)/usage.txt
  29. 100 0
      rabbitmq/Readme.md

+ 62 - 0
.gitignore

@@ -0,0 +1,62 @@
+.DS_Store
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+.idea/*
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*,cover
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+rust/**/target

+ 43 - 0
rabbitmq/01-路由模式(Routing)/consumer_routing.py

@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+import pika
+import sys
+
+# Connection parameters
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(
+    pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local',
+                              5672,
+                              '/',
+                              user_info)
+)
+channel = connection.channel()
+
+# 1. Declare the same direct exchange as the producer
+channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
+
+# 2. Create a temporary exclusive queue
+result = channel.queue_declare(queue='', exclusive=True)
+queue_name = result.method.queue
+
+# 3. Read the routing_key(s) to bind from the command line (one or more allowed)
+severities = sys.argv[1:]
+if not severities:
+    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
+    sys.exit(1)
+
+for severity in severities:
+    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
+
+print(' [*] Waiting for logs. To exit press CTRL+C')
+
+# 4. Callback
+def callback(ch, method, properties, body):
+    print(f" [x] {method.routing_key}:{body.decode()}")
+
+channel.basic_consume(
+    queue=queue_name,
+    on_message_callback=callback,
+    auto_ack=True
+)
+
+channel.start_consuming()

+ 26 - 0
rabbitmq/01-路由模式(Routing)/producer_routing.py

@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+import pika
+import sys
+
+# Connection parameters
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(pika.ConnectionParameters(
+    'rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info))
+channel = connection.channel()
+
+# 1. Declare a direct exchange
+channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
+
+# 2. Read the routing_key from the command line (defaults to info)
+severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
+message = ' '.join(sys.argv[2:]) or 'Hello Routing!'
+
+# 3. Publish the message
+channel.basic_publish(
+    exchange='direct_logs',
+    routing_key=severity,          # only queues bound with the same routing_key receive the message
+    body=message
+)
+print(f" [x] Sent {severity}:{message}")
+
+connection.close()

+ 5 - 0
rabbitmq/01-路由模式(Routing)/usage.txt

@@ -0,0 +1,5 @@
+python consumer_routing.py info warning
+python consumer_routing.py error
+
+python producer_routing.py error "Disk full"
+python producer_routing.py info "Server started"

+ 35 - 0
rabbitmq/02-主题模式(Topic)/consumer_topic.py

@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+import pika
+import sys
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(
+    pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+)
+channel = connection.channel()
+
+# 1. Declare the same topic exchange
+channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
+
+# 2. Create a temporary exclusive queue
+result = channel.queue_declare(queue='', exclusive=True)
+queue_name = result.method.queue
+
+# 3. Read the binding pattern(s) to listen on from the command line (one or more allowed)
+binding_keys = sys.argv[1:]
+if not binding_keys:
+    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
+    sys.exit(1)
+
+for binding_key in binding_keys:
+    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)
+    print(f" [*] Queue bound with pattern: {binding_key}")
+
+print(' [*] Waiting for logs. To exit press CTRL+C')
+
+# 4. Callback
+def callback(ch, method, properties, body):
+    print(f" [x] {method.routing_key}:{body.decode()}")
+
+channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
+channel.start_consuming()

+ 24 - 0
rabbitmq/02-主题模式(Topic)/producer_topic.py

@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+import pika
+import sys
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(
+    pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+)
+channel = connection.channel()
+
+# 1. Declare the topic exchange
+channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
+
+# 2. Read the routing_key and message from the command line
+routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
+message = ' '.join(sys.argv[2:]) or 'Hello Topic!'
+
+channel.basic_publish(
+    exchange='topic_logs',
+    routing_key=routing_key,
+    body=message
+)
+print(f" [x] Sent {routing_key}:{message}")
+connection.close()

+ 15 - 0
rabbitmq/02-主题模式(Topic)/usage.txt

@@ -0,0 +1,15 @@
+
+# Start the consumer first, then send messages with the producer; if messages are sent before any consumer has started, they are lost
+
+# Receive all levels for the order module only
+python consumer_topic.py "order.*"
+
+# Receive all error-level messages, regardless of module
+python consumer_topic.py "*.error"
+
+# Receive all messages
+python consumer_topic.py "#"
+
+# Send some test messages:
+python producer_topic.py order.created "Order 12345 created"
+python producer_topic.py payment.error "Payment failed"

+ 32 - 0
rabbitmq/03-头部模式(Header)/consumer_headers.py

@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+import pika
+import sys
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(
+    pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+)
+channel = connection.channel()
+
+# 1. Declare the same headers exchange
+channel.exchange_declare(exchange='header_logs', exchange_type='headers')
+
+# 2. Create a temporary exclusive queue
+result = channel.queue_declare(queue='', exclusive=True)
+queue_name = result.method.queue
+
+# 3. Bind: x-match may be 'all' (every header must match) or 'any' (any one header matches)
+bind_args = {
+    'x-match': 'all',          # deliver only when all key/value pairs match
+    'x-version': '1',
+    'x-env': 'prod'
+}
+channel.queue_bind(exchange='header_logs', queue=queue_name, arguments=bind_args)
+print(' [*] Waiting for messages matching', bind_args)
+
+# 4. Callback
+def callback(ch, method, properties, body):
+    print(' [x] %r' % body.decode())
+
+channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
+channel.start_consuming()

+ 31 - 0
rabbitmq/03-头部模式(Header)/producer_headers.py

@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+import pika
+import sys
+import json
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(
+    pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+)
+channel = connection.channel()
+
+# 1. Declare the headers exchange
+channel.exchange_declare(exchange='header_logs', exchange_type='headers')
+
+# 2. Build the headers
+headers = {
+    'x-version': '1',
+    'x-env': 'prod',
+    'x-critical': 'true'
+}
+
+message = ' '.join(sys.argv[1:]) or 'Hello Headers!'
+
+channel.basic_publish(
+    exchange='header_logs',
+    routing_key='',                      # a headers exchange ignores the routing_key
+    body=message,
+    properties=pika.BasicProperties(headers=headers)
+)
+print(f" [x] Sent {headers} -> {message}")
+connection.close()

+ 7 - 0
rabbitmq/03-头部模式(Header)/usage.txt

@@ -0,0 +1,7 @@
+# Exchange type = headers: routing is decided by the key/value pairs in the message headers; the routing_key is ignored entirely.
+
+# Start the consumer (x-match: all):
+python consumer_headers.py
+
+# Start the producer and send a message:
+python producer_headers.py "Deploy v1.0.0"

+ 33 - 0
rabbitmq/04-任务队列(Work Queue)/consumer.py

@@ -0,0 +1,33 @@
+# coding=utf-8
+### Consumer
+
+import pika
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info))
+channel = connection.channel()
+
+# If the named queue does not exist it is created; if it already exists nothing else happens. Declaring the queue
+# in both producer and consumer means neither side has to be started in any particular order.
+channel.queue_declare(queue='hello')
+
+
+# Callback function
+def callback(ch, method, properties, body):
+    print('Consumer received: {}'.format(body))
+
+# channel: the channel object, with all its attributes and methods
+# method: contains consumer_tag, delivery_tag, exchange, redelivered, routing_key
+# properties: the properties passed in via basic_publish
+# body: the message sent by basic_publish
+
+
+channel.basic_consume(queue='hello',  # consume messages from the specified queue
+                      auto_ack=True,  # True: automatically acknowledge each message as soon as it is delivered
+                      on_message_callback=callback  # callback invoked when a message arrives
+                      )
+
+print('Waiting for messages. To exit press CTRL+C')
+
+# Block and wait for messages; the callback above runs for each message that arrives
+channel.start_consuming()

+ 43 - 0
rabbitmq/04-任务队列(Work Queue)/consumer_auto_ack.py

@@ -0,0 +1,43 @@
+# coding=utf-8
+### Consumer
+# If the consumer crashes or otherwise fails while the callback is still processing a message,
+# that message would not actually have been handled.
+# To avoid this, pass auto_ack=False to basic_consume()
+# and acknowledge manually inside the callback with ch.basic_ack(delivery_tag=method.delivery_tag)
+
+import pika
+import time
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info))
+channel = connection.channel()
+
+# If the named queue does not exist it is created; if it already exists nothing else happens. Declaring the queue
+# in both producer and consumer means neither side has to be started in any particular order.
+channel.queue_declare(queue='queue')
+
+
+# Callback function
+def callback(ch, method, properties, body):
+    time.sleep(5)  # simulate time-consuming work
+    print('Consumer received: {}'.format(body.decode('utf-8')))
+    ch.basic_ack(delivery_tag=method.delivery_tag)  # acknowledge only after the message has been processed
+
+
+# channel: the channel object, with all its attributes and methods
+# method: contains consumer_tag, delivery_tag, exchange, redelivered, routing_key
+# properties: the properties passed in via basic_publish
+# body: the message sent by basic_publish
+
+
+channel.basic_consume(queue='queue',  # consume messages from the specified queue
+                      auto_ack=False,  # False: disable auto-ack; the callback acknowledges manually
+                      on_message_callback=callback  # callback invoked when a message arrives
+                      )
+
+# Acknowledging essentially tells the broker that the message can be discarded
+
+print('Waiting for messages. To exit press CTRL+C')
+
+# Block and wait for messages; the callback above runs for each message that arrives
+channel.start_consuming()

+ 31 - 0
rabbitmq/04-任务队列(Work Queue)/producer.py

@@ -0,0 +1,31 @@
+# coding=utf-8
+### Producer
+
+import pika
+import time
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info))  # connect to the RabbitMQ service on the server
+
+# Create a channel
+channel = connection.channel()
+
+# If the named queue does not exist it is created; if it already exists nothing else happens. The official docs recommend declaring it on every use
+channel.queue_declare(queue='hello')
+
+# # Durable queue
+# # If the named queue does not exist it is created; if it already exists nothing else happens. The official docs recommend declaring it on every use
+# channel.queue_declare(queue='durable_queue',durable=True)
+
+json_str = '{"name": "zhangsan", "age": %d}'
+
+for i in range(0, 10):
+    channel.basic_publish(exchange='',  # simple mode: publish through the default exchange (empty string)
+                          routing_key='hello',  # the queue the message should be routed to
+                          body=json_str % i  # the message to send
+                          )
+    time.sleep(1)
+    
+# Close the connection
+# connection.close()
+

+ 28 - 0
rabbitmq/04-任务队列(Work Queue)/producer_durable.py

@@ -0,0 +1,28 @@
+# coding=utf-8
+### Producer
+
+import pika
+import time
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info))  # connect to the RabbitMQ service on the server
+
+# Create a channel
+channel = connection.channel()
+
+# Durable queue
+# If the named queue does not exist it is created; if it already exists nothing else happens. The official docs recommend declaring it on every use
+channel.queue_declare(queue='durable_queue',durable=True)
+
+json_str = '{"name": "zhangsan", "age": %d}'
+
+for i in range(0, 10):
+    channel.basic_publish(exchange='',  # publish through the default exchange (empty string)
+                          routing_key='durable_queue',  # route to the durable queue declared above
+                          body=json_str % i,  # the message to send
+                          properties=pika.BasicProperties(delivery_mode=2)  # also mark the message itself as persistent
+                          )
+    time.sleep(1)
+    
+# Close the connection
+# connection.close()
+

+ 68 - 0
rabbitmq/05-RPC模式/rpc_client.py

@@ -0,0 +1,68 @@
+import pika
+import uuid
+import json
+import time
+
+class RpcClient:
+    def __init__(self):
+        user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+        self.connection = pika.BlockingConnection(
+            pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+        )
+        self.channel = self.connection.channel()
+
+        # 1. Create a callback queue used only by this client
+        result = self.channel.queue_declare(queue='', exclusive=True)
+        self.callback_queue = result.method.queue
+
+        # 2. Consume from the callback queue
+        self.channel.basic_consume(
+            queue=self.callback_queue,
+            on_message_callback=self.on_response,
+            auto_ack=True
+        )
+
+        self.response = None
+        self.corr_id = None
+
+    def on_response(self, ch, method, props, body):
+        # 3. Accept only responses that belong to our own request
+        if props.correlation_id == self.corr_id:
+            self.response = json.loads(body)['result']
+
+    def call(self, func, a, b, timeout=5):
+        self.response = None
+        self.corr_id = str(uuid.uuid4())
+
+        # 4. Send the request
+        self.channel.basic_publish(
+            exchange='',
+            routing_key='rpc_queue',
+            properties=pika.BasicProperties(
+                reply_to=self.callback_queue,        # the server sends the result back to this queue
+                correlation_id=self.corr_id          # request id
+            ),
+            body=json.dumps({'func': func, 'params': {'a': a, 'b': b}})
+        )
+
+        # 5. Block until the result arrives (or the timeout expires)
+        start = time.time()
+        while self.response is None and (time.time() - start) < timeout:
+            self.connection.process_data_events()
+        if self.response is None:
+            raise TimeoutError('RPC call timed out')
+        return self.response
+
+# ----------------- demo -----------------
+if __name__ == '__main__':
+    rpc = RpcClient()
+    try:
+        print('Calling add(10, 5)...')
+        result = rpc.call('add', 10, 5)
+        print('Server returned:', result)
+
+        print('Calling sub(20, 8)...')
+        result = rpc.call('sub', 20, 8)
+        print('Server returned:', result)
+    except Exception as e:
+        print('Error:', e)

+ 50 - 0
rabbitmq/05-RPC模式/rpc_server.py

@@ -0,0 +1,50 @@
+import pika
+import json
+
+# Connection details
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(
+    pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+)
+channel = connection.channel()
+
+# 1. Declare the fixed request queue
+channel.queue_declare(queue='rpc_queue')
+
+# 2. Business functions
+def add(a, b):
+    return a + b
+
+def sub(a, b):
+    return a - b
+
+func_map = {'add': add, 'sub': sub}
+
+def on_request(ch, method, props, body):
+    # 3. Parse the request
+    req = json.loads(body)
+    func_name = req['func']
+    params = req['params']
+    a, b = params['a'], params['b']
+
+    # 4. Compute locally
+    result = func_map[func_name](a, b)
+    print(f'Received {func_name}({a},{b}) -> {result}')
+
+    # 5. Send the result back
+    ch.basic_publish(
+        exchange='',
+        routing_key=props.reply_to,
+        properties=pika.BasicProperties(
+            correlation_id=props.correlation_id  # echoed back unchanged
+        ),
+        body=json.dumps({'result': result})
+    )
+    ch.basic_ack(delivery_tag=method.delivery_tag)
+
+# 6. Fair dispatch
+channel.basic_qos(prefetch_count=1)
+channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
+
+print('RPC server started, waiting for requests...')
+channel.start_consuming()

+ 29 - 0
rabbitmq/06-发布订阅模式(fanout)/consumer_pubsub.py

@@ -0,0 +1,29 @@
+# Consumer
+import pika
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info))
+channel = connection.channel()
+
+channel.exchange_declare(exchange='交换机', exchange_type='fanout')
+
+# Declare the queue this consumer will use (the commented-out line below lets RabbitMQ generate an exclusive queue instead)
+result = channel.queue_declare(queue='333')
+# result = channel.queue_declare(queue='', exclusive=True)
+queue_name = result.method.queue
+# With exclusive=True the queue is an exclusive queue that is deleted automatically when the consumer disconnects;
+# with queue='' (no name given) RabbitMQ also generates a unique, non-conflicting queue name
+
+# Bind the queue to the exchange
+channel.queue_bind(exchange='交换机', queue=queue_name)
+
+print(' [*] Waiting for messages.')
+
+
+def callback(ch, method, properties, body):
+    print("Consumer received: {}".format(body.decode('utf-8')))
+
+
+channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
+
+channel.start_consuming()

+ 14 - 0
rabbitmq/06-发布订阅模式(fanout)/producer_pubsub.py

@@ -0,0 +1,14 @@
+# Producer
+import pika
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info))
+channel = connection.channel()
+
+# Declare a named exchange of type fanout; it broadcasts every message it receives to all bound queues
+channel.exchange_declare(exchange='交换机', exchange_type='fanout')
+
+
+# Publish to the exchange; with a fanout exchange, routing_key='' means the message is not sent to one specific queue
+# but to every queue bound to this exchange
+channel.basic_publish(exchange='交换机', routing_key='', body='This is a test message')

+ 28 - 0
rabbitmq/07-延迟队列(Delayed Message)/consumer_delay.py

@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+import pika
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(
+    pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+)
+channel = connection.channel()
+
+# 1. Declare the same delayed exchange
+channel.exchange_declare(
+    exchange='delayed_exchange',
+    exchange_type='x-delayed-message',
+    arguments={'x-delayed-type': 'direct'}
+)
+
+# 2. Declare the queue and bind it
+channel.queue_declare(queue='delayed_queue', durable=True)
+channel.queue_bind(exchange='delayed_exchange', queue='delayed_queue', routing_key='delayed')
+
+print(' [*] Waiting for delayed messages...')
+
+# 3. Callback
+def callback(ch, method, properties, body):
+    print(' [x] Received after delay:', body.decode())
+
+channel.basic_consume(queue='delayed_queue', on_message_callback=callback, auto_ack=True)
+channel.start_consuming()

+ 31 - 0
rabbitmq/07-延迟队列(Delayed Message)/producer_delay.py

@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+import pika
+import sys
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(
+    pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+)
+channel = connection.channel()
+
+# 1. Declare the delayed exchange (the plugin must be enabled)
+channel.exchange_declare(
+    exchange='delayed_exchange',
+    exchange_type='x-delayed-message',
+    arguments={'x-delayed-type': 'direct'}  # the exchange routes internally like a direct exchange
+)
+
+# 2. Read the delay (in milliseconds) and the message from the command line
+delay_ms = int(sys.argv[1]) if len(sys.argv) > 1 else 5000
+message    = ' '.join(sys.argv[2:]) or f'Hello after {delay_ms} ms'
+
+channel.basic_publish(
+    exchange='delayed_exchange',
+    routing_key='delayed',
+    body=message,
+    properties=pika.BasicProperties(
+        headers={'x-delay': delay_ms}  # key header: how long to delay delivery
+    )
+)
+print(f" [x] Sent message to be delayed {delay_ms} ms: {message}")
+connection.close()

+ 19 - 0
rabbitmq/07-延迟队列(Delayed Message)/usage.txt

@@ -0,0 +1,19 @@
+# Server setup
+# RabbitMQ needs the delayed-message plugin enabled; commands to enable it:
+# linux
+rabbitmq-plugins enable rabbitmq_delayed_message_exchange
+# docker
+docker exec -it <rabbitmq-container> rabbitmq-plugins enable rabbitmq_delayed_message_exchange
+# k8s
+kubectl exec -n rabbitmq <pod-name> -- rabbitmq-plugins enable rabbitmq_delayed_message_exchange
+
+# PS: I am running on Kubernetes; installing the plugin there is too much hassle, so I skipped it
+
+# Verify the plugin is active
+rabbitmq-plugins list | grep delayed
+
+# Start the consumer (no delay on the consuming side):
+python consumer_delay.py
+
+# Send a message to be delivered after 10 seconds:
+python producer_delay.py 10000 "Order pay timeout"

+ 29 - 0
rabbitmq/08-优先级队列(Priority Queue)/batch_priority.py

@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+import pika
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+conn = pika.BlockingConnection(
+    pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+)
+ch = conn.channel()
+ch.queue_declare(queue='priority_queue', durable=True, arguments={'x-max-priority': 10})
+
+msgs = [
+    (1, 'low-1'),
+    (9, 'urgent-9'),
+    (5, 'medium-5'),
+    (9, 'urgent-9-2'),
+    (1, 'low-2'),
+    (5, 'medium-5-2'),
+]
+
+for prio, body in msgs:
+    ch.basic_publish(
+        exchange='',
+        routing_key='priority_queue',
+        body=body,
+        properties=pika.BasicProperties(priority=prio, delivery_mode=2)
+    )
+    print(f' [>] Sent priority={prio}: {body}')
+
+conn.close()

+ 26 - 0
rabbitmq/08-优先级队列(Priority Queue)/consumer_priority.py

@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+import pika
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(
+    pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+)
+channel = connection.channel()
+
+# 1. Declare the same queue (idempotent)
+channel.queue_declare(
+    queue='priority_queue',
+    durable=True,
+    arguments={'x-max-priority': 10}
+)
+
+print(' [*] Waiting for messages. Higher priority comes first.')
+
+# 2. Callback
+def callback(ch, method, properties, body):
+    print(f" [x] Received priority={properties.priority}: {body.decode()}")
+    ch.basic_ack(delivery_tag=method.delivery_tag)
+
+channel.basic_qos(prefetch_count=1)
+channel.basic_consume(queue='priority_queue', on_message_callback=callback)
+channel.start_consuming()

+ 32 - 0
rabbitmq/08-优先级队列(Priority Queue)/producer_priority.py

@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+import pika
+import sys
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(
+    pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+)
+channel = connection.channel()
+
+# 1. Declare the priority queue; priorities up to 10 are supported
+channel.queue_declare(
+    queue='priority_queue',
+    durable=True,
+    arguments={'x-max-priority': 10}
+)
+
+# 2. Read the priority and message from the command line
+priority = int(sys.argv[1]) if len(sys.argv) > 1 else 0
+message  = ' '.join(sys.argv[2:]) or f'Message with priority {priority}'
+
+channel.basic_publish(
+    exchange='',
+    routing_key='priority_queue',
+    body=message,
+    properties=pika.BasicProperties(
+        priority=priority,   # the key property
+        delivery_mode=2      # persistent
+    )
+)
+print(f" [x] Sent priority={priority}: {message}")
+connection.close()

+ 10 - 0
rabbitmq/08-优先级队列(Priority Queue)/usage.txt

@@ -0,0 +1,10 @@
+# Start the consumer first:
+python consumer_priority.py
+
+# Then start the producer and send messages with different priorities. (In this test, sending one at a time does not demonstrate the priorities, because there is no backlog and too little data; that is expected. Against a real backlog in production this works as described.)
+python producer_priority.py 1 "low"
+python producer_priority.py 9 "urgent"
+python producer_priority.py 5 "medium"
+
+# For testing, use the batch sender instead:
+python batch_priority.py

+ 41 - 0
rabbitmq/09-死信队列(DLX|DLQ)/consumer_dlq.py

@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+import pika
+
+USER = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+PARAMS = pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', USER)
+
+conn = pika.BlockingConnection(PARAMS)
+ch = conn.channel()
+
+# 1. Declare the dead-letter exchange & dead-letter queue
+dlx_name = 'dlx_exchange'
+dlq_name = 'dlq'
+ch.exchange_declare(exchange=dlx_name, exchange_type='direct')
+ch.queue_declare(queue=dlq_name, durable=True)
+ch.queue_bind(exchange=dlx_name, queue=dlq_name, routing_key='dead')
+
+# 2. Declare the business queue and attach the DLX
+biz_queue = 'business_queue'
+biz_args = {
+    'x-dead-letter-exchange': dlx_name,      # name of the DLX
+    'x-dead-letter-routing-key': 'dead',     # routing_key used when a message is dead-lettered
+    'x-message-ttl': 10000,                  # messages expire after 10 seconds (demonstrates TTL dead-lettering)
+    'x-max-length': 3                        # queue length capped at 3 (demonstrates overflow dead-lettering)
+}
+ch.queue_declare(queue=biz_queue, durable=True, arguments=biz_args)
+
+print(' [*] Waiting for business messages; odd numbers will be rejected. Ctrl+C to quit.')
+
+def callback(ch, method, props, body):
+    msg = body.decode()
+    print(f' [>] Received: {msg}')
+    if int(msg) % 2:        # odd number -> reject and dead-letter
+        ch.basic_reject(method.delivery_tag, requeue=False)
+        print(f' [!] Rejected {msg} -> DLQ')
+    else:
+        ch.basic_ack(method.delivery_tag)
+        print(f' [√] Processed {msg}')
+
+ch.basic_qos(prefetch_count=1)
+ch.basic_consume(queue=biz_queue, on_message_callback=callback)
+ch.start_consuming()

+ 20 - 0
rabbitmq/09-死信队列(DLX|DLQ)/producer_dlq.py

@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+import pika
+
+USER = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+PARAMS = pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', USER)
+
+conn = pika.BlockingConnection(PARAMS)
+ch = conn.channel()
+
+# The queue is already declared on the consumer side; the producer only needs to publish
+biz_queue = 'business_queue'
+for i in range(1, 8):
+    ch.basic_publish(
+        exchange='',
+        routing_key=biz_queue,
+        body=str(i),
+        properties=pika.BasicProperties(delivery_mode=2)  # persistent
+    )
+    print(f' [x] Sent {i}')
+conn.close()

+ 0 - 0
rabbitmq/09-死信队列(DLX|DLQ)/usage.txt


+ 100 - 0
rabbitmq/Readme.md

@@ -0,0 +1,100 @@
+| Pattern / feature | Exchange type     | Keywords               | Typical scenario        |
+| ----------------- | ----------------- | ---------------------- | ----------------------- |
+| Simple            | default direct    | single queue           | point-to-point          |
+| Work Queue        | default direct    | prefetch               | parallel tasks          |
+| Pub/Sub           | fanout            | broadcast              | mass notifications      |
+| Routing           | direct            | exact routing\_key     | log levels              |
+| Topic             | topic             | wildcards              | multi-dimension routing |
+| Header            | headers           | key/value matching     | metadata routing        |
+| RPC               | default direct    | reply\_to + corr\_id   | remote calls            |
+| Delay             | x-delayed-message | x-delay                | scheduled tasks         |
+| Priority          | any               | priority               | queue jumping           |
+| DLX               | any               | x-dead-letter-exchange | failure handover        |
+| Lazy              | any               | x-queue-mode=lazy      | massive backlogs        |
+| Quorum            | any               | x-queue-type=quorum    | high availability       |
+| Stream            | stream            | offset                 | log streaming           |
+
+
+──────────────────
+1️⃣ Routing mode
+Exchange type: direct
+Delivers a message only to queues whose binding routing_key matches exactly.
+Scenario: one queue per business type, e.g. log levels (info, warn, error).
+Minimal demo pointers:
+producer: channel.basic_publish(exchange='direct_logs', routing_key='error', body=…)
+consumer: channel.queue_bind(queue=xxx, exchange='direct_logs', routing_key='error')
+──────────────────
+2️⃣ Topic mode
+Exchange type: topic
+The routing_key is matched as dot-separated words with wildcards (* matches one word, # matches zero or more).
+Scenario: logs by module + level (order.info, order.error, payment.*), or multi-tenant offline IM messages.
+Minimal demo pointers:
+producer: routing_key = "order.created.us"
+consumer: bind routing_key = "order.*.us" or "order.#"
+──────────────────
+3️⃣ Header mode
+Exchange type: headers
+Routing is decided by the key/value pairs in the message headers; the routing_key is ignored entirely.
+Scenario: routing on several metadata fields (e.g. x-version:1, x-env:prod) without packing them into the routing_key.
+Minimal demo pointers:
+producer: properties=pika.BasicProperties(headers={'x-version':'1','x-env':'prod'})
+consumer: channel.queue_bind(queue=xxx, exchange='header_logs', arguments={'x-match':'all','x-version':'1','x-env':'prod'})
+──────────────────
+4️⃣ Work Queue (competing consumers / task queue)
+Essence: a simple queue + multiple consumers + basic_qos(prefetch_count=1)
+Distributes time-consuming tasks evenly across workers; each message is delivered to exactly one consumer.
+Scenario: image compression, bulk email, crawler jobs.
+This is the simple mode you have already seen, plus the prefetch_count setting; a minimal sketch follows below.
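+
+A minimal fair-dispatch worker sketch (assuming the same broker address and credentials as the demos in this repo; the queue name work_queue is illustrative):
+
+```python
+import pika
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(
+    pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+)
+channel = connection.channel()
+
+channel.queue_declare(queue='work_queue', durable=True)
+
+def callback(ch, method, properties, body):
+    print('Working on: {}'.format(body.decode()))
+    ch.basic_ack(delivery_tag=method.delivery_tag)  # acknowledge only after the work is done
+
+channel.basic_qos(prefetch_count=1)  # fair dispatch: at most one unacknowledged message per worker
+channel.basic_consume(queue='work_queue', on_message_callback=callback)
+channel.start_consuming()
+```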
+──────────────────
+5️⃣ RPC mode
+Exchange type: default direct ("")
+The client calls a remote function as if it were local: it puts "function name + arguments" on a queue; the server computes the result and sends it back to the client's private callback queue, and correlation_id keeps each reply matched to its request.
+Key elements
+Queues: the client creates an exclusive temporary queue as its "reply mailbox"; the server listens on the fixed queue rpc_queue.
+Message properties: reply_to (the reply mailbox address) and correlation_id (the request ID).
+Blocking/polling: the client waits for the result with process_data_events() or start_consuming().
+Typical scenarios
+"Synchronous" calls between microservices (order service → inventory service deducts stock and returns the remaining quantity).
+Cross-language interop (Python calling a Java algorithm library).
+──────────────────
+6️⃣ Publish/Subscribe (fanout) mode
+Exchange type: fanout
+Every incoming message is broadcast to all queues bound to the exchange, so each consumer receives its own copy of every message.
+Key elements
+Queues: usually auto-generated by the consumer at bind time (exclusive=True or queue=""), with a lifetime tied to the connection.
+Binding: any queue that calls queue_bind(exchange='logs') receives the broadcast.
+Message loss: if no queue is bound to the exchange, the message is simply dropped.
+Typical scenarios
+Mass notifications / announcements (every online user gets the same push).
+Log fan-out: one exchange 'logs' feeding a "file persistence queue", a "monitoring dashboard queue" and a "real-time alerting queue" at the same time.
+──────────────────
+7️⃣ Delayed queue (Delayed Message)
+Official RabbitMQ plugin: rabbitmq_delayed_message_exchange
+Delivers a message only after a delay of N seconds.
+Scenario: auto-close an order unpaid after 15 minutes, scheduled reminders.
+Usage: declare the exchange with type = 'x-delayed-message' and add the header x-delay=15000 when publishing.
+──────────────────
+8️⃣ Priority queue (Priority Queue)
+Built into the broker, no extra plugin required.
+Messages with a higher priority are consumed first.
+Scenario: VIP user tasks jump the queue.
+Usage: queue_declare(arguments={'x-max-priority':10}), then publish with priority=5.
+──────────────────
+9️⃣ Dead-letter queue (DLX / DLQ)
+When a message "dies" (rejected, TTL expired, or the queue is full) it is automatically re-routed through another exchange → the dead-letter queue.
+Scenario: manual review of failed tasks, preventing message loss.
+Usage: queue_declare(arguments={'x-dead-letter-exchange':'dlx'}), plus a queue bound to that dlx exchange.
+──────────────────
+🔟 Lazy queue (Lazy Queue)
+Messages are written to disk first, which keeps memory stable even with millions of messages backed up.
+Usage: queue_declare(arguments={'x-queue-mode':'lazy'}); a declaration sketch follows below.
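+
+A minimal declaration sketch (assuming the same broker as the demos in this repo; the queue name lazy_queue is illustrative, and on RabbitMQ 3.12+ classic queues already behave this way, so the argument is ignored there):
+
+```python
+import pika
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(
+    pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+)
+channel = connection.channel()
+
+# Lazy mode: the broker moves messages to disk as early as possible instead of keeping them in RAM
+channel.queue_declare(
+    queue='lazy_queue',
+    durable=True,
+    arguments={'x-queue-mode': 'lazy'}
+)
+connection.close()
+```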
+──────────────────
+1️⃣1️⃣ Quorum queue (Quorum Queue)
+New in RabbitMQ 3.8+, replacing classic mirrored queues.
+Raft-based replication: high availability and strong consistency, at the cost of some throughput.
+Usage: queue_declare(queue='q1', durable=True, arguments={'x-queue-type':'quorum'})
+──────────────────
+1️⃣2️⃣ Stream (log streaming)
+An experimental feature in RabbitMQ 3.11+.
+A Kafka-like append-only log: supports large numbers of clients and replayable (offset-based) consumption.
+Usage: declare a stream-type queue + offset tracking; a consuming sketch follows below.
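+
+A consuming sketch over plain AMQP with pika (assuming stream queues are available on the broker; the queue name log_stream and the offset value are illustrative):
+
+```python
+import pika
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+connection = pika.BlockingConnection(
+    pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+)
+channel = connection.channel()
+
+# A stream queue must be durable and cannot be exclusive or auto-delete
+channel.queue_declare(
+    queue='log_stream',
+    durable=True,
+    arguments={'x-queue-type': 'stream'}
+)
+
+def callback(ch, method, properties, body):
+    print('Received:', body.decode())
+    ch.basic_ack(delivery_tag=method.delivery_tag)
+
+# Stream consumers require a prefetch limit and manual acknowledgements
+channel.basic_qos(prefetch_count=100)
+channel.basic_consume(
+    queue='log_stream',
+    on_message_callback=callback,
+    arguments={'x-stream-offset': 'first'}  # replay from the beginning; 'last', 'next' or a numeric offset also work
+)
+channel.start_consuming()
+```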