一文带您了解Python 网络编程:socket 示例

套接字(Sockets )和套接字 API 用于在网络上传递消息,它们提供了一种进程间通信 (inter-process communication,IPC) 的形式。网络可以是计算机上的一个逻辑本地网络,也可以是一个物理上连接到外部网络的网络,并通过该外部网络连接到其他网络。

套接字(Sockets )有着悠久的历史。它们的使用起源于1971年的ARPAnet,后来在1983年发布的伯克利软件发布版(Berkeley Software Distribution,BSD)操作系统中成为了一种API,被称为伯克利套接字(Berkeley sockets)。

在90年代,随着万维网(World Wide Web)的兴起,网络编程也迅速发展。利用新连接的网络并使用套接字的不仅仅是Web服务器和浏览器。各种类型和规模的客户端-服务器应用程序也得到了广泛应用。

今天,尽管套接字API使用的底层协议多年来有所演变,并且出现了新协议,但底层API本身保持不变。

最常见的套接字应用程序类型是客户端-服务器应用程序,其中一方充当服务器并等待来自客户端的连接,这是主要的网络模式。

Python Socket API 概述

Python 的 socket 模块提供了一组API接口,用于访问套接字 API(the Berkeley sockets API)。该模块中的主要API 函数和方法包括:

  • socket()
  • .bind()
  • .listen()
  • .accept()
  • .connect()
  • .connect_ex()
  • .send()
  • .recv()
  • .close()

Python 提供了一个方便且一致的 API,它直接映射到系统调用及其对应的 C 函数。作为其标准库的一部分,Python 还提供了一些类,使得使用这些底层套接字函数更加简单,比如 socketserver 模块,这是一个用于网络服务器的框架;此外,还有许多模块实现了更高级的互联网协议,如 HTTP 和 SMTP。

TCP 套接字

使用 socket.socket() 创建一个套接字对象,并将套接字类型指定为 socket.SOCK_STREAM。默认使用的协议是传输控制协议 ( Transmission Control Protocol ,TCP)。

传输控制协议 (TCP) 具有以下特点:

  • 可靠性:网络中丢失的数据包会被发送方检测并重新传输。
  • 按序数据传递:您的应用程序将按发送方写入数据的顺序读取数据。

相比之下,也可以使用 socket.SOCK_DGRAM 创建的用户数据报协议 (User Datagram Protocol,UDP) 套接字不具备可靠性,接收方读取的数据可能会与发送方写入的数据顺序不一致。TCP 让您无需担心数据包丢失、数据到达顺序混乱以及其他在网络通信中不可避免的陷阱。下图是 TCP 的套接字 API 调用顺序和数据流:

左侧列表示服务器。右侧列表示客户端。从左上角开始,注意服务器为设置“监听”套接字所进行的 API 调用:

  • socket()
  • .bind()
  • .listen()
  • .accept()

监听套接字的作用正如其名称所示:它监听来自客户端的连接。当客户端连接时,服务器调用 .accept() 来接受或完成连接。客户端调用 .connect() 来建立与服务器的连接,并启动三次握手。握手步骤很重要,因为它确保连接的每一端在网络中是可达的,换句话说,客户端可以到达服务器,反之亦然。有时,可能只有一个主机、客户端或服务器可以到达另一个。在中间部分是往返通信阶段,客户端和服务器通过调用 .send().recv() 来交换数据。最后,客户端和服务器关闭各自的套接字。

Echo Client and Server

上面介绍了套接字 API 以及客户端和服务器如何通信,下面是一个最为简单的第一个客户端和服务器。将从一个简单的实现开始。服务器将简单地将接收到的内容原样返回给客户端。

以下是服务器的代码:

import socket
HOST = "127.0.0.1"  # Standard loopback interface address (localhost)
PORT = 65432  # Port to listen on (non-privileged ports are > 1023)


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((HOST, PORT))
    s.listen()
    while True:
      conn, addr = s.accept()
      with conn:
          print(f"Connected by {addr}")
          while True:
              data = conn.recv(1024)
              if not data:
                  break
              conn.sendall(data)
              print(data)

这段代码实现了一个简单的回显服务器,功能如下:

  1. 导入模块:使用 socket 模块来进行网络编程。
  2. 定义地址和端口:服务器监听本地主机 (127.0.0.1) 和端口 65432
  3. 创建套接字:使用 IPv4 和 TCP 协议创建一个套接字。
  4. 绑定和监听:将套接字绑定到指定的地址和端口,然后开始监听连接请求。
  5. 处理连接:接受客户端连接并打印客户端地址。
  6. 数据接收与回显:接收客户端发送的数据并将其回显给客户端,直到客户端断开连接。

以下是client 代码:

import socket


HOST = "127.0.0.1"  # The server's hostname or IP address
PORT = 65432  # The port used by the server


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((HOST, PORT))
    s.sendall(b"Hello, world")
    data = s.recv(1024)


print(f"Received {data!r}")

这段代码实现了一个简单的客户端,功能如下:

  1. 导入模块:使用 socket 模块进行网络编程。
  2. 定义服务器地址和端口:客户端连接到本地主机 (127.0.0.1) 的端口 65432
  3. 创建并连接套接字:使用 IPv4 和 TCP 协议创建套接字,并连接到指定的服务器地址和端口。
  4. 发送数据:向服务器发送字节数据 b"Hello, world"
  5. 接收数据:从服务器接收最多 1024 字节的数据。
  6. 打印接收到的数据:将接收到的数据以原始格式输出。

通信过程解析

现在,您将更详细地了解客户端和服务器之间是如何进行通信的:

使用回环接口( loopback interface )(IPv4 地址 127.0.0.1 或 IPv6 地址 ::1)时,数据不会离开主机或接触到外部网络。在上面的示意图中,回环接口( loopback interface )位于主机内部。这代表了回环接口的内部特性,显示了穿越它的连接和数据仅在主机内部。这也是为什么回环接口和 IP 地址 127.0.0.1 或 ::1 被称为“localhost”。

应用程序使用回环接口( loopback interface )与在主机上运行的其他进程进行通信,同时确保安全性和与外部网络的隔离。因为它是内部的,仅从主机内部可以访问,所以不会暴露在外部。

当您在应用程序中使用 127.0.0.1 或 ::1 以外的 IP 地址时,它通常绑定到连接到外部网络的以太网接口。这是通向“localhost”之外的其他主机的网关。

多连接客户端和服务器

在接下来的两个部分中,您将创建一个服务器和客户端,使用来自 selectors 模块的选择器对象来处理多个连接。服务器示例代码如下:

# multiconn-server.py

import sys
import socket
import selectors
import types

sel = selectors.DefaultSelector()

host = "127.0.0.1"  
port = 65432 
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.bind((host, port))
lsock.listen()
print(f"Listening on {(host, port)}")
lsock.setblocking(False)
sel.register(lsock, selectors.EVENT_READ, data=None)


def accept_wrapper(sock):
    conn, addr = sock.accept()  # Should be ready to read
    print(f"Accepted connection from {addr}")
    conn.setblocking(False)
    data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
    events = selectors.EVENT_READ | selectors.EVENT_WRITE
    sel.register(conn, events, data=data)


def service_connection(key, mask):
    sock = key.fileobj
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(1024)  # Should be ready to read
        if recv_data:
            data.outb += recv_data
        else:
            print(f"Closing connection to {data.addr}")
            sel.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if data.outb:
            print(f"Echoing {data.outb!r} to {data.addr}")
            sent = sock.send(data.outb)  # Should be ready to write
            data.outb = data.outb[sent:]


try:
    while True:
        events = sel.select(timeout=None)
        for key, mask in events:
            if key.data is None:
                accept_wrapper(key.fileobj)
            else:
                service_connection(key, mask)
except KeyboardInterrupt:
    print("Caught keyboard interrupt, exiting")
finally:
    sel.close()

这段代码实现了一个多连接的服务器,能够同时处理多个客户端的连接。

导入和初始化

  • socketselectors 模块用于创建和管理网络连接。
  • sel 是选择器对象,用于管理多个套接字的 I/O 事件。
  • 服务器设置
  • 创建一个监听套接字 lsock,绑定到 127.0.0.1 地址和端口 65432
  • 设置套接字为非阻塞模式,并使用选择器 sel 注册监听套接字,监控其 EVENT_READ 事件(即有新的连接请求)。

accept_wrapper 函数

  • 当有新连接到来时,接受连接,并为每个连接创建一个新的非阻塞套接字。
  • 为新连接创建一个数据对象 data,包含连接的地址以及输入和输出缓冲区。
  • 使用选择器注册新连接,监控其 EVENT_READEVENT_WRITE 事件。

service_connection 函数:处理现有连接的 I/O 操作:

  • 如果有数据可读 (EVENT_READ),从客户端读取数据并将其存储在输出缓冲区中。
  • 如果可以写入 (EVENT_WRITE),将输出缓冲区中的数据发送回客户端(回显)。

事件循环

  • try 块中,持续监听和处理所有注册的套接字的 I/O 事件。
  • 如果有新的连接请求,调用 accept_wrapper 处理。
  • 如果有现有连接的读写事件,调用 service_connection 处理。
  • 使用 except KeyboardInterrupt 捕获键盘中断,安全地关闭选择器并退出。

该服务器可以同时处理多个客户端连接,接收并回显客户端发送的数据。通过使用 selectors 模块,服务器能够高效地管理多个非阻塞套接字,从而实现多连接处理。

client示例代码如下:

import sys
import socket
import selectors
import types


sel = selectors.DefaultSelector()
messages = [b"Message 1 from client.", b"Message 2 from client."]


def start_connections(host, port, num_conns):
    server_addr = (host, port)
    for i in range(0, num_conns):
        connid = i + 1
        print(f"Starting connection {connid} to {server_addr}")
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.connect_ex(server_addr)
        events = selectors.EVENT_READ | selectors.EVENT_WRITE
        data = types.SimpleNamespace(
            connid=connid,
            msg_total=sum(len(m) for m in messages),
            recv_total=0,
            messages=messages.copy(),
            outb=b"",
        )
        sel.register(sock, events, data=data)


def service_connection(key, mask):
    sock = key.fileobj
    data = key.data


    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(1024)  # Should be ready to read
        if recv_data:
            print(f"Received {recv_data!r} from connection {data.connid}")
            data.recv_total += len(recv_data)
        if not recv_data or data.recv_total == data.msg_total:
            print(f"Closing connection {data.connid}")
            sel.unregister(sock)
            sock.close()


    if mask & selectors.EVENT_WRITE:
        if data.messages:
            data.outb = data.messages.pop(0)
        if data.outb:
            print(f"Sending {data.outb!r} to connection {data.connid}")
            sent = sock.send(data.outb)  # Should be ready to write
            data.outb = data.outb[sent:]


# Start the connections
host, port = "127.0.0.1", 65432
num_conns = 2
start_connections(host, port, num_conns)


try:
    while True:
        events = sel.select(timeout=None)
        for key, mask in events:
            service_connection(key, mask)
except KeyboardInterrupt:
    print("Caught keyboard interrupt, exiting")
finally:
    sel.close()

导入和初始化

  • socketselectors 模块用于创建和管理网络连接,处理多个非阻塞套接字的 I/O 事件。
  • messages 是客户端将发送的消息列表。

start_connections 函数

  • 用于启动多个客户端连接。
  • 每个连接都会创建一个套接字并设置为非阻塞模式,然后尝试连接到服务器。
  • 使用 selectors 注册每个套接字,以监控其读写事件。

service_connection 函数

  • 处理每个连接的 I/O 操作。
  • 如果有数据可读 (EVENT_READ),从服务器读取数据并检查是否接收完毕。
  • 如果可以写入 (EVENT_WRITE),发送消息到服务器。
  • 当所有消息发送完毕且数据接收完成后,关闭连接。

事件循环

  • 持续监听和处理连接的 I/O 事件,直到所有连接关闭或用户中断程序。

这个代码实现了一个多连接的客户端,可以同时处理多个与服务器的连接,发送和接收数据。

实现一个ping

ping 通过发送 ICMP 回显请求来检查主机是否在线并连接到网络。下面代码实现一个ping 功能:

import os
import socket
import struct
import time
import select


ICMP_ECHO_REQUEST = 8  # ICMP Echo Request type (ping)
ICMP_CODE = socket.getprotobyname("icmp")


def checksum(source_string):
    """
    计算ICMP校验和
    """
    sum = 0
    count_to = (len(source_string) // 2) * 2


    for count in range(0, count_to, 2):
        this_val = source_string[count + 1] * 256 + source_string[count]
        sum = sum + this_val
        sum = sum & 0xffffffff


    if count_to < len(source_string):
        sum = sum + source_string[len(source_string) - 1]
        sum = sum & 0xffffffff


    sum = (sum >> 16) + (sum & 0xffff)
    sum = sum + (sum >> 16)
    answer = ~sum
    answer = answer & 0xffff
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return answer


def create_packet(id):
    """
    创建ICMP数据包
    """
    header = struct.pack('bbHHh', ICMP_ECHO_REQUEST, 0, 0, id, 1)
    data = struct.pack('d', time.time())
    my_checksum = checksum(header + data)
    header = struct.pack('bbHHh', ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), id, 1)
    return header + data


def ping(host, count=4):
    """
    发送ICMP Echo Request,并接收Echo Reply
    """
    try:
        dest = socket.gethostbyname(host)
    except socket.gaierror:
        print(f"Ping request could not find host {host}. Please check the name and try again.")
        return


    print(f"正在 Ping {host} [{dest}] 具有 32 字节的数据:")


    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, ICMP_CODE)
    sock.settimeout(1)


    packet_id = os.getpid() & 0xFFFF
    times = []


    for i in range(count):
        packet = create_packet(packet_id)
        start_time = time.time()


        sock.sendto(packet, (dest, 1))


        while True:
            start_select = time.time()
            what_ready = select.select([sock], [], [], 1)
            how_long_in_select = (time.time() - start_select)


            if what_ready[0] == []:  # 超时
                print("请求超时。")
                break


            time_received = time.time()
            rec_packet, addr = sock.recvfrom(1024)


            icmp_header = rec_packet[20:28]
            type, code, checksum, packet_id, sequence = struct.unpack("bbHHh", icmp_header)


            if type == 0 and packet_id == os.getpid() & 0xFFFF:
                bytes_in_double = struct.calcsize("d")
                time_sent = struct.unpack("d", rec_packet[28:28 + bytes_in_double])[0]
                rtt = (time_received - time_sent) * 1000
                times.append(rtt)
                ttl = struct.unpack("B", rec_packet[8:9])[0]
                print(f"来自 {addr[0]} 的回复: 字节=32 时间={int(rtt)}ms TTL={ttl}")
                break


            if time.time() - start_time > 1:
                print("请求超时。")
                break


        time.sleep(1)  # 等待1秒钟再发送下一个请求


    sock.close()


    # 输出统计信息
    if times:
        print(f"\n{dest} 的 Ping 统计信息:")
        print(f"    数据包: 已发送 = {count}, 已接收 = {len(times)}, 丢失 = {count - len(times)} ({((count - len(times)) / count) * 100}% 丢失),")
        print(f"往返行程的估计时间(以毫秒为单位):")
        print(f"    最短 = {int(min(times))}ms,最长 = {int(max(times))}ms,平均 = {int(sum(times) / len(times))}ms")
    else:
        print("请求超时。")


# 使用示例
ping("www.baidu.com")
print("*"*50)
ping('172.30.81.86')

这段代码实现了一个简单的 ping 命令,使用 Python 发送 ICMP Echo 请求,并接收 Echo 回复,以检查目标主机是否在线并测量响应时间。以下是代码的简要解读:

  1. ICMP_ECHO_REQUEST 和 ICMP_CODE:
  2. ICMP_ECHO_REQUEST 是 ICMP Echo 请求的类型码(值为 8),表示这是一个 ping 请求。
  3. ICMP_CODE 通过 socket.getprotobyname("icmp") 获取 ICMP 协议的编号,用于创建原始套接字。
  4. checksum 函数:
  5. 计算 ICMP 数据包的校验和,用于确保数据在传输过程中没有损坏。
  6. create_packet 函数:
  7. 创建 ICMP 数据包,包括报头和数据部分。报头包含类型、代码、校验和、ID 和序列号。数据部分包含当前的时间戳,用于计算往返时间(RTT)。
  8. ping 函数:
  9. ping 函数接受目标主机名和发送请求的次数(默认为 4)。
  10. 首先解析目标主机名为 IP 地址,然后创建一个原始套接字。
  11. 通过循环发送 ICMP 请求并接收响应。每次发送请求后,使用 select 函数等待响应,处理超时情况。
  12. 接收到响应后,解析 ICMP 报头,提取 RTT(往返时间)和 TTL(生存时间),并输出格式化的结果。
  13. 完成所有请求后,输出统计信息,包括数据包的发送和接收数量、丢失的百分比以及 RTT 的最短、最长和平均值。
  14. 输出结果:
  15. 代码执行时,将以格式化的方式输出 ping 的结果,类似于系统的 ping 命令。

这段代码实现了一个基本的 ping 工具,可以通过 ICMP 协议检测网络连接情况并测量延迟。


Python socket 编程是一个基本但非常重要的主题。网络和套接字涉及的内容非常广泛,需要相当多的时间来熟悉和掌握这些概念。就像学习 Python 一样,随着你对这些独立部分的逐步熟悉,并投入更多时间进行理解和实践,这些内容最终会变得更加清晰易懂。

原文链接:,转发请注明来源!