[Commits] 4eff940dde0: cc
revision-id: 4eff940dde0fee130cf3a7512b1fe25fc71bbe81 (mariadb-10.4.3-82-g4eff940dde0) parent(s): 9c7299365f04faf1b3963a2d891c8cb6dda89e4a author: Oleksandr Byelkin committer: Oleksandr Byelkin timestamp: 2019-03-23 18:15:41 +0100 message: cc --- storage/maria/ma_pagecache.c | 345 ++++++++++++++++++++++++++++++++++++++++--- storage/maria/ma_pagecache.h | 20 ++- 2 files changed, 347 insertions(+), 18 deletions(-) diff --git a/storage/maria/ma_pagecache.c b/storage/maria/ma_pagecache.c index d10595fffd9..aa8ffc31d0a 100644 --- a/storage/maria/ma_pagecache.c +++ b/storage/maria/ma_pagecache.c @@ -85,6 +85,9 @@ #define PAGECACHE_DEBUG #define PAGECACHE_DEBUG_LOG "my_pagecache_debug.log" */ +#define PAGECACHE_DEBUG +#define PAGECACHE_DEBUG_LOG "my_pagecache_debug.log" +#define _VARARGS(X) X /* In key cache we have external raw locking here we use @@ -127,7 +130,8 @@ my_bool my_disable_flush_pagecache_blocks= 0; #define COND_FOR_REQUESTED 0 /* queue of thread waiting for read operation */ #define COND_FOR_SAVED 1 /* queue of thread waiting for flush */ #define COND_FOR_WRLOCK 2 /* queue of write lock */ -#define COND_SIZE 3 /* number of COND_* queues */ +#define COND_FOR_BIG_BLOCK 3 /* queue of waiting fo big block read */ +#define COND_SIZE 4 /* number of COND_* queues */ typedef mysql_cond_t KEYCACHE_CONDVAR; @@ -146,7 +150,7 @@ struct st_pagecache_hash_link struct st_pagecache_block_link *block; /* reference to the block for the page: */ PAGECACHE_FILE file; /* from such a file */ - pgcache_page_no_t pageno; /* this page */ + pgcache_page_no_t pageno; /* this page */ uint requests; /* number of requests for the page */ }; @@ -174,6 +178,7 @@ struct st_pagecache_hash_link #define PCBLOCK_CHANGED 32 /* block buffer contains a dirty page */ #define PCBLOCK_DIRECT_W 64 /* possible direct write to the block */ #define PCBLOCK_DEL_WRITE 128 /* should be written on delete */ +#define PCBLOCK_BIG_READ 256 /* the first block of the big read in progress */ /* page status, returned by find_block */ #define PAGE_READ 0 @@ -534,10 +539,22 @@ static void pagecache_debug_print _VARARGS((const char *fmt, ...)); #if defined(PAGECACHE_DEBUG_LOG) && defined(PAGECACHE_DEBUG) #define KEYCACHE_PRINT(l, m) KEYCACHE_DBUG_PRINT(l,m) + +#ifdef PAGECACHE_DEBUG_DLOG #define KEYCACHE_DBUG_PRINT(l, m) \ { if (pagecache_debug_log) \ + { \ fprintf(pagecache_debug_log, "%s: ", l); \ + DBUG_PRINT("PCDEBUG", ("%s: ", l)); \ + } \ pagecache_debug_print m; } +#else +#define KEYCACHE_DBUG_PRINT(l, m) \ + { if (pagecache_debug_log) \ + fprintf(pagecache_debug_log, "%s: ", l); \ + pagecache_debug_print m; } +#endif + #define KEYCACHE_DBUG_ASSERT(a) \ { if (! (a) && pagecache_debug_log) \ @@ -748,7 +765,8 @@ static inline uint next_power(uint value) size_t init_pagecache(PAGECACHE *pagecache, size_t use_mem, uint division_limit, uint age_threshold, - uint block_size, uint changed_blocks_hash_size, + uint block_size, + uint changed_blocks_hash_size, myf my_readwrite_flags) { size_t blocks, hash_links, length; @@ -1350,6 +1368,8 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, } } while (thread != last_thread); + DBUG_PRINT("XXX", ("hash_link (link block): %p, hash_link: %p -> %p", + hash_link, hash_link->block, block)); hash_link->block= block; /* Ensure that no other thread tries to use this block */ block->status|= PCBLOCK_REASSIGNED; @@ -1646,6 +1666,9 @@ static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link) if ((*hash_link->prev= hash_link->next)) hash_link->next->prev= hash_link->prev; + + DBUG_PRINT("XXX", ("hash_link (unlink): %p, hash_link: %p -> NULL", + hash_link, hash_link->block)); hash_link->block= NULL; if (pagecache->waiting_for_hash_link.last_thread) { @@ -1893,6 +1916,7 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, my_bool wrmode, my_bool block_is_copied, my_bool reg_req, + my_bool fast, int *page_st) { PAGECACHE_HASH_LINK *hash_link; @@ -1909,6 +1933,7 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, DBUG_EXECUTE("check_pagecache", test_key_cache(pagecache, "start of find_block", 0);); #endif + DBUG_ASSERT(!fast || !wrmode); restart: /* Find the hash link for the requested page (file, pageno) */ @@ -2018,9 +2043,11 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, /* This is a request for a new page or for a page not to be removed */ if (! block) { + DBUG_PRINT("XXX", ("request for a new page")); /* No block is assigned for the page yet */ if (pagecache->blocks_unused) { + DBUG_PRINT("XXX", ("there is never used blocks")); if (pagecache->free_block_list) { /* There is a block in the free list. */ @@ -2054,7 +2081,11 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, block->last_hit_time= 0; block->rec_lsn= LSN_MAX; link_to_file_list(pagecache, block, file, 0); + DBUG_PRINT("XXX", ("block (no block assigned): %p, hash_link: %p -> %p", + block, block->hash_link, hash_link)); block->hash_link= hash_link; + DBUG_PRINT("XXX", ("hash_link (no block assignment): %p, hash_link: %p -> %p", + hash_link, hash_link->block, block)); hash_link->block= block; page_status= PAGE_TO_BE_READ; DBUG_PRINT("info", ("page to be read set for page %p (%u)", @@ -2065,6 +2096,7 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, } else { + DBUG_PRINT("XXX", ("there is NOT never used blocks")); /* There are no never used blocks, use a block from the LRU chain */ /* @@ -2076,6 +2108,8 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, if (! pagecache->used_last) { + DBUG_PRINT("XXX", ("there is NOT UNUSED blocks")); + struct st_my_thread_var *thread; /* Wait until a new block is added to the LRU chain; several threads might wait here for the same page, @@ -2084,8 +2118,18 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, The block is given to us by the next thread executing link_block(). */ + if (fast) + { + DBUG_ASSERT(hash_link->requests == 0); + unlink_hash(pagecache, hash_link); + DBUG_PRINT("info", ("fast and no blocks in LRU")); - struct st_my_thread_var *thread= my_thread_var; + KEYCACHE_DBUG_PRINT("find_block", + ("fast and no blocks in LRU")); + DBUG_RETURN(0); + } + + thread= my_thread_var; thread->keycache_link= (void *) hash_link; wqueue_link_into_queue(&pagecache->waiting_for_block, thread); do @@ -2104,13 +2148,30 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, } else { + DBUG_PRINT("XXX", ("take a block from LRU")); /* Take the first block from the LRU chain unlinking it from the chain */ block= pagecache->used_last->next_used; + if (fast && + ((block->status & (PCBLOCK_IN_FLUSH | PCBLOCK_CHANGED)) || + (block->hash_link && block->hash_link != hash_link && + block->hash_link->requests))) + { + DBUG_ASSERT(hash_link->requests == 0); + unlink_hash(pagecache, hash_link); + DBUG_PRINT("info", ("fast and LRU block is in switch or has " + "readers")); + KEYCACHE_DBUG_PRINT("find_block", + ("fast and LRU block is in switch or has " + "readers")); + DBUG_RETURN (0); + } if (reg_req) reg_requests(pagecache, block, 1); + DBUG_PRINT("XXX", ("hash_link (LRU): %p, hash_link: %p -> %p", + hash_link, hash_link->block, block)); hash_link->block= block; DBUG_ASSERT(block->requests == 1); } @@ -2181,6 +2242,8 @@ static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, link_to_file_list(pagecache, block, file, (my_bool)(block->hash_link ? 1 : 0)); + DBUG_PRINT("XXX", ("block (LRU): %p, hash_link: %p -> %p", + block, block->hash_link, hash_link)); block->hash_link= hash_link; PCBLOCK_INFO(block); block->hits_left= init_hits_left; @@ -2669,6 +2732,170 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache, } +static my_bool read_big_block(PAGECACHE *pagecache, + PAGECACHE_BLOCK_LINK *block) +{ + int page_st; + size_t big_block_size_in_pages; + size_t offset; + pgcache_page_no_t page, our_page; + pgcache_page_no_t page_to_read; + PAGECACHE_BLOCK_LINK *block_to_read= NULL; + PAGECACHE_IO_HOOK_ARGS args; + LEX_STRING data= {0,0}; + DBUG_ENTER("read_big_block"); + DBUG_PRINT("enter", ("read BIG block: %p", block)); + + DBUG_ASSERT(block->hash_link->file.big_block_size % + pagecache->block_size == 0); + big_block_size_in_pages= + block->hash_link->file.big_block_size / pagecache->block_size; + /* find first page of the big block (page_to_read) */ + page_to_read= + (block->hash_link->pageno - block->hash_link->file.first_page) / + big_block_size_in_pages * big_block_size_in_pages + + block->hash_link->file.first_page; + if (page_to_read != block->hash_link->pageno) + { + block_to_read= find_block(pagecache, &block->hash_link->file, + page_to_read, 1, + FALSE, TRUE /* copy under protection (?)*/, + TRUE /*register*/, FALSE, &page_st); + DBUG_ASSERT(block_to_read == block_to_read->hash_link->block); + + if (block_to_read->status & PCBLOCK_ERROR) + { + /* We get first block with an error so all operation failed */ + block->status|= PCBLOCK_ERROR; + block->error= block_to_read->error; + DBUG_RETURN(FALSE); // no retry + } + // only primary request here, PAGE_WAIT_TO_BE_READ is impossible + DBUG_ASSERT(page_st != PAGE_WAIT_TO_BE_READ); + if (block_to_read->status & PCBLOCK_BIG_READ) + { + struct st_my_thread_var *thread; + DBUG_ASSERT(page_st != PAGE_TO_BE_READ); + /* + Other thread perform the operation => + unlock and repeat + */ + // Big block read failed because somebody else read the first block + unreg_request(pagecache, block, 1); + thread= my_thread_var; + /* Put the request into a queue and wait until it can be processed */ + wqueue_add_to_queue(&block->wqueue[COND_FOR_BIG_BLOCK], thread); + do + { + DBUG_PRINT("wait", + ("suspend thread %s %ld", thread->name, + (ulong) thread->id)); + pagecache_pthread_cond_wait(&thread->suspend, + &pagecache->cache_lock); + } + while (thread->next); + DBUG_RETURN(TRUE); + } + } + else + block_to_read= block; + + + DBUG_ASSERT(!(block_to_read->status & PCBLOCK_BIG_READ)); + // Mark the first page of a big block + block_to_read->status|= PCBLOCK_BIG_READ; + + // perform read + pagecache_pthread_mutex_unlock(&pagecache->cache_lock); + + args.page= NULL; + args.pageno= block->hash_link->pageno; + args.data= block->hash_link->file.callback_data; + + if (block->hash_link->file.big_block_read(pagecache, + &args, &block->hash_link->file, + &data)) + { + pagecache_pthread_mutex_lock(&pagecache->cache_lock); + block_to_read->status|= PCBLOCK_ERROR; + block->status|= PCBLOCK_ERROR; + block_to_read->error= block->error= (int16) my_errno; + if (block_to_read != block) + { + remove_reader(block_to_read); + unreg_request(pagecache, block_to_read, 1); + } + /* TODO: is it correct? */ + if (data.str) + my_free(data.str); + DBUG_RETURN(FALSE); // no retry + } + + // fill pages + pagecache_pthread_mutex_lock(&pagecache->cache_lock); + + our_page= block->hash_link->pageno; + + for(offset= 0, page= page_to_read; + offset < data.length; + offset+= pagecache->block_size, page++) + { + DBUG_ASSERT(offset + pagecache->block_size <= data.length); + if (page == page_to_read) + { + if (page_st != PAGE_READ) + { + DBUG_ASSERT(page_st != PAGE_WAIT_TO_BE_READ); + DBUG_ASSERT(offset == 0); + memcpy(block_to_read->buffer, data.str, pagecache->block_size); + block_to_read->status|= PCBLOCK_READ; + } + else + DBUG_ASSERT(block->status & PCBLOCK_READ); + } else if (page == our_page) + { + DBUG_ASSERT(!(block->status & PCBLOCK_READ)); + memcpy(block->buffer, data.str + offset, pagecache->block_size); + block->status|= PCBLOCK_READ; + } + else + { + PAGECACHE_BLOCK_LINK *bl; + bl= find_block(pagecache, &block->hash_link->file, page, 1, + FALSE, TRUE /* copy under protection (?)*/, + TRUE /*register*/, TRUE /*fast*/, &page_st); + if (!bl) + { + // we run out of easy awaliable pages in the cache + break; + } + DBUG_ASSERT(bl == bl->hash_link->block); + if ((bl->status & PCBLOCK_ERROR) == 0 && + page_st == PAGE_TO_BE_READ) + { + memcpy(bl->buffer, data.str + offset, pagecache->block_size); + bl->status|= PCBLOCK_READ; + } + remove_reader(bl); + unreg_request(pagecache, bl, 1); + } + } + /* TODO: is it correct? */ + my_free(data.str); + + block_to_read->status&= ~PCBLOCK_BIG_READ; + if (block_to_read != block) + { + remove_reader(block_to_read); + unreg_request(pagecache, block_to_read, 1); + } + if (block->wqueue[COND_FOR_BIG_BLOCK].last_thread) + wqueue_release_queue(&block->wqueue[COND_FOR_BIG_BLOCK]); + + DBUG_RETURN(FALSE); +} + + /* Read into a key cache block buffer from disk. @@ -2861,7 +3088,7 @@ void pagecache_unlock(PAGECACHE *pagecache, inc_counter_for_resize_op(pagecache); /* See NOTE for pagecache_unlock about registering requests */ block= find_block(pagecache, file, pageno, 0, 0, 0, - pin == PAGECACHE_PIN_LEFT_UNPINNED, &page_st); + pin == PAGECACHE_PIN_LEFT_UNPINNED, FALSE, &page_st); PCBLOCK_INFO(block); DBUG_ASSERT(block != 0 && page_st == PAGE_READ); if (first_REDO_LSN_for_page) @@ -2948,7 +3175,7 @@ void pagecache_unpin(PAGECACHE *pagecache, inc_counter_for_resize_op(pagecache); /* See NOTE for pagecache_unlock about registering requests */ - block= find_block(pagecache, file, pageno, 0, 0, 0, 0, &page_st); + block= find_block(pagecache, file, pageno, 0, 0, 0, 0, FALSE, &page_st); DBUG_ASSERT(block != 0); DBUG_ASSERT(page_st == PAGE_READ); /* we can't unpin such page without unlock */ @@ -3349,7 +3576,7 @@ uchar *pagecache_read(PAGECACHE *pagecache, char llbuf[22]; DBUG_ENTER("pagecache_read"); DBUG_PRINT("enter", ("fd: %u page: %s buffer: %p level: %u " - "t:%s (%d)%s->%s %s->%s", + "t:%s (%d)%s->%s %s->%s big block: %d", (uint) file->file, ullstr(pageno, llbuf), buff, level, page_cache_page_type_str[type], @@ -3357,7 +3584,8 @@ uchar *pagecache_read(PAGECACHE *pagecache, page_cache_page_lock_str[lock_to_read[lock].new_lock], page_cache_page_lock_str[lock_to_read[lock].unlock_lock], page_cache_page_pin_str[new_pin], - page_cache_page_pin_str[unlock_pin])); + page_cache_page_pin_str[unlock_pin], + MY_TEST(file->big_block_read))); DBUG_ASSERT(buff != 0 || (buff == 0 && (unlock_pin == PAGECACHE_PIN || unlock_pin == PAGECACHE_PIN_LEFT_PINNED))); DBUG_ASSERT(pageno < ((1ULL) << 40)); @@ -3369,6 +3597,19 @@ uchar *pagecache_read(PAGECACHE *pagecache, restart: + /* + If we use big block than the big block is multiple of blocks and we + have enouch blocks in cache + */ + DBUG_ASSERT(!file->big_block_read || + (file->big_block_size != 0 && + file->big_block_size % pagecache->block_size == 0)); + /* + && + pagecache->blocks / + (file->big_block_size/pagecache->block_size) > 8)); + */ + if (pagecache->can_be_used) { /* Key cache is used */ @@ -3387,19 +3628,41 @@ uchar *pagecache_read(PAGECACHE *pagecache, pagecache->global_cache_r_requests++; /* See NOTE for pagecache_unlock about registering requests. */ reg_request= ((new_pin == PAGECACHE_PIN_LEFT_UNPINNED) || - (new_pin == PAGECACHE_PIN)); + (new_pin == PAGECACHE_PIN) || + file->big_block_read); block= find_block(pagecache, file, pageno, level, lock == PAGECACHE_LOCK_WRITE, buff != 0, - reg_request, &page_st); + reg_request, FALSE, &page_st); DBUG_PRINT("info", ("Block type: %s current type %s", page_cache_page_type_str[block->type], page_cache_page_type_str[type])); if (((block->status & PCBLOCK_ERROR) == 0) && (page_st != PAGE_READ)) { - /* The requested page is to be read into the block buffer */ - read_block(pagecache, block, - (my_bool)(page_st == PAGE_TO_BE_READ)); - DBUG_PRINT("info", ("read is done")); + my_bool primary= (my_bool)(page_st == PAGE_TO_BE_READ); + if (!file->big_block_read || !primary || + pageno < file->first_page) // TODO: remove after testing + { + /* The requested page is to be read into the block buffer */ + read_block(pagecache, block, primary); + DBUG_PRINT("info", ("read is done")); + } + else + { + // It is big read and this thread should read + if (read_big_block(pagecache, block)) + { + // block is unregistered in read_big_block + pagecache_pthread_mutex_unlock(&pagecache->cache_lock); + DBUG_PRINT("info", ("big block fail, restarting...")); + goto restart; + } + if (!((new_pin == PAGECACHE_PIN_LEFT_UNPINNED) || + (new_pin == PAGECACHE_PIN))) + { + // we registered reqest only for big_block_read + unreg_request(pagecache, block, 1); + } + } } /* Assert after block is read. Imagine two concurrent SELECTs on same @@ -3992,6 +4255,12 @@ my_bool pagecache_write_part(PAGECACHE *pagecache, DBUG_ASSERT(pageno < ((1ULL) << 40)); #endif + if (file->big_block_read) + { + DBUG_ASSERT(0); + DBUG_RETURN(1); + } + if (!page_link) page_link= &fake_link; *page_link= 0; @@ -4026,7 +4295,7 @@ my_bool pagecache_write_part(PAGECACHE *pagecache, (pin == PAGECACHE_PIN)); block= find_block(pagecache, file, pageno, level, TRUE, FALSE, - reg_request, &page_st); + reg_request, FALSE, &page_st); if (!block) { DBUG_ASSERT(write_mode != PAGECACHE_WRITE_DONE); @@ -4278,6 +4547,8 @@ static my_bool free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, block->type= PAGECACHE_EMPTY_PAGE; #endif block->rec_lsn= LSN_MAX; + DBUG_PRINT("XXX", ("block (Free): %p, hash_link: %p -> NULL", + block, block->hash_link)); block->hash_link= NULL; if (block->temperature == PCBLOCK_WARM) pagecache->warm_blocks--; @@ -5265,8 +5536,11 @@ static void pagecache_debug_print(const char * fmt, ...) va_start(args,fmt); if (pagecache_debug_log) { - VOID(vfprintf(pagecache_debug_log, fmt, args)); - VOID(fputc('\n',pagecache_debug_log)); + vfprintf(pagecache_debug_log, fmt, args); + fputc('\n',pagecache_debug_log); +#ifdef PAGECACHE_DEBUG_DLOG + _db_doprnt_(fmt, args); +#endif } va_end(args); } @@ -5307,6 +5581,35 @@ static void null_post_write_hook(int res __attribute__((unused)), return; } +#ifndef DBUG_OFF +static my_bool + _pagecache_big_block_read_emu(struct st_pagecache *pagecache, + PAGECACHE_IO_HOOK_ARGS *args, + struct st_pagecache_file *file, + LEX_STRING *data) +{ + DBUG_ASSERT(file->big_block_size > 0); + + if (!(data->str= my_malloc(file->big_block_size, MYF(MY_WME)))) + return TRUE; + + data->length= mysql_file_pread(file->file, + (unsigned char *)data->str, + file->big_block_size, + ((my_off_t) args->pageno << pagecache->shift), + MYF(0)); + if (data->length == 0 || data->length == MY_FILE_ERROR) + { + data->length = 0; + if (!my_errno) + my_errno= HA_ERR_UNSUPPORTED; // just something + return TRUE; + } + + return FALSE; +} +#endif /* DBUG_OFF */ + void pagecache_file_set_null_hooks(PAGECACHE_FILE *file) { @@ -5316,4 +5619,12 @@ pagecache_file_set_null_hooks(PAGECACHE_FILE *file) file->post_write_hook= null_post_write_hook; file->flush_log_callback= null_pre_hook; file->callback_data= NULL; + file->first_page= file->big_block_size= 0; + file->big_block_read= NULL; + DBUG_EXECUTE_IF("maria_emulate_big_block", + { + file->first_page= 1; + file->big_block_size= 4*1024*1024; + file->big_block_read= _pagecache_big_block_read_emu; + }); } diff --git a/storage/maria/ma_pagecache.h b/storage/maria/ma_pagecache.h index 1183f9d57e0..e3b68ce4f5f 100644 --- a/storage/maria/ma_pagecache.h +++ b/storage/maria/ma_pagecache.h @@ -86,9 +86,16 @@ typedef struct st_pagecache_io_hook_args uchar *crypt_buf; /* when using encryption */ } PAGECACHE_IO_HOOK_ARGS; +struct st_pagecache; + /* file descriptor for Maria */ typedef struct st_pagecache_file { + /* size n pages of first "page" (which is not a big block) */ + size_t first_page; + /* size of a big block for S3 or 0 */ + size_t big_block_size; + /* File number */ File file; /** Cannot be NULL */ @@ -99,9 +106,19 @@ typedef struct st_pagecache_file my_bool (*pre_write_hook)(PAGECACHE_IO_HOOK_ARGS *args); void (*post_write_hook)(int error, PAGECACHE_IO_HOOK_ARGS *args); - /** Cannot be NULL */ my_bool (*flush_log_callback)(PAGECACHE_IO_HOOK_ARGS *args); + /** + Function for reading file in big hunks from S3 + Data will be filled with pointer and length to data read + start_page will be contain first page read. + */ + my_bool (*big_block_read)(struct st_pagecache *pagecache, + PAGECACHE_IO_HOOK_ARGS *args, + struct st_pagecache_file *file, LEX_STRING *data); + + + /** Cannot be NULL */ uchar *callback_data; } PAGECACHE_FILE; @@ -123,6 +140,7 @@ typedef struct st_pagecache_hash_link PAGECACHE_HASH_LINK; #define PAGECACHE_PRIORITY_DEFAULT 3 #define PAGECACHE_PRIORITY_HIGH 6 + /* The page cache structure It also contains read-only statistics parameters.
participants (1)
-
Oleksandr Byelkin