博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
sparkStreaming与Kafka整合
阅读量:7191 次
发布时间:2019-06-29

本文共 2037 字,大约阅读时间需要 6 分钟。

createStream那几个参数折腾了我好久。。网上都是一带而过,最终才搞懂..关于sparkStreaming的还是太少,最终尝试成功。。。

首先启动zookeeper

./bin/zookeeper-server-start.sh config/zookeeper.properties &

启动kafka

bin/kafka-server-start.sh config/server.properties &

创建一个topic

./kafka-topics.sh  --create --zookeeper 192.168.77.133:2181 \ --replication-factor 1\ --partitions 1\ --topic yangsy

随后启动一个终端为9092的提供者

./kafka-console-producer.sh --broker-list 192.168.77.133:9092 --topic yangsy

代码如下:

import org.apache.spark.SparkConfimport org.apache.spark.storage.StorageLevelimport org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}/** * Created by root on 11/28/15. */object SparkStreaming {  def main(args: Array[String]) {/*    val sparkConf = new SparkConf().setMaster("local").setAppName("cocapp")      .set("spark.executor.memory", "1g")    val sc = new StreamingContext(sparkConf, Seconds(20))    val lines = sc.textFileStream("/usr/local/spark-1.4.0-bin-2.5.0-cdh5.2.1/streaming")    val words = lines.flatMap(_.split(" "))    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)    wordCounts.print()    sc.start()    sc.awaitTermination()*/  //zookeeper的地址    val zkQuorum = "192.168.77.133:2181"    //group_id可以通过kafka的conf下的consumer.properties中查找    val group ="test-consumer-group"   //创建的topic 可以是一个或多个    val topics = "yangsy"    val sparkConf = new SparkConf().setMaster("local").setAppName("cocapp").set("spark.executor.memory", "1g")    val sc = new StreamingContext(sparkConf, Seconds(2))    val numThreads = 2    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap     //StorageLevel.MEMORY_AND_DISK_SER为存储的级别    val lines  = KafkaUtils.createStream(sc, zkQuorum, group, topicpMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)     //对于收到的消息进行wordcount    val words = lines.flatMap(_.split(" "))    val pairs = words.map(word => (word, 1))    val wordCounts = pairs.reduceByKey(_ + _)    wordCounts.print()    sc.start()    sc.awaitTermination()  }}

随后再你启动的kafka的生产者终端随便输入消息,我这里设置的参数是每2秒获取一次,统计一次单词个数~OK~

 

转载地址:http://lcxkm.baihongyu.com/

你可能感兴趣的文章
ViewPager中的View更新
查看>>
MongoDB 分片管理
查看>>
基于Eclipse构建Hadoop源码阅读环境
查看>>
Best Practices and Commonly Made Mistakes When Using jQuery
查看>>
nodejs服务实现反向代理,解决本地开发接口请求跨域问题
查看>>
错误The request sent by the client was syntactically incorrect ()的解决
查看>>
第十一篇、RxSwift
查看>>
复分析学习9——全纯函数各阶导数在紧集上的一致估计
查看>>
run_test() 验证平台的入口
查看>>
uvm_monitor——借我一双慧眼
查看>>
AS3 与handle交互通信
查看>>
查看nginx编译模块
查看>>
接口隔离原则(Interface Segregation Principle)ISP
查看>>
新开始
查看>>
误删除了Oracle的dbf文件后的解决方法
查看>>
PHP网站,两个域名在一个空间,如何做301转向
查看>>
Mysql系列五:数据库分库分表中间件mycat的安装和mycat配置详解
查看>>
Web References - There was an error downloading 'http://localhost:/xxx/xxx.asmx'
查看>>
Python之禅及释义
查看>>
laravel5.4 开发简书网站
查看>>