欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spark 加载数据库mysql表中数据进行分析

程序员文章站 2022-04-09 12:53:14
1.工程maven依赖包 2.spark加载数据库中数据 3.spark支持加载多种数据库,仅需要用户依赖不同的数据库驱动包,并且代码进行微调即可 根据以上java代码,仅需调整18行,更改驱动加载类即可。 ......

1.工程maven依赖包

 1  
 2 <properties>
 3     <spark_version>2.3.1</spark_version>
 4     <!-- elasticsearch-->
 5     <elasticsearch.version>5.5.2</elasticsearch.version>
 6     <fastjson.version>1.2.28</fastjson.version>
 7     <elasticsearch-hadoop.version>6.3.2</elasticsearch-hadoop.version>
 8     <elasticsearch-spark.version>5.5.2</elasticsearch-spark.version>
 9 </properties>
10 <dependencies>
11     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
12     <dependency>
13         <groupId>org.apache.spark</groupId>
14         <artifactId>spark-core_2.11</artifactId>
15         <version>${spark_version}</version>
16     </dependency>
17     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
18     <dependency>
19         <groupId>org.apache.spark</groupId>
20         <artifactId>spark-sql_2.11</artifactId>
21         <version>${spark_version}</version>
22     </dependency>
23     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-yarn -->
24     <dependency>
25         <groupId>org.apache.spark</groupId>
26         <artifactId>spark-yarn_2.11</artifactId>
27         <version>${spark_version}</version>
28     </dependency>
29     <dependency>
30         <groupId>org.elasticsearch</groupId>
31         <artifactId>elasticsearch-spark-20_2.11</artifactId>
32         <version>${elasticsearch-spark.version}</version>
33     </dependency>
34     <dependency>
35         <groupId>mysql</groupId>
36         <artifactId>mysql-connector-java</artifactId>
37         <version>5.1.46</version>
38     </dependency>
39 </dependencies>

2.spark加载数据库中数据

 1 public class GoodsFromMySQL {
 2 
 3     /**
 4      * 加载数据库数据
 5      *
 6      * @param sc           spark context
 7      * @param sparkSession spark session
 8      */
 9     public static void loadGoodsInfo(SparkContext sc, SparkSession sparkSession) {
10         String url = "jdbc:mysql://x.x.x.x:3306/db-test";
11 
12         String sql = "(SELECT item_name as itemName, goods_category as goodsCategory FROM goods where dict_type='100203' and item_name " +
13                 "is not null) as my-goods";
14 
15         SQLContext sqlContext = SQLContext.getOrCreate(sc);
16         DataFrameReader reader = sqlContext.read().format("jdbc").
17                 option("url", url).option("dbtable", sql).
18                 option("driver", "com.mysql.jdbc.Driver").
19                 option("user", "root").
20                 option("password", "xxxxx");
21 
22 
23         Dataset<Row> goodsDataSet = reader.load();
24 
25         // Looks the schema of this DataFrame.
26         goodsDataSet.printSchema();
27 
28         goodsDataSet.write().mode(SaveMode.Overwrite).json("/data/app/source_new.json");
29     }
30 
31 
32     public static void main(String[] args) {
33         SparkConf conf = new SparkConf().setAppName("my-app");
34         SparkContext sc = new SparkContext(conf);
35 
36         SparkSession sparkSession = new SparkSession(sc);
37 
38         loadGoodsInfo(sc, sparkSession);
39     }
40 }

3.spark支持加载多种数据库,仅需要用户依赖不同的数据库驱动包,并且代码进行微调即可

  根据以上java代码,仅需调整18行,更改驱动加载类即可。