| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- # -*- coding: utf-8 -*-
- import time
- import random
- import threading
- from web3 import Web3
- from eth_account import Account
- private_key_list = [
- "2a185eae14ca82ac934a5f952e12e0d4a64043c911c33a19afd48ef1a1d70c46",
- "df5d009c711475b8f9987f235b373cfdbd93c858063efa9eebd9188dbc534097",
- "64f548051325022a7aa9e32c5756161d227fd5fb641c3f4623558cb574775254",
- "aa6fc003745793d7c192b2c404e332770b501ac0b81879d7538d6659ca84b7cf",
- "e92637de4029ae6397c18ff62cab0460e6caee1f166a0e710d52d46556c16627",
- "1741e24dd3a1a5cecf39dea82ba6ba62506d7a07ebcc7998b9b66eb8423e93db",
- "b898cf63a5ec89105ba755ef3b7533c25ea9130ab50fb0db14779fb6efd4f9c6",
- "f139f86b139f92070b6e6139177e59d85532c867dda6a8833e479fd60a8d37b7",
- "399ec8a9a9fcbb3118f993351c7624bdffc9b93d31e6576ebe69e9b24a2ecab3",
- "48cd0f0db84c9ca488292c9da449236d14a845e07618b6c0975cb158ccd60794"
- ]
- def claim_token(private_key):
- # 配置信息
- rpc_url = "https://rpc.sepolia.ethpandaops.io"
- target_chain_id = 11155111
- contract_address = "0x3edf60dd017ace33a0220f78741b5581c385a1ba"
-
- try:
- # 连接Web3
- web3 = Web3(Web3.HTTPProvider(rpc_url))
- if not web3.is_connected():
- print(f"❌ [{private_key[:8]}...] 无法连接到目标网络")
- return False
- # 校验链ID是否为Sepolia
- chain_id = web3.eth.chain_id
- if chain_id != target_chain_id:
- print(f"❌ [{private_key[:8]}...] 连接了错误的网络,当前链ID: {chain_id}, 期望: {target_chain_id}")
- return False
- # 创建账户对象
- account = Account.from_key("0x" + private_key)
- # 查询当前账户余额
- balance = web3.eth.get_balance(account.address)
- balance_eth = web3.from_wei(balance, 'ether')
- print(f"[{private_key[:8]}...] 当前账户余额: {balance_eth} Token")
- # 构造调用数据
- wallet_address = account.address[2:].lower() # 去掉0x前缀
- wallet_param = wallet_address.zfill(64) # 补足64位(32字节)
- call_data = "0x6a627842" + wallet_param
- # 构造交易
- transaction = {
- "to": Web3.to_checksum_address(contract_address),
- "value": 0,
- "gas": 100000, # 固定Gas限制,可根据需要调整
- "gasPrice": web3.to_wei(20, "gwei"), # 固定Gas价格,可调整
- "nonce": web3.eth.get_transaction_count(account.address),
- "data": call_data,
- "chainId": target_chain_id,
- }
- print(f"[{private_key[:8]}...] 📋 调用数据: {call_data}")
- print(f"[{private_key[:8]}...] 📤 发送交易中...")
- # 签名交易
- signed_txn = web3.eth.account.sign_transaction(transaction, private_key)
- # 发送交易
- tx_hash = web3.eth.send_raw_transaction(signed_txn.rawTransaction)
- tx_hash_hex = tx_hash.hex()
- print(f"[{private_key[:8]}...] 📤 交易已发送: {tx_hash_hex}")
- print(f"[{private_key[:8]}...] ⏳ 等待交易确认...")
- # 等待交易确认
- receipt = web3.eth.wait_for_transaction_receipt(tx_hash, timeout=300)
- if receipt.status == 1:
- print(f"[{private_key[:8]}...] ✅ 代币领取成功! Gas使用: {receipt.gasUsed}")
- print(f"[{private_key[:8]}...] 🔗 交易链接: https://sepolia.etherscan.io/tx/{tx_hash_hex}")
- return True
- else:
- print(f"[{private_key[:8]}...] ❌ 交易执行失败")
- return False
- except Exception as e:
- error_msg = str(e).lower()
- if "insufficient funds" in error_msg:
- print(f"[{private_key[:8]}...] ❌ ETH余额不足,无法支付Gas费用")
- elif "nonce too low" in error_msg:
- print(f"[{private_key[:8]}...] ❌ Nonce值过低,请稍后重试")
- elif "replacement transaction underpriced" in error_msg:
- print(f"[{private_key[:8]}...] ❌ 交易费用过低,请提高Gas价格")
- else:
- print(f"[{private_key[:8]}...] ❌ 领取失败: {e}")
- return False
- def worker(private_key, claim_times):
- """为每个 private_key 执行的线程任务"""
- for i in range(claim_times):
- print(f"[{private_key[:8]}...] 开始领取第 {i+1} 次")
- n = 1
- retry_min_time = 10
- retry_max_time = 20
- retry_time = random.uniform(retry_min_time, retry_max_time)
- retry_max_times = 3
- while True:
- if n > retry_max_times:
- print(f"[{private_key[:8]}...] 领取 {i+1} 次失败, 重试次数超过 {retry_max_times} 次, 跳过")
- break
- if claim_token(private_key):
- break
- print(f"[{private_key[:8]}...] 领取 {i+1} 次失败, 重试第 {n} 次, 等待 {retry_time} 秒后再次领取")
- n += 1
- time.sleep(retry_time)
- print(f"[{private_key[:8]}...] 领取第 {i+1} 次成功, 等待 {retry_time} 秒后再次领取")
- time.sleep(retry_time)
- if __name__ == "__main__":
- claim_times = 20
- threads = []
- # 为每个 private_key 创建一个线程
- for private_key in private_key_list:
- thread = threading.Thread(target=worker, args=(private_key, claim_times))
- threads.append(thread)
- thread.start()
- # 等待所有线程完成
- for thread in threads:
- thread.join()
- print("所有线程已完成")
|