python
主页 > 脚本 > python >

Python实现快速扫描目标主机的开放端口和服务

2025-12-04 | 佚名 | 点击:

功能介绍

这是一个功能强大的端口扫描器脚本,能够快速扫描目标主机的开放端口和服务。该脚本具备以下核心功能:

场景应用

1. 网络安全审计

2. 系统管理维护

3. 网络故障排查

4. 合规性检查

报错处理

1. 网络连接异常

1

2

3

4

5

6

7

8

9

10

11

12

13

try:

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    sock.settimeout(timeout)

    result = sock.connect_ex((host, port))

    sock.close()

except socket.gaierror:

    logger.error(f"无法解析主机名: {host}")

except socket.timeout:

    logger.warning(f"连接 {host}:{port} 超时")

except ConnectionRefusedError:

    logger.info(f"端口 {port} 被拒绝连接")

except PermissionError:

    logger.error(f"无权限扫描端口 {port},可能需要管理员权限")

2. 参数验证错误

1

2

3

4

5

if not is_valid_ip(target):

    raise ValueError(f"无效的IP地址: {target}")

 

if port < 1 or port > 65535:

    raise ValueError(f"端口号超出有效范围: {port}")

3. 文件操作异常

1

2

3

4

5

6

7

try:

    with open(output_file, 'w') as f:

        json.dump(scan_results, f, indent=2)

except PermissionError:

    logger.error(f"无权限写入文件: {output_file}")

except IOError as e:

    logger.error(f"文件写入错误: {str(e)}")

4. 内存不足异常

1

2

3

4

5

6

7

try:

    # 大规模扫描时的内存管理

    if len(ports) > 10000:

        logger.warning("扫描端口数量过多,可能消耗大量内存")

except MemoryError:

    logger.error("内存不足,无法完成扫描")

    sys.exit(1)

代码实现

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

451

452

453

454

455

456

457

458

459

460

461

462

463

464

465

466

467

468

469

470

471

472

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

"""

端口扫描器

功能:扫描目标主机的开放端口和服务

作者:Cline

版本:1.0

"""

 

import socket

import argparse

import sys

import json

import csv

import xml.etree.ElementTree as ET

from datetime import datetime

import threading

from concurrent.futures import ThreadPoolExecutor, as_completed

import time

import logging

import ipaddress

 

# 配置日志

logging.basicConfig(

    level=logging.INFO,

    format='%(asctime)s - %(levelname)s - %(message)s',

    handlers=[

        logging.FileHandler('port_scanner.log'),

        logging.StreamHandler(sys.stdout)

    ]

)

logger = logging.getLogger(__name__)

 

# 常见端口和服务映射

COMMON_PORTS = {

    21: 'FTP',

    22: 'SSH',

    23: 'Telnet',

    25: 'SMTP',

    53: 'DNS',

    80: 'HTTP',

    110: 'POP3',

    143: 'IMAP',

    443: 'HTTPS',

    993: 'IMAPS',

    995: 'POP3S',

    1433: 'MSSQL',

    1521: 'Oracle',

    3306: 'MySQL',

    3389: 'RDP',

    5432: 'PostgreSQL',

    6379: 'Redis',

    8080: 'HTTP-Alt',

    8443: 'HTTPS-Alt',

    27017: 'MongoDB'

}

 

class PortScanner:

    def __init__(self, config):

        self.target = config['target']

        self.ports = config['ports']

        self.scan_type = config.get('scan_type', 'tcp')  # tcp, syn, udp

        self.timeout = config.get('timeout', 1.0)

        self.threads = config.get('threads', 100)

        self.delay = config.get('delay', 0)  # 扫描延迟(毫秒)

        self.output_format = config.get('output_format', 'json')

        self.output_file = config.get('output_file', 'scan_results.json')

        self.verbose = config.get('verbose', False)

         

        # 扫描结果

        self.results = {

            'target': self.target,

            'scan_time': datetime.now().isoformat(),

            'scan_type': self.scan_type,

            'open_ports': [],

            'closed_ports': [],

            'filtered_ports': []

        }

         

    def is_valid_ip(self, ip):

        """验证IP地址格式"""

        try:

            ipaddress.ip_address(ip)

            return True

        except ValueError:

            return False

             

    def resolve_hostname(self, hostname):

        """解析主机名到IP地址"""

        try:

            ip = socket.gethostbyname(hostname)

            return ip

        except socket.gaierror:

            logger.error(f"无法解析主机名: {hostname}")

            return None

             

    def scan_port(self, port):

        """扫描单个端口"""

        host = self.target

        if not self.is_valid_ip(host):

            host = self.resolve_hostname(host)

            if not host:

                return None

                 

        # 添加扫描延迟

        if self.delay > 0:

            time.sleep(self.delay / 1000.0)

             

        try:

            if self.scan_type == 'tcp':

                return self.tcp_connect_scan(host, port)

            elif self.scan_type == 'syn':

                return self.syn_scan(host, port)

            elif self.scan_type == 'udp':

                return self.udp_scan(host, port)

            else:

                logger.error(f"不支持的扫描类型: {self.scan_type}")

                return None

                 

        except Exception as e:

            logger.error(f"扫描端口 {port} 时发生错误: {str(e)}")

            return None

             

    def tcp_connect_scan(self, host, port):

        """TCP连接扫描"""

        try:

            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            sock.settimeout(self.timeout)

             

            start_time = time.time()

            result = sock.connect_ex((host, port))

            end_time = time.time()

             

            response_time = round((end_time - start_time) * 1000, 2)  # 毫秒

            sock.close()

             

            service = COMMON_PORTS.get(port, 'Unknown')

             

            if result == 0:

                # 端口开放,尝试获取服务banner

                banner = self.get_service_banner(host, port)

                return {

                    'port': port,

                    'state': 'open',

                    'service': service,

                    'response_time': response_time,

                    'banner': banner

                }

            else:

                return {

                    'port': port,

                    'state': 'closed',

                    'service': service,

                    'response_time': response_time

                }

                 

        except socket.timeout:

            return {

                'port': port,

                'state': 'filtered',

                'service': COMMON_PORTS.get(port, 'Unknown'),

                'response_time': self.timeout * 1000

            }

        except Exception as e:

            logger.debug(f"TCP扫描端口 {port} 时发生错误: {str(e)}")

            return {

                'port': port,

                'state': 'unknown',

                'service': COMMON_PORTS.get(port, 'Unknown')

            }

             

    def syn_scan(self, host, port):

        """SYN扫描(需要root权限)"""

        try:

            # 这里简化实现,实际SYN扫描需要使用原始套接字

            logger.warning("SYN扫描需要root权限,降级为TCP连接扫描")

            return self.tcp_connect_scan(host, port)

        except Exception as e:

            logger.error(f"SYN扫描端口 {port} 时发生错误: {str(e)}")

            return None

             

    def udp_scan(self, host, port):

        """UDP扫描"""

        try:

            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

            sock.settimeout(self.timeout)

             

            # 发送空数据包

            sock.sendto(b'', (host, port))

             

            try:

                # 尝试接收响应

                data, addr = sock.recvfrom(1024)

                sock.close()

                 

                service = COMMON_PORTS.get(port, 'Unknown')

                banner = data.decode('utf-8', errors='ignore')[:100]

                 

                return {

                    'port': port,

                    'state': 'open',

                    'service': service,

                    'banner': banner

                }

            except socket.timeout:

                # UDP端口可能开放但无响应

                sock.close()

                return {

                    'port': port,

                    'state': 'open|filtered',

                    'service': COMMON_PORTS.get(port, 'Unknown')

                }

                 

        except Exception as e:

            logger.debug(f"UDP扫描端口 {port} 时发生错误: {str(e)}")

            return {

                'port': port,

                'state': 'closed',

                'service': COMMON_PORTS.get(port, 'Unknown')

            }

             

    def get_service_banner(self, host, port):

        """获取服务banner信息"""

        try:

            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            sock.settimeout(2.0)

            sock.connect((host, port))

             

            # 发送简单的探测数据

            if port in [21, 22, 23, 25, 80, 110, 143]:

                sock.send(b'\r\n')

                 

            # 接收响应

            banner = sock.recv(1024).decode('utf-8', errors='ignore')

            sock.close()

             

            # 清理banner信息

            banner = banner.strip().replace('\r\n', ' ').replace('\n', ' ')

            return banner[:200]  # 限制长度

             

        except Exception as e:

            logger.debug(f"获取端口 {port} 的banner时发生错误: {str(e)}")

            return ""

             

    def scan_ports(self):

        """扫描所有端口"""

        logger.info(f"开始扫描 {self.target} 的端口...")

        logger.info(f"扫描类型: {self.scan_type}, 超时时间: {self.timeout}s, 线程数: {self.threads}")

         

        start_time = time.time()

         

        with ThreadPoolExecutor(max_workers=self.threads) as executor:

            # 提交所有扫描任务

            future_to_port = {

                executor.submit(self.scan_port, port): port

                for port in self.ports

            }

             

            # 收集结果

            for future in as_completed(future_to_port):

                port = future_to_port[future]

                try:

                    result = future.result()

                    if result:

                        if result['state'] == 'open':

                            self.results['open_ports'].append(result)

                            if self.verbose:

                                logger.info(f"端口 {result['port']} ({result['service']}) 开放")

                        elif result['state'] == 'closed':

                            self.results['closed_ports'].append(result)

                        elif result['state'] in ['filtered', 'open|filtered']:

                            self.results['filtered_ports'].append(result)

                             

                except Exception as e:

                    logger.error(f"处理端口 {port} 的结果时出错: {str(e)}")

                     

        end_time = time.time()

        self.results['duration'] = round(end_time - start_time, 2)

         

        # 按端口号排序结果

        self.results['open_ports'].sort(key=lambda x: x['port'])

        self.results['closed_ports'].sort(key=lambda x: x['port'])

        self.results['filtered_ports'].sort(key=lambda x: x['port'])

         

        logger.info(f"扫描完成,耗时 {self.results['duration']} 秒")

        logger.info(f"开放端口: {len(self.results['open_ports'])}")

        logger.info(f"关闭端口: {len(self.results['closed_ports'])}")

        logger.info(f"过滤端口: {len(self.results['filtered_ports'])}")

         

        return self.results

         

    def print_results(self):

        """打印扫描结果"""

        print("\n" + "="*60)

        print(f"端口扫描报告 - 目标: {self.target}")

        print("="*60)

        print(f"扫描时间: {self.results['scan_time']}")

        print(f"扫描类型: {self.results['scan_type']}")

        print(f"扫描耗时: {self.results['duration']} 秒")

        print(f"开放端口: {len(self.results['open_ports'])}")

        print(f"关闭端口: {len(self.results['closed_ports'])}")

        print(f"过滤端口: {len(self.results['filtered_ports'])}")

         

        if self.results['open_ports']:

            print("\n开放端口详情:")

            print("-" * 80)

            print(f"{'端口':<8} {'服务':<15} {'响应时间(ms)':<15} {'Banner信息'}")

            print("-" * 80)

            for port_info in self.results['open_ports']:

                banner = port_info.get('banner', '')[:50] + ('...' if len(port_info.get('banner', '')) > 50 else '')

                print(f"{port_info['port']:<8} {port_info['service']:<15} {port_info['response_time']:<15} {banner}")

                 

        if not self.results['open_ports']:

            print("\n未发现开放端口")

             

    def save_results(self):

        """保存扫描结果"""

        try:

            # 确保输出目录存在

            import os

            output_dir = os.path.dirname(self.output_file) if os.path.dirname(self.output_file) else '.'

            os.makedirs(output_dir, exist_ok=True)

             

            if self.output_format == 'json':

                self._save_json()

            elif self.output_format == 'csv':

                self._save_csv()

            elif self.output_format == 'xml':

                self._save_xml()

            else:

                logger.error(f"不支持的输出格式: {self.output_format}")

                 

        except Exception as e:

            logger.error(f"保存扫描结果时出错: {str(e)}")

             

    def _save_json(self):

        """保存为JSON格式"""

        with open(self.output_file, 'w', encoding='utf-8') as f:

            json.dump(self.results, f, indent=2, ensure_ascii=False)

        logger.info(f"扫描结果已保存到 {self.output_file}")

         

    def _save_csv(self):

        """保存为CSV格式"""

        all_ports = self.results['open_ports'] + self.results['closed_ports'] + self.results['filtered_ports']

         

        with open(self.output_file, 'w', newline='', encoding='utf-8') as f:

            writer = csv.writer(f)

            writer.writerow(['Port', 'State', 'Service', 'Response Time (ms)', 'Banner'])

             

            for port_info in all_ports:

                writer.writerow([

                    port_info['port'],

                    port_info['state'],

                    port_info['service'],

                    port_info.get('response_time', ''),

                    port_info.get('banner', '')

                ])

        logger.info(f"扫描结果已保存到 {self.output_file}")

         

    def _save_xml(self):

        """保存为XML格式"""

        root = ET.Element("port_scan")

        root.set("target", self.target)

        root.set("scan_time", self.results['scan_time'])

        root.set("duration", str(self.results['duration']))

         

        # 添加开放端口

        open_ports_elem = ET.SubElement(root, "open_ports")

        for port_info in self.results['open_ports']:

            port_elem = ET.SubElement(open_ports_elem, "port")

            port_elem.set("number", str(port_info['port']))

            port_elem.set("service", port_info['service'])

            port_elem.set("response_time", str(port_info.get('response_time', '')))

            if port_info.get('banner'):

                banner_elem = ET.SubElement(port_elem, "banner")

                banner_elem.text = port_info['banner']

                 

        tree = ET.ElementTree(root)

        tree.write(self.output_file, encoding='utf-8', xml_declaration=True)

        logger.info(f"扫描结果已保存到 {self.output_file}")

 

def parse_ports(port_str):

    """解析端口参数"""

    ports = set()

     

    # 处理逗号分隔的端口列表

    for part in port_str.split(','):

        part = part.strip()

        if '-' in part:

            # 处理端口范围

            try:

                start, end = map(int, part.split('-'))

                if 1 <= start <= 65535 and 1 <= end <= 65535 and start <= end:

                    ports.update(range(start, end + 1))

                else:

                    raise ValueError(f"无效的端口范围: {part}")

            except ValueError as e:

                logger.error(f"解析端口范围时出错: {str(e)}")

                sys.exit(1)

        else:

            # 处理单个端口

            try:

                port = int(part)

                if 1 <= port <= 65535:

                    ports.add(port)

                else:

                    raise ValueError(f"端口号超出有效范围: {port}")

            except ValueError as e:

                logger.error(f"解析端口时出错: {str(e)}")

                sys.exit(1)

                 

    return sorted(list(ports))

 

def main():

    parser = argparse.ArgumentParser(description='端口扫描器')

    parser.add_argument('target', help='目标主机IP地址或域名')

    parser.add_argument('-p', '--ports', default='1-1000', help='要扫描的端口,支持范围(如1-1000)和列表(如22,80,443)')

    parser.add_argument('-t', '--type', choices=['tcp', 'syn', 'udp'], default='tcp', help='扫描类型')

    parser.add_argument('--timeout', type=float, default=1.0, help='连接超时时间(秒)')

    parser.add_argument('--threads', type=int, default=100, help='并发线程数')

    parser.add_argument('--delay', type=int, default=0, help='扫描延迟(毫秒)')

    parser.add_argument('-o', '--output', help='输出文件路径')

    parser.add_argument('-f', '--format', choices=['json', 'csv', 'xml'], default='json', help='输出格式')

    parser.add_argument('-v', '--verbose', action='store_true', help='详细输出')

    parser.add_argument('--top-ports', type=int, help='扫描最常见的N个端口')

     

    args = parser.parse_args()

     

    # 解析端口

    if args.top_ports:

        # 使用最常见的端口

        common_ports = sorted(COMMON_PORTS.keys())[:args.top_ports]

        ports = common_ports

    else:

        ports = parse_ports(args.ports)

         

    # 配置输出文件

    output_file = args.output

    if not output_file:

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        output_file = f"scan_results_{timestamp}.{args.format}"

         

    # 配置扫描参数

    config = {

        'target': args.target,

        'ports': ports,

        'scan_type': args.type,

        'timeout': args.timeout,

        'threads': args.threads,

        'delay': args.delay,

        'output_format': args.format,

        'output_file': output_file,

        'verbose': args.verbose

    }

     

    # 创建扫描器实例

    scanner = PortScanner(config)

     

    # 执行扫描

    try:

        scanner.scan_ports()

        scanner.print_results()

        scanner.save_results()

    except KeyboardInterrupt:

        logger.info("扫描被用户中断")

        sys.exit(1)

    except Exception as e:

        logger.error(f"扫描过程中发生错误: {str(e)}")

        sys.exit(1)

 

if __name__ == '__main__':

    main()

使用说明

1. 基本使用

1

2

3

4

5

6

7

8

9

10

11

# 扫描单个主机的默认端口范围(1-1000)

python port_scanner.py 192.168.1.1

 

# 扫描特定端口

python port_scanner.py 192.168.1.1 -p 22,80,443

 

# 扫描端口范围

python port_scanner.py 192.168.1.1 -p 1-10000

 

# 扫描最常见的100个端口

python port_scanner.py 192.168.1.1 --top-ports 100

2. 扫描类型

1

2

3

4

5

6

7

8

# TCP连接扫描(默认)

python port_scanner.py 192.168.1.1 -t tcp

 

# UDP扫描

python port_scanner.py 192.168.1.1 -t udp

 

# SYN扫描(需要root权限)

sudo python port_scanner.py 192.168.1.1 -t syn

3. 性能调优

1

2

3

4

5

6

7

8

# 调整超时时间

python port_scanner.py 192.168.1.1 --timeout 2.0

 

# 调整并发线程数

python port_scanner.py 192.168.1.1 --threads 200

 

# 添加扫描延迟(毫秒)

python port_scanner.py 192.168.1.1 --delay 100

4. 输出格式

1

2

3

4

5

6

7

8

# JSON格式输出(默认)

python port_scanner.py 192.168.1.1 -o results.json

 

# CSV格式输出

python port_scanner.py 192.168.1.1 -f csv -o results.csv

 

# XML格式输出

python port_scanner.py 192.168.1.1 -f xml -o results.xml

5. 详细输出

1

2

# 显示详细扫描过程

python port_scanner.py 192.168.1.1 -v

高级特性

1. 批量扫描

可以通过脚本扩展实现批量扫描多个主机:

1

2

3

4

5

6

targets = ['192.168.1.1', '192.168.1.2', '192.168.1.3']

for target in targets:

    config = {'target': target, 'ports': [22, 80, 443]}

    scanner = PortScanner(config)

    scanner.scan_ports()

    scanner.save_results()

2. 定期扫描

可以结合cron定时任务实现定期扫描:

1

2

# 每小时扫描一次关键服务器

0 * * * * /usr/bin/python3 /path/to/port_scanner.py 192.168.1.1 -p 22,80,443 -o /var/log/scan_$(date +\%Y\%m\%d_\%H\%M\%S).json

3. 结果分析

扫描结果可以用于进一步分析:

1

2

3

4

5

6

7

8

9

10

import json

 

# 加载扫描结果

with open('scan_results.json', 'r') as f:

    results = json.load(f)

 

# 分析开放端口

open_ports = results['open_ports']

for port in open_ports:

    print(f"发现开放端口: {port['port']} ({port['service']})")

安全考虑

1. 权限控制

2. 扫描频率

3. 日志记录

性能优化

1. 并发控制

2. 内存管理

3. 网络优化

这个端口扫描器是一个功能完整、安全可靠的网络探测工具,能够帮助网络管理员和安全专业人员快速识别网络资产和服务状态。

原文链接:
相关文章
最新更新