sparksreaming网络c 程序例子子打印不出数据

用实例讲解Spark Sreaming
本篇文章用Spark Streaming +Hbase为列,Spark Streaming专为流式数据处理,对Spark核心API进行了相应的扩展。
什么是Spark Streaming?
首先,什么是流式处理呢?数据流是一个数据持续不断到达的无边界序列集。流式处理是把连续不断的数据输入分割成单元数据块来处理。流式处理是一个低延迟的处理和流式数据分析。Spark Streaming对Spark核心API进行了相应的扩展,支持高吞吐、低延迟、可扩展的流式数据处理。实时数据处理应用的场景有下面几个:
网站监控和网络监控;
异常监测;
网页点击;
广告数据;
物联网(IOT)
Spark Streaming支持的数据源包括HDFS文件,TCP socket,Kafka,Flume,Twitter等,数据流可以通过Spark核心API、DataFrame SQL或者机器学习API处理,并可以持久化到本地文件、HDFS、数据库或者其它任意支持Hadoop输出格式的形式。
Spark Streaming如何工作?
Spark Streaming以X秒(batch size)为时间间隔把数据流分割成Dstream,组成一个RDD序列。你的Spark应用处理RDD,并把处理的结果批量返回。
Spark Streaming例子的架构图
Spark Streaming例子代码分下面几部分:
- 读取流式数据;
- 处理流式数据;
- 写处理结果倒Hbase表。
Spark处理部分的代码涉及到如下内容:
读取Hbase表的数据;
按天计算数据统计;
写统计结果到Hbase表,列簇:stats。
数据集来自油泵信号数据,以CSV格式存储在指定目录下。Spark Streaming监控此目录,CSV文件的格式如图3。
采用Scala的case class来定义数据表结构,parseSensor函数解析逗号分隔的数据。
Hbase表结构
流式处理的Hbase表结构如下:
油泵名字 + 日期 + 时间戳 组合成row key;
列簇是由输入数据列、报警数据列等组成,并设置过期时间。
每天等统计数据表结构如下:
油泵名和日期组成row key;
列簇为stats,包含列有最大值、最小值和平均值;
配置写入Hbase表
Spark直接用写数据到Hbase里,跟在MapReduce中写数据到Hbase表一样,下面就直接用TableOutputFormat类了。
Spark Streaming代码
Spark Streaming的基本步骤:
初始化Spark StreamingContext对象;
在DStream上进行transformation操作和输出操作;
开始接收数据并用streamingContext.start();
等待处理停止,streamingContext.awaitTermination()。
初始化Spark StreamingContext对象
创建 对象,StreamingContext是Spark Streaming处理的入口,这里设置2秒的时间间隔。
val sparkConf = new SparkConf().setAppName(&HBaseStream&)
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))
接下来用StreamingContext的textFileStream(directory)创建输入流跟踪Hadoop文件系统的新文件,并处理此目录下的所有文件,这里directory指文件目录。
val linesDStream = ssc.textFileStream(&/user/user01/stream&)
linesDStream是数据流,每条记录是按行记录的text格式。
对DStream进行transformation操作和输出操作
接下来进行解析,对linesDStream进行map操作,map操作是对RDD应用Sensor.parseSensor函数,返回Sensor的RDD。
val sensorDStream = linesDStream.map(Sensor.parseSensor)
对DStream的每个RDD执行 方法,使用filter过滤Sensor中低psi值来创建报警,使用Hbase的Put对象转换sensor和alter数据以便能写入到Hbase。然后使用PairRDDFunctions的方法将最终结果写入到任何Hadoop兼容到存储系统。
sensorRDD.foreachRDD { rdd =&
val alertRDD = rdd.filter(sensor =& sensor.psi & 5.0)
rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig)
rdd.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig)
sensorRDD经过Put对象转换,然后写入到Hbase。
开始接收数据
通过streamingContext.start()显式的启动数据接收,然后调用streamingContext.awaitTermination()来等待计算完成。
ssc.start()
ssc.awaitTermination()
Spark读写Hbase
现在开始读取Hbase的sensor表,计算每条的统计指标并把对应的数据写入stats列簇。
下面的代码读取Hbase的sensor表psi列数据,用计算统计数据,然后写入stats列簇。
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName)
conf.set(TableInputFormat.SCAN_COLUMNS, &data:psi&)
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val resultRDD = hBaseRDD.map(tuple =& tuple._2)
val keyValueRDD = resultRDD.
map(result =& (Bytes.toString(result.getRow()).
split(& &)(0), Bytes.toDouble(result.value)))
val keyStatsRDD = keyValueRDD.
groupByKey().
mapValues(list =& StatCounter(list))
keyStatsRDD.map { case (k, v) =& convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)
下面的流程图显示newAPIHadoopRDD输出,(row key,result)的键值对。PairRDDFunctions 的saveAsHadoopDataset方法把Put对象存入到Hbase。
运行Spark Streaming应用
运行Spark Streaming应用跟运行Spark应用类似,比较简单,此处不赘述,参见。
好文,顶一下
文章真差,踩一下
------分隔线----------------------------
把开源带在你的身边-精美linux小纪念品
初级应用->
高级应用-> |
编程开发->Spark(137)
原文链接:http://www.infoq.com/cn/articles/spark-sreaming-practice
本篇文章用Spark Streaming +Hbase为列,Spark Streaming专为流式数据处理,对Spark核心API进行了相应的扩展。
什么是Spark Streaming?
首先,什么是流式处理呢?数据流是一个数据持续不断到达的无边界序列集。流式处理是把连续不断的数据输入分割成单元数据块来处理。流式处理是一个低延迟的处理和流式数据分析。Spark Streaming对Spark核心API进行了相应的扩展,支持高吞吐、低延迟、可扩展的流式数据处理。实时数据处理应用的场景有下面几个:
网站监控和网络监控;
异常监测;
网页点击;
广告数据;
物联网(IOT)
Spark Streaming支持的数据源包括HDFS文件,TCP socket,Kafka,Flume,Twitter等,数据流可以通过Spark核心API、DataFrame SQL或者机器学习API处理,并可以持久化到本地文件、HDFS、数据库或者其它任意支持Hadoop输出格式的形式。
Spark Streaming如何工作?
Spark Streaming以X秒(batch size)为时间间隔把数据流分割成Dstream,组成一个RDD序列。你的Spark应用处理RDD,并把处理的结果批量返回。
Spark Streaming例子的架构图
Spark Streaming例子代码分下面几部分:
- 读取流式数据;
- 处理流式数据;
- 写处理结果倒Hbase表。
Spark处理部分的代码涉及到如下内容:
读取Hbase表的数据;
按天计算数据统计;
写统计结果到Hbase表,列簇:stats。
数据集来自油泵信号数据,以CSV格式存储在指定目录下。Spark Streaming监控此目录,CSV文件的格式如图3。
采用Scala的case class来定义数据表结构,parseSensor函数解析逗号分隔的数据。
Hbase表结构
流式处理的Hbase表结构如下:
油泵名字 + 日期 + 时间戳 组合成row key;
列簇是由输入数据列、报警数据列等组成,并设置过期时间。
每天等统计数据表结构如下:
油泵名和日期组成row key;
列簇为stats,包含列有最大值、最小值和平均值;
配置写入Hbase表
Spark直接用写数据到Hbase里,跟在MapReduce中写数据到Hbase表一样,下面就直接用TableOutputFormat类了。
Spark Streaming代码
Spark Streaming的基本步骤:
初始化Spark StreamingContext对象;
在DStream上进行transformation操作和输出操作;
开始接收数据并用streamingContext.start();
等待处理停止,streamingContext.awaitTermination()。
初始化Spark StreamingContext对象
创建 对象,StreamingContext是Spark
Streaming处理的入口,这里设置2秒的时间间隔。
val sparkConf = new SparkConf().setAppName("HBaseStream")
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))
接下来用StreamingContext的textFileStream(directory)创建输入流跟踪Hadoop文件系统的新文件,并处理此目录下的所有文件,这里directory指文件目录。
val linesDStream = ssc.textFileStream("/user/user01/stream")
linesDStream是数据流,每条记录是按行记录的text格式。
对DStream进行transformation操作和输出操作
接下来进行解析,对linesDStream进行map操作,map操作是对RDD应用Sensor.parseSensor函数,返回Sensor的RDD。
val sensorDStream = linesDStream.map(Sensor.parseSensor)
对DStream的每个RDD执行 方法,使用filter过滤Sensor中低psi值来创建报警,使用Hbase的Put对象转换sensor和alter数据以便能写入到Hbase。然后使用PairRDDFunctions的方法将最终结果写入到任何Hadoop兼容到存储系统。
sensorRDD.foreachRDD { rdd =&
val alertRDD = rdd.filter(sensor =& sensor.psi & 5.0)
rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig)
rdd.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig)
sensorRDD经过Put对象转换,然后写入到Hbase。
开始接收数据
通过streamingContext.start()显式的启动数据接收,然后调用streamingContext.awaitTermination()来等待计算完成。
ssc.start()
ssc.awaitTermination()
Spark读写Hbase
现在开始读取Hbase的sensor表,计算每条的统计指标并把对应的数据写入stats列簇。
下面的代码读取Hbase的sensor表psi列数据,用计算统计数据,然后写入stats列簇。
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName)
conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi")
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val resultRDD = hBaseRDD.map(tuple =& tuple._2)
val keyValueRDD = resultRDD.
map(result =& (Bytes.toString(result.getRow()).
split(" ")(0), Bytes.toDouble(result.value)))
val keyStatsRDD = keyValueRDD.
groupByKey().
mapValues(list =& StatCounter(list))
keyStatsRDD.map { case (k, v) =& convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)
下面的流程图显示newAPIHadoopRDD输出,(row key,result)的键值对。PairRDDFunctions 的saveAsHadoopDataset方法把Put对象存入到Hbase。
运行Spark Streaming应用
运行Spark Streaming应用跟运行Spark应用类似,比较简单,此处不赘述,参见。用实例讲解Spark Sreaming - 推酷
用实例讲解Spark Sreaming
作者: 侠天
本篇文章用Spark Streaming +Hbase为列,Spark Streaming专为流式数据处理,对Spark核心API进行了相应的扩展。什么是Spark Streaming?首先,什么是流式处理呢?数据流是一个数据持续不断到达的无边界序列集。流式处理是把连续不断的数据输入分割成单元数据块来处理。流式处理是一个低延迟的处理和流式数据分析。Spark Streaming对Spark核心API进行了相应的扩展,支持高吞吐、低延迟、可扩展的流式数据处理。实时数据处理应用的场景有下面几个:
网站监控和网络监控;
物联网(IOT)
Spark Streaming
支持的数据源包括HDFS文件,TCP socket,Kafka,Flume,Twitter等,数据流可以通过Spark核心API、DataFrame SQL或者机器学习API处理,并可以持久化到本地文件、HDFS、数据库或者其它任意支持Hadoop输出格式的形式。Spark Streaming如何工作?Spark Streaming以X秒(batch size)为时间间隔把数据流分割成Dstream,组成一个RDD序列。你的Spark应用处理RDD,并把处理的结果批量返回。
Spark Streaming例子的架构图
Spark Streaming例子代码分下面几部分:
- 读取流式数据;
- 处理流式数据;
- 写处理结果倒Hbase表。
Spark处理部分的代码涉及到如下内容:
读取Hbase表的数据;
按天计算数据统计;
写统计结果到Hbase表,列簇:stats。
数据集数据集来自油泵信号数据,以CSV格式存储在指定目录下。Spark Streaming监控此目录,CSV文件的格式如上图。
采用Scala的case class来定义数据表结构,parseSensor函数解析逗号分隔的数据。
Hbase表结构
流式处理的Hbase表结构如下:
油泵名字 + 日期 + 时间戳 组合成
列簇是由输入数据列、报警数据列等组成,并设置过期时间。
每天等统计数据表结构如下:
油泵名和日期组成
列簇为stats,包含列有最大值、最小值和平均值;
配置写入Hbase表
Spark直接用TableOutputFormat类写数据到Hbase里,跟在MapReduce中写数据到Hbase表一样,下面就直接用TableOutputFormat类了。Spark Streaming代码
Spark Streaming的基本步骤:
初始化Spark StreamingContext对象;
在DStream上进行transformation操作和输出操作;
开始接收数据并用streamingContext.start();
等待处理停止,streamingContext.awaitTermination()。
初始化Spark StreamingContext
对象创建 StreamingContext对象,StreamingContext是Spark Streaming处理的入口,这里设置2秒的时间间隔。
接下来用StreamingContext的textFileStream(directory)创建输入流跟踪Hadoop文件系统的新文件,并处理此目录下的所有文件,这里directory指文件目录。
linesDStream是数据流,每条记录是按行记录的text格式。
图6对DStream进行transformation操作和输出操作接下来进行解析,对linesDStream进行map操作,map操作是对RDD应用Sensor.parseSensor函数,返回Sensor的RDD。
对DStream的每个RDD执行foreachRDD 方法,使用filter过滤Sensor中低psi值来创建报警,使用Hbase的Put对象转换sensor和alter数据以便能写入到Hbase。然后使用PairRDDFunctions的saveAsHadoopDataset方法将最终结果写入到任何Hadoop兼容到存储系统。
sensorRDD经过Put对象转换,然后写入到Hbase。
开始接收数据通过streamingContext.start()显式的启动数据接收,然后调用streamingContext.awaitTermination()来等待计算完成。
Spark读写Hbase
现在开始读取Hbase的sensor表,计算每条的统计指标并把对应的数据写入stats列簇。
下面的代码读取Hbase的sensor表psi列数据,用StatCounter计算统计数据,然后写入stats列簇。
下面的流程图显示newAPIHadoopRDD输出,(row key,result)的键值对。PairRDDFunctions 的saveAsHadoopDataset方法把Put对象存入到Hbase。
运行Spark Streaming应用运行Spark Streaming应用跟运行Spark应用类似,比较简单,此处不赘述,参见Spark Streaming官方文档。
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致用实例讲解Spark Sreaming_itency知识共享
用实例讲解Spark Sreaming
浏览量:11708
原文链接:http://www.infoq.com/cn/articles/spark-sreaming-practice
本篇文章用Spark Streaming +Hbase为列,Spark Streaming专为流式数据处理,对Spark核心API进行了相应的扩展。
什么是Spark Streaming?
首先,什么是流式处理呢?数据流是一个数据持续不断到达的无边界序列集。流式处理是把连续不断的数据输入分割成单元数据块来处理。流式处理是一个低延迟的处理和流式数据分析。Spark Streaming对Spark核心API进行了相应的扩展,支持高吞吐、低延迟、可扩展的流式数据处理。实时数据处理应用的场景有下面几个:
网站监控和网络监控;
异常监测;
网页点击;
广告数据;
物联网(IOT)
Spark Streaming支持的数据源包括HDFS文件,TCP socket,Kafka,Flume,Twitter等,数据流可以通过Spark核心API、DataFrame SQL或者机器学习API处理,并可以持久化到本地文件、HDFS、数据库或者其它任意支持Hadoop输出格式的形式。
Spark Streaming如何工作?
Spark Streaming以X秒(batch size)为时间间隔把数据流分割成Dstream,组成一个RDD序列。你的Spark应用处理RDD,并把处理的结果批量返回。
Spark Streaming例子的架构图
Spark Streaming例子代码分下面几部分:
- 读取流式数据;
- 处理流式数据;
- 写处理结果倒Hbase表。
Spark处理部分的代码涉及到如下内容:
读取Hbase表的数据;
按天计算数据统计;
写统计结果到Hbase表,列簇:stats。
数据集来自油泵信号数据,以CSV格式存储在指定目录下。Spark Streaming监控此目录,CSV文件的格式如图3。
采用Scala的case class来定义数据表结构,parseSensor函数解析逗号分隔的数据。
Hbase表结构
流式处理的Hbase表结构如下:
油泵名字 + 日期 + 时间戳 组合成row key;
列簇是由输入数据列、报警数据列等组成,并设置过期时间。
每天等统计数据表结构如下:
油泵名和日期组成row key;
列簇为stats,包含列有最大值、最小值和平均值;
配置写入Hbase表
Spark直接用写数据到Hbase里,跟在MapReduce中写数据到Hbase表一样,下面就直接用TableOutputFormat类了。
Spark Streaming代码
Spark Streaming的基本步骤:
初始化Spark StreamingContext对象;
在DStream上进行transformation操作和输出操作;
开始接收数据并用streamingContext.start();
等待处理停止,streamingContext.awaitTermination()。
初始化Spark StreamingContext对象
创建 对象,StreamingContext是Spark
Streaming处理的入口,这里设置2秒的时间间隔。
val sparkConf = new SparkConf().setAppName("HBaseStream")
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))
接下来用StreamingContext的textFileStream(directory)创建输入流跟踪Hadoop文件系统的新文件,并处理此目录下的所有文件,这里directory指文件目录。
val linesDStream = ssc.textFileStream("/user/user01/stream")
linesDStream是数据流,每条记录是按行记录的text格式。
对DStream进行transformation操作和输出操作
接下来进行解析,对linesDStream进行map操作,map操作是对RDD应用Sensor.parseSensor函数,返回Sensor的RDD。
val sensorDStream = linesDStream.map(Sensor.parseSensor)
对DStream的每个RDD执行 方法,使用filter过滤Sensor中低psi值来创建报警,使用Hbase的Put对象转换sensor和alter数据以便能写入到Hbase。然后使用PairRDDFunctions的方法将最终结果写入到任何Hadoop兼容到存储系统。
sensorRDD.foreachRDD { rdd =&
val alertRDD = rdd.filter(sensor =& sensor.psi & 5.0)
rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig)
rdd.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig)
sensorRDD经过Put对象转换,然后写入到Hbase。
开始接收数据
通过streamingContext.start()显式的启动数据接收,然后调用streamingContext.awaitTermination()来等待计算完成。
ssc.start()
ssc.awaitTermination()
Spark读写Hbase
现在开始读取Hbase的sensor表,计算每条的统计指标并把对应的数据写入stats列簇。
下面的代码读取Hbase的sensor表psi列数据,用计算统计数据,然后写入stats列簇。
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName)
conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi")
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val resultRDD = hBaseRDD.map(tuple =& tuple._2)
val keyValueRDD = resultRDD.
map(result =& (Bytes.toString(result.getRow()).
split(" ")(0), Bytes.toDouble(result.value)))
val keyStatsRDD = keyValueRDD.
groupByKey().
mapValues(list =& StatCounter(list))
keyStatsRDD.map { case (k, v) =& convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)
下面的流程图显示newAPIHadoopRDD输出,(row key,result)的键值对。PairRDDFunctions 的saveAsHadoopDataset方法把Put对象存入到Hbase。
运行Spark Streaming应用
运行Spark Streaming应用跟运行Spark应用类似,比较简单,此处不赘述,参见。
转载自:http://blog.csdn.net/u/article/details/
其实Docker的跨平台性到现在也一直没有做太好,包括在Mac上目前最普遍的方式还是通过boot2docker这样一个类似虚拟机的东西...这里我们就来看一下MacOS上通过boot2docker安装使用Docker的教程
这篇文章主要介绍了美团的以Python为主导的云计算发展战略,美团对于Python的Django和Tornado框架的应用着实令人眼前一亮,是为国内Python技术的一大主要推动力量,需要的朋友可以参考下
这篇文章主要介绍了搜狐云发展中DomeOS的开发与Docker的应用,DomeOS是搜狐自助研发的企业级业务编排运维管理系统,需要的朋友可以参考下
网易蜂巢提供对Docker容器的警报和性能监控服务,通过图形化面板操作十分简洁,这里前提假设服务器端已经架设在容器中,那么接下来就让我们来看网易蜂巢的容器运维管理服务使用指南
这篇文章主要介绍了Centos6.4安装erlang&rabbitmq的方法,需要的朋友可以参考下
一些朋友问小编Centos7安装Rclone如何用命令同步国外网盘文件?今天小编就来教教大家!希望能够帮助到大家!有需要的朋友一起去看看吧
今天小编要为大家带来的是centos7使用docker部署gitlab-ce-zh应用详解!希望对大家会有帮助!有需要的朋友一起去看看吧
下面小编就为大家分享一篇利用Service Fabric承载eShop On Containers的实现方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
微软为各个Windows10版本推送了累积更新,对于已经安装了Windows10周年更新系统的Insiders用户来说,微软今天为其准备了Build更新,下面小编位大家带来Windows10周年更新版核心修改更新内容汇总
本文主要介绍了如何使用Docker构建PHP的开发环境,文中作者也探讨了构建基于Docker的开发环境应该使用单容器还是多容器,各有什么利弊。推荐PHP开发者阅读。}

我要回帖

更多关于 简单c语言程序例子 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信