socket编程
版本1 只能处理一个请求
import socket
def main():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建一个 TCP 套接字
server.bind(("0.0.0.0", 8012)) # 绑定 IP 地址和端口号
server.listen(1)
client_socket, client_address = server.accept()
client_socket.send(bytes("hello world", encoding="utf-8"))
client_socket.close()
if __name__ == "__main__":
main()
版本2 可以处理多个请求,但每次只能处理一个请求 可以使用多次,但无法用 ctrl+c 退出
import socket
def main():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("0.0.0.0", 8012))
server.listen(5)
while True:
client_socket, client_address = server.accept()
client_socket.send(bytes("hello world", encoding="utf-8"))
client_socket.close()
if __name__ == "__main__":
main()
版本3 可以使用多次,可以用 ctrl+c 退出
import socket
import signal
import sys
import threading
import time
# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
sys.exit(0)
def main():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("0.0.0.0", 8012))
server.listen(5)
while True:
client_socket, client_address = server.accept()
client_socket.send(bytes("hello world", encoding="utf-8"))
client_socket.close()
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号
work_thread = threading.Thread(target=main) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
while True: # 主线程的循环
time.sleep(1)
版本4 在上一个版本的基础上加了一个 sleep 函数,用于观察一次只能处理一个连接的问题
import socket
import signal
import sys
import threading
import time
# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
sys.exit(0)
def main():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("0.0.0.0", 8012))
server.listen(5)
while True:
client_socket, client_address = server.accept()
client_socket.send(bytes("hello world", encoding="utf-8"))
time.sleep(5)
client_socket.close()
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号
work_thread = threading.Thread(target=main) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
while True: # 主线程的循环
time.sleep(1)
版本5 使用多线程实现的,可以同时处理多个请求的版本
import socket
import signal
import sys
import threading
import time
# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
sys.exit(0)
def work(client_socket):
client_socket.send(bytes("hello world", encoding="utf-8"))
time.sleep(5)
client_socket.close()
def main():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("0.0.0.0", 8012))
server.listen(5)
while True:
client_socket, client_address = server.accept()
work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号
work_thread = threading.Thread(target=main) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
while True: # 主线程的循环
time.sleep(1)
版本5 基础上的 discard
import socket
import signal
import sys
import threading
import time
import datetime
import struct
# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
sys.exit(0)
def work(client_socket):
while True:
data = client_socket.recv(1024)
if not data:
break # 如果没有数据,退出循环
client_socket.close()
def main():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 8012))
server.listen(5)
while True:
client_socket, client_address = server.accept()
work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号
work_thread = threading.Thread(target=main) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
while True: # 主线程的循环
time.sleep(1)
版本5 基础上的 time
import socket
import signal
import sys
import threading
import time
import datetime
import struct
# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
sys.exit(0)
def work(client_socket):
client_socket.send(struct.pack("!I", int(time.time()) + 2209017600))
client_socket.close()
def main():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 8012))
server.listen(5)
while True:
client_socket, client_address = server.accept()
work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号
work_thread = threading.Thread(target=main) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
while True: # 主线程的循环
time.sleep(1)
版本5 基础上的 daytime
import socket
import signal
import sys
import threading
import time
import datetime
# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
sys.exit(0)
def work(client_socket):
client_socket.send(bytes(time.strftime("%a, %d %b %Y %H:%M:%S %z"), encoding="utf-8"))
client_socket.close()
def main():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 8012))
server.listen(5)
while True:
client_socket, client_address = server.accept()
work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号
work_thread = threading.Thread(target=main) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
while True: # 主线程的循环
time.sleep(1)
版本5 基础上的 chargen
import socket
import signal
import sys
import threading
import time
import datetime
import string
import random
# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
sys.exit(0)
def work(client_socket):
while True:
for i in range(72):
# 生成随机字符
char = random.choice(string.ascii_letters + string.digits + string.punctuation)
client_socket.send(bytes(str(char.encode('utf-8'), encoding="utf-8"), encoding="utf-8"))
client_socket.send(bytes("\n", encoding="utf-8"))
time.sleep(1)
def main():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 8012))
server.listen(5)
while True:
client_socket, client_address = server.accept()
work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号
work_thread = threading.Thread(target=main) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
while True: # 主线程的循环
time.sleep(1)
版本5 基础上的 echo
import socket
import signal
import sys
import threading
import time
import datetime
# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
sys.exit(0)
def work(client_socket):
client_socket.send(bytes("hello world\n", encoding="utf-8"))
client_socket.send(bytes(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), encoding="utf-8"))
while True:
# 接收数据
data = client_socket.recv(1024)
# print(data)
if not data:
break # 如果没有数据,退出循环
# 发送回客户端
# time.sleep(10)
client_socket.sendall(data)
# client_socket.send(bytes(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), encoding="utf-8"))
# # time.sleep(1)
# client_socket.close()
def main():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 8012))
server.listen(5)
while True:
client_socket, client_address = server.accept()
work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号
work_thread = threading.Thread(target=main) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
while True: # 主线程的循环
time.sleep(1)
五个协议基本上都是只修改 work 函数 echo daytime time discard chargen
版本6 静态http
import socket
import signal
import sys
import threading
import time
import datetime
# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
sys.exit(0)
HTTP_VERSION = '1.0'
SERVER_NAME = 'plusplus123'
def parser_request():
return
def parser_request_line():
return
def parser_request_header():
return
def read_staticfile(filepath):
return
def builtin_response(statusCode):
return
def build_response(statusCode, responseHeader, responseBoday):
return
def work(client_socket):
RESPONSE_400
RESPONSE_403
RESPONSE_404
RESPONSE_500
"<h1>400 Bad Request</h1>"
"<h1>403 Forbidden</h1>"
"<h1>404 Not Found</h1>"
"<h1>500 Server Error</h1>"
"HTTP/1.1 404 Not Found\r\nServer: plusplus123/0.1\r\nContent-Type: text/html;charset=UTF-8\r\nContent-Length: "
"<html><head><title>404</title></head><body><h1>404 Not Found</h1></body></html>"
# client_socket.send(bytes("hello world\n", encoding="utf-8"))
# client_socket.send(bytes(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), encoding="utf-8"))
while True:
# 接收数据
data = client_socket.recv(2048)
# print(data)
if not data:
break # 如果没有数据,退出循环
request = data
# 按行分割请求报文
lines = request.splitlines()
# 解析请求行
request_line = lines[0]
method, path, http_version = request_line.split()
# 解析请求头
headers = {}
for line in lines[1:]:
if line == '':
break # 空行表示头部结束
key, value = line.split(': ', 1)
headers[key] = value
# 发送回客户端
# time.sleep(10)
client_socket.sendall(data)
# client_socket.send(bytes(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), encoding="utf-8"))
# # time.sleep(1)
# client_socket.close()
def main():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 8012))
server.listen(5)
while True:
client_socket, client_address = server.accept()
work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号
work_thread = threading.Thread(target=main) # 创建一个子线程对象
work_thread.daemon = True # 将子线程设置为守护线程
work_thread.start() # 启动子线程
while True: # 主线程的循环
time.sleep(1)
application/octet-stream
版本6 有配置的静态http
版本6 有配置的静态http+cgi
版本6 有配置的静态http+cgi+fastcgi
版本6 有配置的静态http+cgi+fastcgi+http代理
版本6 有配置的静态http+cgi+fastcgi+http代理+socks5代理
python的命令行参数 https://docs.python.org/zh-cn/3/library/argparse.html https://docs.python.org/zh-cn/3/library/getopt.html
python的配置文件 https://docs.python.org/zh-cn/3/library/configparser.html https://docs.python.org/zh-cn/3/library/json.html
好像只有 windows 下才需要特别处理信号,linux是可以直接用 ctrl+c 退出的 https://www.fournoas.com/posts/handling-signal-in-python-on-different-platforms/ https://www.fournoas.com/posts/why-does-ctrl-c-not-kill-python-process-in-windows-console/