在"# 建立傳輸檔案按鈕",右邊,希望增加一個"傳送文字"的按鈕,功能是使用者按下後,出現一個文字框讓使用者可以輸入文字,並傳送到多個選取的clinet端電腦,client端電腦會跳出一個UI界面的畫面來呈現,UI界面的標題是"無聲廣播",請要問如何修正程式?
在建立「傳送檔案」按鈕的下方,增加「傳送文字」按鈕,並設定它的命令為一個新的函數 send_text_to_clients。
這個函數需要創建一個新的對話框,讓使用者輸入要傳送的文字,然後將該文字傳送到選中的 client。
在 client 端,需新增一個接收文字的 UI 界面來顯示所傳送的文字。
以下是具體的修改步驟:
在「傳送檔案」按鈕的下方,加入新的按鈕與功能:
# 建立傳送文字按鈕
self.send_text_button = tk.Button(button_frame, text="傳送文字", command=self.send_text_to_clients, fg="purple")
self.send_text_button.pack(side=tk.LEFT, padx=5, pady=10)
建立一個方法來彈出文字輸入框並處理輸入的文字:
def send_text_to_clients(self):
# 創建一個新的窗口用來輸入文字
text_window = tk.Toplevel(self.root)
text_window.title("輸入要傳送的文字")
text_window.geometry("400x200")
# 創建文字框
text_entry = tk.Text(text_window, height=10)
text_entry.pack(pady=10, padx=10)
# 按鈕來確認傳送
send_button = tk.Button(text_window, text="傳送", command=lambda: self.confirm_send_text(text_entry.get("1.0", tk.END).strip(), text_window))
send_button.pack(pady=5)
新增一個方法來處理傳送的邏輯:
def confirm_send_text(self, text, text_window):
selected_items = self.tree.selection() # 獲取選中的客戶端
if not selected_items:
messagebox.showwarning("警告", "請先選擇至少一個 client 電腦")
text_window.destroy() # 關閉窗口
return
for item in selected_items:
client_ip = self.tree.item(item, 'values')[0] # 獲取客戶端 IP
self.server_socket.sendto(f"TEXT:{text}".encode(), (client_ip, 8510)) # 使用 8510 端口發送文字
print(f"發送文字 '{text}' 至 {', '.join([self.tree.item(item, 'values')[0] for item in selected_items])}")
messagebox.showinfo("傳送成功", "文字已成功傳送!")
text_window.destroy() # 關閉窗口
在 client 端的程式中,你需要接收這些文字並顯示在一個新的 UI 窗口中。以下是接收文字的基本範例(假設你已有一個 Client 的接收程式):
def receive_commands(self):
while True:
data, addr = self.client_socket.recvfrom(1024)
message = data.decode()
if message.startswith("TEXT:"):
text_message = message[5:] # 移除 "TEXT:" 前綴
self.show_text_message(text_message)
def show_text_message(self, message):
# 彈出顯示文字的窗口
text_window = tk.Toplevel()
text_window.title("無聲廣播")
text_label = tk.Label(text_window, text=message, padx=10, pady=10)
text_label.pack()
接著,新增 send_text_to_clients 函數:
def send_text_to_clients(self):
# 創建一個輸入對話框
text_input = tk.simpledialog.askstring("傳送文字", "請輸入要傳送的文字:")
if text_input:
selected_items = self.tree.selection() # 獲取選中的客戶端
if not selected_items:
messagebox.showwarning("警告", "請先選擇至少一個 client 電腦")
return # 如果沒有選擇,直接返回
for item in selected_items:
client_ip = self.tree.item(item, 'values')[0] # 獲取客戶端 IP
self.server_socket.sendto(text_input.encode(), (client_ip, 8600)) # 使用 8600 端口發送文字
print(f"傳送文字至 {client_ip}: {text_input}")
在 client 端,你需要增加一個 UDP 接收程序來接收傳送的文字,並在 UI 界面中顯示。
假設你的 client 端程式結構如下:
import socket
import tkinter as tk
from threading import Thread
class Client:
def __init__(self):
self.root = tk.Tk()
self.root.title("Client端")
self.label = tk.Label(self.root, text="等待接收文字...", font=("Arial", 14))
self.label.pack(pady=20)
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.client_socket.bind(('', 8600)) # 綁定端口 8600
# 啟動接收文字的線程
Thread(target=self.receive_text, daemon=True).start()
self.root.mainloop()
def receive_text(self):
while True:
data, addr = self.client_socket.recvfrom(1024)
text = data.decode()
self.show_text(text)
def show_text(self, text):
self.label.config(text=f"接收到的文字: {text}")
if __name__ == "__main__":
Client()
伺服端程式-待測
import socket
import threading
import tkinter as tk
from tkinter import filedialog, ttk, messagebox
import time
import os
import sys
import subprocess
import psutil # 用於獲取網路卡資訊
import csv
class Server:
def __init__(self, root):
self.root = root
self.root.title("Server端 UDP派送檔案 by WCS 2024.10.01")
self.root.geometry("800x600")
# 創建一個 Frame 用來放置標題和 IP 顯示
header_frame = tk.Frame(self.root)
header_frame.pack(pady=10)
# 新增標題 "UDP派送檔案"
self.title_label = tk.Label(header_frame, text="UDP派送檔案", font=("Arial", 18), fg="deepskyblue")
self.title_label.pack(side=tk.LEFT) # 將標題放在左側
# 取得伺服端的本地 IP
self.server_ip = None
#self.server_ip = self.get_local_ip()
# 顯示伺服器 IP 的 Label
#self.ip_label = tk.Label(header_frame, text=f"伺服器 IP: {self.server_ip}", font=("Arial", 14), fg="blue")
#self.ip_label.pack(side=tk.LEFT, padx=(10, 0)) # 在標題和 IP 標籤之間加點距離
# 顯示伺服器 IP 的 Label
self.ip_label = tk.Label(header_frame, text="伺服器 IP: 尚未選擇網路卡", font=("Arial", 14), fg="blue")
self.ip_label.pack(side=tk.LEFT, padx=(10, 0))
# 新增標題 "UDP派送檔案"
self.title_label = tk.Label(self.root, text="UDP派送檔案", font=("Arial", 18), fg="deepskyblue")
self.title_label.pack(pady=10)
# 建立標題
#self.label_title = tk.Label(root, text="UDP派送檔案", bg="deepskyblue", fg="white", font=("Arial", 18))
#self.label_title.pack(fill=tk.X)
# 網路卡下拉選單
self.network_card_label = tk.Label(self.root, text="選擇網路卡:", font=("Arial", 12))
self.network_card_label.pack(pady=10)
self.network_card_var = tk.StringVar(self.root)
self.network_card_options = self.get_network_cards()
self.network_card_dropdown = ttk.Combobox(self.root, textvariable=self.network_card_var, values=self.network_card_options)
self.network_card_dropdown.pack(pady=10)
# 在選擇網路卡時更新 IP 標籤
self.network_card_dropdown.bind("<<ComboboxSelected>>", self.update_ip_label)
# 建立表格
self.tree = ttk.Treeview(root, columns=("IP", "MAC", "Name", "Status"), show='headings')
# 設定表格標題與資料置中
self.tree.heading("IP", text="IP 位置", anchor="center")
self.tree.heading("MAC", text="MAC 位置", anchor="center")
self.tree.heading("Name", text="電腦名稱", anchor="center")
self.tree.heading("Status", text="狀態", anchor="center")
# 設定每個欄位的資料置中
self.tree.column("IP", anchor="center")
self.tree.column("MAC", anchor="center")
self.tree.column("Name", anchor="center")
self.tree.column("Status", anchor="center")
self.tree.pack(fill=tk.BOTH, expand=True)
# 儲存客戶端資料的字典
self.client_data_dict = {}
# 創建一個 Frame 用來放置按鈕
button_frame = tk.Frame(self.root)
button_frame.pack(pady=10)
# 建立廣播按鈕
self.broadcast_button = tk.Button(button_frame, text="手動廣播伺服器", command=self.start_broadcast_manually)
self.broadcast_button.pack(side=tk.LEFT, padx=(0, 10)) # 在按鈕之間加點距離
# 新增「匯出客戶端資料」按鈕
self.export_button = tk.Button(button_frame, text="匯出客戶端資料", command=self.export_client_data, fg="orange")
self.export_button.pack(side=tk.LEFT, padx=5, pady=10)
# 建立傳輸檔案按鈕
self.transfer_button = tk.Button(button_frame, text="傳送檔案", command=self.select_client_and_file, fg="darkcyan")
self.transfer_button.pack(side=tk.LEFT, padx=5, pady=10) # 將按鈕放在左側
# 建立傳送文字按鈕
self.send_text_button = tk.Button(button_frame, text="傳送文字", command=self.send_text_to_clients, fg="purple")
self.send_text_button.pack(side=tk.LEFT, padx=5, pady=10)
# 建立重新開機按鈕
self.reboot_button = tk.Button(button_frame, text="重新開機", command=self.reboot_selected_clients, fg="green")
self.reboot_button.pack(side=tk.LEFT, padx=5, pady=10)
# 建立關機按鈕
self.shutdown_button = tk.Button(button_frame, text="關機", command=self.shutdown_selected_clients, fg="red")
self.shutdown_button.pack(side=tk.LEFT, padx=5, pady=10)
# 新增「全選 client 端」按鈕
self.select_all_button = tk.Button(button_frame, text="全選 client 端", command=self.select_all_clients, fg="navy")
self.select_all_button.pack(side=tk.LEFT, padx=5, pady=10)
# 新增「取消全選 client 端」按鈕
self.deselect_all_button = tk.Button(button_frame, text="取消全選 client 端", command=self.deselect_all_clients, fg="purple")
self.deselect_all_button.pack(side=tk.LEFT, padx=5, pady=10)
# 新增檔案傳輸狀態顯示欄位,初始化為 "等待中"
self.status_label = tk.Label(self.root, text="目前傳輸狀態:等待中", font=("Arial", 12), fg="black")
self.status_label.pack(pady=10)
# 在 Server 的 __init__ 方法中,添加進度條
self.progress_bar = ttk.Progressbar(self.root, length=400, mode='determinate')
self.progress_bar.pack(pady=10)
# 新增備註區域,左右兩格
self.footer_frame = tk.Frame(self.root)
self.footer_frame.pack(pady=20, fill=tk.X)
# 左邊格 - 跑馬燈效果,文字顏色改為orangered
self.marquee_text = "感謝ChatGPT 感謝UDPcast https://www.udpcast.linux.lu/index.html "
self.marquee_label = tk.Label(self.footer_frame, text=self.marquee_text, font=("Arial", 12), fg="orangered")
self.marquee_label.pack(side=tk.LEFT, padx=10)
# 右邊格 - 固定文字,文字顏色改為orangered
self.fixed_label = tk.Label(self.footer_frame, text="製作者:WCS 日期:2024.10.01", font=("Arial", 12), fg="orangered")
self.fixed_label.pack(side=tk.RIGHT, padx=10)
# 啟動跑馬燈
self.move_text()
# 取得伺服端的本地 IP
#self.server_ip = self.get_local_ip()
# 取得伺服端的本地 IP
#self.server_ip = None
# 啟動伺服器
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.server_socket.bind(("", 37020))
# 啟動接收客戶端報到的線程
threading.Thread(target=self.receive_clients, daemon=True).start()
# 啟動時廣播30秒
threading.Thread(target=self.broadcast_for_30_seconds, daemon=True).start()
def send_text_to_clients(self):
# 創建一個新的窗口用來輸入文字
text_window = tk.Toplevel(self.root)
text_window.title("輸入要傳送的文字")
text_window.geometry("400x200")
# 創建文字框
text_entry = tk.Text(text_window, height=10)
text_entry.pack(pady=10, padx=10)
# 按鈕來確認傳送
send_button = tk.Button(text_window, text="傳送", command=lambda: self.confirm_send_text(text_entry.get("1.0", tk.END).strip(), text_window))
send_button.pack(pady=5)
def confirm_send_text(self, text, text_window):
selected_items = self.tree.selection() # 獲取選中的客戶端
if not selected_items:
messagebox.showwarning("警告", "請先選擇至少一個 client 電腦")
text_window.destroy() # 關閉窗口
return
for item in selected_items:
client_ip = self.tree.item(item, 'values')[0] # 獲取客戶端 IP
self.server_socket.sendto(f"TEXT:{text}".encode(), (client_ip, 8520)) # 使用 8520 端口發送文字
print(f"發送文字 '{text}' 至 {', '.join([self.tree.item(item, 'values')[0] for item in selected_items])}")
messagebox.showinfo("傳送成功", "文字已成功傳送!")
text_window.destroy() # 關閉窗口
#功能,遠端開、關機
def reboot_selected_clients(self):
self.send_command_to_selected_clients("REBOOT")
def shutdown_selected_clients(self):
self.send_command_to_selected_clients("SHUTDOWN")
def send_command_to_selected_clients(self, command):
selected_items = self.tree.selection() # 獲取選中的客戶端
if not selected_items:
messagebox.showwarning("警告", "請先選擇至少一個 client 電腦")
return
for item in selected_items:
client_ip = self.tree.item(item, 'values')[0] # 獲取客戶端 IP
self.server_socket.sendto(command.encode(), (client_ip, 8500))
print(f"發送指令 '{command}' 至 {client_ip}")
# 新增選擇所有客戶端的函數
def select_all_clients(self):
self.tree.selection_set(self.tree.get_children()) # 選擇所有項目
# 新增取消選擇所有客戶端的函數
def deselect_all_clients(self):
self.tree.selection_remove(self.tree.get_children()) # 取消選擇所有項目
def get_network_cards(self):
# 獲取網路卡列表
interfaces = psutil.net_if_addrs()
return [iface for iface in interfaces.keys()]
def update_ip_label(self, event):
# 更新 IP Label
self.server_ip = self.get_local_ip()
if self.server_ip:
self.ip_label.config(text=f"伺服器 IP: {self.server_ip}")
else:
self.ip_label.config(text="伺服器 IP: 無法獲取")
def get_local_ip(self):
selected_interface = self.network_card_var.get()
if selected_interface:
# 獲取該網路卡的 IP
interfaces = psutil.net_if_addrs()
for addr in interfaces[selected_interface]:
if addr.family == socket.AF_INET: # 確保是 IPv4
print(f"伺服端 IP: {addr.address}")
return addr.address
return None
#def get_local_ip(self):
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
print(f"伺服端 IP: {local_ip}")
return local_ip
def move_text(self):
# 跑馬燈效果,將字串每次往左移動一個字符
self.marquee_text = self.marquee_text[1:] + self.marquee_text[0]
self.marquee_label.config(text=self.marquee_text)
self.root.after(200, self.move_text) # 每200毫秒移動一次
def receive_clients(self):
while True:
try:
data, addr = self.server_socket.recvfrom(1024)
client_data = data.decode().split(',')
# 確認收到的資料是否完整
if len(client_data) == 3:
client_ip, client_mac, client_name = client_data
# 排除伺服端自己的 IP,不顯示在表格中
if client_ip != self.server_ip:
self.update_or_insert_client(client_ip, client_mac, client_name, "Online")
else:
print(f"接收到不完整的資料:{client_data}")
except Exception as e:
print(f"接收資料時發生錯誤: {e}")
break # 結束迴圈或進行其他處理
def update_or_insert_client(self, ip, mac, name, status):
if ip in self.client_data_dict or mac in self.client_data_dict:
# 更新該行的資料
item_id = self.client_data_dict.get(ip) or self.client_data_dict.get(mac)
self.tree.item(item_id, values=(ip, mac, name, status))
else:
# 插入新行並保存該行的 ID
item_id = self.tree.insert("", "end", values=(ip, mac, name, status))
self.client_data_dict[ip] = item_id
self.client_data_dict[mac] = item_id
def broadcast_for_30_seconds(self):
start_time = time.time()
while time.time() - start_time < 30:
try:
self.server_socket.sendto(b"Server here", ('<broadcast>', 37020))
time.sleep(5)
except Exception as e:
print(f"廣播時發生錯誤: {e}")
print("廣播已停止")
def export_client_data(self):
# 獲取所有客戶端資料
client_data = []
for item in self.tree.get_children():
client_info = self.tree.item(item, 'values')
client_data.append(client_info)
# 請求儲存檔案的路徑
file_path = filedialog.asksaveasfilename(defaultextension=".csv",
filetypes=[("CSV files", "*.csv"), ("All files", "*.*")])
if not file_path:
return # 如果沒有選擇檔案,則直接返回
# 寫入檔案
try:
with open(file_path, 'w', newline='') as file:
writer = csv.writer(file)
# 寫入表頭
writer.writerow(["IP", "MAC", "Name", "Status"])
# 寫入資料
writer.writerows(client_data)
messagebox.showinfo("匯出成功", "客戶端資料已成功匯出!")
except Exception as e:
messagebox.showerror("錯誤", f"匯出檔案時發生錯誤: {e}")
def start_broadcast_manually(self):
threading.Thread(target=self.broadcast_for_30_seconds, daemon=True).start()
def select_client_and_file(self):
selected_items = self.tree.selection() # 可以選取多個客戶端
if not selected_items:
messagebox.showwarning("警告", "請先選擇至少一個 client 電腦")
return # 如果沒有選擇,直接返回
if selected_items:
client_ips = [self.tree.item(item, 'values')[0] for item in selected_items] # 獲取所有選中的客戶端 IP
# 選擇檔案或資料夾
file_paths = filedialog.askopenfilenames(title="選擇檔案")
if file_paths:
# 更新狀態為正在準備傳送
self.status_label.config(text="目前傳輸狀態:正在準備傳送檔案...")
# 傳送檔案清單給每一個選中的客戶端
for client_ip in client_ips:
self.prepare_file_transfer_list(client_ip, file_paths)
def prepare_file_transfer_list(self, client_ip, file_paths):
transfer_list = "\n".join(file_paths)
print(f"準備傳輸檔案到 {client_ip}:\n{transfer_list}")
# 傳送檔案清單給 client
threading.Thread(target=self.send_file_list, args=(client_ip, file_paths), daemon=True).start()
def send_file_list(self, client_ip, file_paths):
# 禁用按鈕
self.transfer_button.config(state=tk.DISABLED)
self.broadcast_button.config(state=tk.DISABLED)
# 建立檔案傳輸指令
file_list_str = ",".join(file_paths)
try:
for idx, file_path in enumerate(file_paths):
self.server_socket.sendto(file_list_str.encode(), (client_ip, 8500))
print(f"檔案傳輸清單已傳送至 {client_ip}")
# 等待2秒後再使用 udp-sender.exe 傳送檔案
time.sleep(2)
# 根據選取的客戶端數量,計算 min-receivers
selected_clients = self.tree.selection()
client_count = len(selected_clients)
# 檢查當前目錄中是否存在 udp-sender.exe
if getattr(sys, 'frozen', False):
# 如果是打包的可執行檔
current_dir = os.path.dirname(sys.executable)
else:
# 如果是在開發環境中
current_dir = os.getcwd()
udp_sender_path = os.path.join(current_dir, "udp-sender.exe")
# 如果當前目錄中不存在,則使用預設路徑
if not os.path.exists(udp_sender_path):
udp_sender_path = r"C:\Windows\System32\udp-sender.exe"
# 使用 udp-sender.exe 傳送檔案
for file_path in file_paths:
if os.path.exists(file_path):
file_size = os.path.getsize(file_path) # 取得檔案大小
self.status_label.config(text=f"目前傳輸檔案:{os.path.basename(file_path)},大小:{file_size // (1024 * 1024)} MB")
# 開始執行 udp-sender.exe
command = f"udp-sender.exe --file \"{file_path}\" --port 8500 --min-receivers {client_count} --nokbd --autostart 3 "
subprocess.run(command, shell=True)
# 更新進度條
self.progress_bar['value'] = idx + 1
self.root.update_idletasks() # 刷新界面
# 設定進度條的最大值為檔案總數
self.progress_bar['maximum'] = len(file_paths)
# 完成傳輸,更新狀態
self.status_label.config(text="目前傳輸狀態:傳輸完成!")
messagebox.showinfo("傳輸完成", "檔案傳輸已完成!") # 顯示完成傳輸的訊息
except Exception as e:
messagebox.showerror("錯誤", f"傳送檔案時發生錯誤: {e}")
print(f"傳送檔案清單時發生錯誤: {e}")
self.status_label.config(text="傳輸失敗!")
messagebox.showinfo("傳輸失敗!", "傳輸失敗!") # 顯示完成失敗的訊息
finally:
# 無論成功或失敗,重新啟用按鈕
self.transfer_button.config(state=tk.NORMAL)
self.broadcast_button.config(state=tk.NORMAL)
def send_command_to_selected_clients(self, command):
selected_items = self.tree.selection() # 獲取選中的客戶端
if not selected_items:
messagebox.showwarning("警告", "請先選擇至少一個 client 電腦")
return
for item in selected_items:
client_ip = self.tree.item(item, 'values')[0] # 獲取客戶端 IP
self.server_socket.sendto(command.encode(), (client_ip, 8510)) # 確保埠號是 8510
print(f"發送指令 '{command}' 至 {client_ip}")
# 新增發送重新開機或關機指令的函數
def send_command_to_client(self, client_ip, command):
try:
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_socket.sendto(command.encode(), (client_ip, 8510)) # 使用 8510 端口發送指令
print(f"已發送指令 '{command}' 給客戶端 {client_ip}")
except Exception as e:
print(f"發送指令時發生錯誤: {e}")
if __name__ == "__main__":
root = tk.Tk()
server_app = Server(root)
root.mainloop()
clinet
這段程式碼的功能看起來是用於在網絡上進行 UDP 通信,並且能夠報告客戶端的資訊(IP、MAC、電腦名稱)、接收檔案和執行命令。以下是一些檢查和建議:
重複定義的函數:
receive_commands 函數定義了兩次。請合併這兩個定義,並確認每個定義中的邏輯是否正確。
receive_commands 的參數:
第一個 receive_commands 函數不需要 self,因為它不是類別中的方法。若這是打算成為類別的一部分,請確認整體設計。
receive_file_with_progress 的錯誤處理:
在呼叫 subprocess.Popen 時,可能會出現錯誤。建議添加錯誤處理來捕獲這些潛在的例外。
UDP 收發端口:
檢查各個端口號(8500, 8501, 8510)是否與預期的用途一致,並且沒有衝突。
報告發送的錯誤處理:
在 send_report 函數中,應該添加一些錯誤處理以處理 socket 的問題。
進度更新的判斷:
在 receive_file_with_progress 中,若沒有接收到任何進度資訊,可能需要確認進度格式是否正確。
改進的可讀性:
可以使用一些註釋來解釋程式的每個主要部分,這樣在未來維護時會更容易理解。
多執行緒的安全性:
目前的實現沒有使用任何鎖來保護共享資源,若有多個線程同時訪問共享變數,可能會導致問題。
使用 with 語句管理 socket:
使用 with 語句來自動管理 socket 的開啟和關閉會更好,這樣能減少資源泄漏的風險。
client端-待測
import socket
import threading
import time
import uuid
import os
import sys
import subprocess
import re
import tkinter as tk
from tkinter import messagebox
# 獲取 MAC 位址
def get_mac_address():
mac = ':'.join(['{:02x}'.format((uuid.getnode() >> elements) & 0xff) for elements in range(0, 2*6, 8)][::-1])
return mac
# 獲取電腦名稱
def get_computer_name():
return socket.gethostname()
# 發送客戶端的報到信息 (IP、MAC、電腦名稱)
def send_report():
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
while True:
try:
report_msg = f"{socket.gethostbyname(socket.gethostname())},{get_mac_address()},{get_computer_name()}"
client_socket.sendto(report_msg.encode(), ('<broadcast>', 37020))
except Exception as e:
print(f"發送報到時發生錯誤: {e}")
time.sleep(5)
# 接收檔案清單並處理接收檔案
def receive_file_list():
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_socket.bind(("", 8500))
# 獲取當前執行檔的路徑
if getattr(sys, 'frozen', False):
base_path = sys._MEIPASS # PyInstaller 的臨時資料夾
else:
base_path = os.path.dirname(os.path.abspath(__file__))
local_receiver_path = os.path.join(base_path, 'udp-receiver.exe')
system_receiver_path = r'C:\Windows\System32\udp-receiver.exe'
# 根據檔案存在性選擇執行路徑
if os.path.exists(local_receiver_path):
receiver_path = local_receiver_path
elif os.path.exists(system_receiver_path):
receiver_path = system_receiver_path
else:
print("找不到 udp-receiver.exe,請確保它在當前資料夾或 System32 中。")
return
while True:
try:
data, addr = client_socket.recvfrom(1024)
file_list = data.decode().split(",")
print(f"接收到的檔案清單: {file_list}")
os.makedirs("received_files", exist_ok=True)
for file_name in file_list:
save_path = os.path.join("received_files", os.path.basename(file_name))
receive_file_with_progress(save_path, addr[0])
except Exception as e:
print(f"接收檔案清單時發生錯誤: {e}")
def receive_file_with_progress(save_path, server_ip):
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
process = subprocess.Popen(
['udp-receiver.exe', '--file', save_path, '--port', '8500'],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, creationflags=subprocess.CREATE_NO_WINDOW
)
for line in process.stdout:
print(f"接收進度: {line.strip()}")
if re.search(r'(\d+)%', line):
client_socket.sendto(line.encode(), (server_ip, 8501))
if process.wait() == 0:
print("檔案接收完成")
client_socket.sendto("傳輸完成".encode(), (server_ip, 8501))
else:
print("檔案接收失敗")
client_socket.sendto("傳輸失敗".encode(), (server_ip, 8501))
def receive_commands():
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_socket.bind(("", 8520))
while True:
data, addr = client_socket.recvfrom(1024)
command = data.decode()
if command == "REBOOT":
os.system("shutdown /r /t 0")
elif command == "SHUTDOWN":
os.system("shutdown /s /t 0")
def receive_broadcast():
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_socket.bind(('', 8520))
while True:
data, addr = client_socket.recvfrom(1024)
message = data.decode('utf-8')
print(f"接收到來自伺服端 {addr} 的訊息: {message}")
if "無聲廣播" in message:
threading.Thread(target=show_broadcast_message, args=(message,)).start()
# 創建 Tkinter 主界面,用於顯示 "無聲廣播" 視窗
def show_broadcast_message(message):
root = tk.Tk()
root.title("無聲廣播")
label = tk.Label(root, text=message, padx=20, pady=20)
label.pack()
ok_button = tk.Button(root, text="OK", command=root.destroy)
ok_button.pack(pady=10)
root.mainloop()
# 啟動客戶端
if __name__ == "__main__":
threading.Thread(target=send_report, daemon=True).start()
threading.Thread(target=receive_file_list, daemon=True).start()
threading.Thread(target=receive_commands, daemon=True).start()
threading.Thread(target=receive_broadcast, daemon=True).start()
while True:
time.sleep(1)