Skip to content

Commit

Permalink
Use single file descriptor for blockchain file.
Browse files Browse the repository at this point in the history
  • Loading branch information
cheatfate committed Sep 5, 2024
1 parent 95db490 commit 575d711
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 201 deletions.
235 changes: 93 additions & 142 deletions beacon_chain/beacon_chain_file.nim
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type
data: seq[byte]

ChainFileData* = object
head*: BlockData
tail*: BlockData
head*: Opt[BlockData]
tail*: Opt[BlockData]

ChainFileHandle* = object
data*: ChainFileData
Expand All @@ -57,6 +57,10 @@ type
FileRepaired,
FileCorrupted

ChainFileFlag* {.pure.} = enum
Repair,
OpenAlways

ChainFileError* = object
kind*: ChainFileErrorType
message*: string
Expand Down Expand Up @@ -271,14 +275,25 @@ template isBlob(h: ChainFileHeader | ChainFileFooter): bool =
template isLast(h: ChainFileHeader | ChainFileFooter): bool =
h.kind.isLast()

proc store*(chunkfile: string, signedBlock: ForkedSignedBeaconBlock,
proc isFilePresent*(filename: string): bool =
isFile(filename)

template head*(chandle: ChainFileHandle): Opt[BlockData] =
chandle.data.head

template tail*(chandle: ChainFileHandle): Opt[BlockData] =
chandle.data.tail

proc setHead*(chandle: var ChainFileHandle, bdata: BlockData) =
chandle.data.head = Opt.some(bdata)

proc setTail*(chandle: var ChainFileHandle, bdata: BlockData) =
chandle.data.tail = Opt.some(bdata)

proc store*(chandle: ChainFileHandle, signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], compressed = true): Result[void, string] =
let
flags = {OpenFlags.Append, OpenFlags.Create}
handle = openFile(chunkfile, flags).valueOr:
return err(ioErrorMsg(error))
origOffset = getFilePos(handle).valueOr:
discard closeFile(handle)
let origOffset =
updateFilePos(chandle.handle, 0'i64, SeekPosition.SeekEnd).valueOr:
return err(ioErrorMsg(error))

block:
Expand All @@ -295,13 +310,13 @@ proc store*(chunkfile: string, signedBlock: ForkedSignedBeaconBlock,
(res, len(res))
slot = signedBlock.slot
buffer = Chunk.init(kind, uint64(slot), uint32(plainSize), data)
wrote = writeFile(handle, buffer).valueOr:
discard truncate(handle, origOffset)
discard closeFile(handle)
wrote = writeFile(chandle.handle, buffer).valueOr:
discard truncate(chandle.handle, origOffset)
discard fsync(chandle.handle)
return err(ioErrorMsg(error))
if wrote != uint(len(buffer)):
discard truncate(handle, origOffset)
discard closeFile(handle)
discard truncate(chandle.handle, origOffset)
discard fsync(chandle.handle)
return err(IncompleteWriteError)

if blobs.isSome():
Expand All @@ -318,18 +333,25 @@ proc store*(chunkfile: string, signedBlock: ForkedSignedBeaconBlock,
let res = SSZ.encode(blob[])
(res, len(res))
slot = blob[].signed_block_header.message.slot
buffer =
Chunk.init(kind, uint64(slot), uint32(plainSize), data)
wrote = writeFile(handle, buffer).valueOr:
discard truncate(handle, origOffset)
discard closeFile(handle)
buffer = Chunk.init(kind, uint64(slot), uint32(plainSize), data)

setFilePos(chandle.handle, 0'i64, SeekPosition.SeekEnd).isOkOr:
discard truncate(chandle.handle, origOffset)
discard fsync(chandle.handle)
return err(ioErrorMsg(error))

let
wrote = writeFile(chandle.handle, buffer).valueOr:
discard truncate(chandle.handle, origOffset)
discard fsync(chandle.handle)
return err(ioErrorMsg(error))
if wrote != uint(len(buffer)):
discard truncate(handle, origOffset)
discard closeFile(handle)
discard truncate(chandle.handle, origOffset)
discard fsync(chandle.handle)
return err(IncompleteWriteError)

closeFile(handle).isOkOr:
fsync(chandle.handle).isOkOr:
discard truncate(chandle.handle, origOffset)
return err(ioErrorMsg(error))

ok()
Expand Down Expand Up @@ -862,142 +884,71 @@ proc checkRepair*(filename: string,
return err(ioErrorMsg(error))
ok(ChainFileCheckResult.FileCorrupted)

proc init*(t: typedesc[ChainFileHandle],
filename: string,
repair: bool = true): Result[ChainFileHandle, string] =
if not(isFile(filename)):
# We return None if file is missing, because its not an error.
return err("File not found")

block:
let res = checkRepair(filename, repair)
if res.isErr():
return err(res.error)
if res.get() notin {ChainFileCheckResult.FileMissing, FileEmpty, FileOk,
FileRepaired}:
return err("Chain file data is corrupted")

proc init*(t: typedesc[ChainFileHandle], filename: string,
flags: set[ChainFileFlag]): Result[ChainFileHandle, string] =
let
handle =
block:
let res = openFile(filename, {OpenFlags.Read})
if res.isErr():
return err(ioErrorMsg(res.error))
res.get()
head =
block:
let res = getChainFileHead(handle)
if res.isErr():
discard closeFile(handle)
return err(res.error)
let cres = res.get()
if cres.isNone():
# Empty file is ok.
discard closeFile(handle)
return err("Unexpected end of file encountered")
cres.get()
if not(isFile(filename)):
if ChainFileFlag.OpenAlways in flags:
let flags = {OpenFlags.Read, OpenFlags.Write, OpenFlags.Create}
openFile(filename, flags).valueOr:
return err(ioErrorMsg(error))
else:
return err("File not found")
else:
# If file exists we perform automatic check/repair procedure.
let res =
checkRepair(filename, ChainFileFlag.Repair in flags).valueOr:
return err(error)

block:
let res = setFilePos(handle, 0'i64, SeekPosition.SeekEnd)
if res.isErr():
if res notin {ChainFileCheckResult.FileMissing, FileEmpty,
FileOk, FileRepaired}:
return err("Chain file data is corrupted")

let flags = {OpenFlags.Read, OpenFlags.Write}
openFile(filename, flags).valueOr:
return err(ioErrorMsg(error))

head = getChainFileHead(handle).valueOr:
discard closeFile(handle)
return err(ioErrorMsg(res.error))
return err(error)

let
tail =
block:
let res = getChainFileTail(handle)
if res.isErr():
discard closeFile(handle)
return err(res.error)
let tres = res.get()
if tres.isNone():
discard closeFile(handle)
return err("Unexpected end of file encountered")
tres.get()

ok(ChainFileHandle(
handle: handle,
data: ChainFileData(head: head, tail: tail)))
setFilePos(handle, 0'i64, SeekPosition.SeekEnd).isOkOr:
discard closeFile(handle)
return err(ioErrorMsg(error))

let tail = getChainFileTail(handle).valueOr:
discard closeFile(handle)
return err(error)

ok(ChainFileHandle(handle: handle,
data: ChainFileData(head: head, tail: tail)))

proc close*(ch: ChainFileHandle): Result[void, string] =
closeFile(ch.handle).isOkOr:
return err(ioErrorMsg(error))
ok()

proc init*(t: typedesc[ChainFileData],
filename: string): Result[Opt[ChainFileData], string] =
if not(isFile(filename)):
# We return None if file is missing, because its not an error.
return ok(Opt.none(ChainFileData))

block:
let res = checkRepair(filename, true)
if res.isErr():
return err(res.error)
if res.get() notin {ChainFileCheckResult.FileMissing, FileEmpty, FileOk,
FileRepaired}:
return err("Chain file data is corrupted")

let
handle =
block:
let res = openFile(filename, {OpenFlags.Read})
if res.isErr():
return err(ioErrorMsg(res.error))
res.get()
head =
block:
let res = getChainFileHead(handle)
if res.isErr():
discard closeFile(handle)
return err(res.error)
let cres = res.get()
if cres.isNone():
# Empty file is ok.
discard closeFile(handle)
return ok(Opt.none(ChainFileData))
cres.get()

block:
let res = setFilePos(handle, 0'i64, SeekPosition.SeekEnd)
if res.isErr():
discard closeFile(handle)
return err(ioErrorMsg(res.error))

let
tail =
block:
let res = getChainFileTail(handle)
if res.isErr():
discard closeFile(handle)
return err(res.error)
let tres = res.get()
if tres.isNone():
discard closeFile(handle)
return err("Unexpected end of file encountered")
tres.get()

block:
let res = closeFile(handle)
if res.isErr():
return err(ioErrorMsg(res.error))

ok(Opt.some(ChainFileData(head: head, tail: tail)))

proc seekForSlot*(ch: ChainFileHandle,
slot: Slot): Result[Opt[int64], string] =
if ch.head.isNone() or ch.tail.isNone():
return err("Attempt to seek for slot in empty file")

let
headRange =
if ch.data.head.blck.slot() >= slot:
ch.data.head.blck.slot() - slot
else:
slot - ch.data.head.blck.slot()
block:
let headSlot = ch.head.get().blck.slot()
if headSlot >= slot:
headSlot - slot
else:
slot - headSlot
tailRange =
if ch.data.tail.blck.slot() >= slot:
ch.data.tail.blck.slot() - slot
else:
slot - ch.data.tail.blck.slot()
block:
let tailSlot = ch.tail.get().blck.slot()
if tailSlot >= slot:
tailSlot - slot
else:
slot - tailSlot
offset =
if headRange <= tailRange:
? seekForSlotForward(ch.handle, slot)
Expand Down
Loading

0 comments on commit 575d711

Please sign in to comment.