#!/usr/bin/env python3
"""hdmonGUI: Tkinter control/monitoring GUI for the Hall-D monitoring farm.

Farm processes are launched and stopped by running the ``start_monitoring``
script.  Their health is tracked by subscribing to ZeroMQ status messages
relayed through a PUB/SUB proxy whose location is determined by
``parse_udl`` (command line, then the HDMON_UDL environment variable, then
built-in defaults).
"""
import tkinter as tk
from tkinter import ttk
import zmq
import argparse
import os
import threading
import subprocess
import time
import socket
import glob
from collections import deque
from tkinter import scrolledtext
# from epics import caget
from zmq.utils.monitor import parse_monitor_message  # For monitoring socket events

# Module-level handles to the (single) open Logs/Help windows; None when closed.
LogsWin = None
HelpWin = None

# -------------------------------
# Global heartbeat timeout (in seconds)
# -------------------------------
# A node that has sent no message for this long is dropped from the node list.
HEARTBEAT_TIMEOUT = 10

# Level-combobox entry meaning "do not restrict start_monitoring to one level".
ALL_LEVELS = "< all levels >"


# -------------------------------
# UDL parsing function
# -------------------------------
def parse_udl():
    """
    Parse command-line options and/or the HDMON_UDL environment variable.

    Command-line arguments (unrecognized arguments are ignored):
        --host:     Proxy server host
        --pub-port: Publisher port
        --sub-port: Subscriber port
        --topic:    Topic to subscribe to

    Any value not supplied on the command line is taken from HDMON_UDL,
    whose expected format is:

        ZMQ://host:pub-port:sub-port/topic   (e.g. ZMQ://gluon159:11246:11247/hdmonctl)

    Anything still missing falls back to the defaults:
        host: localhost, pub_port: 11246, sub_port: 11247, topic: hdmonctl

    Returns:
        tuple: (host, pub_port, sub_port, topic)
    """
    parser = argparse.ArgumentParser(
        description="hdmonGUI: Subscribe to hdmon_ctl status messages."
    )
    parser.add_argument("--host", type=str, help="Proxy server host")
    parser.add_argument("--pub-port", type=int, help="Publisher port")
    parser.add_argument("--sub-port", type=int, help="Subscriber port")
    parser.add_argument("--topic", type=str, help="Topic to subscribe to")
    args, _ = parser.parse_known_args()

    host = args.host
    pub_port = args.pub_port
    sub_port = args.sub_port
    topic = args.topic

    if not (host and pub_port and sub_port and topic):
        udl = os.environ.get("HDMON_UDL")
        if udl:
            try:
                # Expected format: "ZMQ://host:pub_port:sub_port/topic"
                prefix = "ZMQ://"
                if udl.startswith(prefix):
                    udl = udl[len(prefix):]
                # Split at the first "/" to separate the address and topic parts.
                addr_part, topic_part = udl.split("/", 1)
                addr_fields = addr_part.split(":")
                if len(addr_fields) >= 3:
                    if not host:
                        host = addr_fields[0]
                    if not pub_port:
                        pub_port = int(addr_fields[1])
                    if not sub_port:
                        sub_port = int(addr_fields[2])
                    if not topic:
                        topic = topic_part
                else:
                    print("Error parsing HDMON_UDL: expected host:pub-port:sub-port, got", addr_part)
            except ValueError as e:
                # Raised when there is no "/" to unpack or a port is not an integer.
                print("Error parsing HDMON_UDL:", e)

    if not host:
        host = "localhost"
    if not pub_port:
        pub_port = 11246
    if not sub_port:
        sub_port = 11247
    if not topic:
        topic = "hdmonctl"

    return host, pub_port, sub_port, topic


# -------------------------------
# HDHelpWindow class definition
# -------------------------------
class HDHelpWindow(tk.Toplevel):
    """Read-only, centered window showing usage help for hdmongui."""

    def __init__(self, master=None):
        super().__init__(master)
        self.title("Hall-D Data Monitoring Help")

        # Set desired window size and center it on the screen
        width, height = 700, 800
        self.geometry(f"{width}x{height}")
        self.update_idletasks()  # Make sure window info is available
        x = (self.winfo_screenwidth() // 2) - (width // 2)
        y = (self.winfo_screenheight() // 2) - (height // 2)
        self.geometry(f"{width}x{height}+{x}+{y}")

        # Default padding (in pixels)
        defpad = 15

        # Create a main container frame with padding
        main_frame = ttk.Frame(self, padding=defpad)
        main_frame.pack(fill=tk.BOTH, expand=True)

        # ------ Title Frame
        title_frame = ttk.Frame(main_frame)
        title_frame.pack(fill=tk.X, pady=(0, 5))
        # A large title label; adjust the font size as needed
        title_label = ttk.Label(title_frame, text="Hall-D Data Monitoring Help",
                                font=("TkDefaultFont", 20))
        title_label.pack()

        # ------ Middle Content Frame (with ScrolledText)
        mid_frame = ttk.Frame(main_frame)
        mid_frame.pack(fill=tk.BOTH, expand=True, padx=defpad, pady=defpad)

        # Scrolled text widget with no word-wrapping and a white background
        # (scrolledtext provides the scrollbar).
        help_text = scrolledtext.ScrolledText(mid_frame, wrap=tk.NONE, background="white")
        help_text.pack(fill=tk.BOTH, expand=True)

        # Insert the help text.  (Markup has been removed since Tkinter's Text
        # widget does not support HTML or Pango markup.)
        help_content = """OVERVIEW
----------------------------
The hdmongui.py program can be used to launch and/or monitor the health of
the monitoring farm processes. It launches and kills processes by running
the start_monitoring script (which can also be run manually without this
program). The monitoring function is done by continuously communicating
with each farm process via ZeroMQ messages over TCP.


hdmon Connection UDL
----------------------------
Communication with the farm processes is done via ZeroMQ using a proxy
PUB/SUB message server called hdmonServer.py. The server location is
specified by the value of the HDMON_UDL environment variable which is
normally set to:

    ZMQ://gluon159:11246:11247/hdmonctl

If it is not set, the equivalent of the following is used:

    ZMQ://localhost:11246:11247/hdmonctl

This can be changed by changing your environment variable and relaunching
hdmongui.py. Note, however, that monitoring processes that were already
started using a different UDL will continue to communicate only through
that UDL and must be killed and restarted in order to use the new UDL.


LEVELS
----------------------------
The monitoring system is designed to support multiple "levels" of
monitoring. The main application of this is to have some nodes process
at high rate to fill histograms with large statistics. Other nodes can do
a more full analysis of each event at a slower rate. There is no limit to
the number of levels one can have.

The levels are set by files matching the following naming pattern:

    ${DAQ_HOME}/config/monitoring/hdmon*.conf

where DAQ_HOME is an environment variable (normally set to something like
"/home/hdops/CDAQ/daq_dev_v0.31/daq") and the "*" part defines the level
name. Level names can be as simple as "1" and "2" or more complex strings
like "testing". These would correspond to configuration file names
"hdmon1.conf", "hdmon2.conf", and "hdmontesting.conf".

To create a new level, simply create a new configuration file in the
monitoring directory. Keep in mind though that users will usually want to
start monitoring for all levels so don't leave extra configuration files
lying around that may accidentally get used by unsuspecting shift workers.
Also keep in mind that you will need to assign nodes to the new level for
any processing to be done (see next section).

The "Level" menu near the bottom of the window is used to specify that
only a single level of monitoring processes be started when the "Start
Monitoring" button is pushed. Changing it has no immediate effect; it is
only used when "Start Monitoring" is clicked.


NODE ASSIGNMENTS
----------------------------
The nodes assigned to run processes for each level are specified in the
file:

    ${DAQ_HOME}/config/monitoring/nodes.conf

Each line starts with two items: the first specifies which type of process
to run and the second is the node name. For "mon" lines a third item gives
the level (or a comma-separated list of levels). For example:

    mon   gluon150   OCCUPANCY

would specify that a level "OCCUPANCY" monitoring process should be run on
gluon150.

Two important things to note:

  - A node can appear more than once in this file and so may have more
    than one process launched there.

  - Other node assignments may be made in this file including special
    RootSpy archiver process nodes.


EVENT SOURCE
----------------------------
The source of events used by the monitoring processes may be specified in
multiple ways. If the "COOL" box is checked then the parameters for the
DAQ's ET system are extracted for the current CODA configuration (COOL
configuration). Usually, this is what shift workers will want to do.

If the "COOL" box is not checked, then the value in the "User specified"
entry is used. This has the same format that all halld_recon programs
accept. Thus, you can put a file name there (see below). In most cases,
one should specify an ET system from which to read events. In the Hall-D
syntax, an ET system is specified by:

    ET:filename:station:host:port

where:

    filename  is the name of the ET memory mapped file which is usually
              something like /tmp/et_hdops_ERsoftROC

    station   is the name of the station on the ET system to attach to.
              It will be created if it does not exist. Usually, this
              should be something like "MON" or "MON1".

    host      is the hostname of the ET system to attach to
              (e.g. gluonraid7)

    port      is the TCP port the ET system is listening on. This may be
              one of 11111 or 23921 or something else altogether.


TESTING WITH A FILE
- - - - - - - - - - - -
To test the monitoring system using an evio file as input, make sure the
"COOL" checkbox is unchecked and then enter the full path to the file in
the "User specified" box before clicking the "Start Monitoring" button.
The exact behavior will depend on whether a secondary ET system is
specified in the nodes.conf configuration file (it is by default). If so
then the secondary ET system is created and a file2et process will be
launched to feed events into it from the specified file. If the "loop"
checkbox is checked when "Start Monitoring" is pressed then a "-loop"
argument is passed to the start_monitoring script and then to the file2et
program. This will cause the file to be continuously read over and over
until the monitoring system is stopped.

If a secondary ET system is not specified in the nodes.conf file then the
filename is passed to all hdmon processes to open and read from
independently. The loop checkbox will have no effect in that case.


RootSpy and Hydra Processes
----------------------------
The bottom of the window has a set of check buttons that are used to
toggle various RootSpy processes and Hydra processes on/off. These do not
have any immediate effect and are only used to form the correct command
line arguments for the start_monitoring script when it is run, that is,
when either the "Start Monitoring" or "Stop Monitoring" button is pressed.

The "test mode" button only affects whether the "-t" option is passed to
the start_hydra script when start_monitoring runs it.
"""
        help_text.insert(tk.END, help_content)
        help_text.configure(state='disabled')  # make the text read-only

        # ------ Bottom Buttons Frame
        bottom_frame = ttk.Frame(main_frame)
        bottom_frame.pack(fill=tk.X, pady=defpad)
        close_button = ttk.Button(bottom_frame, text="Close", command=self.close)
        close_button.pack(side=tk.RIGHT)

        # Ensure that clicking the window's "X" button also closes the window.
        self.protocol("WM_DELETE_WINDOW", self.close)

    def close(self):
        """Close the help window and clear the module-level handle."""
        global HelpWin
        self.destroy()
        if HelpWin is self:
            HelpWin = None


# -------------------------------
# HDLogsWindow class definition
# -------------------------------
class HDLogsWindow(tk.Toplevel):
    """Window listing hdmon_*.log files and showing the tail of the selected one."""

    def __init__(self, master=None):
        super().__init__(master)
        self.title("HD Logs Viewer")
        self.geometry("1000x600")

        # Log directory: HDLOG if set, otherwise HOME (or /tmp if HOME is unset).
        self.logdir = os.getenv('HDLOG', os.getenv('HOME', '/tmp'))

        # Create the main frames.
        self.left_frame = ttk.Frame(self)
        self.left_frame.pack(side=tk.LEFT, fill=tk.Y, padx=10, pady=10)
        self.right_frame = ttk.Frame(self)
        self.right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=10, pady=10)

        # Treeview in the left frame listing the log files (full paths).
        self.store = ttk.Treeview(self.left_frame, columns=("filename",), show="headings", height=20)
        self.store.heading("filename", text="Log Files")
        self.store.column("filename", width=250, anchor="w")
        self.store.pack(fill=tk.Y, expand=True)
        self.store.bind("<<TreeviewSelect>>", self.Refresh)

        # Add a refresh button to update the file list.
        refresh_btn = ttk.Button(self.left_frame, text="Refresh List", command=self.UpdateFileList)
        refresh_btn.pack(pady=(5, 0))

        # Scrolled text widget in the right frame for displaying log file contents.
        self.log_text = scrolledtext.ScrolledText(self.right_frame, wrap=tk.WORD, font=("Courier", 10))
        self.log_text.pack(fill=tk.BOTH, expand=True)

        # ------ Bottom Buttons Frame
        bottom_frame = ttk.Frame(self.right_frame)
        bottom_frame.pack(fill=tk.X, pady=15)
        close_button = ttk.Button(bottom_frame, text="Close", command=self.close)
        close_button.pack(side=tk.RIGHT)

        # Ensure that clicking the window's "X" button also closes the window.
        self.protocol("WM_DELETE_WINDOW", self.close)

        # Initially populate the file list.
        self.UpdateFileList()

    def _dispose(self):
        """Destroy the window and clear the module-level LogsWin handle."""
        global LogsWin
        self.destroy()
        if LogsWin is self:
            LogsWin = None

    def Quit(self, event=None):
        """Event-handler style close (accepts an optional Tk event)."""
        print("Quitting Logs Window")
        self._dispose()

    def close(self):
        """Close the logs window."""
        print("Closing Logs Window")
        self._dispose()

    def Refresh(self, event=None):
        """Show the log file currently selected in the treeview (or clear the view)."""
        selected = self.store.selection()
        if not selected:
            self.log_text.delete("1.0", tk.END)
            return
        # The stored value is the full file path.
        item = self.store.item(selected[0])
        filename = item["values"][0]
        self.ReadLogFile(filename)

    def UpdateFileList(self):
        """Repopulate the list with hdmon_*.log files in self.logdir, oldest first."""
        def _mtime(path):
            # A file may vanish between glob() and the stat; sort it first.
            try:
                return os.path.getmtime(path)
            except OSError:
                return 0.0

        pattern = os.path.join(self.logdir, "hdmon_*.log")
        logfiles = sorted((f for f in glob.glob(pattern) if os.path.isfile(f)), key=_mtime)

        # Clear current entries.
        for item in self.store.get_children():
            self.store.delete(item)

        # Insert each log file, storing its full path as the row value.
        for path in logfiles:
            self.store.insert("", tk.END, values=(path,))

    def HighlightLine(self, line):
        """Hook for per-line decoration; currently returns *line* unchanged."""
        return line

    def ReadLogFile(self, filename):
        """Display the last 1000 lines of *filename*, skipping "ET stalled ..." lines."""
        # Display a "loading" message.
        self.log_text.delete("1.0", tk.END)
        self.log_text.insert(tk.END, f"Loading {filename}...\n")
        self.update_idletasks()

        try:
            with open(filename, "r", encoding="utf-8", errors="replace") as f:
                # Keep only the last 1000 lines without holding the whole file in memory.
                lines = deque(f, maxlen=1000)
        except OSError as e:
            self.log_text.delete("1.0", tk.END)
            self.log_text.insert(tk.END, f"Error reading file: {e}")
            return

        processed_lines = [
            self.HighlightLine(line.rstrip("\n"))
            for line in lines
            if "ET stalled ..." not in line
        ]
        content = "\n".join(processed_lines)
        self.log_text.delete("1.0", tk.END)
        self.log_text.insert(tk.END, content)
        self.log_text.see(tk.END)


# -------------------------------
# MonitoringGUI class definition
# -------------------------------
class MonitoringGUI(tk.Tk):
    """Main window: node table, totals, per-level summary and start/stop controls."""

    def __init__(self):
        super().__init__()
        self.title("Hall-D Data Monitoring Controls (Tkinter)")
        self.geometry("1200x800")

        # Get UDL configuration.
        self.host, self.pub_port, self.sub_port, self.topic = parse_udl()

        # Compute a unique name for this instance based on hostname and PID.
        local_hostname = socket.gethostname()
        pid = os.getpid()
        self.unique_name = f"{local_hostname}_{pid}"

        # Print connection info.
        print(f"Connecting to proxy at tcp://{self.host}:{self.sub_port}")
        print(f"Unique name: {self.unique_name}")
        print(f"Subscribed topic: {self.topic}")
        print(f"Publisher will send messages to: tcp://{self.host}:{self.pub_port}")

        # Connection status; written by the socket-monitor thread.
        self.proxy_connected = False

        # Header: display connection info and connection status.
        header_frame = ttk.Frame(self)
        header_frame.pack(fill=tk.X, padx=5, pady=5)
        conn_info = f"Proxy: SUB: tcp://{self.host}:{self.sub_port} | Pub: tcp://{self.host}:{self.pub_port}"
        self.conn_label = ttk.Label(header_frame, text=conn_info)
        self.conn_label.pack(side=tk.LEFT)
        self.status_label = ttk.Label(header_frame, text="Disconnected", foreground="red")
        self.status_label.pack(side=tk.RIGHT)

        # Node info keyed by unique sender name.  The subscriber thread writes
        # it while the Tk thread iterates it, so every access goes through the lock.
        self.nodes = {}
        self.nodes_lock = threading.Lock()

        # Main GUI Layout
        main_frame = ttk.Frame(self)
        main_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

        # Left Frame: Node list (Treeview)
        left_frame = ttk.Frame(main_frame)
        left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        ttk.Label(left_frame, text="Nodes", font=("Arial", 10, "bold")).pack(anchor=tk.W)
        node_columns = ("name", "node", "level", "nthr", "nevents", "rate")
        self.tree = ttk.Treeview(left_frame, columns=node_columns, show="headings", height=12)
        self.tree.pack(fill=tk.BOTH, expand=True)
        for col, heading in zip(node_columns, ("name", "node", "level", "NThr", "Nevents", "rate")):
            self.tree.heading(col, text=heading)
        self.tree.column("name", width=120, anchor="w")
        self.tree.column("node", width=150, anchor="w")
        self.tree.column("level", width=100, anchor="w")
        self.tree.column("nthr", width=50, anchor="e")
        self.tree.column("nevents", width=100, anchor="e")
        self.tree.column("rate", width=80, anchor="e")
        self.tree.bind("<<TreeviewSelect>>", self.on_tree_select)

        # Right Frame: Contains Totals, Level Summary, and Node Details.
        right_frame = ttk.Frame(main_frame)
        right_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=10)

        # Totals Frame.
        totals_frame = ttk.Frame(right_frame, borderwidth=2, relief=tk.RIDGE)
        totals_frame.pack(fill=tk.X, pady=5)
        totals_font = ("Arial", 12, "bold")
        totals_frame_header = ttk.Label(totals_frame, text="Totals", font=totals_font)
        totals_frame_header.grid(row=0, column=0, columnspan=2, sticky=tk.W, padx=5, pady=(5, 2))
        self.total_rate_var = tk.StringVar(value="-- Hz")
        self.total_events_var = tk.StringVar(value="--")
        self.total_nodes_var = tk.StringVar(value="--")
        self.total_threads_var = tk.StringVar(value="--")
        totals_rows = (
            ("Total Rate:", self.total_rate_var),
            ("Total Events:", self.total_events_var),
            ("Total Nodes:", self.total_nodes_var),
            ("Total Threads:", self.total_threads_var),
        )
        for row, (label, var) in enumerate(totals_rows, start=1):
            ttk.Label(totals_frame, text=label, font=totals_font).grid(row=row, column=0, sticky=tk.W, padx=5)
            ttk.Label(totals_frame, textvariable=var, font=totals_font).grid(row=row, column=1, sticky=tk.W, padx=5)

        # Level Summary Frame.
        level_frame = ttk.Frame(right_frame, borderwidth=2, relief=tk.RIDGE)
        level_frame.pack(fill=tk.BOTH, pady=5)
        ttk.Label(level_frame, text="Level Summary",
                  font=("Arial", 10, "bold")).pack(anchor=tk.W, padx=5, pady=(5, 2))
        # Custom style for the level summary Treeview with a smaller cell font.
        level_style = ttk.Style()
        level_style.configure("Level.Treeview", font=("Arial", 9))
        level_style.configure("Level.Treeview.Heading", font=("Arial", 9))
        self.level_tree = ttk.Treeview(
            level_frame,
            columns=("level", "rate", "events", "nodes", "thr"),
            show="headings",
            height=4,
            style="Level.Treeview",
        )
        self.level_tree.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        # Set headers and column widths.
        for col, heading, width in (
            ("level", "Level", 70),
            ("rate", "Rate", 60),
            ("events", "Events", 60),
            ("nodes", "Nodes", 50),
            ("thr", "Thr", 50),
        ):
            self.level_tree.heading(col, text=heading, anchor="e")
            self.level_tree.column(col, width=width, anchor="e")

        # Node Details Frame.
        details_frame = ttk.Frame(right_frame, borderwidth=2, relief=tk.RIDGE)
        details_frame.pack(fill=tk.BOTH, expand=True, pady=5)
        ttk.Label(details_frame, text="Node Details", font=("Arial", 10, "bold")).pack(anchor=tk.W)
        self.selected_name_var = tk.StringVar(value="---")
        self.selected_node_var = tk.StringVar(value="---")
        self.selected_threads_var = tk.StringVar(value="---")
        self.selected_event_source_var = tk.StringVar(value="---")
        f_details = ttk.Frame(details_frame)
        f_details.pack(padx=5, pady=5, fill=tk.BOTH, expand=True)
        for row, (label, var) in enumerate((
            ("Name:", self.selected_name_var),
            ("Node:", self.selected_node_var),
            ("Threads:", self.selected_threads_var),
            ("Event Source:", self.selected_event_source_var),
        )):
            ttk.Label(f_details, text=label).grid(row=row, column=0, sticky=tk.W)
            ttk.Label(f_details, textvariable=var).grid(row=row, column=1, sticky=tk.W)
        self.stop_node_button = ttk.Button(details_frame, text="stop node", command=self.stop_node)
        self.stop_node_button.pack(pady=(0, 5))

        # Bottom Frame: Controls.
        bottom_frame = ttk.Frame(self, relief=tk.FLAT)
        bottom_frame.pack(fill=tk.X, padx=5, pady=5)
        row1_frame = ttk.Frame(bottom_frame)
        row1_frame.pack(fill=tk.X, expand=True)
        self.cool_var = tk.BooleanVar(value=True)
        cool_check = ttk.Checkbutton(row1_frame, text="COOL", variable=self.cool_var,
                                     command=self.toggle_user_entry)
        cool_check.pack(side=tk.LEFT)
        ttk.Label(row1_frame, text="User specified:").pack(side=tk.LEFT, padx=5)
        self.user_source_var = tk.StringVar(value="ET:/tmp/et_sys_monitoring:MON:gluonraid7-ib:11122")
        self.user_entry = ttk.Entry(row1_frame, textvariable=self.user_source_var, width=15,
                                    state=tk.DISABLED)
        self.user_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
        self.loop_var = tk.BooleanVar(value=True)
        loop_check = ttk.Checkbutton(row1_frame, text="loop", variable=self.loop_var)
        loop_check.pack(side=tk.RIGHT, padx=5)

        row2_frame = ttk.Frame(self)
        row2_frame.pack(fill=tk.X, padx=5, pady=5)
        ttk.Label(row2_frame, text="Level:").pack(side=tk.LEFT, padx=5)
        self.level_var = tk.StringVar(value="")
        self.CBlevel = ttk.Combobox(row2_frame, textvariable=self.level_var, values=[""], width=12)
        self.CBlevel.pack(side=tk.LEFT, padx=5)
        self.rsarchiver_var = tk.BooleanVar(value=True)
        ttk.Checkbutton(row2_frame, text="RSArchiver", variable=self.rsarchiver_var).pack(side=tk.LEFT)
        self.rstime_var = tk.BooleanVar(value=True)
        ttk.Checkbutton(row2_frame, text="RSTimeSeries", variable=self.rstime_var).pack(side=tk.LEFT)
        self.rsai_var = tk.BooleanVar(value=True)
        ttk.Checkbutton(row2_frame, text="RSAI", variable=self.rsai_var).pack(side=tk.LEFT)
        self.hydra_var = tk.BooleanVar(value=True)
        ttk.Checkbutton(row2_frame, text="Hydra", variable=self.hydra_var).pack(side=tk.LEFT)
        self.testmode_var = tk.BooleanVar(value=False)
        ttk.Checkbutton(row2_frame, text="test mode", variable=self.testmode_var).pack(side=tk.LEFT)

        control_frame = ttk.Frame(self)
        control_frame.pack(fill=tk.X, padx=5, pady=5)
        ttk.Button(control_frame, text="Start Monitoring", command=self.start_monitoring).pack(side=tk.LEFT, padx=5)
        ttk.Button(control_frame, text="Stop Monitoring", command=self.stop_monitoring).pack(side=tk.LEFT, padx=5)
        ttk.Button(control_frame, text="Stop All Nodes", command=self.stop_all_nodes).pack(side=tk.LEFT, padx=5)
        ttk.Button(control_frame, text="Logs", command=self.show_logs).pack(side=tk.LEFT, padx=5)
        ttk.Button(control_frame, text="Help", command=self.show_help).pack(side=tk.LEFT, padx=5)
        ttk.Button(control_frame, text="Quit", command=self.quit).pack(side=tk.RIGHT, padx=5)

        # Fill level menu
        self.FindConfigMonLevels()

        # Start ZeroMQ (which also attaches the connection monitor before the
        # SUB socket connects), the heartbeat checker and the status updater.
        self.start_zmq_subscriber()
        self.check_heartbeats()
        self.update_connection_status()

    #-----------------------
    # FindConfigMonLevels
    #-----------------------
    def FindConfigMonLevels(self):
        """
        Fill the level combobox from the "mon" lines of nodes.conf.

        Reads ${DAQ_HOME}/config/monitoring/nodes.conf (DAQ_HOME defaults to
        /gluex).  Each non-empty, non-comment line whose first token is "mon"
        must have at least three whitespace-separated tokens; the third is a
        comma-separated list of level names.  The combobox gets ALL_LEVELS
        first, followed by the sorted, de-duplicated level names; the same
        list is stored in self.mon_levels.
        """
        levels = set()
        daq_home = os.getenv("DAQ_HOME", "/gluex")
        fname = os.path.join(daq_home, "config", "monitoring", "nodes.conf")
        if os.path.exists(fname):
            try:
                with open(fname, "r") as f:
                    for line in f:
                        line = line.strip()
                        # Skip empty lines and comment lines.
                        if not line or line.startswith("#"):
                            continue
                        vals = line.split()
                        # We only care about lines beginning with 'mon'
                        if vals[0] != "mon":
                            continue
                        if len(vals) < 3:
                            print(f"too few tokens in mon line in {fname} :")
                            print(line)
                            continue
                        levels.update(lev for lev in vals[2].split(',') if lev)
            except OSError as e:
                print(f"Unable to read {fname}: {e}")

        # Keep the "all levels" entry first: sorting it together with the level
        # names would put numeric names like "1" ahead of "<" and make a single
        # level the default selection.
        mon_levels = [ALL_LEVELS] + sorted(levels - {ALL_LEVELS})

        self.CBlevel["values"] = mon_levels
        # If the current selection is not in the new list, select "all levels".
        if self.level_var.get() not in mon_levels:
            self.CBlevel.current(0)

        self.mon_levels = mon_levels

    # ---------------------------
    # ZeroMQ Subscriber Functions
    # ---------------------------
    def start_zmq_subscriber(self):
        """Create the PUB/SUB sockets, attach the connection monitor and start the receive thread."""
        self.zmq_context = zmq.Context()

        self.publisher = self.zmq_context.socket(zmq.PUB)
        pub_endpoint = f"tcp://{self.host}:{self.pub_port}"
        self.publisher.connect(pub_endpoint)

        self.subscriber = self.zmq_context.socket(zmq.SUB)
        # Attach the socket monitor BEFORE connecting so the initial
        # EVENT_CONNECTED cannot be missed, and before the receive thread starts
        # so the SUB socket is never used by two threads at once.
        self.start_connection_monitor()
        sub_endpoint = f"tcp://{self.host}:{self.sub_port}"
        self.subscriber.connect(sub_endpoint)
        # Subscribe to both the UDL topic (normally "hdmonctl") and "hdmongui".
        self.subscriber.setsockopt_string(zmq.SUBSCRIBE, self.topic)
        self.subscriber.setsockopt_string(zmq.SUBSCRIBE, "hdmongui")

        self.sub_thread = threading.Thread(target=self.subscriber_loop, daemon=True)
        self.sub_thread.start()

        print(f"Publisher connected to {pub_endpoint}")
        print(f"Subscriber connected to {sub_endpoint} on topics '{self.topic}' and 'hdmongui'")

    @staticmethod
    def _parse_thread_info(text):
        """
        Extract node statistics from the body of a "thread info" message.

        Recognized lines: "threads:", "instantaneous_rates:", "nevents:" and
        "level:".  Returns a dict containing only the keys that were present
        (nthr, rate, nevents, level); unparsable numbers become 0 / 0.0.
        """
        numeric = {
            "threads:": ("nthr", int, 0),
            "instantaneous_rates:": ("rate", float, 0.0),
            "nevents:": ("nevents", int, 0),
        }
        fields = {}
        for line in text.splitlines():
            if line.startswith("level:"):
                fields["level"] = line.split(":", 1)[1].strip()
                continue
            for prefix, (key, convert, fallback) in numeric.items():
                if line.startswith(prefix):
                    try:
                        fields[key] = convert(line.split(":", 1)[1].strip())
                    except ValueError:
                        fields[key] = fallback
                    break
        return fields

    def _record_message(self, sender, text, now):
        """Update self.nodes from one status message (runs on the subscriber thread)."""
        with self.nodes_lock:
            if text.strip() == "I am here":
                info = self.nodes.setdefault(sender, {})
                info["name"] = sender
                info["node"] = sender.split("_")[0] if "_" in sender else sender
                # A bare heartbeat must not wipe statistics reported by an
                # earlier "thread info" message; only fill in missing defaults.
                info.setdefault("level", "N/A")
                info.setdefault("nthr", 0)
                info.setdefault("nevents", 0)
                info.setdefault("rate", 0.0)
                info["last_update"] = now
            elif text.startswith("thread info"):
                info = self.nodes.setdefault(sender, {})
                info["name"] = sender
                info["node"] = sender.split("_")[0] if "_" in sender else sender
                info["level"] = "N/A"
                info.update(self._parse_thread_info(text))
                info["last_update"] = now

    def subscriber_loop(self):
        """Receive [subject, sender, text] messages forever and refresh the GUI."""
        while True:
            try:
                parts = self.subscriber.recv_multipart()
                if len(parts) < 3:
                    continue
                sender = parts[1].decode('utf-8')
                text = parts[2].decode('utf-8')
                self._record_message(sender, text, time.time())
                # Ask the Tk main loop to redraw; widgets are only touched there.
                self.after(0, self.update_treeview)
            except Exception as e:
                print("Error in subscriber_loop:", e)
                time.sleep(1)

    # ---------------------------
    # Connection Monitor using ZMQ socket monitoring.
    # ---------------------------
    def start_connection_monitor(self):
        """Attach a monitor to the SUB socket and start its thread (idempotent)."""
        if getattr(self, "monitor_socket", None) is not None:
            return  # already running
        self.subscriber.monitor("inproc://monitor.sub", zmq.EVENT_CONNECTED | zmq.EVENT_DISCONNECTED)
        self.monitor_socket = self.zmq_context.socket(zmq.PAIR)
        self.monitor_socket.connect("inproc://monitor.sub")
        self.monitor_thread = threading.Thread(target=self.connection_monitor_loop, daemon=True)
        self.monitor_thread.start()

    def connection_monitor_loop(self):
        """Track CONNECTED/DISCONNECTED events of the SUB socket in self.proxy_connected."""
        while True:
            try:
                event = self.monitor_socket.recv_multipart()
                msg = parse_monitor_message(event)
                if msg['event'] == zmq.EVENT_CONNECTED:
                    self.proxy_connected = True
                elif msg['event'] == zmq.EVENT_DISCONNECTED:
                    self.proxy_connected = False
            except Exception as e:
                print("Error in connection_monitor_loop:", e)
                time.sleep(1)

    # ---------------------------
    # Heartbeat Checker
    # ---------------------------
    def check_heartbeats(self):
        """Drop nodes silent for more than HEARTBEAT_TIMEOUT seconds; reschedules itself every 3 s."""
        cutoff = time.time() - HEARTBEAT_TIMEOUT
        with self.nodes_lock:
            stale = [s for s, info in self.nodes.items() if info.get("last_update", 0) < cutoff]
            for sender in stale:
                del self.nodes[sender]
        self.update_treeview()
        self.after(3000, self.check_heartbeats)

    # ---------------------------
    # Connection Status Updater
    # ---------------------------
    def update_connection_status(self):
        """Reflect self.proxy_connected in the header label; reschedules itself every 1 s."""
        if self.proxy_connected:
            self.status_label.config(text="Connected", foreground="green")
        else:
            self.status_label.config(text="Disconnected", foreground="red")
        self.after(1000, self.update_connection_status)

    # ---------------------------
    # GUI Update Functions
    # ---------------------------
    def _snapshot_nodes(self):
        """Return shallow copies of all node info dicts, taken under the lock."""
        with self.nodes_lock:
            return [dict(info) for info in self.nodes.values()]

    def update_treeview(self):
        """Rebuild the node table and totals, preserving the selected row."""
        # Save the currently selected sender, if any.
        current_selection = self.tree.selection()
        selected_sender = None
        if current_selection:
            selected_sender = self.tree.item(current_selection[0], "values")[0]

        for item in self.tree.get_children():
            self.tree.delete(item)

        nodes = self._snapshot_nodes()
        total_rate = 0.0
        total_events = 0
        total_threads = 0
        for info in nodes:
            row = (
                info.get("name", ""),
                info.get("node", ""),
                info.get("level", ""),
                info.get("nthr", 0),
                info.get("nevents", 0),
                f'{info.get("rate", 0.0):.1f}',
            )
            self.tree.insert("", tk.END, values=row)
            total_rate += info.get("rate", 0.0)
            total_events += info.get("nevents", 0)
            total_threads += info.get("nthr", 0)

        self.total_rate_var.set(f"{total_rate:.1f} Hz")
        self.total_events_var.set(str(total_events))
        self.total_nodes_var.set(str(len(nodes)))
        self.total_threads_var.set(str(total_threads))

        # Update level summary from the same snapshot.
        self.update_level_treeview(nodes)

        # Restore selection if still present.
        if selected_sender:
            for item in self.tree.get_children():
                values = self.tree.item(item, "values")
                if values and values[0] == selected_sender:
                    self.tree.selection_set(item)
                    break

    def update_level_treeview(self, nodes=None):
        """
        Rebuild the per-level summary table.

        Args:
            nodes: optional list of node info dicts to summarize; when None a
                fresh snapshot of self.nodes is taken.
        """
        if nodes is None:
            nodes = self._snapshot_nodes()
        # Group nodes by level.
        summary = {}
        for info in nodes:
            sums = summary.setdefault(info.get("level", "N/A"),
                                      {"rate": 0.0, "events": 0, "nodes": 0, "thr": 0})
            sums["rate"] += info.get("rate", 0.0)
            sums["events"] += info.get("nevents", 0)
            sums["thr"] += info.get("nthr", 0)
            sums["nodes"] += 1

        for item in self.level_tree.get_children():
            self.level_tree.delete(item)
        for level, sums in summary.items():
            row = (level, f"{sums['rate']:.1f}", sums["events"], sums["nodes"], sums["thr"])
            self.level_tree.insert("", tk.END, values=row)

    def on_tree_select(self, event):
        """Copy the selected node's columns into the Node Details panel."""
        selected_item = self.tree.selection()
        if not selected_item:
            return
        values = self.tree.item(selected_item[0], "values")
        self.selected_name_var.set(values[0])
        self.selected_node_var.set(values[1])
        self.selected_threads_var.set(values[3])
        # NOTE(review): placeholder; the real event source is not reported by nodes.
        self.selected_event_source_var.set("ET:/tmp/et_sys_monitoring:MON:...")

    def toggle_user_entry(self):
        """Enable the "User specified" entry only when COOL is unchecked."""
        if self.cool_var.get():
            self.user_entry.config(state=tk.DISABLED)
        else:
            self.user_entry.config(state=tk.NORMAL)

    @staticmethod
    def _read_run_number_from_epics():
        """
        Return the current run number string from ``caget HD:coda:daq:run_number``.

        Returns None when caget is missing, fails, times out, or prints fewer
        than two tokens.
        """
        # RUN = caget("HD:coda:daq:run_number")  # pyepics not currently installed on RHEL9
        try:
            output = subprocess.check_output(["caget", "HD:coda:daq:run_number"], timeout=10)
        except (OSError, subprocess.SubprocessError) as e:
            print(f"WARNING: caget failed: {e}")
            return None
        tokens = output.decode(errors="replace").strip().split()
        return tokens[1] if len(tokens) >= 2 else None

    #---------------------
    # FormStartMonitoringCommand
    #---------------------
    def FormStartMonitoringCommand(self, widget=None, event=None, data=None):
        """
        Build the start_monitoring argv from the current GUI settings.

        Used by both start_monitoring and stop_monitoring (the latter appends
        "-e").  widget/event/data are unused and kept for callback-style callers.

        Returns:
            list[str]: the command and its arguments.
        """
        cmd = ['start_monitoring']
        if not self.cool_var.get():
            # User specified source
            source = str(self.user_source_var.get())
            cmd.append(f'-s{source}')
            # Try extracting the run number from a file name like ..._<RUN>_<FILE>.evio
            toks = source.split('_')
            if len(toks) > 2:
                try:
                    run = int(toks[-2])
                except ValueError:
                    run = None
                if run is not None and 1 < run < 1E6:
                    cmd.append('-R%s' % run)
        else:
            # Use COOL (also explicitly set run number)
            run = self._read_run_number_from_epics()
            if run is None:
                print('WARNING: unable to get HD:coda:daq:run_number from epics. Run number will not be specified.')
            else:
                print(f'COOL source specified. Assuming online monitoring. Will start specifying run {run}')
                try:
                    run_ok = 1 < int(run) < 1E6
                except ValueError:
                    run_ok = False
                if run_ok:
                    cmd.append('-R%s' % run)

        if self.rsarchiver_var.get() and self.cool_var.get():
            cmd.append('-A')
        if self.rstime_var.get():
            cmd.append('-T')
        if self.rsai_var.get():
            cmd.append('-AI')
        if self.hydra_var.get():
            cmd.append('-Hydra')
        if self.testmode_var.get():
            cmd.append('-testmode')
        level = self.level_var.get()
        if level and level != ALL_LEVELS:
            cmd.append('-L%s' % level)
        if self.loop_var.get():
            cmd.append('-loop')
        return cmd

    @staticmethod
    def _run_command(cmd):
        """Echo *cmd* and run it synchronously, reporting (not raising) a missing executable."""
        print(' '.join(cmd))
        try:
            subprocess.call(cmd)
        except OSError as e:
            print(f"ERROR: could not run {cmd[0]}: {e}")

    #---------------------
    # start_monitoring
    #---------------------
    def start_monitoring(self):
        """Run start_monitoring with the current settings."""
        self._run_command(self.FormStartMonitoringCommand())
        return False

    #---------------------
    # stop_monitoring
    #---------------------
    def stop_monitoring(self):
        """Run start_monitoring with the current settings plus "-e"."""
        cmd = self.FormStartMonitoringCommand()
        cmd.append('-e')
        self._run_command(cmd)
        return False

    #---------------------
    # stop_all_nodes
    #---------------------
    def stop_all_nodes(self):
        """Publish "<name> quit" to every currently known node."""
        for info in self._snapshot_nodes():
            topic = info.get("name", "---")
            message = "quit"
            print(f"Telling node {topic} to quit ...")
            self.publisher.send_string(f"{topic} {message}")
        return False

    #---------------------
    # stop_node
    #---------------------
    def stop_node(self):
        """Publish "<name> quit" to the node shown in the Node Details panel."""
        topic = self.selected_name_var.get()
        if not topic or topic == "---":
            print("No node selected.")
            return False
        message = "quit"
        print(f"Telling node {topic} to quit ...")
        self.publisher.send_string(f"{topic} {message}")
        return False

    #---------------------
    # show_logs
    #---------------------
    def show_logs(self):
        """Open the logs window, or raise it if it is already open."""
        global LogsWin
        if LogsWin is not None and LogsWin.winfo_exists():
            LogsWin.deiconify()
            LogsWin.lift()
            LogsWin.focus_set()
        else:
            LogsWin = HDLogsWindow(self)
        return False

    #---------------------
    # show_help
    #---------------------
    def show_help(self):
        """Open the help window, or raise it if it is already open."""
        global HelpWin
        if HelpWin is not None and HelpWin.winfo_exists():
            HelpWin.deiconify()
            HelpWin.lift()
            HelpWin.focus_set()
        else:
            HelpWin = HDHelpWindow(self)
        return False


#########################################################################
if __name__ == "__main__":
    app = MonitoringGUI()
    app.mainloop()