KInputStreamReader.kt
package org.knio.core.io
import kotlinx.coroutines.sync.withLock
import org.knio.core.context.KnioContext
import org.knio.core.context.getKnioContext
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.CharBuffer
import java.nio.ReadOnlyBufferException
import java.nio.charset.Charset
import java.nio.charset.CoderResult
/**
* A reader that reads characters from an [KInputStream].
*
* This class is equivalent to the [java.io.InputStreamReader].
*
* @property inputStream the input stream to read from
* @property byteBufferPool the buffer pool to use for acquiring buffers
* @property bufferSize the size of the buffer to use
*/
class KInputStreamReader private constructor (
private val inputStream: KInputStream,
charset: Charset = Charsets.UTF_8,
private val bufferSize: Int = 8192,
context: KnioContext
): KReader(context) {
companion object {
suspend fun open(inputStream: KInputStream, charset: Charset = Charsets.UTF_8): KInputStreamReader {
return KInputStreamReader(inputStream, charset, context = getKnioContext())
}
}
private val buffer: ByteBuffer = context.byteBufferPool.acquire(bufferSize).flip()
private val decoder = charset.newDecoder()
private var eof = false
private var isClosed = false
/**
* The name of the character encoding being used by this stream.
*/
val encoding: String
get() = decoder.charset().name()
/**
* Reads characters into the given [CharBuffer].
*
* @param b the buffer to read characters into
* @return the number of characters read, or -1 if the end of the stream has been reached
* @throws ReadOnlyBufferException if the buffer is read-only
* @throws IOException if an I/O error occurs
*/
override suspend fun read(b: CharBuffer): Int {
lock.withLock {
if (b.isReadOnly) {
throw ReadOnlyBufferException()
}
val startPosition = b.position()
var r: CoderResult
do {
r = decoder.decode(buffer, b, eof)
when {
r.isMalformed -> throw IOException("malformed input")
r.isUnmappable -> throw IOException("unmappable input")
r.isOverflow -> {
return b.position() - startPosition
}
r.isUnderflow -> {
if (b.position() > startPosition) {
return b.position() - startPosition
}
if (eof) {
if (buffer.hasRemaining()) {
throw IOException("unexpected end of stream")
}
return -1
}
buffer.apply {
if (hasRemaining()) compact() else clear()
}
if (inputStream.read(buffer) == -1) {
eof = true
}
buffer.flip()
}
}
} while (!r.isError)
return if (r.length() == 0 && eof) -1 else r.length()
}
}
/**
* Closes this reader and releases any system resources associated with it.
*
* @throws IOException if an I/O error occurs
*/
override suspend fun close() = lock.withLock {
if (isClosed) {
return
}
isClosed = true
context.byteBufferPool.release(buffer)
inputStream.close()
}
}