KBufferedReader.kt

package org.knio.core.io

import kotlinx.coroutines.sync.withLock
import org.knio.core.context.KnioContext
import org.knio.core.context.acquireReleasableCharBuffer
import org.knio.core.context.getKnioContext
import java.io.IOException
import java.nio.CharBuffer
import java.util.*

private const val UNMARKED = -1

/**
 * A buffered reader that reads characters from a KReader.
 *
 * @param reader The KReader to read characters from.
 * @param bufferSize The size of the buffer to use.
 */
class KBufferedReader(
    reader: KReader,
    bufferSize: Int,
    context: KnioContext
): KReader(context) {

    companion object {

        /**
         * Opens a buffered reader that reads characters from a [KReader].
         *
         * @param reader The KReader to read characters from.
         * @param bufferSize The size of the buffer to use. If null, the default buffer size will be used.
         *
         * @return The buffered reader.
         */
        suspend fun open(reader: KReader, bufferSize: Int? = null): KBufferedReader {
            val context = getKnioContext()
            val buffSize = bufferSize ?: context.maxStreamBufferSize

            return KBufferedReader(reader, buffSize, context)
        }
    }

    private var inStream: KReader? = reader
    private var buffer = context.byteBufferPool.acquireReleasableCharBuffer(bufferSize).apply { value.flip() }

    private var mark: Int = UNMARKED
    private var readAhead: Int = 0 /* Valid only when markedChar > 0 */


    /** Checks to make sure that the stream has not been closed  */
    @Throws(IOException::class)
    private fun ensureOpen() {
        if (buffer.released) throw IOException("Stream closed")
    }

    /**
     * Fills the input buffer, taking the mark into account if it is valid.
     *
     * @throws IOException If an I/O error occurs.
     */
    @Throws(IOException::class)
    private suspend fun fill(): Int {
        // not synchronized, as only called from other synchronized methods

        if(inStream == null) {
            return -1
        }

        val buffer = buffer.value
        var delta = 0
        if(isMarked()) {
            delta = buffer.position() - mark
            val readAheadLimit = readAhead + mark
            if(delta >= readAheadLimit) {
                // delta is larger than the readAhead limit, invalidate the mark
                mark = UNMARKED
                buffer.clear()
            } else {
                buffer.position(mark)
                buffer.compact()
                mark = 0
                buffer.position(delta)
            }
        } else {
            buffer.clear()
        }

        val read = inStream!!.read(buffer)
        if(read == -1) {
            // Nothing read, EOF reached, clean up, and return
            inStream = null
        }
        buffer.flip()

        if(isMarked()) {
            buffer.position(buffer.position() + delta)
        }

        return read
    }

    /**
     * Reads a single character.
     *
     * @return The character read, as an integer in the range 0 to 65535 (0x00-0xffff), or -1 if the end of the stream has been reached.
     * @throws IOException If an I/O error occurs.
     */
    @Throws(IOException::class)
    override suspend fun read(): Int {
        lock.withLock {
            ensureOpen()
            val buffer = buffer.value
            if(!buffer.hasRemaining()) {
                if(fill()==-1) return -1
            }

            return buffer.get().code
        }
    }

    /**
     * Reads characters into a portion of an array.
     *
     * @param cbuf The destination buffer.
     * @param off The offset at which to start storing characters.
     * @param len The maximum number of characters to read.
     * @return The number of characters read, or -1 if the end of the stream has been reached.
     * @throws IOException If an I/O error occurs.
     */
    @Throws(IOException::class)
    override suspend fun read(cbuf: CharArray, off: Int, len: Int): Int {
        lock.withLock {
            ensureOpen()

            val buffer = buffer.value
            var read = 0
            var eof = false

            while (read!=len && !eof) {
                if(buffer.hasRemaining()) {
                    val toRead = minOf(len-read, buffer.remaining())
                    buffer.get(cbuf, read+off, toRead)
                    read += toRead
                } else {
                    if(fill() == -1) {
                        eof = true
                    }
                }
            }

            return if(read==0 && eof) -1 else read
        }
    }

    /**
     * Reads characters into a CharBuffer.
     *
     * @param b The CharBuffer to read characters into.
     * @return The number of characters read, or -1 if the end of the stream has been reached.
     * @throws IOException If an I/O error occurs.
     */
    override suspend fun read(b: CharBuffer): Int {
        lock.withLock {
            ensureOpen()

            val buffer = buffer.value

            var read = 0
            var eof = false

            while(b.hasRemaining() && !eof) {
                if(!buffer.hasRemaining()) {
                    if(fill() == -1) eof = true
                } else {
                    val toRead = minOf(b.remaining(), buffer.remaining())
                    b.put(b.position(), buffer, buffer.position(), toRead)
                    b.position(b.position() + toRead)
                    buffer.position(buffer.position() + toRead)

                    read += toRead
                }
            }

            return if(read==0 && eof) -1 else read
        }
    }

    /**
     * Reads a line of text. A line is considered to be terminated by any one of a line feed ('\n'), a carriage return ('\r'), or a carriage return followed immediately by a linefeed.
     *
     * @param ignoreLF If true, the next '\n' will be skipped.
     * @return A String containing the contents of the line, not including any line-termination characters, or null if the end of the stream has been reached.
     * @throws IOException If an I/O error occurs.
     */
    @Throws(IOException::class)
    suspend fun readLine(): String? {
        val s: StringBuilder = StringBuilder()

        lock.withLock {
            ensureOpen()

            val buffer = buffer.value

            bufferLoop@ while (true) {
                if(!buffer.hasRemaining()) {
                    if(fill() == -1) {
                        return if (s.isNotEmpty()) {
                            s.toString()
                        } else{
                            null
                        }
                    }
                }

                var eol = false
                var c: Char

                charLoop@ while (buffer.hasRemaining()) {
                    c = buffer.get()
                    if (c == '\n') {
                        eol = true
                        break@charLoop
                    } else if (c == '\r') {
                        eol = true
                        if (buffer.hasRemaining() && buffer.get() != '\n') {
                            buffer.position(buffer.position() - 1)
                        }
                        break@charLoop
                    } else {
                        s.append(c)
                    }
                }

                if (eol) {
                    return s.toString()
                }
            }
        }
    }


    /**
     * Skips characters.
     *
     * @param n The number of characters to skip.
     * @return The number of characters actually skipped.
     * @throws IllegalArgumentException If `n` is negative.
     * @throws IOException If an I/O error occurs.
     */
    @Throws(IOException::class)
    override suspend fun skip(n: Long): Long {
        require(n >= 0L) { "skip value is negative" }

        lock.withLock {
            ensureOpen()

            val buffer = buffer.value

            var r = n
            while (r > 0) {
                if(!buffer.hasRemaining()) {
                    if(fill() == -1) {
                        break
                    }
                }


                if(buffer.remaining() >= r) {
                    buffer.position(buffer.position() + r.toInt())
                    r = 0
                    break
                } else {
                    r -= buffer.remaining()
                    buffer.position(buffer.limit())
                }
            }
            return n - r
        }
    }

    /**
     * Tells whether this stream is ready to be read. A buffered character stream is ready if the buffer is not empty,
     * or if the underlying character stream is ready.
     *
     * @return `true` if the stream is ready to be read, false otherwise.
     * @throws IOException If an I/O error occurs.
     */
    @Throws(IOException::class)
    override suspend fun ready(): Boolean {
        lock.withLock {
            ensureOpen()
            return buffer.value.hasRemaining() || inStream!!.ready()
        }
    }

    /**
     * Tells whether this stream supports the mark() operation, which it does.
     *
     * @return True if the stream supports the mark() operation, false otherwise.
     */
    override suspend fun markSupported(): Boolean = true

    /**
     * Marks the present position in the stream. Subsequent calls to reset() will attempt to reposition the stream to
     * this point.
     *
     * @param readLimit  Limit on the number of characters that may be read while still preserving the mark. An attempt
     * to reset the stream after reading characters up to this limit or beyond may fail. A limit value larger than the size of the input buffer will cause a new buffer to be allocated whose size is no smaller than limit. Therefore large values should be used with care.
     *
     * @throws IllegalArgumentException If readAheadLimit is < 0.
     * @throws IOException If an I/O error occurs.
     */
    @Throws(IOException::class)
    override suspend fun mark(readLimit: Int) {
        require(readLimit >= 0) { "Read-ahead limit < 0" }

        lock.withLock {
            ensureOpen()
            setMark(readLimit)
        }
    }

    /**
     * Resets the stream to the most recent mark.
     *
     * @throws IOException If the stream has never been marked, or if the mark has been invalidated.
     */
    @Throws(IOException::class)
    override suspend fun reset() {
        lock.withLock {
            ensureOpen()
            if (!resetToMark()) {
                throw IOException("Mark invalid")
            }
        }
    }

    /**
     * Closes the stream and releases any system resources associated with it.
     *
     * @throws IOException If an I/O error occurs.
     */
    @Throws(IOException::class)
    override suspend fun close() {
        lock.withLock {
            if (inStream == null) return

            inStream!!.close()
            inStream = null

            buffer.release()
        }
    }


    private fun isMarked(): Boolean {
        return mark > UNMARKED
    }

    private fun setMark(readLimit: Int): Int {
        if(buffer.value.limit() < readLimit) {
            buffer.resize(readLimit)
        }

        val mark = this.buffer.value.position()

        this.mark = mark
        this.readAhead = readLimit
        return mark
    }

    private fun resetToMark(): Boolean {
        if(!isMarked()) {
            return false
        }

        this.buffer.value.position(mark)
        return true
    }
}