二维码

kafka对数据序列化和反序列化 - 数据结构 - 机器学习

1365 人阅读 | 时间:2021年01月15日 01:22
kafka对数据序列化和反序列化 - 数据结构 - 机器学习 #daohang ul li t,.reed .riqi,a.shangg,a.xiatt,a.shangg:hover,a.xiatt:hover,a.shang,a.xiat,a.shang:hover,a.xiat:hover,.reed-pinglun-anniu,span.now-page,#daohangs-around,#caidan-tubiao,#daohangs,#daohangs li,#btnPost{background-color:#D10B04;} .dinglanyou1 h3{border-bottom:3px solid #D10B04;} #dibuer{border-top:2px solid #D10B04;}.cebianlan .rongqi h3{border-bottom:1px solid #D10B04;} #edtSearch{border:1px solid #D10B04;} #daohang .zuo ul li{border-right:1px solid #;} #daohang ul li t a{border-top:1px solid #;border-right:1px solid #D10B04;} #daohang ul li t a:hover{border-right:1px solid #;} #daohang .you ul li a:hover,#daohang .zuo ul li a:hover,.reed-pinglun-anniu:hover{background-color:#;} a:hover,.reed h6 a:hover,#dibuer a:hover,.reed .riqiding,.cebianlan .rongqi li a:hover,#pinglun-liebiao ul.fubens li.depth-1 dl dd span.shu a,#pinglun-liebiao ul.fubens li.depth-1 dl dd span.huifuliuyan a:hover,.reed-biaoti h6 span{color:#D10B04;} .reed .kan a{color:#0A0AF5;}.reed .kan a:hover{color:#D10101;} @media screen and (max-width:1492px){a.shang,a.xiat{background:none;} a.xiat:hover,a.shang:hover{background-color:#f9f9f9;background-image:none;text-decoration:none;}} var _hmt = _hmt || [];(function() { var hm = document.createElement("script"); hm.src = "https://hm.baidu.com/hm.js?b19db5ba3b437a9e8698d2bc8fc64334"; var s = document.getElementsByTagName("script")[0]; s.parentNode.insertBefore(hm, s);})(); var _hmt = _hmt || []; (function() { var hm = document.createElement("script"); hm.src = "https://hm.baidu.com/hm.js?b19db5ba3b437a9e8698d2bc8fc64334"; var s = document.getElementsByTagName("script")[0]; s.parentNode.insertBefore(hm, s); })(); var _hmt = _hmt || []; (function() { var hm = document.createElement("script"); hm.src = "https://hm.baidu.com/hm.js?2d748c9763cfc72fb7d1ccab29f0770d"; var s = document.getElementsByTagName("script")[0]; s.parentNode.insertBefore(hm, s); })(); var _hmt = _hmt || []; (function() { var hm = 
document.createElement("script"); hm.src = "https://hm.baidu.com/hm.js?f6d451f3f1be23f3abf240c64c469c1b"; var s = document.getElementsByTagName("script")[0]; s.parentNode.insertBefore(hm, s); })();

当前位置:首页 » 大数据精品文章 » 正文

(function() { var s = "_" + Math.random().toString(36).slice(2); document.write('
'); (window.slotbydup = window.slotbydup || []).push({ id: "u3646201", container: s }); })();
(function() { var s = "_" + Math.random().toString(36).slice(2); document.write('
'); (window.slotbydup = window.slotbydup || []).push({ id: "u3646162", container: s }); })();

kafka对数据序列化和反序列化

1683 人参与  2019年03月06日 17:51  分类 : 大数据精品文章  评论

pom.xml中内容如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.lenovo</groupId>
  <artifactId>random</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>random</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <kafka.version>0.10.0.1</kafka.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-ipc</artifactId>
        <version>1.7.3</version>
    </dependency>
  </dependencies>
  <build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/test/java</testSourceDirectory>
    <plugins>
      <!--
        Bind the maven-assembly-plugin "single" goal to the package phase,
        using the jar-with-dependencies descriptor.
        NOTE(review): the previous comment spoke of excluding storm
        dependencies, but this POM declares no storm dependency; it looks
        copied from another project.
        NOTE(review): mainClass below is empty, so the manifest names no
        entry point; confirm whether the assembled jar is meant to be
        launched with an explicit class name.
       -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
    </plugin>  
    </plugins>
  </build> 
</project>

KafkaConsumerObject.java类内容如下:

package com.lenovo.random;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;


import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumerObject {
    public static void main(String[] args) {
        String topic = "gaofeng101"; // 定义要操作的主题   
        Properties pro = new Properties(); // 定义相应的属性保存 
        pro.setProperty("zookeeper.connect", "127.0.0.1:2181"); //这里根据实际情况填写你的zk连接地址
        pro.setProperty("metadata.broker.list", "127.0.0.1:9092"); //根据自己的配置填写连接地址
        //pro.setProperty("group.id", "group1");  
        pro.setProperty("group.id", "test-consumer-groups01");
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(pro));   // 需要定义一个主题的映射的存储集合 
        Map<String,Integer> topicMap = new HashMap<String,Integer>() ;   
        topicMap.put(topic, 1) ; // 设置要读取数据的主题 
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicMap) ;   // 现在只有一个主题,所以此处只接收第一个主题的数据即可   
//        System.err.println(messageStreams.size());
//        System.out.println(messageStreams);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0) ; // 第一个主题  
        ConsumerIterator<byte[], byte[]> iter = stream.iterator(); 
        int i=0;
        while(iter.hasNext()) { 
        	i++;
          // MsgEntity me = (MsgEntity)BeanUtils.BytesToObject(iter.next().message());   //接收消息,并将字节数组转换为对象
        	Member me = (Member)BeanUtils.BytesToObject(iter.next().message());   //接收消息,并将字节数组转换为对象
        	System.out.println("........................................................");
        	System.out.println("Consumer接收到的消息如下:");
            me.display();
            System.out.println("Consumer接收第"+i+"个消息!");
           
        } 
    }
}

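KafkaProducerObject.java类内容如下(注意:代码中引用的自定义序列化类 ObjectEncoder 本文未列出,需要自行实现 kafka.serializer.Encoder 接口,可在其中调用下文的 BeanUtils.ObjectToBytes 完成序列化):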

package com.lenovo.random;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.net.Socket;
@SuppressWarnings("deprecation")
public class KafkaProducerObject {
    public static void main(String[] args) throws UnknownHostException, IOException {
        String topic = "gaofeng101"; // 定义要操作的主题 
        Properties pro = new Properties(); // 定义相应的属性保存 
        pro.setProperty("zookeeper.connect", "127.0.0.1:2181"); //这里根据实际情况填写你的zk连接地址
        pro.setProperty("metadata.broker.list", "127.0.0.1:9092"); //根据自己的配置填写连接地址
        pro.setProperty("serializer.class", ObjectEncoder.class.getName()); //填写刚刚自定义的Encoder类
        Producer<Integer, Object> prod = new Producer<Integer, Object>(new ProducerConfig(pro)); 
        
        char []buf= {0x1d,0x1c,0x1b,0x1a};
        //创建Socket对象,连接服务器
        Socket socket=new Socket("127.0.0.1",8888);
        //通过客户端的套接字对象Socket方法,获取字节输出流,将数据写向服务器
        OutputStream out=socket.getOutputStream();
        PrintWriter pw = new PrintWriter(out);
        pw.write(buf);
        pw.flush();
        //读取服务器发回的数据,使用socket套接字对象中的字节输入流
        InputStream in=socket.getInputStream();
        byte[] data=new byte[1024];
        
       for(int i=1;i<=10000;i++) {
       int len=in.read(data);
       if(len<=0) {
       break;
       }
       System.out.println("len2 is :"+len);
       //MsgEntity me = new MsgEntity();
        //me.DataParse(data, len);
        Member mb = new Member();
        mb.DataParse(data, len);
        prod.send(new KeyedMessage<Integer, Object>(topic, mb));  //测试发送对象数据
         System.out.println("Producer接收到第"+i+"个消息!");
          try {
                   TimeUnit.SECONDS.sleep(1);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
       }
       in.close();
       pw.close();
       out.close();
       socket.close();
       System.out.println("发送完毕!");
    }
}

Member.java类内容如下:

package com.lenovo.random;

import java.io.Serializable;
import java.util.Date;

@SuppressWarnings("serial")
public class Member implements Serializable{
	
    private int protocol;
   	DATATIME dt = new DATATIME();
   	
   	private int len;
   	DATAVAL[] dataval;

   	public Member() {
   		dataval = new DATAVAL[6];
   		for (int i = 0; i < dataval.length; i++) {
   			dataval[i] = new DATAVAL();
   		}
   		System.out.println("tttttttttttttttttttttttttttttttttttttt");
   	}

   	protected static int byte2int(byte[] res) {
   		int targets = (res[0] & 0xff) | ((res[1] << 8) & 0xff00) // | 表示安位或
   				| ((res[2] << 24) >>> 8) | (res[3] << 24);
   		return targets;
   	}

   	protected static short byte2short(byte[] res) {
   		short targets = (short) ((res[0] & 0xff) | ((res[1] << 8) & 0xff00));
   		return targets;
   	}

   	public static double getDouble(byte[] b) {
   		long l;
   		l = b[0];
   		l &= 0xff;
   		l |= ((long) b[1] << 8);
   		l &= 0xffff;
   		l |= ((long) b[2] << 16);
   		l &= 0xffffff;
   		l |= ((long) b[3] << 24);
   		l &= 0xffffffffl;
   		l |= ((long) b[4] << 32);
   		l &= 0xffffffffffl;
   		l |= ((long) b[5] << 40);
   		l &= 0xffffffffffffl;
   		l |= ((long) b[6] << 48);
   		l &= 0xffffffffffffffl;
   		l |= ((long) b[7] << 56);
   		return Double.longBitsToDouble(l);
   	}

   	public boolean DataParse(byte[] data, int len) {
   		int tmp_protocol = 0x1a1b1c1d;
   		byte[] byte_protocol = new byte[4];
   		byte[] byte_time = new byte[8];
   		byte[] byte_time_mill = new byte[2];
   		byte[] byte_data_len = new byte[4];
   		byte[] byte_dataval = new byte[20];

   		byte[] byte_num_sensor = new byte[4];
   		byte[] byte_num_time = new byte[4];
   		byte[] byte_num_dest = new byte[4];
   		byte[] byte_val = new byte[8];

   		System.arraycopy(data, 0, byte_protocol, 0, 4);
   		if (byte2int(byte_protocol) == tmp_protocol) {
   			System.out.println("protocol validation successful!");
   			this.protocol = byte2int(byte_protocol);
   		} else {
   			System.out.println("protocol validation failed!");
   			return false;
   		}

   		System.arraycopy(data, 12, byte_data_len, 0, 4);
   		if (byte2int(byte_data_len) + 16 == len) {
   			System.out.println("the length of received data is correct!");
   			this.len = byte2int(byte_data_len);
   		} else {
   			System.out.println("the length of received data is incorrect!");
   			return false;
   		}

   		System.arraycopy(data, 4, byte_time, 0, 8);
   		for (int i = 0; i < 6; i++) {
   			System.out.print(byte_time[i] + ",");
   		}
   		System.arraycopy(byte_time, 6, byte_time_mill, 0, 2);
   		System.out.println(byte2short(byte_time_mill));
   		dt.wYear = byte_time[0];
   		dt.wMonth = byte_time[1];
   		dt.wDay = byte_time[2];
   		dt.wHour = byte_time[3];
   		dt.wMinute = byte_time[4];
   		dt.wSecond = byte_time[5];
   		dt.wMilliseconds = byte2short(byte_time_mill);

   		for (int i = 0; i < 6; i++) {
   			System.arraycopy(data, 16 + i * 20, byte_num_sensor, 0, 4);
   			System.arraycopy(data, 16 + i * 20 + 4, byte_num_time, 0, 4);
   			System.arraycopy(data, 16 + i * 20 + 8, byte_num_dest, 0, 4);
   			System.arraycopy(data, 16 + i * 20 + 12, byte_val, 0, 8);
   			dataval[i].num_sensor = byte2int(byte_num_sensor);
   			dataval[i].num_time = byte2int(byte_num_time);
   			dataval[i].num_dest = byte2int(byte_num_dest);
   			dataval[i].val = getDouble(byte_val);
   			
   			System.out.println("dataval[i].num_sensor is :" + dataval[i].num_sensor);
   			System.out.println("dataval[i].num_time is :" + dataval[i].num_time);
   			System.out.println("dataval[i].num_dest is :" + dataval[i].num_dest);
   			System.out.println("dataval[i].val is :" + dataval[i].val);
   			System.out.println("............................................");
   		}

   		return true;
   	}
   	
   	public void display() {
   		System.out.println("this.len is :"+this.len);
   		System.out.println("dt.wYear is :" + dt.wYear);
   		System.out.println("dt.wMonth is :" + dt.wMonth);
   		System.out.println("dt.wDay is :" + dt.wDay);
   		System.out.println("dt.wHour is :" + dt.wHour);
   		System.out.println("dt.wMinute is :" + dt.wMinute);
   		System.out.println("dt.wSecond is :" + dt.wSecond);
   		System.out.println("dt.wMilliseconds is :" + dt.wMilliseconds);
   		
   		for(int i=0;i<6;i++) {
   			System.out.println("dataval[i].num_sensor is :" + dataval[i].num_sensor);
   			System.out.println("dataval[i].num_time is :" + dataval[i].num_time);
   			System.out.println("dataval[i].num_dest is :" + dataval[i].num_dest);
   			System.out.println("dataval[i].val is :" + dataval[i].val);
   			System.out.println("............................................");
   		}
   	}
    
    @Override
    public String toString() {
        //return "Member [name=" + name + ", age=" + age + ", birthday=" + birthday + ", soruce=" + soruce + "]";
    	return "11 Member ";
    }

}

DATAVAL.java类中内容如下:

package com.lenovo.random;

import java.io.Serializable;

/**
 * Serializable holder for one record: three integer numbers and one double value.
 * Fields are package-private and accessed directly.
 */
public class DATAVAL implements Serializable{
	int num_sensor;// sensor number
	int num_time; // time number
	int num_dest; // destination number
	double val; // value
}

DATATIME.java类中内容如下:

package com.lenovo.random;

import java.io.Serializable;

/**
 * Serializable holder for timestamp components, stored as plain ints.
 * Fields are package-private and accessed directly.
 * NOTE(review): valid ranges and the year encoding are not defined here; confirm
 * against the code that fills these fields.
 */
public class DATATIME implements Serializable{
	int wYear; // year component
	int wMonth; // month component
	int wDay; // day component
	int wHour; // hour component
	int wMinute; // minute component
	int wSecond; // second component
	int wMilliseconds; // millisecond component
}

BeanUtils.java类中内容如下:

package com.lenovo.random;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
 * Static helpers converting between objects and byte arrays using Java native
 * serialization. Failures are reported by printing the stack trace and
 * returning {@code null}.
 */
public class BeanUtils {
    private BeanUtils(){}

    /**
     * Serializes an object to a byte array.
     *
     * @param obj the object to serialize; must be serializable
     * @return the serialized bytes, or {@code null} if serialization failed
     */
    public static byte[] ObjectToBytes(Object obj){
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        ObjectOutputStream objectOut = null;
        try {
            objectOut = new ObjectOutputStream(buffer);
            objectOut.writeObject(obj);
            // Flush before taking the snapshot so no buffered bytes are left behind.
            objectOut.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        } finally {
            // Closing the wrapper also closes the underlying byte buffer.
            closeQuietly(objectOut);
        }
    }

    /**
     * Deserializes an object from a byte array.
     *
     * <p>NOTE(review): Java native deserialization of bytes from an untrusted
     * source is unsafe; only feed this method data produced by a trusted
     * {@link #ObjectToBytes(Object)} call.
     *
     * @param bytes the serialized form
     * @return the deserialized object, or {@code null} if {@code bytes} is
     *         {@code null} or deserialization failed
     */
    public static Object BytesToObject(byte[] bytes){
        if (bytes == null) {
            return null;
        }
        ObjectInputStream objectIn = null;
        try {
            objectIn = new ObjectInputStream(new ByteArrayInputStream(bytes));
            return objectIn.readObject();
        } catch (Exception e) {
            // Broad on purpose: callers rely on null rather than an exception for bad input.
            e.printStackTrace();
            return null;
        } finally {
            closeQuietly(objectIn);
        }
    }

    /** Closes a stream if it is non-null, printing instead of propagating any close failure. */
    private static void closeQuietly(java.io.Closeable stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}



来源:我是码农,转载请保留出处和链接!

本文链接:http://www.54manong.com/?id=1227

(function() { var s = "_" + Math.random().toString(36).slice(2); document.write('
'); (window.slotbydup = window.slotbydup || []).push({ id: "u3646208", container: s }); })();
(function() { var s = "_" + Math.random().toString(36).slice(2); document.write('
'); (window.slotbydup = window.slotbydup || []).push({ id: "u3646147", container: s }); })();
window._bd_share_config={"common":{"bdSnsKey":{},"bdText":"","bdMini":"2","bdPic":"","bdStyle":"0","bdSize":"16"},"share":{},"image":{"viewList":["qzone","tsina","tqq","renren","weixin"],"viewText":"分享到:","viewSize":"16"},"selectShare":{"bdContainerClass":null,"bdSelectMiniList":["qzone","tsina","tqq","renren","weixin"]}};with(document)0[(getElementsByTagName('head')[0]||body).appendChild(createElement('script')).src='http://bdimg.share.baidu.com/static/api/js/share.js?v=89860593.js?cdnversion='+~(-new Date()/36e5)];
大数据技术与应用  

微信号:qq444848023    QQ号:444848023

加入【我是码农】QQ群:864689844(加群验证:我是码农)

<< 上一篇 下一篇 >>
(function() { var s = "_" + Math.random().toString(36).slice(2); document.write('
'); (window.slotbydup = window.slotbydup || []).push({ id: "u3646186", container: s }); })();
(function() { var s = "_" + Math.random().toString(36).slice(2); document.write('
'); (window.slotbydup = window.slotbydup || []).push({ id: "u3646175", container: s }); })();
搜索

网站分类

标签列表

最近发表

    (function(){ var bp = document.createElement('script'); var curProtocol = window.location.protocol.split(':')[0]; if (curProtocol === 'https'){ bp.src = 'https://zz.bdstatic.com/linksubmit/push.js'; } else{ bp.src = 'http://push.zhanzhang.baidu.com/push.js'; } var s = document.getElementsByTagName("script")[0]; s.parentNode.insertBefore(bp, s); })();

全站首页 | 数据结构 | 区块链| 大数据 | 机器学习 | 物联网和云计算 | 面试笔试

var cnzz_protocol = (("https:" == document.location.protocol) ? "https://" : "http://");document.write(unescape("%3Cspan id='cnzz_stat_icon_1276413723'%3E%3C/span%3E%3Cscript src='" + cnzz_protocol + "s23.cnzz.com/z_stat.php%3Fid%3D1276413723%26show%3Dpic1' type='text/javascript'%3E%3C/script%3E"));本站资源大部分来自互联网,版权归原作者所有!

jQuery(document).ready(function($){ /* prepend menu icon */ $('#daohangs-around').prepend('
'); /* toggle nav */ $("#caidan-tubiao").on("click", function(){ $("#daohangs").slideToggle(); $(this).toggleClass("active"); }); });

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

©著作权归作者所有:来自ZhiKuGroup博客作者没文化的原创作品,如需转载,请注明出处,否则将追究法律责任 来源:ZhiKuGroup博客,欢迎分享。

评论专区
  • 昵 称必填
  • 邮 箱选填
  • 网 址选填
◎已有 0 人评论
搜索
作者介绍
30天热门
×
×
关闭广告
关闭广告
本站会员尊享VIP特权,现在就加入我们吧!登录注册×
»
会员登录
新用户注册
×
会员注册
已有账号登录
×