欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spring Boot与Spark、Cassandra系统集成开发示例

程序员文章站 2023-12-03 10:03:46
本文演示以spark作为分析引擎,cassandra作为数据存储,而使用spring boot来开发驱动程序的示例。 1.前置条件 安装spark(本文使用sp...

本文演示以spark作为分析引擎,cassandra作为数据存储,而使用spring boot来开发驱动程序的示例。

1.前置条件

  • 安装spark(本文使用spark-1.5.1,如安装目录为/opt/spark)
  • 安装cassandra(3.0+)

创建keyspace

create keyspace hfcb with replication = { 'class' : 'simplestrategy', 'replication_factor' : 3 };

创建table

create table person (
 id text primary key,
 first_name text,
 last_name text
);

插入测试数据

insert into person (id,first_name,last_name) values('1','wang','yunfei');
insert into person (id,first_name,last_name) values('2','peng','chao');
insert into person (id,first_name,last_name) values('3','li','jian');
insert into person (id,first_name,last_name) values('4','zhang','jie');
insert into person (id,first_name,last_name) values('5','liang','wei');

2.spark-cassandra-connector安装

让spark-1.5.1能够使用cassandra作为数据存储,需要加上下面jar包的依赖(示例将包放置于 /opt/spark/managed-lib/ 目录,可任意):

cassandra-clientutil-3.0.2.jar
cassandra-driver-core-3.1.4.jar
guava-16.0.1.jar
cassandra-thrift-3.0.2.jar 
joda-convert-1.2.jar
joda-time-2.9.9.jar
libthrift-0.9.1.jar
spark-cassandra-connector_2.10-1.5.1.jar

在 /opt/spark/conf 目录下,新建 spark-env.sh 文件,输入下面内容

spark_classpath=/opt/spark/managed-lib/*

3.spring boot应用开发

添加 spark-cassandra-connector 和 spark 依赖

<dependency>
   <groupid>com.datastax.spark</groupid>
   <artifactid>spark-cassandra-connector_2.10</artifactid>
   <version>1.5.1</version>
  </dependency>
  <dependency>
   <groupid>org.apache.spark</groupid>
   <artifactid>spark-core_2.10</artifactid>
   <version>1.5.1</version>
  </dependency>
  <dependency>
   <groupid>org.apache.spark</groupid>
   <artifactid>spark-sql_2.10</artifactid>
   <version>1.5.1</version>
  </dependency>

在 application.yml 中配置 spark 与 cassandra 路径

spark.master: spark://master:7077
cassandra.host: 192.168.1.140
cassandra.keyspace: hfcb

此处特别说明 spark://master:7077 是域名形式而不是ip地址,可修改本地 hosts 文件将 master 与 ip 地址映射。

配置 sparkcontext 和 cassandrasqlcontext

@configuration
public class sparkcassandraconfig {
 @value("${spark.master}")
 string sparkmasterurl;
 @value("${cassandra.host}")
 string cassandrahost;
 @value("${cassandra.keyspace}")
 string cassandrakeyspace;
 @bean
 public javasparkcontext javasparkcontext(){
  sparkconf conf = new sparkconf(true)
    .set("spark.cassandra.connection.host", cassandrahost)
//    .set("spark.cassandra.auth.username", "cassandra")
//    .set("spark.cassandra.auth.password", "cassandra")
    .set("spark.submit.deploymode", "client");
  javasparkcontext context = new javasparkcontext(sparkmasterurl, "sparkdemo", conf);
  return context;
 }
 @bean
 public cassandrasqlcontext sqlcontext(){
  cassandrasqlcontext cassandrasqlcontext = new cassandrasqlcontext(javasparkcontext().sc());
  cassandrasqlcontext.setkeyspace(cassandrakeyspace);
  return cassandrasqlcontext;
 }
 }

简单调用

@repository
public class personrepository {
 @autowired
 cassandrasqlcontext cassandrasqlcontext;
 public long countperson(){
  dataframe people = cassandrasqlcontext.sql("select * from person order by id");
  return people.count();
 }
}

启动即可如常规spring boot程序一样执行。

源码地址:

总结

以上所述是小编给大家介绍的spring boot与spark、cassandra系统集成开发示例,希望对大家有所帮助