这是一个功能强大的端口扫描器脚本,能够快速扫描目标主机的开放端口和服务。该脚本具备以下核心功能:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) result = sock.connect_ex((host, port)) sock.close() except socket.gaierror: logger.error(f"无法解析主机名: {host}") except socket.timeout: logger.warning(f"连接 {host}:{port} 超时") except ConnectionRefusedError: logger.info(f"端口 {port} 被拒绝连接") except PermissionError: logger.error(f"无权限扫描端口 {port},可能需要管理员权限") |
|
1 2 3 4 5 |
if not is_valid_ip(target): raise ValueError(f"无效的IP地址: {target}")
if port < 1 or port > 65535: raise ValueError(f"端口号超出有效范围: {port}") |
|
1 2 3 4 5 6 7 |
try: with open(output_file, 'w') as f: json.dump(scan_results, f, indent=2) except PermissionError: logger.error(f"无权限写入文件: {output_file}") except IOError as e: logger.error(f"文件写入错误: {str(e)}") |
|
1 2 3 4 5 6 7 |
try: # 大规模扫描时的内存管理 if len(ports) > 10000: logger.warning("扫描端口数量过多,可能消耗大量内存") except MemoryError: logger.error("内存不足,无法完成扫描") sys.exit(1) |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 |
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 端口扫描器 功能:扫描目标主机的开放端口和服务 作者:Cline 版本:1.0 """
import socket import argparse import sys import json import csv import xml.etree.ElementTree as ET from datetime import datetime import threading from concurrent.futures import ThreadPoolExecutor, as_completed import time import logging import ipaddress
# 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('port_scanner.log'), logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger(__name__)
# 常见端口和服务映射 COMMON_PORTS = { 21: 'FTP', 22: 'SSH', 23: 'Telnet', 25: 'SMTP', 53: 'DNS', 80: 'HTTP', 110: 'POP3', 143: 'IMAP', 443: 'HTTPS', 993: 'IMAPS', 995: 'POP3S', 1433: 'MSSQL', 1521: 'Oracle', 3306: 'MySQL', 3389: 'RDP', 5432: 'PostgreSQL', 6379: 'Redis', 8080: 'HTTP-Alt', 8443: 'HTTPS-Alt', 27017: 'MongoDB' }
class PortScanner: def __init__(self, config): self.target = config['target'] self.ports = config['ports'] self.scan_type = config.get('scan_type', 'tcp') # tcp, syn, udp self.timeout = config.get('timeout', 1.0) self.threads = config.get('threads', 100) self.delay = config.get('delay', 0) # 扫描延迟(毫秒) self.output_format = config.get('output_format', 'json') self.output_file = config.get('output_file', 'scan_results.json') self.verbose = config.get('verbose', False)
# 扫描结果 self.results = { 'target': self.target, 'scan_time': datetime.now().isoformat(), 'scan_type': self.scan_type, 'open_ports': [], 'closed_ports': [], 'filtered_ports': [] }
def is_valid_ip(self, ip): """验证IP地址格式""" try: ipaddress.ip_address(ip) return True except ValueError: return False
def resolve_hostname(self, hostname): """解析主机名到IP地址""" try: ip = socket.gethostbyname(hostname) return ip except socket.gaierror: logger.error(f"无法解析主机名: {hostname}") return None
def scan_port(self, port): """扫描单个端口""" host = self.target if not self.is_valid_ip(host): host = self.resolve_hostname(host) if not host: return None
# 添加扫描延迟 if self.delay > 0: time.sleep(self.delay / 1000.0)
try: if self.scan_type == 'tcp': return self.tcp_connect_scan(host, port) elif self.scan_type == 'syn': return self.syn_scan(host, port) elif self.scan_type == 'udp': return self.udp_scan(host, port) else: logger.error(f"不支持的扫描类型: {self.scan_type}") return None
except Exception as e: logger.error(f"扫描端口 {port} 时发生错误: {str(e)}") return None
def tcp_connect_scan(self, host, port): """TCP连接扫描""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(self.timeout)
start_time = time.time() result = sock.connect_ex((host, port)) end_time = time.time()
response_time = round((end_time - start_time) * 1000, 2) # 毫秒 sock.close()
service = COMMON_PORTS.get(port, 'Unknown')
if result == 0: # 端口开放,尝试获取服务banner banner = self.get_service_banner(host, port) return { 'port': port, 'state': 'open', 'service': service, 'response_time': response_time, 'banner': banner } else: return { 'port': port, 'state': 'closed', 'service': service, 'response_time': response_time }
except socket.timeout: return { 'port': port, 'state': 'filtered', 'service': COMMON_PORTS.get(port, 'Unknown'), 'response_time': self.timeout * 1000 } except Exception as e: logger.debug(f"TCP扫描端口 {port} 时发生错误: {str(e)}") return { 'port': port, 'state': 'unknown', 'service': COMMON_PORTS.get(port, 'Unknown') }
def syn_scan(self, host, port): """SYN扫描(需要root权限)""" try: # 这里简化实现,实际SYN扫描需要使用原始套接字 logger.warning("SYN扫描需要root权限,降级为TCP连接扫描") return self.tcp_connect_scan(host, port) except Exception as e: logger.error(f"SYN扫描端口 {port} 时发生错误: {str(e)}") return None
def udp_scan(self, host, port): """UDP扫描""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(self.timeout)
# 发送空数据包 sock.sendto(b'', (host, port))
try: # 尝试接收响应 data, addr = sock.recvfrom(1024) sock.close()
service = COMMON_PORTS.get(port, 'Unknown') banner = data.decode('utf-8', errors='ignore')[:100]
return { 'port': port, 'state': 'open', 'service': service, 'banner': banner } except socket.timeout: # UDP端口可能开放但无响应 sock.close() return { 'port': port, 'state': 'open|filtered', 'service': COMMON_PORTS.get(port, 'Unknown') }
except Exception as e: logger.debug(f"UDP扫描端口 {port} 时发生错误: {str(e)}") return { 'port': port, 'state': 'closed', 'service': COMMON_PORTS.get(port, 'Unknown') }
def get_service_banner(self, host, port): """获取服务banner信息""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(2.0) sock.connect((host, port))
# 发送简单的探测数据 if port in [21, 22, 23, 25, 80, 110, 143]: sock.send(b'\r\n')
# 接收响应 banner = sock.recv(1024).decode('utf-8', errors='ignore') sock.close()
# 清理banner信息 banner = banner.strip().replace('\r\n', ' ').replace('\n', ' ') return banner[:200] # 限制长度
except Exception as e: logger.debug(f"获取端口 {port} 的banner时发生错误: {str(e)}") return ""
def scan_ports(self): """扫描所有端口""" logger.info(f"开始扫描 {self.target} 的端口...") logger.info(f"扫描类型: {self.scan_type}, 超时时间: {self.timeout}s, 线程数: {self.threads}")
start_time = time.time()
with ThreadPoolExecutor(max_workers=self.threads) as executor: # 提交所有扫描任务 future_to_port = { executor.submit(self.scan_port, port): port for port in self.ports }
# 收集结果 for future in as_completed(future_to_port): port = future_to_port[future] try: result = future.result() if result: if result['state'] == 'open': self.results['open_ports'].append(result) if self.verbose: logger.info(f"端口 {result['port']} ({result['service']}) 开放") elif result['state'] == 'closed': self.results['closed_ports'].append(result) elif result['state'] in ['filtered', 'open|filtered']: self.results['filtered_ports'].append(result)
except Exception as e: logger.error(f"处理端口 {port} 的结果时出错: {str(e)}")
end_time = time.time() self.results['duration'] = round(end_time - start_time, 2)
# 按端口号排序结果 self.results['open_ports'].sort(key=lambda x: x['port']) self.results['closed_ports'].sort(key=lambda x: x['port']) self.results['filtered_ports'].sort(key=lambda x: x['port'])
logger.info(f"扫描完成,耗时 {self.results['duration']} 秒") logger.info(f"开放端口: {len(self.results['open_ports'])}") logger.info(f"关闭端口: {len(self.results['closed_ports'])}") logger.info(f"过滤端口: {len(self.results['filtered_ports'])}")
return self.results
def print_results(self): """打印扫描结果""" print("\n" + "="*60) print(f"端口扫描报告 - 目标: {self.target}") print("="*60) print(f"扫描时间: {self.results['scan_time']}") print(f"扫描类型: {self.results['scan_type']}") print(f"扫描耗时: {self.results['duration']} 秒") print(f"开放端口: {len(self.results['open_ports'])}") print(f"关闭端口: {len(self.results['closed_ports'])}") print(f"过滤端口: {len(self.results['filtered_ports'])}")
if self.results['open_ports']: print("\n开放端口详情:") print("-" * 80) print(f"{'端口':<8} {'服务':<15} {'响应时间(ms)':<15} {'Banner信息'}") print("-" * 80) for port_info in self.results['open_ports']: banner = port_info.get('banner', '')[:50] + ('...' if len(port_info.get('banner', '')) > 50 else '') print(f"{port_info['port']:<8} {port_info['service']:<15} {port_info['response_time']:<15} {banner}")
if not self.results['open_ports']: print("\n未发现开放端口")
def save_results(self): """保存扫描结果""" try: # 确保输出目录存在 import os output_dir = os.path.dirname(self.output_file) if os.path.dirname(self.output_file) else '.' os.makedirs(output_dir, exist_ok=True)
if self.output_format == 'json': self._save_json() elif self.output_format == 'csv': self._save_csv() elif self.output_format == 'xml': self._save_xml() else: logger.error(f"不支持的输出格式: {self.output_format}")
except Exception as e: logger.error(f"保存扫描结果时出错: {str(e)}")
def _save_json(self): """保存为JSON格式""" with open(self.output_file, 'w', encoding='utf-8') as f: json.dump(self.results, f, indent=2, ensure_ascii=False) logger.info(f"扫描结果已保存到 {self.output_file}")
def _save_csv(self): """保存为CSV格式""" all_ports = self.results['open_ports'] + self.results['closed_ports'] + self.results['filtered_ports']
with open(self.output_file, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(['Port', 'State', 'Service', 'Response Time (ms)', 'Banner'])
for port_info in all_ports: writer.writerow([ port_info['port'], port_info['state'], port_info['service'], port_info.get('response_time', ''), port_info.get('banner', '') ]) logger.info(f"扫描结果已保存到 {self.output_file}")
def _save_xml(self): """保存为XML格式""" root = ET.Element("port_scan") root.set("target", self.target) root.set("scan_time", self.results['scan_time']) root.set("duration", str(self.results['duration']))
# 添加开放端口 open_ports_elem = ET.SubElement(root, "open_ports") for port_info in self.results['open_ports']: port_elem = ET.SubElement(open_ports_elem, "port") port_elem.set("number", str(port_info['port'])) port_elem.set("service", port_info['service']) port_elem.set("response_time", str(port_info.get('response_time', ''))) if port_info.get('banner'): banner_elem = ET.SubElement(port_elem, "banner") banner_elem.text = port_info['banner']
tree = ET.ElementTree(root) tree.write(self.output_file, encoding='utf-8', xml_declaration=True) logger.info(f"扫描结果已保存到 {self.output_file}")
def parse_ports(port_str): """解析端口参数""" ports = set()
# 处理逗号分隔的端口列表 for part in port_str.split(','): part = part.strip() if '-' in part: # 处理端口范围 try: start, end = map(int, part.split('-')) if 1 <= start <= 65535 and 1 <= end <= 65535 and start <= end: ports.update(range(start, end + 1)) else: raise ValueError(f"无效的端口范围: {part}") except ValueError as e: logger.error(f"解析端口范围时出错: {str(e)}") sys.exit(1) else: # 处理单个端口 try: port = int(part) if 1 <= port <= 65535: ports.add(port) else: raise ValueError(f"端口号超出有效范围: {port}") except ValueError as e: logger.error(f"解析端口时出错: {str(e)}") sys.exit(1)
return sorted(list(ports))
def main(): parser = argparse.ArgumentParser(description='端口扫描器') parser.add_argument('target', help='目标主机IP地址或域名') parser.add_argument('-p', '--ports', default='1-1000', help='要扫描的端口,支持范围(如1-1000)和列表(如22,80,443)') parser.add_argument('-t', '--type', choices=['tcp', 'syn', 'udp'], default='tcp', help='扫描类型') parser.add_argument('--timeout', type=float, default=1.0, help='连接超时时间(秒)') parser.add_argument('--threads', type=int, default=100, help='并发线程数') parser.add_argument('--delay', type=int, default=0, help='扫描延迟(毫秒)') parser.add_argument('-o', '--output', help='输出文件路径') parser.add_argument('-f', '--format', choices=['json', 'csv', 'xml'], default='json', help='输出格式') parser.add_argument('-v', '--verbose', action='store_true', help='详细输出') parser.add_argument('--top-ports', type=int, help='扫描最常见的N个端口')
args = parser.parse_args()
# 解析端口 if args.top_ports: # 使用最常见的端口 common_ports = sorted(COMMON_PORTS.keys())[:args.top_ports] ports = common_ports else: ports = parse_ports(args.ports)
# 配置输出文件 output_file = args.output if not output_file: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f"scan_results_{timestamp}.{args.format}"
# 配置扫描参数 config = { 'target': args.target, 'ports': ports, 'scan_type': args.type, 'timeout': args.timeout, 'threads': args.threads, 'delay': args.delay, 'output_format': args.format, 'output_file': output_file, 'verbose': args.verbose }
# 创建扫描器实例 scanner = PortScanner(config)
# 执行扫描 try: scanner.scan_ports() scanner.print_results() scanner.save_results() except KeyboardInterrupt: logger.info("扫描被用户中断") sys.exit(1) except Exception as e: logger.error(f"扫描过程中发生错误: {str(e)}") sys.exit(1)
if __name__ == '__main__': main() |
|
1 2 3 4 5 6 7 8 9 10 11 |
# 扫描单个主机的默认端口范围(1-1000) python port_scanner.py 192.168.1.1
# 扫描特定端口 python port_scanner.py 192.168.1.1 -p 22,80,443
# 扫描端口范围 python port_scanner.py 192.168.1.1 -p 1-10000
# 扫描最常见的100个端口 python port_scanner.py 192.168.1.1 --top-ports 100 |
|
1 2 3 4 5 6 7 8 |
# TCP连接扫描(默认) python port_scanner.py 192.168.1.1 -t tcp
# UDP扫描 python port_scanner.py 192.168.1.1 -t udp
# SYN扫描(需要root权限) sudo python port_scanner.py 192.168.1.1 -t syn |
|
1 2 3 4 5 6 7 8 |
# 调整超时时间 python port_scanner.py 192.168.1.1 --timeout 2.0
# 调整并发线程数 python port_scanner.py 192.168.1.1 --threads 200
# 添加扫描延迟(毫秒) python port_scanner.py 192.168.1.1 --delay 100 |
|
1 2 3 4 5 6 7 8 |
# JSON格式输出(默认) python port_scanner.py 192.168.1.1 -o results.json
# CSV格式输出 python port_scanner.py 192.168.1.1 -f csv -o results.csv
# XML格式输出 python port_scanner.py 192.168.1.1 -f xml -o results.xml |
|
1 2 |
# 显示详细扫描过程 python port_scanner.py 192.168.1.1 -v |
可以通过脚本扩展实现批量扫描多个主机:
|
1 2 3 4 5 6 |
targets = ['192.168.1.1', '192.168.1.2', '192.168.1.3'] for target in targets: config = {'target': target, 'ports': [22, 80, 443]} scanner = PortScanner(config) scanner.scan_ports() scanner.save_results() |
可以结合cron定时任务实现定期扫描:
|
1 2 |
# 每小时扫描一次关键服务器 0 * * * * /usr/bin/python3 /path/to/port_scanner.py 192.168.1.1 -p 22,80,443 -o /var/log/scan_$(date +\%Y\%m\%d_\%H\%M\%S).json |
扫描结果可以用于进一步分析:
|
1 2 3 4 5 6 7 8 9 10 |
import json
# 加载扫描结果 with open('scan_results.json', 'r') as f: results = json.load(f)
# 分析开放端口 open_ports = results['open_ports'] for port in open_ports: print(f"发现开放端口: {port['port']} ({port['service']})") |
这个端口扫描器是一个功能完整、安全可靠的网络探测工具,能够帮助网络管理员和安全专业人员快速识别网络资产和服务状态。