Reader bug fix
This commit is contained in:
@@ -19,6 +19,7 @@
|
||||
package xyz.quaver.pupil.util
|
||||
|
||||
import android.content.Context
|
||||
import android.util.Log
|
||||
import com.google.firebase.crashlytics.FirebaseCrashlytics
|
||||
import io.ktor.client.*
|
||||
import io.ktor.client.call.*
|
||||
@@ -30,14 +31,17 @@ import io.ktor.util.collections.*
|
||||
import io.ktor.utils.io.*
|
||||
import io.ktor.utils.io.core.*
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.BufferOverflow
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import org.kodein.di.DIAware
|
||||
import org.kodein.di.android.closestDI
|
||||
import org.kodein.di.instance
|
||||
import org.kodein.log.LoggerFactory
|
||||
import org.kodein.log.newLogger
|
||||
import java.io.File
|
||||
import java.io.IOException
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.Executors
|
||||
@@ -55,8 +59,13 @@ class NetworkCache(context: Context) : DIAware {
|
||||
|
||||
private val cacheDir = File(context.cacheDir, "networkcache")
|
||||
|
||||
private val channel = ConcurrentHashMap<String, Channel<Float>>()
|
||||
private val flowMutex = Mutex()
|
||||
private val flow = ConcurrentHashMap<String, MutableStateFlow<Float>>()
|
||||
|
||||
private val requestsMutex = Mutex()
|
||||
private val requests = ConcurrentHashMap<String, Job>()
|
||||
|
||||
private val activeFilesMutex = Mutex()
|
||||
private val activeFiles = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
|
||||
|
||||
private fun urlToFilename(url: String): String {
|
||||
@@ -69,21 +78,33 @@ class NetworkCache(context: Context) : DIAware {
|
||||
cacheDir.listFiles { file -> file.name !in activeFiles }?.forEach { it.delete() }
|
||||
}
|
||||
|
||||
fun free(urls: List<String>) = urls.forEach {
|
||||
requests[it]?.cancel()
|
||||
channel.remove(it)
|
||||
activeFiles.remove(urlToFilename(it))
|
||||
fun free(urls: List<String>) = CoroutineScope(Dispatchers.IO).launch {
|
||||
requestsMutex.withLock {
|
||||
urls.forEach {
|
||||
requests[it]?.cancel()
|
||||
}
|
||||
}
|
||||
flowMutex.withLock {
|
||||
urls.forEach {
|
||||
flow.remove(it)
|
||||
}
|
||||
}
|
||||
activeFilesMutex.withLock {
|
||||
urls.forEach {
|
||||
activeFiles.remove(urlToFilename(it))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun clear() = CoroutineScope(Dispatchers.IO).launch {
|
||||
requests.values.forEach { it.cancel() }
|
||||
channel.clear()
|
||||
flow.clear()
|
||||
activeFiles.clear()
|
||||
cacheDir.listFiles()?.forEach { it.delete() }
|
||||
}
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
suspend fun load(requestBuilder: HttpRequestBuilder.() -> Unit): Pair<Channel<Float>, File> = coroutineScope {
|
||||
suspend fun load(force: Boolean = false, requestBuilder: HttpRequestBuilder.() -> Unit): Pair<StateFlow<Float>, File> = coroutineScope {
|
||||
val request = HttpRequestBuilder().apply(requestBuilder)
|
||||
|
||||
val url = request.url.buildString()
|
||||
@@ -92,56 +113,65 @@ class NetworkCache(context: Context) : DIAware {
|
||||
val file = File(cacheDir, fileName)
|
||||
activeFiles.add(fileName)
|
||||
|
||||
val progressChannel = if (channel[url]?.isClosedForSend == false)
|
||||
channel[url]!!
|
||||
else
|
||||
Channel<Float>(1, BufferOverflow.DROP_OLDEST).also { channel[url] = it }
|
||||
val progressFlow = flowMutex.withLock {
|
||||
if (flow.contains(url)) {
|
||||
flow[url]!!
|
||||
} else MutableStateFlow(0f).also { flow[url] = it }
|
||||
}
|
||||
|
||||
if (file.exists())
|
||||
progressChannel.close()
|
||||
else
|
||||
requests[url] = networkScope.launch {
|
||||
kotlin.runCatching {
|
||||
cacheDir.mkdirs()
|
||||
file.createNewFile()
|
||||
requestsMutex.withLock {
|
||||
if (!requests.contains(url) || force) {
|
||||
if (force) requests[url]?.cancelAndJoin()
|
||||
|
||||
client.request<HttpStatement>(request).execute { httpResponse ->
|
||||
val responseChannel: ByteReadChannel = httpResponse.receive()
|
||||
val contentLength = httpResponse.contentLength() ?: -1
|
||||
var readBytes = 0f
|
||||
requests[url] = networkScope.launch {
|
||||
runCatching {
|
||||
cacheDir.mkdirs()
|
||||
file.createNewFile()
|
||||
|
||||
file.outputStream().use { outputStream ->
|
||||
while (!responseChannel.isClosedForRead) {
|
||||
if (!isActive) {
|
||||
file.delete()
|
||||
break
|
||||
}
|
||||
client.request<HttpStatement>(request).execute { httpResponse ->
|
||||
if (!httpResponse.status.isSuccess()) throw IOException("${request.url} failed with code ${httpResponse.status.value}")
|
||||
val responseChannel: ByteReadChannel = httpResponse.receive()
|
||||
val contentLength = httpResponse.contentLength() ?: -1
|
||||
var readBytes = 0f
|
||||
|
||||
val packet = responseChannel.readRemaining(DEFAULT_BUFFER_SIZE.toLong())
|
||||
while (!packet.isEmpty) {
|
||||
file.outputStream().use { outputStream ->
|
||||
outputStream.channel.truncate(0)
|
||||
while (!responseChannel.isClosedForRead) {
|
||||
if (!isActive) {
|
||||
file.delete()
|
||||
break
|
||||
}
|
||||
|
||||
val bytes = packet.readBytes()
|
||||
outputStream.write(bytes)
|
||||
val packet = responseChannel.readRemaining(DEFAULT_BUFFER_SIZE.toLong())
|
||||
while (!packet.isEmpty) {
|
||||
if (!isActive) {
|
||||
file.delete()
|
||||
break
|
||||
}
|
||||
|
||||
readBytes += bytes.size
|
||||
progressChannel.trySend(readBytes / contentLength)
|
||||
val bytes = packet.readBytes()
|
||||
outputStream.write(bytes)
|
||||
|
||||
readBytes += bytes.size
|
||||
progressFlow.emit(readBytes / contentLength)
|
||||
}
|
||||
}
|
||||
}
|
||||
progressFlow.emit(Float.POSITIVE_INFINITY)
|
||||
}
|
||||
}.onFailure {
|
||||
Log.d("PUPILD-NC", it.message.toString())
|
||||
file.delete()
|
||||
FirebaseCrashlytics.getInstance().recordException(it)
|
||||
progressFlow.emit(Float.NEGATIVE_INFINITY)
|
||||
requestsMutex.withLock {
|
||||
requests.remove(url)
|
||||
}
|
||||
progressChannel.close()
|
||||
}
|
||||
}.onFailure {
|
||||
logger.warning(it)
|
||||
file.delete()
|
||||
FirebaseCrashlytics.getInstance().recordException(it)
|
||||
progressChannel.close(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return@coroutineScope progressChannel to file
|
||||
return@coroutineScope progressFlow to file
|
||||
}
|
||||
}
|
||||
@@ -67,7 +67,11 @@ fun View.show() {
|
||||
}
|
||||
|
||||
class FileXImageSource(val file: FileX): ImageSource {
|
||||
private val decoder = newBitmapRegionDecoder(file.inputStream()!!)
|
||||
private val decoder by lazy {
|
||||
file.inputStream()!!.use {
|
||||
newBitmapRegionDecoder(it)
|
||||
}
|
||||
}
|
||||
|
||||
override val imageSize by lazy { Size(decoder.width.toFloat(), decoder.height.toFloat()) }
|
||||
|
||||
|
||||
Reference in New Issue
Block a user