# -*- coding: utf-8 -*-
###
# Description: This script implements an MQTT client for sending and receiving command messages for CVP.
# It utilizes the Paho MQTT library and Protocol Buffers for message serialization.
# Author: Muhammed Abdullah Shaikh
# Date Created: Feb 14, 2024
# Last Modified: Feb 26, 2024
# Python Version: 3.10.11
# Dependencies: Paho MQTT, Protocol Buffers, protoserver
# License: BSD-3-Clause License
###
import os, sys, uuid, time
import logging
import threading
from datetime import datetime
# sys.path.append("./TMCVP630")
package_dir = os.path.abspath(os.path.dirname(__file__))
tata_motors_path = os.path.join(package_dir, 'TMCVP630')
if tata_motors_path not in sys.path:
sys.path.append(tata_motors_path)
logging.basicConfig(format='[%(asctime)s] [%(levelname)-8s] : "%(message)s"', level=logging.DEBUG, filename="commander.log", filemode="w")
import paho.mqtt.client as mqtt
import paho.mqtt.subscribe as subscribe
from protoserver import utils
from protoserver.utils import fill_message as fill_payload
import tmcvp_common_pb2
import tmcvp_command_pb2
import tmcvp_command_message_pb2
import tmcvp_commandresponse_message_pb2
# from TMCVP630 import tmcvp_common_pb2
# from TMCVP630 import tmcvp_command_pb2
# from TMCVP630 import tmcvp_command_message_pb2
# from TMCVP630 import tmcvp_commandresponse_message_pb2
MQTT_BROKER = "test.mosquitto.org"
PORT_NO = 1883
[docs]
def generate_command_message(subtype):
"""
Generate a CommandMessage with common fields and payload based on subtype from user input.
Parameters:
subtype (int): The subtype of the command message corresponding to `enum commandMessageSubType`
Returns:
tmcvp_command_message_pb2.CommandMessage: The generated CommandMessage.
.. admonition:: For Future Changes
:class: caution
This function creates and sets common fields of command message such as
``message_id``, ``correlation_id``, ``vehicle_id``, ``type``, ``priority``, ``provisioning_state``,
``version``, ``time_stamp``, and ``packet_status`` to hard code values.
If any of these fields change, the code needs to be modified. However,
it is agnostic to changes ``commandPayload``.
Example::
command_message = generate_command_message(1)
print(command_message)
"""
# Create a CommandMessage and set common fields
command_message = tmcvp_command_message_pb2.CommandMessage()
command_message.message_id = str(uuid.uuid4())
command_message.correlation_id = "correlation-id"
command_message.vehicle_id = "MH12VF1121"
command_message.type = tmcvp_common_pb2.eTcuMessageType.command
command_message.subtype = subtype
command_message.priority = "moderate"
command_message.provisioning_state = tmcvp_common_pb2.eProvisioningState.provisioned
command_message.version = "V6.2"
command_message.time_stamp.GetCurrentTime()
command_message.packet_status = tmcvp_common_pb2.PacketStatus.Live
# Select the appropriate command payload based on subtype
command_payload = getattr(
tmcvp_command_pb2,
tmcvp_command_message_pb2.commandMessageSubType.Name(subtype) + "Payload",
)()
fill_payload(command_payload)
# Set the payload in the command message
getattr(
command_message.command_payload,
command_message.command_payload.DESCRIPTOR.fields[subtype].name,
).CopyFrom(command_payload)
return command_message
[docs]
def decode_response(rcvdMsg):
r"""
Decode and print the received MQTT message.
:param bytes rcvdMsg: The received MQTT message in bytes.
.. admonition:: For Future Changes
:class: caution
This function assumes that the fields ``message_id``, ``correlation_id``, ``vehicle_id``,
``type``, ``subtype``, ``priority``, ``provisioning_state``, ``version``, ``time_stamp``, ``packet_status``
and ``return_code`` are present in future proto versions of ``CommandResponseMessage``
as in version TMCVP 6.3. If any of these fields change, the code needs to be
modified. However, it is agnostic to changes ``commandResponsePayload``.
Example:
.. code-block:: python
rcvd_message = b'\n$ebc4a199-67d7-4bea-9fed-d05db21e7744\x12$b82a63a2-6619-496a-8b92-be7a763db448\x1a\x11ACCDEV14012078186 \x01(\x112\x0108\x02B\x056.2.0J\t\x08\x9f\x93\xd6\xae\x06\x10\x8a\x02PLb\x05\x9a\x01\x02\x08\x01'
decode_response(rcvd_message)
"""
print("In Hex:\n{}".format(rcvdMsg.hex(" ").upper()))
try:
# Decode and print the response based on subtype
response_message = tmcvp_commandresponse_message_pb2.CommandResponseMessage()
response_message.ParseFromString(rcvdMsg)
response_payload_type = str(response_message.commandResponsePayload.WhichOneof("commandResponsePayload"))
response_payload = getattr(response_message.commandResponsePayload, response_payload_type)
print("message_id:", response_message.message_id)
print("correlation_id:", response_message.correlation_id)
print("vehicle_id:", response_message.vehicle_id)
print("type:", tmcvp_common_pb2.eTcuMessageType.Name(response_message.type))
print("subtype:", tmcvp_command_message_pb2.commandMessageSubType.Name(response_message.subtype))
print("priority:", response_message.priority)
print("provisioning_state:", tmcvp_common_pb2.eProvisioningState.Name(response_message.provisioning_state))
print("version:", response_message.version)
print("time_stamp:", datetime.fromtimestamp(response_message.time_stamp.seconds).strftime('%Y-%m-%d %H:%M:%S'))
print("packet_status:", tmcvp_common_pb2.PacketStatus.Name(response_message.packet_status))
print("return_code:", tmcvp_commandresponse_message_pb2.eReturnCode.Name(response_message.return_code))
print("commandResponsePayload:\n{}".format(utils.MessageToTable(response_payload, show_empty=True)))
logging.info("Command Response:\n{}".format(utils.MessageToTable(response_message)))
except Exception as e:
print("Parsing Failed. Error: {}".format(e))
return
[docs]
def start_mqtt(client):
"""
Configures callbacks and start the MQTT client for communication. Blocks until connection is established.
Parameters:
client (paho.mqtt.client): MQTT client object
"""
def on_connect(client, userdata, flags, rc):
print("Client with id: " + str(client._client_id) + " Connected")
logging.info("Connected with result code %s", str(rc))
# client.subscribe("/device/+/MQTTPROTOBUF/commandresponse")
def on_message(client, userdata, msg):
msg_len = len(msg.payload)
logging.info("Topic: %s Received Message: %s)", msg.topic, msg.payload)
def on_publish(client, userdata, mid):
logging.info("Message Published: %s", str(mid))
# @client.topic_callback("/device/+/MQTTPROTOBUF/commandresponse")
# def handle_mytopic(client, userdata, message):
# # logging.info("Received Message on Topic: %s\n", message.topic)
# print("Received Message on Topic: {}".format(message.topic))
# decode_response(message.payload)
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.connect(MQTT_BROKER, PORT_NO)
client.loop_start()
# Wait for connection
print("Waiting for connection...")
while not client.is_connected():
time.sleep(0.1)
# client.subscribe("/device/ABC/MQTTPROTOBUF/commandresponse")
def handle_mytopic(client, userdata, message):
# logging.info("Received Message on Topic: %s\n", message.topic)
print("Received Message on Topic: {}".format(message.topic))
decode_response(message.payload)
[docs]
def mqtt_subscribe(vin_no='+'):
"""Subscribe to a MQTT topic for a given VIN number else uses wild card (+)
.. hint::
This function uses `paho.mqtt.subscribe <https://eclipse.dev/paho/files/paho.mqtt.python/html/index.html#subscribe>`_ instead of subcribing with same client.
Reason: I was facing issues when publishing and subcribing with the same client object.
Parameters:
vin_no (str): VIN number for MQTT topic subscription
"""
CommandResponseTopic = "/device/" + vin_no + "/MQTTPROTOBUF/commandresponse"
print("Subcribed to Topic: {}".format(CommandResponseTopic))
subscribe.callback(handle_mytopic, CommandResponseTopic, hostname=MQTT_BROKER, port=PORT_NO)
VinNo = ""
CommandTopic =""
def main():
global VinNo, CommandTopic
# VinNo = "ACCDEV14012078186"
VinNo = input("Enter the VIN number: ").strip()
CommandTopic = "/device/" + VinNo + "/MQTTPROTOBUF/command"
client = mqtt.Client(client_id="Tester_6.3")
start_mqtt(client)
threading.Thread(target=mqtt_subscribe, args=(VinNo,), daemon=True).start()
while True:
try:
subtype = utils.get_enum_input(tmcvp_command_message_pb2.commandMessageSubType.DESCRIPTOR)
while subtype is None:
print("Invalid subtype.")
subtype = utils.get_enum_input(tmcvp_command_message_pb2.commandMessageSubType.DESCRIPTOR)
# Generate and print the command message
command_message = generate_command_message(subtype)
print("Generated CommandMessage:")
print("Serialized Format (in Hex):")
serialized_message = command_message.SerializeToString()
print(serialized_message.hex(" ").upper())
# print("Table format:\n", utils.MessageToTable(command_message))
client.publish(CommandTopic, serialized_message)
logging.info("Published CommandMessage:\n%s", utils.MessageToTable(command_message))
input("Press Enter to continue...")
except KeyboardInterrupt:
break
finally:
client.loop_stop()
if __name__ == "__main__":
main()