好文档 - 专业文书写作范文服务资料分享网站

兄弟连区块链教程Fabric1.0源代码分析consenter(共识插件)

天下 分享 时间: 加入收藏 我要投稿 点赞

兄弟连区块链教程Fabric1.0源代码分析consenter(共识插件)

兄弟连区块链教程Fabric1.0源代码分析consenter(共识插件),2018年下半年,区块链行业正逐渐褪去发展之初的浮躁、回归理性,表面上看相关人才需求与身价似乎正在回落。但事实上,正是初期泡沫的渐退,让人们更多的关注点放在了区块链真正的技术之上。 # Fabric 1.0源代码笔记 之 consenter(共识插件)

## 1、consenter概述

consenter,即共识插件,负责接受交易信息进行排序,以及对交易进行切割并打包,打包后返回批量交易。

Orderer包含三种共识插件:

* solo,单节点的排序功能,用于试验。

* kafka,基于kafka集群实现的排序,可用于生产环境。 * SBFT,支持拜占庭容错的排序实现,尚未完成开发。

consenter代码分布在orderer/multichain、orderer/solo、orderer/kafka、orderer/common/blockcutter、orderer/common/filter目录下。目录结构如下:

* orderer/multichain目录:

* chainsupport.go,Consenter和Chain接口定义。 * orderer/solo目录,solo版本共识插件。 * orderer/kafka目录,kafka版本共识插件。

* orderer/common/blockcutter目录,block cutter相关实现,即Receiver接口定义及实现。

* orderer/common/filter目录,过滤器相关实现。

## 2、Consenter和Chain接口定义

```go

type Consenter interface { //共识插件接口 //获取共识插件对应的Chain实例

HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error) }

type Chain interface {

//接受消息

Enqueue(env *cb.Envelope) bool Errored() <-chan struct{} Start() //开始 Halt() //挂起 }

//代码在orderer/multichain/chainsupport.go

1 / 12

```

## 3、solo版本共识插件

### 3.1、Consenter接口实现

```go

type consenter struct{}

//构造consenter

func New() multichain.Consenter { return &consenter{} }

//获取solo共识插件对应的Chain实例

func (solo *consenter) HandleChain(support multichain.ConsenterSupport, metadata *cb.Metadata) (multichain.Chain, error) { return newChain(support), nil }

//代码在orderer/solo/consensus.go ```

### 3.2、Chain接口实现

```go

type chain struct {

support multichain.ConsenterSupport sendChan chan *cb.Envelope //交易数据通道 exitChan chan struct{} //退出信号 }

//构造chain

func newChain(support multichain.ConsenterSupport) *chain //go ch.main()

func (ch *chain) Start() //关闭通道,close(ch.exitChan) func (ch *chain) Halt()

//Envelope写入通道ch.sendChan

func (ch *chain) Enqueue(env *cb.Envelope) bool //获取ch.exitChan

func (ch *chain) Errored() <-chan struct{} //goroutine

func (ch *chain) main()

2 / 12

//代码在orderer/solo/consensus.go ```

### 3.3、main()实现

```go

func (ch *chain) main() {

var timer <-chan time.Time //超时通道

for {

select {

case msg := <-ch.sendChan: //接受交易消息 batches, committers, ok, _ := ch.support.BlockCutter().Ordered(msg)

if ok && len(batches) == 0 && timer == nil { timer =

time.After(ch.support.SharedConfig().BatchTimeout()) continue }

for i, batch := range batches {

block := ch.support.CreateNextBlock(batch) //每个批处理创建一个块

ch.support.WriteBlock(block, committers[i], nil) //写入块 }

if len(batches) > 0 { timer = nil }

case <-timer:

//clear the timer timer = nil

batch, committers := ch.support.BlockCutter().Cut() if len(batch) == 0 {

logger.Warningf(\requests, this might indicate a bug\ continue }

logger.Debugf(\ block := ch.support.CreateNextBlock(batch) ch.support.WriteBlock(block, committers, nil) case <-ch.exitChan: //退出信号 logger.Debugf(\ return

3 / 12

} } }

//代码在orderer/solo/consensus.go ```

## 4、kafka版本共识插件

### 4.1、Consenter接口实现

```go

type consenterImpl struct {

brokerConfigVal *sarama.Config tlsConfigVal localconfig.TLS retryOptionsVal localconfig.Retry kafkaVersionVal sarama.KafkaVersion }

//构造consenterImpl

func New(tlsConfig localconfig.TLS, retryOptions localconfig.Retry, kafkaVersion sarama.KafkaVersion) multichain.Consenter

//构造chainImpl

func (consenter *consenterImpl) HandleChain(support

multichain.ConsenterSupport, metadata *cb.Metadata) (multichain.Chain, error)

func (consenter *consenterImpl) brokerConfig() *sarama.Config func (consenter *consenterImpl) retryOptions() localconfig.Retry //代码在orderer/kafka/consenter.go ```

### 4.2、Chain接口实现

```go

type chainImpl struct {

consenter commonConsenter

support multichain.ConsenterSupport

channel channel lastOffsetPersisted int64 lastCutBlockNumber uint64

producer sarama.SyncProducer parentConsumer sarama.Consumer

channelConsumer sarama.PartitionConsumer

4 / 12

errorChan chan struct{} haltChan chan struct{} startChan chan struct{} }

//构造chainImpl

func newChain(consenter commonConsenter, support

multichain.ConsenterSupport, lastOffsetPersisted int64) (*chainImpl, error)

//获取chain.errorChan

func (chain *chainImpl) Errored() <-chan struct{} //go startThread(chain)

func (chain *chainImpl) Start() //结束

func (chain *chainImpl) Halt()

//接收Envelope消息,序列化后发给kafka

func (chain *chainImpl) Enqueue(env *cb.Envelope) bool //goroutine,调取chain.processMessagesToBlocks() func startThread(chain *chainImpl)

//goroutine实际功能实现

func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) func (chain *chainImpl) closeKafkaObjects() []error

func getLastCutBlockNumber(blockchainHeight uint64) uint64

func getLastOffsetPersisted(metadataValue []byte, chainID string) int64 func newConnectMessage() *ab.KafkaMessage

func newRegularMessage(payload []byte) *ab.KafkaMessage

func newTimeToCutMessage(blockNumber uint64) *ab.KafkaMessage

//构造sarama.ProducerMessage

func newProducerMessage(channel channel, pld []byte) *sarama.ProducerMessage

func processConnect(channelName string) error

func processRegular(regularMessage *ab.KafkaMessageRegular, support multichain.ConsenterSupport, timer *<-chan time.Time, receivedOffset int64, lastCutBlockNumber *uint64) error

func processTimeToCut(ttcMessage *ab.KafkaMessageTimeToCut, support multichain.ConsenterSupport, lastCutBlockNumber *uint64, timer *<-chan time.Time, receivedOffset int64) error

func sendConnectMessage(retryOptions localconfig.Retry, exitChan chan struct{}, producer sarama.SyncProducer, channel channel) error func sendTimeToCut(producer sarama.SyncProducer, channel channel, timeToCutBlockNumber uint64, timer *<-chan time.Time) error

5 / 12

兄弟连区块链教程Fabric1.0源代码分析consenter(共识插件)

兄弟连区块链教程Fabric1.0源代码分析consenter(共识插件)兄弟连区块链教程Fabric1.0源代码分析consenter(共识插件),2018年下半年,区块链行业正逐渐褪去发展之初的浮躁、回归理性,表面上看相关人才需求与身价似乎正在回落。但事实上,正是初期泡沫的渐退,让人们更多的关注点放在了区块链真正的技术之上。#Fabric1.0源
推荐度:
点击下载文档文档为doc格式
9c7l73cd3i570pk9t8239nplx1m5bx00ajc
领取福利

微信扫码领取福利

微信扫码分享