meta: rename compress module to fx.compression namespace
This commit is contained in:
@@ -0,0 +1,19 @@
|
||||
include(../cmake/Templates.cmake)
|
||||
|
||||
find_package(ZSTD)
|
||||
|
||||
if (ZSTD_FOUND)
|
||||
set(libs ${libs} ${ZSTD_LIBRARY})
|
||||
set(include_dirs ${include_dirs} ${ZSTD_INCLUDE_DIR})
|
||||
set(function_sources ${function_sources} ${CMAKE_CURRENT_SOURCE_DIR}/function/zstd.c)
|
||||
set(defines ${defines} FX_COMPRESSOR_SUPPORTED_ZSTD)
|
||||
message(STATUS "Enabling ZSTD support in fx-compress")
|
||||
endif ()
|
||||
|
||||
add_fx_module(
|
||||
NAME compress
|
||||
DEPENDENCIES core
|
||||
EXTRA_SOURCES ${function_sources}
|
||||
DEFINES ${defines}
|
||||
LIBS ${libs}
|
||||
INCLUDE_DIRS ${include_dirs})
|
||||
@@ -0,0 +1,239 @@
|
||||
#include <assert.h>
|
||||
#include <fx/compress/compressor.h>
|
||||
#include <fx/core/ringbuffer.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
#define COMPRESSOR_DISPATCH_STATIC(func, compressor, ...) \
|
||||
do { \
|
||||
struct compressor_data _compressor; \
|
||||
enum fx_status status \
|
||||
= compressor_get_data(compressor, &_compressor); \
|
||||
if (!FX_OK(status)) { \
|
||||
return status; \
|
||||
} \
|
||||
return func(&_compressor, __VA_ARGS__); \
|
||||
} while (0)
|
||||
#define COMPRESSOR_DISPATCH_STATIC_0(func, compressor) \
|
||||
do { \
|
||||
struct compressor_data _compressor; \
|
||||
enum fx_status status \
|
||||
= compressor_get_data(compressor, &_compressor); \
|
||||
if (!FX_OK(status)) { \
|
||||
return status; \
|
||||
} \
|
||||
return func(&_compressor); \
|
||||
} while (0)
|
||||
|
||||
/*** PRIVATE DATA *************************************************************/
|
||||
|
||||
struct compressor_data {
|
||||
fx_compressor *c_obj;
|
||||
fx_compressor_class *c_ops;
|
||||
fx_compressor_data *c_data;
|
||||
};
|
||||
|
||||
/*** PRIVATE FUNCTIONS ********************************************************/
|
||||
|
||||
static enum fx_status compressor_get_data(
|
||||
fx_compressor *compressor, struct compressor_data *out)
|
||||
{
|
||||
out->c_obj = compressor;
|
||||
return fx_object_get_data(
|
||||
compressor, FX_TYPE_COMPRESSOR, NULL, (void **)&out->c_data,
|
||||
(void **)&out->c_ops);
|
||||
}
|
||||
|
||||
static enum fx_status compressor_get_mode(
|
||||
struct compressor_data *p, enum fx_compressor_mode *out)
|
||||
{
|
||||
if (out) {
|
||||
*out = p->c_data->c_mode;
|
||||
}
|
||||
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
static enum fx_status compressor_set_mode(
|
||||
struct compressor_data *p, enum fx_compressor_mode mode)
|
||||
{
|
||||
if (!p->c_ops->c_set_mode) {
|
||||
return FX_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
return p->c_ops->c_set_mode(p->c_obj, mode);
|
||||
}
|
||||
|
||||
static enum fx_status compressor_set_buffer(
|
||||
struct compressor_data *p, fx_ringbuffer *inbuf, fx_ringbuffer *outbuf)
|
||||
{
|
||||
p->c_data->c_in = inbuf;
|
||||
p->c_data->c_out = outbuf;
|
||||
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
static enum fx_status compress(struct compressor_data *p)
|
||||
{
|
||||
if (p->c_data->c_mode != FX_COMPRESSOR_MODE_COMPRESS) {
|
||||
return FX_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
if (!p->c_ops->c_compress) {
|
||||
return FX_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
return p->c_ops->c_compress(p->c_obj);
|
||||
}
|
||||
|
||||
static enum fx_status decompress(struct compressor_data *p)
|
||||
{
|
||||
if (p->c_data->c_mode != FX_COMPRESSOR_MODE_DECOMPRESS) {
|
||||
return FX_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
if (!p->c_ops->c_decompress) {
|
||||
return FX_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
return p->c_ops->c_decompress(p->c_obj);
|
||||
}
|
||||
|
||||
static enum fx_status compressor_step(struct compressor_data *p)
|
||||
{
|
||||
switch (p->c_data->c_mode) {
|
||||
case FX_COMPRESSOR_MODE_COMPRESS:
|
||||
return compress(p);
|
||||
case FX_COMPRESSOR_MODE_DECOMPRESS:
|
||||
return decompress(p);
|
||||
default:
|
||||
return FX_ERR_BAD_STATE;
|
||||
}
|
||||
}
|
||||
|
||||
static enum fx_status compressor_end(struct compressor_data *p)
|
||||
{
|
||||
if (p->c_data->c_mode != FX_COMPRESSOR_MODE_COMPRESS) {
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
if (!p->c_ops->c_compress_end) {
|
||||
return FX_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
while (fx_ringbuffer_available_data_remaining(p->c_data->c_in)) {
|
||||
if (!fx_ringbuffer_write_capacity_remaining(p->c_data->c_out)) {
|
||||
return FX_ERR_NO_SPACE;
|
||||
}
|
||||
|
||||
enum fx_status status = compressor_step(p);
|
||||
if (!FX_OK(status)) {
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
||||
return p->c_ops->c_compress_end(p->c_obj);
|
||||
}
|
||||
|
||||
static enum fx_status compressor_reset(struct compressor_data *p)
|
||||
{
|
||||
p->c_data->c_flags &= ~FX_COMPRESSOR_EOF;
|
||||
|
||||
if (p->c_ops->c_reset) {
|
||||
return p->c_ops->c_reset(p->c_obj);
|
||||
}
|
||||
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
static bool compressor_eof(const struct compressor_data *p)
|
||||
{
|
||||
return (p->c_data->c_flags & FX_COMPRESSOR_EOF) != 0;
|
||||
}
|
||||
|
||||
/*** PUBLIC FUNCTIONS *********************************************************/
|
||||
|
||||
enum fx_status fx_compressor_get_buffer_size(
|
||||
fx_type type, fx_compressor_mode mode, size_t *inbuf_size, size_t *outbuf_size)
|
||||
{
|
||||
fx_class *c = fx_class_get(type);
|
||||
if (!c) {
|
||||
return FX_ERR_INVALID_ARGUMENT;
|
||||
}
|
||||
|
||||
fx_compressor_class *ops = fx_class_get_interface(c, FX_TYPE_COMPRESSOR);
|
||||
if (!ops) {
|
||||
return FX_ERR_INVALID_ARGUMENT;
|
||||
}
|
||||
|
||||
if (!ops->c_buffer_size) {
|
||||
return FX_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
return ops->c_buffer_size(mode, inbuf_size, outbuf_size);
|
||||
}
|
||||
|
||||
enum fx_status fx_compressor_get_mode(
|
||||
const fx_compressor *compressor, enum fx_compressor_mode *out)
|
||||
{
|
||||
COMPRESSOR_DISPATCH_STATIC(
|
||||
compressor_get_mode, (fx_compressor *)compressor, out);
|
||||
}
|
||||
|
||||
enum fx_status fx_compressor_set_mode(
|
||||
fx_compressor *compressor, enum fx_compressor_mode mode)
|
||||
{
|
||||
COMPRESSOR_DISPATCH_STATIC(compressor_set_mode, compressor, mode);
|
||||
}
|
||||
|
||||
enum fx_status fx_compressor_set_buffer(
|
||||
fx_compressor *compressor, fx_ringbuffer *inbuf, fx_ringbuffer *outbuf)
|
||||
{
|
||||
COMPRESSOR_DISPATCH_STATIC(compressor_set_buffer, compressor, inbuf, outbuf);
|
||||
}
|
||||
|
||||
enum fx_status fx_compressor_step(fx_compressor *compressor)
|
||||
{
|
||||
COMPRESSOR_DISPATCH_STATIC_0(compressor_step, compressor);
|
||||
}
|
||||
|
||||
enum fx_status fx_compressor_end(fx_compressor *compressor)
|
||||
{
|
||||
COMPRESSOR_DISPATCH_STATIC_0(compressor_end, compressor);
|
||||
}
|
||||
|
||||
enum fx_status fx_compressor_reset(fx_compressor *compressor)
|
||||
{
|
||||
COMPRESSOR_DISPATCH_STATIC_0(compressor_reset, compressor);
|
||||
}
|
||||
|
||||
bool fx_compressor_eof(const fx_compressor *compressor)
|
||||
{
|
||||
COMPRESSOR_DISPATCH_STATIC_0(compressor_eof, (fx_compressor *)compressor);
|
||||
}
|
||||
|
||||
/*** VIRTUAL FUNCTIONS ********************************************************/
|
||||
|
||||
static void compressor_init(fx_object *obj, void *priv)
|
||||
{
|
||||
}
|
||||
|
||||
static void compressor_fini(fx_object *obj, void *priv)
|
||||
{
|
||||
}
|
||||
|
||||
/*** CLASS DEFINITION *********************************************************/
|
||||
|
||||
FX_TYPE_CLASS_DEFINITION_BEGIN(fx_compressor)
|
||||
FX_TYPE_CLASS_INTERFACE_BEGIN(fx_object, FX_TYPE_OBJECT)
|
||||
FX_INTERFACE_ENTRY(to_string) = NULL;
|
||||
FX_TYPE_CLASS_INTERFACE_END(fx_object, FX_TYPE_OBJECT)
|
||||
FX_TYPE_CLASS_DEFINITION_END(fx_compressor)
|
||||
|
||||
FX_TYPE_DEFINITION_BEGIN(fx_compressor)
|
||||
FX_TYPE_ID(0x452ee0f9, 0xfe12, 0x48a1, 0xb596, 0xad5b7a3940e7);
|
||||
FX_TYPE_CLASS(fx_compressor_class);
|
||||
FX_TYPE_INSTANCE_PROTECTED(fx_compressor_data);
|
||||
FX_TYPE_INSTANCE_INIT(compressor_init);
|
||||
FX_TYPE_INSTANCE_FINI(compressor_fini);
|
||||
FX_TYPE_DEFINITION_END(fx_compressor)
|
||||
@@ -0,0 +1,890 @@
|
||||
#include <fx/compress/compressor.h>
|
||||
#include <fx/compress/cstream.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
/*** PRIVATE DATA *************************************************************/
|
||||
|
||||
enum cstream_flags {
|
||||
CSTREAM_CURSOR_MOVED = 0x01u,
|
||||
};
|
||||
|
||||
struct fx_cstream_p {
|
||||
enum cstream_flags s_flags;
|
||||
fx_stream *s_endpoint;
|
||||
fx_compressor *s_compressor;
|
||||
/* s_in is the input buffer, and s_out is the output buffer.
|
||||
*
|
||||
* the input buffer holds data that will be provided to the
|
||||
* (de)compression function. in compression mode, this data is provided
|
||||
* by the code using the cstream (via fx_cstream_write). in decompression
|
||||
* mode, this data is read from s_endpoint.
|
||||
*
|
||||
* the output buffer holds data produced by the (de)compression
|
||||
* function. in compression mode, this data will be written to
|
||||
* s_endpoint. in decompression mode, this data will be returned to the
|
||||
* code using the cstream (via fx_cstream_read)
|
||||
*
|
||||
* heavy usage of cstream's compressed sections facility can result
|
||||
* in the input buffer holding uncompressed data while the stream is in
|
||||
* decompression mode. this is handled by the uncompressed read code path.
|
||||
*/
|
||||
fx_ringbuffer *s_in, *s_out;
|
||||
fx_compressor_mode s_mode;
|
||||
|
||||
unsigned int s_compression_depth;
|
||||
/* tracks the number of bytes read from or written to the endpoint.
|
||||
* this counter is not reset at the beginning/end of each section.
|
||||
*
|
||||
* during compressed sections, this counter is incremented by the number
|
||||
* of compressed bytes written/consumed.
|
||||
*
|
||||
* during uncompressed sections, this counter is incremented by the
|
||||
* number of uncompressed bytes written/returned.
|
||||
*
|
||||
* this does not include bytes read/written while the cursor is moved.
|
||||
*/
|
||||
size_t s_tx_bytes;
|
||||
/* tracks the number of compressed bytes that have passed through this
|
||||
* stream in the current section.
|
||||
*
|
||||
* in compression mode, this tracks the number of post-compression bytes
|
||||
* that have been written to the endpoint within the current section,
|
||||
* including any bytes written during end_compression_section()
|
||||
*
|
||||
* in decompression mode, this tracks the number of compressed bytes
|
||||
* that were decompressed while reading the current section. it does not
|
||||
* include any uncompressed bytes that may have been read from the
|
||||
* endpoint while reading a compressed section due to cstream's
|
||||
* read-ahead caching behaviour.
|
||||
*/
|
||||
size_t s_tx_bytes_compressed;
|
||||
/* tracks the number of uncompressed bytes that have passed through this
|
||||
* stream in the current section.
|
||||
*
|
||||
* in compression mode, this tracks the number of bytes given to
|
||||
* fx_cstream_write
|
||||
*
|
||||
* in decompression mode, this tracks the number of bytes returned by
|
||||
* fx_cstream_read
|
||||
*/
|
||||
size_t s_tx_bytes_uncompressed;
|
||||
|
||||
/* when the endpoint cursor is moved, the previous cursor position is
|
||||
* saved here so it can be restored later */
|
||||
size_t s_cursor;
|
||||
};
|
||||
|
||||
/*** PRIVATE FUNCTIONS ********************************************************/
|
||||
|
||||
static enum fx_status read_cursor(
|
||||
struct fx_cstream_p *stream, void *buf, size_t count, size_t *out_nr_read)
|
||||
{
|
||||
return fx_stream_read_bytes(stream->s_endpoint, buf, count, out_nr_read);
|
||||
}
|
||||
|
||||
static enum fx_status read_uncompressed(
|
||||
struct fx_cstream_p *stream, void *buf, size_t count, size_t *out_nr_read)
|
||||
{
|
||||
size_t remaining = count;
|
||||
unsigned char *dest = buf;
|
||||
size_t nr_read_from_buf = 0;
|
||||
size_t nr_read_from_endpoint = 0;
|
||||
enum fx_status status = FX_SUCCESS;
|
||||
|
||||
/* liberal usage of begin_compressed_section and end_compressed_section
|
||||
* can result in uncompressed data getting stuck in the input buffer.
|
||||
* return any data remaining in the input buffer before reading more
|
||||
* from the endpoint */
|
||||
|
||||
while (remaining > 0) {
|
||||
const void *data;
|
||||
size_t available;
|
||||
status = fx_ringbuffer_open_read_buffer(
|
||||
stream->s_in, &data, &available);
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
size_t to_copy = remaining;
|
||||
if (to_copy > available) {
|
||||
to_copy = available;
|
||||
}
|
||||
|
||||
memcpy(dest, data, to_copy);
|
||||
|
||||
fx_ringbuffer_close_read_buffer(stream->s_in, &data, to_copy);
|
||||
|
||||
stream->s_tx_bytes_uncompressed += to_copy;
|
||||
stream->s_tx_bytes += to_copy;
|
||||
dest += to_copy;
|
||||
remaining -= to_copy;
|
||||
nr_read_from_buf += to_copy;
|
||||
}
|
||||
|
||||
if (remaining == 0) {
|
||||
*out_nr_read = nr_read_from_buf;
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
status = fx_stream_read_bytes(
|
||||
stream->s_endpoint, dest, remaining, &nr_read_from_endpoint);
|
||||
stream->s_tx_bytes_uncompressed += nr_read_from_endpoint;
|
||||
stream->s_tx_bytes += nr_read_from_endpoint;
|
||||
|
||||
*out_nr_read = nr_read_from_endpoint + nr_read_from_buf;
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
/* read compressed data from the endpoint and store it in the input buffer.
|
||||
* note that uncompressed data that is trailing the compressed blob may
|
||||
* also be read by this function, but this will be handled by read_uncompressed.
|
||||
*/
|
||||
static enum fx_status refill_input_buffer(struct fx_cstream_p *stream)
|
||||
{
|
||||
enum fx_status status = FX_SUCCESS;
|
||||
size_t nr_read = 0;
|
||||
|
||||
while (1) {
|
||||
void *data;
|
||||
size_t capacity;
|
||||
status = fx_ringbuffer_open_write_buffer(
|
||||
stream->s_in, &data, &capacity);
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
size_t r = 0;
|
||||
status = fx_stream_read_bytes(
|
||||
stream->s_endpoint, data, capacity, &r);
|
||||
|
||||
fx_ringbuffer_close_write_buffer(stream->s_in, &data, r);
|
||||
nr_read += r;
|
||||
|
||||
if (r < capacity) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (status == FX_ERR_NO_SPACE && nr_read > 0) {
|
||||
status = FX_SUCCESS;
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
/* push compressed data out of the input buffer, through the (de)compressor,
|
||||
* and store the resulting uncompressed data in the output buffer */
|
||||
static enum fx_status refill_output_buffer(struct fx_cstream_p *stream)
|
||||
{
|
||||
enum fx_status status = FX_SUCCESS;
|
||||
|
||||
if (fx_compressor_eof(stream->s_compressor)) {
|
||||
return FX_ERR_NO_DATA;
|
||||
}
|
||||
|
||||
if (!fx_ringbuffer_available_data_remaining(stream->s_in)) {
|
||||
status = refill_input_buffer(stream);
|
||||
}
|
||||
|
||||
if (!FX_OK(status)) {
|
||||
return status;
|
||||
}
|
||||
|
||||
size_t bytes_before = fx_ringbuffer_available_data_remaining(stream->s_in);
|
||||
status = fx_compressor_step(stream->s_compressor);
|
||||
size_t bytes_after = fx_ringbuffer_available_data_remaining(stream->s_in);
|
||||
|
||||
stream->s_tx_bytes_compressed += (bytes_before - bytes_after);
|
||||
stream->s_tx_bytes += (bytes_before - bytes_after);
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
static enum fx_status cstream_read(
|
||||
struct fx_cstream_p *stream, void *buf, size_t count, size_t *out_nr_read)
|
||||
{
|
||||
if (stream->s_mode != FX_COMPRESSOR_MODE_DECOMPRESS) {
|
||||
return FX_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
if (stream->s_flags & CSTREAM_CURSOR_MOVED) {
|
||||
return read_cursor(stream, buf, count, out_nr_read);
|
||||
}
|
||||
|
||||
if (stream->s_compression_depth == 0) {
|
||||
return read_uncompressed(stream, buf, count, out_nr_read);
|
||||
}
|
||||
|
||||
unsigned char *dest = buf;
|
||||
size_t nr_read = 0;
|
||||
size_t remaining = count;
|
||||
enum fx_status status = FX_SUCCESS;
|
||||
|
||||
while (remaining > 0) {
|
||||
if (!fx_ringbuffer_available_data_remaining(stream->s_out)) {
|
||||
status = refill_output_buffer(stream);
|
||||
}
|
||||
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
const void *data;
|
||||
size_t available;
|
||||
status = fx_ringbuffer_open_read_buffer(
|
||||
stream->s_out, &data, &available);
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
size_t to_copy = remaining;
|
||||
if (to_copy > available) {
|
||||
to_copy = available;
|
||||
}
|
||||
|
||||
memcpy(dest, data, to_copy);
|
||||
|
||||
fx_ringbuffer_close_read_buffer(stream->s_out, &data, to_copy);
|
||||
|
||||
stream->s_tx_bytes_uncompressed += to_copy;
|
||||
dest += to_copy;
|
||||
nr_read += to_copy;
|
||||
remaining -= to_copy;
|
||||
}
|
||||
|
||||
if (status == FX_ERR_NO_DATA) {
|
||||
status = FX_SUCCESS;
|
||||
}
|
||||
|
||||
*out_nr_read = nr_read;
|
||||
return status;
|
||||
}
|
||||
|
||||
static enum fx_status write_cursor(
|
||||
struct fx_cstream_p *stream, const void *buf, size_t count, size_t *nr_written)
|
||||
{
|
||||
return fx_stream_write_bytes(stream->s_endpoint, buf, count, nr_written);
|
||||
}
|
||||
|
||||
static enum fx_status write_uncompressed(
|
||||
struct fx_cstream_p *stream, const void *buf, size_t count, size_t *nr_written)
|
||||
{
|
||||
size_t w = 0;
|
||||
enum fx_status status
|
||||
= fx_stream_write_bytes(stream->s_endpoint, buf, count, &w);
|
||||
|
||||
stream->s_tx_bytes_uncompressed += w;
|
||||
stream->s_tx_bytes += w;
|
||||
|
||||
*nr_written = w;
|
||||
return status;
|
||||
}
|
||||
|
||||
/* push uncompressed data out of the input buffer, through the compressor,
|
||||
* and store the resulting compressed data in the output buffer */
|
||||
static enum fx_status flush_input_buffer(struct fx_cstream_p *stream)
|
||||
{
|
||||
if (!fx_ringbuffer_available_data_remaining(stream->s_in)) {
|
||||
return FX_ERR_NO_DATA;
|
||||
}
|
||||
|
||||
return fx_compressor_step(stream->s_compressor);
|
||||
}
|
||||
|
||||
/* push compressed data from the output buffer into the endpoint */
|
||||
static enum fx_status flush_output_buffer(struct fx_cstream_p *stream)
|
||||
{
|
||||
enum fx_status status = FX_SUCCESS;
|
||||
size_t nr_written = 0;
|
||||
|
||||
while (1) {
|
||||
const void *data;
|
||||
size_t capacity;
|
||||
status = fx_ringbuffer_open_read_buffer(
|
||||
stream->s_out, &data, &capacity);
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
size_t w = 0;
|
||||
status = fx_stream_write_bytes(
|
||||
stream->s_endpoint, data, capacity, &w);
|
||||
|
||||
fx_ringbuffer_close_read_buffer(stream->s_out, &data, w);
|
||||
nr_written += w;
|
||||
stream->s_tx_bytes_compressed += w;
|
||||
stream->s_tx_bytes += w;
|
||||
|
||||
if (w < capacity) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (status == FX_ERR_NO_DATA && nr_written > 0) {
|
||||
status = FX_SUCCESS;
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
static enum fx_status cstream_write(
|
||||
struct fx_cstream_p *stream, const void *buf, size_t count,
|
||||
size_t *out_nr_written)
|
||||
{
|
||||
if (stream->s_mode != FX_COMPRESSOR_MODE_COMPRESS) {
|
||||
return FX_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
if (stream->s_flags & CSTREAM_CURSOR_MOVED) {
|
||||
return write_cursor(stream, buf, count, out_nr_written);
|
||||
}
|
||||
|
||||
if (stream->s_compression_depth == 0) {
|
||||
return write_uncompressed(stream, buf, count, out_nr_written);
|
||||
}
|
||||
|
||||
const unsigned char *src = buf;
|
||||
size_t nr_written = 0;
|
||||
size_t remaining = count;
|
||||
enum fx_status status = FX_SUCCESS;
|
||||
|
||||
while (remaining > 0) {
|
||||
if (!fx_ringbuffer_write_capacity_remaining(stream->s_out)) {
|
||||
status = flush_output_buffer(stream);
|
||||
}
|
||||
|
||||
if (!fx_ringbuffer_write_capacity_remaining(stream->s_in)) {
|
||||
status = flush_input_buffer(stream);
|
||||
}
|
||||
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
void *data;
|
||||
size_t available;
|
||||
status = fx_ringbuffer_open_write_buffer(
|
||||
stream->s_in, &data, &available);
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
size_t to_copy = remaining;
|
||||
if (to_copy > available) {
|
||||
to_copy = available;
|
||||
}
|
||||
|
||||
memcpy(data, src, to_copy);
|
||||
|
||||
fx_ringbuffer_close_write_buffer(stream->s_in, &data, to_copy);
|
||||
|
||||
stream->s_tx_bytes_uncompressed += to_copy;
|
||||
src += to_copy;
|
||||
nr_written += to_copy;
|
||||
remaining -= to_copy;
|
||||
}
|
||||
|
||||
if (status == FX_ERR_NO_DATA) {
|
||||
status = FX_SUCCESS;
|
||||
}
|
||||
|
||||
*out_nr_written = nr_written;
|
||||
return status;
|
||||
}
|
||||
|
||||
static enum fx_status skip_uncompressed(
|
||||
struct fx_cstream_p *stream, size_t count, size_t *out_nr_skipped)
|
||||
{
|
||||
size_t remaining = count;
|
||||
size_t nr_read_from_buf = 0;
|
||||
size_t nr_read_from_endpoint = 0;
|
||||
enum fx_status status = FX_SUCCESS;
|
||||
|
||||
/* liberal usage of begin_compressed_section and end_compressed_section
|
||||
* can result in uncompressed data getting stuck in the input buffer.
|
||||
* return any data remaining in the input buffer before reading more
|
||||
* from the endpoint */
|
||||
|
||||
while (remaining > 0) {
|
||||
const void *data;
|
||||
size_t available;
|
||||
status = fx_ringbuffer_open_read_buffer(
|
||||
stream->s_in, &data, &available);
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
size_t to_copy = remaining;
|
||||
if (to_copy > available) {
|
||||
to_copy = available;
|
||||
}
|
||||
|
||||
fx_ringbuffer_close_read_buffer(stream->s_in, &data, to_copy);
|
||||
|
||||
stream->s_tx_bytes_uncompressed += to_copy;
|
||||
stream->s_tx_bytes += to_copy;
|
||||
remaining -= to_copy;
|
||||
nr_read_from_buf += to_copy;
|
||||
}
|
||||
|
||||
if (remaining == 0) {
|
||||
if (out_nr_skipped) {
|
||||
*out_nr_skipped = nr_read_from_buf;
|
||||
}
|
||||
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
size_t cursor = fx_stream_cursor(stream->s_endpoint);
|
||||
|
||||
status = fx_stream_seek(stream->s_endpoint, remaining, FX_STREAM_SEEK_CURRENT);
|
||||
nr_read_from_endpoint = fx_stream_cursor(stream->s_endpoint) - cursor;
|
||||
stream->s_tx_bytes_uncompressed += nr_read_from_endpoint;
|
||||
stream->s_tx_bytes += nr_read_from_endpoint;
|
||||
|
||||
if (out_nr_skipped) {
|
||||
*out_nr_skipped = nr_read_from_endpoint + nr_read_from_buf;
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
static enum fx_status cstream_skip(
|
||||
struct fx_cstream_p *stream, size_t count, size_t *out_nr_skipped)
|
||||
{
|
||||
if (stream->s_mode != FX_COMPRESSOR_MODE_DECOMPRESS) {
|
||||
return FX_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
if (stream->s_flags & CSTREAM_CURSOR_MOVED) {
|
||||
return FX_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
if (stream->s_compression_depth == 0) {
|
||||
return skip_uncompressed(stream, count, out_nr_skipped);
|
||||
}
|
||||
|
||||
if (fx_compressor_eof(stream->s_compressor)
|
||||
&& !fx_ringbuffer_available_data_remaining(stream->s_out)) {
|
||||
if (out_nr_skipped) {
|
||||
*out_nr_skipped = 0;
|
||||
}
|
||||
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
size_t nr_read = 0;
|
||||
size_t remaining = count;
|
||||
enum fx_status status = FX_SUCCESS;
|
||||
|
||||
while (remaining > 0) {
|
||||
if (!fx_ringbuffer_available_data_remaining(stream->s_out)) {
|
||||
status = refill_output_buffer(stream);
|
||||
}
|
||||
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
const void *data;
|
||||
size_t available;
|
||||
status = fx_ringbuffer_open_read_buffer(
|
||||
stream->s_out, &data, &available);
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
size_t to_copy = remaining;
|
||||
if (to_copy > available) {
|
||||
to_copy = available;
|
||||
}
|
||||
|
||||
fx_ringbuffer_close_read_buffer(stream->s_out, &data, to_copy);
|
||||
|
||||
stream->s_tx_bytes_uncompressed += to_copy;
|
||||
nr_read += to_copy;
|
||||
remaining -= to_copy;
|
||||
}
|
||||
|
||||
if (status == FX_ERR_NO_DATA) {
|
||||
status = FX_SUCCESS;
|
||||
}
|
||||
|
||||
if (out_nr_skipped) {
|
||||
*out_nr_skipped = nr_read;
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
static enum fx_status cstream_reset(struct fx_cstream_p *stream)
|
||||
{
|
||||
if (stream->s_mode != FX_COMPRESSOR_MODE_DECOMPRESS) {
|
||||
return FX_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
if (stream->s_flags & CSTREAM_CURSOR_MOVED) {
|
||||
return FX_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
stream->s_flags = 0;
|
||||
|
||||
fx_stream_seek(stream->s_endpoint, 0, FX_STREAM_SEEK_START);
|
||||
fx_ringbuffer_clear(stream->s_in);
|
||||
fx_ringbuffer_clear(stream->s_out);
|
||||
fx_compressor_reset(stream->s_compressor);
|
||||
|
||||
stream->s_compression_depth = 0;
|
||||
stream->s_tx_bytes = 0;
|
||||
stream->s_tx_bytes_uncompressed = 0;
|
||||
stream->s_tx_bytes_compressed = 0;
|
||||
stream->s_cursor = 0;
|
||||
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
static enum fx_status cstream_begin_compressed_section(
|
||||
struct fx_cstream_p *stream, size_t *tx_uncompressed_bytes)
|
||||
{
|
||||
if (stream->s_flags & CSTREAM_CURSOR_MOVED) {
|
||||
return FX_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
if (tx_uncompressed_bytes) {
|
||||
*tx_uncompressed_bytes = stream->s_tx_bytes_uncompressed;
|
||||
}
|
||||
|
||||
if (stream->s_compression_depth > 0) {
|
||||
stream->s_compression_depth++;
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
stream->s_compression_depth = 1;
|
||||
stream->s_tx_bytes_uncompressed = 0;
|
||||
stream->s_tx_bytes_compressed = 0;
|
||||
fx_compressor_reset(stream->s_compressor);
|
||||
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
static enum fx_status cstream_end_compressed_section(
|
||||
struct fx_cstream_p *stream, size_t *tx_compressed_bytes,
|
||||
size_t *tx_uncompressed_bytes)
|
||||
{
|
||||
if (stream->s_flags & CSTREAM_CURSOR_MOVED) {
|
||||
return FX_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
tx_compressed_bytes
|
||||
&& (*tx_compressed_bytes = stream->s_tx_bytes_compressed);
|
||||
|
||||
tx_uncompressed_bytes
|
||||
&& (*tx_uncompressed_bytes = stream->s_tx_bytes_uncompressed);
|
||||
|
||||
if (stream->s_compression_depth > 1) {
|
||||
stream->s_compression_depth--;
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
stream->s_compression_depth = 0;
|
||||
|
||||
if (stream->s_mode == FX_COMPRESSOR_MODE_DECOMPRESS) {
|
||||
stream->s_tx_bytes_compressed = 0;
|
||||
stream->s_tx_bytes_uncompressed = 0;
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
enum fx_status status = FX_SUCCESS;
|
||||
while (1) {
|
||||
status = fx_compressor_end(stream->s_compressor);
|
||||
if (!FX_OK(status) && status != FX_ERR_NO_SPACE) {
|
||||
break;
|
||||
}
|
||||
|
||||
status = flush_output_buffer(stream);
|
||||
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (fx_compressor_eof(stream->s_compressor)) {
|
||||
status = FX_SUCCESS;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* refresh these output variables to account for any data
|
||||
* written by fx_compressor_end */
|
||||
tx_compressed_bytes
|
||||
&& (*tx_compressed_bytes = stream->s_tx_bytes_compressed);
|
||||
|
||||
tx_uncompressed_bytes
|
||||
&& (*tx_uncompressed_bytes = stream->s_tx_bytes_uncompressed);
|
||||
|
||||
if (!FX_OK(status)) {
|
||||
return status;
|
||||
}
|
||||
|
||||
stream->s_tx_bytes_compressed = 0;
|
||||
stream->s_tx_bytes_uncompressed = 0;
|
||||
|
||||
return flush_output_buffer(stream);
|
||||
}
|
||||
|
||||
static bool cstream_in_compressed_section(const struct fx_cstream_p *stream)
|
||||
{
|
||||
return stream->s_compression_depth > 0;
|
||||
}
|
||||
|
||||
static enum fx_status cstream_tx_bytes(const struct fx_cstream_p *stream, size_t *out)
|
||||
{
|
||||
*out = stream->s_tx_bytes;
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
static enum fx_status cstream_tx_bytes_compressed(
|
||||
const struct fx_cstream_p *stream, size_t *out)
|
||||
{
|
||||
*out = stream->s_tx_bytes_compressed;
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
static enum fx_status cstream_tx_bytes_uncompressed(
|
||||
const struct fx_cstream_p *stream, size_t *out)
|
||||
{
|
||||
*out = stream->s_tx_bytes_uncompressed;
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
static enum fx_status cstream_set_cursor_position(
|
||||
struct fx_cstream_p *stream, size_t pos)
|
||||
{
|
||||
if (stream->s_compression_depth > 0) {
|
||||
return FX_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
if (stream->s_flags & CSTREAM_CURSOR_MOVED) {
|
||||
return FX_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
stream->s_cursor = fx_stream_cursor(stream->s_endpoint);
|
||||
|
||||
enum fx_status status
|
||||
= fx_stream_seek(stream->s_endpoint, pos, FX_STREAM_SEEK_START);
|
||||
if (!FX_OK(status)) {
|
||||
stream->s_cursor = 0;
|
||||
return status;
|
||||
}
|
||||
|
||||
stream->s_flags |= CSTREAM_CURSOR_MOVED;
|
||||
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
static enum fx_status cstream_restore_cursor_position(struct fx_cstream_p *stream)
|
||||
{
|
||||
if (!(stream->s_flags & CSTREAM_CURSOR_MOVED)) {
|
||||
return FX_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
enum fx_status status = fx_stream_seek(
|
||||
stream->s_endpoint, stream->s_cursor, FX_STREAM_SEEK_START);
|
||||
stream->s_cursor = 0;
|
||||
|
||||
if (!FX_OK(status)) {
|
||||
return status;
|
||||
}
|
||||
|
||||
stream->s_flags &= ~CSTREAM_CURSOR_MOVED;
|
||||
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
/*** PUBLIC FUNCTIONS *********************************************************/
|
||||
|
||||
enum fx_status fx_cstream_open(
|
||||
fx_stream *endpoint, fx_type compressor_type, fx_compressor_mode mode,
|
||||
fx_cstream **out)
|
||||
{
|
||||
size_t inbuf_size = 0, outbuf_size = 0;
|
||||
enum fx_status status = fx_compressor_get_buffer_size(
|
||||
compressor_type, mode, &inbuf_size, &outbuf_size);
|
||||
if (!FX_OK(status)) {
|
||||
return status;
|
||||
}
|
||||
|
||||
fx_cstream *stream = fx_object_create(FX_TYPE_CSTREAM);
|
||||
if (!stream) {
|
||||
return FX_ERR_NO_MEMORY;
|
||||
}
|
||||
|
||||
struct fx_cstream_p *p = fx_object_get_private(stream, FX_TYPE_CSTREAM);
|
||||
fx_stream_cfg *cfg = fx_object_get_protected(stream, FX_TYPE_STREAM);
|
||||
|
||||
p->s_mode = mode;
|
||||
p->s_endpoint = endpoint;
|
||||
|
||||
cfg->s_mode = (mode == FX_COMPRESSOR_MODE_COMPRESS) ? FX_STREAM_WRITE
|
||||
: FX_STREAM_READ;
|
||||
|
||||
p->s_in = fx_ringbuffer_create(inbuf_size + 1);
|
||||
if (!FX_OK(status)) {
|
||||
free(stream);
|
||||
return status;
|
||||
}
|
||||
|
||||
p->s_out = fx_ringbuffer_create(outbuf_size + 1);
|
||||
if (!FX_OK(status)) {
|
||||
fx_cstream_unref(stream);
|
||||
return status;
|
||||
}
|
||||
|
||||
p->s_compressor = fx_object_create(compressor_type);
|
||||
if (!p->s_compressor) {
|
||||
fx_cstream_unref(stream);
|
||||
|
||||
return FX_ERR_INVALID_ARGUMENT;
|
||||
}
|
||||
|
||||
fx_compressor_set_buffer(p->s_compressor, p->s_in, p->s_out);
|
||||
fx_compressor_set_mode(p->s_compressor, mode);
|
||||
|
||||
*out = stream;
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
enum fx_status fx_cstream_read(
|
||||
fx_cstream *stream, void *buf, size_t count, size_t *out_nr_read)
|
||||
{
|
||||
FX_CLASS_DISPATCH_STATIC(
|
||||
FX_TYPE_CSTREAM, cstream_read, stream, buf, count, out_nr_read);
|
||||
}
|
||||
|
||||
enum fx_status fx_cstream_write(
|
||||
fx_cstream *stream, const void *buf, size_t count, size_t *out_nr_written)
|
||||
{
|
||||
FX_CLASS_DISPATCH_STATIC(
|
||||
FX_TYPE_CSTREAM, cstream_write, stream, buf, count, out_nr_written);
|
||||
}
|
||||
|
||||
enum fx_status fx_cstream_skip(fx_cstream *stream, size_t count, size_t *out_nr_skipped)
|
||||
{
|
||||
FX_CLASS_DISPATCH_STATIC(
|
||||
FX_TYPE_CSTREAM, cstream_skip, stream, count, out_nr_skipped);
|
||||
}
|
||||
|
||||
enum fx_status fx_cstream_reset(fx_cstream *stream)
|
||||
{
|
||||
FX_CLASS_DISPATCH_STATIC_0(FX_TYPE_CSTREAM, cstream_reset, stream);
|
||||
}
|
||||
|
||||
enum fx_status fx_cstream_begin_compressed_section(
|
||||
fx_cstream *stream, size_t *tx_uncompressed_bytes)
|
||||
{
|
||||
FX_CLASS_DISPATCH_STATIC(
|
||||
FX_TYPE_CSTREAM, cstream_begin_compressed_section, stream,
|
||||
tx_uncompressed_bytes);
|
||||
}
|
||||
|
||||
enum fx_status fx_cstream_end_compressed_section(
|
||||
fx_cstream *stream, size_t *tx_compressed_bytes, size_t *tx_uncompressed_bytes)
|
||||
{
|
||||
FX_CLASS_DISPATCH_STATIC(
|
||||
FX_TYPE_CSTREAM, cstream_end_compressed_section, stream,
|
||||
tx_compressed_bytes, tx_uncompressed_bytes);
|
||||
}
|
||||
|
||||
bool fx_cstream_in_compressed_section(const fx_cstream *stream)
|
||||
{
|
||||
FX_CLASS_DISPATCH_STATIC_0(
|
||||
FX_TYPE_CSTREAM, cstream_in_compressed_section, stream);
|
||||
}
|
||||
|
||||
enum fx_status fx_cstream_tx_bytes(const fx_cstream *stream, size_t *out)
|
||||
{
|
||||
FX_CLASS_DISPATCH_STATIC(FX_TYPE_CSTREAM, cstream_tx_bytes, stream, out);
|
||||
}
|
||||
|
||||
enum fx_status fx_cstream_tx_bytes_compressed(const fx_cstream *stream, size_t *out)
|
||||
{
|
||||
FX_CLASS_DISPATCH_STATIC(
|
||||
FX_TYPE_CSTREAM, cstream_tx_bytes_compressed, stream, out);
|
||||
}
|
||||
|
||||
enum fx_status fx_cstream_tx_bytes_uncompressed(const fx_cstream *stream, size_t *out)
|
||||
{
|
||||
FX_CLASS_DISPATCH_STATIC(
|
||||
FX_TYPE_CSTREAM, cstream_tx_bytes_uncompressed, stream, out);
|
||||
}
|
||||
|
||||
enum fx_status fx_cstream_set_cursor_position(fx_cstream *stream, size_t pos)
|
||||
{
|
||||
FX_CLASS_DISPATCH_STATIC(
|
||||
FX_TYPE_CSTREAM, cstream_set_cursor_position, stream, pos);
|
||||
}
|
||||
|
||||
enum fx_status fx_cstream_restore_cursor_position(fx_cstream *stream)
|
||||
{
|
||||
FX_CLASS_DISPATCH_STATIC_0(
|
||||
FX_TYPE_CSTREAM, cstream_restore_cursor_position, stream);
|
||||
}
|
||||
|
||||
/*** VIRTUAL FUNCTIONS ********************************************************/
|
||||
|
||||
static void cstream_init(fx_object *obj, void *priv)
|
||||
{
|
||||
}
|
||||
|
||||
static void cstream_fini(fx_object *obj, void *priv)
|
||||
{
|
||||
struct fx_cstream_p *stream = priv;
|
||||
|
||||
if (stream->s_compressor) {
|
||||
fx_compressor_unref(stream->s_compressor);
|
||||
}
|
||||
|
||||
if (stream->s_in) {
|
||||
fx_ringbuffer_unref(stream->s_in);
|
||||
}
|
||||
|
||||
if (stream->s_out) {
|
||||
fx_ringbuffer_unref(stream->s_out);
|
||||
}
|
||||
}
|
||||
|
||||
/*** CLASS DEFINITION *********************************************************/
|
||||
|
||||
FX_TYPE_CLASS_DEFINITION_BEGIN(fx_cstream)
|
||||
FX_TYPE_CLASS_INTERFACE_BEGIN(fx_object, FX_TYPE_OBJECT)
|
||||
FX_INTERFACE_ENTRY(to_string) = NULL;
|
||||
FX_TYPE_CLASS_INTERFACE_END(fx_object, FX_TYPE_OBJECT)
|
||||
|
||||
FX_TYPE_CLASS_INTERFACE_BEGIN(fx_stream, FX_TYPE_STREAM)
|
||||
FX_INTERFACE_ENTRY(s_close) = NULL;
|
||||
FX_INTERFACE_ENTRY(s_seek) = NULL;
|
||||
FX_INTERFACE_ENTRY(s_tell) = NULL;
|
||||
FX_INTERFACE_ENTRY(s_getc) = NULL;
|
||||
FX_INTERFACE_ENTRY(s_read) = fx_cstream_read;
|
||||
FX_INTERFACE_ENTRY(s_write) = fx_cstream_write;
|
||||
FX_INTERFACE_ENTRY(s_reserve) = NULL;
|
||||
FX_TYPE_CLASS_INTERFACE_END(fx_stream, FX_TYPE_STREAM)
|
||||
FX_TYPE_CLASS_DEFINITION_END(fx_cstream)
|
||||
|
||||
FX_TYPE_DEFINITION_BEGIN(fx_cstream)
|
||||
FX_TYPE_ID(0xe1e899b5, 0x6a3c, 0x4f9c, 0xafd0, 0xaab3f156615c);
|
||||
FX_TYPE_EXTENDS(FX_TYPE_STREAM);
|
||||
FX_TYPE_CLASS(fx_cstream_class);
|
||||
FX_TYPE_INSTANCE_PRIVATE(struct fx_cstream_p);
|
||||
FX_TYPE_INSTANCE_INIT(cstream_init);
|
||||
FX_TYPE_INSTANCE_FINI(cstream_fini);
|
||||
FX_TYPE_DEFINITION_END(fx_cstream)
|
||||
@@ -0,0 +1,362 @@
|
||||
#include <fx/compress/zstd.h>
|
||||
#include <fx/core/ringbuffer.h>
|
||||
#include <zstd.h>
|
||||
|
||||
/*** PRIVATE DATA *************************************************************/
|
||||
|
||||
struct fx_zstd_compressor_p {
|
||||
union {
|
||||
ZSTD_CCtx *zstd_c;
|
||||
ZSTD_DCtx *zstd_d;
|
||||
};
|
||||
};
|
||||
|
||||
/*** PUBLIC FUNCTIONS *********************************************************/
|
||||
|
||||
fx_status fx_zstd_compressor_get_buffer_size(
|
||||
fx_compressor_mode mode, size_t *inbuf_size, size_t *outbuf_size)
|
||||
{
|
||||
switch (mode) {
|
||||
case FX_COMPRESSOR_MODE_COMPRESS:
|
||||
*inbuf_size = ZSTD_CStreamInSize();
|
||||
*outbuf_size = ZSTD_CStreamOutSize();
|
||||
break;
|
||||
case FX_COMPRESSOR_MODE_DECOMPRESS:
|
||||
*inbuf_size = ZSTD_DStreamInSize();
|
||||
*outbuf_size = ZSTD_DStreamOutSize();
|
||||
break;
|
||||
default:
|
||||
return FX_ERR_INVALID_ARGUMENT;
|
||||
}
|
||||
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
/*** VIRTUAL FUNCTIONS ********************************************************/
|
||||
|
||||
static void zstd_compressor_init(fx_object *obj, void *priv)
|
||||
{
|
||||
}
|
||||
|
||||
static void zstd_compressor_fini(fx_object *obj, void *priv)
|
||||
{
|
||||
fx_compressor_data *c = fx_object_get_protected(obj, FX_TYPE_COMPRESSOR);
|
||||
struct fx_zstd_compressor_p *ctx = priv;
|
||||
switch (c->c_mode) {
|
||||
case FX_COMPRESSOR_MODE_COMPRESS:
|
||||
ZSTD_freeCCtx(ctx->zstd_c);
|
||||
break;
|
||||
case FX_COMPRESSOR_MODE_DECOMPRESS:
|
||||
ZSTD_freeDCtx(ctx->zstd_d);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static enum fx_status reset(fx_compressor *compressor)
|
||||
{
|
||||
struct fx_zstd_compressor_p *ctx
|
||||
= fx_object_get_private(compressor, FX_TYPE_ZSTD_COMPRESSOR);
|
||||
fx_compressor_data *data
|
||||
= fx_object_get_protected(compressor, FX_TYPE_COMPRESSOR);
|
||||
|
||||
if (!ctx || !data) {
|
||||
return FX_ERR_INVALID_ARGUMENT;
|
||||
}
|
||||
|
||||
switch (data->c_mode) {
|
||||
case FX_COMPRESSOR_MODE_COMPRESS:
|
||||
ZSTD_CCtx_reset(ctx->zstd_c, ZSTD_reset_session_only);
|
||||
break;
|
||||
case FX_COMPRESSOR_MODE_DECOMPRESS:
|
||||
ZSTD_DCtx_reset(ctx->zstd_d, ZSTD_reset_session_only);
|
||||
break;
|
||||
default:
|
||||
return FX_ERR_BAD_STATE;
|
||||
}
|
||||
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
static enum fx_status compress(fx_compressor *compressor)
|
||||
{
|
||||
enum fx_status status = FX_SUCCESS;
|
||||
struct fx_zstd_compressor_p *ctx
|
||||
= fx_object_get_private(compressor, FX_TYPE_ZSTD_COMPRESSOR);
|
||||
fx_compressor_data *data
|
||||
= fx_object_get_protected(compressor, FX_TYPE_COMPRESSOR);
|
||||
|
||||
if (!ctx || !data) {
|
||||
return FX_ERR_INVALID_ARGUMENT;
|
||||
}
|
||||
|
||||
fx_ringbuffer *in = data->c_in;
|
||||
fx_ringbuffer *out = data->c_out;
|
||||
|
||||
if (fx_ringbuffer_available_data_remaining(in) == 0) {
|
||||
return FX_ERR_NO_DATA;
|
||||
}
|
||||
|
||||
if (fx_ringbuffer_write_capacity_remaining(out) == 0) {
|
||||
return FX_ERR_NO_SPACE;
|
||||
}
|
||||
|
||||
size_t nr_consumed = 0;
|
||||
|
||||
while (1) {
|
||||
size_t in_available = 0, out_capacity = 0;
|
||||
const void *in_buf = NULL;
|
||||
void *out_buf = NULL;
|
||||
|
||||
status = fx_ringbuffer_open_read_buffer(in, &in_buf, &in_available);
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
status = fx_ringbuffer_open_write_buffer(
|
||||
out, &out_buf, &out_capacity);
|
||||
if (!FX_OK(status)) {
|
||||
fx_ringbuffer_close_read_buffer(in, &in_buf, 0);
|
||||
break;
|
||||
}
|
||||
|
||||
ZSTD_inBuffer z_in = {
|
||||
.src = in_buf,
|
||||
.pos = 0,
|
||||
.size = in_available,
|
||||
};
|
||||
|
||||
ZSTD_outBuffer z_out = {
|
||||
.dst = out_buf,
|
||||
.pos = 0,
|
||||
.size = out_capacity,
|
||||
};
|
||||
|
||||
do {
|
||||
size_t ret = ZSTD_compressStream2(
|
||||
ctx->zstd_c, &z_out, &z_in, ZSTD_e_continue);
|
||||
if (ZSTD_isError(ret)) {
|
||||
status = FX_ERR_COMPRESSION_FAILURE;
|
||||
break;
|
||||
}
|
||||
} while (z_in.pos < z_in.size && z_out.pos < z_out.size);
|
||||
|
||||
nr_consumed += z_in.pos;
|
||||
|
||||
fx_ringbuffer_close_read_buffer(in, &in_buf, z_in.pos);
|
||||
fx_ringbuffer_close_write_buffer(out, &out_buf, z_out.pos);
|
||||
}
|
||||
|
||||
if ((status == FX_ERR_NO_SPACE || status == FX_ERR_NO_DATA)
|
||||
&& nr_consumed > 0) {
|
||||
status = FX_SUCCESS;
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
static enum fx_status compress_end(fx_compressor *compressor)
|
||||
{
|
||||
enum fx_status status = FX_SUCCESS;
|
||||
struct fx_zstd_compressor_p *ctx
|
||||
= fx_object_get_private(compressor, FX_TYPE_ZSTD_COMPRESSOR);
|
||||
fx_compressor_data *data
|
||||
= fx_object_get_protected(compressor, FX_TYPE_COMPRESSOR);
|
||||
|
||||
if (!ctx || !data) {
|
||||
return FX_ERR_INVALID_ARGUMENT;
|
||||
}
|
||||
|
||||
fx_ringbuffer *out = data->c_out;
|
||||
if (fx_ringbuffer_write_capacity_remaining(out) == 0) {
|
||||
return FX_ERR_NO_SPACE;
|
||||
}
|
||||
|
||||
bool finished = false;
|
||||
do {
|
||||
void *out_buf = NULL;
|
||||
size_t out_capacity = 0;
|
||||
status = fx_ringbuffer_open_write_buffer(
|
||||
out, &out_buf, &out_capacity);
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
ZSTD_inBuffer z_in = {0};
|
||||
ZSTD_outBuffer z_out = {
|
||||
.dst = out_buf,
|
||||
.pos = 0,
|
||||
.size = out_capacity,
|
||||
};
|
||||
|
||||
do {
|
||||
size_t ret = ZSTD_compressStream2(
|
||||
ctx->zstd_c, &z_out, &z_in, ZSTD_e_end);
|
||||
if (ZSTD_isError(ret)) {
|
||||
status = FX_ERR_COMPRESSION_FAILURE;
|
||||
finished = true;
|
||||
}
|
||||
|
||||
if (ret == 0) {
|
||||
data->c_flags |= FX_COMPRESSOR_EOF;
|
||||
finished = true;
|
||||
}
|
||||
} while (!finished && z_out.pos < z_out.size);
|
||||
|
||||
fx_ringbuffer_close_write_buffer(out, &out_buf, z_out.pos);
|
||||
} while (!finished);
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
static enum fx_status decompress(fx_compressor *compressor)
|
||||
{
|
||||
enum fx_status status = FX_SUCCESS;
|
||||
struct fx_zstd_compressor_p *ctx
|
||||
= fx_object_get_private(compressor, FX_TYPE_ZSTD_COMPRESSOR);
|
||||
fx_compressor_data *data
|
||||
= fx_object_get_protected(compressor, FX_TYPE_COMPRESSOR);
|
||||
|
||||
if (!ctx || !data) {
|
||||
return FX_ERR_INVALID_ARGUMENT;
|
||||
}
|
||||
|
||||
fx_ringbuffer *in = data->c_in;
|
||||
fx_ringbuffer *out = data->c_out;
|
||||
|
||||
if (fx_ringbuffer_available_data_remaining(in) == 0) {
|
||||
return FX_ERR_NO_DATA;
|
||||
}
|
||||
|
||||
if (fx_ringbuffer_write_capacity_remaining(out) == 0) {
|
||||
return FX_ERR_NO_SPACE;
|
||||
}
|
||||
|
||||
size_t nr_consumed = 0;
|
||||
|
||||
while (!(data->c_flags & FX_COMPRESSOR_EOF)) {
|
||||
size_t in_available = 0, out_capacity = 0;
|
||||
const void *in_buf = NULL;
|
||||
void *out_buf = NULL;
|
||||
|
||||
status = fx_ringbuffer_open_read_buffer(in, &in_buf, &in_available);
|
||||
if (!FX_OK(status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
status = fx_ringbuffer_open_write_buffer(
|
||||
out, &out_buf, &out_capacity);
|
||||
if (!FX_OK(status)) {
|
||||
fx_ringbuffer_close_read_buffer(in, &in_buf, 0);
|
||||
break;
|
||||
}
|
||||
|
||||
ZSTD_inBuffer z_in = {
|
||||
.src = in_buf,
|
||||
.pos = 0,
|
||||
.size = in_available,
|
||||
};
|
||||
|
||||
ZSTD_outBuffer z_out = {
|
||||
.dst = out_buf,
|
||||
.pos = 0,
|
||||
.size = out_capacity,
|
||||
};
|
||||
|
||||
do {
|
||||
size_t ret = ZSTD_decompressStream(
|
||||
ctx->zstd_d, &z_out, &z_in);
|
||||
if (ZSTD_isError(ret)) {
|
||||
status = FX_ERR_COMPRESSION_FAILURE;
|
||||
break;
|
||||
}
|
||||
|
||||
if (ret == 0) {
|
||||
data->c_flags |= FX_COMPRESSOR_EOF;
|
||||
break;
|
||||
}
|
||||
} while (z_in.pos < z_in.size && z_out.pos < z_out.size);
|
||||
|
||||
nr_consumed += z_in.pos;
|
||||
|
||||
fx_ringbuffer_close_read_buffer(in, &in_buf, z_in.pos);
|
||||
fx_ringbuffer_close_write_buffer(out, &out_buf, z_out.pos);
|
||||
}
|
||||
|
||||
if ((status == FX_ERR_NO_SPACE || status == FX_ERR_NO_DATA)
|
||||
&& nr_consumed > 0) {
|
||||
status = FX_SUCCESS;
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
static enum fx_status set_mode(fx_compressor *compressor, fx_compressor_mode mode)
|
||||
{
|
||||
struct fx_zstd_compressor_p *ctx
|
||||
= fx_object_get_private(compressor, FX_TYPE_ZSTD_COMPRESSOR);
|
||||
fx_compressor_data *data
|
||||
= fx_object_get_protected(compressor, FX_TYPE_COMPRESSOR);
|
||||
|
||||
if (!ctx || !data) {
|
||||
return FX_ERR_INVALID_ARGUMENT;
|
||||
}
|
||||
|
||||
if (mode == data->c_mode) {
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
switch (data->c_mode) {
|
||||
case FX_COMPRESSOR_MODE_COMPRESS:
|
||||
ZSTD_freeCCtx(ctx->zstd_c);
|
||||
break;
|
||||
case FX_COMPRESSOR_MODE_DECOMPRESS:
|
||||
ZSTD_freeDCtx(ctx->zstd_d);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
data->c_mode = mode;
|
||||
|
||||
switch (data->c_mode) {
|
||||
case FX_COMPRESSOR_MODE_COMPRESS:
|
||||
ctx->zstd_c = ZSTD_createCCtx();
|
||||
break;
|
||||
case FX_COMPRESSOR_MODE_DECOMPRESS:
|
||||
ctx->zstd_d = ZSTD_createDCtx();
|
||||
break;
|
||||
default:
|
||||
return FX_ERR_INVALID_ARGUMENT;
|
||||
}
|
||||
|
||||
return FX_SUCCESS;
|
||||
}
|
||||
|
||||
/*** CLASS DEFINITION *********************************************************/
|
||||
|
||||
FX_TYPE_CLASS_DEFINITION_BEGIN(fx_zstd_compressor)
|
||||
FX_TYPE_CLASS_INTERFACE_BEGIN(fx_object, FX_TYPE_OBJECT)
|
||||
FX_INTERFACE_ENTRY(to_string) = NULL;
|
||||
FX_TYPE_CLASS_INTERFACE_END(fx_object, FX_TYPE_OBJECT)
|
||||
|
||||
FX_TYPE_CLASS_INTERFACE_BEGIN(fx_compressor, FX_TYPE_COMPRESSOR)
|
||||
FX_INTERFACE_ENTRY(c_buffer_size)
|
||||
= fx_zstd_compressor_get_buffer_size;
|
||||
FX_INTERFACE_ENTRY(c_compress) = compress;
|
||||
FX_INTERFACE_ENTRY(c_compress_end) = compress_end;
|
||||
FX_INTERFACE_ENTRY(c_decompress) = decompress;
|
||||
FX_INTERFACE_ENTRY(c_reset) = reset;
|
||||
FX_INTERFACE_ENTRY(c_set_mode) = set_mode;
|
||||
FX_TYPE_CLASS_INTERFACE_END(fx_compressor, FX_TYPE_COMPRESSOR)
|
||||
FX_TYPE_CLASS_DEFINITION_END(fx_zstd_compressor)
|
||||
|
||||
FX_TYPE_DEFINITION_BEGIN(fx_zstd_compressor)
|
||||
FX_TYPE_ID(0x51d437fc, 0xe789, 0x4105, 0xbac7, 0xe6b3f45df198);
|
||||
FX_TYPE_EXTENDS(FX_TYPE_COMPRESSOR);
|
||||
FX_TYPE_CLASS(fx_zstd_compressor_class);
|
||||
FX_TYPE_INSTANCE_PRIVATE(struct fx_zstd_compressor_p);
|
||||
FX_TYPE_INSTANCE_INIT(zstd_compressor_init);
|
||||
FX_TYPE_INSTANCE_FINI(zstd_compressor_fini);
|
||||
FX_TYPE_DEFINITION_END(fx_zstd_compressor)
|
||||
@@ -0,0 +1,67 @@
|
||||
#ifndef FX_COMPRESS_COMPRESSOR_H_
|
||||
#define FX_COMPRESS_COMPRESSOR_H_
|
||||
|
||||
#include <fx/core/macros.h>
|
||||
#include <fx/core/misc.h>
|
||||
#include <fx/core/ringbuffer.h>
|
||||
#include <fx/core/status.h>
|
||||
#include <stdbool.h>
|
||||
|
||||
FX_DECLS_BEGIN;
|
||||
|
||||
typedef enum fx_compressor_mode {
|
||||
FX_COMPRESSOR_MODE_NONE = 0,
|
||||
FX_COMPRESSOR_MODE_COMPRESS,
|
||||
FX_COMPRESSOR_MODE_DECOMPRESS,
|
||||
} fx_compressor_mode;
|
||||
|
||||
typedef enum fx_compressor_flags {
|
||||
FX_COMPRESSOR_EOF = 0x01u,
|
||||
} fx_compressor_flags;
|
||||
|
||||
#define FX_TYPE_COMPRESSOR (fx_compressor_get_type())
|
||||
|
||||
FX_DECLARE_TYPE(fx_compressor);
|
||||
|
||||
FX_TYPE_CLASS_DECLARATION_BEGIN(fx_compressor)
|
||||
fx_status (*c_buffer_size)(fx_compressor_mode, size_t *, size_t *);
|
||||
fx_status (*c_set_mode)(fx_compressor *, fx_compressor_mode);
|
||||
fx_status (*c_compress)(fx_compressor *);
|
||||
fx_status (*c_compress_end)(fx_compressor *);
|
||||
fx_status (*c_decompress)(fx_compressor *);
|
||||
fx_status (*c_reset)(fx_compressor *);
|
||||
FX_TYPE_CLASS_DECLARATION_END(fx_compressor)
|
||||
|
||||
typedef struct fx_compressor_data {
|
||||
fx_compressor_flags c_flags;
|
||||
fx_compressor_mode c_mode;
|
||||
fx_ringbuffer *c_in, *c_out;
|
||||
} fx_compressor_data;
|
||||
|
||||
FX_API fx_type fx_compressor_get_type(void);
|
||||
|
||||
#if 0
|
||||
FX_API fx_status fx_compressor_create(
|
||||
const struct fx_compression_function *func, enum fx_compression_mode mode,
|
||||
struct fx_ringbuffer *inbuf, struct fx_ringbuffer *outbuf,
|
||||
fx_compressor **out);
|
||||
#endif
|
||||
|
||||
FX_API fx_status fx_compressor_get_buffer_size(
|
||||
fx_type type, fx_compressor_mode mode, size_t *inbuf_size,
|
||||
size_t *outbuf_size);
|
||||
|
||||
FX_API fx_status fx_compressor_get_mode(
|
||||
const fx_compressor *compressor, fx_compressor_mode *out);
|
||||
FX_API fx_status fx_compressor_set_mode(
|
||||
fx_compressor *compressor, fx_compressor_mode mode);
|
||||
FX_API fx_status fx_compressor_set_buffer(
|
||||
fx_compressor *compressor, fx_ringbuffer *inbuf, fx_ringbuffer *outbuf);
|
||||
FX_API fx_status fx_compressor_step(fx_compressor *compressor);
|
||||
FX_API fx_status fx_compressor_end(fx_compressor *compressor);
|
||||
FX_API fx_status fx_compressor_reset(fx_compressor *compressor);
|
||||
FX_API bool fx_compressor_eof(const fx_compressor *compressor);
|
||||
|
||||
FX_DECLS_END;
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,50 @@
|
||||
#ifndef FX_COMPRESS_CSTREAM_H_
|
||||
#define FX_COMPRESS_CSTREAM_H_
|
||||
|
||||
#include <fx/core/macros.h>
|
||||
#include <fx/core/stream.h>
|
||||
#include <stdbool.h>
|
||||
|
||||
FX_DECLS_BEGIN;
|
||||
|
||||
enum fx_compressor_mode;
|
||||
|
||||
#define FX_TYPE_CSTREAM (fx_cstream_get_type())
|
||||
|
||||
FX_DECLARE_TYPE(fx_cstream);
|
||||
|
||||
FX_TYPE_CLASS_DECLARATION_BEGIN(fx_cstream)
|
||||
FX_TYPE_CLASS_DECLARATION_END(fx_cstream)
|
||||
|
||||
FX_API fx_type fx_cstream_get_type(void);
|
||||
|
||||
FX_API fx_status fx_cstream_open(
|
||||
fx_stream *endpoint, fx_type compressor_type, enum fx_compressor_mode mode,
|
||||
fx_cstream **out);
|
||||
|
||||
FX_API fx_status fx_cstream_read(
|
||||
fx_cstream *stream, void *buf, size_t count, size_t *nr_read);
|
||||
FX_API fx_status fx_cstream_write(
|
||||
fx_cstream *stream, const void *buf, size_t count, size_t *nr_written);
|
||||
FX_API fx_status fx_cstream_skip(
|
||||
fx_cstream *stream, size_t count, size_t *nr_skipped);
|
||||
FX_API fx_status fx_cstream_reset(fx_cstream *stream);
|
||||
|
||||
FX_API fx_status fx_cstream_begin_compressed_section(
|
||||
fx_cstream *stream, size_t *tx_uncompressed_bytes);
|
||||
FX_API fx_status fx_cstream_end_compressed_section(
|
||||
fx_cstream *stream, size_t *tx_compressed_bytes,
|
||||
size_t *tx_uncompressed_bytes);
|
||||
FX_API bool fx_cstream_in_compressed_section(const fx_cstream *stream);
|
||||
FX_API fx_status fx_cstream_tx_bytes(const fx_cstream *stream, size_t *out);
|
||||
FX_API fx_status fx_cstream_tx_bytes_compressed(
|
||||
const fx_cstream *stream, size_t *out);
|
||||
FX_API fx_status fx_cstream_tx_bytes_uncompressed(
|
||||
const fx_cstream *stream, size_t *out);
|
||||
|
||||
FX_API fx_status fx_cstream_set_cursor_position(fx_cstream *stream, size_t pos);
|
||||
FX_API fx_status fx_cstream_restore_cursor_position(fx_cstream *stream);
|
||||
|
||||
FX_DECLS_END;
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,28 @@
|
||||
#ifndef FX_COMPRESS_ZSTD_H_
|
||||
#define FX_COMPRESS_ZSTD_H_
|
||||
|
||||
#include <fx/compress/compressor.h>
|
||||
#include <fx/core/macros.h>
|
||||
#include <fx/core/misc.h>
|
||||
#include <fx/core/status.h>
|
||||
#include <stdbool.h>
|
||||
|
||||
FX_DECLS_BEGIN;
|
||||
|
||||
#define FX_TYPE_ZSTD_COMPRESSOR (fx_zstd_compressor_get_type())
|
||||
|
||||
FX_DECLARE_TYPE(fx_zstd_compressor);
|
||||
|
||||
FX_TYPE_CLASS_DECLARATION_BEGIN(fx_zstd_compressor)
|
||||
FX_TYPE_CLASS_DECLARATION_END(fx_compressor)
|
||||
|
||||
FX_API fx_type fx_zstd_compressor_get_type(void);
|
||||
|
||||
FX_API fx_status fx_zstd_compressor_get_buffer_size(
|
||||
fx_compressor_mode mode, size_t *inbuf_size, size_t *outbuf_size);
|
||||
|
||||
FX_TYPE_DEFAULT_CONSTRUCTOR(fx_zstd_compressor, FX_TYPE_ZSTD_COMPRESSOR);
|
||||
|
||||
FX_DECLS_END;
|
||||
|
||||
#endif
|
||||
Reference in New Issue
Block a user