Kotlin:高性能并发文件 I/O 最佳实践?

Kotlin: High-Performance concurrent file I/O best-practices?

提问人:Gavin Ray 提问时间:9/11/2022 更新时间:9/11/2022 访问量:280

问:

在 Kotlin 中,以多读取器、单写入器方式允许并发文件 I/O 的最高性能方法是什么?

我有以下内容,但我不确定协程设施产生了多少开销:

搜索这方面的例子并不多(尝试在 Github 上搜索,有极少数 Kotlin 存储库实现了这一点)。ReadWriteMutex

class DiskManagerImpl(file: File) : DiskManager {
    private val mutex = ReadWriteMutexImpl()
    private val channel = AsynchronousFileChannel.open(file.toPath(),
        StandardOpenOption.READ, StandardOpenOption.WRITE)

    override suspend fun readPage(pageId: PageId, buffer: MemorySegment) = withContext(Dispatchers.IO) {
        mutex.withReadLock {
            val offset = pageId * PAGE_SIZE
            val bytesRead = channel.readAsync(buffer.asByteBuffer(), offset.toLong())
            require(bytesRead == PAGE_SIZE) { "Failed to read page $pageId" }
        }
    }

    override suspend fun writePage(pageId: PageId, buffer: MemorySegment) = withContext(Dispatchers.IO) {
        mutex.withWriteLock {
            val offset = pageId * PAGE_SIZE
            val bytesWritten = channel.writeAsync(buffer.asByteBuffer(), offset.toLong())
            require(bytesWritten == PAGE_SIZE) { "Failed to write page $pageId" }
        }
    }
}
class ReadWriteMutexImpl : ReadWriteMutex {
    private val read = Mutex()
    private val write = Mutex()
    private val readers = atomic(0)

    override suspend fun lockRead() {
        if (readers.getAndIncrement() == 0) {
            read.lock()
        }
    }

    override fun unlockRead() {
        if (readers.decrementAndGet() == 0) {
            read.unlock()
        }
    }

    override suspend fun lockWrite() {
        read.lock()
        write.lock()
    }

    override fun unlockWrite() {
        write.unlock()
        read.unlock()
    }
}

suspend inline fun <T> ReadWriteMutex.withReadLock(block: () -> T): T {
    lockRead()
    return try {
        block()
    } finally {
        unlockRead()
    }
}

suspend inline fun <T> ReadWriteMutex.withWriteLock(block: () -> T): T {
    lockWrite()
    return try {
        block()
    } finally {
        unlockWrite()
    }
}
性能 Kotlin 并发 IO JVM

评论

2赞 broot 9/11/2022
我可能是错的,但我不确定与协程一起使用是否有意义。AFAIK 操作系统不支持文件的非阻塞 IO,因此与 - 它将阻塞 IO 委托给特殊的线程池几乎相同。相反,你可以阻止。如果保留现有代码,则不需要。您可以从任何调度程序调用挂起函数,用于阻止此处没有的代码。AsynchronousFileChannelAsynchronousFileChannelDispatchers.IOchannel.readAsync()withContext(Dispatchers.IO)Dispatchers.IO
1赞 broot 9/11/2022
此外,我认为对于什么是最佳/性能最高的解决方案没有简单的答案。这真的取决于具体情况。例如,您的解决方案非常有利于读者。读取器和写入器不在 FIFO 中处理,只要至少有一个读取器,写入器就必须等待。最坏的情况是作家永远不会写作。处理多读取器和多写入器的最有效方法之一是仅追加,但这需要完全改变方法。所以这真的取决于具体情况。
0赞 Gavin Ray 9/11/2022
啊,这是可靠的建议,非常感谢!另外,关于异步文件 IO 的一般概念:Linux 有 AIO + io_uring,它有点晦涩难懂,但 Windows 有一个标记为“FILE_FLAG_OVERLAPPED”的异步文件。创建 AsyncFileChannel 时,将设置此标志 (github.com/openjdk/jdk/blob/...)
0赞 Gavin Ray 9/11/2022
> “读者和写者不是在FIFO中处理的,只要至少有一个读者,写者就得等待。啊啪啪,你不说吗?这根本行不通 - 这是用于始终存在并发读取的 SQL 数据库缓冲池
0赞 broot 9/11/2022
好吧,也许我错了,可以根据平台使用真正的非阻塞。文档说:“AsynchronousFileChannel 与线程池相关联,任务提交到该线程池以处理 I/O 事件”,但也许我读错了,或者这是默认行为,可以对其进行优化。对于FIFO:至少这是我在你的代码中看到的。如果我们已经处于阅读模式,读者总是被允许进入。他们只增加/减少计数器,所以他们可以在任何时间内这样做。AsynchronousFileChannel

答: 暂无答案