kafka对数据序列化和反序列化 - 数据结构 - 机器学习
1377 人阅读 | 时间:2021年01月15日 01:22
数据结构 - 机器学习
深度学习

当前位置:首页 » 大数据精品文章 » 正文
kafka对数据序列化和反序列化
1683 人参与 2019年03月06日 17:51 分类 : 大数据精品文章 评论
pom.xml中内容如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.lenovo</groupId> <artifactId>random</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>random</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <kafka.version>0.10.0.1</kafka.version> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.7.3</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro-ipc</artifactId> <version>1.7.3</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/java</sourceDirectory> <testSourceDirectory>src/test/java</testSourceDirectory> <plugins> <!-- Bind the maven-assembly-plugin to the package phase this will create a jar file without the storm dependencies suitable for deployment to a cluster. --> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
KafkaConsumerObject.java类内容如下:
package com.lenovo.random; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaConsumerObject { public static void main(String[] args) { String topic = "gaofeng101"; // 定义要操作的主题 Properties pro = new Properties(); // 定义相应的属性保存 pro.setProperty("zookeeper.connect", "127.0.0.1:2181"); //这里根据实际情况填写你的zk连接地址 pro.setProperty("metadata.broker.list", "127.0.0.1:9092"); //根据自己的配置填写连接地址 //pro.setProperty("group.id", "group1"); pro.setProperty("group.id", "test-consumer-groups01"); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(pro)); // 需要定义一个主题的映射的存储集合 Map<String,Integer> topicMap = new HashMap<String,Integer>() ; topicMap.put(topic, 1) ; // 设置要读取数据的主题 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicMap) ; // 现在只有一个主题,所以此处只接收第一个主题的数据即可 // System.err.println(messageStreams.size()); // System.out.println(messageStreams); KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0) ; // 第一个主题 ConsumerIterator<byte[], byte[]> iter = stream.iterator(); int i=0; while(iter.hasNext()) { i++; // MsgEntity me = (MsgEntity)BeanUtils.BytesToObject(iter.next().message()); //接收消息,并将字节数组转换为对象 Member me = (Member)BeanUtils.BytesToObject(iter.next().message()); //接收消息,并将字节数组转换为对象 System.out.println("........................................................"); System.out.println("Consumer接收到的消息如下:"); me.display(); System.out.println("Consumer接收第"+i+"个消息!"); } } }
KafkaProducerObject.java类内容如下:
package com.lenovo.random; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.PrintWriter; import java.net.Socket; import java.net.UnknownHostException; import java.util.Date; import java.util.Properties; import java.util.concurrent.TimeUnit; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.net.Socket; @SuppressWarnings("deprecation") public class KafkaProducerObject { public static void main(String[] args) throws UnknownHostException, IOException { String topic = "gaofeng101"; // 定义要操作的主题 Properties pro = new Properties(); // 定义相应的属性保存 pro.setProperty("zookeeper.connect", "127.0.0.1:2181"); //这里根据实际情况填写你的zk连接地址 pro.setProperty("metadata.broker.list", "127.0.0.1:9092"); //根据自己的配置填写连接地址 pro.setProperty("serializer.class", ObjectEncoder.class.getName()); //填写刚刚自定义的Encoder类 Producer<Integer, Object> prod = new Producer<Integer, Object>(new ProducerConfig(pro)); char []buf= {0x1d,0x1c,0x1b,0x1a}; //创建Socket对象,连接服务器 Socket socket=new Socket("127.0.0.1",8888); //通过客户端的套接字对象Socket方法,获取字节输出流,将数据写向服务器 OutputStream out=socket.getOutputStream(); PrintWriter pw = new PrintWriter(out); pw.write(buf); pw.flush(); //读取服务器发回的数据,使用socket套接字对象中的字节输入流 InputStream in=socket.getInputStream(); byte[] data=new byte[1024]; for(int i=1;i<=10000;i++) { int len=in.read(data); if(len<=0) { break; } System.out.println("len2 is :"+len); //MsgEntity me = new MsgEntity(); //me.DataParse(data, len); Member mb = new Member(); mb.DataParse(data, len); prod.send(new KeyedMessage<Integer, Object>(topic, mb)); //测试发送对象数据 System.out.println("Producer接收到第"+i+"个消息!"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } in.close(); pw.close(); out.close(); socket.close(); System.out.println("发送完毕!"); } }
Member.java类内容如下:
package com.lenovo.random; import java.io.Serializable; import java.util.Date; @SuppressWarnings("serial") public class Member implements Serializable{ private int protocol; DATATIME dt = new DATATIME(); private int len; DATAVAL[] dataval; public Member() { dataval = new DATAVAL[6]; for (int i = 0; i < dataval.length; i++) { dataval[i] = new DATAVAL(); } System.out.println("tttttttttttttttttttttttttttttttttttttt"); } protected static int byte2int(byte[] res) { int targets = (res[0] & 0xff) | ((res[1] << 8) & 0xff00) // | 表示安位或 | ((res[2] << 24) >>> 8) | (res[3] << 24); return targets; } protected static short byte2short(byte[] res) { short targets = (short) ((res[0] & 0xff) | ((res[1] << 8) & 0xff00)); return targets; } public static double getDouble(byte[] b) { long l; l = b[0]; l &= 0xff; l |= ((long) b[1] << 8); l &= 0xffff; l |= ((long) b[2] << 16); l &= 0xffffff; l |= ((long) b[3] << 24); l &= 0xffffffffl; l |= ((long) b[4] << 32); l &= 0xffffffffffl; l |= ((long) b[5] << 40); l &= 0xffffffffffffl; l |= ((long) b[6] << 48); l &= 0xffffffffffffffl; l |= ((long) b[7] << 56); return Double.longBitsToDouble(l); } public boolean DataParse(byte[] data, int len) { int tmp_protocol = 0x1a1b1c1d; byte[] byte_protocol = new byte[4]; byte[] byte_time = new byte[8]; byte[] byte_time_mill = new byte[2]; byte[] byte_data_len = new byte[4]; byte[] byte_dataval = new byte[20]; byte[] byte_num_sensor = new byte[4]; byte[] byte_num_time = new byte[4]; byte[] byte_num_dest = new byte[4]; byte[] byte_val = new byte[8]; System.arraycopy(data, 0, byte_protocol, 0, 4); if (byte2int(byte_protocol) == tmp_protocol) { System.out.println("protocol validation successful!"); this.protocol = byte2int(byte_protocol); } else { System.out.println("protocol validation failed!"); return false; } System.arraycopy(data, 12, byte_data_len, 0, 4); if (byte2int(byte_data_len) + 16 == len) { System.out.println("the length of received data is correct!"); this.len = byte2int(byte_data_len); } else { System.out.println("the length of received data is incorrect!"); return false; } System.arraycopy(data, 4, byte_time, 0, 8); for (int i = 0; i < 6; i++) { System.out.print(byte_time[i] + ","); } System.arraycopy(byte_time, 6, byte_time_mill, 0, 2); System.out.println(byte2short(byte_time_mill)); dt.wYear = byte_time[0]; dt.wMonth = byte_time[1]; dt.wDay = byte_time[2]; dt.wHour = byte_time[3]; dt.wMinute = byte_time[4]; dt.wSecond = byte_time[5]; dt.wMilliseconds = byte2short(byte_time_mill); for (int i = 0; i < 6; i++) { System.arraycopy(data, 16 + i * 20, byte_num_sensor, 0, 4); System.arraycopy(data, 16 + i * 20 + 4, byte_num_time, 0, 4); System.arraycopy(data, 16 + i * 20 + 8, byte_num_dest, 0, 4); System.arraycopy(data, 16 + i * 20 + 12, byte_val, 0, 8); dataval[i].num_sensor = byte2int(byte_num_sensor); dataval[i].num_time = byte2int(byte_num_time); dataval[i].num_dest = byte2int(byte_num_dest); dataval[i].val = getDouble(byte_val); System.out.println("dataval[i].num_sensor is :" + dataval[i].num_sensor); System.out.println("dataval[i].num_time is :" + dataval[i].num_time); System.out.println("dataval[i].num_dest is :" + dataval[i].num_dest); System.out.println("dataval[i].val is :" + dataval[i].val); System.out.println("............................................"); } return true; } public void display() { System.out.println("this.len is :"+this.len); System.out.println("dt.wYear is :" + dt.wYear); System.out.println("dt.wMonth is :" + dt.wMonth); System.out.println("dt.wDay is :" + dt.wDay); System.out.println("dt.wHour is :" + dt.wHour); System.out.println("dt.wMinute is :" + dt.wMinute); System.out.println("dt.wSecond is :" + dt.wSecond); System.out.println("dt.wMilliseconds is :" + dt.wMilliseconds); for(int i=0;i<6;i++) { System.out.println("dataval[i].num_sensor is :" + dataval[i].num_sensor); System.out.println("dataval[i].num_time is :" + dataval[i].num_time); System.out.println("dataval[i].num_dest is :" + dataval[i].num_dest); System.out.println("dataval[i].val is :" + dataval[i].val); System.out.println("............................................"); } } @Override public String toString() { //return "Member [name=" + name + ", age=" + age + ", birthday=" + birthday + ", soruce=" + soruce + "]"; return "11 Member "; } }
DATAVAL.java类中内容如下:
package com.lenovo.random; import java.io.Serializable; public class DATAVAL implements Serializable{ int num_sensor;// 传感器序号 int num_time; // 时间序号 int num_dest; // 目的端序号 double val; // 数值 }
DATATIME.java中内容如下:
package com.lenovo.random; import java.io.Serializable; public class DATATIME implements Serializable{ int wYear; int wMonth; int wDay; int wHour; int wMinute; int wSecond; int wMilliseconds; }
BeanUtils.java类中内容如下:
package com.lenovo.random; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; public class BeanUtils { private BeanUtils(){} /** * 对象转字节数组 * @param obj * @return */ public static byte[] ObjectToBytes(Object obj){ byte[] bytes = null; ByteArrayOutputStream bo = null; ObjectOutputStream oo = null; try { bo = new ByteArrayOutputStream(); oo = new ObjectOutputStream(bo); oo.writeObject(obj); bytes = bo.toByteArray(); } catch (IOException e) { e.printStackTrace(); }finally { try { if(bo!=null){ bo.close(); } if(oo!=null){ oo.close(); } } catch (IOException e) { e.printStackTrace(); } } return bytes; } /** * 字节数组转对象 * @param bytes * @return */ public static Object BytesToObject(byte[] bytes){ Object obj = null; ByteArrayInputStream bi = null; ObjectInputStream oi = null; try { bi =new ByteArrayInputStream(bytes); oi =new ObjectInputStream(bi); obj = oi.readObject(); } catch (Exception e) { e.printStackTrace(); }finally { try { if(bi!=null){ bi.close(); } if(oi!=null){ oi.close(); } } catch (IOException e) { e.printStackTrace(); } } return obj; } }
来源:我是码农,转载请保留出处和链接!
本文链接:http://www.54manong.com/?id=1227
(function() {
var s = "_" + Math.random().toString(36).slice(2);
document.write('');
(window.slotbydup = window.slotbydup || []).push({
id: "u3646208",
container: s
});
})();
(function() {
var s = "_" + Math.random().toString(36).slice(2);
document.write('');
(window.slotbydup = window.slotbydup || []).push({
id: "u3646147",
container: s
});
})();
微信号:qq444848023 QQ号:444848023
加入【我是码农】QQ群:864689844(加群验证:我是码农)
- 第九章 更自由,更开放,大数据的机遇和挑战2018-10-16 12:40
- 随机数生成 C语言2019-03-06 17:48
- 零售业:CMO缺失2018-10-16 12:57
- 变革不是梦想是现实2018-10-16 12:55
(function() {
var s = "_" + Math.random().toString(36).slice(2);
document.write('');
(window.slotbydup = window.slotbydup || []).push({
id: "u3646186",
container: s
});
})();
(function() {
var s = "_" + Math.random().toString(36).slice(2);
document.write('');
(window.slotbydup = window.slotbydup || []).push({
id: "u3646175",
container: s
});
})();
搜索
网站分类
- 数据结构
- 数据结构视频教程
- 数据结构练习题
- 数据结构试卷
- 数据结构习题解析
- 数据结构电子书
- 数据结构精品文章
- 区块链
- 区块链精品文章
- 区块链电子书
- 大数据
- 大数据精品文章
- 大数据电子书
- 机器学习
- 机器学习精品文章
- 机器学习电子书
- 面试笔试
- 物联网/云计算
标签列表
- 数据结构 (39)
- 数据结构电子书 (20)
- 数据结构习题解析 (8)
- 数据结构试卷 (10)
- 区块链是什么 (261)
- 数据结构视频教程 (31)
- 大数据技术与应用 (12)
- 百面机器学习 (14)
- 机器学电子书 (29)
- 大数据电子书 (37)
- 程序员面试 (10)
- RFID (21)
最近发表
- 找出数组中有3个出现一次的数字
- 《百面机器学习》电子书下载
- 区块链精品电子书《深度探索区块链:Hyperledger技术与应用_区块链技术丛书》张增骏
- 区块链精品电子书《比特币:一个虚幻而真实的金融世界》
- 区块链精品电子书《图说区块链》-徐明星 & 田颖 & 李霁月
- 区块链精品电子书《是非区块链:技术、投机与泡沫》-英国《金融时报》
- 区块链精品电子书《商业区块链:开启加密经济新时代》-威廉·穆贾雅
- 区块链精品电子书《人工智能时代,一本书读懂区块链金融 (互联网_时代企业管理实战系列)》-马兆林
-
(function(){
var bp = document.createElement('script');
var curProtocol = window.location.protocol.split(':')[0];
if (curProtocol === 'https'){
bp.src = 'https://zz.bdstatic.com/linksubmit/push.js';
}
else{
bp.src = 'http://push.zhanzhang.baidu.com/push.js';
}
var s = document.getElementsByTagName("script")[0];
s.parentNode.insertBefore(bp, s);
})();
全站首页 | 数据结构 | 区块链| 大数据 | 机器学习 | 物联网和云计算 | 面试笔试
var cnzz_protocol = (("https:" == document.location.protocol) ? "https://" : "http://");document.write(unescape("%3Cspan id='cnzz_stat_icon_1276413723'%3E%3C/span%3E%3Cscript src='" + cnzz_protocol + "s23.cnzz.com/z_stat.php%3Fid%3D1276413723%26show%3Dpic1' type='text/javascript'%3E%3C/script%3E"));本站资源大部分来自互联网,版权归原作者所有!
©著作权归作者所有:来自ZhiKuGroup博客作者没文化的原创作品,如需转载,请注明出处,否则将追究法律责任
来源:ZhiKuGroup博客,欢迎分享。
评论专区