兄弟连区块链教程Fabric1.0源代码分析consenter(共识插件)
兄弟连区块链教程Fabric1.0源代码分析consenter(共识插件),2018年下半年,区块链行业正逐渐褪去发展之初的浮躁、回归理性,表面上看相关人才需求与身价似乎正在回落。但事实上,正是初期泡沫的渐退,让人们更多的关注点放在了区块链真正的技术之上。 # Fabric 1.0源代码笔记 之 consenter(共识插件)
## 1、consenter概述
consenter,即共识插件,负责接受交易信息进行排序,以及对交易进行切割并打包,打包后返回批量交易。
Orderer包含三种共识插件:
* solo,单节点的排序功能,用于试验。
* kafka,基于kafka集群实现的排序,可用于生产环境。 * SBFT,支持拜占庭容错的排序实现,尚未完成开发。
consenter代码分布在orderer/multichain、orderer/solo、orderer/kafka、orderer/common/blockcutter、orderer/common/filter目录下。目录结构如下:
* orderer/multichain目录:
* chainsupport.go,Consenter和Chain接口定义。 * orderer/solo目录,solo版本共识插件。 * orderer/kafka目录,kafka版本共识插件。
* orderer/common/blockcutter目录,block cutter相关实现,即Receiver接口定义及实现。
* orderer/common/filter目录,过滤器相关实现。
## 2、Consenter和Chain接口定义
```go
type Consenter interface { //共识插件接口 //获取共识插件对应的Chain实例
HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error) }
type Chain interface {
//接受消息
Enqueue(env *cb.Envelope) bool Errored() <-chan struct{} Start() //开始 Halt() //挂起 }
//代码在orderer/multichain/chainsupport.go
1 / 12
```
## 3、solo版本共识插件
### 3.1、Consenter接口实现
```go
type consenter struct{}
//构造consenter
func New() multichain.Consenter { return &consenter{} }
//获取solo共识插件对应的Chain实例
func (solo *consenter) HandleChain(support multichain.ConsenterSupport, metadata *cb.Metadata) (multichain.Chain, error) { return newChain(support), nil }
//代码在orderer/solo/consensus.go ```
### 3.2、Chain接口实现
```go
type chain struct {
support multichain.ConsenterSupport sendChan chan *cb.Envelope //交易数据通道 exitChan chan struct{} //退出信号 }
//构造chain
func newChain(support multichain.ConsenterSupport) *chain //go ch.main()
func (ch *chain) Start() //关闭通道,close(ch.exitChan) func (ch *chain) Halt()
//Envelope写入通道ch.sendChan
func (ch *chain) Enqueue(env *cb.Envelope) bool //获取ch.exitChan
func (ch *chain) Errored() <-chan struct{} //goroutine
func (ch *chain) main()
2 / 12
//代码在orderer/solo/consensus.go ```
### 3.3、main()实现
```go
func (ch *chain) main() {
var timer <-chan time.Time //超时通道
for {
select {
case msg := <-ch.sendChan: //接受交易消息 batches, committers, ok, _ := ch.support.BlockCutter().Ordered(msg)
if ok && len(batches) == 0 && timer == nil { timer =
time.After(ch.support.SharedConfig().BatchTimeout()) continue }
for i, batch := range batches {
block := ch.support.CreateNextBlock(batch) //每个批处理创建一个块
ch.support.WriteBlock(block, committers[i], nil) //写入块 }
if len(batches) > 0 { timer = nil }
case <-timer:
//clear the timer timer = nil
batch, committers := ch.support.BlockCutter().Cut() if len(batch) == 0 {
logger.Warningf(\requests, this might indicate a bug\ continue }
logger.Debugf(\ block := ch.support.CreateNextBlock(batch) ch.support.WriteBlock(block, committers, nil) case <-ch.exitChan: //退出信号 logger.Debugf(\ return
3 / 12
} } }
//代码在orderer/solo/consensus.go ```
## 4、kafka版本共识插件
### 4.1、Consenter接口实现
```go
type consenterImpl struct {
brokerConfigVal *sarama.Config tlsConfigVal localconfig.TLS retryOptionsVal localconfig.Retry kafkaVersionVal sarama.KafkaVersion }
//构造consenterImpl
func New(tlsConfig localconfig.TLS, retryOptions localconfig.Retry, kafkaVersion sarama.KafkaVersion) multichain.Consenter
//构造chainImpl
func (consenter *consenterImpl) HandleChain(support
multichain.ConsenterSupport, metadata *cb.Metadata) (multichain.Chain, error)
func (consenter *consenterImpl) brokerConfig() *sarama.Config func (consenter *consenterImpl) retryOptions() localconfig.Retry //代码在orderer/kafka/consenter.go ```
### 4.2、Chain接口实现
```go
type chainImpl struct {
consenter commonConsenter
support multichain.ConsenterSupport
channel channel lastOffsetPersisted int64 lastCutBlockNumber uint64
producer sarama.SyncProducer parentConsumer sarama.Consumer
channelConsumer sarama.PartitionConsumer
4 / 12
errorChan chan struct{} haltChan chan struct{} startChan chan struct{} }
//构造chainImpl
func newChain(consenter commonConsenter, support
multichain.ConsenterSupport, lastOffsetPersisted int64) (*chainImpl, error)
//获取chain.errorChan
func (chain *chainImpl) Errored() <-chan struct{} //go startThread(chain)
func (chain *chainImpl) Start() //结束
func (chain *chainImpl) Halt()
//接收Envelope消息,序列化后发给kafka
func (chain *chainImpl) Enqueue(env *cb.Envelope) bool //goroutine,调取chain.processMessagesToBlocks() func startThread(chain *chainImpl)
//goroutine实际功能实现
func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) func (chain *chainImpl) closeKafkaObjects() []error
func getLastCutBlockNumber(blockchainHeight uint64) uint64
func getLastOffsetPersisted(metadataValue []byte, chainID string) int64 func newConnectMessage() *ab.KafkaMessage
func newRegularMessage(payload []byte) *ab.KafkaMessage
func newTimeToCutMessage(blockNumber uint64) *ab.KafkaMessage
//构造sarama.ProducerMessage
func newProducerMessage(channel channel, pld []byte) *sarama.ProducerMessage
func processConnect(channelName string) error
func processRegular(regularMessage *ab.KafkaMessageRegular, support multichain.ConsenterSupport, timer *<-chan time.Time, receivedOffset int64, lastCutBlockNumber *uint64) error
func processTimeToCut(ttcMessage *ab.KafkaMessageTimeToCut, support multichain.ConsenterSupport, lastCutBlockNumber *uint64, timer *<-chan time.Time, receivedOffset int64) error
func sendConnectMessage(retryOptions localconfig.Retry, exitChan chan struct{}, producer sarama.SyncProducer, channel channel) error func sendTimeToCut(producer sarama.SyncProducer, channel channel, timeToCutBlockNumber uint64, timer *<-chan time.Time) error
5 / 12