Python实现端口扫描

端口扫描

端口扫描工具(Port Scanner)指用于探测服务器或主机开放端口情况的工具。常被计算机管理员用于确认安全策略,同时被攻击者用于识别目标主机上的可运作的网络服务。

端口扫描定义是客户端向一定范围的服务器端口发送对应请求,以此确认可使用的端口。虽然其本身并不是恶意的网络活动,但也是网络攻击者探测目标主机服务,以利用该服务的已知漏洞的重要手段。端口扫描的主要用途仍然只是确认远程机器某个服务的可用性。

扫描多个主机以获取特定的某个端口被称为端口清扫(Portsweep),以此获取特定的服务。例如,基于SQL服务的计算机蠕虫就会清扫大量主机的同一端口以在 1433 端口上建立TCP连接。

实现原理

最简单的端口扫描工具使用TCP连接扫描的方式,即利用操作系统原生的网络功能,且通常作为SYN扫描的替代选项。Nmap将这种模式称为连接扫描,因为使用了类似Unix系统的connect()命令。如果该端口是开放的,操作系统就能完成TCP三次握手,然后端口扫描工具会立即关闭刚建立的该连接,防止拒绝服务攻击。这种扫描模式的优势是用户无需特殊权限。但使用操作系统原生网络功能不能实现底层控制,因此这种扫描方式并不流行。并且TCP扫描很容易被发现,尤其作为端口清扫的手段:这些服务会记录发送者的IP地址,入侵检测系统可能触发警报。

还有另外一种扫描方式是SYN扫描,端口扫描工具不使用操作系统原生网络功能,而是自行生成、发送IP数据包,并监控其回应。这种扫描模式被称为“半开放扫描”,因为它从不建立完整的TCP连接。端口扫描工具生成一个SYN包,如果目标端口开放,则会返回SYN-ACK包。扫描端回应一个RST包,然后在握手完成前关闭连接。如果端口关闭了但未使用过滤,目标端口应该会持续返回RST包。这种粗略的网络利用方式有几个优点:给扫描工具全权控制数据包发送和等待回应时长的权力,允许更详细的回应分析。关于哪一种对目标主机的扫描方式更不具备入侵性存在一些争议,但SYN扫描的优势是从不会建立完整的连接。然而,RST包可能导致网络堵塞,尤其是一些简单如打印机之类的网络设备。

此次使用的是第一种扫描方式,直接利用操作系统的socket连接接口,初步测试目标服务器的端口是否可以连接,如果可以则返回端口打开状态

实现步骤

可以把程序分成下面几部分:

1.读取端口及目标服务器
2.测试TCP端口连接
3.输出开放端口结果

1.读取端口及目标服务器

直接把目标服务器和端口范围作为参数传给我们的程序,程序运行参数:

python port_scan.py -

程序中使用sys.argv[]来读取并初步处理,具体的语法请参考python sys.argv[]用法

#!/usr/bin/python
# -*- coding: UTF-8 -*-
'''
@Author:joy_nick
@博客:http://byd.dropsec.xyz/
'''
import sys

# portscan.py <host> <start_port>-<end_port>
host = sys.argv[1]
portstrs = sys.argv[2].split('-')

start_port = int(portstrs[0])
end_port = int(portstrs[1])

2.测试TCP端口连接

我们进入到一个循环,在这个循环中依次对端口范围内的端口进行连接测试。

首先要在文件开始部分引入 socket 包:

from socket import *

连接测试方法是:

创建socket
调用connect()函数
关闭连接

依次实现如下:

获取目标IP地址:

target_ip = gethostbyname(host)

进入循环开始连接:
opened_ports = []

for port in range(start_port, end_port + 1):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.settimeout(10)
    result = sock.connect_ex((target_ip, port))
    if result == 0:
        opened_ports.append(port)

附:

1.Python socket()介绍及使用方法

2.Python socket编程

3.输出开放端口结果

这一步骤很简单,只需要打印opened_ports列表:

print("Opened ports:")

for i in opened_ports:
    print(i)

完整的代码如下:

#!/usr/bin/python
# -*- coding: utf-8 -*-
'''
@Author:joy_nick
@博客:http://byd.dropsec.xyz/
'''
import sys
from socket import *

# port_scan.py <host> <start_port>-<end_port>
host = sys.argv[1]
portstrs = sys.argv[2].split('-')

start_port = int(portstrs[0])
end_port = int(portstrs[1])

target_ip = gethostbyname(host)
opened_ports = []

for port in range(start_port, end_port):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.settimeout(10)
    result = sock.connect_ex((target_ip, port))
    if result == 0:
        opened_ports.append(port)

print("Opened ports:")

for i in opened_ports:
    print(i)

执行测试

命令为:

python port_scan.py 192.168.1.200 21-23

多线程端口扫描器

上面的简单程序中,我们依次测试每个端口,如果要提高性能,可以考虑采用多线程的方式。

改进的方式如下:

1.把TCP连接测试封装进函数
2.每次循环都创建一个线程来执行1中的扫描函数
3.为了简化实现,把开放端口输出步骤写入到1的测试函数中
首先引入thread包,这个包是Python多线程实现需要的:

import thread

实现TCP测试函数,需要注意print输出时候需要加锁,如果不加锁可能会出现多个输出混合在一起的错误状态,而锁需要在程序启动时创建,从而能让新建的线程共享这个锁:

def tcp_test(port):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.settimeout(10)
    result = sock.connect_ex((target_ip, port))
    if result == 0:
        lock.acquire()
        print "Opened Port:",port
        lock.release()

附:

1.Python的锁

2.多线程lock.acquire()

注意当输出执行完后要释放锁lock。

输入的处理及lock的创建可以放在main函数中:

if __name__=='__main__':
    # portscan.py <host> <start_port>-<end_port>
    host = sys.argv[1]
    portstrs = sys.argv[2].split('-')

    start_port = int(portstrs[0])
    end_port = int(portstrs[1])

    target_ip = gethostbyname(host)

    lock = thread.allocate_lock()
然后修改for循环:
for port in range(start_port, end_port):
    thread.start_new_thread(tcp_test, (port,))

thread.start_new_thread 用来创建一个线程,该函数的第一个参数是一个线程中执行的函数,第二个参数必须是个元组,作为函数的输入,由于 tcp\_test 函数只有一个参数,所以我们使用(port,)这种形式表示这个参数为元组。

thread.start_new_thread参考资料如下:

这个是thread.start_new_thread(function,args[,kwargs])函数原型,其中function参数是你将要调用的线程函数;args是讲传递给你的线程函数的参数,它必须是个tuple类型(元组);而kwargs是可选的参数。线程的结束一般依靠线程函数的自然结束;也可以在线程函数中调用thread.exit(),它抛出SystemExit exception,达到退出线程的目的。

这里运行程序会报错:

Unhandled exception in thread started by 
sys.excepthook is missing 
lost sys.stderr 

网上解释为:

第一个问题:因为主进程已经结束,相关的资源已经释放,而线程还在后台运行,所以会导致线程找不到相关的资源和定义

第二个问题:因为主程序结束的时候,并没有等待子线程结束,也没有强制关闭子线程,因此还在后台运行,有两个办法可以让他们同时结束,一个办法是在在构建进程的时候增加参数 deamon=True, 第二个办法就是在程序最后增加thread1.join(),thread2.join()

但是并没有什么卵用,我的解决办法为:

在主函数上加里一个time.sleep(1)

完整的程序代码如下:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
'''
@Author:joy_nick
@博客:http://byd.dropsec.xyz/
'''
import sys
import thread
from socket import *

def tcp_test(port):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.settimeout(10)
    result = sock.connect_ex((target_ip, port))
    if result == 0:
        lock.acquire()
        print '主机:%s'%(target_ip)
        print "开放的端口为:",port
        print '\n'
        lock.release()
        thread.exit()

if __name__=='__main__':
    # portscan.py <host> <start_port>-<end_port>
    host = sys.argv[1]
    portstrs = sys.argv[2].split('-')

    start_port = int(portstrs[0])
    end_port = int(portstrs[1])

    target_ip = gethostbyname(host)

    lock = thread.allocate_lock()

    for port in range(start_port, end_port):
        thread.start_new_thread(tcp_test, (port,))
        time.sleep(1)

运行实例如下:

大爷,赏个铜板呗!