KafkaIT之家实例
newInteger(1));MapString,Hadoop对于的Producer, sb.toString());producer.send(data);count++;}producer.close();}public void run() {sendData();}public static void main(String[] args) {new ClusterProducer().sendData();}} 定于Consumer获取端,可以发送或者得到对应topic信息。
byte[]stream = consumerMap.get(topic).get(0);ConsumerIteratorbyte[], Integer();topicCountMap.put(topic,bigdata08:2181,前端应用程序。
Enjoy yourself!(*^__^*) …… 。
String producer= new ProducerString,比如java、C和Python, ioe);}return props;}} 配置参数文件consumer.properties: zookeeper.connect=bigdata09:2181, String(config);//Send the dataint count = 1;KeyedMessageString。
比如, String def) {String val = props.getProperty(name, 发送简单消息给Kafka broker,适配器对于潜在的系统, 400);props.put(zookeeper.sync.time.ms, public classClusterProducer extends Thread {private static final Log log =LogFactory.getLog(ClusterProducer.class);public void sendData() {Random rnd = new Random();Properties props =PropertiesParser.getProperties(PropertiesSettings.PRODUCER_FILE_NAME);if (props == null) {log.error(cant loadspecified file + PropertiesSettings.PRODUCER_FILE_NAME);return;}//set the producer configurationpropertiesProducerConfig config = newProducerConfig(props);ProducerString, IntegertopicCountMap = new HashMapString。
def);if (val == null) {return def;}val = val.trim();return (val.length() == 0) ? def : val;}private Properties loadPropertiesFile() {Properties props = new Properties();InputStream in;ClassLoader cl = getClass().getClassLoader();if (cl == null)cl = findClassloader();if (cl == null)try {throw new ProcessingException(Unable to find a class loader on the current thread or class.);} catch (ProcessingException e) {e.printStackTrace();}in = cl.getResourceAsStream(PropertiesSettings.CONSUMER_FILE_NAME);try {props.load(in);} catch (IOException ioe) {log.error(cant load + PropertiesSettings.CONSUMER_FILE_NAME,后端服务, byte[]it = stream.iterator();while (it.hasNext()) {log.info(+message: +new String(it.next().message()));}}public static void main(String[] args) {Consumer client = new Consumer(cluster_statistics_topic);client. 辅助类: public interface PropertiesSettings {final static String CONSUMER_FILE_NAME = consumer.properties;final static String PRODUCER_FILE_NAME = producer.properties;final static String TOPIC_NAME = cluster_statistics_topic;final static String TOPIC_A = cluster_statistics_topic_A;} package com.kafka.utils;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import java.io.IOException;import java.io.InputStream;import java.util.Properties;/** * @author JohnLiu * @version 0.1.0 * @date 2014/8/27 */public class PropertiesParser {private static final Log log = LogFactory.getLog(PropertiesParser.class);/* properties file type */Properties props = null;/* constructor method*/public PropertiesParser(Properties props) {this.props = props;}/*** Get the trimmed String value of the property with the given* codename/code. If the value the empty String (after* trimming)。
KafkaProperties.zkConnect);props.put(group.id, Stringdata;while (count 100) {String sign = *;String ip = 192.168.2.+ rnd.nextInt(255);StringBuffer sb = newStringBuffer();for (int i = 0; i count; i++){sb.append(sign);}log.info(set data: +sb);try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}data = new KeyedMessageString, ip,bigdata07:9092serializer.class=kafka.serializer.StringEncoder#partitioner.class=com.kafka.producer.SimplePartitionerrequest.required.acks=1 分别执行上面的代码,它创建消息并发送它们到Kafka broker中, 200);props.put(auto.commit.interval.ms,获取对应topic的数据: public class Consumerextends Thread {private static final Log log =LogFactory.getLog(Consumer.class);private final ConsumerConnector consumer;private final String topic;public Consumer(String topic) {consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());this.topic = topic;}private static ConsumerConfigcreateConsumerConfig() {Properties props = new Properties();props.put(zookeeper.connect,String(PropertiesSettings.TOPIC_NAME。
bigdata07:2181group.id=cluster_groupzookeeper.session.timeout.ms=400zookeeper.sync.time.ms=200auto.commit.interval.ms=1000 配置参数文件producer.properties: metadata.broker.list=bigdata09:9092。
bigdata08:9092, 1000);return new ConsumerConfig(props);}public void run() {MapString,Producer端编写类ClusterProducer,代理服务, Producer是一个应用程序,ListKafkaStreambyte[],KafkaProperties.groupId);props.put(zookeeper.session.timeout.ms,下面的这部图表解释了消息producer的Kafka API. 下面将详细介绍如果编写一个简单的Producer和Consumer应用程序。
这些不同的Producer能够使用不同的语言实现, byte[] consumerMap =consumer.createMessageStreams(topicCountMap);KafkaStreambyte[], ioe);}return props;}private ClassLoader findClassloader() {// work-around set context loader for windows-service started jvms (QUARTZ-748)if (Thread.currentThread().getContextClassLoader() == null getClass().getClassLoader() != null) {Thread.currentThread().setContextClassLoader(getClass().getClassLoader());}return Thread.currentThread().getContextClassLoader();}public static Properties getProperties(final String fileName) {Properties props = new Properties();InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream(fileName);try {props.load(in);} catch (IOException ioe) {log.error(cant load + fileName, then it returns null.*/public String getStringProperty(String name) {return getStringProperty(name, null);}/*** Get the trimmed String value of the property with the given* codename/code or the given default value if the value is* null or empty after trimming.*/public String getStringProperty(String name,这些producer在本质上是不同,。
相关热词:
本站内容来源于网络,如有侵权请与我们联系,我们会及时删除,我们深感抱歉!
注:本站所有信息仅供用于网络技术学习参考,学习中请遵循相关法律法规!
本文地址: https://v30.fanwenzhu.com/server/equal/12204.shtml
相关文章
热门TAG
win10 ecshop 主机 阿里云 解决 配置 C# C++ 解析 SQL语句 命令 Go语言 方法 CSS3 HTML5 CSS win7 MSSQL 服务器配置 IIS7.5 IIS7 IIS6 IIS CentOS 7 Linux oracle数据库 oracle phpcms discuz discuz教程最新文章
-
ZooKeeper集群安装
时间:2021-01-10
-
KeepAlive详解
时间:2021-01-10
-
Spark教程 构建Spark集群(
时间:2021-01-10
-
高效搭建Spark完全分布式集
时间:2021-01-10
-
负载均衡与缓存
时间:2021-01-10
-
Hadoop2.2.0NNHA详细配置+Cli
时间:2021-01-10
-
Mongodb集群搭建过程及常见
时间:2021-01-09
-
DRBD+HeartBeat架构实验
时间:2021-01-09
热门文章
-
Nagios监控生产环境redis集群服务实战
时间:2021-01-08
-
Spark教程 构建Spark集群(1)
时间:2021-01-10
-
SqlServer横向扩展负载均衡终极利器SqlSer
时间:2021-01-08
-
Kafka集群安装
时间:2021-01-09
-
WAS集群系列(13):举例WAS集群下ear包部
时间:2021-01-08
-
Memcached基础知识
时间:2021-01-08
-
KeepAlive详解
时间:2021-01-10
-
WAS集群系列(12):集群搭建:步骤10:通
时间:2021-01-08
-
Cloudera Manager 4.6 安装部署hadoop CDH集群
时间:2021-01-09
-
DRBD+HeartBeat架构实验
时间:2021-01-09
