jack 4 months ago
parent
commit
88ba225b3a

+ 13 - 0
rabbitmq/09-死信队列(DLX|DLQ)/usage.txt

@@ -0,0 +1,13 @@
+# Start the consumer first (it declares the queues and the DLX/DLQ)
+python consumer_dlq.py
+
+# Start the producer, which sends messages 1~7
+python producer_dlq.py
+
+
+# Odd messages (1, 3, 5, 7) are rejected → routed to the dlq
+# Even messages (2, 4, 6) are ACKed normally
+# If the queue holds more than 3 messages, the overflow also goes to the dlq
+# Messages not consumed within the 10-second TTL go to the dlq as well
+
+# Use the management UI, or run a separate consumer on the dlq (see the sketch below), to see all the dead letters.
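A minimal sketch of such a standalone dead-letter consumer, assuming the dead-letter queue is named dlq and reusing the broker address and credentials from the other scripts in this repo; the file name consumer_dlq_listener.py is made up and not part of this commit:

# -*- coding: utf-8 -*-
# consumer_dlq_listener.py -- hypothetical helper, not included in this commit
import pika

user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
params = pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
conn = pika.BlockingConnection(params)
ch = conn.channel()

# Passive declare: fail fast if consumer_dlq.py has not created the dlq yet
ch.queue_declare(queue='dlq', passive=True)

def callback(ch, method, props, body):
    # Dead-lettered messages carry an x-death header describing why they ended up here
    reason = (props.headers or {}).get('x-death')
    print(' [dlq] Received:', body.decode(), '| x-death:', reason)
    ch.basic_ack(method.delivery_tag)

ch.basic_consume(queue='dlq', on_message_callback=callback)
print(' [*] Waiting for dead letters on dlq (Ctrl+C to quit)')
ch.start_consuming()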

+ 26 - 0
rabbitmq/10-惰性队列(Lazy Queue)/consumer_lazy.py

@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+import pika
+import time
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+params = pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+conn = pika.BlockingConnection(params)
+ch = conn.channel()
+
+# 1. Declare the lazy queue
+ch.queue_declare(
+    queue='lazy_queue',
+    durable=True,
+    arguments={'x-queue-mode': 'lazy'}  # this single argument is all it takes
+)
+
+print(' [*] Waiting messages on lazy_queue (Ctrl+C to quit)')
+
+def callback(ch, method, props, body):
+    print(' [x] Received:', body.decode())
+    time.sleep(1)        # simulate a slow consumer
+    ch.basic_ack(method.delivery_tag)
+
+ch.basic_qos(prefetch_count=10)
+ch.basic_consume(queue='lazy_queue', on_message_callback=callback)
+ch.start_consuming()

+ 21 - 0
rabbitmq/10-惰性队列(Lazy Queue)/producer_lazy.py

@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+import pika
+import sys
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+params = pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+conn = pika.BlockingConnection(params)
+ch = conn.channel()
+
+# The queue is already declared on the consumer side; the producer does not need to declare it again
+count = int(sys.argv[1]) if len(sys.argv) > 1 else 100_000
+for i in range(count):
+    ch.basic_publish(
+        exchange='',
+        routing_key='lazy_queue',
+        body=f'Msg-{i}',
+        properties=pika.BasicProperties(delivery_mode=2)
+    )
+
+print(f' [x] Sent {count} messages into lazy_queue')
+conn.close()

+ 10 - 0
rabbitmq/10-惰性队列(Lazy Queue)/usage.txt

@@ -0,0 +1,10 @@
+# Start the consumer:
+python consumer_lazy.py
+
+# Start the producer and send 100,000 messages:
+python producer_lazy.py 100000
+
+# Observe in the RabbitMQ Management UI:
+# The message count of lazy_queue quickly climbs to 100k
+# Memory usage grows very slowly (messages are mostly written to disk)
+# After a consumer restart, consumption resumes and no messages are lost
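If the Management UI is not at hand, the backlog can also be checked from Python with a passive queue_declare, which returns the current message count without touching the queue; a minimal sketch reusing the connection settings above (per-queue memory is not exposed over AMQP; for that, rabbitmqctl list_queues name messages memory or the management HTTP API can be used):

# check_lazy_depth.py -- hypothetical helper, not included in this commit
import pika

user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
params = pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
conn = pika.BlockingConnection(params)
ch = conn.channel()

# passive=True only inspects the existing queue and never (re)creates it
ok = ch.queue_declare(queue='lazy_queue', passive=True)
print('lazy_queue messages:', ok.method.message_count)
conn.close()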

+ 24 - 0
rabbitmq/11-仲裁队列(Quorum Queue)/consumer_quorum.py

@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+import pika
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+params = pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+conn = pika.BlockingConnection(params)
+ch = conn.channel()
+
+# 1. Declare the quorum queue
+ch.queue_declare(
+    queue='quorum_queue',
+    durable=True,
+    arguments={'x-queue-type': 'quorum'}  # the key argument
+)
+
+print(' [*] Waiting messages on quorum_queue (Ctrl+C to quit)')
+
+def callback(ch, method, props, body):
+    print(' [x] Received:', body.decode())
+    ch.basic_ack(method.delivery_tag)
+
+ch.basic_qos(prefetch_count=10)
+ch.basic_consume(queue='quorum_queue', on_message_callback=callback)
+ch.start_consuming()
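Quorum queues also count redeliveries per message; a hedged sketch of capping them with the x-delivery-limit queue argument, using the channel ch from the consumer above (the queue name quorum_queue_limited is hypothetical, picked so it does not collide with quorum_queue, since redeclaring an existing queue with different arguments fails):

# Hypothetical variant: after 5 failed deliveries the message is dropped,
# or dead-lettered if the queue also has a dead-letter exchange configured.
ch.queue_declare(
    queue='quorum_queue_limited',
    durable=True,
    arguments={
        'x-queue-type': 'quorum',
        'x-delivery-limit': 5,
    }
)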

+ 21 - 0
rabbitmq/11-仲裁队列(Quorum Queue)/producer_quorum.py

@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+import pika
+import sys
+
+user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
+params = pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
+conn = pika.BlockingConnection(params)
+ch = conn.channel()
+
+# The queue is already declared on the consumer side
+count = int(sys.argv[1]) if len(sys.argv) > 1 else 1000
+for i in range(count):
+    ch.basic_publish(
+        exchange='',
+        routing_key='quorum_queue',
+        body=f'QMsg-{i}',
+        properties=pika.BasicProperties(delivery_mode=2)
+    )
+
+print(f' [x] Sent {count} messages into quorum_queue')
+conn.close()

+ 11 - 0
rabbitmq/11-仲裁队列(Quorum Queue)/usage.txt

@@ -0,0 +1,11 @@
+# Start the consumer:
+python consumer_quorum.py
+
+# Start the producer and send 2000 messages
+python producer_quorum.py 2000
+
+# Check in the management UI:
+# The Queue Type column shows quorum
+# Even if a node goes down, no messages are lost; the queue elects a new leader automatically and stays available.
+
+# Adding arguments={'x-queue-type':'quorum'} to the queue declaration gives a highly available, strongly consistent quorum queue with no extra mirroring configuration.
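On the producer side, "no messages are lost" only holds end-to-end if the publisher waits for broker confirmation; a minimal sketch of adding publisher confirms to producer_quorum.py (confirm_delivery, UnroutableError and NackError are standard pika APIs; the retry hint is only illustrative):

# Hypothetical addition to producer_quorum.py: publisher confirms
ch.confirm_delivery()  # every basic_publish now waits for a broker ack

for i in range(count):
    try:
        ch.basic_publish(
            exchange='',
            routing_key='quorum_queue',
            body=f'QMsg-{i}',
            properties=pika.BasicProperties(delivery_mode=2),
            mandatory=True,  # raise if the message cannot be routed to a queue
        )
    except (pika.exceptions.UnroutableError, pika.exceptions.NackError):
        print(f' [!] Broker did not confirm QMsg-{i}; consider retrying')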

+ 29 - 0
rabbitmq/12-流式日志(Stream)/stream_consumer.py

@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+import asyncio
+from rstream import Consumer, OffsetType
+
+STREAM = 'demo-stream'
+USER   = 'user'
+PWD    = 'Un2yzriWm7veSDoh'
+HOST   = 'rabbitmq.rabbitmq.svc.cluster.local'
+
+async def on_message(msg):
+    print(" [x]", msg.data.decode())
+
+async def main():
+    consumer = Consumer(
+        host=HOST,
+        port=5552,
+        username=USER,
+        password=PWD
+    )
+    async with consumer:
+        await consumer.subscribe(
+            STREAM,
+            callback=on_message,
+            offset_type=OffsetType.FIRST
+        )
+        await consumer.run()
+
+if __name__ == '__main__':
+    asyncio.run(main())

+ 19 - 0
rabbitmq/12-流式日志(Stream)/stream_producer.py

@@ -0,0 +1,19 @@
+# -*- coding: utf-8 -*-
+import asyncio
+from rstream import Producer
+
+STREAM = 'demo-stream'
+USER   = 'user'
+PWD    = 'Un2yzriWm7veSDoh'
+HOST   = 'rabbitmq.rabbitmq.svc.cluster.local'
+
+async def main():
+    producer = Producer(host=HOST, port=5552, username=USER, password=PWD)
+    async with producer:
+        await producer.create_stream(STREAM)
+        for i in range(100_000):
+            await producer.send(STREAM, f"msg-{i}".encode())
+        print(" [x] Sent 100k messages to stream")
+
+if __name__ == '__main__':
+    asyncio.run(main())

+ 39 - 0
rabbitmq/12-流式日志(Stream)/usage.txt

@@ -0,0 +1,39 @@
+A minimal runnable demo of RabbitMQ Stream (3.11+ in this setup).
+Key points:
+Uses a stream-type queue (x-queue-type: stream); an AMQP-side sketch is at the end of this file
+Supports replay from an arbitrary offset, large numbers of clients, and high throughput
+
+pip install rstream
+
+
+
+Make sure the RabbitMQ Stream plugin is enabled (bundled with 3.11+, but it still has to be enabled explicitly):
+kubectl exec -n rabbitmq rabbitmq-0 -- rabbitmq-plugins enable rabbitmq_stream
+
+''' Output from my run:
+kubectl exec -n rabbitmq rabbitmq-0 -- rabbitmq-plugins enable rabbitmq_stream
+Defaulted container "rabbitmq" out of: rabbitmq, prepare-plugins-dir (init)
+Enabling plugins on node rabbit@rabbitmq-0.rabbitmq-headless.rabbitmq.svc.cluster.local:
+rabbitmq_stream
+The following plugins have been configured:
+  rabbitmq_auth_backend_ldap
+  rabbitmq_management
+  rabbitmq_management_agent
+  rabbitmq_peer_discovery_common
+  rabbitmq_peer_discovery_k8s
+  rabbitmq_prometheus
+  rabbitmq_stream
+  rabbitmq_web_dispatch
+Applying plugin configuration to rabbit@rabbitmq-0.rabbitmq-headless.rabbitmq.svc.cluster.local...
+The following plugins have been enabled:
+  rabbitmq_stream
+
+started 1 plugins.
+'''
+
+
+Start the consumer first:
+python stream_consumer.py
+
+Then start the producer:
+python stream_producer.py
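For reference, the same stream can also be declared and consumed over plain AMQP 0-9-1 with pika, without the rstream client; a minimal sketch, assuming demo-stream already exists or is created here with matching arguments (consuming a stream over AMQP requires manual acks, a prefetch limit, and the x-stream-offset consumer argument to pick the starting position):

# stream_amqp_consumer.py -- hypothetical sketch, not included in this commit
import pika

user_info = pika.PlainCredentials('user', 'Un2yzriWm7veSDoh')
params = pika.ConnectionParameters('rabbitmq.rabbitmq.svc.cluster.local', 5672, '/', user_info)
conn = pika.BlockingConnection(params)
ch = conn.channel()

# Declaring the stream; a no-op if it already exists with the same arguments
ch.queue_declare(
    queue='demo-stream',
    durable=True,
    arguments={'x-queue-type': 'stream'}
)

def callback(ch, method, props, body):
    print(' [x]', body.decode())
    ch.basic_ack(method.delivery_tag)

ch.basic_qos(prefetch_count=100)              # streams require a prefetch limit
ch.basic_consume(
    queue='demo-stream',
    on_message_callback=callback,
    arguments={'x-stream-offset': 'first'}    # replay from the beginning
)
ch.start_consuming()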