这是一个功能强大的端口扫描器脚本,能够快速扫描目标主机的开放端口和服务。该脚本具备以下核心功能:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) result = sock.connect_ex((host, port)) sock.close() except socket.gaierror: logger.error(f"无法解析主机名: {host}") except socket.timeout: logger.warning(f"连接 {host}:{port} 超时") except ConnectionRefusedError: logger.info(f"端口 {port} 被拒绝连接") except PermissionError: logger.error(f"无权限扫描端口 {port},可能需要管理员权限") |
|
1 2 3 4 5 |
if not is_valid_ip(target): raise ValueError(f"无效的IP地址: {target}")
if port < 1 or port > 65535: raise ValueError(f"端口号超出有效范围: {port}") |
|
1 2 3 4 5 6 7 |
try: with open(output_file, 'w') as f: json.dump(scan_results, f, indent=2) except PermissionError: logger.error(f"无权限写入文件: {output_file}") except IOError as e: logger.error(f"文件写入错误: {str(e)}") |
|
1 2 3 4 5 6 7 |
try: # 大规模扫描时的内存管理 if len(ports) > 10000: logger.warning("扫描端口数量过多,可能消耗大量内存") except MemoryError: logger.error("内存不足,无法完成扫描") sys.exit(1) |
|
|
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 端口扫描器 功能:扫描目标主机的开放端口和服务 作者:Cline 版本:1.0 """
import socket import argparse import sys import json import csv import xml.etree.ElementTree as ET from datetime import datetime import threading from concurrent.futures import ThreadPoolExecutor, as_completed import time import logging import ipaddress
# 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('port_scanner.log'), logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger(__name__)
# 常见端口和服务映射 COMMON_PORTS = { 21: 'FTP', 22: 'SSH', 23: 'Telnet', 25: 'SMTP', 53: 'DNS', 80: 'HTTP', 110: 'POP3', 143: 'IMAP', 443: 'HTTPS', 993: 'IMAPS', 995: 'POP3S', 1433: 'MSSQL', 1521: 'Oracle', 3306: 'MySQL', 3389: 'RDP', 5432: 'PostgreSQL', 6379: 'Redis', 8080: 'HTTP-Alt', 8443: 'HTTPS-Alt', 27017: 'MongoDB' }
class PortScanner: def __init__(self, config): self.target = config['target'] self.ports = config['ports'] self.scan_type = config.get('scan_type', 'tcp') # tcp, syn, udp self.timeout = config.get('timeout', 1.0) self.threads = config.get('threads', 100) self.delay = config.get('delay', 0) # 扫描延迟(毫秒) self.output_format = config.get('output_format', 'json') self.output_file = config.get('output_file', 'scan_results.json') self.verbose = config.get('verbose', False)
# 扫描结果 self.results = { 'target': self.target, 'scan_time': datetime.now().isoformat(), 'scan_type': self.scan_type, 'open_ports': [], 'closed_ports': [], 'filtered_ports': [] }
def is_valid_ip(self, ip): """验证IP地址格式""" try: ipaddress.ip_address(ip) return True except ValueError: return False
def resolve_hostname(self, hostname): """解析主机名到IP地址""" try: ip = socket.gethostbyname(hostname) return ip except socket.gaierror: logger.error(f"无法解析主机名: {hostname}") return None
def scan_port(self, port): """扫描单个端口""" host = self.target if not self.is_valid_ip(host): host = self.resolve_hostname(host) if not host: return None
# 添加扫描延迟 if self.delay > 0: time.sleep(self.delay / 1000.0)
try: if self.scan_type == 'tcp': return self.tcp_connect_scan(host, port) elif self.scan_type == 'syn': return self.syn_scan(host, port) elif self.scan_type == 'udp': return self.udp_scan(host, port) else: logger.error(f"不支持的扫描类型: {self.scan_type}") return None
except Exception as e: logger.error(f"扫描端口 {port} 时发生错误: {str(e)}") return None
def tcp_connect_scan(self, host, port): """TCP连接扫描""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(self.timeout)
start_time = time.time() result = sock.connect_ex((host, port)) end_time = time.time()
response_time = round((end_time - start_time) * 1000, 2) # 毫秒 sock.close()
service = COMMON_PORTS.get(port, 'Unknown')
if result == 0: # 端口开放,尝试获取服务banner banner = self.get_service_banner(host, port) return { 'port': port, 'state': 'open', 'service': service, 'response_time': response_time, 'banner': banner } else: return { 'port': port, 'state': 'closed', 'service': service, 'response_time': response_time }
except socket.timeout: return { 'port': port, 'state': 'filtered', 'service': COMMON_PORTS.get(port, 'Unknown'), 'response_time': self.timeout * 1000 } except Exception as e: logger.debug(f"TCP扫描端口 {port} 时发生错误: {str(e)}") return { 'port': port, 'state': 'unknown', 'service': COMMON_PORTS.get(port, 'Unknown') }
def syn_scan(self, host, port): """SYN扫描(需要root权限)""" try: # 这里简化实现,实际SYN扫描需要使用原始套接字 logger.warning("SYN扫描需要root权限,降级为TCP连接扫描") return self.tcp_connect_scan(host, port) except Exception as e: logger.error(f"SYN扫描端口 {port} 时发生错误: {str(e)}") return None
def udp_scan(self, host, port): """UDP扫描""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(self.timeout)
# 发送空数据包 sock.sendto(b'', (host, port))
try: # 尝试接收响应 data, addr = sock.recvfrom(1024) sock.close()
service = COMMON_PORTS.get(port, 'Unknown') banner = data.decode('utf-8', errors='ignore')[:100]
return { 'port': port, 'state': 'open', 'service': service, 'banner': banner } except socket.timeout: # UDP端口可能开放但无响应 sock.close() return { 'port': port, 'state': 'open|filtered', 'service': COMMON_PORTS.get(port, 'Unknown') }
except Exception as e: logger.debug(f"UDP扫描端口 {port} 时发生错误: {str(e)}") return { 'port': port, 'state': 'closed', 'service': COMMON_PORTS.get(port, 'Unknown') }
def get_service_banner(self, host, port): """获取服务banner信息""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(2.0) sock.connect((host, port))
# 发送简单的探测数据 if port in [21, 22, 23, 25, 80, 110, 143]: sock.send(b'\r\n')
# 接收响应 banner = sock.recv(1024).decode('utf-8', errors='ignore') sock.close()
# 清理banner信息 banner = banner.strip().replace('\r\n', ' ').replace('\n', ' ') return banner[:200] # 限制长度
except Exception as e: logger.debug(f"获取端口 {port} 的banner时发生错误: {str(e)}") return ""
def scan_ports(self): """扫描所有端口""" logger.info(f"开始扫描 {self.target} 的端口...") logger.info(f"扫描类型: {self.scan_type}, 超时时间: {self.timeout}s, 线程数: {self.threads}")
start_time = time.time()
with ThreadPoolExecutor(max_workers=self.threads) as executor: # 提交所有扫描任务 future_to_port = { executor.submit(self.scan_port, port): port for port in self.ports }
# 收集结果 for future in as_completed(future_to_port): port = future_to_port[future] try: result = future.result() if result: if result['state'] == 'open': self.results['open_ports'].append(result) if self.verbose: logger.info(f"端口 {result['port']} ({result['service']}) 开放") elif result['state'] == 'closed': self.results['closed_ports'].append(result) elif result['state'] in ['filtered', 'open|filtered']: self.results['filtered_ports'].append(result)
except Exception as e: logger.error(f"处理端口 {port} 的结果时出错: {str(e)}")
end_time = time.time() self.results['duration'] = round(end_time - start_time, 2)
# 按端口号排序结果 self.results['open_ports'].sort(key=lambda x: x['port']) self.results['closed_ports'].sort(key=lambda x: x['port']) self.results['filtered_ports'].sort(key=lambda x: x['port'])
logger.info(f"扫描完成,耗时 {self.results['duration']} 秒") logger.info(f"开放端口: {len(self.results['open_ports'])}") logger.info(f"关闭端口: {len(self.results['closed_ports'])}") logger.info(f"过滤端口: {len(self.results['filtered_ports'])}")
return self.results
def print_results(self): """打印扫描结果""" print("\n" + "="*60) print(f"端口扫描报告 - 目标: {self.target}") print("="*60) print(f"扫描时间: {self.results['scan_time']}") print(f"扫描类型: {self.results['scan_type']}") print(f"扫描耗时: {self.results['duration']} 秒") print(f"开放端口: {len(self.results['open_ports'])}") print(f"关闭端口: {len(self.results['closed_ports'])}") print(f"过滤端口: {len(self.results['filtered_ports'])}")
if self.results['open_ports']: print("\n开放端口详情:") print("-" * 80) print(f"{'端口':<8} {'服务':<15} {'响应时间(ms)':<15} {'Banner信息'}") print("-" * 80) for port_info in self.results['open_ports']: banner = port_info.get('banner', '')[:50] + ('...' if len(port_info.get('banner', '')) > 50 else '') print(f"{port_info['port']:<8} {port_info['service']:<15} {port_info['response_time']:<15} {banner}")
if not self.results['open_ports']: print("\n未发现开放端口")
def save_results(self): """保存扫描结果""" try: # 确保输出目录存在 import os output_dir = os.path.dirname(self.output_file) if os.path.dirname(self.output_file) else '.' os.makedirs(output_dir, exist_ok=True)
if self.output_format == 'json': self._save_json() elif self.output_format == 'csv': self._save_csv() elif self.output_format == 'xml': self._save_xml() else: logger.error(f"不支持的输出格式: {self.output_format}")
except Exception as e: logger.error(f"保存扫描结果时出错: {str(e)}")
def _save_json(self): """保存为JSON格式""" with open(self.output_file, 'w', encoding='utf-8') as f: json.dump(self.results, f, indent=2, ensure_ascii=False) logger.info(f"扫描结果已保存到 {self.output_file}")
def _save_csv(self): """保存为CSV格式""" all_ports = self.results['open_ports'] + self.results['closed_ports'] + self.results['filtered_ports']
with open(self.output_file, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(['Port', 'State', 'Service', 'Response Time (ms)', 'Banner'])
for port_info in all_ports: writer.writerow([ port_info['port'], port_info['state'], port_info['service'], port_info.get('response_time', ''), port_info.get('banner', '') ]) logger.info(f"扫描结果已保存到 {self.output_file}")
def _save_xml(self): """保存为XML格式""" root = ET.Element("port_scan") root.set("target", self.target) root.set("scan_time", self.results['scan_time']) root.set("duration", str(self.results['duration']))
# 添加开放端口 open_ports_elem = ET.SubElement(root, "open_ports") for port_info in self.results['open_ports']: port_elem = ET.SubElement(open_ports_elem, "port") port_elem.set("number", str(port_info['port'])) port_elem.set("service", port_info['service']) port_elem.set("response_time", str(port_info.get('response_time', ''))) if port_info.get('banner'): banner_elem = ET.SubElement(port_elem, "banner") banner_elem.text = port_info['banner']
tree = ET.ElementTree(root) tree.write(self.output_file, encoding='utf-8', xml_declaration=True) logger.info(f"扫描结果已保存到 {self.output_file}")
def parse_ports(port_str): """解析端口参数""" ports = set()
# 处理逗号分隔的端口列表 for part in port_str.split(','): part = part.strip() if '-' in part: # 处理端口范围 try: start, end = map(int, part.split('-')) if 1 <= start <= 65535 and 1 <= end <= 65535 and start <= end: ports.update(range(start, end + 1)) else: raise ValueError(f"无效的端口范围: {part}") except ValueError as e: logger.error(f"解析端口范围时出错: {str(e)}") sys.exit(1) else: # 处理单个端口 try: port = int(part) if 1 <= port <= 65535: ports.add(port) else: raise ValueError(f"端口号超出有效范围: {port}") except ValueError as e: logger.error(f"解析端口时出错: {str(e)}") sys.exit(1)
return sorted(list(ports))
def main(): parser = argparse.ArgumentParser(description='端口扫描器') parser.add_argument('target', help='目标主机IP地址或域名') parser.add_argument('-p', '--ports', default='1-1000', help='要扫描的端口,支持范围(如1-1000)和列表(如22,80,443)') parser.add_argument('-t', '--type', choices=['tcp', 'syn', 'udp'], default='tcp', help='扫描类型') parser.add_argument('--timeout', type=float, default=1.0, help='连接超时时间(秒)') parser.add_argument('--threads', type=int, default=100, help='并发线程数') parser.add_argument('--delay', type=int, default=0, help='扫描延迟(毫秒)') parser.add_argument('-o', '--output', help='输出文件路径') parser.add_argument('-f', '--format', choices=['json', 'csv', 'xml'], default='json', help='输出格式') parser.add_argument('-v', '--verbose', action='store_true', help='详细输出') parser.add_argument('--top-ports', type=int, help='扫描最常见的N个端口')
args = parser.parse_args()
# 解析端口 if args.top_ports: # 使用最常见的端口 common_ports = sorted(COMMON_PORTS.keys())[:args.top_ports] ports = common_ports else: ports = parse_ports(args.ports)
# 配置输出文件 output_file = args.output if not output_file: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f"scan_results_{timestamp}.{args.format}"
# 配置扫描参数 config = { 'target': args.target, 'ports': ports, 'scan_type': args.type, 'timeout': args.timeout, 'threads': args.threads, 'delay': args.delay, 'output_format': args.format, 'output_file': output_file, 'verbose': args.verbose }
# 创建扫描器实例 scanner = PortScanner(config)
# 执行扫描 try: scanner.scan_ports() scanner.print_results() scanner.save_results() except KeyboardInterrupt: logger.info("扫描被用户中断") sys.exit(1) except Exception as e: logger.error(f"扫描过程中发生错误: {str(e)}") sys.exit(1)
if __name__ == '__main__': main() |
|
1 2 3 4 5 6 7 8 9 10 11 |
# 扫描单个主机的默认端口范围(1-1000) python port_scanner.py 192.168.1.1
# 扫描特定端口 python port_scanner.py 192.168.1.1 -p 22,80,443
# 扫描端口范围 python port_scanner.py 192.168.1.1 -p 1-10000
# 扫描最常见的100个端口 python port_scanner.py 192.168.1.1 --top-ports 100 |
|
1 2 3 4 5 6 7 8 |
# TCP连接扫描(默认) python port_scanner.py 192.168.1.1 -t tcp
# UDP扫描 python port_scanner.py 192.168.1.1 -t udp
# SYN扫描(需要root权限) sudo python port_scanner.py 192.168.1.1 -t syn |
|
1 2 3 4 5 6 7 8 |
# 调整超时时间 python port_scanner.py 192.168.1.1 --timeout 2.0
# 调整并发线程数 python port_scanner.py 192.168.1.1 --threads 200
# 添加扫描延迟(毫秒) python port_scanner.py 192.168.1.1 --delay 100 |
|
1 2 3 4 5 6 7 8 |
# JSON格式输出(默认) python port_scanner.py 192.168.1.1 -o results.json
# CSV格式输出 python port_scanner.py 192.168.1.1 -f csv -o results.csv
# XML格式输出 python port_scanner.py 192.168.1.1 -f xml -o results.xml |
|
1 2 |
# 显示详细扫描过程 python port_scanner.py 192.168.1.1 -v |
可以通过脚本扩展实现批量扫描多个主机:
|
1 2 3 4 5 6 |
targets = ['192.168.1.1', '192.168.1.2', '192.168.1.3'] for target in targets: config = {'target': target, 'ports': [22, 80, 443]} scanner = PortScanner(config) scanner.scan_ports() scanner.save_results() |
可以结合cron定时任务实现定期扫描:
|
1 2 |
# 每小时扫描一次关键服务器 0 * * * * /usr/bin/python3 /path/to/port_scanner.py 192.168.1.1 -p 22,80,443 -o /var/log/scan_$(date +\%Y\%m\%d_\%H\%M\%S).json |
扫描结果可以用于进一步分析:
|
1 2 3 4 5 6 7 8 9 10 |
import json
# 加载扫描结果 with open('scan_results.json', 'r') as f: results = json.load(f)
# 分析开放端口 open_ports = results['open_ports'] for port in open_ports: print(f"发现开放端口: {port['port']} ({port['service']})") |
这个端口扫描器是一个功能完整、安全可靠的网络探测工具,能够帮助网络管理员和安全专业人员快速识别网络资产和服务状态。