Apache Spark for Dealing with Ethereum Transaction Data
Reading the file
from pyspark.sql.types import *
from pyspark.sql.functions import col
from graphframes import *
file_path = "xi_3777501to3800000 block.txt"
# schema for the comma-separated transaction dump; every column is read as a string
txn_fields = [
    StructField("block_id", StringType(), True),
    StructField("time_stamp", StringType(), True),
    StructField("tx_hash", StringType(), True),
    StructField("method", StringType(), True),
    StructField("from_mac", StringType(), True),
    StructField("to_mac", StringType(), True),
    StructField("gas_limit", StringType(), True),
    StructField("value", StringType(), True),
    StructField("new_contract", StringType(), True),
    StructField("code", StringType(), True),
    StructField("is_succesful", StringType(), True)
]
txn = spark.read.csv(file_path, sep=",", header=False, schema=StructType(txn_fields))

# quick sanity checks on the loaded DataFrame
txn.head()
txn.count()
txn.printSchema()

# sender and receiver address columns
from_mac = txn.select('from_mac')
to_mac = txn.select('to_mac')
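The snippets above assume an interactive pyspark shell or notebook, where spark already exists and the graphframes package is on the classpath. When run as a standalone script, the session has to be created first; a minimal sketch, assuming the GraphFrames Spark package (the version string below is an assumption, use the artifact matching your Spark build):

from pyspark.sql import SparkSession

# create a session and pull in the GraphFrames package
# (version string is an assumption; match it to your Spark/Scala version)
spark = (SparkSession.builder
         .appName("eth-txn")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12")
         .getOrCreate())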
Raw transaction data example
3777501,1495909739,0xad9b464ef42fe9ed6eaec06e3fb31a845e88558d9bacb92f7dc24a655a5d6d29,Call,0x32Be343B94f860124dC4fEe278FDCBD38C102D88,0x71FA4943af0c6E4BE6dE30680290d8be3c816536,312333,0x297e9d28866b0000,,,OK
Transaction DataFrame Row example
Row(block_id='3777501', time_stamp='1495909739', tx_hash='0xad9b464ef42fe9ed6eaec06e3fb31a845e88558d9bacb92f7dc24a655a5d6d29', method='Call', from_mac='0x32Be343B94f860124dC4fEe278FDCBD38C102D88', to_mac='0x71FA4943af0c6E4BE6dE30680290d8be3c816536', gas_limit='312333', value='0x297e9d28866b0000', new_contract=None, code=None, is_succesful='OK')
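The value column is the transferred amount in wei, hex-encoded (0x297e9d28866b0000 is roughly 2.99 ETH). A minimal sketch for turning it into a numeric ether column; the UDF and the value_eth column name are my own additions, not part of the original pipeline:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# hex-encoded wei string -> ether as a double (value_eth is a hypothetical column name)
hex_wei_to_eth = udf(lambda v: int(v, 16) / 1e18 if v else None, DoubleType())
txn_eth = txn.withColumn("value_eth", hex_wei_to_eth(col("value")))
txn_eth.select("tx_hash", "value", "value_eth").show(3, truncate=False)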
Union, deduplicate, and save
from functools import reduce
from pyspark.sql import DataFrame
# union an arbitrary number of DataFrames that share the same schema
def union_all(*dfs):
    return reduce(DataFrame.union, dfs)
from_mac = txn.select('from_mac').distinct() #187986
to_mac = txn.select('to_mac').distinct() #223223
all_mac = union_all(from_mac, to_mac).distinct()#244350
all_mac.coalesce(1).write.format("text").option("header", "false").mode("append").save("txn_data")
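coalesce(1) collapses the result to a single partition, so Spark writes one part-* file inside the txn_data/ directory rather than a file literally named nodes.txt. A small sketch of the renaming step that the next section assumes (the paths are assumptions inferred from node_path below):

import glob, os

# Spark writes txn_data/part-*.txt plus a _SUCCESS marker;
# rename the part file to the stable path used in the next step
part_file = glob.glob("txn_data/part-*")[0]
os.rename(part_file, "txn_data/nodes.txt")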
Sorting by a column
node_path = "./txn_data/nodes.txt"
n_fields = [
    StructField("mac_address", StringType(), True)
]
nodes = spark.read.csv(node_path, sep=",", header=False, schema=StructType(n_fields))
ordered_node = nodes.orderBy("mac_address")
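The graphframes import at the top is not used in this section; the natural next step is to build a transaction graph from these addresses and transactions. A minimal sketch, assuming the GraphFrame convention of an id vertex column and src/dst edge columns (the vertices, edges, and g names are my own):

from graphframes import GraphFrame

# GraphFrame expects a vertex DataFrame with an 'id' column
# and an edge DataFrame with 'src' and 'dst' columns
vertices = nodes.withColumnRenamed("mac_address", "id")
edges = txn.select(col("from_mac").alias("src"),
                   col("to_mac").alias("dst"),
                   "tx_hash", "value")
g = GraphFrame(vertices, edges)
g.inDegrees.show(5)   # e.g. number of incoming transactions per address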