syncer

package
v0.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2020 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Copyright © 2019 Annchain Authors <EMAIL ADDRESS>

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright © 2019 Annchain Authors <EMAIL ADDRESS>

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright © 2019 Annchain Authors <EMAIL ADDRESS>

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright © 2019 Annchain Authors <EMAIL ADDRESS>

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright © 2019 Annchain Authors <EMAIL ADDRESS>

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright © 2019 Annchain Authors <EMAIL ADDRESS>

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright © 2019 Annchain Authors <EMAIL ADDRESS>

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright © 2019 Annchain Authors <EMAIL ADDRESS>

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const BloomFilterRate = 4 //sending 4 req
View Source
const (

	// TODO: this value will be set to optimal value in the future.
	// If generating sequencer is very fast with few transactions, it should be bigger,
	// otherwise it should be smaller
	SyncerCheckTime = time.Second * 6
)

Variables

View Source
var MaxBufferSiza = 4096 * 16

Functions

func InitLoggers

func InitLoggers(logger *logrus.Logger, logdir string)

Types

type Announcer

type Announcer struct {
	// contains filtered or unexported fields
}

func NewAnnouncer

func NewAnnouncer(messageSender MessageSender) *Announcer

func (*Announcer) BroadcastNewTx

func (m *Announcer) BroadcastNewTx(txi types.Txi)

BroadcastNewTx broadcasts a newly created txi message.

type BloomFilterFireStatus

type BloomFilterFireStatus struct {
	// contains filtered or unexported fields
}

BloomFilterFireStatus adds a bloom filter status to avoid sending requests too frequently: after sending a request, wait until a response is received or the request times out.

func NewBloomFilterFireStatus

func NewBloomFilterFireStatus(minFrequencyTime int, responseTimeOut int) *BloomFilterFireStatus

func (*BloomFilterFireStatus) Check

func (b *BloomFilterFireStatus) Check() bool

func (*BloomFilterFireStatus) Set

func (b *BloomFilterFireStatus) Set(requestId uint32)

func (*BloomFilterFireStatus) UpdateResponse

func (b *BloomFilterFireStatus) UpdateResponse(requestId uint32)

type CatchupSyncer

type CatchupSyncer struct {
	NodeStatusDataProvider og.NodeStatusDataProvider
	PeerProvider           og.PeerProvider
	Hub                    *og.Hub

	Downloader *downloader.Downloader
	SyncMode   downloader.SyncMode

	// should be enabled until quit
	EnableEvent chan bool
	Enabled     bool

	OnWorkingStateChanged         []chan CatchupSyncerStatus
	OnNewTxiReceived              []chan types.Txi
	NewPeerConnectedEventListener chan string

	WorkState CatchupSyncerStatus

	BootStrapNode bool
	// contains filtered or unexported fields
}

func (*CatchupSyncer) CacheNewTxEnabled

func (c *CatchupSyncer) CacheNewTxEnabled() bool

func (*CatchupSyncer) Init

func (c *CatchupSyncer) Init()

func (CatchupSyncer) Name

func (CatchupSyncer) Name() string

func (*CatchupSyncer) NotifyWorkingStateChanged

func (c *CatchupSyncer) NotifyWorkingStateChanged(status CatchupSyncerStatus, force bool)

NotifyWorkingStateChanged: the status is true when the syncer starts and false when it stops.

func (*CatchupSyncer) Start

func (c *CatchupSyncer) Start()

func (*CatchupSyncer) Stop

func (c *CatchupSyncer) Stop()

type CatchupSyncerStatus

type CatchupSyncerStatus int
const (
	Started CatchupSyncerStatus = iota
	Stopped
)

func (CatchupSyncerStatus) String

func (m CatchupSyncerStatus) String() string

type FireHistory

type FireHistory struct {
	StartTime  time.Time
	LastTime   time.Time
	FiredTimes int
}

type IncrementalSyncer

type IncrementalSyncer struct {
	EnableEvent      chan bool
	Enabled          bool
	OnNewTxiReceived []chan []types.Txi

	NewLatestSequencerCh chan bool

	RemoveContrlMsgFromCache func(hash types2.Hash)
	SequencerCache           *SequencerCache
	// contains filtered or unexported fields
}

IncrementalSyncer fetches txs from other peers incrementally. It will not fire duplicate requests within a given period of time.

func NewIncrementalSyncer

func NewIncrementalSyncer(config *SyncerConfig, messageSender MessageSender, getTxsHashes func() types2.Hashes,
	isKnownHash func(hash types2.Hash) bool, getHeight func() uint64, cacheNewTxEnabled func() bool) *IncrementalSyncer

func (*IncrementalSyncer) CacheTx added in v0.0.2

func (m *IncrementalSyncer) CacheTx(tx types.Txi)

func (*IncrementalSyncer) CacheTxs added in v0.0.2

func (m *IncrementalSyncer) CacheTxs(txs types.Txis)

func (*IncrementalSyncer) ClearQueue

func (m *IncrementalSyncer) ClearQueue()

func (*IncrementalSyncer) Enqueue

func (m *IncrementalSyncer) Enqueue(phash *types2.Hash, childHash types2.Hash, sendBloomfilter bool)

func (*IncrementalSyncer) GetBenchmarks

func (m *IncrementalSyncer) GetBenchmarks() map[string]interface{}

func (*IncrementalSyncer) GetNotifying

func (m *IncrementalSyncer) GetNotifying() bool

func (*IncrementalSyncer) HandleActionTx added in v0.0.2

func (m *IncrementalSyncer) HandleActionTx(request *archive.MessageNewActionTx, peerId string)

func (*IncrementalSyncer) HandleArchive added in v0.0.2

func (m *IncrementalSyncer) HandleArchive(request *types.MessageNewArchive, peerId string)

func (*IncrementalSyncer) HandleCampaign

func (m *IncrementalSyncer) HandleCampaign(request *types.MessageCampaign, peerId string)

func (*IncrementalSyncer) HandleFetchByHashResponse

func (m *IncrementalSyncer) HandleFetchByHashResponse(syncResponse *archive.MessageSyncResponse, sourceId string)

func (*IncrementalSyncer) HandleNewSequencer

func (m *IncrementalSyncer) HandleNewSequencer(newSeq *archive.MessageNewSequencer, peerId string)

func (*IncrementalSyncer) HandleNewTx

func (m *IncrementalSyncer) HandleNewTx(newTx *archive.MessageNewTx, peerId string)

func (*IncrementalSyncer) HandleNewTxi

func (m *IncrementalSyncer) HandleNewTxi(tx types.Txi, peerId string)

func (*IncrementalSyncer) HandleNewTxs

func (m *IncrementalSyncer) HandleNewTxs(newTxs *archive.MessageNewTxs, peerId string)

func (*IncrementalSyncer) HandleTermChange

func (m *IncrementalSyncer) HandleTermChange(request *types.MessageTermChange, peerId string)

func (*IncrementalSyncer) IsCachedHash

func (m *IncrementalSyncer) IsCachedHash(hash types2.Hash) bool

func (*IncrementalSyncer) Name

func (m *IncrementalSyncer) Name() string

func (*IncrementalSyncer) RemoveConfirmedFromCache

func (m *IncrementalSyncer) RemoveConfirmedFromCache()

func (*IncrementalSyncer) SetNotifying

func (m *IncrementalSyncer) SetNotifying(v bool)

func (*IncrementalSyncer) Start

func (m *IncrementalSyncer) Start()

func (*IncrementalSyncer) Stop

func (m *IncrementalSyncer) Stop()

func (*IncrementalSyncer) SyncHashList added in v0.0.3

func (m *IncrementalSyncer) SyncHashList(seqHash types2.Hash)

func (*IncrementalSyncer) TxEnable

func (m *IncrementalSyncer) TxEnable() bool

type MessageSender

type MessageSender interface {
	BroadcastMessage(message msg.OgMessage)
	MulticastMessage(message msg.OgMessage)
	MulticastToSource(message msg.OgMessage, sourceMsgHash *types2.Hash)
	BroadcastMessageWithLink(message msg.OgMessage)
	SendToPeer(peerId string, msg msg.OgMessage)
}

type SequencerCache added in v0.0.3

type SequencerCache struct {
	// contains filtered or unexported fields
}

func NewSequencerCache added in v0.0.3

func NewSequencerCache(size int) *SequencerCache

func (*SequencerCache) Add added in v0.0.3

func (p *SequencerCache) Add(seqHash types.Hash, peerId string)

func (*SequencerCache) GetPeer added in v0.0.3

func (p *SequencerCache) GetPeer(seqHash types.Hash) string

func (SequencerCache) String added in v0.0.3

func (s SequencerCache) String() string

type SyncBuffer

type SyncBuffer struct {
	Txs     map[types2.Hash]types.Txi
	TxsList types2.Hashes
	Seq     *types.Sequencer

	Verifiers []protocol.Verifier
	// contains filtered or unexported fields
}

func NewSyncBuffer

func NewSyncBuffer(config SyncBufferConfig) *SyncBuffer

func (*SyncBuffer) AddTxs

func (s *SyncBuffer) AddTxs(seq *types.Sequencer, txs types.Txis) error

func (*SyncBuffer) Count

func (s *SyncBuffer) Count() int

func (*SyncBuffer) Get

func (s *SyncBuffer) Get(hash types2.Hash) types.Txi

func (*SyncBuffer) GetAllKeys

func (s *SyncBuffer) GetAllKeys() types2.Hashes

func (*SyncBuffer) Handle

func (s *SyncBuffer) Handle() error

func (*SyncBuffer) Name

func (s *SyncBuffer) Name() string

func (*SyncBuffer) Start

func (s *SyncBuffer) Start()

func (*SyncBuffer) Stop

func (s *SyncBuffer) Stop()

type SyncBufferConfig

type SyncBufferConfig struct {
	TxPool    pool.ITxPool
	Verifiers []protocol.Verifier
	Dag       og.IDag
}

func DefaultSyncBufferConfig

func DefaultSyncBufferConfig(txPool pool.ITxPool, dag og.IDag, Verifiers []protocol.Verifier) SyncBufferConfig

type SyncManager

type SyncManager struct {
	Hub                    *og.Hub
	CatchupSyncer          *CatchupSyncer
	IncrementalSyncer      *IncrementalSyncer
	NodeStatusDataProvider og.NodeStatusDataProvider

	OnUpToDate []chan bool // listeners registered for enabling/disabling generating and receiving txs (fully synced or not)

	//forceSyncCycle uint
	//syncFlag       uint32 //1 for is syncing
	BootstrapNode                    bool //if bootstrap node just accept txs in starting ,no sync
	CatchupSyncerWorkingStateChanged chan CatchupSyncerStatus

	Status SyncStatus
	// contains filtered or unexported fields
}

func NewSyncManager

func NewSyncManager(config SyncManagerConfig, hub *og.Hub, NodeStatusDataProvider og.NodeStatusDataProvider) *SyncManager

func (*SyncManager) GetBenchmarks

func (s *SyncManager) GetBenchmarks() map[string]interface{}

func (*SyncManager) Name

func (s *SyncManager) Name() string

func (*SyncManager) NotifyUpToDateEvent

func (s *SyncManager) NotifyUpToDateEvent(isUpToDate bool)

func (*SyncManager) Start

func (s *SyncManager) Start()

func (*SyncManager) Stop

func (s *SyncManager) Stop()

type SyncManagerConfig

type SyncManagerConfig struct {
	Mode           downloader.SyncMode
	BootstrapNode  bool
	ForceSyncCycle uint //milliseconds
}

type SyncStatus

type SyncStatus int
const (
	SyncStatusIncremental SyncStatus = iota
	SyncStatusFull
)

func (SyncStatus) String

func (m SyncStatus) String() string

type SyncerConfig

type SyncerConfig struct {
	AcquireTxQueueSize                       uint
	MaxBatchSize                             int
	BatchTimeoutMilliSecond                  uint
	AcquireTxDedupCacheMaxSize               int
	AcquireTxDedupCacheExpirationSeconds     int
	BufferedIncomingTxCacheEnabled           bool
	BufferedIncomingTxCacheMaxSize           int
	BufferedIncomingTxCacheExpirationSeconds int
	FiredTxCacheMaxSize                      int
	FiredTxCacheExpirationSeconds            int
	NewTxsChannelSize                        int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL