High Available Mosquitto MQTT on Kubernetes
标题:在 Kubernetes 上实现高可用 Mosquitto MQTT
Raymii.org
Quis custodiet ipsos custodes? Home | About | All pages | Cluster Status | RSS Feed Clear
在 Kubernetes 上实现高可用 Mosquitto MQTT
发布时间:2025-05-14 22:11 | 作者:Remy van Elst | 纯文本版本
目录
- 概述
- 为什么 Mosquitto Failover Pod 需要 Service Account
- Mosquitto Failover Pod 如何保持 MQTT 服务存活
- 当主节点宕机时会发生什么?
- K8S Deployment YAML 文件
- 针对 k3s 的 Traefik Helm Chart Config
本文将介绍如何使用 Eclipse Mosquitto 构建一个完全声明式、Kubernetes 原生的、高可用的 MQTT broker。 该配置利用了核心 Kubernetes 原语(Deployments
、Services
、ConfigMaps
和 RBAC
),并结合 Traefik IngressRouteTCP
来暴露外部 MQTT 流量。它引入了一种轻量级的、自愈的故障转移机制,当主 broker 不可用时,可以自动将流量重新路由到辅助 broker。 该设置还演示了内部 MQTT bridging,允许在 broker 之间无缝进行消息传播。 相对于单个 Pod 部署(在节点发生故障的情况下,k8s 将在 5 分钟后重启),主要优势在于此设置的停机时间仅为 5 秒,并且具有共享状态,因此所有消息都可以在故障转移时可用。
最近,我因为 Google Ads 的侵入式跟踪以及 Google Analytics,已经从本网站移除了它们。 如果您发现此内容有用,请考虑使用以下任何选项进行小额捐赠。 如果您表达感谢,并且帮助支付服务器费用,这对我很重要: GitHub Sponsorship PCBWay referral link (You get $5, I get $20 after you've placed an order) Digital Ocea referral link ($200 credit for 60 days. Spend $25 after your credit expires and I'll get $25!)
设置示意图
本指南假定您已使用 Traefik 设置了可用的 Kubernetes 环境。 在我的例子中,我用于本文的 Kubernetes/k3s 版本是 v1.32.2+k3s1
。
如果您没有这样的集群,可以查看我的所有其他 kubernetes 文章。
在典型的 Kubernetes 部署中,如果只有一个 Mosquitto pod,那么弹性是有限的。 如果运行该 pod 的节点发生故障,Kubernetes 可能需要长达 5 分钟才能检测到故障并进行恢复。 此延迟源于默认的 node-monitor-grace-period
,通常设置为 5 分钟(300 秒)。 在此期间,MQTT 客户端会失去连接,消息会被丢弃,并且依赖实时消息传递的系统可能会性能下降或进入故障模式。
此处介绍的配置通过部署 primary
和 secondary
Mosquitto broker 来避免这种停机时间,每个 broker 都在其自己的 pod 中,调度在不同的节点上,并使用自定义的故障转移控制器来处理流量重定向。 一个轻量级的控制器监控 primary
pod 的就绪状态,如果它变得不可用,则修补 Kubernetes Service
,以在 5 秒内将流量重新路由到 secondary
broker。 这大大缩短了恢复时间,并提高了故障期间的系统响应速度。
因为 secondary
broker 始终在运行并 bridging 到 primary
,所以它保持了接近实时的消息状态。 客户端继续连接到相同的 LoadBalancer
端点(raymii-mosquitto-svc
),无需更新客户端配置或管理 DNS 更改。 故障转移是透明且快速的,即使主服务器脱机,也能确保消息流继续。
本文面向使用 k3s
和 traefik
。 将其适配于 nginx
应该不难。
概述
这是 YAML 文件功能的总结。 两个 mosquitto 实例都可以通过它们自己的端口(primary
的 2883
,secondary
的 3883
)以及端口 1883(具有自动故障转移功能)进行访问。 客户端应连接到端口 1883,其他端口可用于监控。
mosquitto 实例配置为 bridge 所有消息。 发布到 primary
的任何内容都会发布到 secondary
,反之亦然。 发生故障转移时,客户端会失去连接,必须重新连接,但 secondary
broker 拥有所有消息(包括保留的消息)。 当 primary
重新上线时,客户端还需要另一次重新连接。 您可以调整 failover
控制器以使 secondary 充当 primary,直到 secondary 发生故障,但这不在本文的讨论范围之内。
运行 failover
监控的 Pod 由于 Affinity
被调度到不同的 Node
上。 在发生故障时,该 failover
Pod 大约每 5 分钟重启一次。 如果运行 failover
pod 的 Node
发生故障,并且运行 primary
pod 的 Node
发生故障,则在 failover
Pod 恢复之前,不会发生故障转移。 因此,在某些罕见的情况下,故障转移可能仍然需要 5 分钟。 即使这样,由于 bridge 配置,丢失的消息也会更少。
在我打算使用的用例中,客户端会在每次发生故障时重新连接,并在连接时发布保留的消息,因此故障恢复不是问题。
由于 bridge 配置,在中断期间发布到 secondary
的任何保留消息都会发布回 primary
。
1. Namespace & ConfigMaps
- 创建
raymii-mosquitto-dev
namespace。 - 两个 ConfigMap:
- Primary Broker ConfigMap:配置 primary broker 监听端口 1883(外部)和 2883(用于 bridge 连接)。
- Secondary Broker ConfigMap:配置 secondary broker 通过端口 2883 bridge 连接到 primary,并监听端口 1883(外部)和 3883。
2. Deployments
raymii-mosquitto-primary
:- 监听端口 1883(外部)和 2883(用于 bridge 连接)。
raymii-mosquitto-secondary
:- 通过端口 2883 bridge 连接到 primary。
- 监听端口 1883(外部)和 3883。
raymii-mosquitto-failover
:- 包含 shell 循环的 Pod,用于检查 primary broker 的就绪状态。
- 如果 primary 未就绪,它会修补
raymii-mosquitto-svc
的 selector,使其指向 secondary broker,从而将流量重定向到它。
3. Services
raymii-mosquitto-svc
:- 主
LoadBalancer
service,动态地将流量路由到 primary 或 secondary broker。
- 主
raymii-mosquitto-primary-svc
:- 将流量定向到
primary
broker 的第二个监听器 (2883)。
- 将流量定向到
raymii-mosquitto-secondary-svc
:- 将流量定向到
secondary
broker 的第二个监听器 (3883)。
- 将流量定向到
4. RBAC
- Role 和 binding 允许
failover
pod 执行以下操作:Get
、list
和patch
pod 和 service。failover
pod 每 5 秒使用一次,以检查状态并在需要时进行故障转移。
5. Traefik IngressRouteTCP
raymii-mosquitto-dev-mqtt
:- 将外部 MQTT 流量路由到
raymii-mosquitto-svc
。
- 将外部 MQTT 流量路由到
- 也为
primary
和secondary
broker 提供直接路由。
为什么 Mosquitto Failover Pod 需要 Service Account
在 Kubernetes 中,默认情况下,任何 pod 都无法访问集群资源,甚至无法检查其他 pod 的状态或修补 service。 当 Mosquitto failover
pod 需要监控 broker 健康状况并在 primary
和 secondary
之间切换流量时,这就是一个问题。 如果没有正确的权限,故障转移逻辑会静默失败。
因此,我们必须创建一个 ServiceAccount
,将其绑定到具有 get
、list
和 patch
权限的 Role
,并将它分配给 failover
pod。 此 RBAC 设置是让 pod 查询运行状况并通过 kubectl
动态地重新路由流量的唯一方法。
Mosquitto Failover Pod 如何保持 MQTT 服务存活
raymii-mosquitto-failover
pod 是一个轻量级的控制循环,其设计目的只有一个:即使 primary
broker 宕机,也能保持 MQTT 流量畅通。 它持续检查 primary
Mosquitto pod 的就绪状态。 如果 primary
失败,它会修补 raymii-mosquitto-svc
service 以将流量路由到 secondary broker。 当 primary
恢复时,流量会自动恢复到 primary
。
此 pod 在 shell 循环中运行 kubectl
,使用 Kubernetes API 调用来检测运行状况并重定向流量。 它故意设计得很简单,没有 operator、没有 sidecar、没有自定义资源。
我可以构建一个自定义控制器,但这有点过头了。 控制器会带来开销、额外的代码、CRD、生命周期管理以及更多复杂性,而这只是针对一个非常具体的事情。 failover pod 用可读性、可审计性、可调试性和即时部署来换取抽象性。 对于一项正确完成的工作来说,越少越好。
现在是 YAML 文件的摘要。
当主节点宕机时会发生什么?
该设置在三个节点上运行。 一个用于 primary
,一个用于 secondary
,一个用于 failover
监控。
我拔掉了运行 primary
节点的 k3s 服务器的网线。 客户端断开连接,但在几秒钟后重新连接到 raymii-mosquitto-svc
Service
,并连接到 secondary
节点。
几分钟后(超过 5 分钟),我将网线插回了托管 primary
pod 的 k3s 服务器。 failover
Pod 注意到了这一点并修补了 service 并恢复了正常:
kubectl logs -n raymii-mosquitto-dev -l app=mosquitto-failover
输出:
service/raymii-mosquitto-svc patched (no change)
Wed May 14 18:58:30 UTC 2025 - Primary healthy, routing to primary.
service/raymii-mosquitto-svc patched
Wed May 14 19:11:54 UTC 2025 - Primary down, routing to secondary.
service/raymii-mosquitto-svc patched
Wed May 14 19:13:41 UTC 2025 - Primary healthy, routing to primary.
K8S Deployment YAML 文件
这是 YAML 文件,包括 k3s 1.32 HelmChartConfig
以暴露 443 和 80 以外的端口。 如果您使用 NGINX,则必须根据您的设置调整该部分。
namespace 是 raymii-mosquitto-dev
。 如果您想要不同的 namespace,请搜索并替换。 如果您使用证书或用于身份验证的自定义 CA,或者您在其中保存 mosquitto 持久数据库,则可以附加持久卷。 对于我的用例,客户端在每次 connect
时都发布保留的消息,因此无需保存 raymii-mosquitto.db
文件。 您可能想要使用 Longhorn
或保存该状态。 为了简单起见,我使用 ConfigMap
进行 broker 配置。
---
apiVersion: v1
kind: Namespace
metadata:
name: raymii-mosquitto-dev
---
apiVersion: v1
kind: ConfigMap
metadata:
name: raymii-mosquitto-primary-config
namespace: raymii-mosquitto-dev
data:
raymii-mosquitto.conf: |
listener 1883
allow_anonymous true
listener 2883
allow_anonymous true
---
apiVersion: v1
kind: ConfigMap
metadata:
name: raymii-mosquitto-bridge-config
namespace: raymii-mosquitto-dev
data:
raymii-mosquitto.conf: |
listener 1883
allow_anonymous true
listener 3883
allow_anonymous true
connection bridge-to-primary
address raymii-mosquitto-primary-svc.raymii-mosquitto-dev.svc.cluster.local:2883
clientid raymii-mosquitto-bridge
topic # both 0
start_type automatic
try_private true
notifications true
restart_timeout 5
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: raymii-mosquitto-primary
namespace: raymii-mosquitto-dev
spec:
replicas: 1
selector:
matchLabels:
app: raymii-mosquitto
role: primary
template:
metadata:
labels:
app: raymii-mosquitto
role: primary
spec:
containers:
- name: raymii-mosquitto
image: eclipse-mosquitto:2.0.21
command: ["mosquitto"]
args: ["-c", "/raymii-mosquitto/config/raymii-mosquitto.conf"]
ports:
- containerPort: 1883
- containerPort: 2883
livenessProbe:
tcpSocket:
port: 1883
initialDelaySeconds: 5
periodSeconds: 10
readinessProbe:
tcpSocket:
port: 1883
initialDelaySeconds: 5
periodSeconds: 10
volumeMounts:
- name: primary-config
mountPath: /raymii-mosquitto/config/
volumes:
- name: primary-config
configMap:
name: raymii-mosquitto-primary-config
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app
operator: In
values:
- raymii-mosquitto
topologyKey: kubernetes.io/hostname
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: raymii-mosquitto-secondary
namespace: raymii-mosquitto-dev
spec:
replicas: 1
selector:
matchLabels:
app: raymii-mosquitto
role: secondary
template:
metadata:
labels:
app: raymii-mosquitto
role: secondary
spec:
containers:
- name: raymii-mosquitto
image: eclipse-mosquitto:2.0.21
command: ["mosquitto"]
args: ["-c", "/raymii-mosquitto/config/raymii-mosquitto.conf"]
ports:
- containerPort: 1883
- containerPort: 3883
livenessProbe:
tcpSocket:
port: 1883
initialDelaySeconds: 5
periodSeconds: 10
readinessProbe:
tcpSocket:
port: 1883
initialDelaySeconds: 5
periodSeconds: 10
volumeMounts:
- name: bridge-config
mountPath: /raymii-mosquitto/config/
volumes:
- name: bridge-config
configMap:
name: raymii-mosquitto-bridge-config
restartPolicy: Always
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app
operator: In
values:
- raymii-mosquitto
topologyKey: kubernetes.io/hostname
---
apiVersion: v1
kind: Service
metadata:
name: raymii-mosquitto-svc
namespace: raymii-mosquitto-dev
spec:
type: LoadBalancer
selector:
app: raymii-mosquitto
role: primary
ports:
- port: 1883
targetPort: 1883
protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
name: raymii-mosquitto-primary-svc
namespace: raymii-mosquitto-dev
spec:
type: LoadBalancer
selector:
app: raymii-mosquitto
role: primary
ports:
- port: 2883
targetPort: 2883
protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
name: raymii-mosquitto-secondary-svc
namespace: raymii-mosquitto-dev
spec:
type: LoadBalancer
selector:
app: raymii-mosquitto
role: secondary
ports:
- port: 3883
targetPort: 3883
protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: raymii-mosquitto-failover
namespace: raymii-mosquitto-dev
spec:
replicas: 1
selector:
matchLabels:
app: raymii-mosquitto-failover
template:
metadata:
labels:
app: raymii-mosquitto-failover
spec:
serviceAccountName: raymii-mosquitto-failover-sa
containers:
- name: failover
image: bitnami/kubectl
command:
- /bin/sh
- -c
- |
PREV_STATUS=""
while true; do
STATUS=$(kubectl get pod -l app=raymii-mosquitto,role=primary -n raymii-mosquitto-dev -o jsonpath='{.items[0].status.conditions[?(@.type=="Ready")].status}')
if [ "$STATUS" != "$PREV_STATUS" ]; then
if [ "$STATUS" != "True" ]; then
kubectl patch service raymii-mosquitto-svc -n raymii-mosquitto-dev -p '{"spec":{"selector":{"app":"raymii-mosquitto","role":"secondary"}}}'
echo "$(date) - Primary down, routing to secondary."
else
kubectl patch service raymii-mosquitto-svc -n raymii-mosquitto-dev -p '{"spec":{"selector":{"app":"raymii-mosquitto","role":"primary"}}}'
echo "$(date) - Primary healthy, routing to primary."
fi
PREV_STATUS="$STATUS"
fi
sleep 5
done
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app
operator: In
values:
- raymii-mosquitto
topologyKey: kubernetes.io/hostname
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: raymii-mosquitto-failover-sa
namespace: raymii-mosquitto-dev
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: raymii-mosquitto-failover-role
namespace: raymii-mosquitto-dev
rules:
- apiGroups: [""]
resources: ["pods", "services"]
verbs: ["get", "patch", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: raymii-mosquitto-failover-rb
namespace: raymii-mosquitto-dev
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: raymii-mosquitto-failover-role
subjects:
- kind: ServiceAccount
name: raymii-mosquitto-failover-sa
namespace: raymii-mosquitto-dev
---
apiVersion: traefik.io/v1alpha1
kind: IngressRouteTCP
metadata:
name: raymii-mosquitto-dev-mqtt
namespace: raymii-mosquitto-dev
spec:
entryPoints:
- raymii-mosquitto-dev-mqtt
routes:
- match: HostSNI(`*`)
services:
- name: raymii-mosquitto-svc
port: 1883
---
apiVersion: traefik.io/v1alpha1
kind: IngressRouteTCP
metadata:
name: raymii-mosquitto-dev-mqtt-primary
namespace: raymii-mosquitto-dev
spec:
entryPoints:
- raymii-mosquitto-dev-mqtt-primary
routes:
- match: HostSNI(`*`)
services:
- name: raymii-mosquitto-primary-svc
port: 2883
---
apiVersion: traefik.io/v1alpha1
kind: IngressRouteTCP
metadata:
name: raymii-mosquitto-dev-mqtt-secondary
namespace: raymii-mosquitto-dev
spec:
entryPoints:
- raymii-mosquitto-dev-mqtt-secondary
routes:
- match: HostSNI(`*`)
services:
- name: raymii-mosquitto-secondary-svc
port: 3883
针对 k3s 的 Traefik Helm Chart Config
在 k3s 1.32 中,Traefik 是默认的 ingress controller,但默认情况下,它仅配置为 HTTP(S) 路由。 当您需要路由 TCP service(如 MQTT(端口 1883、2883、3883))时,除非您明确配置 Traefik 以暴露这些端口,否则您将遇到难题。 这就是 HelmChartConfig
CRD 变得至关重要的地方。
通过使用正确的 valuesContent
创建 HelmChartConfig
,您可以将自定义值注入到 k3s 本身管理的 Traefik Helm chart 中。 如果没有这个,Traefik 将不会绑定到额外的 TCP 端口,不会将流量路由到 MQTT service,甚至不会启动监听器,因为 k3s 使用它自己的嵌入式 Helm controller,并且您无法直接修补 deployment。 当使用捆绑的 k3s 设置时,此配置是修改 Traefik deployment 的唯一支持方法。
K3s 监视此 HelmChartConfig
,在 Traefik chart 协调期间应用更改,并确保在节点级别正确暴露端口(如 1883、2883、3883)并将其路由到正确的 IngressRouteTCP
规则。
这是 YAML 文件:
apiVersion: helm.cattle.io/v1
kind: HelmChartConfig
metadata:
name: traefik
namespace: kube-system
spec:
valuesContent: |-
logs:
general:
level: "DEBUG"
access:
enabled: false
ports:
web:
port: 80
expose:
default: true
websecure:
port: 443
expose:
default: true
raymii-mosquitto-dev-mqtt:
port: 1883
expose:
default: true
exposedPort: 1883
raymii-mosquitto-dev-mqtt-primary:
port: 2883
expose:
default: true
exposedPort: 2883
raymii-mosquitto-dev-mqtt-secondary:
port: 3883
expose:
default: true
exposedPort: 3883
Tags: armbian , cloud , ha , high-availability , k3s , k8s , kubernetes , linux , mosquitto , mqtt , orange-pi , raspberry-pi , traefik , tutorials Home | About | All pages | Cluster Status | Generated by ingsoc.