diff --git a/beacon_chain/beacon_chain_file.nim b/beacon_chain/beacon_chain_file.nim index cd6dedb2da..c4fd7927a7 100644 --- a/beacon_chain/beacon_chain_file.nim +++ b/beacon_chain/beacon_chain_file.nim @@ -33,8 +33,8 @@ type data: seq[byte] ChainFileData* = object - head*: BlockData - tail*: BlockData + head*: Opt[BlockData] + tail*: Opt[BlockData] ChainFileHandle* = object data*: ChainFileData @@ -57,6 +57,10 @@ type FileRepaired, FileCorrupted + ChainFileFlag* {.pure.} = enum + Repair, + OpenAlways + ChainFileError* = object kind*: ChainFileErrorType message*: string @@ -271,14 +275,25 @@ template isBlob(h: ChainFileHeader | ChainFileFooter): bool = template isLast(h: ChainFileHeader | ChainFileFooter): bool = h.kind.isLast() -proc store*(chunkfile: string, signedBlock: ForkedSignedBeaconBlock, +proc isFilePresent*(filename: string): bool = + isFile(filename) + +template head*(chandle: ChainFileHandle): Opt[BlockData] = + chandle.data.head + +template tail*(chandle: ChainFileHandle): Opt[BlockData] = + chandle.data.tail + +proc setHead*(chandle: var ChainFileHandle, bdata: BlockData) = + chandle.data.head = Opt.some(bdata) + +proc setTail*(chandle: var ChainFileHandle, bdata: BlockData) = + chandle.data.tail = Opt.some(bdata) + +proc store*(chandle: ChainFileHandle, signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], compressed = true): Result[void, string] = - let - flags = {OpenFlags.Append, OpenFlags.Create} - handle = openFile(chunkfile, flags).valueOr: - return err(ioErrorMsg(error)) - origOffset = getFilePos(handle).valueOr: - discard closeFile(handle) + let origOffset = + updateFilePos(chandle.handle, 0'i64, SeekPosition.SeekEnd).valueOr: return err(ioErrorMsg(error)) block: @@ -295,13 +310,13 @@ proc store*(chunkfile: string, signedBlock: ForkedSignedBeaconBlock, (res, len(res)) slot = signedBlock.slot buffer = Chunk.init(kind, uint64(slot), uint32(plainSize), data) - wrote = writeFile(handle, buffer).valueOr: - discard truncate(handle, origOffset) - discard closeFile(handle) + wrote = writeFile(chandle.handle, buffer).valueOr: + discard truncate(chandle.handle, origOffset) + discard fsync(chandle.handle) return err(ioErrorMsg(error)) if wrote != uint(len(buffer)): - discard truncate(handle, origOffset) - discard closeFile(handle) + discard truncate(chandle.handle, origOffset) + discard fsync(chandle.handle) return err(IncompleteWriteError) if blobs.isSome(): @@ -318,18 +333,25 @@ proc store*(chunkfile: string, signedBlock: ForkedSignedBeaconBlock, let res = SSZ.encode(blob[]) (res, len(res)) slot = blob[].signed_block_header.message.slot - buffer = - Chunk.init(kind, uint64(slot), uint32(plainSize), data) - wrote = writeFile(handle, buffer).valueOr: - discard truncate(handle, origOffset) - discard closeFile(handle) + buffer = Chunk.init(kind, uint64(slot), uint32(plainSize), data) + + setFilePos(chandle.handle, 0'i64, SeekPosition.SeekEnd).isOkOr: + discard truncate(chandle.handle, origOffset) + discard fsync(chandle.handle) + return err(ioErrorMsg(error)) + + let + wrote = writeFile(chandle.handle, buffer).valueOr: + discard truncate(chandle.handle, origOffset) + discard fsync(chandle.handle) return err(ioErrorMsg(error)) if wrote != uint(len(buffer)): - discard truncate(handle, origOffset) - discard closeFile(handle) + discard truncate(chandle.handle, origOffset) + discard fsync(chandle.handle) return err(IncompleteWriteError) - closeFile(handle).isOkOr: + fsync(chandle.handle).isOkOr: + discard truncate(chandle.handle, origOffset) return err(ioErrorMsg(error)) ok() @@ -862,142 +884,71 @@ proc checkRepair*(filename: string, return err(ioErrorMsg(error)) ok(ChainFileCheckResult.FileCorrupted) -proc init*(t: typedesc[ChainFileHandle], - filename: string, - repair: bool = true): Result[ChainFileHandle, string] = - if not(isFile(filename)): - # We return None if file is missing, because its not an error. - return err("File not found") - - block: - let res = checkRepair(filename, repair) - if res.isErr(): - return err(res.error) - if res.get() notin {ChainFileCheckResult.FileMissing, FileEmpty, FileOk, - FileRepaired}: - return err("Chain file data is corrupted") - +proc init*(t: typedesc[ChainFileHandle], filename: string, + flags: set[ChainFileFlag]): Result[ChainFileHandle, string] = let handle = - block: - let res = openFile(filename, {OpenFlags.Read}) - if res.isErr(): - return err(ioErrorMsg(res.error)) - res.get() - head = - block: - let res = getChainFileHead(handle) - if res.isErr(): - discard closeFile(handle) - return err(res.error) - let cres = res.get() - if cres.isNone(): - # Empty file is ok. - discard closeFile(handle) - return err("Unexpected end of file encountered") - cres.get() + if not(isFile(filename)): + if ChainFileFlag.OpenAlways in flags: + let flags = {OpenFlags.Read, OpenFlags.Write, OpenFlags.Create} + openFile(filename, flags).valueOr: + return err(ioErrorMsg(error)) + else: + return err("File not found") + else: + # If file exists we perform automatic check/repair procedure. + let res = + checkRepair(filename, ChainFileFlag.Repair in flags).valueOr: + return err(error) - block: - let res = setFilePos(handle, 0'i64, SeekPosition.SeekEnd) - if res.isErr(): + if res notin {ChainFileCheckResult.FileMissing, FileEmpty, + FileOk, FileRepaired}: + return err("Chain file data is corrupted") + + let flags = {OpenFlags.Read, OpenFlags.Write} + openFile(filename, flags).valueOr: + return err(ioErrorMsg(error)) + + head = getChainFileHead(handle).valueOr: discard closeFile(handle) - return err(ioErrorMsg(res.error)) + return err(error) - let - tail = - block: - let res = getChainFileTail(handle) - if res.isErr(): - discard closeFile(handle) - return err(res.error) - let tres = res.get() - if tres.isNone(): - discard closeFile(handle) - return err("Unexpected end of file encountered") - tres.get() - - ok(ChainFileHandle( - handle: handle, - data: ChainFileData(head: head, tail: tail))) + setFilePos(handle, 0'i64, SeekPosition.SeekEnd).isOkOr: + discard closeFile(handle) + return err(ioErrorMsg(error)) + + let tail = getChainFileTail(handle).valueOr: + discard closeFile(handle) + return err(error) + + ok(ChainFileHandle(handle: handle, + data: ChainFileData(head: head, tail: tail))) proc close*(ch: ChainFileHandle): Result[void, string] = closeFile(ch.handle).isOkOr: return err(ioErrorMsg(error)) ok() -proc init*(t: typedesc[ChainFileData], - filename: string): Result[Opt[ChainFileData], string] = - if not(isFile(filename)): - # We return None if file is missing, because its not an error. - return ok(Opt.none(ChainFileData)) - - block: - let res = checkRepair(filename, true) - if res.isErr(): - return err(res.error) - if res.get() notin {ChainFileCheckResult.FileMissing, FileEmpty, FileOk, - FileRepaired}: - return err("Chain file data is corrupted") - - let - handle = - block: - let res = openFile(filename, {OpenFlags.Read}) - if res.isErr(): - return err(ioErrorMsg(res.error)) - res.get() - head = - block: - let res = getChainFileHead(handle) - if res.isErr(): - discard closeFile(handle) - return err(res.error) - let cres = res.get() - if cres.isNone(): - # Empty file is ok. - discard closeFile(handle) - return ok(Opt.none(ChainFileData)) - cres.get() - - block: - let res = setFilePos(handle, 0'i64, SeekPosition.SeekEnd) - if res.isErr(): - discard closeFile(handle) - return err(ioErrorMsg(res.error)) - - let - tail = - block: - let res = getChainFileTail(handle) - if res.isErr(): - discard closeFile(handle) - return err(res.error) - let tres = res.get() - if tres.isNone(): - discard closeFile(handle) - return err("Unexpected end of file encountered") - tres.get() - - block: - let res = closeFile(handle) - if res.isErr(): - return err(ioErrorMsg(res.error)) - - ok(Opt.some(ChainFileData(head: head, tail: tail))) - proc seekForSlot*(ch: ChainFileHandle, slot: Slot): Result[Opt[int64], string] = + if ch.head.isNone() or ch.tail.isNone(): + return err("Attempt to seek for slot in empty file") + let headRange = - if ch.data.head.blck.slot() >= slot: - ch.data.head.blck.slot() - slot - else: - slot - ch.data.head.blck.slot() + block: + let headSlot = ch.head.get().blck.slot() + if headSlot >= slot: + headSlot - slot + else: + slot - headSlot tailRange = - if ch.data.tail.blck.slot() >= slot: - ch.data.tail.blck.slot() - slot - else: - slot - ch.data.tail.blck.slot() + block: + let tailSlot = ch.tail.get().blck.slot() + if tailSlot >= slot: + tailSlot - slot + else: + slot - tailSlot offset = if headRange <= tailRange: ? seekForSlotForward(ch.handle, slot) diff --git a/beacon_chain/consensus_object_pools/blockchain_list.nim b/beacon_chain/consensus_object_pools/blockchain_list.nim index 5c67c5a9d8..6c5bde6ae3 100644 --- a/beacon_chain/consensus_object_pools/blockchain_list.nim +++ b/beacon_chain/consensus_object_pools/blockchain_list.nim @@ -24,8 +24,6 @@ const type ChainListRef* = ref object path*: string - head*: Opt[BlockData] - tail*: Opt[BlockData] handle*: Opt[ChainFileHandle] template chainFilePath*(directory: string): string = @@ -37,37 +35,38 @@ template filePath*(clist: ChainListRef): string = proc init*(T: type ChainListRef, directory: string): ChainListRef = let filename = directory.chainFilePath() - res = ChainFileData.init(filename) - if res.isErr(): - fatal "Unexpected failure while reading backfill data", reason = res.error - quit 1 - let datares = res.get() - if datares.isNone(): - ChainListRef(path: directory) - else: - ChainListRef( - path: directory, - head: Opt.some(datares.get().head), - tail: Opt.some(datares.get().tail)) + handle = + if not(isFilePresent(filename)): + Opt.none(ChainFileHandle) + else: + let + flags = {ChainFileFlag.Repair} + res = ChainFileHandle.init(filename, flags).valueOr: + fatal "Unexpected failure while loading backfill data", + filename = filename, reason = error + quit 1 + Opt.some(res) + ChainListRef(path: directory, handle: handle) proc init*(T: type ChainListRef, directory: string, slot: Slot): Result[ChainListRef, string] = let + flags = {ChainFileFlag.Repair, ChainFileFlag.OpenAlways} filename = directory.chainFilePath() - handle = - block: - let res = ChainFileHandle.init(filename) - if res.isErr(): - fatal "Unexpected failure while reading backfill data", - filename = filename, reason = res.error - quit 1 - res.get() - let offset {.used.} = ? seekForSlot(handle, slot) - ok(ChainListRef( - path: directory, - head: Opt.some(handle.data.head), - tail: Opt.some(handle.data.tail), - handle: Opt.some(handle))) + handle = ? ChainFileHandle.init(filename, flags) + offset {.used.} = ? seekForSlot(handle, slot) + ok(ChainListRef(path: directory, handle: Opt.some(handle))) + +proc seekForSlot*(clist: ChainListRef, slot: Slot): Result[void, string] = + if clist.handle.isNone(): + let + flags = {ChainFileFlag.Repair, ChainFileFlag.OpenAlways} + filename = clist.path.chainFilePath() + handle = ? ChainFileHandle.init(filename, flags) + clist.handle = Opt.some(handle) + + let offset {.used.} = ? seekForSlot(clist.handle.get(), slot) + ok() proc close*(clist: ChainListRef): Result[void, string] = if clist.handle.isNone(): @@ -78,8 +77,6 @@ proc close*(clist: ChainListRef): Result[void, string] = proc clear*(clist: ChainListRef): Result[void, string] = ? clist.close() ? clearFile(clist.path.chainFilePath()) - clist.head = Opt.none(BlockData) - clist.tail = Opt.none(BlockData) clist.handle = Opt.none(ChainFileHandle) ok() @@ -105,6 +102,42 @@ template shortLog*(x: Opt[BlockData]): string = else: shortLog(x.get()) +func tail*(clist: ChainListRef): Opt[BlockData] = + if clist.handle.isSome(): + clist.handle.get().data.tail + else: + Opt.none(BlockData) + +func head*(clist: ChainListRef): Opt[BlockData] = + if clist.handle.isSome(): + clist.handle.get().data.head + else: + Opt.none(BlockData) + +proc setHead*(clist: ChainListRef, bdata: BlockData) = + doAssert(clist.handle.isSome()) + var handle = clist.handle.get() + handle.setHead(bdata) + clist.handle = Opt.some(handle) + +proc setTail*(clist: ChainListRef, bdata: BlockData) = + doAssert(clist.handle.isSome()) + var handle = clist.handle.get() + handle.setTail(bdata) + clist.handle = Opt.some(handle) + +proc store*(clist: ChainListRef, signedBlock: ForkedSignedBeaconBlock, + blobs: Opt[BlobSidecars]): Result[void, string] = + if clist.handle.isNone(): + let + filename = clist.path.chainFilePath() + flags = {ChainFileFlag.Repair, ChainFileFlag.OpenAlways} + handle = ? ChainFileHandle.init(filename, flags) + clist.handle = Opt.some(handle) + store(handle, signedBlock, blobs, true) + else: + store(clist.handle.get(), signedBlock, blobs, true) + proc checkBlobs(signedBlock: ForkedSignedBeaconBlock, blobsOpt: Opt[BlobSidecars]): Result[void, VerifierError] = withBlck(signedBlock): @@ -140,27 +173,22 @@ proc addBackfillBlockData*( signed_block_root = signedBlock.root signed_block_parent_root = signedBlock.parent_root - let - verifyBlockTick = Moment.now() + let verifyBlockTick = Moment.now() if clist.tail.isNone(): - block: - let res = checkBlobs(signedBlock, blobsOpt) - if res.isErr(): - return err(res.error) + ? checkBlobs(signedBlock, blobsOpt) - let - storeBlockTick = Moment.now() - res = store(chainFilePath(clist.path), signedBlock, blobsOpt, true) - if res.isErr(): + let storeBlockTick = Moment.now() + + store(clist, signedBlock, blobsOpt).isOkOr: fatal "Unexpected failure while trying to store data", - filename = chainFilePath(clist.path), reason = res.error() + filename = chainFilePath(clist.path), reason = error quit 1 - clist.tail = Opt.some(BlockData(blck: signedBlock, blob: blobsOpt)) - + let bdata = BlockData(blck: signedBlock, blob: blobsOpt) + clist.setTail(bdata) if clist.head.isNone(): - clist.head = Opt.some(BlockData(blck: signedBlock, blob: blobsOpt)) + clist.setHead(bdata) debug "Initial block backfilled", verify_block_duration = shortLog(storeBlockTick - verifyBlockTick), @@ -185,24 +213,20 @@ proc addBackfillBlockData*( debug "Block does not match expected backfill root" return err(VerifierError.MissingParent) - block: - let res = checkBlobs(signedBlock, blobsOpt) - if res.isErr(): - return err(res.error) + ? checkBlobs(signedBlock, blobsOpt) - let - storeBlockTick = Moment.now() - res = store(chainFilePath(clist.path), signedBlock, blobsOpt) - if res.isErr(): + let storeBlockTick = Moment.now() + + store(clist, signedBlock, blobsOpt).isOkOr: fatal "Unexpected failure while trying to store data", - filename = chainFilePath(clist.path), reason = res.error() + filename = chainFilePath(clist.path), reason = error quit 1 debug "Block backfilled", verify_block_duration = shortLog(storeBlockTick - verifyBlockTick), store_block_duration = shortLog(Moment.now() - storeBlockTick) - clist.tail = Opt.some(BlockData(blck: signedBlock, blob: blobsOpt)) + clist.setTail(BlockData(blck: signedBlock, blob: blobsOpt)) ok() diff --git a/beacon_chain/sync/sync_overseer.nim b/beacon_chain/sync/sync_overseer.nim index 0730c99857..2813d7dc82 100644 --- a/beacon_chain/sync/sync_overseer.nim +++ b/beacon_chain/sync/sync_overseer.nim @@ -310,12 +310,11 @@ proc rebuildState(overseer: SyncOverseerRef): Future[void] {. batchVerifier = overseer.batchVerifier clist = block: - let res = ChainListRef.init(overseer.clist.path, dag.head.slot) - if res.isErr(): - fatal "Unable to read backfill data", reason = res.error, + overseer.clist.seekForSlot(dag.head.slot).isOkOr: + fatal "Unable to find slot in backfill data", reason = error, path = overseer.clist.path - return - res.get() + quit 1 + overseer.clist var blocks: seq[BlockData]