Author: 张远
MyRocks TTL(Time To Live) 特性允许用户指定表数据的自动过期时间,表数据根据指定的时间在compact过程中进行清理。
MyRocks TTL 简单用法如下,
在comment中通过ttl_duration指定过期时间,ttl_col指定过期时间列
CREATE TABLE t1 (
a bigint(20) NOT NULL,
b int NOT NULL,
ts bigint(20) UNSIGNED NOT NULL,
PRIMARY KEY (a),
KEY kb (b)
) ENGINE=rocksdb
COMMENT='ttl_duration=1;ttl_col=ts;';
也可以不指定过期时间列ttl_col,插入数据时会隐式将当前时间做为过期时间列存储到记录中。
CREATE TABLE t1 (
a bigint(20) NOT NULL,
PRIMARY KEY (a)
) ENGINE=rocksdb
COMMENT='ttl_duration=1;';
分区表也同样支持TTL
CREATE TABLE t1 (
c1 BIGINT,
c2 BIGINT UNSIGNED NOT NULL,
name VARCHAR(25) NOT NULL,
event DATE,
PRIMARY KEY (`c1`) COMMENT 'custom_p0_cfname=foo;custom_p1_cfname=bar;custom_p2_cfname=baz;'
) ENGINE=ROCKSDB
COMMENT="ttl_duration=1;custom_p1_ttl_duration=100;custom_p1_ttl_col=c2;custom_p2_ttl_duration=5000;"
PARTITION BY LIST(c1) (
PARTITION custom_p0 VALUES IN (1, 2, 3),
PARTITION custom_p1 VALUES IN (4, 5, 6),
PARTITION custom_p2 VALUES IN (7, 8, 9)
);
介绍MyRocks TTL实现之前,先来看看RocksDB TTL。
RocksDB 本身也支持TTL, 通过DBWithTTL::Open接口,可以指定每个column_family的过期时间。
每次put数据时,会调用DBWithTTLImpl::AppendTS将过期时间append到value最后。
在Compact时通过自定义的TtlCompactionFilter , 去判断数据是否可以清理。具体参考DBWithTTLImpl::IsStale
bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) {
if (ttl <= 0) { // Data is fresh if TTL is non-positive
return false;
}
int64_t curtime;
if (!env->GetCurrentTime(&curtime).ok()) {
return false; // Treat the data as fresh if could not get current time
}
int32_t timestamp_value =
DecodeFixed32(value.data() + value.size() - kTSLength);
return (timestamp_value + ttl) < curtime;
}
RocksDB TTL在compact时才清理过期数据,所以,过期时间并不是严格的,会有一定的滞后,取决于compact的速度。
和RocksDB TTL column family级别指定过期时间不同,MyRocks TTL可表级别指定过期时间。
MyRocks TTL表过期时间存储在数据字典INDEX_INFO中,表中可以指定过期时间列ttl_col, 也可以不指定, 不指定时会隐式生成ttl_col.
对于主键,ttl_col的值存储在value的头8个字节中,对于指定了过期时间列ttl_col的情况,value中ttl_col位置和valule的头8个字节都会存储ttl_col值,这里有一定的冗余。具体参考convert_record_to_storage_format
读取数据会自动跳过ttl_col占用的8个字节,参考convert_record_from_storage_format
对于二级索引,也会存储ttl_col同主键保持一致,其ttl_col存储在value的unpack_info中,
if (m_index_type == INDEX_TYPE_SECONDARY &&
m_total_index_flags_length > 0) {
// Reserve space for index flag fields
unpack_info->allocate(m_total_index_flags_length);
// Insert TTL timestamp
if (has_ttl() && ttl_bytes) {
write_index_flag_field(unpack_info,
reinterpret_cast<const uchar *const>(ttl_bytes),
Rdb_key_def::TTL_FLAG);
}
}
二级索引ttl_col同主键保持一致。 对于更新显式指定的ttl_col列时,所有的二级索引都需要更新,即使此列不在二级索引列中
MyRocks TTL 清理也发生在compact时,由Rdb_compact_filter定义清理动作, 具体参考should_filter_ttl_rec
RocksDB TTL中过期时间和当前时间做比较,而MyRocks TTL 的过期时间是和最老的快照时间(m_snapshot_timestamp )做比较(当没有快照时,也取当前时间)。
bool should_filter_ttl_rec(const rocksdb::Slice &key,
const rocksdb::Slice &existing_value) const {
uint64 ttl_timestamp;
Rdb_string_reader reader(&existing_value);
if (!reader.read(m_ttl_offset) || reader.read_uint64(&ttl_timestamp)) {
std::string buf;
buf = rdb_hexdump(existing_value.data(), existing_value.size(),
RDB_MAX_HEXDUMP_LEN);
// NO_LINT_DEBUG
sql_print_error("Decoding ttl from PK value failed in compaction filter, "
"for index (%u,%u), val: %s",
m_prev_index.cf_id, m_prev_index.index_id, buf.c_str());
abort();
}
/*
Filter out the record only if it is older than the oldest snapshot
timestamp. This prevents any rows from expiring in the middle of
long-running transactions.
*/
return ttl_timestamp + m_ttl_duration <= m_snapshot_timestamp;
}
前面讲到, RocksDB TTL 过期时间并不严格,取决于compaction速度。MyRocks TTL也有类似问题,因此MyRocks引入参数rocksdb_enable_ttl_read_filtering, 当开启此参数时,过期时间是严格的。 每次读取记录会调用should_hide_ttl_rec判断此记录是否过期,当compact操作不及时而没有清理的过期记录,在读取时会被过滤掉。
bool ha_rocksdb::should_hide_ttl_rec(const Rdb_key_def &kd,
const rocksdb::Slice &ttl_rec_val,
const int64_t curr_ts) {
DBUG_ASSERT(kd.has_ttl());
DBUG_ASSERT(kd.m_ttl_rec_offset != UINT_MAX);
/*
Curr_ts can only be 0 if there are no snapshots open.
should_hide_ttl_rec can only be called when there is >=1 snapshots, unless
we are filtering on the write path (single INSERT/UPDATE) in which case
we are passed in the current time as curr_ts.
In the event curr_ts is 0, we always decide not to filter the record. We
also log a warning and increment a diagnostic counter.
*/
if (curr_ts == 0) {
update_row_stats(ROWS_HIDDEN_NO_SNAPSHOT);
return false;
}
if (!rdb_is_ttl_read_filtering_enabled() || !rdb_is_ttl_enabled()) {
return false;
}
Rdb_string_reader reader(&ttl_rec_val);
/*
Find where the 8-byte ttl is for each record in this index.
*/
uint64 ts;
if (!reader.read(kd.m_ttl_rec_offset) || reader.read_uint64(&ts)) {
/*
This condition should never be reached since all TTL records have an
8 byte ttl field in front. Don't filter the record out, and log an error.
*/
std::string buf;
buf = rdb_hexdump(ttl_rec_val.data(), ttl_rec_val.size(),
RDB_MAX_HEXDUMP_LEN);
const GL_INDEX_ID gl_index_id = kd.get_gl_index_id();
// NO_LINT_DEBUG
sql_print_error("Decoding ttl from PK value failed, "
"for index (%u,%u), val: %s",
gl_index_id.cf_id, gl_index_id.index_id, buf.c_str());
DBUG_ASSERT(0);
return false;
}
/* Hide record if it has expired before the current snapshot time. */
uint64 read_filter_ts = 0;
#ifndef NDEBUG
read_filter_ts += rdb_dbug_set_ttl_read_filter_ts();
#endif
bool is_hide_ttl =
ts + kd.m_ttl_duration + read_filter_ts <= static_cast<uint64>(curr_ts);
if (is_hide_ttl) {
update_row_stats(ROWS_FILTERED);
}
return is_hide_ttl;
}
Issue#683 中谈到了MyRocks TTL 有个潜在问题, 当更新显式指定的ttl_col列值时,compact时有可能将新的记录清理掉,而老的记录仍然保留,从而有可能读取到本该不可见的老记录。此问题暂时还没有close.
MyRocks TTL 是一个不错的特性,可以应用在历史数据清理的场景。相比传统的Delete数据的方式,更节约空间和CPU资源,同时传统的Delete还会影响查询的效率。目前MyRocks TTL 还不够成熟,还有许多需要改进的地方。