Overview

The gst-plugin-msgbroker package consists of two GStreamer elements that implement standard Publish/Subscribe messaging semantics:
  • qtimsgpub, which publishes messages to an external messaging backend
  • qtimsgsub, which subscribes to messages from an external messaging backend
gst-plugin-msgbroker is the messaging component of the Qualcomm QTI GStreamer plugin ecosystem. It exposes a single Pub/Sub interface to the GStreamer pipeline and acts as a high-level adapter that hides the underlying messaging protocol. Its extensible protocol adaptation layer is designed to support multiple Pub/Sub backends, so applications can integrate different messaging systems without changing the pipeline-facing interface. Because protocol-specific details are kept out of pipeline logic, a GStreamer pipeline can act as a message producer or consumer and remain portable across the supported messaging implementations.
  • qtimsgpub: A publisher element that transmits pipeline data to an external message broker.
gst-launch-1.0 -e --gst-debug=2 \
   [Source Plugin] ! \
   [Filter/Processing] ! \
   [Publisher Plugin] topic=<Target Topic> host=<Target Host> port=<Port> protocol=<Protocol>
  • qtimsgsub: A subscriber element that receives external messages from the broker and injects them into the media pipeline.
Supported protocols
  • MQTT
    • MQTT is a lightweight publish/subscribe messaging protocol optimized for machine-to-machine (M2M) telemetry in low-bandwidth environments. It uses a broker-based communication model that decouples clients from one another: publishers send messages to named topics, and subscribers receive messages from the topics to which they are subscribed. Built on libmosquitto, the Mosquitto C client library, gst-plugin-msgbroker abstracts the underlying complexity of the OASIS-standard MQTT protocol and provides application developers with a clean, high-level interface for integrating reliable publish/subscribe (Pub/Sub) messaging into GStreamer-based applications and pipelines.
  • Kafka
    • Kafka is a distributed streaming platform designed for building real-time data pipelines and streaming applications. Kafka uses a distributed commit log architecture where messages are persisted to disk and replicated across multiple brokers for fault tolerance. Kafka organizes messages into topics, which are partitioned and distributed across a cluster of brokers. Producers publish messages to topics, and consumers subscribe to topics as part of consumer groups, enabling scalable message processing with automatic load balancing and fault tolerance. Built on the librdkafka C client library, the Kafka protocol library abstracts the underlying complexity of the Apache Kafka protocol and provides application developers with a clean, high-level interface for integrating reliable publish/subscribe (Pub/Sub) messaging into GStreamer-based applications and pipelines.
# Message Construction
echo "Message in file" > /usr/local/message.txt

# Publisher pipeline
gst-launch-1.0 -e --gst-debug=2 filesrc location=/usr/local/message.txt ! qtimsgpub topic=file host=127.0.0.1 port=1883 protocol=mqtt config=/usr/local/config

# Subscriber pipeline
gst-launch-1.0 -e --gst-debug=2 qtimsgsub topic=file host=127.0.0.1 port=1883 protocol=mqtt config=/usr/local/config ! filesink location=/usr/local/sub_message.txt async=false
The publisher pipeline reads message.txt from disk and publishes its content to the file topic on the MQTT server. The subscriber pipeline receives messages on the file topic from the MQTT server and writes them to sub_message.txt.
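The two example pipelines share the same connection properties and differ only in element order. As a minimal sketch, the argument lists can be assembled in Python; the helper name msgbroker_args and its parameters are hypothetical and not part of the plugin, while the element and property names are taken from the commands above.

```python
def msgbroker_args(role, topic, host, port, protocol, config, location):
    """Build a gst-launch-1.0 argument list for a file-based publisher or subscriber."""
    base = ["gst-launch-1.0", "-e", "--gst-debug=2"]
    common = [
        f"topic={topic}",
        f"host={host}",
        f"port={port}",
        f"protocol={protocol}",
        f"config={config}",
    ]
    if role == "pub":
        # filesrc feeds the file content into qtimsgpub (a sink element).
        return base + ["filesrc", f"location={location}", "!", "qtimsgpub"] + common
    if role == "sub":
        # qtimsgsub (a source element) feeds received messages into filesink.
        return base + ["qtimsgsub"] + common + [
            "!", "filesink", f"location={location}", "async=false",
        ]
    raise ValueError(f"unknown role: {role!r}")


pub = msgbroker_args("pub", "file", "127.0.0.1", 1883, "mqtt",
                     "/usr/local/config", "/usr/local/message.txt")
sub = msgbroker_args("sub", "file", "127.0.0.1", 1883, "mqtt",
                     "/usr/local/config", "/usr/local/sub_message.txt")
```

Each list can be passed directly to subprocess.run, which avoids shell quoting of the "!" separator.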

Hierarchy

Pad Templates

  • qtimsgpub plugin
Capabilities: ANY
format: NA
Availability: Always
Direction: sink
  • qtimsgsub plugin
Capabilities: ANY
format: NA
Availability: Always
Direction: src

Element Properties

Property | Plugin | Description
config | qtimsgpub / qtimsgsub | Absolute path to the protocol configuration file.

Type: String
Default: NULL
Flags: readable/writable
host | qtimsgpub / qtimsgsub | Message broker server IP address.

Type: String
Default: NULL
Flags: readable/writable
json | qtimsgpub | Converts messages to JSON format before publishing. Ignored by qtimsgsub.

Type: Boolean
Default: false
Flags: readable/writable
message | qtimsgpub | Message payload specified directly for publishing. Ignored by qtimsgsub.

Type: String
Default: NULL
Flags: readable/writable (changeable in NULL, READY, PAUSED, PLAYING states)
port | qtimsgpub / qtimsgsub | Message broker server port.

Type: Integer
Default: 1883
Range: 0 - 2147483647
Flags: readable/writable
protocol | qtimsgpub / qtimsgsub | Message protocol, such as mqtt or kafka.

Type: String
Default: NULL
Flags: readable/writable
topic | qtimsgpub / qtimsgsub | Topic to publish to (qtimsgpub) or subscribe to (qtimsgsub).

Type: String
Default: NULL
Flags: readable/writable (changeable in NULL, READY, PAUSED, PLAYING states)

Signals

Signal | Plugin | Description
add-publish | qtimsgpub | Signal to publish an additional topic and message at runtime.

Type: Signal (boolean return)
Parameters: (string topic, string message)

MQTT Configuration File Options

This configuration file is provided to the element through the config property. The file uses a key=value format and supports comments starting with #.
Option | Description
clean_session | Whether to clear historical sessions for the same client ID.

Type: bool
Default: TRUE
id | MQTT client ID.

Type: string
Default: NULL (randomly assigned)
keepalive | Heartbeat keep-alive interval (seconds).

Type: int
Default: 60
max_inflight | Maximum concurrent in-flight messages for QoS 1/2.

Type: int
Default: 20
mqtt_version | MQTT protocol version (MQTTV31 / MQTTV311 / MQTTV5).

Type: string
Default: MQTTV311
password | Broker authentication password.

Type: string
Default: NULL
qos | Quality of Service level (0 / 1 / 2).

Type: int
Default: 0
retain | Whether messages are retained on the broker.

Type: bool
Default: FALSE
tcp_nodelay | Disable Nagle's algorithm for lower latency.

Type: bool
Default: FALSE
username | Broker authentication username.

Type: string
Default: NULL
will_payload | Last Will message payload for unexpected disconnects.

Type: string
Default: NULL
will_qos | QoS level for the Last Will message (0 / 1 / 2).

Type: int
Default: 0
will_retain | Whether the Last Will message is retained on the broker.

Type: bool
Default: FALSE
will_topic | Last Will message topic for unexpected disconnects.

Type: string
Default: NULL
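
The key=value format and the defaults listed above can be illustrated with a small reader. This is a sketch only: the function name parse_mqtt_config is hypothetical, the accepted boolean spellings (true/false/1/0, case-insensitive) are an assumption, and the plugin's own parser may treat whitespace or unknown keys differently.

```python
# Defaults copied from the option table; value types follow its "Type" entries.
MQTT_DEFAULTS = {
    "clean_session": True,
    "id": None,
    "keepalive": 60,
    "max_inflight": 20,
    "mqtt_version": "MQTTV311",
    "password": None,
    "qos": 0,
    "retain": False,
    "tcp_nodelay": False,
    "username": None,
    "will_payload": None,
    "will_qos": 0,
    "will_retain": False,
    "will_topic": None,
}

# Assumed boolean spellings; the real parser may accept a different set.
BOOL_WORDS = {"true": True, "1": True, "false": False, "0": False}


def parse_mqtt_config(text):
    """Parse key=value lines, skip blanks and '#' comments, apply table defaults."""
    options = dict(MQTT_DEFAULTS)
    for lineno, raw in enumerate(text.splitlines(), start=1):
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        key, value = key.strip(), value.strip()
        if not sep or key not in MQTT_DEFAULTS:
            raise ValueError(f"line {lineno}: unrecognized entry {raw!r}")
        default = MQTT_DEFAULTS[key]
        if isinstance(default, bool):  # checked before int: bool is an int subclass
            if value.lower() not in BOOL_WORDS:
                raise ValueError(f"line {lineno}: {key} expects a boolean")
            options[key] = BOOL_WORDS[value.lower()]
        elif isinstance(default, int):
            options[key] = int(value)
        else:
            options[key] = value
    for key in ("qos", "will_qos"):
        if options[key] not in (0, 1, 2):
            raise ValueError(f"{key} must be 0, 1, or 2")
    return options


sample = """\
# MQTT client configuration
mqtt_version = MQTTV311
qos = 1
clean_session = FALSE
"""
config = parse_mqtt_config(sample)
```

Options missing from the file keep their table defaults, so config["keepalive"] stays at 60 in this sample.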

Kafka Configuration File Options

This configuration file is provided to the element through the config property. The file uses an INI-style format with sections and supports comments starting with #. The configuration file is organized into three sections:

[global-config]

Common configuration options that apply to both producers and consumers.
Option | Description
proto-cfg | Protocol-specific configuration string in key-value format, separated by semicolons.

Type: string
Default: NULL (optional)
Example: acks=all; compression.type=gzip; retries=3

[producer-config]

Configuration options specific to Kafka producers (qtimsgpub).
Option | Description
partition-key | Key used for message partitioning. Messages with the same key are sent to the same partition.

Type: string
Default: NULL (required)
Example: partition-key = "cars"
proto-cfg | Producer-specific protocol configuration string in key-value format, separated by semicolons.

Type: string
Default: NULL (optional)
Example: batch.size=16384; linger.ms=10
timeout-ms | Maximum time (in milliseconds) to wait for message delivery confirmation.

Type: int (milliseconds)
Default: NULL (required)
Example: timeout-ms = "5000"

[consumer-config]

Configuration options specific to Kafka consumers (qtimsgsub).
Option | Description
group-id | Consumer group identifier. Consumers with the same group-id share the load of consuming messages.

Type: string
Default: NULL (required)
Example: group-id = "tmp-group"
proto-cfg | Consumer-specific protocol configuration string in key-value format, separated by semicolons.

Type: string
Default: NULL (optional)
Example: auto.offset.reset=earliest; enable.auto.commit=true
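
To make the three-section layout and the semicolon-separated proto-cfg strings concrete, the sketch below reads such a file with Python's configparser. The function names are hypothetical. Merging [global-config] first and letting the role-specific section override it is an assumption, as is whether proto-cfg values are quoted in the file; the plugin itself applies these pairs through rd_kafka_conf_set().

```python
import configparser

ROLE_SECTIONS = {"pub": "producer-config", "sub": "consumer-config"}
REQUIRED = {"pub": ("partition-key", "timeout-ms"), "sub": ("group-id",)}


def unquote(value):
    """Drop surrounding whitespace and optional double quotes."""
    return value.strip().strip('"')


def split_proto_cfg(value):
    """Split 'k1=v1; k2=v2' into a dict, ignoring empty segments."""
    pairs = {}
    for segment in value.split(";"):
        segment = segment.strip()
        if not segment:
            continue
        key, sep, val = segment.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {segment!r}")
        pairs[key.strip()] = val.strip()
    return pairs


def load_kafka_config(text, role):
    """Return (options, properties) for role 'pub' or 'sub'."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    options, properties = {}, {}
    # Assumed precedence: global settings first, role-specific settings override.
    for section in ("global-config", ROLE_SECTIONS[role]):
        if not parser.has_section(section):
            continue
        for key, value in parser.items(section):
            if key == "proto-cfg":
                properties.update(split_proto_cfg(unquote(value)))
            else:
                options[key] = unquote(value)
    missing = [key for key in REQUIRED[role] if key not in options]
    if missing:
        raise ValueError(f"[{ROLE_SECTIONS[role]}] is missing: {missing}")
    return options, properties


SAMPLE = """\
# Kafka configuration
[global-config]
proto-cfg = acks=all; compression.type=gzip; retries=3

[producer-config]
partition-key = "cars"
timeout-ms = "5000"
proto-cfg = batch.size=16384; linger.ms=10

[consumer-config]
group-id = "tmp-group"
"""
pub_options, pub_properties = load_kafka_config(SAMPLE, "pub")
```

The options marked as required in the tables above are checked per role, so a publisher configuration without timeout-ms is rejected before any connection attempt.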

Internal Architecture

The plugin follows a three-layer architecture that decouples the GStreamer element layer from the underlying messaging protocol: GStreamer Element Layer → Protocol Adaptor → Protocol Implementation. The adaptor layer dynamically loads protocol-specific implementations using dlopen and dlsym, and exposes a unified interface to the element layer.

Protocol Adaptor

GstMsgProtocol is responsible for loading the protocol-specific shared library based on the configured protocol name (for example, libgstqtimqttadaptor.so), resolving the unified entry symbol GST_PROTOCOL_CFUNC_SYMBOL, and forwarding upper-layer API calls to the corresponding protocol implementation. GstProtocolCommonFunc defines the common protocol interface, including new, free, config, connect, disconnect, publish, and subscribe.
struct _GstProtocolCommonFunc {
  GstProtocolNewFunction        new;        // Create protocol instance
  GstProtocolFreeFunction       free;       // Release protocol instance
  GstProtocolConfigFunction     config;     // Configure protocol parameters
  GstProtocolConnectFunction    connect;    // Connect to Broker
  GstProtocolDisconnectFunction disconnect; // Disconnect
  GstProtocolPublishFunction    publish;    // Publish message
  GstProtocolSubscribeFunction  subscribe;  // Subscribe to Topic
};
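
The forwarding pattern behind this function table can be sketched in Python. Everything here is illustrative: a dictionary stands in for dlopen() and dlsym(), RecordingBackend is a hypothetical backend that only records calls, only four of the seven entry points are modeled, and the library naming pattern is inferred from the two adaptor names mentioned in this document.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class ProtocolFuncs:
    """Python stand-in for GstProtocolCommonFunc (four of the seven entry points)."""
    new: Callable
    connect: Callable
    publish: Callable
    disconnect: Callable


class RecordingBackend:
    """Hypothetical protocol instance that records calls instead of using a broker."""

    def __init__(self, role):
        self.role = role
        self.calls = []

    def record(self, *call):
        self.calls.append(call)
        return True


def adaptor_library_name(protocol):
    # Pattern inferred from libgstqtimqttadaptor.so and libgstqtikafkaadaptor.so.
    return f"libgstqti{protocol}adaptor.so"


# Stand-in for dlopen() + dlsym(): maps a library name to its function table.
FAKE_LIBRARIES = {
    adaptor_library_name("mqtt"): ProtocolFuncs(
        new=RecordingBackend,
        connect=lambda inst, host, port: inst.record("connect", host, port),
        publish=lambda inst, topic, message: inst.record("publish", topic, message),
        disconnect=lambda inst: inst.record("disconnect"),
    ),
}


class MsgProtocol:
    """Element-facing wrapper: resolves the table once, then forwards every call."""

    def __init__(self, protocol, role):
        library = adaptor_library_name(protocol)
        if library not in FAKE_LIBRARIES:
            raise LookupError(f"cannot load {library}")
        self.funcs = FAKE_LIBRARIES[library]
        self.instance = self.funcs.new(role)

    def connect(self, host, port):
        return self.funcs.connect(self.instance, host, port)

    def publish(self, topic, message):
        return self.funcs.publish(self.instance, topic, message)

    def disconnect(self):
        return self.funcs.disconnect(self.instance)


proto = MsgProtocol("mqtt", "pub")
proto.connect("127.0.0.1", 1883)
proto.publish("file", "Message in file")
proto.disconnect()
```

Because the element layer only ever calls through the table, adding a backend means registering one more table and leaves the wrapper unchanged.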

MQTT Protocol Implementation (GstMqtt)

GstMqtt (mqtt.c) implements the complete lifecycle of the Mosquitto client and serves as the concrete MQTT protocol implementation behind the generic adaptor API.
  • Dynamic loading: The Mosquitto library is loaded at runtime using dlopen("libmosquitto.so.1"), avoiding a hard build-time dependency.
  • Dual-role support: The implementation supports both publisher (pub) and subscriber (sub) roles. The role parameter determines the operating mode, and the corresponding callbacks are registered accordingly.
  • Asynchronous event handling: The Mosquitto network loop is started on a dedicated thread using mosquitto_loop_start(), enabling non-blocking network I/O.
  • Callback flow: For subscribed messages, data is propagated through the callback chain message_callback → gst_adaptor_sub_callback → gst_plugin_sub_callback, and is ultimately pushed into GstDataQueue for consumption by the GStreamer pipeline thread.
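
The handoff between the network thread and the pipeline thread is a producer/consumer queue. The sketch below models it with Python's queue.Queue in place of GstDataQueue; the function names and the None end-of-stream sentinel are hypothetical and exist only to make the example terminate.

```python
import queue
import threading


def network_thread(messages, subscribed_topic, msg_queue):
    """Stand-in for the Mosquitto loop thread invoking message_callback()."""
    for topic, payload in messages:
        if topic != subscribed_topic:  # verify the topic matches the subscription
            continue
        msg_queue.put(bytes(payload))  # copy the payload, then enqueue it
    msg_queue.put(None)  # hypothetical end-of-stream sentinel for this sketch


def pipeline_thread(msg_queue):
    """Stand-in for gst_msg_sub_create(): block until the next buffer arrives."""
    received = []
    while True:
        item = msg_queue.get()
        if item is None:
            break
        received.append(item)
    return received


incoming = [("file", b"first"), ("other", b"ignored"), ("file", b"second")]
msg_queue = queue.Queue()
producer = threading.Thread(target=network_thread, args=(incoming, "file", msg_queue))
producer.start()
received = pipeline_thread(msg_queue)
producer.join()
```

The queue is the only shared state, so the network thread never touches pipeline objects directly and the pipeline thread simply blocks until data is available.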

Message Delivery Flow

Publisher  │  Upstream element (GStreamer pipeline thread)
           │      │
           │      ▼
           │  GstBuffer
           │      │
           │      ▼
           │  gst_msg_pub_render()
           │      │
           │      ▼
           │  gst_msg_protocol_publish()
           │      │
           │      ▼
           │  gst_mqtt_publish()
           │      │  mosquitto_publish_v5()
           │      ▼
-----------+----------------------------------------------
Broker     │  ┌──────────────┐
           │  │ MQTT Broker  │
           │  └──────────────┘
-----------+----------------------------------------------
Subscriber │      │  (Mosquitto network thread)
           │      ▼
           │  message_callback()
           │      │
           │      ▼
           │  gst_adaptor_sub_callback()
           │      │
           │      ▼
           │  gst_plugin_sub_callback()
           │      │  gst_data_queue_push()
           │      ▼
           │  GstDataQueue (msg_queue)
           │      │  gst_data_queue_pop() (GStreamer pipeline thread)
           │      ▼
           │  gst_msg_sub_create()
           │      │
           │      ▼
           │  GstBuffer → downstream elements

Publish Side Flow (qtimsgpub)

Pipeline PLAYING


gst_msg_pub_start()
    ├── gst_msg_protocol_new(protocol="mqtt", role="pub")
    │       └── dlopen("libgstqtimqttadaptor.so")
    │           └── gst_mqtt_new("pub")
    │               └── mosquitto_lib_init()
    ├── gst_msg_protocol_config(config_path)
    │       └── gst_mqtt_config()
    │           ├── Parse configuration file (extract_prop_from_file)
    │           ├── mosquitto_new(id, clean_session)
    │           ├── Set protocol version, maximum inflight messages, will message, authentication, etc.
    │           └── Register connect/disconnect/publish callbacks
    └── gst_msg_protocol_connect(host, port)
            └── gst_mqtt_connect()
                ├── mosquitto_connect_bind_v5()
                └── mosquitto_loop_start()  ← Start asynchronous network thread
Data arrival (gst_msg_pub_render)
    ├── [Optional] Publish CLI message from message_cmd
    ├── [Optional] Convert GstBuffer data to JSON
    └── gst_msg_protocol_publish(topic, message)
            └── gst_mqtt_publish()
                └── mosquitto_publish_v5()
Pipeline STOP
    └── gst_msg_pub_stop()
            └── gst_msg_protocol_disconnect()
                    └── gst_mqtt_disconnect()
                        ├── mosquitto_disconnect_v5()
                        └── mosquitto_loop_stop()

Subscribe Side Flow (qtimsgsub)

Pipeline PLAYING


gst_msg_sub_start()
    ├── gst_msg_protocol_new(protocol="mqtt", role="sub")
    │       └── dlopen("libgstqtimqttadaptor.so")
    │           └── gst_mqtt_new("sub")
    ├── gst_msg_protocol_config(config_path)
    │       └── gst_mqtt_config()
    │           ├── Parse configuration file
    │           ├── mosquitto_new()
    │           └── Register connect/disconnect/subscribe/message callbacks
    ├── gst_msg_protocol_connect(host, port)
    │       └── mosquitto_connect_bind_v5() + mosquitto_loop_start()
    └── gst_msg_protocol_subscribe(topic, msg_queue, callback)
            └── gst_mqtt_subscribe()
                └── mosquitto_subscribe_v5(topic)
Message arrival (network thread)
    └── message_callback()
            ├── Verify Topic match
            ├── Wrap into GstBuffer (gst_buffer_new_memdup)
            └── Push into GstDataQueue
GStreamer thread requests data
    └── gst_msg_sub_create()
            ├── gst_data_queue_pop()
            └── Return GstBuffer
Pipeline STOP
    └── gst_msg_sub_stop()
            └── gst_msg_protocol_disconnect()

Kafka Protocol Implementation (GstKafka)

GstKafka (kafka.c) implements the complete lifecycle of the librdkafka client and serves as the concrete Kafka protocol implementation behind the generic adaptor API.
  • Dual-role support: The implementation supports both producer (pub) and consumer (sub) roles. The role parameter determines the operating mode, and the corresponding callbacks are registered accordingly.
  • Asynchronous event handling:
    • For producers, the delivery callback (gst_kafka_dr_msg_cb) is invoked asynchronously when messages are acknowledged by the broker. The producer uses rd_kafka_poll() to service delivery callbacks.
    • For consumers, a dedicated GstTask thread continuously polls the broker using rd_kafka_consumer_poll() for new messages, enabling non-blocking network I/O.
  • Callback flow: For subscribed messages, data is propagated through the callback chain: gst_kafka_consume_message → gst_adaptor_sub_callback → gst_plugin_sub_callback, and is ultimately pushed into GstDataQueue for consumption by the GStreamer pipeline thread.
  • Message delivery tracking: The producer implementation uses a mutex-protected status variable to track message delivery state (submitted, success, or failure), ensuring synchronous confirmation of message delivery.
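
The delivery-tracking idea can be sketched without a broker. In the simulation below, a pending list stands in for librdkafka's internal delivery-report queue, and the class and method names are hypothetical. The timeout handling of the real publish path is omitted.

```python
import enum
import threading
from collections import deque


class MsgStatus(enum.Enum):
    SUBMITTED = "submitted"
    SUCCESS = "success"
    FAIL = "fail"


class ProducerSketch:
    """Simulated producer that confirms each message before publish() returns."""

    def __init__(self, broker_accepts=True):
        self._lock = threading.Lock()
        self._status = None
        self._pending = deque()  # delivery reports waiting to be served
        self._broker_accepts = broker_accepts

    def _set_status(self, status):
        with self._lock:
            self._status = status

    def _produce(self, topic, payload):
        # Stand-in for rd_kafka_producev(): queue the message, report later.
        error = None if self._broker_accepts else "delivery failed"
        self._pending.append(error)

    def _delivery_report(self, error):
        # Stand-in for gst_kafka_dr_msg_cb(): record the outcome.
        self._set_status(MsgStatus.FAIL if error else MsgStatus.SUCCESS)

    def _poll(self):
        # Stand-in for rd_kafka_poll(): serve queued delivery reports.
        while self._pending:
            self._delivery_report(self._pending.popleft())

    def publish(self, topic, payload):
        self._set_status(MsgStatus.SUBMITTED)
        self._produce(topic, payload)
        self._poll()
        with self._lock:
            return self._status is MsgStatus.SUCCESS


delivered = ProducerSketch().publish("cmd_topic", b"Command Message")
rejected = ProducerSketch(broker_accepts=False).publish("cmd_topic", b"Command Message")
```

Resetting the status to SUBMITTED before each send ensures that a stale SUCCESS from an earlier message cannot be mistaken for confirmation of the current one.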

Message Delivery Flow

Publisher  │  Upstream element (GStreamer pipeline thread)
           │      │
           │      ▼
           │  GstBuffer
           │      │
           │      ▼
           │  gst_msg_pub_render()
           │      │
           │      ▼
           │  gst_msg_protocol_publish()
           │      │
           │      ▼
           │  gst_kafka_publish()
           │      │  rd_kafka_producev()
           │      │  rd_kafka_poll() (wait for delivery callback)
           │      ▼
-----------+----------------------------------------------
Broker     │  ┌────────────────────────┐
           │  │  Kafka Cluster         │
           │  │  (Partitioned Topics)  │
           │  └────────────────────────┘
-----------+----------------------------------------------
Subscriber │      │  (Consumer task thread - GstTask)
           │      ▼
           │  gst_kafka_consume_message()
           │      │  rd_kafka_consumer_poll()
           │      ▼
           │  message received
           │      │
           │      ▼
           │  gst_adaptor_sub_callback()
           │      │
           │      ▼
           │  gst_plugin_sub_callback()
           │      │  gst_data_queue_push()
           │      ▼
           │  GstDataQueue (msg_queue)
           │      │  gst_data_queue_pop() (GStreamer pipeline thread)
           │      ▼
           │  gst_msg_sub_create()
           │      │
           │      ▼
           │  GstBuffer → downstream elements

Publish Side Flow (qtimsgpub)

Pipeline PLAYING


gst_msg_pub_start()
├── gst_msg_protocol_new(protocol="kafka", role="pub")
│       └── dlopen("libgstqtikafkaadaptor.so")
│           └── gst_kafka_new("pub")
│               └── Initialize GstKafka structure
├── gst_msg_protocol_config(config_path)
│       └── gst_kafka_config()
│           ├── Parse configuration file (gst_fetch_config_value)
│           ├── Parse proto-cfg strings (gst_kafka_parse_proto_cfg)
│           │   └── Set key=value pairs via rd_kafka_conf_set()
│           └── Create rd_kafka_conf_t configuration object
└── gst_msg_protocol_connect(host, port)
    └── gst_kafka_connect()
        ├── Set bootstrap.servers = host:port
        ├── Register delivery callback (gst_kafka_dr_msg_cb)
        └── rd_kafka_new(RD_KAFKA_PRODUCER, conf)

Data arrival (gst_msg_pub_render)
├── [Optional] Publish CLI message from message_cmd
├── [Optional] Convert GstBuffer data to JSON
└── gst_msg_protocol_publish(topic, message)
    └── gst_kafka_publish()
        ├── Set msgstatus = GST_KAFKA_MSG_SUBMITTED
        ├── rd_kafka_producev()
        │   ├── RD_KAFKA_V_TOPIC(topic)
        │   ├── RD_KAFKA_V_VALUE(payload)
        │   ├── RD_KAFKA_V_KEY(partition_key)
        │   └── RD_KAFKA_V_OPAQUE(self) for callback context
        ├── rd_kafka_poll(timeout_ms) ← Wait for delivery callback
        └── Check msgstatus == GST_KAFKA_MSG_DELIVERY_SUCCESS

Delivery callback (invoked from within rd_kafka_poll() on the publishing thread)
└── gst_kafka_dr_msg_cb()
    ├── Check rkmessage->err
    ├── Set msgstatus = SUCCESS or FAIL
    └── rd_kafka_yield() ← Return control to rd_kafka_poll()

Pipeline STOP
└── gst_msg_pub_stop()
    └── gst_msg_protocol_disconnect()
        └── gst_kafka_disconnect()
            ├── rd_kafka_flush(10000) ← Flush pending messages
            └── rd_kafka_destroy(producer)

Subscribe Side Flow (qtimsgsub)

Pipeline PLAYING


gst_msg_sub_start()
├── gst_msg_protocol_new(protocol="kafka", role="sub")
│       └── dlopen("libgstqtikafkaadaptor.so")
│           └── gst_kafka_new("sub")
├── gst_msg_protocol_config(config_path)
│       └── gst_kafka_config()
│           ├── Parse configuration file
│           ├── Parse proto-cfg strings
│           └── Set group.id via rd_kafka_conf_set()
├── gst_msg_protocol_connect(host, port)
│       └── gst_kafka_connect()
│           ├── Set bootstrap.servers = host:port
│           └── rd_kafka_new(RD_KAFKA_CONSUMER, conf)
└── gst_msg_protocol_subscribe(topic, msg_queue, callback)
    └── gst_kafka_subscribe()
        ├── rd_kafka_topic_partition_list_new()
        ├── rd_kafka_topic_partition_list_add(topic)
        ├── rd_kafka_subscribe(subscription)
        ├── Create GstTask (gst_kafka_consume_message)
        └── gst_task_start() ← Start consumer polling thread

Consumer task loop (dedicated thread)
└── gst_kafka_consume_message()
    ├── rd_kafka_consumer_poll(100ms)
    ├── Check rkm->err
    ├── Extract payload and topic
    ├── Create GstBuffer from payload
    └── Invoke callback chain
        └── gst_adaptor_sub_callback()
            └── Push to GstDataQueue

GStreamer thread requests data
└── gst_msg_sub_create()
    ├── gst_data_queue_pop()
    └── Return GstBuffer

Pipeline STOP
└── gst_msg_sub_stop()
    └── gst_msg_protocol_disconnect()
        └── gst_kafka_disconnect()
            ├── gst_task_stop() ← Stop consumer task
            ├── rd_kafka_consumer_close()
            ├── rd_kafka_destroy(consumer)
            └── gst_task_join() ← Wait for task completion

Prerequisites & Assumptions

MQTT

  • Mosquitto server installed and configured, optionally with username/password authentication.
The location of the mosquitto.conf configuration file is displayed after installation.
  • Create the password file with mosquitto_passwd -c /etc/mosquitto/pwfile <username>, then set the password when prompted.
  • Add the following settings to mosquitto.conf:
password_file /etc/mosquitto/pwfile
allow_anonymous false
listener 1883
  • Restart the broker: systemctl daemon-reload && systemctl restart mosquitto
  • The client has a valid plugin configuration file on the device (/usr/local/config) with mqtt_version, username, and password set as needed.
# MQTT Client Configuration
mqtt_version = MQTTV311
username = <username registered on server>
password = <password registered on server>
...
  • Network connectivity: client and server can ping each other; broker listens on 1883.

Kafka

  • Step 1: Get Kafka server running on a host machine.
    • Download the latest Kafka release and extract it.
    $ tar -xzf kafka_<ver>.tgz
    $ cd kafka_<ver>
    
This has been tested with v2.13-4.3.0
  • Step 2: Start the Kafka environment
    • NOTE: Your local environment must have Java 17+ installed.
    • Kafka can be run using local scripts and downloaded files or the docker image.
    • Using downloaded files:
  1. Update config/server.properties.
# Update the "advertised.listeners" with the IP address of the machine.
advertised.listeners=PLAINTEXT://<IP ADDR of HOST>:9092,CONTROLLER://localhost:9093
  2. Generate a cluster UUID.
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
  3. Format the log directories.
$ bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
  4. Start the Kafka server.
$ bin/kafka-server-start.sh config/server.properties
Once the Kafka server has launched successfully, a basic Kafka environment is running and ready to use.
  5. Create topics with partitions based on your use case. The topic created here is used to publish and consume messages later.
# From a different shell
$ bin/kafka-topics.sh --create --topic tmp --bootstrap-server localhost:9092
A topic must be created before running any of the test cases.
  • Step 3: Connect the device to the same network as the server. The server and the client (qtimsgpub/qtimsgsub) should be able to ping each other.

Use Cases

1. Download Required Files

File | Download | Save as
YOLOX W8A8 model | Qualcomm AI Hub - YOLOX | yolo_x_w8a8.tflite
Detection labels | yolov8.json | yolov8.json
Sample video | Input video | ai_demo_sample.mp4
2. Copy files to device

# Run from your host machine; replace <user> and <device-ip>.
# Remote paths are relative to the device user's home directory, so the host's $HOME is never expanded into them.
ssh <user>@<device-ip> 'mkdir -p models labels media/output'
scp yolo_x_w8a8.tflite <user>@<device-ip>:models/
scp yolov8.json <user>@<device-ip>:labels/
scp ai_demo_sample.mp4 <user>@<device-ip>:media/
3. Connect to device

# Run from your host machine — replace <user> and <device-ip>
ssh <user>@<device-ip>
4. Set environment variables

Run the following commands on your device:
mkdir -p $HOME/{models,labels,media,media/output}
export MODEL_NAME=yolo_x_w8a8.tflite
export LABELS_NAME=yolov8.json
export SRC_VIDEO_NAME=ai_demo_sample.mp4
Start the subscriber pipeline first, then launch the publisher pipeline from a different shell.

Publish CLI message → Subscribe to file

This example demonstrates how a message provided through the message property is transmitted from the publisher pipeline to the subscriber pipeline. In the publisher pipeline, fakesrc serves as a dummy source that establishes the pipeline structure. The qtimsgpub element then publishes the literal message ("Command Message") set through its message property to the cmd_topic topic on the configured MQTT or Kafka broker.
Publisher Pipeline: Use the following for MQTT:
Publisher (one-shot):
gst-launch-1.0 -e --gst-debug=2 fakesrc ! qtimsgpub topic=cmd_topic message="Command Message" host=<mosquitto server ip> port=<port advertised by the server> protocol=mqtt config=<path to mqtt config file>
For Kafka:
# Create topic on host
bin/kafka-topics.sh --create --topic cmd_topic --bootstrap-server localhost:9092
# Example configuration file
[producer-config]
partition-key = "objects"
timeout-ms = "5000"

[consumer-config]
group-id = "tmp-group"
Publisher (one-shot):
gst-launch-1.0 -e --gst-debug=2 fakesrc ! qtimsgpub topic=cmd_topic message="Command Message" host=<kafka server ip> port=<port advertised by the server> protocol=kafka config=<path to kafka config file>
Subscriber Pipeline: The subscriber pipeline uses qtimsgsub to subscribe to messages published on the cmd_topic topic of the specified MQTT or Kafka broker. The received messages are forwarded directly to filesink, which writes them to sub_message.txt. Use the following for MQTT:
Subscriber (press Ctrl+C to stop):
gst-launch-1.0 -e --gst-debug=2 qtimsgsub topic=cmd_topic host=<mosquitto server> port=<port advertised by the server> protocol=mqtt config=<path to mqtt config file> ! filesink location=$HOME/media/output/sub_message.txt async=false
For Kafka:
Subscriber (press Ctrl+C to stop):
gst-launch-1.0 -e --gst-debug=2 qtimsgsub topic=cmd_topic host=<kafka server ip> port=<port advertised by the server> protocol=kafka config=<path to kafka config file> ! filesink location=$HOME/media/output/sub_message.txt async=false

Publish file contents → Subscribe to file

This example demonstrates the transmission of a text-file-based message from the publisher pipeline to the subscriber pipeline. The file message.txt is first created with the content "Message in file", and the publisher pipeline uses filesrc to read the file contents. The qtimsgpub element then publishes the data as a message to the file_topic topic on the configured MQTT or Kafka broker. As in the previous use case, the subscriber pipeline uses qtimsgsub to subscribe to file_topic on the broker, and the received messages are written to sub_message.txt using filesink.
Publisher Pipeline: Use the following for MQTT:
Publisher (prepare and send):
echo "Message in file" > $HOME/media/message.txt
gst-launch-1.0 -e --gst-debug=2 filesrc location=$HOME/media/message.txt ! qtimsgpub topic=file_topic host=<mosquitto server> port=<port advertised by the server> protocol=mqtt config=<path to mqtt config file>
For Kafka:
# Create topic on host
bin/kafka-topics.sh --create --topic file_topic --bootstrap-server localhost:9092
Publisher (prepare and send):
echo "Message in file" > $HOME/media/message.txt
gst-launch-1.0 -e --gst-debug=2 filesrc location=$HOME/media/message.txt ! qtimsgpub topic=file_topic host=<kafka server ip> port=<port advertised by the server> protocol=kafka config=<path to kafka config file>
Subscriber Pipeline: Use the following for MQTT:
Subscriber (press Ctrl+C to stop):
gst-launch-1.0 -e --gst-debug=2 qtimsgsub topic=file_topic host=<mosquitto server> port=<port advertised by the server> protocol=mqtt config=<path to mqtt config file> ! filesink location=$HOME/media/output/sub_message.txt async=false
For Kafka:
Subscriber (press Ctrl+C to stop):
gst-launch-1.0 -e --gst-debug=2 qtimsgsub topic=file_topic host=<kafka server ip> port=<port advertised by the server> protocol=kafka config=<path to kafka config file> ! filesink location=$HOME/media/output/sub_message.txt async=false

Publish AI Metadata packaged in JSON → Subscribe to AI Metadata

This example demonstrates how AI metadata produced after model inference can be transmitted from the publisher pipeline running the AI model to the subscriber pipeline. The subscriber pipeline can then implement custom logic to handle the AI event and trigger the appropriate action. The AI pipeline performs object detection on incoming video frames, and qtimsgpub publishes the resulting detection metadata to the MQTT or Kafka broker in JSON format.
Publisher Pipeline: Use the following for MQTT:
Publisher (AI pipeline → JSON):
gst-launch-1.0 -e --gst-debug=2 filesrc location=$HOME/media/$SRC_VIDEO_NAME ! qtdemux ! h264parse ! v4l2h264dec capture-io-mode=4 output-io-mode=4 ! queue ! qtimlvconverter ! queue ! qtimltflite delegate=external external-delegate-path=libQnnTFLiteDelegate.so external-delegate-options="QNNExternalDelegate,backend_type=htp;" model=$HOME/models/$MODEL_NAME ! queue ! qtimlpostprocess module=yolov8 labels=$HOME/labels/$LABELS_NAME settings="{\"confidence\": 51.0}" ! text/x-raw ! queue !  qtimsgpub topic=detection_topic host=<mosquitto server> port=<port advertised by the server> protocol=mqtt config=<path to mqtt config file> json=true async=false
For Kafka:
# Create topic on host
bin/kafka-topics.sh --create --topic detection_topic --bootstrap-server localhost:9092
Publisher (AI pipeline → JSON):
gst-launch-1.0 -e --gst-debug=2 filesrc location=$HOME/media/$SRC_VIDEO_NAME ! qtdemux ! h264parse ! v4l2h264dec capture-io-mode=4 output-io-mode=4 ! queue ! qtimlvconverter ! queue ! qtimltflite delegate=external external-delegate-path=libQnnTFLiteDelegate.so external-delegate-options="QNNExternalDelegate,backend_type=htp;" model=$HOME/models/$MODEL_NAME ! queue ! qtimlpostprocess module=yolov8 labels=$HOME/labels/$LABELS_NAME settings="{\"confidence\": 51.0}" ! text/x-raw ! queue !  qtimsgpub topic=detection_topic host=<kafka server ip> port=<port advertised by the server> protocol=kafka config=<path to kafka config file> json=true async=false
Subscriber Pipeline: The subscriber pipeline uses qtimsgsub to subscribe to the detection_topic topic on the MQTT or Kafka broker. The received data, expected to be in JSON format, is written directly to sub.json through filesink. Use the following for MQTT:
Subscriber (press Ctrl+C to stop):
gst-launch-1.0 -e --gst-debug=2 qtimsgsub topic=detection_topic host=<mosquitto server> port=<port advertised by the server> protocol=mqtt config=<path to mqtt config file> ! filesink location=$HOME/media/output/sub.json async=false
For Kafka:
Subscriber (press Ctrl+C to stop):
gst-launch-1.0 -e --gst-debug=2 qtimsgsub topic=detection_topic host=<kafka server ip> port=<port advertised by the server> protocol=kafka config=<path to kafka config file> ! filesink location=$HOME/media/output/sub.json async=false