今天的话就开始我们的Python实践项目,本次我们要做的是带GUI的端口扫描器。我们主要做的功能是根据用户输入的IP和端口进行扫描,确认该端口是否开启。并且发布在 Github:https://github.com/ZherKing/ZKPortScanner 话不多说 马上开始。
一、了解原理及思路
1、原理
首先了解一下什么叫做端口扫描,我们知道访问一个对外开放公网的服务,是一组 IP 和 端口 来进行访问的,那我们进行访问的时候 访问格式是 IP:端口 。当端口开放的时候就是可以访问的,那我们的目的就是检测这个IP的端口能否进行访问。
有接触过的小伙伴应该认识 TCPING 这是一个Windows内置的一个软件(需要额外安装),在进行 TCPING 命令操作的时候,我们可以确认这个 IP 的端口能否访问:
在进行 TCPING 的时候我们就可以判断 TCP 是否成功接手了。
2、思路
大概了解完后原理,我们就开始策划我们需要用 Python 来实现这个功能了。首先我们这次要做一个带GUI页面的一个软件,那么GUI的话我们会使用tkinker这个模块来进行制作,并且完成后我们会使用 Pyinstaller 来进行封装成 exe 可执行文件,那大概讲到这里。
那回到代码层面的话,大概的代码内容我们是用Python库里面自带的 socket 模块 中 的 connect 函数,先讲解一下 connect 函数的原理:指定的IP和端口通过特定主机远程socket,如果访问失败就会出现 timeout,这就是我们判断端口是否开放的一个过程。
还有一个 threading 模块,这是一个多线程的模块,主要用作于我们后期如果说要一次性扫描多个端口,又不想造成整个程序的卡顿,那么我们需要用到的就是多线程,可以减少程序的运行压力。
二、演示流程
1、尝试使用 socket 来进行访问
那我们先通过 socket 模块来设置一个远程主机的链接,来达成一个端口扫描的结果。
def scan_port(ip, port):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.connect((ip, port))
except:
return False
else:
return True
finally:
sock.close()
示例代码中呢,是一个简单的端口扫描工具,代码通过 socket 模块来对远程主机形成链接,如果出现 timeout 的情况就是端口未开放,就返回 False,但是有个要注意的是,当代码结束后要关闭 sock 连接。
那我们稍微给代码修葺一下,形成一个可以用的状态:
import socket
def scan_port(ip, port):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.connect((ip, port))
except:
return False
else:
return True
finally:
sock.close()
if __name__ == '__main__':
ipaddress = "127.0.0.1"
port = 80
result = scan_port(ipaddress, port)
print(result)
主函数写入IP端口等信息,直接调用函数即可实现。
2、指定端口范围扫描
你肯定需要指定端口范围扫描端口吧,可能你要求用户输入的端口范围可以是以 ,隔开 或者 以 – 设定端口的扫描范围。这样的话有个好处就是节省GUI页面的框框,我不需要输入几个框框,比如说第一个框框是起始端口另一个框框是终止端口这样。
那我们就开始编写这个指定端口范围扫描的一个方案,首先的话我们要按照用户输入的端口进行解析,比如说用户输入了 1-5 那我们要拆分为 1 2 3 4 5 五个端口,再比如说用户输入了 12,23 那我们就要拆分为 12 23两个端口,那如何实现呢?我们可以使用 Python 语言的 split 进行切割:
def parse_ports(port_input):
"""
解析端口输入,可以是单个端口、多个端口和端口范围的组合。
返回一个包含所有要扫描的端口号的列表。
"""
ports = set()
parts = port_input.split(',')
for part in parts:
if '-' in part:
start, end = map(int, part.split('-'))
ports.update(range(start, end + 1))
else:
ports.add(int(part))
return sorted(ports)
代码内容意思函数获取传入参数,对 , 进行分离,然后检测是否有没有 – ,如果有的话端口范围就要产生,通过循环语句来实现起始端口到结束端口。
那么到此为止我们就完成了这么一个端口范围的筛选,那接下来我们难道就要完工了吗?不 如果只是单单挑出端口来进行检测是否有点糟糕,这样的话代码的运行效率会很低,这时候我们就要使用多线程来进行端口扫描。
def scan_ports(ip, ports, max_workers=100):
"""
扫描指定IP地址的端口列表。
"""
open_ports = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_port = {executor.submit(scan_port, ip, port): port for port in ports}
for future in future_to_port:
port = future_to_port[future]
if future.result():
open_ports.append(port)
return open_ports
这一部分的话就是使用了多线程的方法来进行扫描端口,这样的话能够大大提升端口扫描的效率。
其中 max_workers=100 是一个扫描端口的最大线程数,不要整太大了不然给别人扫死了( 那我们开始对其他的代码内容进行解释(使用Copilot来解释):
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_port = {executor.submit(scan_port, ip, port): port for port in ports}
- 使用 ThreadPoolExecutor 创建一个线程池执行器 executor,最大线程数为 max_workers。
- executor.submit(scan_port, ip, port) 将 scan_port 函数提交给线程池执行器,并传递 ip 和 port 参数,返回一个 Future 对象。
- 使用字典推导式创建 future_to_port,将每个 Future 对象映射到相应的端口号。遍历 ports 列表,为每个端口号创建一个 Future 对象。
for future in future_to_port:
port = future_to_port[future]
if future.result():
open_ports.append(port)
- 遍历 future_to_port 字典中的每个 Future 对象。
- 从 future_to_port 字典中获取与当前 Future 对象对应的端口号 port。
- 调用 future.result() 获取 Future 对象的结果,即 scan_port 函数的返回值。如果端口是开放的(future.result() 返回 True),则将端口号 port 添加到 open_ports 列表中。
最后的 return open_ports 就是返回可用端口。
3、对域名进行解析
有时候用户输入的肯定不是IP地址,而是域名,那我肯定要先进行判断用户输入的是域名还是IP:
if not re.match(r'^\d{1,3}(\.\d{1,3}){3}$', target):
ip = resolve_domain(target)
if ip is None:
print(f"无法解析域名 {target}")
exit(1)
else:
ip = target
判断完是IP那就进行扫描,那如果是域名的话那我们先进行解析:
def resolve_domain(domain):
"""
将域名解析为IP地址。
"""
try:
ip = socket.gethostbyname(domain)
return ip
except socket.gaierror:
return None
通过 socket 模块的 gethostbyname 函数就可以直接从域名解析中获取到IP地址了。
4、完整代码(无GUI)
import socket
from concurrent.futures import ThreadPoolExecutor
import re
def parse_ports(port_input):
"""
解析端口输入,可以是单个端口、多个端口和端口范围的组合。
返回一个包含所有要扫描的端口号的列表。
"""
ports = set()
parts = port_input.split(',')
for part in parts:
if '-' in part:
start, end = map(int, part.split('-'))
ports.update(range(start, end + 1))
else:
ports.add(int(part))
return sorted(ports)
def resolve_domain(domain):
"""
将域名解析为IP地址。
"""
try:
ip = socket.gethostbyname(domain)
return ip
except socket.gaierror:
return None
def scan_port(ip, port):
"""
尝试连接指定的IP地址和端口。
如果连接成功,则认为端口是开放的。
"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.connect((ip, port))
except:
return False
else:
return True
finally:
sock.close()
def scan_ports(ip, ports, max_workers=100):
"""
扫描指定IP地址的端口列表。
"""
open_ports = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_port = {executor.submit(scan_port, ip, port): port for port in ports}
for future in future_to_port:
port = future_to_port[future]
if future.result():
open_ports.append(port)
return open_ports
if __name__ == "__main__":
target = input("请输入要扫描的IP地址或域名: ")
port_input = input("请输入要扫描的端口(例如12,23或1-5): ")
# 解析域名(如果输入的不是IP地址)
if not re.match(r'^\d{1,3}(\.\d{1,3}){3}$', target):
ip = resolve_domain(target)
if ip is None:
print(f"无法解析域名 {target}")
exit(1)
else:
ip = target
ports = parse_ports(port_input)
print(f"开始扫描 {ip} 的端口: {', '.join(map(str, ports))}...")
open_ports = scan_ports(ip, ports)
if open_ports:
print(f"开放的端口: {', '.join(map(str, open_ports))}")
else:
print("没有发现开放的端口。")
三、制作GUI页面
那我们就开始制作GUI页面了。这次我们用的是Python模块里面的 tkinker 来进行制作 GUI 页面。
既然要进行制作 GUI 页面,那么我们原本在主函数设置好的那些提示可要删掉了,我们做的GUI是要和代码非常贴合的。先看我们看成品:
import tkinter as tk
from tkinter import messagebox, scrolledtext
import assets.scan_ports
from assets.scan_ports import *
def gui():
# 创建主窗口
root = tk.Tk()
root.title("ZKPortScanner - 端口扫描工具 by ZherKing")
# 创建并放置标签和输入框
tk.Label(root, text="目标IP地址或域名:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W)
entry_target = tk.Entry(root, width=50)
entry_target.grid(row=0, column=1, padx=5, pady=5)
tk.Label(root, text="端口(例如80、80,443或者10-20):").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W)
entry_ports = tk.Entry(root, width=50)
entry_ports.grid(row=1, column=1, padx=5, pady=5)
tk.Label(root, text="线程数:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W)
entry_max_workers = tk.Entry(root, width=50)
entry_max_workers.grid(row=2, column=1, padx=5, pady=5)
# 创建并放置扫描按钮
text_result = scrolledtext.ScrolledText(root, width=80, height=20)
text_result.grid(row=4, column=0, columnspan=2, padx=5, pady=5)
btn_scan = tk.Button(root, text="开始扫描",
command=lambda: start_scan(entry_target, entry_ports, entry_max_workers, text_result))
btn_scan.grid(row=3, column=0, columnspan=2, pady=10)
# 创建并放置滚动文本框用于显示结果
text_result = scrolledtext.ScrolledText(root, width=80, height=20)
text_result.grid(row=4, column=0, columnspan=2, padx=5, pady=5)
# 运行主循环
root.mainloop()
大概的内容就是这些,具体详细的内容注释都有,给大家看看页面:
那针对这个GUI输入的内容如何传输,我们专门搞了一个函数来对接内容:
def start_scan(entry_target, entry_ports, entry_max_workers, text_result):
target = entry_target.get().strip()
port_input = entry_ports.get().strip()
max_workers_input = entry_max_workers.get().strip()
if not target or not port_input or not max_workers_input:
messagebox.showwarning("输入错误", "请输入目标IP地址或域名、端口和线程数。")
return
try:
max_workers = int(max_workers_input)
except ValueError:
messagebox.showwarning("输入错误", "线程数必须是一个整数。")
return
# 解析域名(如果输入的不是IP地址)
if not re.match(r'^\d{1,3}(\.\d{1,3}){3}$', target):
ip = resolve_domain(target)
if ip is None:
messagebox.showerror("解析错误", f"无法解析域名 {target}")
return
else:
ip = target
ports = parse_ports(port_input)
if not ports:
messagebox.showwarning("输入错误", "请输入有效的端口。")
return
text_result.delete(1.0, tk.END)
text_result.insert(tk.END, f"开始扫描 {ip} 的端口: {', '.join(map(str, ports))},使用线程数: {max_workers}...\n")
open_ports = scan_ports(ip, ports, max_workers)
if open_ports:
result = f"开放的端口: {', '.join(map(str, open_ports))}\n"
else:
result = "没有发现开放的端口。\n"
text_result.insert(tk.END, result)
这个函数的内容主要是对 GUI窗口传参过来的数据做进一步的处理,然后再将内容传送回去GUI页面作为一个日志的输出。
四、成品
我们代码主要分成了三个部分:main.py gui.py scan_ports.py 三个部分,其中的文件大概是这么分的:
- /根目录:
- main.py #主文件 首选运行
- /assets
- scan_ports.py #扫描端口的主要工作文件
- gui.py #主要干显示GUI的
源代码:
main:
import re
from assets.scan_ports import *
from assets.gui import *
if __name__ == "__main__":
gui()
scan_ports:
import socket
from concurrent.futures import ThreadPoolExecutor
import tkinter as tk
import re
from tkinter import messagebox
def parse_ports(port_input):
"""
解析端口输入,可以是单个端口、多个端口和端口范围的组合。
返回一个包含所有要扫描的端口号的列表。
"""
ports = set()
parts = port_input.split(',')
for part in parts:
if '-' in part:
start, end = map(int, part.split('-'))
ports.update(range(start, end + 1))
else:
ports.add(int(part))
return sorted(ports)
def resolve_domain(domain):
"""
将域名解析为IP地址。
"""
try:
ip = socket.gethostbyname(domain)
return ip
except socket.gaierror:
return None
def scan_port(ip, port):
"""
尝试连接指定的IP地址和端口。
如果连接成功,则认为端口是开放的。
"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.connect((ip, port))
except:
return False
else:
return True
finally:
sock.close()
def scan_ports(ip, ports, max_workers=100):
"""
扫描指定IP地址的端口列表。
"""
open_ports = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_port = {executor.submit(scan_port, ip, port): port for port in ports}
for future in future_to_port:
port = future_to_port[future]
if future.result():
open_ports.append(port)
return open_ports
def start_scan(entry_target, entry_ports, entry_max_workers, text_result):
target = entry_target.get().strip()
port_input = entry_ports.get().strip()
max_workers_input = entry_max_workers.get().strip()
if not target or not port_input or not max_workers_input:
messagebox.showwarning("输入错误", "请输入目标IP地址或域名、端口和线程数。")
return
try:
max_workers = int(max_workers_input)
except ValueError:
messagebox.showwarning("输入错误", "线程数必须是一个整数。")
return
# 解析域名(如果输入的不是IP地址)
if not re.match(r'^\d{1,3}(\.\d{1,3}){3}$', target):
ip = resolve_domain(target)
if ip is None:
messagebox.showerror("解析错误", f"无法解析域名 {target}")
return
else:
ip = target
ports = parse_ports(port_input)
if not ports:
messagebox.showwarning("输入错误", "请输入有效的端口。")
return
text_result.delete(1.0, tk.END)
text_result.insert(tk.END, f"开始扫描 {ip} 的端口: {', '.join(map(str, ports))},使用线程数: {max_workers}...\n")
open_ports = scan_ports(ip, ports, max_workers)
if open_ports:
result = f"开放的端口: {', '.join(map(str, open_ports))}\n"
else:
result = "没有发现开放的端口。\n"
text_result.insert(tk.END, result)
gui:
import tkinter as tk
from tkinter import messagebox, scrolledtext
import assets.scan_ports
from assets.scan_ports import *
def gui():
# 创建主窗口
root = tk.Tk()
root.title("ZKPortScanner - 端口扫描工具 by ZherKing")
# 创建并放置标签和输入框
tk.Label(root, text="目标IP地址或域名:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W)
entry_target = tk.Entry(root, width=50)
entry_target.grid(row=0, column=1, padx=5, pady=5)
tk.Label(root, text="端口(例如80、80,443或者10-20):").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W)
entry_ports = tk.Entry(root, width=50)
entry_ports.grid(row=1, column=1, padx=5, pady=5)
tk.Label(root, text="线程数:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W)
entry_max_workers = tk.Entry(root, width=50)
entry_max_workers.grid(row=2, column=1, padx=5, pady=5)
# 创建并放置扫描按钮
text_result = scrolledtext.ScrolledText(root, width=80, height=20)
text_result.grid(row=4, column=0, columnspan=2, padx=5, pady=5)
btn_scan = tk.Button(root, text="开始扫描",
command=lambda: start_scan(entry_target, entry_ports, entry_max_workers, text_result))
btn_scan.grid(row=3, column=0, columnspan=2, pady=10)
# 创建并放置滚动文本框用于显示结果
text_result = scrolledtext.ScrolledText(root, width=80, height=20)
text_result.grid(row=4, column=0, columnspan=2, padx=5, pady=5)
# 运行主循环
root.mainloop()
那我们本次的项目开发就到这里了,感谢大家能够阅读本博文。
上次端口扫描樱花穿透的节点,被封了10分钟。哈哈哈
好惨的娃 可以看看降低线程扫描什么的 (我也不清楚樱花机制咋样的)