Spark Streaming 自定义数据源
程序员文章站
2022-06-07 17:57:23
...
Spark Streaming 自定义数据源
/**
 * Word-count demo driven by [[CustomReceiver]], a hand-written Receiver that reads
 * newline-delimited UTF-8 text from a TCP socket.
 *
 * Usage: start a text server with `nc -lk 9999`, then run this object, optionally
 * passing `<host> [<port>]` as arguments (defaults: 10.148.15.10 and 9999).
 */
object CustomReceiver {
  private val DefaultHost = "10.148.15.10"
  private val DefaultPort = 9999

  /**
   * Counts space-separated words received from the socket in 1-second batches and
   * prints each batch's counts.
   *
   * @param args optional `<host> [<port>]`; a non-numeric port fails with NumberFormatException
   */
  def main(args: Array[String]): Unit = {
    val host = args.headOption.getOrElse(DefaultHost)
    val port = args.lift(1).map(_.toInt).getOrElse(DefaultPort)

    // local[2]: one core is occupied by the receiver, the other processes batches.
    val sparkConf = new SparkConf().setAppName("CustomReceiver").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(1))

    // Plug the custom Receiver into the streaming context.
    val lines = ssc.receiverStream(new CustomReceiver(host, port))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    try {
      ssc.awaitTermination()
    } finally {
      // Runs even if awaitTermination() rethrows a streaming error; also releases the
      // SparkContext since the program is about to exit.
      ssc.stop(stopSparkContext = true, stopGracefully = false)
    }
  }
}
/**
 * A custom Spark Streaming [[Receiver]] that connects to a TCP socket and stores each
 * newline-delimited line of UTF-8 text it reads as one record.
 *
 * Records are stored with `StorageLevel.MEMORY_AND_DISK_2`. If the server closes the
 * connection or a non-fatal error occurs while the receiver is running, it asks Spark to
 * restart it (which reconnects). After a deliberate stop, no restart is requested.
 *
 * @param host server host name or IP address to connect to
 * @param port server TCP port
 */
class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  /** Socket of the most recent connection, kept so that onStop() can close it. */
  @volatile private var socket: Socket = _

  /** Returns immediately; connecting and reading happen on a background daemon thread. */
  def onStart(): Unit = {
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  /**
   * Closes the current socket, if any. This unblocks a reader thread waiting in
   * `readLine()`; that thread then observes `isStopped()` and exits without restarting.
   */
  def onStop(): Unit = closeQuietly(socket)

  /** Create a socket connection and receive data until the receiver is stopped or the stream ends. */
  private def receive(): Unit = {
    var sock: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      sock = new Socket(host, port)
      socket = sock
      logInfo("Connected to " + host + ":" + port)
      val reader = new BufferedReader(
        new InputStreamReader(sock.getInputStream(), StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)
        line = reader.readLine()
      }
      if (isStopped()) {
        logInfo("Stopped receiving")
      } else {
        // readLine() returned null: the server closed the connection, so reconnect.
        restart("Trying to connect again")
      }
    } catch {
      case e: java.net.ConnectException =>
        if (!isStopped()) restart("Error connecting to " + host + ":" + port, e)
      case scala.util.control.NonFatal(t) =>
        // Also reached when onStop() closes the socket under a blocked readLine();
        // only restart if this was not a deliberate stop.
        if (!isStopped()) restart("Error receiving data", t)
    } finally {
      // Closing the socket also closes its input stream, which the reader wraps.
      closeQuietly(sock)
    }
  }

  /** Closes `s` if it is non-null; an IOException from close() is logged, not rethrown. */
  private def closeQuietly(s: Socket): Unit = {
    if (s != null) {
      try s.close()
      catch {
        case e: java.io.IOException =>
          logWarning("Error closing socket to " + host + ":" + port, e)
      }
    }
  }
}