Merge #13033: Build txindex in parallel with validation
pull/585/head9b2704777c
[doc] Include txindex changes in the release notes. (Jim Posen)ed77dd6b30
[test] Simple unit test for TxIndex. (Jim Posen)6d772a3d44
[rpc] Public interfaces to GetTransaction block until synced. (Jim Posen)a03f804f2a
[index] Move disk IO logic from GetTransaction to TxIndex::FindTx. (Jim Posen)e0a3b80033
[validation] Replace tx index code in validation code with TxIndex. (Jim Posen)8181db88f6
[init] Initialize and start TxIndex in init code. (Jim Posen)f90c3a62f5
[index] TxIndex method to wait until caught up. (Jim Posen)70d510d93c
[index] Allow TxIndex sync thread to be interrupted. (Jim Posen)94b4f8bbb9
[index] TxIndex initial sync thread. (Jim Posen)34d68bf3a3
[index] Create new TxIndex class. (Jim Posen)c88bcec93f
[db] Migration for txindex data to new, separate database. (Jim Posen)0cb8303241
[db] Create separate database for txindex. (Jim Posen) Pull request description: I'm re-opening #11857 as a new pull request because the last one stopped loading for people ------------------------------- This refactors the tx index code to be in it's own class and get built concurrently with validation code. The main benefit is decoupling and moving the txindex into a separate DB. The primary motivation is to lay the groundwork for other indexers that might be desired (such as the [compact filters](https://github.com/bitcoin/bips/pull/636)). The basic idea is that the TxIndex spins up its own thread, which first syncs the txindex to the current block index, then once in sync the BlockConnected ValidationInterface hook writes new blocks. ### DB changes At the suggestion of some other developers, the txindex has been split out into a separate database. A data migration runs at startup on any nodes with a legacy txindex. Currently the migration blocks node initialization until complete. ### Open questions - Should the migration of txindex data from the old DB to the new DB block in init or should it happen in a background thread? The downside to backgrounding it is that `getrawtransaction` would return an error message saying the txindex is syncing while the migration is running. ### Impact In a sample size n=1 test where I synced nodes from scratch, the average time [Index writing](https://github.com/bitcoin/bitcoin/blob/master/src/validation.cpp#L1903) was 3.36ms in master and 1.72ms in this branch. The average time between `UpdateTip` log lines for sequential blocks between 400,000 and IBD end on mainnet was 0.297204s in master and 0.286134s in this branch. Most likely this is just variance in IBD times, but I can try with some more trials if people want. Tree-SHA512: 451fd7d95df89dfafceaa723cdf0f7b137615b531cf5c5035cfb54e9ccc2026cec5ac85edbcf71b7f4e2f102e36e9202b8b3a667e1504a9e1a9976ab1f0079c4
commit
a07e8caa5d
@ -0,0 +1,11 @@
|
||||
Transaction index changes
|
||||
-------------------------
|
||||
|
||||
The transaction index is now built separately from the main node procedure,
|
||||
meaning the `-txindex` flag can be toggled without a full reindex. If bitcoind
|
||||
is run with `-txindex` on a node that is already partially or fully synced
|
||||
without one, the transaction index will be built in the background and become
|
||||
available once caught up. When switching from running `-txindex` to running
|
||||
without the flag, the transaction index database will *not* be deleted
|
||||
automatically, meaning it could be turned back on at a later time without a full
|
||||
resync.
|
@ -0,0 +1,309 @@
|
||||
// Copyright (c) 2017-2018 The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#include <chainparams.h>
|
||||
#include <index/txindex.h>
|
||||
#include <init.h>
|
||||
#include <tinyformat.h>
|
||||
#include <ui_interface.h>
|
||||
#include <util.h>
|
||||
#include <validation.h>
|
||||
#include <warnings.h>
|
||||
|
||||
constexpr int64_t SYNC_LOG_INTERVAL = 30; // seconds
|
||||
constexpr int64_t SYNC_LOCATOR_WRITE_INTERVAL = 30; // seconds
|
||||
|
||||
std::unique_ptr<TxIndex> g_txindex;
|
||||
|
||||
template<typename... Args>
|
||||
static void FatalError(const char* fmt, const Args&... args)
|
||||
{
|
||||
std::string strMessage = tfm::format(fmt, args...);
|
||||
SetMiscWarning(strMessage);
|
||||
LogPrintf("*** %s\n", strMessage);
|
||||
uiInterface.ThreadSafeMessageBox(
|
||||
"Error: A fatal internal error occurred, see debug.log for details",
|
||||
"", CClientUIInterface::MSG_ERROR);
|
||||
StartShutdown();
|
||||
}
|
||||
|
||||
TxIndex::TxIndex(std::unique_ptr<TxIndexDB> db) :
|
||||
m_db(std::move(db)), m_synced(false), m_best_block_index(nullptr)
|
||||
{}
|
||||
|
||||
TxIndex::~TxIndex()
|
||||
{
|
||||
Interrupt();
|
||||
Stop();
|
||||
}
|
||||
|
||||
bool TxIndex::Init()
|
||||
{
|
||||
LOCK(cs_main);
|
||||
|
||||
// Attempt to migrate txindex from the old database to the new one. Even if
|
||||
// chain_tip is null, the node could be reindexing and we still want to
|
||||
// delete txindex records in the old database.
|
||||
if (!m_db->MigrateData(*pblocktree, chainActive.GetLocator())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
CBlockLocator locator;
|
||||
if (!m_db->ReadBestBlock(locator)) {
|
||||
locator.SetNull();
|
||||
}
|
||||
|
||||
m_best_block_index = FindForkInGlobalIndex(chainActive, locator);
|
||||
m_synced = m_best_block_index.load() == chainActive.Tip();
|
||||
return true;
|
||||
}
|
||||
|
||||
static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev)
|
||||
{
|
||||
AssertLockHeld(cs_main);
|
||||
|
||||
if (!pindex_prev) {
|
||||
return chainActive.Genesis();
|
||||
}
|
||||
|
||||
const CBlockIndex* pindex = chainActive.Next(pindex_prev);
|
||||
if (pindex) {
|
||||
return pindex;
|
||||
}
|
||||
|
||||
return chainActive.Next(chainActive.FindFork(pindex_prev));
|
||||
}
|
||||
|
||||
void TxIndex::ThreadSync()
|
||||
{
|
||||
const CBlockIndex* pindex = m_best_block_index.load();
|
||||
if (!m_synced) {
|
||||
auto& consensus_params = Params().GetConsensus();
|
||||
|
||||
int64_t last_log_time = 0;
|
||||
int64_t last_locator_write_time = 0;
|
||||
while (true) {
|
||||
if (m_interrupt) {
|
||||
WriteBestBlock(pindex);
|
||||
return;
|
||||
}
|
||||
|
||||
{
|
||||
LOCK(cs_main);
|
||||
const CBlockIndex* pindex_next = NextSyncBlock(pindex);
|
||||
if (!pindex_next) {
|
||||
WriteBestBlock(pindex);
|
||||
m_best_block_index = pindex;
|
||||
m_synced = true;
|
||||
break;
|
||||
}
|
||||
pindex = pindex_next;
|
||||
}
|
||||
|
||||
int64_t current_time = GetTime();
|
||||
if (last_log_time + SYNC_LOG_INTERVAL < current_time) {
|
||||
LogPrintf("Syncing txindex with block chain from height %d\n", pindex->nHeight);
|
||||
last_log_time = current_time;
|
||||
}
|
||||
|
||||
if (last_locator_write_time + SYNC_LOCATOR_WRITE_INTERVAL < current_time) {
|
||||
WriteBestBlock(pindex);
|
||||
last_locator_write_time = current_time;
|
||||
}
|
||||
|
||||
CBlock block;
|
||||
if (!ReadBlockFromDisk(block, pindex, consensus_params)) {
|
||||
FatalError("%s: Failed to read block %s from disk",
|
||||
__func__, pindex->GetBlockHash().ToString());
|
||||
return;
|
||||
}
|
||||
if (!WriteBlock(block, pindex)) {
|
||||
FatalError("%s: Failed to write block %s to tx index database",
|
||||
__func__, pindex->GetBlockHash().ToString());
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pindex) {
|
||||
LogPrintf("txindex is enabled at height %d\n", pindex->nHeight);
|
||||
} else {
|
||||
LogPrintf("txindex is enabled\n");
|
||||
}
|
||||
}
|
||||
|
||||
bool TxIndex::WriteBlock(const CBlock& block, const CBlockIndex* pindex)
|
||||
{
|
||||
CDiskTxPos pos(pindex->GetBlockPos(), GetSizeOfCompactSize(block.vtx.size()));
|
||||
std::vector<std::pair<uint256, CDiskTxPos>> vPos;
|
||||
vPos.reserve(block.vtx.size());
|
||||
for (const auto& tx : block.vtx) {
|
||||
vPos.emplace_back(tx->GetHash(), pos);
|
||||
pos.nTxOffset += ::GetSerializeSize(*tx, SER_DISK, CLIENT_VERSION);
|
||||
}
|
||||
return m_db->WriteTxs(vPos);
|
||||
}
|
||||
|
||||
bool TxIndex::WriteBestBlock(const CBlockIndex* block_index)
|
||||
{
|
||||
LOCK(cs_main);
|
||||
if (!m_db->WriteBestBlock(chainActive.GetLocator(block_index))) {
|
||||
return error("%s: Failed to write locator to disk", __func__);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void TxIndex::BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* pindex,
|
||||
const std::vector<CTransactionRef>& txn_conflicted)
|
||||
{
|
||||
if (!m_synced) {
|
||||
return;
|
||||
}
|
||||
|
||||
const CBlockIndex* best_block_index = m_best_block_index.load();
|
||||
if (!best_block_index) {
|
||||
if (pindex->nHeight != 0) {
|
||||
FatalError("%s: First block connected is not the genesis block (height=%d)",
|
||||
__func__, pindex->nHeight);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// Ensure block connects to an ancestor of the current best block. This should be the case
|
||||
// most of the time, but may not be immediately after the the sync thread catches up and sets
|
||||
// m_synced. Consider the case where there is a reorg and the blocks on the stale branch are
|
||||
// in the ValidationInterface queue backlog even after the sync thread has caught up to the
|
||||
// new chain tip. In this unlikely event, log a warning and let the queue clear.
|
||||
if (best_block_index->GetAncestor(pindex->nHeight - 1) != pindex->pprev) {
|
||||
LogPrintf("%s: WARNING: Block %s does not connect to an ancestor of " /* Continued */
|
||||
"known best chain (tip=%s); not updating txindex\n",
|
||||
__func__, pindex->GetBlockHash().ToString(),
|
||||
best_block_index->GetBlockHash().ToString());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (WriteBlock(*block, pindex)) {
|
||||
m_best_block_index = pindex;
|
||||
} else {
|
||||
FatalError("%s: Failed to write block %s to txindex",
|
||||
__func__, pindex->GetBlockHash().ToString());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void TxIndex::SetBestChain(const CBlockLocator& locator)
|
||||
{
|
||||
if (!m_synced) {
|
||||
return;
|
||||
}
|
||||
|
||||
const uint256& locator_tip_hash = locator.vHave.front();
|
||||
const CBlockIndex* locator_tip_index;
|
||||
{
|
||||
LOCK(cs_main);
|
||||
locator_tip_index = LookupBlockIndex(locator_tip_hash);
|
||||
}
|
||||
|
||||
if (!locator_tip_index) {
|
||||
FatalError("%s: First block (hash=%s) in locator was not found",
|
||||
__func__, locator_tip_hash.ToString());
|
||||
return;
|
||||
}
|
||||
|
||||
// This checks that SetBestChain callbacks are received after BlockConnected. The check may fail
|
||||
// immediately after the the sync thread catches up and sets m_synced. Consider the case where
|
||||
// there is a reorg and the blocks on the stale branch are in the ValidationInterface queue
|
||||
// backlog even after the sync thread has caught up to the new chain tip. In this unlikely
|
||||
// event, log a warning and let the queue clear.
|
||||
const CBlockIndex* best_block_index = m_best_block_index.load();
|
||||
if (best_block_index->GetAncestor(locator_tip_index->nHeight) != locator_tip_index) {
|
||||
LogPrintf("%s: WARNING: Locator contains block (hash=%s) not on known best " /* Continued */
|
||||
"chain (tip=%s); not writing txindex locator\n",
|
||||
__func__, locator_tip_hash.ToString(),
|
||||
best_block_index->GetBlockHash().ToString());
|
||||
return;
|
||||
}
|
||||
|
||||
if (!m_db->WriteBestBlock(locator)) {
|
||||
error("%s: Failed to write locator to disk", __func__);
|
||||
}
|
||||
}
|
||||
|
||||
bool TxIndex::BlockUntilSyncedToCurrentChain()
|
||||
{
|
||||
AssertLockNotHeld(cs_main);
|
||||
|
||||
if (!m_synced) {
|
||||
return false;
|
||||
}
|
||||
|
||||
{
|
||||
// Skip the queue-draining stuff if we know we're caught up with
|
||||
// chainActive.Tip().
|
||||
LOCK(cs_main);
|
||||
const CBlockIndex* chain_tip = chainActive.Tip();
|
||||
const CBlockIndex* best_block_index = m_best_block_index.load();
|
||||
if (best_block_index->GetAncestor(chain_tip->nHeight) == chain_tip) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
LogPrintf("%s: txindex is catching up on block notifications\n", __func__);
|
||||
SyncWithValidationInterfaceQueue();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool TxIndex::FindTx(const uint256& tx_hash, uint256& block_hash, CTransactionRef& tx) const
|
||||
{
|
||||
CDiskTxPos postx;
|
||||
if (!m_db->ReadTxPos(tx_hash, postx)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
CAutoFile file(OpenBlockFile(postx, true), SER_DISK, CLIENT_VERSION);
|
||||
if (file.IsNull()) {
|
||||
return error("%s: OpenBlockFile failed", __func__);
|
||||
}
|
||||
CBlockHeader header;
|
||||
try {
|
||||
file >> header;
|
||||
fseek(file.Get(), postx.nTxOffset, SEEK_CUR);
|
||||
file >> tx;
|
||||
} catch (const std::exception& e) {
|
||||
return error("%s: Deserialize or I/O error - %s", __func__, e.what());
|
||||
}
|
||||
if (tx->GetHash() != tx_hash) {
|
||||
return error("%s: txid mismatch", __func__);
|
||||
}
|
||||
block_hash = header.GetHash();
|
||||
return true;
|
||||
}
|
||||
|
||||
void TxIndex::Interrupt()
|
||||
{
|
||||
m_interrupt();
|
||||
}
|
||||
|
||||
void TxIndex::Start()
|
||||
{
|
||||
// Need to register this ValidationInterface before running Init(), so that
|
||||
// callbacks are not missed if Init sets m_synced to true.
|
||||
RegisterValidationInterface(this);
|
||||
if (!Init()) {
|
||||
FatalError("%s: txindex failed to initialize", __func__);
|
||||
return;
|
||||
}
|
||||
|
||||
m_thread_sync = std::thread(&TraceThread<std::function<void()>>, "txindex",
|
||||
std::bind(&TxIndex::ThreadSync, this));
|
||||
}
|
||||
|
||||
void TxIndex::Stop()
|
||||
{
|
||||
UnregisterValidationInterface(this);
|
||||
|
||||
if (m_thread_sync.joinable()) {
|
||||
m_thread_sync.join();
|
||||
}
|
||||
}
|
@ -0,0 +1,94 @@
|
||||
// Copyright (c) 2017-2018 The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#ifndef BITCOIN_INDEX_TXINDEX_H
|
||||
#define BITCOIN_INDEX_TXINDEX_H
|
||||
|
||||
#include <primitives/block.h>
|
||||
#include <primitives/transaction.h>
|
||||
#include <threadinterrupt.h>
|
||||
#include <txdb.h>
|
||||
#include <uint256.h>
|
||||
#include <validationinterface.h>
|
||||
|
||||
class CBlockIndex;
|
||||
|
||||
/**
|
||||
* TxIndex is used to look up transactions included in the blockchain by hash.
|
||||
* The index is written to a LevelDB database and records the filesystem
|
||||
* location of each transaction by transaction hash.
|
||||
*/
|
||||
class TxIndex final : public CValidationInterface
|
||||
{
|
||||
private:
|
||||
const std::unique_ptr<TxIndexDB> m_db;
|
||||
|
||||
/// Whether the index is in sync with the main chain. The flag is flipped
|
||||
/// from false to true once, after which point this starts processing
|
||||
/// ValidationInterface notifications to stay in sync.
|
||||
std::atomic<bool> m_synced;
|
||||
|
||||
/// The last block in the chain that the TxIndex is in sync with.
|
||||
std::atomic<const CBlockIndex*> m_best_block_index;
|
||||
|
||||
std::thread m_thread_sync;
|
||||
CThreadInterrupt m_interrupt;
|
||||
|
||||
/// Initialize internal state from the database and block index.
|
||||
bool Init();
|
||||
|
||||
/// Sync the tx index with the block index starting from the current best
|
||||
/// block. Intended to be run in its own thread, m_thread_sync, and can be
|
||||
/// interrupted with m_interrupt. Once the txindex gets in sync, the
|
||||
/// m_synced flag is set and the BlockConnected ValidationInterface callback
|
||||
/// takes over and the sync thread exits.
|
||||
void ThreadSync();
|
||||
|
||||
/// Write update index entries for a newly connected block.
|
||||
bool WriteBlock(const CBlock& block, const CBlockIndex* pindex);
|
||||
|
||||
/// Write the current chain block locator to the DB.
|
||||
bool WriteBestBlock(const CBlockIndex* block_index);
|
||||
|
||||
protected:
|
||||
void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* pindex,
|
||||
const std::vector<CTransactionRef>& txn_conflicted) override;
|
||||
|
||||
void SetBestChain(const CBlockLocator& locator) override;
|
||||
|
||||
public:
|
||||
/// Constructs the TxIndex, which becomes available to be queried.
|
||||
explicit TxIndex(std::unique_ptr<TxIndexDB> db);
|
||||
|
||||
/// Destructor interrupts sync thread if running and blocks until it exits.
|
||||
~TxIndex();
|
||||
|
||||
/// Blocks the current thread until the transaction index is caught up to
|
||||
/// the current state of the block chain. This only blocks if the index has gotten in sync once
|
||||
/// and only needs to process blocks in the ValidationInterface queue. If the index is catching
|
||||
/// up from far behind, this method does not block and immediately returns false.
|
||||
bool BlockUntilSyncedToCurrentChain();
|
||||
|
||||
/// Look up a transaction by hash.
|
||||
///
|
||||
/// @param[in] tx_hash The hash of the transaction to be returned.
|
||||
/// @param[out] block_hash The hash of the block the transaction is found in.
|
||||
/// @param[out] tx The transaction itself.
|
||||
/// @return true if transaction is found, false otherwise
|
||||
bool FindTx(const uint256& tx_hash, uint256& block_hash, CTransactionRef& tx) const;
|
||||
|
||||
void Interrupt();
|
||||
|
||||
/// Start initializes the sync state and registers the instance as a
|
||||
/// ValidationInterface so that it stays in sync with blockchain updates.
|
||||
void Start();
|
||||
|
||||
/// Stops the instance from staying in sync with blockchain updates.
|
||||
void Stop();
|
||||
};
|
||||
|
||||
/// The global transaction index, used in GetTransaction. May be null.
|
||||
extern std::unique_ptr<TxIndex> g_txindex;
|
||||
|
||||
#endif // BITCOIN_INDEX_TXINDEX_H
|
@ -0,0 +1,66 @@
|
||||
// Copyright (c) 2017-2018 The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#include <index/txindex.h>
|
||||
#include <script/standard.h>
|
||||
#include <test/test_bitcoin.h>
|
||||
#include <util.h>
|
||||
#include <utiltime.h>
|
||||
#include <validation.h>
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(txindex_tests)
|
||||
|
||||
BOOST_FIXTURE_TEST_CASE(txindex_initial_sync, TestChain100Setup)
|
||||
{
|
||||
TxIndex txindex(MakeUnique<TxIndexDB>(1 << 20, true));
|
||||
|
||||
CTransactionRef tx_disk;
|
||||
uint256 block_hash;
|
||||
|
||||
// Transaction should not be found in the index before it is started.
|
||||
for (const auto& txn : m_coinbase_txns) {
|
||||
BOOST_CHECK(!txindex.FindTx(txn->GetHash(), block_hash, tx_disk));
|
||||
}
|
||||
|
||||
// BlockUntilSyncedToCurrentChain should return false before txindex is started.
|
||||
BOOST_CHECK(!txindex.BlockUntilSyncedToCurrentChain());
|
||||
|
||||
txindex.Start();
|
||||
|
||||
// Allow tx index to catch up with the block index.
|
||||
constexpr int64_t timeout_ms = 10 * 1000;
|
||||
int64_t time_start = GetTimeMillis();
|
||||
while (!txindex.BlockUntilSyncedToCurrentChain()) {
|
||||
BOOST_REQUIRE(time_start + timeout_ms > GetTimeMillis());
|
||||
MilliSleep(100);
|
||||
}
|
||||
|
||||
// Check that txindex has all txs that were in the chain before it started.
|
||||
for (const auto& txn : m_coinbase_txns) {
|
||||
if (!txindex.FindTx(txn->GetHash(), block_hash, tx_disk)) {
|
||||
BOOST_ERROR("FindTx failed");
|
||||
} else if (tx_disk->GetHash() != txn->GetHash()) {
|
||||
BOOST_ERROR("Read incorrect tx");
|
||||
}
|
||||
}
|
||||
|
||||
// Check that new transactions in new blocks make it into the index.
|
||||
for (int i = 0; i < 10; i++) {
|
||||
CScript coinbase_script_pub_key = GetScriptForDestination(coinbaseKey.GetPubKey().GetID());
|
||||
std::vector<CMutableTransaction> no_txns;
|
||||
const CBlock& block = CreateAndProcessBlock(no_txns, coinbase_script_pub_key);
|
||||
const CTransaction& txn = *block.vtx[0];
|
||||
|
||||
BOOST_CHECK(txindex.BlockUntilSyncedToCurrentChain());
|
||||
if (!txindex.FindTx(txn.GetHash(), block_hash, tx_disk)) {
|
||||
BOOST_ERROR("FindTx failed");
|
||||
} else if (tx_disk->GetHash() != txn.GetHash()) {
|
||||
BOOST_ERROR("Read incorrect tx");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
Loading…
Reference in new issue