balance_query_eth.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. # -*- coding: utf-8 -*-
  2. from web3 import Web3
  3. import time
  4. # 设置 RPC URL
  5. rpc_url = "rpc"
  6. # 初始化 web3.py 提供器
  7. w3 = Web3(Web3.HTTPProvider(rpc_url))
  8. # 检查是否连接成功
  9. def check_connection():
  10. try:
  11. network_id = w3.eth.chain_id
  12. print(f"已成功连接到链节点,网络ID为: {network_id}。正在查询钱包余额...")
  13. return True
  14. except Exception as e:
  15. print("无法连接到链节点,请检查 URL 是否正确")
  16. return False
  17. # 钱包地址列表
  18. wallet_addresses = [
  19. 'key'
  20. ]
  21. def query_balances():
  22. if not check_connection():
  23. return
  24. wallet_num = 1
  25. for wallet_address in wallet_addresses:
  26. retry_count = 0
  27. max_retries = 3
  28. delay = 2 # 延迟时间,单位为秒
  29. result_message = ""
  30. result_balance = -1
  31. # 转换为校验和地址
  32. try:
  33. checksum_address = w3.to_checksum_address(wallet_address)
  34. except Exception as e:
  35. print(f"钱包 {wallet_address} 地址格式无效: {str(e)}")
  36. wallet_num += 1
  37. continue
  38. while retry_count < max_retries:
  39. try:
  40. # 查询余额
  41. balance = w3.eth.get_balance(checksum_address)
  42. # 将余额从 Wei 转换为 Ether
  43. balance_eth = w3.from_wei(balance, 'ether')
  44. result_message = f"Wallet address: {checksum_address} {wallet_num}, balance: {balance_eth} Token"
  45. result_balance = balance_eth
  46. print(result_message)
  47. wallet_num += 1
  48. break
  49. except Exception as e:
  50. print(f"查询钱包 {checksum_address} {wallet_num} 余额时发生错误: {str(e)}")
  51. retry_count += 1
  52. print(f"正在重试...(第 {retry_count} 次)")
  53. time.sleep(delay)
  54. if retry_count == max_retries:
  55. result_message = f"钱包 {checksum_address} 查询余额失败,已达到最大重试次数。"
  56. print(result_message)
  57. if __name__ == "__main__":
  58. query_balances()