demo_socket_03.py 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. # -*- coding: utf-8 -*-
  2. import socket
  3. import concurrent.futures
  4. import ipaddress
  5. def scan_ports(ip, start_port, end_port):
  6. open_ports = []
  7. for port in range(start_port, end_port + 1):
  8. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  9. sock.settimeout(1)
  10. try:
  11. result = sock.connect_ex((ip, port))
  12. if result == 0:
  13. open_ports.append(port)
  14. sock.close()
  15. except socket.error as err:
  16. sock.close()
  17. return open_ports
  18. def scan_ip(ip):
  19. return scan_ports(ip, 1, 65535)
  20. def main():
  21. subnet = ipaddress.ip_network('10.10.10.0/24')
  22. with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
  23. futures = {executor.submit(scan_ip, str(ip)): ip for ip in subnet.hosts()}
  24. for future in concurrent.futures.as_completed(futures):
  25. target_ip = futures[future]
  26. try:
  27. open_ports = future.result()
  28. print(f"Open ports on {target_ip}: {open_ports}")
  29. except Exception as e:
  30. print(f"Error scanning {target_ip}: {e}")
  31. if __name__ == '__main__':
  32. main()