深度学习
通过Gossip协议广播的消息种类较多,不同种类的消息有不同的处理逻辑。Gossip模块利用GO语言的通道,实现一个消息的多路分用接口ChannelDeMultiplexer:
type ChannelDeMultiplexer struct {
channels []*channel
lock *sync.RWMutex
closed int32
}
其中lock是一个读写锁,用来同步对channels的处理;closed是通道是否关闭的标识,通道关闭就不能再从通道中读取数据;channels是一个channel数组,如下所示:
type channel struct {
pred common.MessageAcceptor
ch chan interface{}
}
ch是缓存消息的通道,默认只能保存10个消息。节点接收到Gossip消息的时候,会调用 DeMultiplex接口。pred用来判断订阅者对某个消息是否感兴趣,感兴趣的消息会过滤出来存放在ch里,等待下一步的处理。pre的策略可以根 据业务逻辑来实现,函数定义如下:
type MessageAcceptor func(interface{}) bool
我们来看看Gossip模块实现的几种MessageAcceptor。
1)Gossip消息过滤器。Gossip模块接收到的消息类型有:GossipMessage和Signed GossipMessage,如果不是这两种类型的消息,会过滤出来丢弃。¤
2)存活消息过滤器。过滤出类型为GossipMessage_AliveMsg、GossipMessage_MemReq和GossipMessage_MemRes的消息。
3)状态消息过滤器。过滤出状态同步消息。
4)远程状态消息过滤器。过滤出GossipMessage_StateRequest和 GossipMessage_StateResponse的消息,这两种消息是为了状态同步而主动发送和接收的消息。如果它包含了连接验证信息,则会调用 MCS验证是否是指定通道成员发送的消息。
5)主节点选举消息过滤器。过滤出某个通道上的GossipMessage_LeadershipMsg消息。
我们来看看Gossip模块中的多路分用消息过滤器漏斗,如图4-2所示。
节点的Gossip模块会绑定gRPC端口(默认是7051端口),从其他节点或者排序服务节点上获取到的消息, 通过handler调用DeMultiplex实现消息的多路分用,消息会通过消息过滤器过滤后进行业务逻辑的处理。这里的过滤器都虚线来表示,以说明同 一层的过滤器不是互斥的,就是说同一层的过滤器输入都是相同的,经过过滤器的输出会不同。如果经过多层过滤器的过滤,消息是在上一层过滤的基础上筛选出来 的。比如“状态数据过滤器”和“状态同步消息过滤器”的输入都是“状态消息过滤器”的输出,而“状态消息过滤器”的输入是gRPC消息过滤出的 Gossip消息。
图4-2 多路分用消息过滤器漏斗
ChannelDeMultiplexer中的channels可以定义多个,同一个消息只要满足pre定义的过 滤条件,就会放到对应的ch里,所以可能不同的channel里有相同的消息。对于需要广播的消息,Gossip模块还实现了消息分区,同一个消息经过过 滤器过滤以后,只会出现在一个列表中。函数实现比较简单,代码如下:
func partitionMessages(pred common.MessageAcceptor, a
*proto. SignedGossipMessage) ([]*proto.SignedGossipMessage,
[]*proto.SignedGossipMessage) {
s1 := []*proto.SignedGossipMessage{}
s2 := []*proto.SignedGossipMessage{}
for _, m := range a {
if pred(m) {
s1 = append(s1, m)
} else {
s2 = append(s2, m)
}
}
return s1, s2
}
经过partitionMessages处理过的消息a,会分成两个切片,满足过滤器pre的放到切片s1中,不满足的放到切片s2中。
广播消息过滤器有以下几种:
·区块数据过滤器:过滤出指定通道chainID的区块数据。
·状态消息过滤器:过滤出状态同步消息。
·通道内部的消息过滤器:过滤出只在通道内部广播的消息。
·组织内部的消息过滤器:过滤出只在组织内部广播的消息。
·主节点选举消息过滤器:过滤出某个通道上的GossipMessage_LeadershipMsg消息。
图4-3所示为分区消息过滤器漏斗。
图4-3 分区消息过滤器漏斗
和消息多路分用不同,经过分区以后消息只能在一个结果集中,同一层的过滤器是互斥的。比如经过“区块数据过滤器”过滤以后,“状态消息过滤器”等就只能在其他消息中进行筛选。
来源:我是码农,转载请保留出处和链接!
本文链接:http://www.54manong.com/?id=1072
微信号:qq444848023 QQ号:444848023
加入【我是码农】QQ群:864689844(加群验证:我是码农)
全站首页 | 数据结构 | 区块链| 大数据 | 机器学习 | 物联网和云计算 | 面试笔试
var cnzz_protocol = (("https:" == document.location.protocol) ? "https://" : "http://");document.write(unescape("%3Cspan id='cnzz_stat_icon_1276413723'%3E%3C/span%3E%3Cscript src='" + cnzz_protocol + "s23.cnzz.com/z_stat.php%3Fid%3D1276413723%26show%3Dpic1' type='text/javascript'%3E%3C/script%3E"));本站资源大部分来自互联网,版权归原作者所有!
评论专区