importsocketimportthreadingimportsysimporttimeclassTCPClientTest:"""TCP客户端 - 适配多客户端服务器,支持发送1/2/3指令并接收响应"""def__init__(self,host="127.0.0.1",port=8888):self.client_socket=None self.host=host self.port=port self.is_connected=False # 连接状态标记 self.recv_thread=None # 接收响应的线程 defconnect_to_server(self):"""连接到TCP服务器"""try:# 创建客户端套接字 self.client_socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 设置超时(避免连接卡住) self.client_socket.settimeout(10)# 连接服务器 self.client_socket.connect((self.host,self.port))self.is_connected=Trueprint(f"[客户端] 成功连接到服务器 {self.host}:{self.port}")# 启动接收响应的线程(后台持续接收服务器消息) self.recv_thread=threading.Thread(target=self.receive_messages,daemon=True)self.recv_thread.start()returnTrue except socket.timeout:print(f"[客户端] 连接超时 - 服务器 {self.host}:{self.port} 未响应")returnFalse except ConnectionRefusedError:print(f"[客户端] 连接失败 - 服务器 {self.host}:{self.port} 未启动或拒绝连接")returnFalse except Exception as e:print(f"[客户端] 连接异常: {str(e)}")returnFalse defreceive_messages(self):"""后台接收服务器响应的线程"""print("[接收线程] 已启动,等待服务器消息...")whileself.is_connected:try:# 接收服务器数据(缓冲区1024字节) recv_data=self.client_socket.recv(1024).decode('utf-8').strip()ifnotrecv_data:print("[客户端] 服务器主动断开连接")self.is_connected=Falsebreak# 打印服务器响应print(f"\n[服务器响应] {recv_data}")print("根据接收到数据,do something")self.recvdata=recv_data except socket.timeout:continue# 超时继续等待 except Exception as e:ifself.is_connected:print(f"\n[接收异常] {str(e)}")breakdefsend_command(self,command):"""发送指令到服务器"""ifnotself.is_connectedorself.client_socket is None:print("[发送失败] 未连接到服务器")returnFalsetry:# 编码并发送指令 self.client_socket.sendall(command.encode('utf-8'))print(f"\n[发送成功] 指令: {command}")returnTrue except BrokenPipeError:print("[发送失败] 连接已断开(Broken Pipe)")self.is_connected=FalsereturnFalse except Exception as e:print(f"[发送异常] {str(e)}")self.is_connected=FalsereturnFalse defdisconnect(self):"""断开与服务器的连接"""self.is_connected=Falseifself.client_socket:try:self.client_socket.close()print("[客户端] 已断开与服务器的连接")except Exception as e:print(f"[关闭异常] {str(e)}")defmain():# 创建客户端实例(可修改host/port适配你的服务器) client=TCPClientTest(host="127.0.0.1",port=8888)# 连接服务器ifnotclient.connect_to_server():sys.exit(1)# 交互输入指令print("\n=== 客户端操作说明 ===")print("输入 1/2/3 发送对应指令到服务器")print("输入 quit 退出客户端")print("======================\n")try:whileclient.is_connected:# 手动输入指令 command=input("请输入指令(1/2/3)或输入quit退出: ").strip()# 退出逻辑ifcommand.lower()=="quit":client.disconnect()breakclient.send_command(command)except KeyboardInterrupt:print("\n\n[用户操作] 检测到Ctrl+C,正在退出...")client.disconnect()except Exception as e:print(f"\n[客户端异常] {str(e)}")client.disconnect()if__name__=="__main__":main()
importsocketimportthreadingimporttimeimportuuidclassTCPServerTest:"""TCP服务器测试类 - 支持多客户端并发,接1发送get 1,仅Ctrl+C关闭"""def__init__(self):self.server_socket=None self.is_server_running=False # 服务器总运行标记 self.clients={}# 客户端连接池:{client_id:(client_socket,address,thread)}self.clients_lock=threading.Lock()# 保护客户端连接池的线程锁 self.recv_data={}# 每个客户端的接收缓冲区:{client_id:data_str}self.recv_data_lock=threading.Lock()# 保护接收缓冲区的锁 self.HOST="127.0.0.1"self.PORT=8888self.BUFFER_SIZE=1024defis_socket_valid(self,sock):"""检查套接字有效性"""ifsock is None:returnFalsetry:returnsock.fileno()!=-1except(OSError,AttributeError):returnFalse defhandle_client(self,client_socket,address,client_id):"""处理单个客户端的通信(独立线程)"""print(f"[客户端{client_id}] 已连接 - 地址: {address[0]}:{address[1]}")# 连接成功立即返回OK self.send_message(client_id,"OK")whileself.is_server_running:try:ifnotself.is_socket_valid(client_socket):break# 接收客户端数据 recv_data=client_socket.recv(self.BUFFER_SIZE).decode('utf-8').strip()ifnotrecv_data:# 客户端主动断开print(f"[客户端{client_id}] 主动断开连接")break# 存储接收到的数据到对应客户端的缓冲区 with self.recv_data_lock:ifclient_idnotin self.recv_data:self.recv_data[client_id]=""self.recv_data[client_id]+=recv_dataprint(f"[客户端{client_id}] 收到数据: {recv_data} | 累计: {self.recv_data[client_id]}")except Exception as e:print(f"[客户端{client_id}] 通信异常: {str(e)}")break# 客户端断开,清理资源 with self.clients_lock:ifclient_id in self.clients:del self.clients[client_id]with self.recv_data_lock:ifclient_id in self.recv_data:del self.recv_data[client_id]client_socket.close()print(f"[客户端{client_id}] 连接已清理 | 当前在线客户端数: {len(self.clients)}")defsend_message(self,client_id,send_data):"""向指定客户端发送数据"""with self.clients_lock:ifclient_idnotin self.clients:print(f"[发送失败] 客户端{client_id}不存在")return-1client_socket,_,_=self.clients[client_id]ifnotself.is_socket_valid(client_socket):print(f"[发送失败] 客户端{client_id}套接字无效")return-1try:data_bytes=send_data.encode("utf-8")client_socket.sendall(data_bytes)print(f"[客户端{client_id}] 发送成功: {send_data}")return1except Exception as e:print(f"[客户端{client_id}] 发送失败: {str(e)}")return-1defbroadcast_message(self,send_data):"""广播消息给所有在线客户端"""with self.clients_lock:online_clients=list(self.clients.keys())forclient_id in online_clients:self.send_message(client_id,send_data)print(f"[广播] 已发送: {send_data} | 覆盖客户端数: {len(online_clients)}")defstart_server(self):"""启动服务器,持续监听多客户端连接"""try:# 创建服务器套接字 self.server_socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM)self.server_socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,True)self.server_socket.bind((self.HOST,self.PORT))self.server_socket.listen(10)# 增大监听队列,支持更多并发连接 self.is_server_running=Trueprint(f"[服务器启动成功] 监听 {self.HOST}:{self.PORT}")print(f"[核心特性] 多客户端并发 | 接收到1立即发送get 1 | 按 Ctrl+C 关闭服务器\n")# 持续接受新客户端连接whileself.is_server_running:try:client_socket,address=self.server_socket.accept()client_id=str(uuid.uuid4())[:8]# 生成短UUID作为客户端ID # 创建客户端处理线程 client_thread=threading.Thread(target=self.handle_client,args=(client_socket,address,client_id),daemon=True)client_thread.start()# 将客户端信息加入连接池 with self.clients_lock:self.clients[client_id]=(client_socket,address,client_thread)print(f"[新客户端] ID:{client_id} | 当前在线客户端数: {len(self.clients)}")except Exception as e:ifself.is_server_running:print(f"[接受连接异常] {str(e)},1秒后重试...")time.sleep(1)except Exception as e:print(f"[服务器启动失败] {str(e)}")self.is_server_running=False finally:# 关闭服务器时清理所有客户端连接 self.stop_server()defstop_server(self):"""关闭服务器,清理所有资源"""self.is_server_running=False # 关闭所有客户端连接 with self.clients_lock:forclient_id inlist(self.clients.keys()):client_socket,_,_=self.clients[client_id]ifself.is_socket_valid(client_socket):client_socket.close()del self.clients[client_id]# 关闭服务器套接字ifself.is_socket_valid(self.server_socket):self.server_socket.close()print("[服务器] 已关闭,所有客户端连接已清理")defprocess_client_commands(self):"""处理客户端指令(核心修改:接1发送get 1),独立线程运行"""print("in process_client_commands")whileself.is_server_running:time.sleep(0.1)# 降低轮询频率,减少资源占用 with self.recv_data_lock:# 遍历所有客户端的接收缓冲区forclient_id inlist(self.recv_data.keys()):data=self.recv_data[client_id]ifnotdata:continue# 核心修改:接收到1,立即发送get1给对应客户端if"1"==data:print(f"[客户端{client_id}] 检测到指令1,执行响应")self.send_message(client_id,"1 do operator")# 保留2/3指令处理(如需修改可直接改响应内容,格式同上面)if"2"==data:print(f"[客户端{client_id}] 检测到指令2,执行响应")self.send_message(client_id,"2 do operator")if"3"==data:print(f"[客户端{client_id}] 检测到指令3,执行响应")self.send_message(client_id,"3 do operator")self.recv_data[client_id]=""if__name__=="__main__":# 实例化服务器 server=TCPServerTest()# 启动服务器主线程(监听客户端连接) server_thread=threading.Thread(target=server.start_server,daemon=True)server_thread.start()time.sleep(2)# 测试:启动指令处理线程(核心:接1发get1)#cmd_thread=threading.Thread(target=server.process_client_commands,daemon=True)#cmd_thread.start()server.process_client_commands()# 主线程永久阻塞,仅Ctrl+C退出#try:#whileTrue:#time.sleep(1)# # 可选:打印当前在线客户端数(取消注释即可) # # with server.clients_lock:# #print(f"\r[当前在线] {len(server.clients)} 个客户端",end="")#exceptKeyboardInterrupt:#print("\n\n[用户操作] 检测到Ctrl+C,正在关闭服务器...")#server.stop_server()#time.sleep(1)#print("[服务器] 已完全关闭,程序退出")