#!/usr/bin/env python3
"""
hdmonServerTest.py

A test client for hdmonServer. This script connects as both a publisher and a
subscriber to the hdmonServer proxy. It uses the same mechanisms for
determining the host and the publisher and subscriber ports:

  - Command-line options: --host, --pub-port, and --sub-port
  - Environment variable: HDMON_PORTS in the format "pub-port:sub-port"

Command-line options take precedence over HDMON_PORTS. If neither is
provided, default values of host "localhost", pub-port 11246, and sub-port
11247 are used.

The script subscribes to a test topic ("TEST") and sends a multipart message
[topic, sender, text] from its publisher socket. It then polls its subscriber
socket for up to 5 seconds to verify that the message is received. The process
exit status is 0 when a message was received and 1 on timeout.
"""

import sys
import argparse
import os
import time

DEFAULT_HOST = "localhost"
DEFAULT_PUB_PORT = 11246
DEFAULT_SUB_PORT = 11247
PORTS_ENV_VAR = "HDMON_PORTS"

# How long to wait for the SUB subscription to propagate through the proxy.
SUBSCRIPTION_SETTLE_SECONDS = 1
# How long to wait for the test message to come back.
RECEIVE_TIMEOUT_MS = 5000


def _ports_from_env():
    """Return (pub_port, sub_port) parsed from HDMON_PORTS, or None.

    None is returned when the variable is unset, empty, or malformed. A
    malformed value is reported on stderr and ignored as a whole, so a
    half-valid value such as "5000:abc" never contributes a single port.
    """
    raw = os.environ.get(PORTS_ENV_VAR)
    if not raw:
        return None
    try:
        # Unpacking raises ValueError unless there are exactly two fields,
        # and int() raises ValueError for non-numeric fields.
        pub_text, sub_text = raw.split(":")
        return int(pub_text), int(sub_text)
    except ValueError:
        print(
            f"Warning: ignoring malformed {PORTS_ENV_VAR} value {raw!r}; "
            'expected "pub-port:sub-port".',
            file=sys.stderr,
        )
        return None


def parse_ports(argv=None):
    """Determine the host and the publisher/subscriber ports to use.

    Precedence for each port: command-line option, then HDMON_PORTS, then the
    built-in default.

    Args:
        argv: Argument list to parse. None (the default) makes argparse read
            the process command line, as before.

    Returns:
        A tuple (host, pub_port, sub_port).
    """
    parser = argparse.ArgumentParser(
        description="Test client for hdmonServer. Connects as publisher and subscriber."
    )
    parser.add_argument('--host', help="Host for connection", default=DEFAULT_HOST)
    parser.add_argument('--pub-port', type=int, help="Port for publisher connection")
    parser.add_argument('--sub-port', type=int, help="Port for subscriber connection")
    args = parser.parse_args(argv)

    pub_port = args.pub_port
    sub_port = args.sub_port

    # Only consult the environment when at least one port is still unknown,
    # and only fill in the missing one(s): an explicit command-line port must
    # never be overwritten by HDMON_PORTS.
    if pub_port is None or sub_port is None:
        env_ports = _ports_from_env()
        if env_ports is not None:
            env_pub, env_sub = env_ports
            if pub_port is None:
                pub_port = env_pub
            if sub_port is None:
                sub_port = env_sub

    if pub_port is None:
        pub_port = DEFAULT_PUB_PORT
    if sub_port is None:
        sub_port = DEFAULT_SUB_PORT

    return args.host, pub_port, sub_port


def main():
    """Publish one test message through the proxy and wait for it to return.

    Returns:
        0 if a message arrived on the subscriber socket within the timeout,
        1 otherwise.
    """
    host, pub_port, sub_port = parse_ports()

    # Imported here rather than at module level so that argument parsing
    # (including --help) works even where pyzmq is not installed.
    import zmq

    context = zmq.Context()
    try:
        # Create the subscriber socket and connect to the XPUB endpoint of the proxy.
        subscriber = context.socket(zmq.SUB)
        sub_endpoint = f"tcp://{host}:{sub_port}"
        subscriber.connect(sub_endpoint)
        # Subscribe to messages with topic "TEST"
        subscriber.setsockopt_string(zmq.SUBSCRIBE, "TEST")

        # Create the publisher socket and connect to the XSUB endpoint of the proxy.
        publisher = context.socket(zmq.PUB)
        pub_endpoint = f"tcp://{host}:{pub_port}"
        publisher.connect(pub_endpoint)

        print("hdmonServerTest client started.")
        print(f" - Publisher connected to: {pub_endpoint}")
        print(f" - Subscriber connected to: {sub_endpoint}")

        # Allow some time for the subscription to propagate.
        time.sleep(SUBSCRIPTION_SETTLE_SECONDS)

        # Prepare a test multipart message with three parts: [topic, sender, text]
        topic = "TEST"
        sender = "hdmonServerTest"
        text = "Hello from publisher!"
        publisher.send_multipart([topic.encode(), sender.encode(), text.encode()])
        print(f"Sent message: [{topic}, {sender}, {text}]")

        # Wait for a message on the subscriber socket (timeout in milliseconds).
        poller = zmq.Poller()
        poller.register(subscriber, zmq.POLLIN)
        ready = dict(poller.poll(timeout=RECEIVE_TIMEOUT_MS))

        if subscriber not in ready:
            print("No message received within the timeout.")
            return 1

        # Another publisher may use the same topic with arbitrary bytes, so
        # decode leniently instead of crashing on non-UTF-8 frames.
        frames = [
            frame.decode(errors="replace")
            for frame in subscriber.recv_multipart()
        ]
        print("Received message:")
        for index, frame in enumerate(frames):
            print(f" Part {index}: {frame}")
        return 0
    finally:
        # linger=0 discards any still-queued outbound message. With the
        # default (infinite) linger, terminating the context would block
        # forever when the server is unreachable.
        context.destroy(linger=0)


if __name__ == '__main__':
    sys.exit(main())