// KFileOutputStream.kt

package org.knio.core.io

import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.knio.core.context.KnioContext
import org.knio.core.context.getKnioContext
import org.knio.core.nio.writeSuspend
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousFileChannel
import java.nio.file.Path
import java.nio.file.StandardOpenOption

/**
 * A `KFileOutputStream` is an asynchronous file output stream that supports coroutine-based
 * non-blocking I/O operations.
 *
 * This class is equivalent to the [java.io.FileOutputStream].
 *
 * Calls to [write] and [close] are serialized through an internal [Mutex]. Bytes are written
 * sequentially, starting at offset 0 of the file and advancing by the number of bytes each
 * channel write reports.
 *
 * NOTE(review): the channel is opened with [StandardOpenOption.WRITE] only. Unless the channel
 * factory adds further options, this presumably requires the file to exist already and does not
 * truncate it, which differs from [java.io.FileOutputStream] — confirm against the factory.
 *
 * @param path The path to the file to write.
 * @param context The context to use for I/O operations.
 */
class KFileOutputStream private constructor (
    path: Path,
    private val context: KnioContext
): KOutputStream() {

    private val fileChannel: AsynchronousFileChannel = context.channelFactory.openFileChannel(
        path,
        StandardOpenOption.WRITE
    )

    /** Serializes [write] and [close] so that [position] is only touched by one caller at a time. */
    private val mutex = Mutex()

    /** Absolute file offset at which the next channel write starts. */
    private var position: Long = 0

    /**
     * Writes the remaining bytes of the given `ByteBuffer` to the file while holding the mutex.
     *
     * @param b The `ByteBuffer` to write.
     * @throws IOException If the channel write fails or makes no progress.
     */
    @Throws(IOException::class)
    override suspend fun write(b: ByteBuffer) = mutex.withLock {
        write0(b)
    }

    /**
     * Writes the remaining bytes in the `ByteBuffer` to the file without acquiring the mutex.
     *
     * Loops until the buffer is drained, advancing [position] by the count each channel write
     * reports.
     *
     * @param b The `ByteBuffer` to write.
     * @throws IOException If a channel write reports that no bytes were written while the buffer
     * still has remaining bytes.
     */
    @Throws(IOException::class)
    private suspend fun write0(b: ByteBuffer) {
        while (b.hasRemaining()) {
            val written = fileChannel.writeSuspend(b, position)

            // A non-positive count means the channel made no progress. Returning normally here
            // would silently drop the rest of the buffer, so surface it as an I/O failure instead.
            if (written <= 0) {
                throw IOException(
                    "Write made no progress (result=$written) with ${b.remaining()} byte(s) remaining"
                )
            }

            position += written
        }
    }

    /**
     * Closes this file output stream and releases any system resources associated with this stream.
     * This file output stream may no longer be used for writing bytes.
     *
     * If this stream has an associated channel then the channel is closed as well.
     *
     * The mutex is acquired first, so the channel is not closed while a [write] holds it.
     */
    override suspend fun close() = mutex.withLock {
        close0()
    }

    /**
     * Closes the file channel without acquiring the mutex.
     */
    private suspend fun close0() {
        @Suppress("BlockingMethodInNonBlockingContext")
        fileChannel.close()
    }

    companion object {
        /**
         * Opens a `KFileOutputStream` for the given `Path`, using the context returned by
         * [getKnioContext].
         *
         * @param path The `Path` to open the output stream for.
         * @return A `KFileOutputStream` for the given `Path`.
         */
        suspend fun open(path: Path): KFileOutputStream {
            return KFileOutputStream(path, getKnioContext())
        }
    }
}