Python项目实践 – 从0开始做端口扫描器

今天的话就开始我们的Python实践项目,本次我们要做的是带GUI的端口扫描器。我们主要做的功能是根据用户输入的IP和端口进行扫描,确认该端口是否开启。并且发布在 Github:https://github.com/ZherKing/ZKPortScanner 话不多说 马上开始。

一、了解原理及思路

1、原理

首先了解一下什么叫做端口扫描,我们知道访问一个对外开放公网的服务,是一组 IP 和 端口 来进行访问的,那我们进行访问的时候 访问格式是 IP:端口 。当端口开放的时候就是可以访问的,那我们的目的就是检测这个IP的端口能否进行访问

有接触过的小伙伴应该认识 TCPING 这是一个Windows内置的一个软件(需要额外安装),在进行 TCPING 命令操作的时候,我们可以确认这个 IP 的端口能否访问:

在进行 TCPING 的时候我们就可以判断 TCP 是否成功接手了。

2、思路

大概了解完后原理,我们就开始策划我们需要用 Python 来实现这个功能了。首先我们这次要做一个带GUI页面的一个软件,那么GUI的话我们会使用tkinker这个模块来进行制作,并且完成后我们会使用 Pyinstaller 来进行封装成 exe 可执行文件,那大概讲到这里。

那回到代码层面的话,大概的代码内容我们是用Python库里面自带的 socket 模块 中 的 connect 函数,先讲解一下 connect 函数的原理:指定的IP和端口通过特定主机远程socket,如果访问失败就会出现 timeout,这就是我们判断端口是否开放的一个过程。

还有一个 threading 模块,这是一个多线程的模块,主要用作于我们后期如果说要一次性扫描多个端口,又不想造成整个程序的卡顿,那么我们需要用到的就是多线程,可以减少程序的运行压力。

二、演示流程

1、尝试使用 socket 来进行访问

那我们先通过 socket 模块来设置一个远程主机的链接,来达成一个端口扫描的结果。

def scan_port(ip, port):
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        sock.connect((ip, port))
    except:
        return False
    else:
        return True
    finally:
        sock.close()

示例代码中呢,是一个简单的端口扫描工具,代码通过 socket 模块来对远程主机形成链接,如果出现 timeout 的情况就是端口未开放,就返回 False,但是有个要注意的是,当代码结束后要关闭 sock 连接

那我们稍微给代码修葺一下,形成一个可以用的状态:

import socket
def scan_port(ip, port):
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        sock.connect((ip, port))
    except:
        return False
    else:
        return True
    finally:
        sock.close()

if __name__ == '__main__':
    ipaddress = "127.0.0.1"
    port = 80
    result = scan_port(ipaddress, port)
    print(result)

主函数写入IP端口等信息,直接调用函数即可实现。

2、指定端口范围扫描

你肯定需要指定端口范围扫描端口吧,可能你要求用户输入的端口范围可以是以 ,隔开 或者 以 设定端口的扫描范围。这样的话有个好处就是节省GUI页面的框框,我不需要输入几个框框,比如说第一个框框是起始端口另一个框框是终止端口这样。

那我们就开始编写这个指定端口范围扫描的一个方案,首先的话我们要按照用户输入的端口进行解析,比如说用户输入了 1-5 那我们要拆分为 1 2 3 4 5 五个端口,再比如说用户输入了 12,23 那我们就要拆分为 12 23两个端口,那如何实现呢?我们可以使用 Python 语言的 split 进行切割:

def parse_ports(port_input):
    """
    解析端口输入,可以是单个端口、多个端口和端口范围的组合。
    返回一个包含所有要扫描的端口号的列表。
    """
    ports = set()
    parts = port_input.split(',')
    for part in parts:
        if '-' in part:
            start, end = map(int, part.split('-'))
            ports.update(range(start, end + 1))
        else:
            ports.add(int(part))
    return sorted(ports)

代码内容意思函数获取传入参数,对 , 进行分离,然后检测是否有没有 – ,如果有的话端口范围就要产生,通过循环语句来实现起始端口到结束端口。

那么到此为止我们就完成了这么一个端口范围的筛选,那接下来我们难道就要完工了吗?不 如果只是单单挑出端口来进行检测是否有点糟糕,这样的话代码的运行效率会很低,这时候我们就要使用多线程来进行端口扫描。

def scan_ports(ip, ports, max_workers=100):
    """
    扫描指定IP地址的端口列表。
    """
    open_ports = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_port = {executor.submit(scan_port, ip, port): port for port in ports}
        for future in future_to_port:
            port = future_to_port[future]
            if future.result():
                open_ports.append(port)

    return open_ports

这一部分的话就是使用了多线程的方法来进行扫描端口,这样的话能够大大提升端口扫描的效率

其中 max_workers=100 是一个扫描端口的最大线程数,不要整太大了不然给别人扫死了( 那我们开始对其他的代码内容进行解释(使用Copilot来解释):

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_port = {executor.submit(scan_port, ip, port): port for port in ports}
  • 使用 ThreadPoolExecutor 创建一个线程池执行器 executor,最大线程数为 max_workers
  • executor.submit(scan_port, ip, port) 将 scan_port 函数提交给线程池执行器,并传递 ip 和 port 参数,返回一个 Future 对象。
  • 使用字典推导式创建 future_to_port,将每个 Future 对象映射到相应的端口号。遍历 ports 列表,为每个端口号创建一个 Future 对象。
        for future in future_to_port:
            port = future_to_port[future]
            if future.result():
                open_ports.append(port)
  • 遍历 future_to_port 字典中的每个 Future 对象。
  • 从 future_to_port 字典中获取与当前 Future 对象对应的端口号 port。
  • 调用 future.result() 获取 Future 对象的结果,即 scan_port 函数的返回值。如果端口是开放的(future.result() 返回 True),则将端口号 port 添加到 open_ports 列表中。

最后的 return open_ports 就是返回可用端口。

3、对域名进行解析

有时候用户输入的肯定不是IP地址,而是域名,那我肯定要先进行判断用户输入的是域名还是IP:

    if not re.match(r'^\d{1,3}(\.\d{1,3}){3}$', target):
        ip = resolve_domain(target)
        if ip is None:
            print(f"无法解析域名 {target}")
            exit(1)
    else:
        ip = target

判断完是IP那就进行扫描,那如果是域名的话那我们先进行解析:

def resolve_domain(domain):
    """
    将域名解析为IP地址。
    """
    try:
        ip = socket.gethostbyname(domain)
        return ip
    except socket.gaierror:
        return None

通过 socket 模块的 gethostbyname 函数就可以直接从域名解析中获取到IP地址了。

4、完整代码(无GUI)

import socket
from concurrent.futures import ThreadPoolExecutor
import re


def parse_ports(port_input):
    """
    解析端口输入,可以是单个端口、多个端口和端口范围的组合。
    返回一个包含所有要扫描的端口号的列表。
    """
    ports = set()
    parts = port_input.split(',')
    for part in parts:
        if '-' in part:
            start, end = map(int, part.split('-'))
            ports.update(range(start, end + 1))
        else:
            ports.add(int(part))
    return sorted(ports)

def resolve_domain(domain):
    """
    将域名解析为IP地址。
    """
    try:
        ip = socket.gethostbyname(domain)
        return ip
    except socket.gaierror:
        return None

def scan_port(ip, port):
    """
    尝试连接指定的IP地址和端口。
    如果连接成功,则认为端口是开放的。
    """
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        sock.connect((ip, port))
    except:
        return False
    else:
        return True
    finally:
        sock.close()

def scan_ports(ip, ports, max_workers=100):
    """
    扫描指定IP地址的端口列表。
    """
    open_ports = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_port = {executor.submit(scan_port, ip, port): port for port in ports}
        for future in future_to_port:
            port = future_to_port[future]
            if future.result():
                open_ports.append(port)

    return open_ports

if __name__ == "__main__":
    target = input("请输入要扫描的IP地址或域名: ")
    port_input = input("请输入要扫描的端口(例如12,23或1-5): ")

    # 解析域名(如果输入的不是IP地址)
    if not re.match(r'^\d{1,3}(\.\d{1,3}){3}$', target):
        ip = resolve_domain(target)
        if ip is None:
            print(f"无法解析域名 {target}")
            exit(1)
    else:
        ip = target

    ports = parse_ports(port_input)

    print(f"开始扫描 {ip} 的端口: {', '.join(map(str, ports))}...")

    open_ports = scan_ports(ip, ports)

    if open_ports:
        print(f"开放的端口: {', '.join(map(str, open_ports))}")
    else:
        print("没有发现开放的端口。")

三、制作GUI页面

那我们就开始制作GUI页面了。这次我们用的是Python模块里面的 tkinker 来进行制作 GUI 页面。

既然要进行制作 GUI 页面,那么我们原本在主函数设置好的那些提示可要删掉了,我们做的GUI是要和代码非常贴合的。先看我们看成品:

import tkinter as tk
from tkinter import messagebox, scrolledtext

import assets.scan_ports
from assets.scan_ports import *


def gui():
# 创建主窗口
root = tk.Tk()
root.title("ZKPortScanner - 端口扫描工具 by ZherKing")

# 创建并放置标签和输入框
tk.Label(root, text="目标IP地址或域名:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W)
entry_target = tk.Entry(root, width=50)
entry_target.grid(row=0, column=1, padx=5, pady=5)

tk.Label(root, text="端口(例如80、80,443或者10-20):").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W)
entry_ports = tk.Entry(root, width=50)
entry_ports.grid(row=1, column=1, padx=5, pady=5)

tk.Label(root, text="线程数:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W)
entry_max_workers = tk.Entry(root, width=50)
entry_max_workers.grid(row=2, column=1, padx=5, pady=5)

# 创建并放置扫描按钮
text_result = scrolledtext.ScrolledText(root, width=80, height=20)
text_result.grid(row=4, column=0, columnspan=2, padx=5, pady=5)

btn_scan = tk.Button(root, text="开始扫描",
command=lambda: start_scan(entry_target, entry_ports, entry_max_workers, text_result))
btn_scan.grid(row=3, column=0, columnspan=2, pady=10)

# 创建并放置滚动文本框用于显示结果
text_result = scrolledtext.ScrolledText(root, width=80, height=20)
text_result.grid(row=4, column=0, columnspan=2, padx=5, pady=5)

# 运行主循环
root.mainloop()

大概的内容就是这些,具体详细的内容注释都有,给大家看看页面:

那针对这个GUI输入的内容如何传输,我们专门搞了一个函数来对接内容:

def start_scan(entry_target, entry_ports, entry_max_workers, text_result):
    target = entry_target.get().strip()
    port_input = entry_ports.get().strip()
    max_workers_input = entry_max_workers.get().strip()

    if not target or not port_input or not max_workers_input:
        messagebox.showwarning("输入错误", "请输入目标IP地址或域名、端口和线程数。")
        return

    try:
        max_workers = int(max_workers_input)
    except ValueError:
        messagebox.showwarning("输入错误", "线程数必须是一个整数。")
        return

    # 解析域名(如果输入的不是IP地址)
    if not re.match(r'^\d{1,3}(\.\d{1,3}){3}$', target):
        ip = resolve_domain(target)
        if ip is None:
            messagebox.showerror("解析错误", f"无法解析域名 {target}")
            return
    else:
        ip = target

    ports = parse_ports(port_input)
    if not ports:
        messagebox.showwarning("输入错误", "请输入有效的端口。")
        return

    text_result.delete(1.0, tk.END)
    text_result.insert(tk.END, f"开始扫描 {ip} 的端口: {', '.join(map(str, ports))},使用线程数: {max_workers}...\n")

    open_ports = scan_ports(ip, ports, max_workers)

    if open_ports:
        result = f"开放的端口: {', '.join(map(str, open_ports))}\n"
    else:
        result = "没有发现开放的端口。\n"

    text_result.insert(tk.END, result)

这个函数的内容主要是对 GUI窗口传参过来的数据做进一步的处理,然后再将内容传送回去GUI页面作为一个日志的输出。

四、成品

我们代码主要分成了三个部分:main.py gui.py scan_ports.py 三个部分,其中的文件大概是这么分的:

  • /根目录:
  • main.py #主文件 首选运行
  • /assets
    • scan_ports.py #扫描端口的主要工作文件
    • gui.py #主要干显示GUI的

源代码:

main:

import re
from assets.scan_ports import *
from assets.gui import *
if __name__ == "__main__":
    gui()

scan_ports:

import socket
from concurrent.futures import ThreadPoolExecutor
import tkinter as tk
import re
from tkinter import messagebox


def parse_ports(port_input):
    """
    解析端口输入,可以是单个端口、多个端口和端口范围的组合。
    返回一个包含所有要扫描的端口号的列表。
    """
    ports = set()
    parts = port_input.split(',')
    for part in parts:
        if '-' in part:
            start, end = map(int, part.split('-'))
            ports.update(range(start, end + 1))
        else:
            ports.add(int(part))
    return sorted(ports)

def resolve_domain(domain):
    """
    将域名解析为IP地址。
    """
    try:
        ip = socket.gethostbyname(domain)
        return ip
    except socket.gaierror:
        return None

def scan_port(ip, port):
    """
    尝试连接指定的IP地址和端口。
    如果连接成功,则认为端口是开放的。
    """
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        sock.connect((ip, port))
    except:
        return False
    else:
        return True
    finally:
        sock.close()

def scan_ports(ip, ports, max_workers=100):
    """
    扫描指定IP地址的端口列表。
    """
    open_ports = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_port = {executor.submit(scan_port, ip, port): port for port in ports}
        for future in future_to_port:
            port = future_to_port[future]
            if future.result():
                open_ports.append(port)

    return open_ports


def start_scan(entry_target, entry_ports, entry_max_workers, text_result):
    target = entry_target.get().strip()
    port_input = entry_ports.get().strip()
    max_workers_input = entry_max_workers.get().strip()

    if not target or not port_input or not max_workers_input:
        messagebox.showwarning("输入错误", "请输入目标IP地址或域名、端口和线程数。")
        return

    try:
        max_workers = int(max_workers_input)
    except ValueError:
        messagebox.showwarning("输入错误", "线程数必须是一个整数。")
        return

    # 解析域名(如果输入的不是IP地址)
    if not re.match(r'^\d{1,3}(\.\d{1,3}){3}$', target):
        ip = resolve_domain(target)
        if ip is None:
            messagebox.showerror("解析错误", f"无法解析域名 {target}")
            return
    else:
        ip = target

    ports = parse_ports(port_input)
    if not ports:
        messagebox.showwarning("输入错误", "请输入有效的端口。")
        return

    text_result.delete(1.0, tk.END)
    text_result.insert(tk.END, f"开始扫描 {ip} 的端口: {', '.join(map(str, ports))},使用线程数: {max_workers}...\n")

    open_ports = scan_ports(ip, ports, max_workers)

    if open_ports:
        result = f"开放的端口: {', '.join(map(str, open_ports))}\n"
    else:
        result = "没有发现开放的端口。\n"

    text_result.insert(tk.END, result)

gui:

import tkinter as tk
from tkinter import messagebox, scrolledtext

import assets.scan_ports
from assets.scan_ports import *


def gui():
    # 创建主窗口
    root = tk.Tk()
    root.title("ZKPortScanner - 端口扫描工具 by ZherKing")

    # 创建并放置标签和输入框
    tk.Label(root, text="目标IP地址或域名:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W)
    entry_target = tk.Entry(root, width=50)
    entry_target.grid(row=0, column=1, padx=5, pady=5)

    tk.Label(root, text="端口(例如80、80,443或者10-20):").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W)
    entry_ports = tk.Entry(root, width=50)
    entry_ports.grid(row=1, column=1, padx=5, pady=5)

    tk.Label(root, text="线程数:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W)
    entry_max_workers = tk.Entry(root, width=50)
    entry_max_workers.grid(row=2, column=1, padx=5, pady=5)

    # 创建并放置扫描按钮
    text_result = scrolledtext.ScrolledText(root, width=80, height=20)
    text_result.grid(row=4, column=0, columnspan=2, padx=5, pady=5)

    btn_scan = tk.Button(root, text="开始扫描",
                         command=lambda: start_scan(entry_target, entry_ports, entry_max_workers, text_result))
    btn_scan.grid(row=3, column=0, columnspan=2, pady=10)

    # 创建并放置滚动文本框用于显示结果
    text_result = scrolledtext.ScrolledText(root, width=80, height=20)
    text_result.grid(row=4, column=0, columnspan=2, padx=5, pady=5)

    # 运行主循环
    root.mainloop()

那我们本次的项目开发就到这里了,感谢大家能够阅读本博文。

评论

  1. Windows Edge 133.0.0.0
    3 周前
    2025-3-08 21:35:00

    上次端口扫描樱花穿透的节点,被封了10分钟。哈哈哈

    • 博主
      Ryugu
      Windows Edge 134.0.0.0
      3 周前
      2025-3-08 21:49:58

      好惨的娃 可以看看降低线程扫描什么的 (我也不清楚樱花机制咋样的)

发送评论 编辑评论


|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇