fastapi-mqtt

2.2.0last stable release 11 months ago
Complexity Score
Low
Open Issues
1
Dependent Projects
3
Weekly Downloadsglobal
1,798

License

  • MIT
    • Yesattribution
    • Permissivelinking
    • Permissivedistribution
    • Permissivemodification
    • Nopatent grant
    • Yesprivate use
    • Permissivesublicensing
    • Notrademark grant

Downloads

Readme

fastapi-mqtt

MQTT is a lightweight publish/subscribe messaging protocol designed for M2M (machine to machine) telemetry in low bandwidth environments. Fastapi-mqtt is the client for working with MQTT.

For more information about MQTT, please refer to here: MQTT

Fastapi-mqtt wraps around gmqtt module. Gmqtt Python async client for MQTT client implementation. Module has support of MQTT version 5.0 protocol

Documentation: FastApi-MQTT

The key feature are:

MQTT specification avaliable with help decarator methods using callbacks:

  • on_connect()

  • on_disconnect()

  • on_subscribe()

  • on_message()

  • subscribe(topic)

  • MQTT Settings available with pydantic class

  • Authentication to broker with credentials

  • unsubscribe certain topics and publish to certain topics

🔨 Installation

pip install fastapi-mqtt

🕹 Guide

from contextlib import asynccontextmanager
from typing import Any

from fastapi import FastAPI
from gmqtt import Client as MQTTClient

from fastapi_mqtt import FastMQTT, MQTTConfig

mqtt_config = MQTTConfig()
fast_mqtt = FastMQTT(config=mqtt_config)


@asynccontextmanager
async def _lifespan(_app: FastAPI):
    await fast_mqtt.mqtt_startup()
    yield
    await fast_mqtt.mqtt_shutdown()


app = FastAPI(lifespan=_lifespan)


@fast_mqtt.on_connect()
def connect(client: MQTTClient, flags: int, rc: int, properties: Any):
    client.subscribe("/mqtt")  # subscribing mqtt topic
    print("Connected: ", client, flags, rc, properties)

@fast_mqtt.subscribe("mqtt/+/temperature", "mqtt/+/humidity", qos=1)
async def home_message(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):
    print("temperature/humidity: ", topic, payload.decode(), qos, properties)

@fast_mqtt.on_message()
async def message(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):
    print("Received message: ", topic, payload.decode(), qos, properties)

@fast_mqtt.subscribe("my/mqtt/topic/#", qos=2)
async def message_to_topic_with_high_qos(
    client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any
):
    print(
        "Received message to specific topic and QoS=2: ", topic, payload.decode(), qos, properties
    )

@fast_mqtt.on_disconnect()
def disconnect(client: MQTTClient, packet, exc=None):
    print("Disconnected")

@fast_mqtt.on_subscribe()
def subscribe(client: MQTTClient, mid: int, qos: int, properties: Any):
    print("subscribed", client, mid, qos, properties)

@app.get("/test")
async def func():
    fast_mqtt.publish("/mqtt", "Hello from Fastapi")  # publishing mqtt topic
    return {"result": True, "message": "Published"}

Publish method:

async def func():
    fast_mqtt.publish("/mqtt", "Hello from Fastapi")  # publishing mqtt topic
    return {"result": True, "message": "Published"}

Subscribe method:

@fast_mqtt.on_connect()
def connect(client, flags, rc, properties):
    client.subscribe("/mqtt")  # subscribing mqtt topic
    print("Connected: ", client, flags, rc, properties)

Changing connection params

mqtt_config = MQTTConfig(
    host="mqtt.mosquito.org",
    port=1883,
    keepalive=60,
    username="username",
    password="strong_password",
)
fast_mqtt = FastMQTT(config=mqtt_config)

✅ Testing

  • Clone the repository and install it with poetry.
  • Run tests with pytest, using an external MQTT broker to connect (defaults to ‘test.mosquitto.org’).
  • Explore the fastapi app examples and run them with uvicorn
# (opc) Run a local mosquitto MQTT broker with docker
docker run -d --name mosquitto -p 9001:9001 -p 1883:1883 eclipse-mosquitto:1.6.15
# Set host for test broker when running pytest
TEST_BROKER_HOST=localhost pytest
# Run the example apps against local broker, with uvicorn
TEST_BROKER_HOST=localhost uvicorn examples.app:app --port 8000 --reload
TEST_BROKER_HOST=localhost uvicorn examples.ws_app.app:application --port 8000 --reload

Contributing

Fell free to open issue and send pull request.

Thanks To Contributors. Contributions of any kind are welcome!

Before you start please read CONTRIBUTING

Dependencies

Loading dependencies...

CVE IssuesActive
0
Scorecards Score
3.00
Test Coverage
No Data
Follows Semver
No
Github Stars
235
Dependenciestotal
2
DependenciesOutdated
0
DependenciesDeprecated
0
Threat Modelling
No
Repo Audits
No

Learn how to distribute fastapi-mqtt in your own private PyPI registry

pip install fastapi-mqtt
Processing...
Done

Releases

Loading Version Data
PyPI on Cloudsmith

Getting started with PyPI on Cloudsmith is fast and easy.