The contents of Conumer_demo1.java are as follows:
package com.lenovo.kafka_demo;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Properties;

public class Conumer_demo1 extends Thread {

    public static void main(String[] args) {
        Conumer_demo1 consumerThread = new Conumer_demo1(KafkaProperties.INTOPIC);
        consumerThread.start();
    }

    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;
    private static final Logger LOG = LoggerFactory.getLogger(Conumer_demo1.class);

    public Conumer_demo1(String topic) {
        Properties props = new Properties();
        // bootstrap.servers is required
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT0 + ","
                + KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT1 + ","
                + KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT2);
        // consumer group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "producer-consumer-demo1");
        // whether to auto-commit offsets to Kafka in the background; disabled here,
        // so offsets are committed manually in run()
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // how often (in ms) offsets are auto-committed; only takes effect when
        // enable.auto.commit is set to true
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // heartbeat-based failure detection: if no heartbeat arrives within this window,
        // the consumer is removed from the group and a rebalance is triggered; the value
        // must lie between group.min.session.timeout.ms and group.max.session.timeout.ms
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // key / value deserializer classes
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        this.consumer = new KafkaConsumer<Integer, String>(props);
        this.topic = topic;
    }

    public void run() {
        System.out.println("ConsumerThread--run");
        consumer.subscribe(Arrays.asList(this.topic)); // subscribe to the topic
        while (true) {
            // poll() waits up to the given number of milliseconds for the broker to return records
            ConsumerRecords<Integer, String> records = consumer.poll(200);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value()
                        + ") at offset " + record.offset() + ", partition " + record.partition());
            }
            // enable.auto.commit is false, so commit the consumed offsets explicitly
            consumer.commitSync();
        }
    }
}
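One gap in this demo: the poll loop never exits, so the consumer can never leave the group cleanly. Below is a minimal shutdown sketch; the shutdown() helper and the WakeupException handling are additions for illustration, not part of the original code, and it assumes an extra import of org.apache.kafka.common.errors.WakeupException. KafkaConsumer.wakeup() is the one consumer method that is safe to call from another thread; it makes a blocked poll() throw WakeupException.

// Sketch: hypothetical additions to Conumer_demo1 for a cooperative shutdown.
// Requires: import org.apache.kafka.common.errors.WakeupException;

public void shutdown() { // hypothetical helper; call it from another thread
    consumer.wakeup();   // makes a blocked poll() throw WakeupException
}

public void run() {
    consumer.subscribe(Arrays.asList(this.topic));
    try {
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(200);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value() + ")");
            }
            consumer.commitSync();
        }
    } catch (WakeupException e) {
        // expected when shutdown() is called; fall through to close the consumer
    } finally {
        consumer.close(); // release sockets and leave the group cleanly
    }
}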
The contents of Producer_demo1.java are as follows:
package com.lenovo.kafka_demo;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Producer_demo1 extends Thread {

    public static void main(String[] args) {
        //boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
        boolean isAsync = true;
        Producer_demo1 producerThread = new Producer_demo1(KafkaProperties.INTOPIC, isAsync);
        producerThread.start();
    }

    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public Producer_demo1(String topic, Boolean isAsync) {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT0 + ","
                + KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT1 + ","
                + KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT2);
        props.put("client.id", "Producer_demo1");

        // The five settings below were missing at first, which left the consumer
        // unable to read any data, so take note.
        /*
         * acks=0: the producer does not wait for acknowledgement; the record is
         *         handed to the socket buffer and considered sent.
         * acks=1: the producer waits until the leader has written the record to
         *         its local log.
         * acks=all (or acks=-1): the producer waits until every replica in the
         *         ISR has acknowledged the record.
         */
        props.put("acks", "all");
        // number of retries on send failure (0 = no retry)
        props.put("retries", 0);
        // batching: the producer will not try to batch more bytes than this per partition
        props.put("batch.size", 16384);
        // linger.ms defaults to 0; for details see
        // http://kafka.apache.org/0102/documentation.html#producerconfigs
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<Integer, String>(props);
        // parameters passed in from main()
        this.topic = topic;
        this.isAsync = isAsync; // whether to send asynchronously
    }

    // keeps sending data continuously
    public void run() {
        System.out.println("ProducerThread--run");
        int messageNo = 1;
        // real message-construction logic would go here
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            /*
             * Looking at the source shows that the one-argument send() simply
             * calls the two-argument overload with a null callback:
             *
             * public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
             *     return this.send(record, (Callback) null);
             * }
             */
            if (isAsync) { // send asynchronously
                producer.send(
                        new ProducerRecord<Integer, String>(topic, messageNo, messageStr),
                        new DemoCallBack(startTime, messageNo, messageStr));
            } else { // send synchronously: get() blocks until the broker acknowledges
                try {
                    producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr)).get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("Sent message: (" + messageNo++ + ", " + messageStr + ")");
            // pause for 0.5 seconds
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (messageNo == 5000) {
                //break;
            }
        }
    }
}

class DemoCallBack implements Callback {

    private final long startTime;
    private final int key;
    private final String message;

    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    /**
     * A callback method the user can implement to provide asynchronous handling of
     * request completion. This method will be called when the record sent to the
     * server has been acknowledged. Exactly one of the arguments will be non-null.
     *
     * @param metadata  The metadata for the record that was sent (i.e. the partition
     *                  and offset). Null if an error occurred.
     * @param exception The exception thrown during processing of this record. Null
     *                  if no error occurred.
     */
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "message(" + key + ", " + message + ") sent to partition(" + metadata.partition()
                    + "), offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}
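Similarly, the producer is never closed, so records still buffered under batch.size / linger.ms can be lost when the process exits. A minimal sketch of one way to handle this; the shutdown hook is an assumption, not part of the original demo, and is written as if added at the end of the Producer_demo1 constructor (where the producer field is in scope):

// Hypothetical addition at the end of Producer_demo1's constructor: flush and
// close the producer when the JVM shuts down so buffered records are not lost.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    public void run() {
        producer.flush(); // force out anything still waiting on batch.size/linger.ms
        producer.close(); // block until in-flight requests have completed
    }
}));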
The contents of KafkaProperties.java are as follows:
package com.lenovo.kafka_demo;

public class KafkaProperties {
    public static final String INTOPIC = "producer_consumer_demo1";
    //public static final String OUTTOPIC = "topic2";
    public static final String KAFKA_SERVER_URL = "127.0.0.1";
    public static final int KAFKA_SERVER_PORT0 = 9092;
    public static final int KAFKA_SERVER_PORT1 = 9093;
    public static final int KAFKA_SERVER_PORT2 = 9094;
    public static final int KAFKA_PRODUCER_BUFFER_SIZE = 65536;
    public static final int CONNECTION_TIMEOUT = 100000;
    public static final String CLIENT_ID = "SimpleConsumerDemoClient";

    private KafkaProperties() {}
}
The contents of pom.xml are as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.lenovo</groupId>
  <artifactId>kafka_demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>kafka_demo</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <kafka.version>0.10.0.1</kafka.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
    </dependency>
  </dependencies>
</project>
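A packaging note: the demo only uses the new producer/consumer client classes, so the lighter kafka-clients artifact should be sufficient in place of the full kafka_2.11 server jar. This is a suggestion, not part of the original pom:

<!-- optional alternative: the client-only artifact, which pulls in far fewer
     transitive dependencies than the full kafka_2.11 jar -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka.version}</version>
</dependency>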
Source: 我是码农. Please retain the attribution and link when reposting!