原创

Kafka源码分析(二十)——Broker:日志子系统——LogSegment

本章,我们来看下LogSegment的源码,LogSegment就是对日志段的抽象:

class LogSegment(val log: FileRecords,
                 val index: OffsetIndex,
                 val timeIndex: TimeIndex,
                 val baseOffset: Long,
                 val indexIntervalBytes: Int,
                 val rollJitterMs: Long,
                 time: Time) extends Logging {
}

可以看到,LogSegment类的声明包含了以下信息:

  • FileRecords:实际保存 Kafka 消息的对象;
  • OffsetIndex:位移索引;
  • TimeIndex:时间戳索引;
  • baseOffset:初始偏移量,在磁盘上看到的日志段文件名就是 baseOffset 的值
  • indexIntervalBytes:Broker 端参数 log.index.interval.bytes 值,控制了日志段对象新增索引项的频率,默认情况下,日志段至少新写入 4KB 的消息数据才会新增一条索引项;
  • rollJitterMs:日志段新增时的一个“扰动时间值”。Broker 可能会同时创建多个日志段文件,会增加磁盘 I/O 压力,有了 rollJitterMs 值的干扰,每个日志段文件在创建时会彼此岔开一小段时间,从而缓解磁盘的 I/O 负载瓶颈。

一、核心方法

我们再来看下LogSegment的核心方法,在《Broker:日志子系统——整体架构》中,我介绍过Broker端写日志的整体流程,其中就涉及LogSegment.append()方法写日志。LogSegment的核心方法一共有3个,我用下面这张图表示:



1.1 append写消息

我们先来看最重要的append方法,它接收 5 个参数:

  • firstOffset:首位移值;
  • largestOffset:最大位移值;
  • largestTimestamp:最大时间戳;
  • shallowOffsetOfMaxTimestamp:最大时间戳对应消息的位移;
  • records:真正要写入的消息集合。
// LogSegment.scala

def append(firstOffset: Long, largestOffset: Long, largestTimestamp: Long, 
           shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords) {
  if (records.sizeInBytes > 0) {
    val physicalPosition = log.sizeInBytes()
    // 1.日志段大小为0,即为空,则记录要写入消息集合的最大时间戳,并将其作为后面新增日志段倒计时的依据
    if (physicalPosition == 0)
      rollingBasedTimestamp = Some(largestTimestamp)
    require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
    // 2.调用FileRecords.append方法执行真正的写入,将内存中的消息对象写入到操作系统的页缓存
    val appendedBytes = log.append(records)

    // 3.更新日志段的最大时间戳以及最大时间戳所属消息的位移值属性
    if (largestTimestamp > maxTimestampSoFar) {
      maxTimestampSoFar = largestTimestamp
      offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
    }
    // 4.更新索引项和写入的字节数,日志段每写入4KB数据就要写入一个索引项
    if(bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(firstOffset, physicalPosition)
      timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
      bytesSinceLastIndexEntry = 0
    }
    bytesSinceLastIndexEntry += records.sizeInBytes
  }
}

下面这张图展示了 上述append 方法的完整执行流程:



1.2 read读消息

LogSegment的read方法用来读消息,方法接收 4 个输入参数:

  • startOffset:要读取的第一条消息的位移;
  • maxSize:能读取的最大字节数;
  • maxPosition :能读到的最大文件位置;
  • minOneMessage:是否允许在消息体过大时至少返回第一条消息。

注意下第 4 个参数,当这个参数为 true 时,即使出现消息体字节数超过了 maxSize 的情形,read 方法依然会返回至少一条消息,这样可以确保不出现Consumer饥饿的情况:

// LogSegment.scala

def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
         minOneMessage: Boolean = false): FetchDataInfo = {
  if (maxSize < 0)
    throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

  // 1.定位要读取的起始文件位置, startOffset仅仅是位移值,Kafka 需要根据索引信息找到对应的物理文件位置才能开始读取消息
  val logSize = log.sizeInBytes // this may change, need to save a consistent copy
  val startOffsetAndSize = translateOffset(startOffset)

  if (startOffsetAndSize == null)
    return null

  val startPosition = startOffsetAndSize.position.toInt
  val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

  val adjustedMaxSize =
    if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
    else maxSize

  // return a log segment but with zero size in the case below
  if (adjustedMaxSize == 0)
    return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

  // 2.计算要读取的总字节数
  val length = maxOffset match {
    case None =>
      min((maxPosition - startPosition).toInt, adjustedMaxSize)
    case Some(offset) =>
      if (offset < startOffset)
        return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
      val mapping = translateOffset(offset, startPosition)
      val endPosition =
        if (mapping == null)
          logSize // the max offset is off the end of the log, use the end of the file
        else
          mapping.position
      min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
  }
  // 3.从指定位置读取指定大小的消息集合
  FetchDataInfo(offsetMetadata, log.read(startPosition, length),
    firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}

1.3 recover恢复日志段

最后来看LogSegment的recover方法,这方法用于恢复日志段。Broker在启动时会从磁盘上加载所有日志段文件的信息到内存中,并创建对应的 LogSegment 对象,这个过程就是recover

// LogSegment.scala

def recover(maxMessageSize: Int): Int = {
  // 1.清空所有的索引文件
  index.truncate()
  index.resize(index.maxIndexSize)
  timeIndex.truncate()
  timeIndex.resize(timeIndex.maxIndexSize)
  var validBytes = 0
  var lastIndexEntry = 0
  maxTimestampSoFar = Record.NO_TIMESTAMP
  try {
    // 2.遍历日志段中的所有消息集合
    for (entry <- log.shallowEntries(maxMessageSize).asScala) {
      val record = entry.record
      record.ensureValid()
      if (record.timestamp > maxTimestampSoFar) {
        maxTimestampSoFar = record.timestamp
        offsetOfMaxTimestamp = entry.offset
      }

      // Build offset index
      if(validBytes - lastIndexEntry > indexIntervalBytes) {
        val startOffset = entry.firstOffset
        index.append(startOffset, validBytes)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
        lastIndexEntry = validBytes
      }
      validBytes += entry.sizeInBytes()
    }
  } catch {
    case e: CorruptRecordException =>
      logger.warn("Found invalid messages in log segment %s at byte offset %d: %s."
        .format(log.file.getAbsolutePath, validBytes, e.getMessage))
  }
  val truncated = log.sizeInBytes - validBytes
  log.truncateTo(validBytes)
  index.trimToValidSize()
  timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
  timeIndex.trimToValidSize()
  truncated
}

二、总结

本章,我对LogSegment这个分段日志对象进行了讲解,我们需要重点关注它的append方法,也就是写日志的方法。LogSegment会判断每写入4KB消息,就写入一个稀疏索引。

正文到此结束

感谢赞赏~

本文目录