在伺服端「# 建立傳輸檔案按鈕」的右邊,希望增加一個「傳送文字」的按鈕。功能是:伺服端使用者必須先選擇多個 client 端電腦後才能按下「傳送文字」按鈕,否則會提示先選擇 client 端電腦;接著會出現一個文字框讓使用者輸入文字,並傳送到所選的 client 端電腦,client 端電腦會跳出一個 UI 視窗來呈現該文字。請問要如何修改伺服端及 client 端的程式?
伺服端-待測
-修改,button的排列成同一排
-新增-傳送文字功能
-文字只能在200字以下,且顯示畫面較小
-打包時不能使用 noconsole 選項,否則會無法傳送檔案
# Add the "select all clients" button to the button row.
self.select_all_button = tk.Button(button_frame, text="全選 client 端", command=self.select_all_clients, fg="navy")
self.select_all_button.pack(side=tk.LEFT, padx=5, pady=10)
# Add the "deselect all clients" button next to it.
self.deselect_all_button = tk.Button(button_frame, text="取消全選 client 端", command=self.deselect_all_clients, fg="purple")
self.deselect_all_button.pack(side=tk.LEFT, padx=5, pady=10)
# Handler for the "select all clients" button.
def select_all_clients(self):
    """Select every row currently listed in ``self.tree``."""
    every_row = self.tree.get_children()
    self.tree.selection_set(every_row)
# Handler for the "deselect all clients" button.
def deselect_all_clients(self):
    """Remove every row of ``self.tree`` from the current selection."""
    every_row = self.tree.get_children()
    self.tree.selection_remove(every_row)
# Create the "reboot" button; it calls reboot_selected_clients.
self.reboot_button = tk.Button(button_frame, text="重新開機", command=self.reboot_selected_clients, fg="green")
self.reboot_button.pack(side=tk.LEFT, padx=5, pady=10)
# Create the "shut down" button; it calls shutdown_selected_clients.
self.shutdown_button = tk.Button(button_frame, text="關機", command=self.shutdown_selected_clients, fg="red")
self.shutdown_button.pack(side=tk.LEFT, padx=5, pady=10)
# Remote power control: reboot.
def reboot_selected_clients(self):
    """Pass the ``REBOOT`` command to ``send_command_to_selected_clients``."""
    command = "REBOOT"
    self.send_command_to_selected_clients(command)
def shutdown_selected_clients(self):
    """Pass the ``SHUTDOWN`` command to ``send_command_to_selected_clients``."""
    command = "SHUTDOWN"
    self.send_command_to_selected_clients(command)
1131018-udp-cast-test01-03
-程式重整,待測
import socket
import threading
import tkinter as tk
from tkinter import filedialog, ttk, messagebox, simpledialog
import time
import os
import subprocess
from tkinter.scrolledtext import ScrolledText
import psutil # 用於獲取網路介面資訊
class Server:
    """Tkinter control panel that discovers clients over UDP and sends them files or text.

    Clients report themselves as ``ip,mac,name`` datagrams on UDP port 37020.
    Text messages are sent to UDP port 8520 of each selected client; file
    transfers are announced on UDP port 8500 and then handed over to
    ``udp-sender.exe``.
    """

    # Payload of the discovery broadcast sent by broadcast_for_30_seconds().
    BEACON = b"Server here"
    # The client's text listener reads with recvfrom(1024), so a larger
    # datagram cannot be received intact.
    MAX_TEXT_BYTES = 1024

    def __init__(self, root):
        """Build the UI, bind the discovery socket and start the worker threads.

        Args:
            root: The ``tk.Tk`` window that hosts the server UI.
        """
        self.root = root
        self.root.title("Server UDP傳送檔案")
        self.root.geometry("800x600")

        self.network_interfaces = self.get_network_interfaces()
        if not self.network_interfaces:
            messagebox.showerror("錯誤", "未找到有效的網路介面!")
            # quit() merely asks a running mainloop to stop, and none is
            # running yet during construction; destroy() closes the window
            # instead of leaving it open and empty.
            self.root.destroy()
            return
        self.selected_interface = tk.StringVar(value=next(iter(self.network_interfaces)))

        # Title and server IP.
        header_frame = tk.Frame(self.root)
        header_frame.pack(pady=10)
        self.title_label = tk.Label(header_frame, text="UDP派送檔案", font=("Arial", 18), fg="deepskyblue")
        self.title_label.pack(side=tk.LEFT, pady=10)
        self.server_ip = self.get_local_ip()
        self.ip_label = tk.Label(header_frame, text=f"伺服器端 IP: {self.server_ip}", font=("Arial", 14))
        self.ip_label.pack(side=tk.LEFT, padx=(10, 0))

        # Network interface drop-down.
        self.interface_label = tk.Label(self.root, text="選擇網路卡:")
        self.interface_label.pack(pady=5)
        self.interface_dropdown = ttk.Combobox(
            self.root,
            textvariable=self.selected_interface,
            values=list(self.network_interfaces.keys()),
        )
        self.interface_dropdown.pack(pady=5)
        self.interface_dropdown.bind("<<ComboboxSelected>>", self.update_ip_display)

        # Client table.
        columns = ("IP", "MAC", "Name", "Status")
        self.tree = ttk.Treeview(root, columns=columns, show='headings')
        for col in columns:
            self.tree.heading(col, text=col, anchor="center")
            self.tree.column(col, anchor="center")
        self.tree.pack(fill=tk.BOTH, expand=True)
        # Maps both a client's IP and its MAC to the table row id.
        self.client_data_dict = {}

        # Button row.
        button_frame = tk.Frame(self.root)
        button_frame.pack(pady=10)
        self.broadcast_button = tk.Button(button_frame, text="手動廣播伺服器", command=self.start_broadcast_manually)
        self.broadcast_button.pack(side=tk.LEFT, padx=(0, 10))
        self.transfer_button = tk.Button(button_frame, text="傳送檔案", command=self.select_client_and_file)
        self.transfer_button.pack(side=tk.LEFT, padx=(0, 10))
        self.send_text_button = tk.Button(button_frame, text="傳送文字", command=self.send_text)
        self.send_text_button.pack(side=tk.LEFT, padx=(0, 10))
        self.select_all_button = tk.Button(button_frame, text="全選 client 端", command=self.select_all_clients, fg="navy")
        self.select_all_button.pack(side=tk.LEFT, padx=5, pady=10)
        self.deselect_all_button = tk.Button(button_frame, text="取消全選 client 端", command=self.deselect_all_clients, fg="purple")
        self.deselect_all_button.pack(side=tk.LEFT, padx=5, pady=10)

        # Message log.
        self.output_area = ScrolledText(self.root, height=10)
        self.output_area.pack(fill=tk.BOTH, pady=10)

        # Discovery socket: receives client reports and sends the beacon.
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        self.server_socket.bind(("", 37020))
        threading.Thread(target=self.receive_clients, daemon=True).start()
        threading.Thread(target=self.broadcast_for_30_seconds, daemon=True).start()

    @classmethod
    def _encode_text(cls, text):
        """Return ``text`` encoded for sending, or ``None`` if it exceeds MAX_TEXT_BYTES."""
        payload = text.encode()
        return payload if len(payload) <= cls.MAX_TEXT_BYTES else None

    def get_network_interfaces(self):
        """Return a dict mapping interface name to an IPv4 address reported by psutil."""
        interfaces = {}
        for name, addrs in psutil.net_if_addrs().items():
            for addr in addrs:
                if addr.family == socket.AF_INET:
                    interfaces[name] = addr.address
        return interfaces

    def get_local_ip(self):
        """Return the IPv4 address of the interface chosen in the drop-down."""
        return self.network_interfaces[self.selected_interface.get()]

    def update_ip_display(self, event):
        """Refresh ``server_ip`` and its label after another interface was selected."""
        self.server_ip = self.get_local_ip()
        self.ip_label.config(text=f"伺服器端 IP: {self.server_ip}")

    def set_buttons_state(self, state):
        """Enable (truthy ``state``) or disable the broadcast/transfer/selection buttons."""
        new_state = tk.NORMAL if state else tk.DISABLED
        for button in (self.broadcast_button, self.transfer_button,
                       self.select_all_button, self.deselect_all_button):
            button.config(state=new_state)

    def select_all_clients(self):
        """Select every row of the client table."""
        self.tree.selection_set(self.tree.get_children())

    def deselect_all_clients(self):
        """Clear the selection of the client table."""
        self.tree.selection_remove(self.tree.get_children())

    def receive_clients(self):
        """Receive client reports forever and mirror them into the table (worker thread)."""
        while True:
            try:
                data, _addr = self.server_socket.recvfrom(1024)
                if data == self.BEACON:
                    # The beacon is broadcast to the very port this socket is
                    # bound to, so it is delivered here as well; it is not a
                    # malformed client report.
                    continue
                client_data = data.decode().split(',')
                if len(client_data) == 3:
                    client_ip, client_mac, client_name = client_data
                    if client_ip != self.server_ip:
                        # Let the Tk event loop touch the table, the same way
                        # update_output() handles the log widget.
                        self.root.after(0, self.update_or_insert_client,
                                        client_ip, client_mac, client_name, "Online")
                else:
                    self.update_output(f"接收到不完整的資料:{client_data}\n")
            except Exception as e:
                self.update_output(f"接收資料時發生錯誤: {e}\n")

    def update_or_insert_client(self, ip, mac, name, status):
        """Update the row known under ``ip`` or ``mac``, or insert a new row."""
        row = (ip, mac, name, status)
        item_id = self.client_data_dict.get(ip) or self.client_data_dict.get(mac)
        if item_id:
            self.tree.item(item_id, values=row)
            return
        item_id = self.tree.insert("", "end", values=row)
        self.client_data_dict[ip] = item_id
        self.client_data_dict[mac] = item_id

    def broadcast_for_30_seconds(self):
        """Broadcast the beacon every 5 seconds for 30 seconds (worker thread)."""
        deadline = time.time() + 30
        while time.time() < deadline:
            try:
                self.server_socket.sendto(self.BEACON, ('<broadcast>', 37020))
            except Exception as e:
                self.update_output(f"廣播時發生錯誤: {e}\n")
            # Sleep outside the try block so a failing send cannot turn the
            # loop into a busy loop that floods the log.
            time.sleep(5)
        self.update_output("廣播已停止\n")

    def start_broadcast_manually(self):
        """Start another 30-second broadcast round in a background thread."""
        threading.Thread(target=self.broadcast_for_30_seconds, daemon=True).start()

    def send_text(self):
        """Ask for a text and send it to every selected client."""
        selected_items = self.tree.selection()
        if not selected_items:
            messagebox.showwarning("警告", "請至少選擇一個 client 端")
            return
        text_input = simpledialog.askstring("輸入文字", "請輸入要傳送的文字:")
        if not text_input:
            return
        if self._encode_text(text_input) is None:
            messagebox.showwarning("警告", f"文字過長:UTF-8 編碼後不可超過 {self.MAX_TEXT_BYTES} 位元組")
            return
        for item in selected_items:
            client_ip = self.tree.item(item, 'values')[0]
            self.send_text_to_clients(client_ip, text_input)

    def send_text_to_clients(self, client_ip, text):
        """Send ``text`` as one UDP datagram to port 8520 of ``client_ip``."""
        try:
            self.server_socket.sendto(text.encode(), (client_ip, 8520))
            self.update_output(f"文字已傳送至 {client_ip}: {text}\n")
        except Exception as e:
            self.update_output(f"傳送文字時發生錯誤: {e}\n")

    def select_client_and_file(self):
        """Let the user pick files and start a transfer to every selected client."""
        selected_items = self.tree.selection()
        if not selected_items:
            messagebox.showwarning("警告", "請至少選擇一個 client 端")
            return
        client_ips = [self.tree.item(item, 'values')[0] for item in selected_items]
        file_paths = filedialog.askopenfilenames(title="選擇檔案")
        if not file_paths:
            return
        # Capture the receiver count now, on the UI thread, so the worker
        # threads neither query the widget nor see a selection that changed
        # in the meantime.
        client_count = len(client_ips)
        for client_ip in client_ips:
            self.prepare_file_transfer_list(client_ip, file_paths, client_count)

    def prepare_file_transfer_list(self, client_ip, file_paths, client_count=None):
        """Log the pending transfer and run ``send_file_list`` in a background thread.

        Args:
            client_ip: Address of the client to notify.
            file_paths: Paths of the files to send.
            client_count: Number of receivers udp-sender should wait for;
                ``None`` makes ``send_file_list`` use the current selection.
        """
        transfer_list = "\n".join(file_paths)
        self.update_output(f"準備傳輸檔案到 {client_ip}:\n{transfer_list}\n")
        threading.Thread(
            target=self.send_file_list,
            args=(client_ip, file_paths, client_count),
            daemon=True,
        ).start()

    def send_file_list(self, client_ip, file_paths, client_count=None):
        """Send the comma-separated file list to the client, then start udp-sender per file.

        NOTE(review): this runs once per selected client, so udp-sender is
        launched once per client and per file, all on port 8500, while
        ``--min-receivers`` already counts every selected client. Confirm
        whether a single sender per file is what is intended.
        """
        try:
            file_list_str = ",".join(file_paths)
            self.server_socket.sendto(file_list_str.encode(), (client_ip, 8500))
            self.update_output(f"檔案傳輸清單已傳送至 {client_ip}\n")
            time.sleep(2)
            if client_count is None:
                # Legacy call without a count: fall back to the selection.
                client_count = len(self.tree.selection())
            for file_path in file_paths:
                if os.path.exists(file_path):
                    self.run_udp_sender(client_ip, file_path, client_count)
        except Exception as e:
            self.update_output(f"傳送檔案清單時發生錯誤: {e}\n")

    def run_udp_sender(self, client_ip, file_path, client_count):
        """Lock the buttons and run udp-sender.exe for ``file_path`` in a background thread.

        Does nothing when ``file_path`` does not exist. ``client_ip`` is not
        used when building the command line.
        """
        if not os.path.exists(file_path):
            # Checked before locking: nothing would unlock the buttons again
            # if no sender is started.
            return
        self.root.after(0, self.set_buttons_state, False)
        command = [
            'udp-sender.exe',
            '--file', file_path,
            '--port', '8500',
            '--min-receivers', str(client_count),
            '--autostart', '5',
        ]
        threading.Thread(target=self.execute_command, args=(command,), daemon=True).start()

    def execute_command(self, command):
        """Run ``command``, copy its output to the log and unlock the buttons afterwards."""
        try:
            # stderr is merged into stdout: draining the two pipes one after
            # the other can block forever once the unread pipe fills up.
            with subprocess.Popen(command, stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT, text=True) as process:
                for line in process.stdout:
                    self.update_output(line)
        except OSError as e:
            self.update_output(f"執行指令時發生錯誤: {e}\n")
        finally:
            # Always unlock, even when the program could not be started.
            self.root.after(0, self.set_buttons_state, True)

    def update_output(self, message):
        """Append ``message`` to the log widget via the Tk event loop and scroll to the end."""
        def append_message(msg):
            self.output_area.insert(tk.END, msg)
            self.output_area.see(tk.END)
        self.root.after(0, append_message, message)
# Start the server UI only when this file is run as a script, so that
# importing the module does not open a window.
if __name__ == "__main__":
    root = tk.Tk()
    server = Server(root)
    root.mainloop()
1131018-udp-cast-test01-01
-可行,只可傳檔;但搭配1131018-upd-receiver時,無法收檔案
-對應1131009-upd-receiver-01、1131009-upd-receiver-02(用class方式),可用
-1131025
-增加"全選client端"、"取消全選client端"
import socket
import threading
import tkinter as tk
from tkinter import filedialog, ttk, messagebox
import time
import os
import subprocess
from tkinter.scrolledtext import ScrolledText
import psutil # 用於獲取網路介面資訊
class Server:
    """Tkinter control panel that discovers clients over UDP and sends them files or text.

    Clients report themselves as ``ip,mac,name`` datagrams on UDP port 37020.
    Text messages are sent to UDP port 8520 of each selected client; file
    transfers are announced on UDP port 8500 and then handed over to
    ``udp-sender.exe``.
    """

    # Payload of the discovery broadcast sent by broadcast_for_30_seconds().
    BEACON = b"Server here"
    # The client's text listener reads with recvfrom(1024), so a larger
    # datagram cannot be received intact.
    MAX_TEXT_BYTES = 1024

    def __init__(self, root):
        """Build the UI, bind the discovery socket and start the worker threads.

        Args:
            root: The ``tk.Tk`` window that hosts the server UI.
        """
        self.root = root
        self.root.title("Server UDP傳送檔案")
        self.root.geometry("800x600")

        self.network_interfaces = self.get_network_interfaces()
        if not self.network_interfaces:
            # Without this check, picking the first interface below fails
            # with an IndexError on a machine that has no IPv4 interface.
            messagebox.showerror("錯誤", "未找到有效的網路介面!")
            self.root.destroy()
            return
        self.selected_interface = tk.StringVar(value=next(iter(self.network_interfaces)))

        # Title and server IP.
        header_frame = tk.Frame(self.root)
        header_frame.pack(pady=10)
        self.title_label = tk.Label(header_frame, text="UDP派送檔案", font=("Arial", 18), fg="deepskyblue")
        self.title_label.pack(side=tk.LEFT, pady=10)
        self.server_ip = self.get_local_ip()
        self.ip_label = tk.Label(header_frame, text=f"伺服器端 IP: {self.server_ip}", font=("Arial", 14))
        self.ip_label.pack(side=tk.LEFT, padx=(10, 0))

        # Network interface drop-down; selecting an entry refreshes the IP label.
        self.interface_label = tk.Label(self.root, text="選擇網路卡:")
        self.interface_label.pack(pady=5)
        self.interface_dropdown = ttk.Combobox(
            self.root,
            textvariable=self.selected_interface,
            values=list(self.network_interfaces.keys()),
        )
        self.interface_dropdown.pack(pady=5)
        self.interface_dropdown.bind("<<ComboboxSelected>>", self.update_ip_display)

        # Client table.
        headings = {"IP": "IP 位置", "MAC": "MAC 位置", "Name": "電腦名稱", "Status": "狀態"}
        self.tree = ttk.Treeview(root, columns=tuple(headings), show='headings')
        for col, title in headings.items():
            self.tree.heading(col, text=title, anchor="center")
            self.tree.column(col, anchor="center")
        self.tree.pack(fill=tk.BOTH, expand=True)
        # Maps both a client's IP and its MAC to the table row id.
        self.client_data_dict = {}

        # Button row: broadcast, send files, send text, select/deselect all.
        button_frame = tk.Frame(self.root)
        button_frame.pack(pady=10)
        self.broadcast_button = tk.Button(button_frame, text="手動廣播伺服器", command=self.start_broadcast_manually)
        self.broadcast_button.pack(side=tk.LEFT, padx=(0, 10))
        self.transfer_button = tk.Button(button_frame, text="傳送檔案", command=self.select_client_and_file)
        self.transfer_button.pack(side=tk.LEFT, padx=(0, 10))
        self.send_text_button = tk.Button(button_frame, text="傳送文字", command=self.send_text)
        self.send_text_button.pack(side=tk.LEFT, padx=(0, 10))
        self.select_all_button = tk.Button(button_frame, text="全選 client 端", command=self.select_all_clients, fg="navy")
        self.select_all_button.pack(side=tk.LEFT, padx=5, pady=10)
        self.deselect_all_button = tk.Button(button_frame, text="取消全選 client 端", command=self.deselect_all_clients, fg="purple")
        self.deselect_all_button.pack(side=tk.LEFT, padx=5, pady=10)
        # Buttons that set_buttons_state() locks while a transfer is running.
        self.buttons = [self.broadcast_button, self.transfer_button,
                        self.select_all_button, self.deselect_all_button]

        # Message log.
        self.output_area = ScrolledText(self.root, height=10)
        self.output_area.pack(fill=tk.BOTH, pady=10)

        # Footer with a scrolling credit line and a fixed author label.
        self.footer_frame = tk.Frame(self.root)
        self.footer_frame.pack(pady=20, fill=tk.X)
        self.marquee_text = "感謝ChatGPT 感謝UDPcast "
        self.marquee_label = tk.Label(self.footer_frame, text=self.marquee_text, font=("Arial", 12), fg="gold")
        self.marquee_label.pack(side=tk.LEFT, padx=10)
        self.fixed_label = tk.Label(self.footer_frame, text="製作者:WCS 日期:202410.01", font=("Arial", 12), fg="gold")
        self.fixed_label.pack(side=tk.RIGHT, padx=10)
        self.move_text()

        # Discovery socket: receives client reports and sends the beacon.
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        self.server_socket.bind(("", 37020))
        # Worker threads: client check-in receiver and the initial broadcast.
        threading.Thread(target=self.receive_clients, daemon=True).start()
        threading.Thread(target=self.broadcast_for_30_seconds, daemon=True).start()

    @classmethod
    def _encode_text(cls, text):
        """Return ``text`` encoded for sending, or ``None`` if it exceeds MAX_TEXT_BYTES."""
        payload = text.encode()
        return payload if len(payload) <= cls.MAX_TEXT_BYTES else None

    def get_network_interfaces(self):
        """Return a dict mapping interface name to an IPv4 address reported by psutil."""
        interfaces = {}
        for name, addrs in psutil.net_if_addrs().items():
            for addr in addrs:
                if addr.family == socket.AF_INET:
                    interfaces[name] = addr.address
        return interfaces

    def get_local_ip(self):
        """Return the IPv4 address of the interface chosen in the drop-down."""
        return self.network_interfaces[self.selected_interface.get()]

    def update_ip_display(self, event):
        """Refresh ``server_ip`` and its label after another interface was selected."""
        self.server_ip = self.get_local_ip()
        self.ip_label.config(text=f"伺服器端 IP: {self.server_ip}")

    def move_text(self):
        """Rotate the footer text by one character and reschedule itself every 200 ms."""
        self.marquee_text = self.marquee_text[1:] + self.marquee_text[0]
        self.marquee_label.config(text=self.marquee_text)
        self.root.after(200, self.move_text)

    def set_buttons_state(self, state):
        """Enable (truthy ``state``) or disable every button in ``self.buttons``."""
        new_state = tk.NORMAL if state else tk.DISABLED
        for button in self.buttons:
            button.config(state=new_state)

    def select_all_clients(self):
        """Select every row of the client table."""
        self.tree.selection_set(self.tree.get_children())

    def deselect_all_clients(self):
        """Clear the selection of the client table."""
        self.tree.selection_remove(self.tree.get_children())

    def receive_clients(self):
        """Receive client reports forever and mirror them into the table (worker thread)."""
        while True:
            try:
                data, _addr = self.server_socket.recvfrom(1024)
                if data == self.BEACON:
                    # The beacon is broadcast to the very port this socket is
                    # bound to, so it is delivered here as well; it is not a
                    # malformed client report.
                    continue
                client_data = data.decode().split(',')
                if len(client_data) == 3:
                    client_ip, client_mac, client_name = client_data
                    if client_ip != self.server_ip:
                        # Let the Tk event loop touch the table, the same way
                        # update_output() handles the log widget.
                        self.root.after(0, self.update_or_insert_client,
                                        client_ip, client_mac, client_name, "Online")
                else:
                    self.update_output(f"接收到不完整的資料:{client_data}\n")
            except Exception as e:
                self.update_output(f"接收資料時發生錯誤: {e}\n")

    def update_or_insert_client(self, ip, mac, name, status):
        """Update the row known under ``ip`` or ``mac``, or insert a new row."""
        row = (ip, mac, name, status)
        item_id = self.client_data_dict.get(ip) or self.client_data_dict.get(mac)
        if item_id:
            self.tree.item(item_id, values=row)
            return
        item_id = self.tree.insert("", "end", values=row)
        self.client_data_dict[ip] = item_id
        self.client_data_dict[mac] = item_id

    def broadcast_for_30_seconds(self):
        """Broadcast the beacon every 5 seconds for 30 seconds (worker thread)."""
        deadline = time.time() + 30
        while time.time() < deadline:
            try:
                self.server_socket.sendto(self.BEACON, ('<broadcast>', 37020))
            except Exception as e:
                self.update_output(f"廣播時發生錯誤: {e}\n")
            # Sleep outside the try block so a failing send cannot turn the
            # loop into a busy loop that floods the log.
            time.sleep(5)
        self.update_output("廣播已停止\n")

    def start_broadcast_manually(self):
        """Start another 30-second broadcast round in a background thread."""
        threading.Thread(target=self.broadcast_for_30_seconds, daemon=True).start()

    def send_text(self):
        """Ask for a text and send it to every selected client."""
        # Imported here because this version's import line does not bring in
        # tkinter.simpledialog, so ``tk.simpledialog`` is not guaranteed to exist.
        from tkinter import simpledialog

        selected_items = self.tree.selection()
        if not selected_items:
            messagebox.showwarning("警告", "請至少選擇一個 client 端")
            return
        text_input = simpledialog.askstring("輸入文字", "請輸入要傳送的文字:")
        if not text_input:
            return
        if self._encode_text(text_input) is None:
            messagebox.showwarning("警告", f"文字過長:UTF-8 編碼後不可超過 {self.MAX_TEXT_BYTES} 位元組")
            return
        for item in selected_items:
            client_ip = self.tree.item(item, 'values')[0]
            self.send_text_to_clients(client_ip, text_input)

    def send_text_to_clients(self, client_ip, text):
        """Send ``text`` as one UDP datagram to port 8520 of ``client_ip``."""
        try:
            self.server_socket.sendto(text.encode(), (client_ip, 8520))
            self.update_output(f"文字已傳送至 {client_ip}: {text}\n")
        except Exception as e:
            self.update_output(f"傳送文字時發生錯誤: {e}\n")

    def select_client_and_file(self):
        """Let the user pick files and start a transfer to every selected client."""
        selected_items = self.tree.selection()
        if not selected_items:
            messagebox.showwarning("警告", "請至少選擇一個 client 端")
            return
        client_ips = [self.tree.item(item, 'values')[0] for item in selected_items]
        file_paths = filedialog.askopenfilenames(title="選擇檔案")
        if not file_paths:
            return
        # Capture the receiver count now, on the UI thread, so the worker
        # threads neither query the widget nor see a selection that changed
        # in the meantime.
        client_count = len(client_ips)
        for client_ip in client_ips:
            self.prepare_file_transfer_list(client_ip, file_paths, client_count)

    def prepare_file_transfer_list(self, client_ip, file_paths, client_count=None):
        """Log the pending transfer and run ``send_file_list`` in a background thread.

        Args:
            client_ip: Address of the client to notify.
            file_paths: Paths of the files to send.
            client_count: Number of receivers udp-sender should wait for;
                ``None`` makes ``send_file_list`` use the current selection.
        """
        transfer_list = "\n".join(file_paths)
        self.update_output(f"準備傳輸檔案到 {client_ip}:\n{transfer_list}\n")
        threading.Thread(
            target=self.send_file_list,
            args=(client_ip, file_paths, client_count),
            daemon=True,
        ).start()

    def send_file_list(self, client_ip, file_paths, client_count=None):
        """Send the comma-separated file list to the client, then start udp-sender per file.

        NOTE(review): this runs once per selected client, so udp-sender is
        launched once per client and per file, all on port 8500, while
        ``--min-receivers`` already counts every selected client. Confirm
        whether a single sender per file is what is intended.
        """
        try:
            file_list_str = ",".join(file_paths)
            self.server_socket.sendto(file_list_str.encode(), (client_ip, 8500))
            self.update_output(f"檔案傳輸清單已傳送至 {client_ip}\n")
            time.sleep(2)
            if client_count is None:
                # Legacy call without a count: fall back to the selection.
                client_count = len(self.tree.selection())
            for file_path in file_paths:
                if os.path.exists(file_path):
                    self.run_udp_sender(client_ip, file_path, client_count)
        except Exception as e:
            self.update_output(f"傳送檔案清單時發生錯誤: {e}\n")

    def run_udp_sender(self, client_ip, file_path, client_count):
        """Lock the buttons and run udp-sender.exe for ``file_path`` in a background thread.

        Does nothing when ``file_path`` does not exist. ``client_ip`` is not
        used when building the command line.
        """
        if not os.path.exists(file_path):
            # Checked before locking: nothing would unlock the buttons again
            # if no sender is started.
            return
        self.root.after(0, self.set_buttons_state, False)
        command = [
            'udp-sender.exe',
            '--file', file_path,
            '--port', '8500',
            '--min-receivers', str(client_count),
            '--autostart', '5',
        ]
        threading.Thread(target=self.execute_command, args=(command,), daemon=True).start()

    def execute_command(self, command):
        """Run ``command``, copy its output to the log and unlock the buttons afterwards."""
        try:
            # stderr is merged into stdout: draining the two pipes one after
            # the other can block forever once the unread pipe fills up.
            with subprocess.Popen(command, stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT, text=True) as process:
                for line in process.stdout:
                    self.update_output(line)
        except OSError as e:
            self.update_output(f"執行指令時發生錯誤: {e}\n")
        finally:
            # Always unlock, even when the program could not be started.
            self.root.after(0, self.set_buttons_state, True)

    def update_output(self, message):
        """Append ``message`` to the log widget via the Tk event loop and scroll to the end."""
        def append_message(msg):
            self.output_area.insert(tk.END, msg)
            self.output_area.see(tk.END)
        self.root.after(0, append_message, message)
#def update_output(self, message):
#self.root.after(0, lambda: self.output_area.insert(tk.END, message))
#self.root.after(0, lambda: self.output_area.see(tk.END))
# Start the server UI only when this file is run as a script, so that
# importing the module does not open a window.
if __name__ == "__main__":
    root = tk.Tk()
    server = Server(root)
    root.mainloop()
1131018-udp-cast-test01
-可行,只可傳檔;但搭配1131018-upd-receiver時,無法收檔案
-對應1131009-upd-receiver-01、1131009-upd-receiver-02(用class方式),可用
import socket
import threading
import tkinter as tk
from tkinter import filedialog, ttk, messagebox
import time
import os
import subprocess
from tkinter.scrolledtext import ScrolledText
import psutil # 用於獲取網路介面資訊
class Server:
    """Tkinter control panel that discovers clients over UDP and sends them files or text.

    Clients report themselves as ``ip,mac,name`` datagrams on UDP port 37020.
    Text messages are sent to UDP port 8520 of each selected client; file
    transfers are announced on UDP port 8500 and then handed over to
    ``udp-sender.exe``.
    """

    # Payload of the discovery broadcast sent by broadcast_for_30_seconds().
    BEACON = b"Server here"
    # The client's text listener reads with recvfrom(1024), so a larger
    # datagram cannot be received intact.
    MAX_TEXT_BYTES = 1024

    def __init__(self, root):
        """Build the UI, bind the discovery socket and start the worker threads.

        Args:
            root: The ``tk.Tk`` window that hosts the server UI.
        """
        self.root = root
        self.root.title("Server UDP傳送檔案")
        self.root.geometry("800x600")

        self.network_interfaces = self.get_network_interfaces()
        if not self.network_interfaces:
            # Without this check, picking the first interface below fails
            # with an IndexError on a machine that has no IPv4 interface.
            messagebox.showerror("錯誤", "未找到有效的網路介面!")
            self.root.destroy()
            return
        self.selected_interface = tk.StringVar(value=next(iter(self.network_interfaces)))

        # Title and server IP.
        header_frame = tk.Frame(self.root)
        header_frame.pack(pady=10)
        self.title_label = tk.Label(header_frame, text="UDP派送檔案", font=("Arial", 18), fg="deepskyblue")
        self.title_label.pack(side=tk.LEFT, pady=10)
        self.server_ip = self.get_local_ip()
        self.ip_label = tk.Label(header_frame, text=f"伺服器端 IP: {self.server_ip}", font=("Arial", 14))
        self.ip_label.pack(side=tk.LEFT, padx=(10, 0))

        # Network interface drop-down; selecting an entry refreshes the IP label.
        self.interface_label = tk.Label(self.root, text="選擇網路卡:")
        self.interface_label.pack(pady=5)
        self.interface_dropdown = ttk.Combobox(
            self.root,
            textvariable=self.selected_interface,
            values=list(self.network_interfaces.keys()),
        )
        self.interface_dropdown.pack(pady=5)
        self.interface_dropdown.bind("<<ComboboxSelected>>", self.update_ip_display)

        # Client table.
        headings = {"IP": "IP 位置", "MAC": "MAC 位置", "Name": "電腦名稱", "Status": "狀態"}
        self.tree = ttk.Treeview(root, columns=tuple(headings), show='headings')
        for col, title in headings.items():
            self.tree.heading(col, text=title, anchor="center")
            self.tree.column(col, anchor="center")
        self.tree.pack(fill=tk.BOTH, expand=True)
        # Maps both a client's IP and its MAC to the table row id.
        self.client_data_dict = {}

        # Button row: broadcast, send files, send text.
        button_frame = tk.Frame(self.root)
        button_frame.pack(pady=10)
        self.broadcast_button = tk.Button(button_frame, text="手動廣播伺服器", command=self.start_broadcast_manually)
        self.broadcast_button.pack(side=tk.LEFT, padx=(0, 10))
        self.transfer_button = tk.Button(button_frame, text="傳送檔案", command=self.select_client_and_file)
        self.transfer_button.pack(side=tk.LEFT, padx=(0, 10))
        self.send_text_button = tk.Button(button_frame, text="傳送文字", command=self.send_text)
        self.send_text_button.pack(side=tk.LEFT, padx=(0, 10))
        # Buttons that set_buttons_state() locks while a transfer is running.
        self.buttons = [self.broadcast_button, self.transfer_button]

        # Message log.
        self.output_area = ScrolledText(self.root, height=10)
        self.output_area.pack(fill=tk.BOTH, pady=10)

        # Footer with a scrolling credit line and a fixed author label.
        self.footer_frame = tk.Frame(self.root)
        self.footer_frame.pack(pady=20, fill=tk.X)
        self.marquee_text = "感謝ChatGPT 感謝UDPcast "
        self.marquee_label = tk.Label(self.footer_frame, text=self.marquee_text, font=("Arial", 12), fg="gold")
        self.marquee_label.pack(side=tk.LEFT, padx=10)
        self.fixed_label = tk.Label(self.footer_frame, text="製作者:WCS 日期:202410.01", font=("Arial", 12), fg="gold")
        self.fixed_label.pack(side=tk.RIGHT, padx=10)
        self.move_text()

        # Discovery socket: receives client reports and sends the beacon.
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        self.server_socket.bind(("", 37020))
        # Worker threads: client check-in receiver and the initial broadcast.
        threading.Thread(target=self.receive_clients, daemon=True).start()
        threading.Thread(target=self.broadcast_for_30_seconds, daemon=True).start()

    @classmethod
    def _encode_text(cls, text):
        """Return ``text`` encoded for sending, or ``None`` if it exceeds MAX_TEXT_BYTES."""
        payload = text.encode()
        return payload if len(payload) <= cls.MAX_TEXT_BYTES else None

    def get_network_interfaces(self):
        """Return a dict mapping interface name to an IPv4 address reported by psutil."""
        interfaces = {}
        for name, addrs in psutil.net_if_addrs().items():
            for addr in addrs:
                if addr.family == socket.AF_INET:
                    interfaces[name] = addr.address
        return interfaces

    def get_local_ip(self):
        """Return the IPv4 address of the interface chosen in the drop-down."""
        return self.network_interfaces[self.selected_interface.get()]

    def update_ip_display(self, event):
        """Refresh ``server_ip`` and its label after another interface was selected."""
        self.server_ip = self.get_local_ip()
        self.ip_label.config(text=f"伺服器端 IP: {self.server_ip}")

    def move_text(self):
        """Rotate the footer text by one character and reschedule itself every 200 ms."""
        self.marquee_text = self.marquee_text[1:] + self.marquee_text[0]
        self.marquee_label.config(text=self.marquee_text)
        self.root.after(200, self.move_text)

    def set_buttons_state(self, state):
        """Enable (truthy ``state``) or disable every button in ``self.buttons``."""
        new_state = tk.NORMAL if state else tk.DISABLED
        for button in self.buttons:
            button.config(state=new_state)

    def receive_clients(self):
        """Receive client reports forever and mirror them into the table (worker thread)."""
        while True:
            try:
                data, _addr = self.server_socket.recvfrom(1024)
                if data == self.BEACON:
                    # The beacon is broadcast to the very port this socket is
                    # bound to, so it is delivered here as well; it is not a
                    # malformed client report.
                    continue
                client_data = data.decode().split(',')
                if len(client_data) == 3:
                    client_ip, client_mac, client_name = client_data
                    if client_ip != self.server_ip:
                        # Let the Tk event loop touch the table, the same way
                        # update_output() handles the log widget.
                        self.root.after(0, self.update_or_insert_client,
                                        client_ip, client_mac, client_name, "Online")
                else:
                    self.update_output(f"接收到不完整的資料:{client_data}\n")
            except Exception as e:
                self.update_output(f"接收資料時發生錯誤: {e}\n")

    def update_or_insert_client(self, ip, mac, name, status):
        """Update the row known under ``ip`` or ``mac``, or insert a new row."""
        row = (ip, mac, name, status)
        item_id = self.client_data_dict.get(ip) or self.client_data_dict.get(mac)
        if item_id:
            self.tree.item(item_id, values=row)
            return
        item_id = self.tree.insert("", "end", values=row)
        self.client_data_dict[ip] = item_id
        self.client_data_dict[mac] = item_id

    def broadcast_for_30_seconds(self):
        """Broadcast the beacon every 5 seconds for 30 seconds (worker thread)."""
        deadline = time.time() + 30
        while time.time() < deadline:
            try:
                self.server_socket.sendto(self.BEACON, ('<broadcast>', 37020))
            except Exception as e:
                self.update_output(f"廣播時發生錯誤: {e}\n")
            # Sleep outside the try block so a failing send cannot turn the
            # loop into a busy loop that floods the log.
            time.sleep(5)
        self.update_output("廣播已停止\n")

    def start_broadcast_manually(self):
        """Start another 30-second broadcast round in a background thread."""
        threading.Thread(target=self.broadcast_for_30_seconds, daemon=True).start()

    def send_text(self):
        """Ask for a text and send it to every selected client."""
        # Imported here because this version's import line does not bring in
        # tkinter.simpledialog, so ``tk.simpledialog`` is not guaranteed to exist.
        from tkinter import simpledialog

        selected_items = self.tree.selection()
        if not selected_items:
            messagebox.showwarning("警告", "請至少選擇一個 client 端")
            return
        text_input = simpledialog.askstring("輸入文字", "請輸入要傳送的文字:")
        if not text_input:
            return
        if self._encode_text(text_input) is None:
            messagebox.showwarning("警告", f"文字過長:UTF-8 編碼後不可超過 {self.MAX_TEXT_BYTES} 位元組")
            return
        for item in selected_items:
            client_ip = self.tree.item(item, 'values')[0]
            self.send_text_to_clients(client_ip, text_input)

    def send_text_to_clients(self, client_ip, text):
        """Send ``text`` as one UDP datagram to port 8520 of ``client_ip``."""
        try:
            self.server_socket.sendto(text.encode(), (client_ip, 8520))
            self.update_output(f"文字已傳送至 {client_ip}: {text}\n")
        except Exception as e:
            self.update_output(f"傳送文字時發生錯誤: {e}\n")

    def select_client_and_file(self):
        """Let the user pick files and start a transfer to every selected client."""
        selected_items = self.tree.selection()
        if not selected_items:
            messagebox.showwarning("警告", "請至少選擇一個 client 端")
            return
        client_ips = [self.tree.item(item, 'values')[0] for item in selected_items]
        file_paths = filedialog.askopenfilenames(title="選擇檔案")
        if not file_paths:
            return
        # Capture the receiver count now, on the UI thread, so the worker
        # threads neither query the widget nor see a selection that changed
        # in the meantime.
        client_count = len(client_ips)
        for client_ip in client_ips:
            self.prepare_file_transfer_list(client_ip, file_paths, client_count)

    def prepare_file_transfer_list(self, client_ip, file_paths, client_count=None):
        """Log the pending transfer and run ``send_file_list`` in a background thread.

        Args:
            client_ip: Address of the client to notify.
            file_paths: Paths of the files to send.
            client_count: Number of receivers udp-sender should wait for;
                ``None`` makes ``send_file_list`` use the current selection.
        """
        transfer_list = "\n".join(file_paths)
        self.update_output(f"準備傳輸檔案到 {client_ip}:\n{transfer_list}\n")
        threading.Thread(
            target=self.send_file_list,
            args=(client_ip, file_paths, client_count),
            daemon=True,
        ).start()

    def send_file_list(self, client_ip, file_paths, client_count=None):
        """Send the comma-separated file list to the client, then start udp-sender per file.

        NOTE(review): this runs once per selected client, so udp-sender is
        launched once per client and per file, all on port 8500, while
        ``--min-receivers`` already counts every selected client. Confirm
        whether a single sender per file is what is intended.
        """
        try:
            file_list_str = ",".join(file_paths)
            self.server_socket.sendto(file_list_str.encode(), (client_ip, 8500))
            self.update_output(f"檔案傳輸清單已傳送至 {client_ip}\n")
            time.sleep(2)
            if client_count is None:
                # Legacy call without a count: fall back to the selection.
                client_count = len(self.tree.selection())
            for file_path in file_paths:
                if os.path.exists(file_path):
                    self.run_udp_sender(client_ip, file_path, client_count)
        except Exception as e:
            self.update_output(f"傳送檔案清單時發生錯誤: {e}\n")

    def run_udp_sender(self, client_ip, file_path, client_count):
        """Lock the buttons and run udp-sender.exe for ``file_path`` in a background thread.

        Does nothing when ``file_path`` does not exist. ``client_ip`` is not
        used when building the command line.
        """
        if not os.path.exists(file_path):
            # Checked before locking: nothing would unlock the buttons again
            # if no sender is started.
            return
        self.root.after(0, self.set_buttons_state, False)
        command = [
            'udp-sender.exe',
            '--file', file_path,
            '--port', '8500',
            '--min-receivers', str(client_count),
            '--autostart', '5',
        ]
        threading.Thread(target=self.execute_command, args=(command,), daemon=True).start()

    def execute_command(self, command):
        """Run ``command``, copy its output to the log and unlock the buttons afterwards."""
        try:
            # stderr is merged into stdout: draining the two pipes one after
            # the other can block forever once the unread pipe fills up.
            with subprocess.Popen(command, stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT, text=True) as process:
                for line in process.stdout:
                    self.update_output(line)
        except OSError as e:
            self.update_output(f"執行指令時發生錯誤: {e}\n")
        finally:
            # Always unlock, even when the program could not be started.
            self.root.after(0, self.set_buttons_state, True)

    def update_output(self, message):
        """Append ``message`` to the log widget via the Tk event loop and scroll to the end."""
        def append_message(msg):
            self.output_area.insert(tk.END, msg)
            self.output_area.see(tk.END)
        self.root.after(0, append_message, message)
#def update_output(self, message):
#self.root.after(0, lambda: self.output_area.insert(tk.END, message))
#self.root.after(0, lambda: self.output_area.see(tk.END))
# Start the server UI only when this file is run as a script, so that
# importing the module does not open a window.
if __name__ == "__main__":
    root = tk.Tk()
    server = Server(root)
    root.mainloop()
client端-已測-可行
-1131018-udp-receiver01
-新增接收文字訊息功能-僅200以內,小型文字傳輸
-使用8520port
import socket
import threading
import time
import uuid
import os
import subprocess
import re
import tkinter as tk
class Client:
    """UDP client that announces itself to the server and receives files and text messages."""

    def __init__(self):
        """Create the broadcast-capable UDP socket used for the periodic reports."""
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

    def get_mac_address(self):
        """Return ``uuid.getnode()`` formatted as six colon-separated hex octets."""
        node = uuid.getnode()
        # The node id is a 48-bit value: take one byte per step, most
        # significant first (shifts of 40, 32, 24, 16, 8 and 0 bits).
        return ':'.join(f'{(node >> shift) & 0xff:02x}' for shift in range(40, -8, -8))

    def get_computer_name(self):
        """Return the host name of this computer."""
        return socket.gethostname()

    def send_report(self):
        """Broadcast ``ip,mac,name`` to UDP port 37020 every 5 seconds, forever."""
        while True:
            try:
                local_ip = socket.gethostbyname(socket.gethostname())
                report_msg = f"{local_ip},{self.get_mac_address()},{self.get_computer_name()}"
                self.client_socket.sendto(report_msg.encode(), ('<broadcast>', 37020))
            except Exception as e:
                print(f"發送報到時發生錯誤: {e}")
            time.sleep(5)

    def receive_file_list(self):
        """Wait on UDP port 8500 for file lists and receive each listed file in turn."""
        listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        listener.bind(("", 8500))
        while True:
            try:
                data, addr = listener.recvfrom(1024)
                file_list = data.decode().split(",")
                print(f"接收到的檔案清單: {file_list}")
                # Folder that collects the received files.
                os.makedirs("received_files", exist_ok=True)
                for file_name in file_list:
                    base_name = os.path.basename(file_name)
                    if not base_name:
                        # An empty entry would make the save path the folder itself.
                        continue
                    save_path = os.path.join("received_files", base_name)
                    self.receive_file_with_progress(save_path, addr[0])
            except Exception as e:
                print(f"接收檔案清單時發生錯誤: {e}")

    def receive_file_with_progress(self, save_path, server_ip):
        """Run udp-receiver.exe for one file and report progress lines to the server.

        Args:
            save_path: Where udp-receiver.exe writes the received file.
            server_ip: Address that progress and result messages are sent to
                (UDP port 8501).
        """
        # CREATE_NO_WINDOW exists only on Windows; 0 keeps Popen usable elsewhere.
        no_window = getattr(subprocess, "CREATE_NO_WINDOW", 0)
        # NOTE(review): the server code visible in this file never binds port
        # 8501 - confirm that these reports have a receiver.
        # The socket is closed on exit so that repeated transfers do not leak
        # one socket per file.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as report_socket:
            with subprocess.Popen(
                ['udp-receiver.exe', '--file', save_path, '--port', '8500'],
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
                creationflags=no_window,
            ) as process:
                for line in process.stdout:
                    print(f"接收進度: {line.strip()}")
                    # Forward only the lines that contain a percentage.
                    if re.search(r'(\d+)%', line):
                        report_socket.sendto(line.encode(), (server_ip, 8501))
                exit_code = process.wait()
            if exit_code == 0:
                print("檔案接收完成")
                report_socket.sendto("傳輸完成".encode(), (server_ip, 8501))
            else:
                print("檔案接收失敗")
                report_socket.sendto("傳輸失敗".encode(), (server_ip, 8501))

    def receive_text(self):
        """Wait on UDP port 8520 for text messages and show each one in a popup."""
        listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        listener.bind(("", 8520))
        while True:
            try:
                data, _addr = listener.recvfrom(1024)
                self.show_text_popup(data.decode())
            except Exception as e:
                print(f"接收文字時發生錯誤: {e}")

    def show_text_popup(self, text):
        """Show ``text`` in a small window and block until the window is closed."""
        window = tk.Tk()
        window.title("接收到的訊息")
        # wraplength keeps a long single-line message from producing a window
        # that is wider than the screen.
        label = tk.Label(window, text=text, font=("Arial", 14), padx=20, pady=20, wraplength=600)
        label.pack()
        button = tk.Button(window, text="關閉", command=window.destroy, padx=10, pady=5)
        button.pack(pady=10)
        window.mainloop()

    def run(self):
        """Start the report, file and text threads, then keep the main thread alive."""
        for worker in (self.send_report, self.receive_file_list, self.receive_text):
            threading.Thread(target=worker, daemon=True).start()
        # The workers are daemon threads, so the process lives only as long
        # as this loop does.
        while True:
            time.sleep(1)
# Entry point: create the client and run it when executed as a script.
if __name__ == "__main__":
    client = Client()
    client.run()
client端-待測
-新增接收文字訊息功能
-使用8520port
-
import socket
import threading
import time
import uuid
import os
import subprocess
import re
import tkinter as tk
class Client:
    """UDP client that announces itself to the server and receives files and text messages."""

    def __init__(self):
        """Create the broadcast-capable UDP socket used for the periodic reports."""
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

    def get_mac_address(self):
        """Return ``uuid.getnode()`` formatted as six colon-separated hex octets."""
        node = uuid.getnode()
        # The node id is a 48-bit value: take one byte per step, most
        # significant first (shifts of 40, 32, 24, 16, 8 and 0 bits).
        return ':'.join(f'{(node >> shift) & 0xff:02x}' for shift in range(40, -8, -8))

    def get_computer_name(self):
        """Return the host name of this computer."""
        return socket.gethostname()

    def send_report(self):
        """Broadcast ``ip,mac,name`` to UDP port 37020 every 5 seconds, forever."""
        while True:
            try:
                local_ip = socket.gethostbyname(socket.gethostname())
                report_msg = f"{local_ip},{self.get_mac_address()},{self.get_computer_name()}"
                self.client_socket.sendto(report_msg.encode(), ('<broadcast>', 37020))
            except Exception as e:
                print(f"發送報到時發生錯誤: {e}")
            time.sleep(5)

    def receive_file_list(self):
        """Wait on UDP port 8500 for file lists and receive each listed file in turn."""
        listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        listener.bind(("", 8500))
        while True:
            try:
                data, addr = listener.recvfrom(1024)
                file_list = data.decode().split(",")
                print(f"接收到的檔案清單: {file_list}")
                # Folder that collects the received files.
                os.makedirs("received_files", exist_ok=True)
                for file_name in file_list:
                    base_name = os.path.basename(file_name)
                    if not base_name:
                        # An empty entry would make the save path the folder itself.
                        continue
                    save_path = os.path.join("received_files", base_name)
                    self.receive_file_with_progress(save_path, addr[0])
            except Exception as e:
                print(f"接收檔案清單時發生錯誤: {e}")

    def receive_file_with_progress(self, save_path, server_ip):
        """Run udp-receiver.exe for one file and report progress lines to the server.

        Args:
            save_path: Where udp-receiver.exe writes the received file.
            server_ip: Address that progress and result messages are sent to
                (UDP port 8501).
        """
        # CREATE_NO_WINDOW exists only on Windows; 0 keeps Popen usable elsewhere.
        no_window = getattr(subprocess, "CREATE_NO_WINDOW", 0)
        # NOTE(review): the server code visible in this file never binds port
        # 8501 - confirm that these reports have a receiver.
        # The socket is closed on exit so that repeated transfers do not leak
        # one socket per file.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as report_socket:
            with subprocess.Popen(
                ['udp-receiver.exe', '--file', save_path, '--port', '8500'],
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
                creationflags=no_window,
            ) as process:
                for line in process.stdout:
                    print(f"接收進度: {line.strip()}")
                    # Forward only the lines that contain a percentage.
                    if re.search(r'(\d+)%', line):
                        report_socket.sendto(line.encode(), (server_ip, 8501))
                exit_code = process.wait()
            if exit_code == 0:
                print("檔案接收完成")
                report_socket.sendto("傳輸完成".encode(), (server_ip, 8501))
            else:
                print("檔案接收失敗")
                report_socket.sendto("傳輸失敗".encode(), (server_ip, 8501))

    def receive_text(self):
        """Wait on UDP port 8520 for text messages and show each one in a popup."""
        listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        listener.bind(("", 8520))
        while True:
            try:
                data, _addr = listener.recvfrom(1024)
                self.show_text_popup(data.decode())
            except Exception as e:
                print(f"接收文字時發生錯誤: {e}")

    def show_text_popup(self, text):
        """Show ``text`` in a small window and block until the window is closed."""
        window = tk.Tk()
        window.title("接收到的訊息")
        # wraplength keeps a long single-line message from producing a window
        # that is wider than the screen.
        label = tk.Label(window, text=text, font=("Arial", 14), padx=20, pady=20, wraplength=600)
        label.pack()
        button = tk.Button(window, text="關閉", command=window.destroy, padx=10, pady=5)
        button.pack(pady=10)
        window.mainloop()

    def run(self):
        """Start the report, file and text threads, then keep the main thread alive."""
        for worker in (self.send_report, self.receive_file_list, self.receive_text):
            threading.Thread(target=worker, daemon=True).start()
        # The workers are daemon threads, so the process lives only as long
        # as this loop does.
        while True:
            time.sleep(1)
# Entry point: create the client and run it when executed as a script.
if __name__ == "__main__":
    client = Client()
    client.run()