KReader.kt
package org.knio.core.io
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.knio.core.context.KnioContext
import org.knio.core.context.acquireReleasableCharBuffer
import org.knio.core.lang.KAutoCloseable
import java.io.IOException
import java.nio.CharBuffer
import kotlin.jvm.Throws
import kotlin.math.min
/**
* Abstract class for reading character streams. The only methods that a subclass must implement are
* read(char[], int, int) and close(). Most subclasses, however, will override some of the methods defined here in order
* to provide higher efficiency, additional functionality, or both.
*
* The coroutine equivalent to the [java.io.Reader] class.
*/
abstract class KReader(
protected val context: KnioContext,
protected val lock: Mutex = Mutex()
): KAutoCloseable {
/**
* Marks the present position in the stream. Subsequent calls to reset() will attempt to reposition the stream to
* this point. Not all character-input streams support the mark() operation.
*
* @param readLimit the maximum limit of bytes that can be read before the mark position becomes invalid
* @throws IOException if the stream does not support mark(), or if some other I/O error occurs
*/
@Throws(IOException::class)
open suspend fun mark(readLimit: Int): Unit = throw IOException("mark not supported")
/**
* Tells whether this stream supports the mark() operation. The default implementation always returns false.
*
* @return true if this stream supports the mark() operation; false otherwise
*/
open suspend fun markSupported(): Boolean = false
/**
* Reads a single character.
*
* Subclasses that intend to support efficient single-character input should override this method.
*
* @return The character read, as an integer in the range 0 to 65535 (0x00-0xffff), or -1 if the end of the stream
* has been reached
*
* @throws IOException if an I/O error occurs
*/
@Throws(IOException::class)
open suspend fun read(): Int {
val buffer = CharArray(1)
val read = read(buffer)
return if (read == -1) -1 else buffer[0].code
}
/**
* Reads characters into an array.
*
* @param b the buffer into which the data is read
*
* @return The total number of characters read into the buffer, or -1 if there is no more data because the end of
* the stream has been reached
*
* @throws IOException if an I/O error occurs
*/
@Throws(IOException::class)
open suspend fun read(b: CharArray): Int = read(b, 0, b.size)
/**
* Reads characters into a portion of an array.
*
* @param b the buffer into which the data is read
* @param off the start offset in the destination array b
* @param len the maximum number of characters read
*
* @return The total number of characters read into the buffer, or -1 if there is no more data because the end of
* the stream has been reached
*/
@Throws(IOException::class)
open suspend fun read(b: CharArray, off: Int, len: Int): Int = read(CharBuffer.wrap(b, off, len))
/**
* Attempts to read characters into the specified character buffer. The buffer is used as a repository of characters
* as-is: the only changes made are the results of a put operation. No flipping or rewinding of the buffer is
* performed.
*
* @param b the buffer into which the data is read
* @return The total number of characters read into the buffer, or -1 if there is no more data because the end of
* the stream has been reached
* @throws IOException if an I/O error occurs
* @throws java.nio.ReadOnlyBufferException if the buffer is read-only
*/
@Throws(IOException::class)
abstract suspend fun read(b: CharBuffer): Int
open suspend fun ready(): Boolean = true
open suspend fun reset(): Unit = throw IOException("reset not supported")
/**
* Skips characters. This method will block until some characters are available, an I/O error occurs, or the end of
* the stream is reached. If the stream is already at its end before this method is invoked, then no characters are skipped and zero is returned.
*
*/
open suspend fun skip(n: Long): Long {
// Note: If skip is called frequently, it's better to use a real buffer pool rather than creating a new buffer each time
// It's best to override this method in the subclass to provide a longer lived buffer
require(n >= 0L) { "skip value is negative" }
val nn = min(n, context.maxTaskBufferSize.toLong()).toInt()
val buffer = context.byteBufferPool.acquireReleasableCharBuffer(nn)
try {
lock.withLock {
return skip0(n, buffer.value)
}
} finally {
buffer.release()
}
}
private suspend fun skip0(n: Long, c: CharBuffer): Long {
if(c.remaining()>n) {
c.limit(c.position() + n.toInt())
}
var r = n
while (r > 0) {
val nc = read(c)
if (nc == -1) break
r -= nc
}
return n - r
}
/**
* Closes the stream and releases any system resources associated with it. Once the stream has been closed, further
* read(), ready(), mark(), reset(), or skip() invocations will throw an IOException. Closing a previously closed
* stream has no effect.
*/
@Throws(IOException::class)
abstract override suspend fun close()
}