socket编程

版本1 只能处理一个请求

import socket

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建一个 TCP 套接字
    server.bind(("0.0.0.0", 8012)) # 绑定 IP 地址和端口号
    server.listen(1)
    client_socket, client_address = server.accept()
    client_socket.send(bytes("hello world", encoding="utf-8"))
    client_socket.close()

if __name__ == "__main__":
    main()

版本2 可以处理多个请求,但每次只能处理一个请求 可以使用多次,但无法用 ctrl+c 退出

import socket

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("0.0.0.0", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        client_socket.send(bytes("hello world", encoding="utf-8"))
        client_socket.close()

if __name__ == "__main__":
    main()

版本3 可以使用多次,可以用 ctrl+c 退出

import socket
import signal
import sys
import threading
import time

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("0.0.0.0", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        client_socket.send(bytes("hello world", encoding="utf-8"))
        client_socket.close()

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本4 在上一个版本的基础上加了一个 sleep 函数,用于观察一次只能处理一个连接的问题

import socket
import signal
import sys
import threading
import time

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("0.0.0.0", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        client_socket.send(bytes("hello world", encoding="utf-8"))
        time.sleep(5)
        client_socket.close()

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 使用多线程实现的,可以同时处理多个请求的版本

import socket
import signal
import sys
import threading
import time

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    client_socket.send(bytes("hello world", encoding="utf-8"))
    time.sleep(5)
    client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("0.0.0.0", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 基础上的 discard

import socket
import signal
import sys
import threading
import time
import datetime
import struct

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    while True:
        data = client_socket.recv(1024)
        if not data:
            break  # 如果没有数据,退出循环
    client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 基础上的 time

import socket
import signal
import sys
import threading
import time
import datetime
import struct

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    client_socket.send(struct.pack("!I", int(time.time()) + 2209017600))
    client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 基础上的 daytime

import socket
import signal
import sys
import threading
import time
import datetime

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    client_socket.send(bytes(time.strftime("%a, %d %b %Y %H:%M:%S %z"), encoding="utf-8"))
    client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 基础上的 chargen

import socket
import signal
import sys
import threading
import time
import datetime
import string
import random

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    while True:
        for i in range(72):
            # 生成随机字符
            char = random.choice(string.ascii_letters + string.digits + string.punctuation)
            client_socket.send(bytes(str(char.encode('utf-8'), encoding="utf-8"), encoding="utf-8"))
        client_socket.send(bytes("\n", encoding="utf-8"))
        time.sleep(1)

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 基础上的 echo

import socket
import signal
import sys
import threading
import time
import datetime

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    client_socket.send(bytes("hello world\n", encoding="utf-8"))
    client_socket.send(bytes(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), encoding="utf-8"))
    while True:
        # 接收数据
        data = client_socket.recv(1024)
        # print(data)
        if not data:
            break  # 如果没有数据,退出循环
        # 发送回客户端
        # time.sleep(10)
        client_socket.sendall(data)
    # client_socket.send(bytes(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), encoding="utf-8"))
    # # time.sleep(1)
    # client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

五个协议基本上都是只修改 work 函数 echo daytime time discard chargen

版本6 静态http

import socket
import signal
import sys
import threading
import time
import datetime

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

HTTP_VERSION = '1.0'
SERVER_NAME = 'plusplus123'

def parser_request():
    return
def parser_request_line():
    return
def parser_request_header():
    return

def read_staticfile(filepath):
    return

def builtin_response(statusCode):
    return

def build_response(statusCode, responseHeader, responseBoday):
    return


def work(client_socket):

    RESPONSE_400
    RESPONSE_403
    RESPONSE_404
    RESPONSE_500

    "<h1>400 Bad Request</h1>"
    "<h1>403 Forbidden</h1>"
    "<h1>404 Not Found</h1>"
    "<h1>500 Server Error</h1>"

    "HTTP/1.1 404 Not Found\r\nServer: plusplus123/0.1\r\nContent-Type: text/html;charset=UTF-8\r\nContent-Length: "
    "<html><head><title>404</title></head><body><h1>404 Not Found</h1></body></html>"

    # client_socket.send(bytes("hello world\n", encoding="utf-8"))
    # client_socket.send(bytes(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), encoding="utf-8"))
    while True:
        # 接收数据
        data = client_socket.recv(2048)
        # print(data)
        if not data:
            break  # 如果没有数据,退出循环

        request = data
        # 按行分割请求报文
        lines = request.splitlines()
        
        # 解析请求行
        request_line = lines[0]
        method, path, http_version = request_line.split()
        
        # 解析请求头
        headers = {}
        for line in lines[1:]:
            if line == '':
                break  # 空行表示头部结束
            key, value = line.split(': ', 1)
            headers[key] = value

        # 发送回客户端
        # time.sleep(10)
        client_socket.sendall(data)
    # client_socket.send(bytes(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), encoding="utf-8"))
    # # time.sleep(1)
    # client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

application/octet-stream

版本6 有配置的静态http

版本6 有配置的静态http+cgi

版本6 有配置的静态http+cgi+fastcgi

版本6 有配置的静态http+cgi+fastcgi+http代理

版本6 有配置的静态http+cgi+fastcgi+http代理+socks5代理

python的命令行参数 https://docs.python.org/zh-cn/3/library/argparse.html https://docs.python.org/zh-cn/3/library/getopt.html

python的配置文件 https://docs.python.org/zh-cn/3/library/configparser.html https://docs.python.org/zh-cn/3/library/json.html

好像只有 windows 下才需要特别处理信号,linux是可以直接用 ctrl+c 退出的 https://www.fournoas.com/posts/handling-signal-in-python-on-different-platforms/ https://www.fournoas.com/posts/why-does-ctrl-c-not-kill-python-process-in-windows-console/