标题:在 Kubernetes 上实现高可用 Mosquitto MQTT

Raymii.org

Quis custodiet ipsos custodes? Home | About | All pages | Cluster Status | RSS Feed Clear

在 Kubernetes 上实现高可用 Mosquitto MQTT

发布时间:2025-05-14 22:11 | 作者:Remy van Elst | 纯文本版本

目录

本文将介绍如何使用 Eclipse Mosquitto 构建一个完全声明式、Kubernetes 原生的、高可用的 MQTT broker。 该配置利用了核心 Kubernetes 原语(DeploymentsServicesConfigMapsRBAC),并结合 Traefik IngressRouteTCP 来暴露外部 MQTT 流量。它引入了一种轻量级的、自愈的故障转移机制,当主 broker 不可用时,可以自动将流量重新路由到辅助 broker。 该设置还演示了内部 MQTT bridging,允许在 broker 之间无缝进行消息传播。 相对于单个 Pod 部署(在节点发生故障的情况下,k8s 将在 5 分钟后重启),主要优势在于此设置的停机时间仅为 5 秒,并且具有共享状态,因此所有消息都可以在故障转移时可用。

最近,我因为 Google Ads 的侵入式跟踪以及 Google Analytics,已经从本网站移除了它们。 如果您发现此内容有用,请考虑使用以下任何选项进行小额捐赠。 如果您表达感谢,并且帮助支付服务器费用,这对我很重要: GitHub Sponsorship PCBWay referral link (You get $5, I get $20 after you've placed an order) Digital Ocea referral link ($200 credit for 60 days. Spend $25 after your credit expires and I'll get $25!)

diagram

设置示意图

本指南假定您已使用 Traefik 设置了可用的 Kubernetes 环境。 在我的例子中,我用于本文的 Kubernetes/k3s 版本是 v1.32.2+k3s1

如果您没有这样的集群,可以查看我的所有其他 kubernetes 文章

在典型的 Kubernetes 部署中,如果只有一个 Mosquitto pod,那么弹性是有限的。 如果运行该 pod 的节点发生故障,Kubernetes 可能需要长达 5 分钟才能检测到故障并进行恢复。 此延迟源于默认的 node-monitor-grace-period,通常设置为 5 分钟(300 秒)。 在此期间,MQTT 客户端会失去连接,消息会被丢弃,并且依赖实时消息传递的系统可能会性能下降或进入故障模式。

此处介绍的配置通过部署 primarysecondary Mosquitto broker 来避免这种停机时间,每个 broker 都在其自己的 pod 中,调度在不同的节点上,并使用自定义的故障转移控制器来处理流量重定向。 一个轻量级的控制器监控 primary pod 的就绪状态,如果它变得不可用,则修补 Kubernetes Service,以在 5 秒内将流量重新路由到 secondary broker。 这大大缩短了恢复时间,并提高了故障期间的系统响应速度。

因为 secondary broker 始终在运行并 bridging 到 primary,所以它保持了接近实时的消息状态。 客户端继续连接到相同的 LoadBalancer 端点(raymii-mosquitto-svc),无需更新客户端配置或管理 DNS 更改。 故障转移是透明且快速的,即使主服务器脱机,也能确保消息流继续。

本文面向使用 k3straefik。 将其适配于 nginx 应该不难。

概述

这是 YAML 文件功能的总结。 两个 mosquitto 实例都可以通过它们自己的端口(primary2883secondary3883)以及端口 1883(具有自动故障转移功能)进行访问。 客户端应连接到端口 1883,其他端口可用于监控。

mosquitto 实例配置为 bridge 所有消息。 发布到 primary 的任何内容都会发布到 secondary,反之亦然。 发生故障转移时,客户端会失去连接,必须重新连接,但 secondary broker 拥有所有消息(包括保留的消息)。 当 primary 重新上线时,客户端还需要另一次重新连接。 您可以调整 failover 控制器以使 secondary 充当 primary,直到 secondary 发生故障,但这不在本文的讨论范围之内。

运行 failover 监控的 Pod 由于 Affinity 被调度到不同的 Node 上。 在发生故障时,该 failover Pod 大约每 5 分钟重启一次。 如果运行 failover pod 的 Node 发生故障,并且运行 primary pod 的 Node 发生故障,则在 failover Pod 恢复之前,不会发生故障转移。 因此,在某些罕见的情况下,故障转移可能仍然需要 5 分钟。 即使这样,由于 bridge 配置,丢失的消息也会更少。

在我打算使用的用例中,客户端会在每次发生故障时重新连接,并在连接时发布保留的消息,因此故障恢复不是问题。

由于 bridge 配置,在中断期间发布到 secondary 的任何保留消息都会发布回 primary

1. Namespace & ConfigMaps

2. Deployments

3. Services

4. RBAC

5. Traefik IngressRouteTCP

为什么 Mosquitto Failover Pod 需要 Service Account

在 Kubernetes 中,默认情况下,任何 pod 都无法访问集群资源,甚至无法检查其他 pod 的状态或修补 service。 当 Mosquitto failover pod 需要监控 broker 健康状况并在 primarysecondary 之间切换流量时,这就是一个问题。 如果没有正确的权限,故障转移逻辑会静默失败。

因此,我们必须创建一个 ServiceAccount,将其绑定到具有 getlistpatch 权限的 Role,并将它分配给 failover pod。 此 RBAC 设置是让 pod 查询运行状况并通过 kubectl 动态地重新路由流量的唯一方法。

Mosquitto Failover Pod 如何保持 MQTT 服务存活

raymii-mosquitto-failover pod 是一个轻量级的控制循环,其设计目的只有一个:即使 primary broker 宕机,也能保持 MQTT 流量畅通。 它持续检查 primary Mosquitto pod 的就绪状态。 如果 primary 失败,它会修补 raymii-mosquitto-svc service 以将流量路由到 secondary broker。 当 primary 恢复时,流量会自动恢复到 primary

此 pod 在 shell 循环中运行 kubectl,使用 Kubernetes API 调用来检测运行状况并重定向流量。 它故意设计得很简单,没有 operator、没有 sidecar、没有自定义资源。

我可以构建一个自定义控制器,但这有点过头了。 控制器会带来开销、额外的代码、CRD、生命周期管理以及更多复杂性,而这只是针对一个非常具体的事情。 failover pod 用可读性、可审计性、可调试性和即时部署来换取抽象性。 对于一项正确完成的工作来说,越少越好。

现在是 YAML 文件的摘要。

当主节点宕机时会发生什么?

该设置在三个节点上运行。 一个用于 primary,一个用于 secondary,一个用于 failover 监控。

我拔掉了运行 primary 节点的 k3s 服务器的网线。 客户端断开连接,但在几秒钟后重新连接到 raymii-mosquitto-svc Service,并连接到 secondary 节点。

几分钟后(超过 5 分钟),我将网线插回了托管 primary pod 的 k3s 服务器。 failover Pod 注意到了这一点并修补了 service 并恢复了正常:

kubectl logs -n raymii-mosquitto-dev -l app=mosquitto-failover

输出:

service/raymii-mosquitto-svc patched (no change)
Wed May 14 18:58:30 UTC 2025 - Primary healthy, routing to primary.
service/raymii-mosquitto-svc patched
Wed May 14 19:11:54 UTC 2025 - Primary down, routing to secondary.
service/raymii-mosquitto-svc patched
Wed May 14 19:13:41 UTC 2025 - Primary healthy, routing to primary.

K8S Deployment YAML 文件

这是 YAML 文件,包括 k3s 1.32 HelmChartConfig 以暴露 443 和 80 以外的端口。 如果您使用 NGINX,则必须根据您的设置调整该部分。

namespace 是 raymii-mosquitto-dev。 如果您想要不同的 namespace,请搜索并替换。 如果您使用证书或用于身份验证的自定义 CA,或者您在其中保存 mosquitto 持久数据库,则可以附加持久卷。 对于我的用例,客户端在每次 connect 时都发布保留的消息,因此无需保存 raymii-mosquitto.db 文件。 您可能想要使用 Longhorn 或保存该状态。 为了简单起见,我使用 ConfigMap 进行 broker 配置。

---
apiVersion: v1
kind: Namespace
metadata:
 name: raymii-mosquitto-dev
---
apiVersion: v1
kind: ConfigMap
metadata:
 name: raymii-mosquitto-primary-config
 namespace: raymii-mosquitto-dev
data:
 raymii-mosquitto.conf: |
  listener 1883
  allow_anonymous true 
  listener 2883
  allow_anonymous true 
---
apiVersion: v1
kind: ConfigMap
metadata:
 name: raymii-mosquitto-bridge-config
 namespace: raymii-mosquitto-dev
data:
 raymii-mosquitto.conf: |
  listener 1883
  allow_anonymous true
  listener 3883
  allow_anonymous true
  connection bridge-to-primary
  address raymii-mosquitto-primary-svc.raymii-mosquitto-dev.svc.cluster.local:2883
  clientid raymii-mosquitto-bridge
  topic # both 0
  start_type automatic
  try_private true
  notifications true
  restart_timeout 5
---
apiVersion: apps/v1
kind: Deployment
metadata:
 name: raymii-mosquitto-primary
 namespace: raymii-mosquitto-dev
spec:
 replicas: 1
 selector:
  matchLabels:
   app: raymii-mosquitto
   role: primary
 template:
  metadata:
   labels:
    app: raymii-mosquitto
    role: primary
  spec:
   containers:
   - name: raymii-mosquitto
    image: eclipse-mosquitto:2.0.21
    command: ["mosquitto"]
    args: ["-c", "/raymii-mosquitto/config/raymii-mosquitto.conf"]
    ports:
    - containerPort: 1883
    - containerPort: 2883
    livenessProbe:
     tcpSocket:
      port: 1883
     initialDelaySeconds: 5
     periodSeconds: 10
    readinessProbe:
     tcpSocket:
      port: 1883
     initialDelaySeconds: 5
     periodSeconds: 10
    volumeMounts:
    - name: primary-config
     mountPath: /raymii-mosquitto/config/
   volumes:
   - name: primary-config
    configMap:
     name: raymii-mosquitto-primary-config   
   affinity:
    podAntiAffinity:
     requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
        matchExpressions:
         - key: app
          operator: In
          values:
           - raymii-mosquitto
       topologyKey: kubernetes.io/hostname
---
apiVersion: apps/v1
kind: Deployment
metadata:
 name: raymii-mosquitto-secondary
 namespace: raymii-mosquitto-dev
spec:
 replicas: 1
 selector:
  matchLabels:
   app: raymii-mosquitto
   role: secondary
 template:
  metadata:
   labels:
    app: raymii-mosquitto
    role: secondary
  spec:
   containers:
   - name: raymii-mosquitto
    image: eclipse-mosquitto:2.0.21
    command: ["mosquitto"]
    args: ["-c", "/raymii-mosquitto/config/raymii-mosquitto.conf"]
    ports:
    - containerPort: 1883
    - containerPort: 3883
    livenessProbe:
     tcpSocket:
      port: 1883
     initialDelaySeconds: 5
     periodSeconds: 10
    readinessProbe:
     tcpSocket:
      port: 1883
     initialDelaySeconds: 5
     periodSeconds: 10    
    volumeMounts:
    - name: bridge-config
     mountPath: /raymii-mosquitto/config/     
   volumes:
   - name: bridge-config
    configMap:
     name: raymii-mosquitto-bridge-config
   restartPolicy: Always
   affinity:
    podAntiAffinity:
     requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
        matchExpressions:
         - key: app
          operator: In
          values:
           - raymii-mosquitto
       topologyKey: kubernetes.io/hostname
---
apiVersion: v1
kind: Service
metadata:
 name: raymii-mosquitto-svc
 namespace: raymii-mosquitto-dev
spec:
 type: LoadBalancer
 selector:
  app: raymii-mosquitto
  role: primary
 ports:  
 - port: 1883
  targetPort: 1883
  protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
 name: raymii-mosquitto-primary-svc
 namespace: raymii-mosquitto-dev
spec:
 type: LoadBalancer
 selector:
  app: raymii-mosquitto
  role: primary
 ports:  
 - port: 2883
  targetPort: 2883
  protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
 name: raymii-mosquitto-secondary-svc
 namespace: raymii-mosquitto-dev
spec:
 type: LoadBalancer
 selector:
  app: raymii-mosquitto
  role: secondary
 ports:
 - port: 3883
  targetPort: 3883
  protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
metadata:
 name: raymii-mosquitto-failover
 namespace: raymii-mosquitto-dev
spec:
 replicas: 1
 selector:
  matchLabels:
   app: raymii-mosquitto-failover
 template:
  metadata:
   labels:
    app: raymii-mosquitto-failover
  spec:
   serviceAccountName: raymii-mosquitto-failover-sa
   containers:
   - name: failover
    image: bitnami/kubectl
    command:
    - /bin/sh
    - -c
    - |
     PREV_STATUS=""
     while true; do
      STATUS=$(kubectl get pod -l app=raymii-mosquitto,role=primary -n raymii-mosquitto-dev -o jsonpath='{.items[0].status.conditions[?(@.type=="Ready")].status}')
      if [ "$STATUS" != "$PREV_STATUS" ]; then
       if [ "$STATUS" != "True" ]; then
        kubectl patch service raymii-mosquitto-svc -n raymii-mosquitto-dev -p '{"spec":{"selector":{"app":"raymii-mosquitto","role":"secondary"}}}'
        echo "$(date) - Primary down, routing to secondary."
       else
        kubectl patch service raymii-mosquitto-svc -n raymii-mosquitto-dev -p '{"spec":{"selector":{"app":"raymii-mosquitto","role":"primary"}}}'
        echo "$(date) - Primary healthy, routing to primary."
       fi
       PREV_STATUS="$STATUS"
      fi
      sleep 5
     done
   affinity:
    podAntiAffinity:
     requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
        matchExpressions:
         - key: app
          operator: In
          values:
           - raymii-mosquitto
       topologyKey: kubernetes.io/hostname
---
apiVersion: v1
kind: ServiceAccount
metadata:
 name: raymii-mosquitto-failover-sa
 namespace: raymii-mosquitto-dev
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
 name: raymii-mosquitto-failover-role
 namespace: raymii-mosquitto-dev
rules:
- apiGroups: [""]
 resources: ["pods", "services"]
 verbs: ["get", "patch", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
 name: raymii-mosquitto-failover-rb
 namespace: raymii-mosquitto-dev
roleRef:
 apiGroup: rbac.authorization.k8s.io
 kind: Role
 name: raymii-mosquitto-failover-role
subjects:
- kind: ServiceAccount
 name: raymii-mosquitto-failover-sa
 namespace: raymii-mosquitto-dev
---
apiVersion: traefik.io/v1alpha1
kind: IngressRouteTCP
metadata:
 name: raymii-mosquitto-dev-mqtt
 namespace: raymii-mosquitto-dev
spec:
 entryPoints:
  - raymii-mosquitto-dev-mqtt
 routes:
  - match: HostSNI(`*`)
   services:
    - name: raymii-mosquitto-svc
     port: 1883
---
apiVersion: traefik.io/v1alpha1
kind: IngressRouteTCP
metadata:
 name: raymii-mosquitto-dev-mqtt-primary
 namespace: raymii-mosquitto-dev
spec:
 entryPoints:
  - raymii-mosquitto-dev-mqtt-primary
 routes:
  - match: HostSNI(`*`)
   services:
    - name: raymii-mosquitto-primary-svc
     port: 2883
---
apiVersion: traefik.io/v1alpha1
kind: IngressRouteTCP
metadata:
 name: raymii-mosquitto-dev-mqtt-secondary
 namespace: raymii-mosquitto-dev
spec:
 entryPoints:
  - raymii-mosquitto-dev-mqtt-secondary
 routes:
  - match: HostSNI(`*`)
   services:
    - name: raymii-mosquitto-secondary-svc
     port: 3883

针对 k3s 的 Traefik Helm Chart Config

在 k3s 1.32 中,Traefik 是默认的 ingress controller,但默认情况下,它仅配置为 HTTP(S) 路由。 当您需要路由 TCP service(如 MQTT(端口 1883、2883、3883))时,除非您明确配置 Traefik 以暴露这些端口,否则您将遇到难题。 这就是 HelmChartConfig CRD 变得至关重要的地方。

通过使用正确的 valuesContent 创建 HelmChartConfig,您可以将自定义值注入到 k3s 本身管理的 Traefik Helm chart 中。 如果没有这个,Traefik 将不会绑定到额外的 TCP 端口,不会将流量路由到 MQTT service,甚至不会启动监听器,因为 k3s 使用它自己的嵌入式 Helm controller,并且您无法直接修补 deployment。 当使用捆绑的 k3s 设置时,此配置是修改 Traefik deployment 的唯一支持方法。

K3s 监视此 HelmChartConfig,在 Traefik chart 协调期间应用更改,并确保在节点级别正确暴露端口(如 1883、2883、3883)并将其路由到正确的 IngressRouteTCP 规则。

这是 YAML 文件:

apiVersion: helm.cattle.io/v1
kind: HelmChartConfig
metadata:
 name: traefik
 namespace: kube-system
spec:
 valuesContent: |-
  logs:
   general:
    level: "DEBUG"
   access:
    enabled: false
  ports:
   web:
    port: 80
    expose:
     default: true
   websecure:
    port: 443
    expose:
     default: true
   raymii-mosquitto-dev-mqtt:
    port: 1883
    expose:
     default: true
    exposedPort: 1883
   raymii-mosquitto-dev-mqtt-primary:
    port: 2883
    expose:
     default: true
    exposedPort: 2883
   raymii-mosquitto-dev-mqtt-secondary:
    port: 3883
    expose:
     default: true
    exposedPort: 3883

Tags: armbian , cloud , ha , high-availability , k3s , k8s , kubernetes , linux , mosquitto , mqtt , orange-pi , raspberry-pi , traefik , tutorials Home | About | All pages | Cluster Status | Generated by ingsoc.