socket编程

版本1 只能处理一个请求

import socket

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建一个 TCP 套接字
    server.bind(("0.0.0.0", 8012)) # 绑定 IP 地址和端口号
    server.listen(1)
    client_socket, client_address = server.accept()
    client_socket.send(bytes("hello world", encoding="utf-8"))
    client_socket.close()

if __name__ == "__main__":
    main()

版本2 可以处理多个请求,但每次只能处理一个请求 可以使用多次,但无法用 ctrl+c 退出

import socket

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("0.0.0.0", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        client_socket.send(bytes("hello world", encoding="utf-8"))
        client_socket.close()

if __name__ == "__main__":
    main()

版本3 可以使用多次,可以用 ctrl+c 退出

import socket
import signal
import sys
import threading
import time

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("0.0.0.0", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        client_socket.send(bytes("hello world", encoding="utf-8"))
        client_socket.close()

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本4 在上一个版本的基础上加了一个 sleep 函数,用于观察一次只能处理一个连接的问题

import socket
import signal
import sys
import threading
import time

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("0.0.0.0", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        client_socket.send(bytes("hello world", encoding="utf-8"))
        time.sleep(5)
        client_socket.close()

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 使用多线程实现的,可以同时处理多个请求的版本

import socket
import signal
import sys
import threading
import time

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    client_socket.send(bytes("hello world", encoding="utf-8"))
    time.sleep(5)
    client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("0.0.0.0", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 基础上的 discard

import socket
import signal
import sys
import threading
import time
import datetime
import struct

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    while True:
        data = client_socket.recv(1024)
        if not data:
            break  # 如果没有数据,退出循环
    client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 基础上的 time

import socket
import signal
import sys
import threading
import time
import datetime
import struct

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    client_socket.send(struct.pack("!I", int(time.time()) + 2209017600))
    client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 基础上的 daytime

import socket
import signal
import sys
import threading
import time
import datetime

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    client_socket.send(bytes(time.strftime("%a, %d %b %Y %H:%M:%S %z"), encoding="utf-8"))
    client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 基础上的 chargen

import socket
import signal
import sys
import threading
import time
import datetime
import string
import random

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    while True:
        for i in range(72):
            # 生成随机字符
            char = random.choice(string.ascii_letters + string.digits + string.punctuation)
            client_socket.send(bytes(str(char.encode('utf-8'), encoding="utf-8"), encoding="utf-8"))
        client_socket.send(bytes("\n", encoding="utf-8"))
        time.sleep(1)

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 基础上的 echo

import socket
import signal
import sys
import threading
import time
import datetime

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    client_socket.send(bytes("hello world\n", encoding="utf-8"))
    client_socket.send(bytes(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), encoding="utf-8"))
    while True:
        # 接收数据
        data = client_socket.recv(1024)
        # print(data)
        if not data:
            break  # 如果没有数据,退出循环
        # 发送回客户端
        # time.sleep(10)
        client_socket.sendall(data)
    # client_socket.send(bytes(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), encoding="utf-8"))
    # # time.sleep(1)
    # client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

五个协议基本上都是只修改 work 函数 echo daytime time discard chargen

版本6 静态http

版本6 有配置的静态http

版本6 有配置的静态http+cgi

版本6 有配置的静态http+cgi+fastcgi

版本6 有配置的静态http+cgi+fastcgi+http代理

版本6 有配置的静态http+cgi+fastcgi+http代理+socks5代理

python的命令行参数 https://docs.python.org/zh-cn/3/library/argparse.html https://docs.python.org/zh-cn/3/library/getopt.html

python的配置文件 https://docs.python.org/zh-cn/3/library/configparser.html https://docs.python.org/zh-cn/3/library/json.html

好像只有 windows 下才需要特别处理信号,linux是可以直接用 ctrl+c 退出的 https://www.fournoas.com/posts/handling-signal-in-python-on-different-platforms/ https://www.fournoas.com/posts/why-does-ctrl-c-not-kill-python-process-in-windows-console/