|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
端口扫描器
功能:扫描目标主机的开放端口和服务
作者:Cline
版本:1.0
"""
import socket
import argparse
import sys
import json
import csv
import xml.etree.ElementTree as ET
from datetime import datetime
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import logging
import ipaddress
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('port_scanner.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# 常见端口和服务映射
COMMON_PORTS = {
21: 'FTP',
22: 'SSH',
23: 'Telnet',
25: 'SMTP',
53: 'DNS',
80: 'HTTP',
110: 'POP3',
143: 'IMAP',
443: 'HTTPS',
993: 'IMAPS',
995: 'POP3S',
1433: 'MSSQL',
1521: 'Oracle',
3306: 'MySQL',
3389: 'RDP',
5432: 'PostgreSQL',
6379: 'Redis',
8080: 'HTTP-Alt',
8443: 'HTTPS-Alt',
27017: 'MongoDB'
}
class PortScanner:
def __init__(self, config):
self.target = config['target']
self.ports = config['ports']
self.scan_type = config.get('scan_type', 'tcp') # tcp, syn, udp
self.timeout = config.get('timeout', 1.0)
self.threads = config.get('threads', 100)
self.delay = config.get('delay', 0) # 扫描延迟(毫秒)
self.output_format = config.get('output_format', 'json')
self.output_file = config.get('output_file', 'scan_results.json')
self.verbose = config.get('verbose', False)
# 扫描结果
self.results = {
'target': self.target,
'scan_time': datetime.now().isoformat(),
'scan_type': self.scan_type,
'open_ports': [],
'closed_ports': [],
'filtered_ports': []
}
def is_valid_ip(self, ip):
"""验证IP地址格式"""
try:
ipaddress.ip_address(ip)
return True
except ValueError:
return False
def resolve_hostname(self, hostname):
"""解析主机名到IP地址"""
try:
ip = socket.gethostbyname(hostname)
return ip
except socket.gaierror:
logger.error(f"无法解析主机名: {hostname}")
return None
def scan_port(self, port):
"""扫描单个端口"""
host = self.target
if not self.is_valid_ip(host):
host = self.resolve_hostname(host)
if not host:
return None
# 添加扫描延迟
if self.delay > 0:
time.sleep(self.delay / 1000.0)
try:
if self.scan_type == 'tcp':
return self.tcp_connect_scan(host, port)
elif self.scan_type == 'syn':
return self.syn_scan(host, port)
elif self.scan_type == 'udp':
return self.udp_scan(host, port)
else:
logger.error(f"不支持的扫描类型: {self.scan_type}")
return None
except Exception as e:
logger.error(f"扫描端口 {port} 时发生错误: {str(e)}")
return None
def tcp_connect_scan(self, host, port):
"""TCP连接扫描"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
start_time = time.time()
result = sock.connect_ex((host, port))
end_time = time.time()
response_time = round((end_time - start_time) * 1000, 2) # 毫秒
sock.close()
service = COMMON_PORTS.get(port, 'Unknown')
if result == 0:
# 端口开放,尝试获取服务banner
banner = self.get_service_banner(host, port)
return {
'port': port,
'state': 'open',
'service': service,
'response_time': response_time,
'banner': banner
}
else:
return {
'port': port,
'state': 'closed',
'service': service,
'response_time': response_time
}
except socket.timeout:
return {
'port': port,
'state': 'filtered',
'service': COMMON_PORTS.get(port, 'Unknown'),
'response_time': self.timeout * 1000
}
except Exception as e:
logger.debug(f"TCP扫描端口 {port} 时发生错误: {str(e)}")
return {
'port': port,
'state': 'unknown',
'service': COMMON_PORTS.get(port, 'Unknown')
}
def syn_scan(self, host, port):
"""SYN扫描(需要root权限)"""
try:
# 这里简化实现,实际SYN扫描需要使用原始套接字
logger.warning("SYN扫描需要root权限,降级为TCP连接扫描")
return self.tcp_connect_scan(host, port)
except Exception as e:
logger.error(f"SYN扫描端口 {port} 时发生错误: {str(e)}")
return None
def udp_scan(self, host, port):
"""UDP扫描"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(self.timeout)
# 发送空数据包
sock.sendto(b'', (host, port))
try:
# 尝试接收响应
data, addr = sock.recvfrom(1024)
sock.close()
service = COMMON_PORTS.get(port, 'Unknown')
banner = data.decode('utf-8', errors='ignore')[:100]
return {
'port': port,
'state': 'open',
'service': service,
'banner': banner
}
except socket.timeout:
# UDP端口可能开放但无响应
sock.close()
return {
'port': port,
'state': 'open|filtered',
'service': COMMON_PORTS.get(port, 'Unknown')
}
except Exception as e:
logger.debug(f"UDP扫描端口 {port} 时发生错误: {str(e)}")
return {
'port': port,
'state': 'closed',
'service': COMMON_PORTS.get(port, 'Unknown')
}
def get_service_banner(self, host, port):
"""获取服务banner信息"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2.0)
sock.connect((host, port))
# 发送简单的探测数据
if port in [21, 22, 23, 25, 80, 110, 143]:
sock.send(b'\r\n')
# 接收响应
banner = sock.recv(1024).decode('utf-8', errors='ignore')
sock.close()
# 清理banner信息
banner = banner.strip().replace('\r\n', ' ').replace('\n', ' ')
return banner[:200] # 限制长度
except Exception as e:
logger.debug(f"获取端口 {port} 的banner时发生错误: {str(e)}")
return ""
def scan_ports(self):
"""扫描所有端口"""
logger.info(f"开始扫描 {self.target} 的端口...")
logger.info(f"扫描类型: {self.scan_type}, 超时时间: {self.timeout}s, 线程数: {self.threads}")
start_time = time.time()
with ThreadPoolExecutor(max_workers=self.threads) as executor:
# 提交所有扫描任务
future_to_port = {
executor.submit(self.scan_port, port): port
for port in self.ports
}
# 收集结果
for future in as_completed(future_to_port):
port = future_to_port[future]
try:
result = future.result()
if result:
if result['state'] == 'open':
self.results['open_ports'].append(result)
if self.verbose:
logger.info(f"端口 {result['port']} ({result['service']}) 开放")
elif result['state'] == 'closed':
self.results['closed_ports'].append(result)
elif result['state'] in ['filtered', 'open|filtered']:
self.results['filtered_ports'].append(result)
except Exception as e:
logger.error(f"处理端口 {port} 的结果时出错: {str(e)}")
end_time = time.time()
self.results['duration'] = round(end_time - start_time, 2)
# 按端口号排序结果
self.results['open_ports'].sort(key=lambda x: x['port'])
self.results['closed_ports'].sort(key=lambda x: x['port'])
self.results['filtered_ports'].sort(key=lambda x: x['port'])
logger.info(f"扫描完成,耗时 {self.results['duration']} 秒")
logger.info(f"开放端口: {len(self.results['open_ports'])}")
logger.info(f"关闭端口: {len(self.results['closed_ports'])}")
logger.info(f"过滤端口: {len(self.results['filtered_ports'])}")
return self.results
def print_results(self):
"""打印扫描结果"""
print("\n" + "="*60)
print(f"端口扫描报告 - 目标: {self.target}")
print("="*60)
print(f"扫描时间: {self.results['scan_time']}")
print(f"扫描类型: {self.results['scan_type']}")
print(f"扫描耗时: {self.results['duration']} 秒")
print(f"开放端口: {len(self.results['open_ports'])}")
print(f"关闭端口: {len(self.results['closed_ports'])}")
print(f"过滤端口: {len(self.results['filtered_ports'])}")
if self.results['open_ports']:
print("\n开放端口详情:")
print("-" * 80)
print(f"{'端口':<8} {'服务':<15} {'响应时间(ms)':<15} {'Banner信息'}")
print("-" * 80)
for port_info in self.results['open_ports']:
banner = port_info.get('banner', '')[:50] + ('...' if len(port_info.get('banner', '')) > 50 else '')
print(f"{port_info['port']:<8} {port_info['service']:<15} {port_info['response_time']:<15} {banner}")
if not self.results['open_ports']:
print("\n未发现开放端口")
def save_results(self):
"""保存扫描结果"""
try:
# 确保输出目录存在
import os
output_dir = os.path.dirname(self.output_file) if os.path.dirname(self.output_file) else '.'
os.makedirs(output_dir, exist_ok=True)
if self.output_format == 'json':
self._save_json()
elif self.output_format == 'csv':
self._save_csv()
elif self.output_format == 'xml':
self._save_xml()
else:
logger.error(f"不支持的输出格式: {self.output_format}")
except Exception as e:
logger.error(f"保存扫描结果时出错: {str(e)}")
def _save_json(self):
"""保存为JSON格式"""
with open(self.output_file, 'w', encoding='utf-8') as f:
json.dump(self.results, f, indent=2, ensure_ascii=False)
logger.info(f"扫描结果已保存到 {self.output_file}")
def _save_csv(self):
"""保存为CSV格式"""
all_ports = self.results['open_ports'] + self.results['closed_ports'] + self.results['filtered_ports']
with open(self.output_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['Port', 'State', 'Service', 'Response Time (ms)', 'Banner'])
for port_info in all_ports:
writer.writerow([
port_info['port'],
port_info['state'],
port_info['service'],
port_info.get('response_time', ''),
port_info.get('banner', '')
])
logger.info(f"扫描结果已保存到 {self.output_file}")
def _save_xml(self):
"""保存为XML格式"""
root = ET.Element("port_scan")
root.set("target", self.target)
root.set("scan_time", self.results['scan_time'])
root.set("duration", str(self.results['duration']))
# 添加开放端口
open_ports_elem = ET.SubElement(root, "open_ports")
for port_info in self.results['open_ports']:
port_elem = ET.SubElement(open_ports_elem, "port")
port_elem.set("number", str(port_info['port']))
port_elem.set("service", port_info['service'])
port_elem.set("response_time", str(port_info.get('response_time', '')))
if port_info.get('banner'):
banner_elem = ET.SubElement(port_elem, "banner")
banner_elem.text = port_info['banner']
tree = ET.ElementTree(root)
tree.write(self.output_file, encoding='utf-8', xml_declaration=True)
logger.info(f"扫描结果已保存到 {self.output_file}")
def parse_ports(port_str):
"""解析端口参数"""
ports = set()
# 处理逗号分隔的端口列表
for part in port_str.split(','):
part = part.strip()
if '-' in part:
# 处理端口范围
try:
start, end = map(int, part.split('-'))
if 1 <= start <= 65535 and 1 <= end <= 65535 and start <= end:
ports.update(range(start, end + 1))
else:
raise ValueError(f"无效的端口范围: {part}")
except ValueError as e:
logger.error(f"解析端口范围时出错: {str(e)}")
sys.exit(1)
else:
# 处理单个端口
try:
port = int(part)
if 1 <= port <= 65535:
ports.add(port)
else:
raise ValueError(f"端口号超出有效范围: {port}")
except ValueError as e:
logger.error(f"解析端口时出错: {str(e)}")
sys.exit(1)
return sorted(list(ports))
def main():
parser = argparse.ArgumentParser(description='端口扫描器')
parser.add_argument('target', help='目标主机IP地址或域名')
parser.add_argument('-p', '--ports', default='1-1000', help='要扫描的端口,支持范围(如1-1000)和列表(如22,80,443)')
parser.add_argument('-t', '--type', choices=['tcp', 'syn', 'udp'], default='tcp', help='扫描类型')
parser.add_argument('--timeout', type=float, default=1.0, help='连接超时时间(秒)')
parser.add_argument('--threads', type=int, default=100, help='并发线程数')
parser.add_argument('--delay', type=int, default=0, help='扫描延迟(毫秒)')
parser.add_argument('-o', '--output', help='输出文件路径')
parser.add_argument('-f', '--format', choices=['json', 'csv', 'xml'], default='json', help='输出格式')
parser.add_argument('-v', '--verbose', action='store_true', help='详细输出')
parser.add_argument('--top-ports', type=int, help='扫描最常见的N个端口')
args = parser.parse_args()
# 解析端口
if args.top_ports:
# 使用最常见的端口
common_ports = sorted(COMMON_PORTS.keys())[:args.top_ports]
ports = common_ports
else:
ports = parse_ports(args.ports)
# 配置输出文件
output_file = args.output
if not output_file:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = f"scan_results_{timestamp}.{args.format}"
# 配置扫描参数
config = {
'target': args.target,
'ports': ports,
'scan_type': args.type,
'timeout': args.timeout,
'threads': args.threads,
'delay': args.delay,
'output_format': args.format,
'output_file': output_file,
'verbose': args.verbose
}
# 创建扫描器实例
scanner = PortScanner(config)
# 执行扫描
try:
scanner.scan_ports()
scanner.print_results()
scanner.save_results()
except KeyboardInterrupt:
logger.info("扫描被用户中断")
sys.exit(1)
except Exception as e:
logger.error(f"扫描过程中发生错误: {str(e)}")
sys.exit(1)
if __name__ == '__main__':
main()
|