KFileInputStream.kt
package org.knio.core.io
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.knio.core.context.KnioContext
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousFileChannel
import org.knio.core.nio.readSuspend
import org.knio.core.context.getKnioContext
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import kotlin.Throws
/**
* A FileInputStream obtains input bytes from a file in a file system.
*
* This is a coroutine-based asynchronous equivalent of [java.io.FileInputStream].
*
* @param path The path to the file to read.
*/
class KFileInputStream private constructor(
private val path: Path,
context: KnioContext
): KInputStream(context) {
private val mutex = Mutex()
private val channel: AsynchronousFileChannel = context.channelFactory.openFileChannel(path, StandardOpenOption.READ)
companion object {
/**
* Opens a file input stream for the specified file.
*
* @param path The path to the file to read.
* @return The file input stream.
*/
suspend fun open(path: Path): KFileInputStream {
return KFileInputStream(path, getKnioContext())
}
/**
* Opens a file input stream for the specified file path.
*
* @param path The path to the file to read as a String.
* @return The file input stream.
*/
suspend fun open(path: String): KFileInputStream {
return open(Path.of(path))
}
}
/** The current position in the file. */
private var position: Long = 0
/** The mark position in the file. */
private var markPosition: Long? = null
/** The mark limit for the read-ahead limit. */
private var markLimit: Int = 0
@Throws(IOException::class)
override suspend fun available(): Int {
// all reads perform I/O operations, so we can't know how many bytes are available without reading
return 0
}
/**
* Returns the total number of bytes in the file.
*
* @return The total number of bytes in the file.
*/
@Throws(IOException::class)
suspend fun size(): Long {
@Suppress("BlockingMethodInNonBlockingContext")
return channel.size()
}
private suspend fun remaining(): Long {
return size() - position
}
@Throws(IOException::class)
override suspend fun mark(readLimit: Int) {
markPosition = position
markLimit = readLimit
}
/**
* Checks if mark and reset are supported.
*
* @return True if mark and reset are supported, false otherwise.
*/
@Throws(IOException::class)
override suspend fun markSupported(): Boolean {
return true
}
/**
* Reads bytes from the input stream into the specified ByteBuffer.
*
* An attempt is made to read `b.remaining()` bytes, but a smaller number may be read.
*
* @return The number of bytes read, or -1 if the end of the file is reached.
*/
@Throws(IOException::class)
override suspend fun read(b: ByteBuffer): Int = mutex.withLock {
return read0(b)
}
private suspend fun read0(buffer: ByteBuffer): Int {
val count = channel.readSuspend(buffer, position)
if (count > 0) {
position += count
}
return count
}
/**
* Resets the input stream to the previously marked position.
*
* @throws IOException If the mark position is invalid.
*/
@Throws(IOException::class)
override suspend fun reset() = mutex.withLock {
reset0()
}
private suspend fun reset0() {
val markPosition = markPosition ?: throw IOException("Mark not set")
if (position < markPosition || position - markPosition > markLimit) {
throw IOException("Mark invalid")
}
position = markPosition
}
/**
* Skips over and discards n bytes of data from the input stream.
*
* The skip method may, for a variety of reasons, end up skipping over some smaller number of bytes, possibly 0.
* If n is negative, the method will try to skip backwards. The actual number of bytes skipped is returned.
* If it skips forwards, it returns a positive value. If it skips backwards, it returns a negative value.
*
* @param n The number of bytes to skip.
* @return The actual number of bytes skipped.
* @throws IOException If an I/O error occurs.
*/
@Throws(IOException::class)
override suspend fun skip(n: Long): Long = mutex.withLock {
return skip0(n)
}
private suspend fun skip0(n: Long): Long {
// This differs from the Java implementation in that will only skip up to the end of the file or the beginning.
// It returns the number of skipped bytes, as the documentation states, rather than going past or throwing an
// exception.
if (n >= 0) {
val skip = minOf(n, remaining())
position += skip
return skip
} else {
val rewind = -1 * minOf(-n, position)
position += rewind
return rewind
}
}
/**
* Closes this file input stream and releases any system resources associated with the stream.
*/
@Throws(IOException::class)
override suspend fun close() = mutex.withLock {
close0()
}
/**
* Closes the file input stream without locking the mutex.
*/
private suspend fun close0() {
@Suppress("BlockingMethodInNonBlockingContext")
channel.close()
}
}