1131017-待測
問:
使用繁體中文,以下這段程式"import socket import threading import tkinter as tk from tkinter import filedialog, ttk import time import os import subprocess class Server: def __init__(self, root): self.root = root self.root.title("Server") self.root.geometry("800x600") # 新增標題 "UDP派送檔案" self.title_label = tk.Label(self.root, text="UDP派送檔案", font=("Arial", 18), fg="deepskyblue") self.title_label.pack(pady=10) # 建立表格 self.tree = ttk.Treeview(root, columns=("IP", "MAC", "Name", "Status"), show='headings') # 設定表格標題與資料置中 self.tree.heading("IP", text="IP 位置", anchor="center") self.tree.heading("MAC", text="MAC 位置", anchor="center") self.tree.heading("Name", text="電腦名稱", anchor="center") self.tree.heading("Status", text="狀態", anchor="center") # 設定每個欄位的資料置中 self.tree.column("IP", anchor="center") self.tree.column("MAC", anchor="center") self.tree.column("Name", anchor="center") self.tree.column("Status", anchor="center") self.tree.pack(fill=tk.BOTH, expand=True) # 儲存客戶端資料的字典 self.client_data_dict = {} # 建立廣播按鈕 self.broadcast_button = tk.Button(self.root, text="手動廣播伺服器", command=self.start_broadcast_manually) self.broadcast_button.pack(pady=10) # 建立傳輸檔案按鈕 self.transfer_button = tk.Button(self.root, text="傳送檔案", command=self.select_client_and_file) self.transfer_button.pack(pady=10) # 新增備註區域,左右兩格 self.footer_frame = tk.Frame(self.root) self.footer_frame.pack(pady=20, fill=tk.X) # 左邊格 - 跑馬燈效果,文字顏色改為gold self.marquee_text = "感謝ChatGPT 感謝UDPcast " self.marquee_label = tk.Label(self.footer_frame, text=self.marquee_text, font=("Arial", 12), fg="gold") self.marquee_label.pack(side=tk.LEFT, padx=10) # 右邊格 - 固定文字,文字顏色改為gold self.fixed_label = tk.Label(self.footer_frame, text="製作者:WCS 日期:202410.01", font=("Arial", 12), fg="gold") self.fixed_label.pack(side=tk.RIGHT, padx=10) # 啟動跑馬燈 self.move_text() # 取得伺服端的本地 IP self.server_ip = self.get_local_ip() # 啟動伺服器 self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.server_socket.bind(("", 37020)) # 啟動接收客戶端報到的線程 threading.Thread(target=self.receive_clients, daemon=True).start() # 啟動時廣播30秒 threading.Thread(target=self.broadcast_for_30_seconds, daemon=True).start() def get_local_ip(self): hostname = socket.gethostname() local_ip = socket.gethostbyname(hostname) print(f"伺服端 IP: {local_ip}") return local_ip def move_text(self): # 跑馬燈效果,將字串每次往左移動一個字符 self.marquee_text = self.marquee_text[1:] + self.marquee_text[0] self.marquee_label.config(text=self.marquee_text) self.root.after(200, self.move_text) # 每200毫秒移動一次 def receive_clients(self): while True: try: data, addr = self.server_socket.recvfrom(1024) client_data = data.decode().split(',') # 確認收到的資料是否完整 if len(client_data) == 3: client_ip, client_mac, client_name = client_data # 排除伺服端自己的 IP,不顯示在表格中 if client_ip != self.server_ip: self.update_or_insert_client(client_ip, client_mac, client_name, "Online") else: print(f"接收到不完整的資料:{client_data}") except Exception as e: print(f"接收資料時發生錯誤: {e}") def update_or_insert_client(self, ip, mac, name, status): if ip in self.client_data_dict or mac in self.client_data_dict: # 更新該行的資料 item_id = self.client_data_dict.get(ip) or self.client_data_dict.get(mac) self.tree.item(item_id, values=(ip, mac, name, status)) else: # 插入新行並保存該行的 ID item_id = self.tree.insert("", "end", values=(ip, mac, name, status)) self.client_data_dict[ip] = item_id self.client_data_dict[mac] = item_id def broadcast_for_30_seconds(self): start_time = time.time() while time.time() - start_time < 30: try: self.server_socket.sendto(b"Server here", ('<broadcast>', 37020)) time.sleep(5) except Exception as e: print(f"廣播時發生錯誤: {e}") print("廣播已停止") def start_broadcast_manually(self): threading.Thread(target=self.broadcast_for_30_seconds, daemon=True).start() def select_client_and_file(self): selected_items = self.tree.selection() # 可以選取多個客戶端 if selected_items: client_ips = [self.tree.item(item, 'values')[0] for item in selected_items] # 獲取所有選中的客戶端 IP # 選擇檔案或資料夾 file_paths = filedialog.askopenfilenames(title="選擇檔案") if file_paths: # 傳送檔案清單給每一個選中的客戶端 for client_ip in client_ips: self.prepare_file_transfer_list(client_ip, file_paths) def prepare_file_transfer_list(self, client_ip, file_paths): transfer_list = "\n".join(file_paths) print(f"準備傳輸檔案到 {client_ip}:\n{transfer_list}") # 傳送檔案清單給 client threading.Thread(target=self.send_file_list, args=(client_ip, file_paths), daemon=True).start() def send_file_list(self, client_ip, file_paths): # 建立檔案傳輸指令 file_list_str = ",".join(file_paths) try: self.server_socket.sendto(file_list_str.encode(), (client_ip, 8500)) print(f"檔案傳輸清單已傳送至 {client_ip}") # 等待2秒後再使用 udp-sender.exe 傳送檔案 time.sleep(2) # 根據選取的客戶端數量,計算 min-receivers selected_clients = self.tree.selection() client_count = len(selected_clients) # 使用 udp-sender.exe 傳送檔案 for file_path in file_paths: if os.path.exists(file_path): subprocess.run([ 'udp-sender.exe', '--file', file_path, '--port', '8500', '--min-receivers', str(client_count), # 根據選取的客戶端數量設定 '--autostart', '5' # 自動啟動,等待 5 秒 ]) print(f"檔案 {file_path} 已傳送至 {client_ip},並等待 {client_count} 個接收端") except Exception as e: print(f"傳送檔案清單時發生錯誤: {e}") # 啟動伺服端 root = tk.Tk() server = Server(root) root.mainloop()" ,要如何使用subprocess.Popen來補捉"# 使用 udp-sender.exe 傳送檔案 for file_path in file_paths: if os.path.exists(file_path): subprocess.run([ 'udp-sender.exe', '--file', file_path, '--port', '8500', '--min-receivers', str(client_count), # 根據選取的客戶端數量設定 '--autostart', '5' # 自動啟動,等待 5 秒 ]) print(f"檔案 {file_path} 已傳送至 {client_ip},並等待 {client_count} 個接收端")"這程式執行後的輸出,並能夠呈現在UI界面上?
要使用 subprocess.Popen 來捕捉 udp-sender.exe 執行後的輸出並顯示在 UI 界面上,你可以進行以下步驟:
使用 subprocess.Popen 替代 subprocess.run,並指定 stdout 和 stderr 的管道。
使用 threading 來異步讀取 stdout 的內容。
在 UI 中新增一個顯示執行結果的區域,例如一個 Text 小部件。
以下是修改後的程式碼片段,展示如何實現這個功能:
import socket
import threading
import tkinter as tk
from tkinter import filedialog, ttk
import time
import os
import subprocess
class Server:
def __init__(self, root):
self.root = root
self.root.title("Server")
self.root.geometry("800x600")
# 新增標題 "UDP派送檔案"
self.title_label = tk.Label(self.root, text="UDP派送檔案", font=("Arial", 18), fg="deepskyblue")
self.title_label.pack(pady=10)
# 建立表格
self.tree = ttk.Treeview(root, columns=("IP", "MAC", "Name", "Status"), show='headings')
self.tree.heading("IP", text="IP 位置", anchor="center")
self.tree.heading("MAC", text="MAC 位置", anchor="center")
self.tree.heading("Name", text="電腦名稱", anchor="center")
self.tree.heading("Status", text="狀態", anchor="center")
self.tree.column("IP", anchor="center")
self.tree.column("MAC", anchor="center")
self.tree.column("Name", anchor="center")
self.tree.column("Status", anchor="center")
self.tree.pack(fill=tk.BOTH, expand=True)
# 儲存客戶端資料的字典
self.client_data_dict = {}
# 新增一個 Text 控制元件來顯示輸出
self.output_text = tk.Text(self.root, height=10)
self.output_text.pack(pady=10, fill=tk.BOTH, expand=True)
# 建立廣播按鈕
self.broadcast_button = tk.Button(self.root, text="手動廣播伺服器", command=self.start_broadcast_manually)
self.broadcast_button.pack(pady=10)
# 建立傳輸檔案按鈕
self.transfer_button = tk.Button(self.root, text="傳送檔案", command=self.select_client_and_file)
self.transfer_button.pack(pady=10)
# 取得伺服端的本地 IP
self.server_ip = self.get_local_ip()
# 啟動伺服器
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.server_socket.bind(("", 37020))
# 啟動接收客戶端報到的線程
threading.Thread(target=self.receive_clients, daemon=True).start()
def get_local_ip(self):
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
print(f"伺服端 IP: {local_ip}")
return local_ip
def select_client_and_file(self):
selected_items = self.tree.selection() # 可以選取多個客戶端
if selected_items:
client_ips = [self.tree.item(item, 'values')[0] for item in selected_items] # 獲取所有選中的客戶端 IP
# 選擇檔案或資料夾
file_paths = filedialog.askopenfilenames(title="選擇檔案")
if file_paths:
# 傳送檔案清單給每一個選中的客戶端
for client_ip in client_ips:
self.prepare_file_transfer_list(client_ip, file_paths)
def prepare_file_transfer_list(self, client_ip, file_paths):
transfer_list = "\n".join(file_paths)
print(f"準備傳輸檔案到 {client_ip}:\n{transfer_list}")
# 傳送檔案清單給 client
threading.Thread(target=self.send_file_list, args=(client_ip, file_paths), daemon=True).start()
def send_file_list(self, client_ip, file_paths):
file_list_str = ",".join(file_paths)
try:
self.server_socket.sendto(file_list_str.encode(), (client_ip, 8500))
print(f"檔案傳輸清單已傳送至 {client_ip}")
# 等待2秒後再使用 udp-sender.exe 傳送檔案
time.sleep(2)
# 根據選取的客戶端數量,計算 min-receivers
selected_clients = self.tree.selection()
client_count = len(selected_clients)
# 使用 udp-sender.exe 傳送檔案
for file_path in file_paths:
if os.path.exists(file_path):
self.run_udp_sender(client_ip, file_path, client_count)
except Exception as e:
print(f"傳送檔案清單時發生錯誤: {e}")
def run_udp_sender(self, client_ip, file_path, client_count):
process = subprocess.Popen([
'udp-sender.exe',
'--file', file_path,
'--port', '8500',
'--min-receivers', str(client_count), # 根據選取的客戶端數量設定
'--autostart', '5' # 自動啟動,等待 5 秒
], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# 開始讀取輸出
threading.Thread(target=self.read_output, args=(process, client_ip, file_path), daemon=True).start()
def read_output(self, process, client_ip, file_path):
for line in process.stdout:
self.output_text.insert(tk.END, f"{line.strip()}\n")
self.output_text.see(tk.END) # 滾動到最後一行
process.stdout.close()
process.wait() # 等待進程結束
# 啟動伺服端
root = tk.Tk()
server = Server(root)
root.mainloop()
新增 Text 控制元件:使用 tk.Text 控制元件來顯示 udp-sender.exe 的輸出。
使用 Popen 執行命令:用 subprocess.Popen 替代 subprocess.run,並且將 stdout 和 stderr 設定為管道。
讀取輸出:在新的線程中異步讀取命令的標準輸出,並將每一行輸出插入到 Text 控制元件中。
這樣,當執行 udp-sender.exe 時,所有的輸出將會實時顯示在 UI 界面中。