Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
700 changes: 700 additions & 0 deletions src/allocator_defrag.c

Large diffs are not rendered by default.

49 changes: 49 additions & 0 deletions src/allocator_defrag.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#ifndef ALLOCATOR_DEFRAG_H
#define ALLOCATOR_DEFRAG_H
// #include "sds.h"

typedef enum DefragStrategy {
DEF_FRAG_JE_HINT = 0,
DEF_FRAG_JE_CTL = 1,
DEF_FRAG_ALL = 2,
} DefragStrategy;

typedef enum DefragSelectionStrategy {
SELECT_NORMAL = 0,
SELECT_PAGES_LOWER = 2,
SELECT_RANDOM = 3,
SELECT_PROGRESSIVE = 4, // per-bin make the selection stricter as we progress the iteration
SELECT_UTILIZATION_TREND = 5, // per-bin make the selection stricter as we progress the iteration
} DefragSelectionStrategy;


typedef enum DefragRulesAlloc {
RULE_ALLOC_NONE,
RULE_ALLOC_USE_TCACHE,
RULE_ALLOC_USE_UD_TCACHE,
} DefragRulesAlloc;

typedef enum DefragRulesFree {
RULE_FREE_NONE,
RULE_FREE_USE_TCACHE,
RULE_FREE_USE_UD_TCACHE,
} DefragRulesFree;

typedef enum DefragRulesRecalc {
RULE_NONE,
RULE_RECALC_ON_FULL_ITER,
} DefragRulesRecalc;

int allocatorDefragInit(void);
void defrag_jemalloc_free(void *ptr, size_t size);
__attribute__((malloc)) void *defrag_jemalloc_alloc(size_t size);
unsigned long allocatorGetFragmentationSmallBins(int new_iter);
// sds allocatorGetDefragInfo(sds info);
void allocatorDefragHint(void **ptrs, unsigned long num);
void allocatorSetStrategyConfig(DefragStrategy defrag_strategy);
void allocatorSetSelectConfig(DefragSelectionStrategy selection_strategy);
void allocatorSetRefreshConfig(DefragRulesRecalc recalc);
void allocatorSetFreeConfig(DefragRulesFree recalc);
void allocatorSetAllocConfig(DefragRulesAlloc recalc);
void allocatorSetThresholdConfig(int threshold);
#endif /* ALLOCATOR_DEFRAG_H */
76 changes: 73 additions & 3 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@
#include "cluster.h"
#include "connection.h"
#include "bio.h"

#include "allocator_defrag.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <glob.h>
#include <string.h>
#include <locale.h>
#include <ctype.h>

#include "allocator_defrag.h"
/*-----------------------------------------------------------------------------
* Config file name-value maps.
*----------------------------------------------------------------------------*/
Expand All @@ -51,6 +51,30 @@ typedef struct deprecatedConfig {
const int argc_max;
} deprecatedConfig;

configEnum defrag_strategy[] = {{"je-hint", DEF_FRAG_JE_HINT},
{"je-ctl", DEF_FRAG_JE_CTL},
{"all", DEF_FRAG_ALL},
{NULL, 0}};

configEnum defrag_select_strategy[] = {{"none", SELECT_NORMAL},
{"pgs", SELECT_PAGES_LOWER},
{"rand", SELECT_RANDOM},
{"progressive", SELECT_PROGRESSIVE},
{"util-trend", SELECT_UTILIZATION_TREND},
{NULL, 0}};

configEnum defrag_recalc[] = {{"none", RULE_NONE}, {"full_cycle", RULE_RECALC_ON_FULL_ITER}, {NULL, 0}};

configEnum defrag_alloc_strategy[] = {{"none", RULE_ALLOC_NONE},
{"tcach", RULE_ALLOC_USE_TCACHE},
{"ud-tcache", RULE_ALLOC_USE_UD_TCACHE},
{NULL, 0}};

configEnum defrag_free_strategy[] = {{"none", RULE_FREE_NONE},
{"tcach", RULE_FREE_USE_TCACHE},
{"ud-tcache", RULE_FREE_USE_UD_TCACHE},
{NULL, 0}};

configEnum maxmemory_policy_enum[] = {{"volatile-lru", MAXMEMORY_VOLATILE_LRU},
{"volatile-lfu", MAXMEMORY_VOLATILE_LFU},
{"volatile-random", MAXMEMORY_VOLATILE_RANDOM},
Expand Down Expand Up @@ -2452,7 +2476,46 @@ static int updatePort(const char **err) {

return 1;
}
#if defined(HAVE_DEFRAG) && defined(USE_JEMALLOC)
static int updateDefragStrategy(const char **err) {
UNUSED(err);
allocatorSetStrategyConfig(server.defrag_strategy);
server.active_defrag_configuration_changed = 1;
return 1;
}
static int updateDefragSelect(const char **err) {
UNUSED(err);
allocatorSetSelectConfig(server.defrag_select_strategy);
server.active_defrag_configuration_changed = 1;
return 1;
}
static int updateDefragRecalc(const char **err) {
UNUSED(err);
allocatorSetRefreshConfig(server.defrag_recalc);
server.active_defrag_configuration_changed = 1;
return 1;
}
static int updateDefragAlloc(const char **err) {
UNUSED(err);
allocatorSetAllocConfig(server.defrag_alloc_strategy);
server.active_defrag_configuration_changed = 1;
return 1;
}

static int updateDefragFree(const char **err) {
UNUSED(err);
allocatorSetFreeConfig(server.defrag_free_strategy);
server.active_defrag_configuration_changed = 1;
return 1;
}

static int updateDefragSelectThreshold(const char **err) {
UNUSED(err);
allocatorSetThresholdConfig(server.select_threshold_factor);
server.active_defrag_configuration_changed = 1;
return 1;
}
#endif
static int updateDefragConfiguration(const char **err) {
UNUSED(err);
server.active_defrag_configuration_changed = 1;
Expand Down Expand Up @@ -3161,7 +3224,14 @@ standardConfig static_configs[] = {
createEnumConfig("propagation-error-behavior", NULL, MODIFIABLE_CONFIG, propagation_error_behavior_enum, server.propagation_error_behavior, PROPAGATION_ERR_BEHAVIOR_IGNORE, NULL, NULL),
createEnumConfig("shutdown-on-sigint", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigint, 0, isValidShutdownOnSigFlags, NULL),
createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL),

#if defined(HAVE_DEFRAG) && defined(USE_JEMALLOC)
createEnumConfig("active-defrag-strategy", NULL, MODIFIABLE_CONFIG , defrag_strategy, server.defrag_strategy, 0, NULL, updateDefragStrategy),
createEnumConfig("active-defrag-select", NULL, MODIFIABLE_CONFIG , defrag_select_strategy, server.defrag_select_strategy, 0, NULL, updateDefragSelect),
createEnumConfig("active-defrag-recalc", NULL, MODIFIABLE_CONFIG , defrag_recalc, server.defrag_recalc, 0, NULL, updateDefragRecalc),
createEnumConfig("active-defrag-alloc", NULL, MODIFIABLE_CONFIG , defrag_alloc_strategy, server.defrag_alloc_strategy, 0, NULL, updateDefragAlloc),
createEnumConfig("active-defrag-free", NULL, MODIFIABLE_CONFIG , defrag_free_strategy, server.defrag_free_strategy, 0, NULL, updateDefragFree),
createIntConfig("active-defrag-nonfull-factor", NULL, MODIFIABLE_CONFIG, 0, 3000, server.select_threshold_factor, 125, INTEGER_CONFIG, NULL, updateDefragSelectThreshold), /* Default: 12.5% CPU min (at lower threshold) */
#endif
/* Integer configs */
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
Expand Down
27 changes: 15 additions & 12 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
#include <stddef.h>

#ifdef HAVE_DEFRAG
#include "allocator_defrag.h"

#define defraged_alloc defrag_jemalloc_alloc
#define defraged_free defrag_jemalloc_free
#define defrag_should_defrag_multi allocatorDefragHint

typedef struct defragCtx {
void *privdata;
Expand All @@ -49,10 +54,6 @@ typedef struct defragPubSubCtx {
dict *(*clientPubSubChannels)(client *);
} defragPubSubCtx;

/* this method was added to jemalloc in order to help us understand which
* pointers are worthwhile moving and which aren't */
int je_get_defrag_hint(void *ptr);

/* Defrag helper for generic allocations.
*
* returns NULL in case the allocation wasn't moved.
Expand All @@ -61,17 +62,19 @@ int je_get_defrag_hint(void *ptr);
void *activeDefragAlloc(void *ptr) {
size_t size;
void *newptr;
if (!je_get_defrag_hint(ptr)) {
void *ptr_arr = ptr;
defrag_should_defrag_multi(&ptr_arr, 1);
if (!ptr_arr) {
server.stat_active_defrag_misses++;
return NULL;
}
/* move this allocation to a new allocation.
* make sure not to use the thread cache. so that we don't get back the same
* pointers we try to free */
size = zmalloc_size(ptr);
newptr = zmalloc_no_tcache(size);
newptr = defraged_alloc(size);
memcpy(newptr, ptr, size);
zfree_no_tcache(ptr);
defraged_free(ptr, size);
server.stat_active_defrag_hits++;
return newptr;
}
Expand Down Expand Up @@ -754,10 +757,10 @@ void defragScanCallback(void *privdata, const dictEntry *de) {
* fragmentation ratio in order to decide if a defrag action should be taken
* or not, a false detection can cause the defragmenter to waste a lot of CPU
* without the possibility of getting any results. */
float getAllocatorFragmentation(size_t *out_frag_bytes) {
float getAllocatorFragmentation(bool new_iter, size_t *out_frag_bytes) {
size_t resident, active, allocated, frag_smallbins_bytes;
zmalloc_get_allocator_info(&allocated, &active, &resident, NULL, NULL, &frag_smallbins_bytes);

zmalloc_get_allocator_info(&allocated, &active, &resident, NULL, NULL);
frag_smallbins_bytes = allocatorGetFragmentationSmallBins(new_iter);
/* Calculate the fragmentation ratio as the proportion of wasted memory in small
* bins (which are defraggable) relative to the total allocated memory (including large bins).
* This is because otherwise, if most of the memory usage is large bins, we may show high percentage,
Expand Down Expand Up @@ -915,7 +918,7 @@ int defragLaterStep(serverDb *db, int slot, long long endtime) {
/* decide if defrag is needed, and at what CPU effort to invest in it */
void computeDefragCycles(void) {
size_t frag_bytes;
float frag_pct = getAllocatorFragmentation(&frag_bytes);
float frag_pct = getAllocatorFragmentation(!server.active_defrag_running, &frag_bytes);
/* If we're not already running, and below the threshold, exit. */
if (!server.active_defrag_running) {
if (frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes) return;
Expand Down Expand Up @@ -1019,7 +1022,7 @@ void activeDefragCycle(void) {

long long now = ustime();
size_t frag_bytes;
float frag_pct = getAllocatorFragmentation(&frag_bytes);
float frag_pct = getAllocatorFragmentation(false, &frag_bytes);
serverLog(LL_VERBOSE, "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu",
(int)((now - start_scan) / 1000), (int)(server.stat_active_defrag_hits - start_stat),
frag_pct, frag_bytes);
Expand Down
18 changes: 15 additions & 3 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
#include <sys/utsname.h>
#include <locale.h>
#include <sys/socket.h>
#include "allocator_defrag.h"

#ifdef __linux__
#include <sys/mman.h>
Expand Down Expand Up @@ -1220,8 +1221,8 @@ void cronUpdateMemoryStats(void) {
* allocations, and allocator reserved pages that can be pursed (all not actual frag) */
zmalloc_get_allocator_info(
&server.cron_malloc_stats.allocator_allocated, &server.cron_malloc_stats.allocator_active,
&server.cron_malloc_stats.allocator_resident, NULL, &server.cron_malloc_stats.allocator_muzzy,
&server.cron_malloc_stats.allocator_frag_smallbins_bytes);
&server.cron_malloc_stats.allocator_resident, NULL, &server.cron_malloc_stats.allocator_muzzy);
server.cron_malloc_stats.allocator_frag_smallbins_bytes = allocatorGetFragmentationSmallBins(false);
/* in case the allocator isn't providing these stats, fake them so that
* fragmentation info still shows some (inaccurate metrics) */
if (!server.cron_malloc_stats.allocator_resident) {
Expand Down Expand Up @@ -5591,7 +5592,14 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
/* clang-format on */
freeMemoryOverheadData(mh);
}

#if defined(HAVE_DEFRAG) && defined(USE_JEMALLOC)
// if (all_sections || (dictFind(section_dict, "defrag") != NULL)) {
// if (sections++) info = sdscat(info, "\r\n");
// /* clang-format off */
// info = sdscatprintf(info, "# Defrag\r\n");
// info = allocatorGetDefragInfo(info);
// }
#endif
/* Persistence */
if (all_sections || (dictFind(section_dict, "persistence") != NULL)) {
if (sections++) info = sdscat(info, "\r\n");
Expand Down Expand Up @@ -6791,6 +6799,10 @@ int main(int argc, char **argv) {
server.exec_argv[argc] = NULL;
for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);

#if defined(USE_JEMALLOC)
// we assume jemalloc version in use supports defragmentation api
serverAssert(!allocatorDefragInit());
#endif
/* We need to init sentinel right now as parsing the configuration file
* in sentinel mode will have the effect of populating the sentinel
* data structures with primary nodes to monitor. */
Expand Down
20 changes: 13 additions & 7 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1888,13 +1888,19 @@ struct valkeyServer {
int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from
within the main dict scan */
size_t client_max_querybuf_len; /* Limit for client query buffer length */
int dbnum; /* Total number of configured DBs */
int supervised; /* 1 if supervised, 0 otherwise. */
int supervised_mode; /* See SUPERVISED_* */
int daemonize; /* True if running as a daemon */
int set_proc_title; /* True if change proc title */
char *proc_title_template; /* Process title template format */
int defrag_strategy;
int defrag_select_strategy;
int defrag_recalc;
int defrag_alloc_strategy;
int defrag_free_strategy;
int select_threshold_factor;
size_t client_max_querybuf_len; /* Limit for client query buffer length */
int dbnum; /* Total number of configured DBs */
int supervised; /* 1 if supervised, 0 otherwise. */
int supervised_mode; /* See SUPERVISED_* */
int daemonize; /* True if running as a daemon */
int set_proc_title; /* True if change proc title */
char *proc_title_template; /* Process title template format */
clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT];
int extended_redis_compat; /* True if extended Redis OSS compatibility is enabled */
int pause_cron; /* Don't run cron tasks (debug) */
Expand Down
Loading