| 1234567891011121314151617181920212223242526272829303132333435363738394041 |
- # -*- coding: utf-8 -*-
- import socket
- import concurrent.futures
- import ipaddress
def scan_ports(ip, start_port, end_port, timeout=1):
    """Return the TCP ports on *ip* that accept a connection.

    Every port in the inclusive range ``start_port``..``end_port`` is probed
    one after another with a blocking TCP connect attempt.

    Args:
        ip: Target host as a string, passed to an ``AF_INET`` socket.
        start_port: First port to probe (inclusive).
        end_port: Last port to probe (inclusive). If it is smaller than
            ``start_port`` nothing is probed and an empty list is returned.
        timeout: Seconds to wait for each connection attempt.

    Returns:
        A list of the port numbers whose connection attempt succeeded, in
        the order they were probed (ascending).
    """
    open_ports = []
    for port in range(start_port, end_port + 1):
        # The context manager closes the socket on every exit path,
        # including exceptions other than OSError (for example an
        # OverflowError for a port outside 0-65535).
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            try:
                # connect_ex() returns an errno-style code for connect
                # failures instead of raising; 0 means connected.
                if sock.connect_ex((ip, port)) == 0:
                    open_ports.append(port)
            except OSError:
                # Deliberate best effort: errors that are still raised
                # (e.g. address resolution failures) count as "not open".
                continue
    return open_ports
def scan_ip(ip):
    """Run ``scan_ports`` over the full port range 1-65535 of *ip*.

    Returns whatever ``scan_ports`` returns for that range.
    """
    first_port, last_port = 1, 65535
    return scan_ports(ip, start_port=first_port, end_port=last_port)
def main():
    """Scan every host of 10.10.10.0/24 in a thread pool and print results.

    One ``scan_ip`` job is submitted per host address; results (or the
    error raised by a job) are printed as each job finishes.
    """
    network = ipaddress.ip_network('10.10.10.0/24')
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as pool:
        # Map each submitted job back to the address it is scanning so the
        # completion loop can label its output.
        host_by_job = {}
        for host in network.hosts():
            host_by_job[pool.submit(scan_ip, str(host))] = host

        for job in concurrent.futures.as_completed(host_by_job):
            target_ip = host_by_job[job]
            try:
                open_ports = job.result()
                print(f"Open ports on {target_ip}: {open_ports}")
            except Exception as e:
                # Report the failure for this host and keep processing
                # the remaining jobs.
                print(f"Error scanning {target_ip}: {e}")
# Start the scan only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|