[Commits] 0d136cfc32b: MDEV-12253: Buffer pool blocks are accessed after they have been freed

jan jan.lindstrom at mariadb.com
Thu Mar 23 16:35:42 EET 2017


revision-id: 0d136cfc32b66c0636eaf90efe5e02be4859fd4e (mariadb-10.2.4-79-g0d136cfc32b)
parent(s): 130346ce3593d77c86efcc88a02e33c9aed3bdab
author: Jan Lindström
committer: Jan Lindström
timestamp: 2017-03-23 16:33:17 +0200
message:

MDEV-12253: Buffer pool blocks are accessed after they have been freed

Problem was that bpage was referenced after it was already freed
from LRU. Fixed by adding a new variable encrypted that is
passed down to buf_page_check_corrupt() and used in
buf_page_get_gen() to stop processing page read.

Removed dict_table_t::is_encrypted and dict_table_t::ibd_file_missing
and replaced these with dict_table_t::file_unreadable. Table
ibd file is missing if fil_get_space(space_id) returns NULL
and encrypted if not. Removed buf_page_t::key_version as we can
always read key_version from page, note that reported key_version
in error message could be incorrect if page decryption was
done using incorrect key.

Added test cases when enrypted page could be read while doing
redo log crash recovery. Also added test case for row compressed
blobs.

btr_cur_open_at_index_side_func(),
btr_cur_open_at_rnd_pos_func(): Avoid referencing block that is
NULL.

buf_page_get_zip(): Issue error if page read fails.

buf_page_get_gen(): Use dberr_t for error detection and
do not reference bpage after we hare freed it.

buf_mark_space_corrupt(): remove bpage from LRU also when
it is encrypted.

buf_page_check_corrupt(): @return DB_SUCCESS if page has
been read and is not corrupted,
DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
DB_DECRYPTION_FAILED if page post encryption checksum matches but
after decryption normal page checksum does not match. In read
case only DB_SUCCESS is possible.

buf_page_io_complete(): use dberr_t for error handling.

buf_flush_write_block_low(),
buf_read_ahead_random(),
buf_read_page_async(),
buf_read_ahead_linear(),
buf_read_ibuf_merge_pages(),
buf_read_recv_pages(),
fil_aio_wait():
        Issue error if page read fails.

btr_pcur_move_to_next_page(): Do not reference page if it is
NULL.

dict_stats_update_transient_for_index(),
dict_stats_update_transient()
        Do not continue if table decryption failed or table
        is corrupted.

fil_parse_write_crypt_data():
        Check that key read from redo log entry is found from
        encryptio plugin and if it is not, refuse to start.

---
 mysql-test/std_data/keys.txt                       |   2 +-
 mysql-test/std_data/keys2.txt                      |   3 +-
 mysql-test/std_data/keys3.txt                      |   2 +-
 .../encryption/r/innodb-bad-key-change.result      |  13 -
 .../encryption/r/innodb-bad-key-change2.result     |   1 +
 .../encryption/r/innodb-compressed-blob.result     |  24 ++
 .../suite/encryption/r/innodb-force-corrupt.result |  34 +++
 .../suite/encryption/r/innodb-redo-badkey.result   |  47 ++++
 .../suite/encryption/r/innodb-redo-nokeys.result   |  37 +++
 .../suite/encryption/t/innodb-compressed-blob.opt  |   4 +
 .../suite/encryption/t/innodb-compressed-blob.test |  44 +++
 .../suite/encryption/t/innodb-force-corrupt.test   | 103 +++++++
 .../suite/encryption/t/innodb-redo-badkey.opt      |   5 +
 .../suite/encryption/t/innodb-redo-badkey.test     |  93 +++++++
 .../suite/encryption/t/innodb-redo-nokeys.opt      |   3 +
 .../suite/encryption/t/innodb-redo-nokeys.test     |  80 ++++++
 .../suite/innodb/r/innodb_bug14147491.result       |   1 +
 mysql-test/suite/innodb/t/innodb_bug14147491.test  |   1 +
 storage/innobase/btr/btr0btr.cc                    |   6 +-
 storage/innobase/btr/btr0cur.cc                    |  21 +-
 storage/innobase/btr/btr0defragment.cc             |   2 +-
 storage/innobase/btr/btr0pcur.cc                   |   9 +
 storage/innobase/buf/buf0buf.cc                    | 271 ++++++++-----------
 storage/innobase/buf/buf0rea.cc                    | 152 +++++++----
 storage/innobase/dict/dict0crea.cc                 |   4 +-
 storage/innobase/dict/dict0defrag_bg.cc            |   2 +-
 storage/innobase/dict/dict0dict.cc                 |  23 +-
 storage/innobase/dict/dict0load.cc                 |  17 +-
 storage/innobase/dict/dict0stats.cc                |  18 +-
 storage/innobase/fil/fil0crypt.cc                  |  18 +-
 storage/innobase/fil/fil0fil.cc                    |  20 +-
 storage/innobase/handler/ha_innodb.cc              |  35 ++-
 storage/innobase/handler/handler0alter.cc          |  28 +-
 storage/innobase/include/btr0btr.ic                |   2 +-
 storage/innobase/include/buf0buf.h                 |  28 +-
 storage/innobase/include/buf0rea.h                 |  17 +-
 storage/innobase/include/db0err.h                  |   2 +
 storage/innobase/include/dict0dict.h               |   9 +
 storage/innobase/include/dict0mem.h                |  13 +-
 storage/innobase/include/fil0crypt.h               |   5 +-
 storage/innobase/include/log0recv.h                |   5 +-
 storage/innobase/log/log0recv.cc                   |  23 +-
 storage/innobase/page/page0cur.cc                  |   2 +-
 storage/innobase/row/row0import.cc                 |   6 +-
 storage/innobase/row/row0ins.cc                    |   4 +-
 storage/innobase/row/row0merge.cc                  |  10 +-
 storage/innobase/row/row0mysql.cc                  |  31 ++-
 storage/innobase/row/row0purge.cc                  |   2 +-
 storage/innobase/row/row0sel.cc                    |   9 +-
 storage/innobase/row/row0trunc.cc                  |   7 +-
 storage/innobase/row/row0uins.cc                   |   2 +-
 storage/innobase/row/row0umod.cc                   |   2 +-
 storage/innobase/srv/srv0start.cc                  |   7 +-
 storage/innobase/trx/trx0trx.cc                    |   2 +-
 storage/innobase/ut/ut0ut.cc                       |   2 +
 storage/xtradb/api/api0api.cc                      |   6 +-
 storage/xtradb/btr/btr0btr.cc                      |   6 +-
 storage/xtradb/btr/btr0cur.cc                      |  15 +-
 storage/xtradb/btr/btr0defragment.cc               |   2 +-
 storage/xtradb/btr/btr0pcur.cc                     |  10 +
 storage/xtradb/buf/buf0buf.cc                      | 297 +++++++++------------
 storage/xtradb/buf/buf0rea.cc                      | 230 ++++++++++------
 storage/xtradb/dict/dict0crea.cc                   |   2 +-
 storage/xtradb/dict/dict0dict.cc                   |  23 +-
 storage/xtradb/dict/dict0load.cc                   |  21 +-
 storage/xtradb/dict/dict0stats.cc                  |  15 +-
 storage/xtradb/fil/fil0crypt.cc                    |  12 +-
 storage/xtradb/fil/fil0fil.cc                      |  37 ++-
 storage/xtradb/handler/ha_innodb.cc                |  52 ++--
 storage/xtradb/handler/handler0alter.cc            |  25 +-
 storage/xtradb/include/btr0btr.ic                  |   2 +-
 storage/xtradb/include/btr0pcur.ic                 |   5 +
 storage/xtradb/include/buf0buf.h                   |  23 +-
 storage/xtradb/include/buf0rea.h                   |  37 +--
 storage/xtradb/include/db0err.h                    |   2 +
 storage/xtradb/include/dict0dict.h                 |   9 +
 storage/xtradb/include/dict0mem.h                  |  18 +-
 storage/xtradb/include/fil0crypt.h                 |   4 +-
 storage/xtradb/include/log0recv.h                  |   5 +-
 storage/xtradb/log/log0recv.cc                     |  20 +-
 storage/xtradb/row/row0import.cc                   |   6 +-
 storage/xtradb/row/row0ins.cc                      |   7 +-
 storage/xtradb/row/row0merge.cc                    |  11 +-
 storage/xtradb/row/row0mysql.cc                    |  39 +--
 storage/xtradb/row/row0purge.cc                    |   2 +-
 storage/xtradb/row/row0sel.cc                      |   8 +-
 storage/xtradb/row/row0uins.cc                     |   2 +-
 storage/xtradb/row/row0umod.cc                     |   2 +-
 storage/xtradb/srv/srv0start.cc                    |   7 +-
 storage/xtradb/trx/trx0trx.cc                      |   2 +-
 storage/xtradb/ut/ut0ut.cc                         |   2 +
 91 files changed, 1519 insertions(+), 760 deletions(-)

diff --git a/mysql-test/std_data/keys.txt b/mysql-test/std_data/keys.txt
index e511521d7ab..2d83b7fba60 100644
--- a/mysql-test/std_data/keys.txt
+++ b/mysql-test/std_data/keys.txt
@@ -9,4 +9,4 @@
 
 5;966050D7777350B6FD5CCB3E5F648DA45C63BEFB6DEDDFA13443F156B7D35C84
 6;B5EA210C8C09EF20DB95EC584714A89F # and yet another
-
+10;770A8A65DA156D24EE2A093277530143
diff --git a/mysql-test/std_data/keys2.txt b/mysql-test/std_data/keys2.txt
index 5b98fbeebd2..93fc537cd3e 100644
--- a/mysql-test/std_data/keys2.txt
+++ b/mysql-test/std_data/keys2.txt
@@ -1,7 +1,8 @@
-1;593E580927F47AB530D3B1237CDEF6D6
+1;770A8A65DA156D24EE2A093277530142
 2;352E42F1B9DB5CB915C3262FE745520A
 3;CFE065600F5EB57481075C65180C3F8A
 4;205379930183490D3BECA139BDF4DB5B
 5;E2D944D5D837A1DCB22FF7FD397892EE
 6;BAFE99B0BB87F2CD33A6AF26A11F6BD1
 19;678D6B0063824BACCE33224B385104B35F30FF5749F0EBC030A0955DBC7FAC34
+20;BAFE99B0BB87F2CD33A6AF26A11F6BD1
diff --git a/mysql-test/std_data/keys3.txt b/mysql-test/std_data/keys3.txt
index 4f6c618d2ad..be30acc6add 100644
--- a/mysql-test/std_data/keys3.txt
+++ b/mysql-test/std_data/keys3.txt
@@ -1,4 +1,4 @@
-1;593E580927F47AB530D3B1237CDEF6D6
+1;770A8A65DA156D24EE2A093277530142
 2;E4B00A45BF775B4E07D634EC5CA5912B
 3;6E35ACB162B29D1FB9E178021DAF16ED
 4;971A664A88EE0022D408E40BFAB17E79
diff --git a/mysql-test/suite/encryption/r/innodb-bad-key-change.result b/mysql-test/suite/encryption/r/innodb-bad-key-change.result
index 798057e3fb5..6c44c462f0d 100644
--- a/mysql-test/suite/encryption/r/innodb-bad-key-change.result
+++ b/mysql-test/suite/encryption/r/innodb-bad-key-change.result
@@ -32,11 +32,8 @@ Warning	192	Table test/t1 in tablespace  is encrypted but encryption service or
 Warning	192	Table test/t1 is encrypted but encryption service or used key_id 2 is not available.  Can't continue reading table.
 Error	1296	Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 DROP TABLE t1;
-Warnings:
-Warning	192	Table in tablespace  encrypted.However key management plugin or used key_id 1 is not found or used encryption algorithm or method does not match. Can't continue opening the table.
 SHOW WARNINGS;
 Level	Code	Message
-Warning	192	Table in tablespace  encrypted.However key management plugin or used key_id 1 is not found or used encryption algorithm or method does not match. Can't continue opening the table.
 # Start server with keys.txt
 CREATE TABLE t2 (c VARCHAR(8), id int not null primary key, b int, key(b)) ENGINE=InnoDB ENCRYPTED=YES;
 INSERT INTO t2 VALUES ('foobar',1,2);
@@ -53,47 +50,40 @@ SELECT * FROM t2 where id = 1;
 ERROR HY000: Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 SHOW WARNINGS;
 Level	Code	Message
-Warning	1812	Tablespace is missing for table 'test/t2'
 Warning	192	Table test/t2 is encrypted but encryption service or used key_id is not available.  Can't continue reading table.
 Error	1296	Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 SELECT * FROM t2 where b = 1;
 ERROR HY000: Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 SHOW WARNINGS;
 Level	Code	Message
-Warning	1812	Tablespace is missing for table 'test/t2'
 Warning	192	Table test/t2 is encrypted but encryption service or used key_id is not available.  Can't continue reading table.
 Error	1296	Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 INSERT INTO t2 VALUES ('tmp',3,3);
 ERROR HY000: Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 SHOW WARNINGS;
 Level	Code	Message
-Warning	1812	Tablespace is missing for table 'test/t2'
 Warning	192	Table test/t2 is encrypted but encryption service or used key_id is not available.  Can't continue reading table.
 Error	1296	Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 DELETE FROM t2 where b = 3;
 ERROR HY000: Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 SHOW WARNINGS;
 Level	Code	Message
-Warning	1812	Tablespace is missing for table 'test/t2'
 Warning	192	Table test/t2 is encrypted but encryption service or used key_id is not available.  Can't continue reading table.
 Error	1296	Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 DELETE FROM t2 where id = 3;
 ERROR HY000: Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 SHOW WARNINGS;
 Level	Code	Message
-Warning	1812	Tablespace is missing for table 'test/t2'
 Warning	192	Table test/t2 is encrypted but encryption service or used key_id is not available.  Can't continue reading table.
 Error	1296	Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 UPDATE t2 set b = b +1;
 ERROR HY000: Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 SHOW WARNINGS;
 Level	Code	Message
-Warning	1812	Tablespace is missing for table 'test/t2'
 Warning	192	Table test/t2 is encrypted but encryption service or used key_id is not available.  Can't continue reading table.
 Error	1296	Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 OPTIMIZE TABLE t2;
 Table	Op	Msg_type	Msg_text
-test.t2	optimize	Warning	Tablespace is missing for table 'test/t2'
 test.t2	optimize	Warning	Table test/t2 is encrypted but encryption service or used key_id is not available.  Can't continue reading table.
 test.t2	optimize	Error	Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 test.t2	optimize	error	Corrupt
@@ -103,12 +93,10 @@ ALTER TABLE t2 ADD COLUMN c INT;
 ERROR HY000: Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 SHOW WARNINGS;
 Level	Code	Message
-Warning	1812	Tablespace is missing for table 'test/t2'
 Warning	192	Table test/t2 is encrypted but encryption service or used key_id is not available.  Can't continue reading table.
 Error	1296	Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 ANALYZE TABLE t2;
 Table	Op	Msg_type	Msg_text
-test.t2	analyze	Warning	Tablespace is missing for table 'test/t2'
 test.t2	analyze	Warning	Table test/t2 is encrypted but encryption service or used key_id is not available.  Can't continue reading table.
 test.t2	analyze	Error	Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 test.t2	analyze	error	Corrupt
@@ -118,7 +106,6 @@ TRUNCATE TABLE t2;
 ERROR HY000: Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 SHOW WARNINGS;
 Level	Code	Message
-Warning	1812	Tablespace is missing for table 'test/t2'
 Warning	192	Table test/t2 is encrypted but encryption service or used key_id is not available.  Can't continue reading table.
 Error	1296	Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 DROP TABLE t2;
diff --git a/mysql-test/suite/encryption/r/innodb-bad-key-change2.result b/mysql-test/suite/encryption/r/innodb-bad-key-change2.result
index adf984b9708..a5ae7f889bf 100644
--- a/mysql-test/suite/encryption/r/innodb-bad-key-change2.result
+++ b/mysql-test/suite/encryption/r/innodb-bad-key-change2.result
@@ -13,6 +13,7 @@ alter table t1 discard tablespace;
 ERROR HY000: Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 show warnings;
 Level	Code	Message
+Warning	1812	Tablespace is missing for table 'test/t1'
 Error	1296	Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
 alter table t1 engine=InnoDB;
 ERROR HY000: Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
diff --git a/mysql-test/suite/encryption/r/innodb-compressed-blob.result b/mysql-test/suite/encryption/r/innodb-compressed-blob.result
new file mode 100644
index 00000000000..8ef60cb3d78
--- /dev/null
+++ b/mysql-test/suite/encryption/r/innodb-compressed-blob.result
@@ -0,0 +1,24 @@
+call mtr.add_suppression("InnoDB: Block in space_id .* in file .* encrypted.");
+call mtr.add_suppression("InnoDB: However key management plugin or used key_version .* is not found or used encryption algorithm or method does not match.");
+call mtr.add_suppression("InnoDB: Marking tablespace as missing. You may drop this table or install correct key management plugin and key file.");
+call mtr.add_suppression("InnoDB: The page .* cannot be decrypted.");
+# Restart mysqld --file-key-management-filename=keys2.txt
+SET GLOBAL innodb_file_format = `Barracuda`;
+SET GLOBAL innodb_file_per_table = ON;
+set GLOBAL innodb_default_encryption_key_id=4;
+create table t1(a int not null primary key, b blob, index(b(10))) engine=innodb row_format=compressed;
+create table t2(a int not null primary key, b blob, index(b(10))) engine=innodb row_format=compressed encrypted=yes;
+create table t3(a int not null primary key, b blob, index(b(10))) engine=innodb row_format=compressed encrypted=no;
+insert into t1 values (1, repeat('secret',6000));
+insert into t2 values (1, repeat('secret',6000));
+insert into t3 values (1, repeat('secret',6000));
+# Restart mysqld --file-key-management-filename=keys3.txt
+select count(*) from t1 FORCE INDEX (b) where b like 'secret%';
+ERROR HY000: Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
+select count(*) from t2 FORCE INDEX (b) where b like 'secret%';
+ERROR HY000: Got error 192 'Table encrypted but decryption failed. This could be because correct encryption management plugin is not loaded, used encryption key is not available or encryption method does not match.' from InnoDB
+select count(*) from t3 FORCE INDEX (b) where b like 'secret%';
+count(*)
+1
+# Restart mysqld --file-key-management-filename=keys2.txt
+drop table t1,t2,t3;
diff --git a/mysql-test/suite/encryption/r/innodb-force-corrupt.result b/mysql-test/suite/encryption/r/innodb-force-corrupt.result
new file mode 100644
index 00000000000..394a65d494f
--- /dev/null
+++ b/mysql-test/suite/encryption/r/innodb-force-corrupt.result
@@ -0,0 +1,34 @@
+call mtr.add_suppression("InnoDB: Database page corruption on disk or a failed.*");
+call mtr.add_suppression("InnoDB: Corruption: Block in space_id .* in file .* corrupted.");
+call mtr.add_suppression("InnoDB: Unable to read tablespace .* page no .* into the buffer pool after .* attempts");
+CALL mtr.add_suppression("InnoDB: Space .* file test/t1 read of page .*");
+CALL mtr.add_suppression("InnoDB: You may have to recover from a backup.");
+CALL mtr.add_suppression("InnoDB: It is also possible that your operatingsystem has corrupted its own file cache.");
+CALL mtr.add_suppression("InnoDB: and rebooting your computer removes the error.");
+CALL mtr.add_suppression("InnoDB: If the corrupt page is an index page you can also try to");
+CALL mtr.add_suppression("InnoDB: fix the corruption by dumping, dropping, and reimporting");
+CALL mtr.add_suppression("InnoDB: the corrupt table. You can use CHECK");
+CALL mtr.add_suppression("InnoDB: TABLE to scan your table for corruption.");
+CALL mtr.add_suppression("InnoDB: See also .* about forcing recovery.");
+CALL mtr.add_suppression("InnoDB: However key management plugin or used key_version .* is not found or used encryption algorithm or method does not match.");
+CALL mtr.add_suppression("Marking tablespace as missing. You may drop this table or install correct key management plugin and key file.");
+CALL mtr.add_suppression("InnoDB: The page .* cannot be decrypted.");
+SET GLOBAL innodb_file_format = `Barracuda`;
+SET GLOBAL innodb_file_per_table = ON;
+set global innodb_compression_algorithm = 1;
+# Create and populate tables to be corrupted
+CREATE TABLE t1 (a INT AUTO_INCREMENT PRIMARY KEY, b TEXT,c char(200)) ENGINE=InnoDB encrypted=yes;
+CREATE TABLE t2 (a INT AUTO_INCREMENT PRIMARY KEY, b TEXT,c char(200)) ENGINE=InnoDB row_format=compressed encrypted=yes;
+CREATE TABLE t3 (a INT AUTO_INCREMENT PRIMARY KEY, b TEXT, c char(200)) ENGINE=InnoDB page_compressed=yes encrypted=yes;
+BEGIN;
+INSERT INTO t1 (b,c) VALUES ('corrupt me','secret');
+INSERT INTO t1 (b,c) VALUES ('corrupt me','moresecretmoresecret');
+INSERT INTO t2 select * from t1;
+INSERT INTO t3 select * from t1;
+COMMIT;
+# Backup tables before corrupting
+# Corrupt tables
+SELECT * FROM t1;
+SELECT * FROM t3;
+# Restore the original t1.ibd
+DROP TABLE t1,t2,t3;
diff --git a/mysql-test/suite/encryption/r/innodb-redo-badkey.result b/mysql-test/suite/encryption/r/innodb-redo-badkey.result
new file mode 100644
index 00000000000..fde285c38d5
--- /dev/null
+++ b/mysql-test/suite/encryption/r/innodb-redo-badkey.result
@@ -0,0 +1,47 @@
+call mtr.add_suppression("InnoDB: Block in space_id .* in file .* encrypted.");
+call mtr.add_suppression("InnoDB: However key management plugin or used key_version .* is not found or used encryption algorithm or method does not match.");
+call mtr.add_suppression("InnoDB: Marking tablespace as missing. You may drop this table or install correct key management plugin and key file.");
+call mtr.add_suppression("mysqld: File .* not found .*");
+call mtr.add_suppression("Plugin 'file_key_management' init function returned error.");
+call mtr.add_suppression("Plugin 'file_key_management' registration as a ENCRYPTION failed.");
+call mtr.add_suppression("InnoDB: cannot enable encryption, encryption plugin is not available");
+call mtr.add_suppression("Plugin 'InnoDB' init function returned error.");
+call mtr.add_suppression("Plugin 'InnoDB' registration as a STORAGE ENGINE failed.");
+call mtr.add_suppression("InnoDB: Read operation failed for tablespace .*");
+call mtr.add_suppression("InnoDB: Database page corruption on disk or a failed.*");
+call mtr.add_suppression("InnoDB: Space .* file .* read of page .*.");
+call mtr.add_suppression("InnoDB: You may have to recover from a backup.");
+call mtr.add_suppression("InnoDB: Recovery read page .* from tablespace .*");
+call mtr.add_suppression("InnoDB: The page .* cannot be decrypted.");
+call mtr.add_suppression("InnoDB: Missing MLOG_FILE_NAME or MLOG_FILE_DELETE .*");
+call mtr.add_suppression("InnoDB: Plugin initialization aborted at srv0start.cc.*");
+call mtr.add_suppression("InnoDB: ############### CORRUPT LOG RECORD FOUND ##################");
+# Restart mysqld --file-key-management-filename=keys2.txt
+# Wait max 10 min for key encryption threads to encrypt all spaces
+SET GLOBAL innodb_file_format = `Barracuda`;
+SET GLOBAL innodb_file_per_table = ON;
+create table t1(a int not null primary key auto_increment, c char(250), b blob, index(b(10))) engine=innodb row_format=compressed encrypted=yes encryption_key_id=4;
+create table t2(a int not null primary key auto_increment, c char(250), b blob, index(b(10))) engine=innodb row_format=compressed;
+create table t3(a int not null primary key auto_increment, c char(250), b blob, index(b(10))) engine=innodb encrypted=yes encryption_key_id=4;
+create table t4(a int not null primary key auto_increment, c char(250), b blob, index(b(10))) engine=innodb;
+begin;
+insert into t2 select * from t1;
+insert into t3 select * from t1;
+insert into t4 select * from t1;
+commit;
+SET GLOBAL innodb_flush_log_at_trx_commit=1;
+begin;
+update t1 set c = repeat('secret3', 20);
+update t2 set c = repeat('secret4', 20);
+update t3 set c = repeat('secret4', 20);
+update t4 set c = repeat('secret4', 20);
+insert into t1 (c,b) values (repeat('secret5',20), repeat('secret6',6000));
+insert into t2 (c,b) values (repeat('secret7',20), repeat('secret8',6000));
+insert into t3 (c,b) values (repeat('secret9',20), repeat('secre10',6000));
+insert into t4 (c,b) values (repeat('secre11',20), repeat('secre12',6000));
+COMMIT;
+# Kill the server
+# restart
+# Kill the server
+# Restart mysqld --file-key-management-filename=keys2.txt
+drop table t1, t2,t3,t4;
diff --git a/mysql-test/suite/encryption/r/innodb-redo-nokeys.result b/mysql-test/suite/encryption/r/innodb-redo-nokeys.result
new file mode 100644
index 00000000000..1387089432a
--- /dev/null
+++ b/mysql-test/suite/encryption/r/innodb-redo-nokeys.result
@@ -0,0 +1,37 @@
+call mtr.add_suppression("InnoDB: Block in space_id .* in file .* encrypted.");
+call mtr.add_suppression("InnoDB: However key management plugin or used key_version .* is not found or used encryption algorithm or method does not match.");
+call mtr.add_suppression("InnoDB: Marking tablespace as missing. You may drop this table or install correct key management plugin and key file.");
+call mtr.add_suppression("mysqld: File .* not found .*");
+call mtr.add_suppression("Plugin 'file_key_management' init function returned error.");
+call mtr.add_suppression("Plugin 'file_key_management' registration as a ENCRYPTION failed.");
+call mtr.add_suppression("InnoDB: cannot enable encryption, encryption plugin is not available");
+call mtr.add_suppression("Plugin 'InnoDB' init function returned error.");
+call mtr.add_suppression("Plugin 'InnoDB' registration as a STORAGE ENGINE failed.");
+call mtr.add_suppression("InnoDB: The page .* cannot be decrypted.");
+# Restart mysqld --file-key-management-filename=keys2.txt
+SET GLOBAL innodb_file_format = `Barracuda`;
+SET GLOBAL innodb_file_per_table = ON;
+create table t1(a int not null primary key auto_increment, c char(200), b blob, index(b(10))) engine=innodb row_format=compressed encrypted=yes encryption_key_id=20;
+create table t2(a int not null primary key auto_increment, c char(200), b blob, index(b(10))) engine=innodb row_format=compressed;
+create table t3(a int not null primary key auto_increment, c char(200), b blob, index(b(10))) engine=innodb encrypted=yes encryption_key_id=20;
+create table t4(a int not null primary key auto_increment, c char(200), b blob, index(b(10))) engine=innodb;
+begin;
+insert into t2 select * from t1;
+insert into t3 select * from t1;
+insert into t4 select * from t1;
+commit;
+SET GLOBAL innodb_flush_log_at_trx_commit=1;
+begin;
+update t1 set c = repeat('secret3', 20);
+update t2 set c = repeat('secret4', 20);
+update t3 set c = repeat('secret4', 20);
+update t4 set c = repeat('secret4', 20);
+insert into t1 (c,b) values (repeat('secret5',20), repeat('secret6',6000));
+insert into t2 (c,b) values (repeat('secret7',20), repeat('secret8',6000));
+insert into t3 (c,b) values (repeat('secret9',20), repeat('secre10',6000));
+insert into t4 (c,b) values (repeat('secre11',20), repeat('secre12',6000));
+COMMIT;
+# Kill the server
+# restart
+# Restart mysqld --file-key-management-filename=keys2.txt
+drop table t1, t2,t3,t4;
diff --git a/mysql-test/suite/encryption/t/innodb-compressed-blob.opt b/mysql-test/suite/encryption/t/innodb-compressed-blob.opt
new file mode 100644
index 00000000000..36dcb6c6f26
--- /dev/null
+++ b/mysql-test/suite/encryption/t/innodb-compressed-blob.opt
@@ -0,0 +1,4 @@
+--innodb-tablespaces-encryption
+--innodb-encrypt-tables=on
+--innodb-encryption-threads=2
+--max_allowed_packet=64K
diff --git a/mysql-test/suite/encryption/t/innodb-compressed-blob.test b/mysql-test/suite/encryption/t/innodb-compressed-blob.test
new file mode 100644
index 00000000000..eeb3f024a81
--- /dev/null
+++ b/mysql-test/suite/encryption/t/innodb-compressed-blob.test
@@ -0,0 +1,44 @@
+-- source include/have_innodb.inc
+-- source include/have_file_key_management_plugin.inc
+
+# embedded does not support restart
+-- source include/not_embedded.inc
+
+call mtr.add_suppression("InnoDB: Block in space_id .* in file .* encrypted.");
+call mtr.add_suppression("InnoDB: However key management plugin or used key_version .* is not found or used encryption algorithm or method does not match.");
+call mtr.add_suppression("InnoDB: Marking tablespace as missing. You may drop this table or install correct key management plugin and key file.");
+call mtr.add_suppression("InnoDB: The page .* cannot be decrypted.");
+
+--echo # Restart mysqld --file-key-management-filename=keys2.txt
+-- let $restart_parameters=--file-key-management-filename=$MYSQL_TEST_DIR/std_data/keys2.txt
+-- source include/restart_mysqld.inc
+
+--disable_warnings
+SET GLOBAL innodb_file_format = `Barracuda`;
+SET GLOBAL innodb_file_per_table = ON;
+--enable_warnings
+
+set GLOBAL innodb_default_encryption_key_id=4;
+create table t1(a int not null primary key, b blob, index(b(10))) engine=innodb row_format=compressed;
+create table t2(a int not null primary key, b blob, index(b(10))) engine=innodb row_format=compressed encrypted=yes;
+create table t3(a int not null primary key, b blob, index(b(10))) engine=innodb row_format=compressed encrypted=no;
+
+insert into t1 values (1, repeat('secret',6000));
+insert into t2 values (1, repeat('secret',6000));
+insert into t3 values (1, repeat('secret',6000));
+
+--echo # Restart mysqld --file-key-management-filename=keys3.txt
+-- let $restart_parameters=--file-key-management-filename=$MYSQL_TEST_DIR/std_data/keys3.txt
+-- source include/restart_mysqld.inc
+
+--error 1296
+select count(*) from t1 FORCE INDEX (b) where b like 'secret%';
+--error 1296
+select count(*) from t2 FORCE INDEX (b) where b like 'secret%';
+select count(*) from t3 FORCE INDEX (b) where b like 'secret%';
+
+--echo # Restart mysqld --file-key-management-filename=keys2.txt
+-- let $restart_parameters=--file-key-management-filename=$MYSQL_TEST_DIR/std_data/keys2.txt
+-- source include/restart_mysqld.inc
+
+drop table t1,t2,t3;
diff --git a/mysql-test/suite/encryption/t/innodb-force-corrupt.test b/mysql-test/suite/encryption/t/innodb-force-corrupt.test
new file mode 100644
index 00000000000..65fbb613a5a
--- /dev/null
+++ b/mysql-test/suite/encryption/t/innodb-force-corrupt.test
@@ -0,0 +1,103 @@
+#
+# MDEV-11759: Encryption code in MariaDB 10.1/10.2 causes compatibility problems
+#
+
+-- source include/have_innodb.inc
+-- source include/have_file_key_management_plugin.inc
+# Don't test under embedded
+-- source include/not_embedded.inc
+
+call mtr.add_suppression("InnoDB: Database page corruption on disk or a failed.*");
+call mtr.add_suppression("InnoDB: Corruption: Block in space_id .* in file .* corrupted.");
+call mtr.add_suppression("InnoDB: Unable to read tablespace .* page no .* into the buffer pool after .* attempts");
+CALL mtr.add_suppression("InnoDB: Space .* file test/t1 read of page .*");
+CALL mtr.add_suppression("InnoDB: You may have to recover from a backup.");
+CALL mtr.add_suppression("InnoDB: It is also possible that your operatingsystem has corrupted its own file cache.");
+CALL mtr.add_suppression("InnoDB: and rebooting your computer removes the error.");
+CALL mtr.add_suppression("InnoDB: If the corrupt page is an index page you can also try to");
+CALL mtr.add_suppression("InnoDB: fix the corruption by dumping, dropping, and reimporting");
+CALL mtr.add_suppression("InnoDB: the corrupt table. You can use CHECK");
+CALL mtr.add_suppression("InnoDB: TABLE to scan your table for corruption.");
+CALL mtr.add_suppression("InnoDB: See also .* about forcing recovery.");
+CALL mtr.add_suppression("InnoDB: However key management plugin or used key_version .* is not found or used encryption algorithm or method does not match.");
+CALL mtr.add_suppression("Marking tablespace as missing. You may drop this table or install correct key management plugin and key file.");
+CALL mtr.add_suppression("InnoDB: The page .* cannot be decrypted.");
+
+--disable_warnings
+SET GLOBAL innodb_file_format = `Barracuda`;
+SET GLOBAL innodb_file_per_table = ON;
+set global innodb_compression_algorithm = 1;
+--enable_warnings
+
+--echo # Create and populate tables to be corrupted
+CREATE TABLE t1 (a INT AUTO_INCREMENT PRIMARY KEY, b TEXT,c char(200)) ENGINE=InnoDB encrypted=yes;
+CREATE TABLE t2 (a INT AUTO_INCREMENT PRIMARY KEY, b TEXT,c char(200)) ENGINE=InnoDB row_format=compressed encrypted=yes;
+CREATE TABLE t3 (a INT AUTO_INCREMENT PRIMARY KEY, b TEXT, c char(200)) ENGINE=InnoDB page_compressed=yes encrypted=yes;
+
+BEGIN;
+INSERT INTO t1 (b,c) VALUES ('corrupt me','secret');
+--disable_query_log
+--let $i = 100
+while ($i)
+{
+  INSERT INTO t1 (b,c) VALUES (REPEAT('abcabcabc', 100),'secretsecret');
+  dec $i;
+}
+--enable_query_log
+
+INSERT INTO t1 (b,c) VALUES ('corrupt me','moresecretmoresecret');
+INSERT INTO t2 select * from t1;
+INSERT INTO t3 select * from t1;
+COMMIT;
+
+let $MYSQLD_DATADIR=`select @@datadir`;
+let INNODB_PAGE_SIZE=`select @@innodb_page_size`;
+let MYSQLD_DATADIR=`select @@datadir`;
+
+--source include/shutdown_mysqld.inc
+
+--echo # Backup tables before corrupting
+--copy_file $MYSQLD_DATADIR/test/t1.ibd $MYSQLD_DATADIR/test/t1.ibd.backup
+--copy_file $MYSQLD_DATADIR/test/t2.ibd $MYSQLD_DATADIR/test/t2.ibd.backup
+--copy_file $MYSQLD_DATADIR/test/t3.ibd $MYSQLD_DATADIR/test/t3.ibd.backup
+
+--echo # Corrupt tables
+
+perl;
+open(FILE, "+<", "$ENV{MYSQLD_DATADIR}/test/t1.ibd") or die "open";
+binmode FILE;
+seek(FILE, $ENV{'INNODB_PAGE_SIZE'} * 3 + 26, SEEK_SET) or die "seek";
+print FILE pack("H*", "c00lcafedeadb017");
+close FILE or die "close";
+open(FILE, "+<", "$ENV{MYSQLD_DATADIR}/test/t2.ibd") or die "open";
+binmode FILE;
+seek(FILE, $ENV{'INNODB_PAGE_SIZE'} * 3 + 26, SEEK_SET) or die "seek";
+print FILE pack("H*", "c00lcafedeadb017");
+close FILE or die "close";
+open(FILE, "+<", "$ENV{MYSQLD_DATADIR}/test/t3.ibd") or die "open";
+binmode FILE;
+seek(FILE, $ENV{'INNODB_PAGE_SIZE'} * 3 + 26, SEEK_SET) or die "seek";
+print FILE pack("H*", "c00lcafedeadb017");
+close FILE or die "close";
+EOF
+
+--source include/start_mysqld.inc
+
+--disable_result_log
+--error ER_GET_ERRMSG, 1030
+SELECT * FROM t1;
+--error ER_GET_ERRMSG, 1030
+SELECT * FROM t3;
+--enable_result_log
+
+--source include/shutdown_mysqld.inc
+
+--echo # Restore the original t1.ibd
+--remove_file $MYSQLD_DATADIR/test/t1.ibd
+--move_file $MYSQLD_DATADIR/test/t1.ibd.backup $MYSQLD_DATADIR/test/t1.ibd
+--move_file $MYSQLD_DATADIR/test/t2.ibd.backup $MYSQLD_DATADIR/test/t2.ibd
+--move_file $MYSQLD_DATADIR/test/t3.ibd.backup $MYSQLD_DATADIR/test/t3.ibd
+
+--source include/start_mysqld.inc
+
+DROP TABLE t1,t2,t3;
diff --git a/mysql-test/suite/encryption/t/innodb-redo-badkey.opt b/mysql-test/suite/encryption/t/innodb-redo-badkey.opt
new file mode 100644
index 00000000000..343128e8803
--- /dev/null
+++ b/mysql-test/suite/encryption/t/innodb-redo-badkey.opt
@@ -0,0 +1,5 @@
+--innodb-change-buffering=all
+--innodb-encrypt-tables=on
+--innodb-tablespaces-encryption
+--innodb-encryption-threads=2
+--innodb-default-encryption-key-id=4
diff --git a/mysql-test/suite/encryption/t/innodb-redo-badkey.test b/mysql-test/suite/encryption/t/innodb-redo-badkey.test
new file mode 100644
index 00000000000..79771f83d71
--- /dev/null
+++ b/mysql-test/suite/encryption/t/innodb-redo-badkey.test
@@ -0,0 +1,93 @@
+-- source include/have_innodb.inc
+-- source include/have_file_key_management_plugin.inc
+# embedded does not support restart
+-- source include/not_embedded.inc
+
+call mtr.add_suppression("InnoDB: Block in space_id .* in file .* encrypted.");
+call mtr.add_suppression("InnoDB: However key management plugin or used key_version .* is not found or used encryption algorithm or method does not match.");
+call mtr.add_suppression("InnoDB: Marking tablespace as missing. You may drop this table or install correct key management plugin and key file.");
+call mtr.add_suppression("mysqld: File .* not found .*");
+call mtr.add_suppression("Plugin 'file_key_management' init function returned error.");
+call mtr.add_suppression("Plugin 'file_key_management' registration as a ENCRYPTION failed.");
+call mtr.add_suppression("InnoDB: cannot enable encryption, encryption plugin is not available");
+call mtr.add_suppression("Plugin 'InnoDB' init function returned error.");
+call mtr.add_suppression("Plugin 'InnoDB' registration as a STORAGE ENGINE failed.");
+call mtr.add_suppression("InnoDB: Read operation failed for tablespace .*");
+call mtr.add_suppression("InnoDB: Database page corruption on disk or a failed.*");
+call mtr.add_suppression("InnoDB: Space .* file .* read of page .*.");
+call mtr.add_suppression("InnoDB: You may have to recover from a backup.");
+call mtr.add_suppression("InnoDB: Recovery read page .* from tablespace .*");
+call mtr.add_suppression("InnoDB: The page .* cannot be decrypted.");
+call mtr.add_suppression("InnoDB: Missing MLOG_FILE_NAME or MLOG_FILE_DELETE .*");
+call mtr.add_suppression("InnoDB: Plugin initialization aborted at srv0start.cc.*");
+call mtr.add_suppression("InnoDB: ############### CORRUPT LOG RECORD FOUND ##################");
+
+--echo # Restart mysqld --file-key-management-filename=keys2.txt
+-- let $restart_parameters=--file-key-management-filename=$MYSQL_TEST_DIR/std_data/keys2.txt
+-- source include/restart_mysqld.inc
+
+--echo # Wait max 10 min for key encryption threads to encrypt all spaces
+--let $wait_timeout= 600
+--let $wait_condition=SELECT COUNT(*) = 0 FROM INFORMATION_SCHEMA.INNODB_TABLESPACES_ENCRYPTION WHERE MIN_KEY_VERSION = 0
+--source include/wait_condition.inc
+
+--disable_warnings
+SET GLOBAL innodb_file_format = `Barracuda`;
+SET GLOBAL innodb_file_per_table = ON;
+--enable_warnings
+
+create table t1(a int not null primary key auto_increment, c char(250), b blob, index(b(10))) engine=innodb row_format=compressed encrypted=yes encryption_key_id=4;
+create table t2(a int not null primary key auto_increment, c char(250), b blob, index(b(10))) engine=innodb row_format=compressed;
+create table t3(a int not null primary key auto_increment, c char(250), b blob, index(b(10))) engine=innodb encrypted=yes encryption_key_id=4;
+create table t4(a int not null primary key auto_increment, c char(250), b blob, index(b(10))) engine=innodb;
+
+begin;
+--disable_query_log
+--let $i = 20
+begin;
+while ($i)
+{
+  insert into t1(c,b) values (repeat('secret1',20), repeat('secret2',6000));
+  dec $i;
+}
+--enable_query_log
+
+insert into t2 select * from t1;
+insert into t3 select * from t1;
+insert into t4 select * from t1;
+commit;
+
+--source ../../suite/innodb/include/no_checkpoint_start.inc
+
+#
+# We test redo log page read at recv_read_page using
+# incorrect keys from std_data/keys.txt. If checkpoint
+# happens we will skip this test. If no checkpoint
+# happens, InnoDB refuses to start as used
+# encryption key is incorrect.
+#
+SET GLOBAL innodb_flush_log_at_trx_commit=1;
+begin;
+update t1 set c = repeat('secret3', 20);
+update t2 set c = repeat('secret4', 20);
+update t3 set c = repeat('secret4', 20);
+update t4 set c = repeat('secret4', 20);
+insert into t1 (c,b) values (repeat('secret5',20), repeat('secret6',6000));
+insert into t2 (c,b) values (repeat('secret7',20), repeat('secret8',6000));
+insert into t3 (c,b) values (repeat('secret9',20), repeat('secre10',6000));
+insert into t4 (c,b) values (repeat('secre11',20), repeat('secre12',6000));
+COMMIT;
+let $cleanup= drop table t1,t2,t3,t4;
+--let CLEANUP_IF_CHECKPOINT= $cleanup;
+--source ../../suite/innodb/include/no_checkpoint_end.inc
+
+--echo # restart
+--error 1
+-- source include/start_mysqld.inc
+--source include/kill_mysqld.inc
+
+--echo # Restart mysqld --file-key-management-filename=keys2.txt
+-- let $restart_parameters=--file-key-management-filename=$MYSQL_TEST_DIR/std_data/keys2.txt
+-- source include/start_mysqld.inc
+
+drop table t1, t2,t3,t4;
diff --git a/mysql-test/suite/encryption/t/innodb-redo-nokeys.opt b/mysql-test/suite/encryption/t/innodb-redo-nokeys.opt
new file mode 100644
index 00000000000..21afc19fc5d
--- /dev/null
+++ b/mysql-test/suite/encryption/t/innodb-redo-nokeys.opt
@@ -0,0 +1,3 @@
+--innodb-change-buffering=none
+--innodb-encrypt-tables=on
+--innodb-default-encryption-key-id=20
diff --git a/mysql-test/suite/encryption/t/innodb-redo-nokeys.test b/mysql-test/suite/encryption/t/innodb-redo-nokeys.test
new file mode 100644
index 00000000000..5fff14c29f6
--- /dev/null
+++ b/mysql-test/suite/encryption/t/innodb-redo-nokeys.test
@@ -0,0 +1,80 @@
+-- source include/have_innodb.inc
+-- source include/have_file_key_management_plugin.inc
+# embedded does not support restart
+-- source include/not_embedded.inc
+
+call mtr.add_suppression("InnoDB: Block in space_id .* in file .* encrypted.");
+call mtr.add_suppression("InnoDB: However key management plugin or used key_version .* is not found or used encryption algorithm or method does not match.");
+call mtr.add_suppression("InnoDB: Marking tablespace as missing. You may drop this table or install correct key management plugin and key file.");
+call mtr.add_suppression("mysqld: File .* not found .*");
+call mtr.add_suppression("Plugin 'file_key_management' init function returned error.");
+call mtr.add_suppression("Plugin 'file_key_management' registration as a ENCRYPTION failed.");
+call mtr.add_suppression("InnoDB: cannot enable encryption, encryption plugin is not available");
+call mtr.add_suppression("Plugin 'InnoDB' init function returned error.");
+call mtr.add_suppression("Plugin 'InnoDB' registration as a STORAGE ENGINE failed.");
+call mtr.add_suppression("InnoDB: The page .* cannot be decrypted.");
+
+--echo # Restart mysqld --file-key-management-filename=keys2.txt
+-- let $restart_parameters=--file-key-management-filename=$MYSQL_TEST_DIR/std_data/keys2.txt
+-- source include/restart_mysqld.inc
+
+--disable_warnings
+SET GLOBAL innodb_file_format = `Barracuda`;
+SET GLOBAL innodb_file_per_table = ON;
+--enable_warnings
+
+create table t1(a int not null primary key auto_increment, c char(200), b blob, index(b(10))) engine=innodb row_format=compressed encrypted=yes encryption_key_id=20;
+create table t2(a int not null primary key auto_increment, c char(200), b blob, index(b(10))) engine=innodb row_format=compressed;
+create table t3(a int not null primary key auto_increment, c char(200), b blob, index(b(10))) engine=innodb encrypted=yes encryption_key_id=20;
+create table t4(a int not null primary key auto_increment, c char(200), b blob, index(b(10))) engine=innodb;
+
+begin;
+--disable_query_log
+--let $i = 20
+begin;
+while ($i)
+{
+  insert into t1(c,b) values (repeat('secret1',20), repeat('secret2',6000));
+  dec $i;
+}
+--enable_query_log
+
+insert into t2 select * from t1;
+insert into t3 select * from t1;
+insert into t4 select * from t1;
+commit;
+
+--source ../../suite/innodb/include/no_checkpoint_start.inc
+#
+# We test redo log page read at recv_read_page using
+# keys that are not in std_data/keys.txt. If checkpoint
+# happens we will skip this test. If no checkpoint
+# happens, InnoDB refuses to start as used
+# encryption key is not found.
+#
+SET GLOBAL innodb_flush_log_at_trx_commit=1;
+begin;
+update t1 set c = repeat('secret3', 20);
+update t2 set c = repeat('secret4', 20);
+update t3 set c = repeat('secret4', 20);
+update t4 set c = repeat('secret4', 20);
+insert into t1 (c,b) values (repeat('secret5',20), repeat('secret6',6000));
+insert into t2 (c,b) values (repeat('secret7',20), repeat('secret8',6000));
+insert into t3 (c,b) values (repeat('secret9',20), repeat('secre10',6000));
+insert into t4 (c,b) values (repeat('secre11',20), repeat('secre12',6000));
+COMMIT;
+let $cleanup= drop table t1,t2,t3,t4;
+--let CLEANUP_IF_CHECKPOINT= $cleanup;
+--source ../../suite/innodb/include/no_checkpoint_end.inc
+
+--echo # restart
+-- source include/start_mysqld.inc
+#
+# In above server does start but InnoDB refuses to start
+# thus we need to restart server with correct key file
+#
+--echo # Restart mysqld --file-key-management-filename=keys2.txt
+-- let $restart_parameters=--file-key-management-filename=$MYSQL_TEST_DIR/std_data/keys2.txt
+-- source include/restart_mysqld.inc
+
+drop table t1, t2,t3,t4;
diff --git a/mysql-test/suite/innodb/r/innodb_bug14147491.result b/mysql-test/suite/innodb/r/innodb_bug14147491.result
index cf960e3a6ee..bde86e8d9dc 100644
--- a/mysql-test/suite/innodb/r/innodb_bug14147491.result
+++ b/mysql-test/suite/innodb/r/innodb_bug14147491.result
@@ -1,4 +1,5 @@
 CREATE TABLE t1 (a INT AUTO_INCREMENT PRIMARY KEY, b TEXT) ROW_FORMAT=COMPACT ENGINE=InnoDB;
+call mtr.add_suppression("InnoDB: Unable to read tablespace .* page no .* into the buffer pool after .* attempts");
 INSERT INTO t1 (b) VALUES ('corrupt me');
 INSERT INTO t1 (b) VALUES ('corrupt me');
 # Backup the t1.ibd before corrupting
diff --git a/mysql-test/suite/innodb/t/innodb_bug14147491.test b/mysql-test/suite/innodb/t/innodb_bug14147491.test
index c848d24294d..307ebd207e8 100644
--- a/mysql-test/suite/innodb/t/innodb_bug14147491.test
+++ b/mysql-test/suite/innodb/t/innodb_bug14147491.test
@@ -2,6 +2,7 @@
 # Test opening a corrupted table.
 #
 # Valgrind can hang or return spurious messages on DBUG_SUICIDE
+call mtr.add_suppression("InnoDB: Unable to read tablespace .* page no .* into the buffer pool after .* attempts");
 source include/not_valgrind.inc;
 # Avoid CrashReporter popup on Mac
 source include/not_crashrep.inc;
diff --git a/storage/innobase/btr/btr0btr.cc b/storage/innobase/btr/btr0btr.cc
index e1446b409d1..2e75c455fe1 100644
--- a/storage/innobase/btr/btr0btr.cc
+++ b/storage/innobase/btr/btr0btr.cc
@@ -169,8 +169,7 @@ btr_root_block_get(
 
 	if (!block) {
 		if (index && index->table) {
-			index->table->is_encrypted = TRUE;
-			index->table->corrupted = FALSE;
+			index->table->file_unreadable = true;
 
 			ib_push_warning(index->table->thd, DB_DECRYPTION_FAILED,
 				"Table %s in tablespace %lu is encrypted but encryption service or"
@@ -183,6 +182,7 @@ btr_root_block_get(
 	}
 
 	btr_assert_not_corrupted(block, index);
+
 #ifdef UNIV_BTR_DEBUG
 	if (!dict_index_is_ibuf(index)) {
 		const page_t*	root = buf_block_get_frame(block);
@@ -5387,7 +5387,7 @@ btr_validate_index(
 
 	page_t*	root = btr_root_get(index, &mtr);
 
-	if (root == NULL && index->table->is_encrypted) {
+	if (root == NULL && index->table->file_unreadable) {
 		err = DB_DECRYPTION_FAILED;
 		mtr_commit(&mtr);
 		return err;
diff --git a/storage/innobase/btr/btr0cur.cc b/storage/innobase/btr/btr0cur.cc
index c15784a97a4..001bcae5360 100644
--- a/storage/innobase/btr/btr0cur.cc
+++ b/storage/innobase/btr/btr0cur.cc
@@ -1117,7 +1117,7 @@ btr_cur_search_to_nth_level(
 				" used key_id is not available. "
 				" Can't continue reading table.",
 				index->table->name);
-			index->table->is_encrypted = true;
+			index->table->file_unreadable = true;
 		}
 
 		goto func_exit;
@@ -1230,7 +1230,7 @@ btr_cur_search_to_nth_level(
 						" used key_id is not available. "
 						" Can't continue reading table.",
 						index->table->name);
-					index->table->is_encrypted = true;
+					index->table->file_unreadable = true;
 				}
 
 				goto func_exit;
@@ -1259,7 +1259,7 @@ btr_cur_search_to_nth_level(
 					" used key_id is not available. "
 					" Can't continue reading table.",
 					index->table->name);
-				index->table->is_encrypted = true;
+				index->table->file_unreadable = true;
 			}
 
 			goto func_exit;
@@ -2057,7 +2057,7 @@ btr_cur_open_at_index_side_func(
 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
 	ulint*		offsets		= offsets_;
 	dberr_t		err = DB_SUCCESS;
-	
+
 	rec_offs_init(offsets_);
 
 	estimate = latch_mode & BTR_ESTIMATE;
@@ -2166,7 +2166,7 @@ btr_cur_open_at_index_side_func(
 					" used key_id is not available. "
 					" Can't continue reading table.",
 					index->table->name);
-				index->table->is_encrypted = true;
+				index->table->file_unreadable = true;
 			}
 
 			goto exit_loop;
@@ -2524,7 +2524,7 @@ btr_cur_open_at_rnd_pos_func(
 					" used key_id is not available. "
 					" Can't continue reading table.",
 					index->table->name);
-				index->table->is_encrypted = true;
+				index->table->file_unreadable = true;
 			}
 			goto exit_loop;
 		}
@@ -5329,7 +5329,7 @@ btr_estimate_n_rows_in_range_on_level(
 					" used key_id is not available. "
 					" Can't continue reading table.",
 					index->table->name);
-				index->table->is_encrypted = true;
+				index->table->file_unreadable = true;
 			}
 
 			mtr_commit(&mtr);
@@ -5995,6 +5995,12 @@ btr_estimate_number_of_different_key_vals(
 		because otherwise our algorithm would give a wrong estimate
 		for an index where there is just one key value. */
 
+		if (index->table->file_unreadable) {
+			mtr_commit(&mtr);
+			goto exit_loop;
+		}
+
+
 		page = btr_cur_get_page(&cursor);
 
 		rec = page_rec_get_next(page_get_infimum_rec(page));
@@ -6078,6 +6084,7 @@ btr_estimate_number_of_different_key_vals(
 		mtr_commit(&mtr);
 	}
 
+exit_loop:
 	/* If we saw k borders between different key values on
 	n_sample_pages leaf pages, we can estimate how many
 	there will be in index->stat_n_leaf_pages */
diff --git a/storage/innobase/btr/btr0defragment.cc b/storage/innobase/btr/btr0defragment.cc
index 8c4f27cc78a..215f5e42e90 100644
--- a/storage/innobase/btr/btr0defragment.cc
+++ b/storage/innobase/btr/btr0defragment.cc
@@ -227,7 +227,7 @@ btr_defragment_add_index(
 		page = buf_block_get_frame(block);
 	}
 
-	if (page == NULL && index->table->is_encrypted) {
+	if (page == NULL && index->table->file_unreadable) {
 		mtr_commit(&mtr);
 		*err = DB_DECRYPTION_FAILED;
 		return NULL;
diff --git a/storage/innobase/btr/btr0pcur.cc b/storage/innobase/btr/btr0pcur.cc
index 445aa3504b7..a7e1e021cdf 100644
--- a/storage/innobase/btr/btr0pcur.cc
+++ b/storage/innobase/btr/btr0pcur.cc
@@ -418,6 +418,11 @@ btr_pcur_move_to_next_page(
 	cursor->old_stored = false;
 
 	page = btr_pcur_get_page(cursor);
+
+	if (!page) {
+		return;
+	}
+
 	next_page_no = btr_page_get_next(page, mtr);
 
 	ut_ad(next_page_no != FIL_NULL);
@@ -438,6 +443,10 @@ btr_pcur_move_to_next_page(
 		block->page.size, mode,
 		btr_pcur_get_btr_cur(cursor)->index, mtr);
 
+	if (!next_block) {
+		return;
+	}
+
 	next_page = buf_block_get_frame(next_block);
 #ifdef UNIV_BTR_DEBUG
 	ut_a(page_is_comp(next_page) == page_is_comp(page));
diff --git a/storage/innobase/buf/buf0buf.cc b/storage/innobase/buf/buf0buf.cc
index 11fe77d75de..701bd5f50c4 100644
--- a/storage/innobase/buf/buf0buf.cc
+++ b/storage/innobase/buf/buf0buf.cc
@@ -1504,7 +1504,6 @@ buf_block_init(
 	block->page.buf_fix_count = 0;
 	block->page.io_fix = BUF_IO_NONE;
 	block->page.flush_observer = NULL;
-	block->page.key_version = 0;
 	block->page.encrypted = false;
 	block->page.real_size = 0;
 	block->page.write_size = 0;
@@ -3786,7 +3785,6 @@ buf_page_get_zip(
 	ibool		discard_attempted = FALSE;
 	ibool		must_read;
 	buf_pool_t*	buf_pool = buf_pool_get(page_id);
-	buf_page_t*	rpage = NULL;
 
 	buf_pool->stat.n_page_gets++;
 
@@ -3805,7 +3803,18 @@ buf_page_get_zip(
 		/* Page not in buf_pool: needs to be read from file */
 
 		ut_ad(!hash_lock);
-		buf_read_page(page_id, page_size, &rpage);
+		dberr_t err = buf_read_page(page_id, page_size);
+
+		if (err != DB_SUCCESS) {
+			ib::error()
+				<< "Reading compressed page "
+				<< page_id
+				<< " failed with error: "
+				<< ut_strerr(err);
+
+			goto err_exit;
+		}
+
 
 #if defined UNIV_DEBUG || defined UNIV_BUF_DEBUG
 		ut_a(++buf_dbg_counter % 5771 || buf_validate());
@@ -4279,7 +4288,6 @@ buf_page_get_gen(
 	}
 
 	if (block == NULL) {
-		buf_page_t*	bpage=NULL;
 
 		/* Page not in buf_pool: needs to be read from file */
 
@@ -4337,7 +4345,18 @@ buf_page_get_gen(
 			return(NULL);
 		}
 
-		if (buf_read_page(page_id, page_size, &bpage)) {
+		/* Call path is buf_read_page() -> buf_read_page_low()
+		(_fil_io()) -> buf_page_io_complete() ->
+		buf_decrypt_after_read() here fil_space_t* is used
+		and we decrypt -> buf_page_check_corrupt() where
+		page checksums are compared. Decryption/decompression
+		is handled lower level, error handling is handled on lower
+		level, here we need only to know is page really corrupted
+		or encrypted page with correct checksum. */
+
+		dberr_t local_err = buf_read_page(page_id, page_size);
+
+		if (local_err == DB_SUCCESS) {
 			buf_read_ahead_random(page_id, page_size,
 					      ibuf_inside(mtr));
 
@@ -4345,49 +4364,30 @@ buf_page_get_gen(
 		} else if (retries < BUF_PAGE_READ_MAX_RETRIES) {
 			++retries;
 
-			bool corrupted = false;
-
-			if (bpage) {
-				corrupted = buf_page_check_corrupt(bpage);
-			}
-
-			/* Do not try again for encrypted pages */
-			if (corrupted && bpage->encrypted) {
-				BPageMutex* pmutex = buf_page_get_mutex(bpage);
-
-				buf_pool = buf_pool_from_bpage(bpage);
-				buf_pool_mutex_enter(buf_pool);
-				mutex_enter(pmutex);
-
-				ut_ad(buf_pool->n_pend_reads > 0);
-				my_atomic_addlint(&buf_pool->n_pend_reads, -1);
-				buf_page_set_io_fix(bpage, BUF_IO_NONE);
-				mutex_exit(pmutex);
-				buf_LRU_free_page(bpage, true);
-				buf_pool_mutex_exit(buf_pool);
-				rw_lock_x_unlock_gen(&((buf_block_t*) bpage)->lock,
-					     BUF_IO_READ);
-
-				if (err) {
-					*err = DB_DECRYPTION_FAILED;
-				}
-
-				return (NULL);
-			}
-
 			DBUG_EXECUTE_IF(
 				"innodb_page_corruption_retries",
 				retries = BUF_PAGE_READ_MAX_RETRIES;
 			);
 		} else {
-			bool corrupted = false;
+			if (err) {
+				*err = local_err;
+			}
 
-			if (bpage) {
-				corrupted = buf_page_check_corrupt(bpage);
+			/* Encrypted pages that are not corrupted are marked
+			as encrypted and that fact is later pushed to
+			user thread. */
+			if (local_err == DB_DECRYPTION_FAILED) {
+				return (NULL);
 			}
 
-			if (corrupted && !bpage->encrypted) {
-				ib::fatal() << "Unable to read page " << page_id
+			/* Try to set table as corrupted instead of
+			asserting. */
+			if (page_id.space() > TRX_SYS_SPACE &&
+				dict_set_corrupted_by_space(page_id.space())) {
+				return (NULL);
+			}
+
+			ib::fatal() << "Unable to read page " << page_id
 					    << " into the buffer pool after "
 					    << BUF_PAGE_READ_MAX_RETRIES << " attempts."
 					" The most probable cause of this error may"
@@ -4399,28 +4399,6 @@ buf_page_get_gen(
 					" innodb_force_recovery."
 					" Please see " REFMAN " for more"
 					" details. Aborting...";
-			} else {
-				BPageMutex* pmutex = buf_page_get_mutex(bpage);
-
-				buf_pool = buf_pool_from_bpage(bpage);
-				buf_pool_mutex_enter(buf_pool);
-				mutex_enter(pmutex);
-
-				ut_ad(buf_pool->n_pend_reads > 0);
-				my_atomic_addlint(&buf_pool->n_pend_reads, -1);
- 				buf_page_set_io_fix(bpage, BUF_IO_NONE);
-				mutex_exit(pmutex);
-				buf_LRU_free_page(bpage, true);
-				buf_pool_mutex_exit(buf_pool);
-				rw_lock_x_unlock_gen(&((buf_block_t*) bpage)->lock,
-					     BUF_IO_READ);
-
-				if (err) {
-					*err = DB_DECRYPTION_FAILED;
-				}
-
-				return (NULL);
-			}
 		}
 
 #if defined UNIV_DEBUG || defined UNIV_BUF_DEBUG
@@ -5187,7 +5165,6 @@ buf_page_init_low(
 	bpage->newest_modification = 0;
 	bpage->oldest_modification = 0;
 	bpage->write_size = 0;
-	bpage->key_version = 0;
 	bpage->encrypted = false;
 	bpage->real_size = 0;
 	bpage->slot = NULL;
@@ -5782,55 +5759,51 @@ buf_page_monitor(
 /********************************************************************//**
 Mark a table with the specified space pointed by bpage->id.space() corrupted.
 Also remove the bpage from LRU list.
- at return TRUE if successful */
+ at param[in,out]		bpage			Block */
 static
-ibool
+void
 buf_mark_space_corrupt(
-/*===================*/
-	buf_page_t*	bpage)	/*!< in: pointer to the block in question */
+	buf_page_t*	bpage)
 {
 	buf_pool_t*	buf_pool = buf_pool_from_bpage(bpage);
 	const ibool	uncompressed = (buf_page_get_state(bpage)
 					== BUF_BLOCK_FILE_PAGE);
 	ib_uint32_t	space = bpage->id.space();
-	ibool		ret = TRUE;
+	BPageMutex*	block_mutex = buf_page_get_mutex(bpage);
 
-	if (!bpage->encrypted) {
-		/* First unfix and release lock on the bpage */
-		buf_pool_mutex_enter(buf_pool);
-		mutex_enter(buf_page_get_mutex(bpage));
-		ut_ad(buf_page_get_io_fix(bpage) == BUF_IO_READ);
-		ut_ad(bpage->buf_fix_count == 0);
-
-		/* Set BUF_IO_NONE before we remove the block from LRU list */
-		buf_page_set_io_fix(bpage, BUF_IO_NONE);
+	/* First unfix and release lock on the bpage */
+	buf_pool_mutex_enter(buf_pool);
+	mutex_enter(block_mutex);
+	ut_ad(buf_page_get_io_fix(bpage) == BUF_IO_READ);
+	ut_ad(bpage->buf_fix_count == 0);
 
-		if (uncompressed) {
-			rw_lock_x_unlock_gen(
-				&((buf_block_t*) bpage)->lock,
-				BUF_IO_READ);
-		}
+	/* Set BUF_IO_NONE before we remove the block from LRU list */
+	buf_page_set_io_fix(bpage, BUF_IO_NONE);
 
-		mutex_exit(buf_page_get_mutex(bpage));
+	if (uncompressed) {
+		rw_lock_x_unlock_gen(
+			&((buf_block_t*) bpage)->lock,
+			BUF_IO_READ);
 	}
 
-	/* Find the table with specified space id, and mark it corrupted */
-	if (dict_set_corrupted_by_space(space)) {
-		if (!bpage->encrypted) {
-			buf_LRU_free_one_page(bpage);
-		}
+	mutex_exit(block_mutex);
+
+	/* If block is not encrypted find the table with specified
+	space id, and mark it corrupted. Encrypted tables
+	are marked unusable later e.g. in ::open(). */
+	if (!bpage->encrypted) {
+		dict_set_corrupted_by_space(space);
 	} else {
-		ret = FALSE;
+		dict_set_encrypted_by_space(space);
 	}
 
-	if (!bpage->encrypted) {
-		ut_ad(buf_pool->n_pend_reads > 0);
-		buf_pool->n_pend_reads--;
+	/* After this point bpage can't be referenced. */
+	buf_LRU_free_one_page(bpage);
 
-		buf_pool_mutex_exit(buf_pool);
-	}
+	ut_ad(buf_pool->n_pend_reads > 0);
+	buf_pool->n_pend_reads--;
 
-	return(ret);
+	buf_pool_mutex_exit(buf_pool);
 }
 
 /********************************************************************//**
@@ -5838,9 +5811,12 @@ Check if page is maybe compressed, encrypted or both when we encounter
 corrupted page. Note that we can't be 100% sure if page is corrupted
 or decrypt/decompress just failed.
 @param[in,out]	bpage		Page
- at return true if page corrupted, false if not */
-UNIV_INTERN
-bool
+ at return DB_SUCCESS if page has been read and is not corrupted,
+DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
+DB_DECRYPTION_FAILED if page post encryption checksum matches but
+after decryption normal page checksum does not match.*/
+static
+dberr_t
 buf_page_check_corrupt(
 	buf_page_t*	bpage)
 {
@@ -5848,6 +5824,7 @@ buf_page_check_corrupt(
 		((buf_block_t*) bpage)->frame;
 	fil_space_t* space = fil_space_acquire_silent(bpage->id.space());
 	bool still_encrypted = false;
+	dberr_t err = DB_SUCCESS;
 	bool corrupted = false;
 	fil_space_crypt_t* crypt_data = space ? space->crypt_data : NULL;
 
@@ -5872,6 +5849,8 @@ buf_page_check_corrupt(
 
 		if (!corrupted) {
 			bpage->encrypted = false;
+		} else {
+			err = DB_PAGE_CORRUPTED;
 		}
 	}
 
@@ -5882,7 +5861,7 @@ buf_page_check_corrupt(
 		buf_page_io_complete(). */
 	} else if (still_encrypted || (bpage->encrypted && corrupted)) {
 		bpage->encrypted = true;
-		corrupted = true;
+		err = DB_DECRYPTION_FAILED;
 
 		ib::error()
 			<< "The page " << bpage->id << " in file "
@@ -5891,7 +5870,8 @@ buf_page_check_corrupt(
 
 		ib::info()
 			<< "However key management plugin or used key_version "
-			<< bpage->key_version << " is not found or"
+			<<  mach_read_from_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)
+			<< " is not found or"
 			" used encryption algorithm or method does not match.";
 
 		if (bpage->id.space() != TRX_SYS_SPACE) {
@@ -5907,27 +5887,31 @@ buf_page_check_corrupt(
 		fil_space_release(space);
 	}
 
-	return corrupted;
+	return (err);
 }
 
 /********************************************************************//**
 Completes an asynchronous read or write request of a file page to or from
 the buffer pool.
- at return true if successful */
-bool
+ at param[in,out]		bpage		Page to complete
+ at param[in]		evict		whether or not to evict the page
+					from LRU list.
+ at return DB_SUCCESS if page has been read and is not corrupted,
+DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
+DB_DECRYPTION_FAILED if page post encryption checksum matches but
+after decryption normal page checksum does not match.
+in write only DB_SUCCESS is possible. */
+dberr_t
 buf_page_io_complete(
-/*=================*/
-	buf_page_t*	bpage,	/*!< in: pointer to the block in question */
-	bool		evict)	/*!< in: whether or not to evict the page
-				from LRU list. */
-
+	buf_page_t*	bpage,
+	bool		evict)
 {
 	enum buf_io_fix	io_type;
 	buf_pool_t*	buf_pool = buf_pool_from_bpage(bpage);
 	const ibool	uncompressed = (buf_page_get_state(bpage)
 					== BUF_BLOCK_FILE_PAGE);
-	byte*	frame = NULL;
-	bool corrupted = false;
+	byte*		frame = NULL;
+	dberr_t 	err = DB_SUCCESS;
 
 	ut_a(buf_page_in_file(bpage));
 
@@ -5943,6 +5927,7 @@ buf_page_io_complete(
 	if (io_type == BUF_IO_READ) {
 		ulint	read_page_no;
 		ulint	read_space_id;
+		uint	key_version;
 
 		ut_ad(bpage->zip.data != NULL || ((buf_block_t*)bpage)->frame != NULL);
 
@@ -5968,6 +5953,8 @@ buf_page_io_complete(
 					   << bpage->id
 					   << " zip_decompress failure.";
 
+				err = DB_PAGE_CORRUPTED;
+
 				goto database_corrupted;
 			}
 			buf_pool->n_pend_unzip--;
@@ -5982,6 +5969,8 @@ buf_page_io_complete(
 		read_page_no = mach_read_from_4(frame + FIL_PAGE_OFFSET);
 		read_space_id = mach_read_from_4(
 			frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
+		key_version = mach_read_from_4(
+			frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
 
 		if (bpage->id.space() == TRX_SYS_SPACE
 		    && buf_dblwr_page_inside(bpage->id.page_no())) {
@@ -6005,27 +5994,28 @@ buf_page_io_complete(
 				<< ", should be " << bpage->id;
 		}
 
-		corrupted = buf_page_check_corrupt(bpage);
+		err = buf_page_check_corrupt(bpage);
 
 database_corrupted:
 
-		if (corrupted) {
+		if (err != DB_SUCCESS) {
 			/* Not a real corruption if it was triggered by
 			error injection */
 			DBUG_EXECUTE_IF(
 				"buf_page_import_corrupt_failure",
 				if (bpage->id.space()
 				    > srv_undo_tablespaces_open
-				    && bpage->id.space() != SRV_TMP_SPACE_ID
-				    && buf_mark_space_corrupt(bpage)) {
+					&& bpage->id.space() != SRV_TMP_SPACE_ID) {
+					buf_mark_space_corrupt(bpage);
 					ib::info() << "Simulated IMPORT "
 						"corruption";
-					return(true);
+					return(err);
 				}
+				err = DB_SUCCESS;
 				goto page_not_corrupt;
 			);
 
-			if (!bpage->encrypted) {
+			if (err == DB_PAGE_CORRUPTED) {
 				fil_system_enter();
 				fil_space_t* space = fil_space_get_by_id(bpage->id.space());
 				fil_system_exit();
@@ -6060,35 +6050,13 @@ buf_page_io_complete(
 				/* If page space id is larger than TRX_SYS_SPACE
 				(0), we will attempt to mark the corresponding
 				table as corrupted instead of crashing server */
-
-				if (bpage->id.space() > TRX_SYS_SPACE
-				    && buf_mark_space_corrupt(bpage)) {
-					return(false);
+				if (bpage->id.space() > srv_undo_tablespaces_open
+					&& bpage->id.space() != SRV_TMP_SPACE_ID) {
+					buf_mark_space_corrupt(bpage);
+					return(err);
 				} else {
-					if (!bpage->encrypted) {
-						ib::fatal()
-							<< "Aborting because of a"
-							" corrupt database page in"
-							" the system tablespace. Or, "
-							" there was a failure in"
-							" tagging the tablespace "
-							" as corrupt.";
-					}
-
-					ib_push_warning(innobase_get_trx(), DB_DECRYPTION_FAILED,
-						"Table in tablespace %lu encrypted."
-						"However key management plugin or used key_id %lu is not found or"
-						" used encryption algorithm or method does not match."
-						" Can't continue opening the table.",
-						bpage->id.space(), bpage->key_version);
-
-					if (bpage->encrypted && bpage->id.space() > TRX_SYS_SPACE) {
-						buf_mark_space_corrupt(bpage);
-					} else {
-						ut_error;
-					}
-
-					return(false);
+					ib::fatal()
+						<< "Ending processing because of a corrupt database page.";
 				}
 			}
 		}
@@ -6119,7 +6087,7 @@ buf_page_io_complete(
 					<< bpage->id.space()
 					<< " encrypted. However key "
 					"management plugin or used "
-					<< "key_version " << bpage->key_version
+					<< "key_version " << key_version
 					<< "is not found or"
 					" used encryption algorithm or method does not match."
 					" Can't continue opening the table.";
@@ -6221,7 +6189,7 @@ buf_page_io_complete(
 
 	buf_pool_mutex_exit(buf_pool);
 
-	return(true);
+	return(err);
 }
 
 /*********************************************************************//**
@@ -6246,7 +6214,7 @@ buf_all_freed_instance(
 
 		const buf_block_t* block = buf_chunk_not_freed(chunk);
 
-		if (UNIV_LIKELY_NULL(block) && block->page.key_version == 0) {
+		if (UNIV_LIKELY_NULL(block)) {
 			ib::fatal() << "Page " << block->page.id
 				<< " still fixed or dirty";
 		}
@@ -7392,13 +7360,11 @@ buf_page_encrypt_before_write(
 
 	if (bpage->id.page_no() == 0) {
 		/* Page 0 of a tablespace is not encrypted/compressed */
-		ut_ad(bpage->key_version == 0);
 		return src_frame;
 	}
 
 	if (space_id == TRX_SYS_SPACE && bpage->id.page_no() == TRX_SYS_PAGE_NO) {
 		/* don't encrypt/compress page as it contains address to dblwr buffer */
-		bpage->key_version = 0;
 		return src_frame;
 	}
 
@@ -7429,7 +7395,6 @@ buf_page_encrypt_before_write(
 	/* Is encryption needed? */
 	if (crypt_data == NULL || crypt_data->type == CRYPT_SCHEME_UNENCRYPTED) {
 		/* An unencrypted table */
-		bpage->key_version = 0;
 		encrypted = false;
 	}
 
@@ -7457,10 +7422,6 @@ buf_page_encrypt_before_write(
 					      src_frame,
 					      dst_frame);
 
-		uint32_t key_version = mach_read_from_4(
-			dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
-		ut_ad(key_version == 0 || key_version >= bpage->key_version);
-		bpage->key_version = key_version;
 		bpage->real_size = page_size.physical();
 		slot->out_buf = dst_frame = tmp;
 
@@ -7528,8 +7489,6 @@ buf_page_decrypt_after_read(buf_page_t* bpage)
 	buf_pool_t* buf_pool = buf_pool_from_bpage(bpage);
 	bool success = true;
 
-	bpage->key_version = key_version;
-
 	if (bpage->id.page_no() == 0) {
 		/* File header pages are not encrypted/compressed */
 		return (true);
diff --git a/storage/innobase/buf/buf0rea.cc b/storage/innobase/buf/buf0rea.cc
index 4d68ad5ac51..886958180b7 100644
--- a/storage/innobase/buf/buf0rea.cc
+++ b/storage/innobase/buf/buf0rea.cc
@@ -1,7 +1,7 @@
 /*****************************************************************************
 
 Copyright (c) 1995, 2015, Oracle and/or its affiliates. All Rights Reserved.
-Copyright (c) 2015, 2016 MariaDB Corporation.
+Copyright (c) 2015, 2017, MariaDB Corporation.
 
 This program is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free Software
@@ -95,15 +95,18 @@ buffer buf_pool if it is not already there, in which case does nothing.
 Sets the io_fix flag and sets an exclusive lock on the buffer frame. The
 flag is cleared and the x-lock released by an i/o-handler thread.
 
- at param[out] err		DB_SUCCESS, DB_TABLESPACE_DELETED or
-			DB_TABLESPACE_TRUNCATED if we are trying
-			to read from a non-existent tablespace, a
+ at param[out]	err	DB_SUCCESS, DB_TABLESPACE_DELETED if we are
+			trying to read from a non-existent tablespace, or a
 			tablespace which is just now being dropped,
-			or a tablespace which is truncated
+			DB_PAGE_CORRUPTED if page based on checksum
+			check is corrupted, or DB_DECRYPTION_FAILED
+			if page post encryption checksum matches but
+			after decryption normal page checksum does not match.
 @param[in] sync		true if synchronous aio is desired
 @param[in] type		IO type, SIMULATED, IGNORE_MISSING
 @param[in] mode		BUF_READ_IBUF_PAGES_ONLY, ...,
- at param[in] page_id	page id
+ at param[in] page_id	Page id
+ at param[in] page_size	Page size
 @param[in] unzip	true=request uncompressed page
 @return 1 if a read request was queued, 0 if the page already resided
 in buf_pool, or if the page is in the doublewrite buffer blocks in
@@ -118,11 +121,9 @@ buf_read_page_low(
 	ulint			mode,
 	const page_id_t&	page_id,
 	const page_size_t&	page_size,
-	bool			unzip,
-	buf_page_t** 	rbpage) /*!< out: page */
+	bool			unzip)
 {
 	buf_page_t*	bpage;
-
 	*err = DB_SUCCESS;
 
 	if (page_id.space() == TRX_SYS_SPACE
@@ -213,19 +214,13 @@ buf_read_page_low(
 	if (sync) {
 		/* The i/o is already completed when we arrive from
 		fil_read */
+		*err = buf_page_io_complete(bpage);
 
-		if (!buf_page_io_complete(bpage)) {
-			if (rbpage) {
-				*rbpage = bpage;
-			}
+		if (*err != DB_SUCCESS) {
 			return(0);
 		}
 	}
 
-	if (rbpage) {
-		*rbpage = bpage;
-	}
-
 	return(1);
 }
 
@@ -256,7 +251,7 @@ buf_read_ahead_random(
 	ulint		ibuf_mode;
 	ulint		count;
 	ulint		low, high;
-	dberr_t		err;
+	dberr_t		err = DB_SUCCESS;
 	ulint		i;
 	const ulint	buf_read_ahead_random_area
 				= BUF_READ_AHEAD_AREA(buf_pool);
@@ -351,19 +346,28 @@ buf_read_ahead_random(
 		const page_id_t	cur_page_id(page_id.space(), i);
 
 		if (!ibuf_bitmap_page(cur_page_id, page_size)) {
-			buf_page_t* rpage = NULL;
 			count += buf_read_page_low(
 				&err, false,
 				IORequest::DO_NOT_WAKE,
 				ibuf_mode,
-				cur_page_id, page_size, false, &rpage);
+				cur_page_id, page_size, false);
 
-			if (err == DB_TABLESPACE_DELETED) {
+			switch(err) {
+			case DB_SUCCESS:
+				break;
+			case DB_TABLESPACE_DELETED:
 				ib::warn() << "Random readahead trying to"
 					" access page " << cur_page_id
 					<< " in nonexisting or"
 					" being-dropped tablespace";
 				break;
+			case DB_DECRYPTION_FAILED:
+				ib::error()
+					<< "Random readahead failed to decrypt page "
+					<< cur_page_id;
+				break;
+			default:
+				ut_error;
 			}
 		}
 	}
@@ -394,17 +398,21 @@ buf_read_ahead_random(
 buffer buf_pool if it is not already there. Sets the io_fix flag and sets
 an exclusive lock on the buffer frame. The flag is cleared and the x-lock
 released by the i/o-handler thread.
+
 @param[in]	page_id		page id
 @param[in]	page_size	page size
- at return TRUE if page has been read in, FALSE in case of failure */
-ibool
+
+ at return DB_SUCCESS if page has been read and is not corrupted,
+DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
+DB_DECRYPTION_FAILED if page post encryption checksum matches but
+after decryption normal page checksum does not match.*/
+dberr_t
 buf_read_page(
 	const page_id_t&	page_id,
-	const page_size_t&	page_size,
-	buf_page_t** bpage)	/*!< out: page */
+	const page_size_t&	page_size)
 {
 	ulint		count;
-	dberr_t		err;
+	dberr_t		err = DB_SUCCESS;
 
 	/* We do synchronous IO because our AIO completion code
 	is sub-optimal. See buf_page_io_complete(), we have to
@@ -414,19 +422,17 @@ buf_read_page(
 
 	count = buf_read_page_low(
 		&err, true,
-		0, BUF_READ_ANY_PAGE, page_id, page_size, false, bpage);
+		0, BUF_READ_ANY_PAGE, page_id, page_size, false);
 
-	srv_stats.buf_pool_reads.add(count);
+	/* Page corruption and decryption failures are already reported
+	in above function. */
 
-	if (err == DB_TABLESPACE_DELETED) {
-		ib::error() << "trying to read page " << page_id
-			<< " in nonexisting or being-dropped tablespace";
-	}
+	srv_stats.buf_pool_reads.add(count);
 
 	/* Increment number of I/O operations used for LRU policy. */
 	buf_LRU_stat_inc_io();
 
-	return(count > 0);
+	return(err);
 }
 
 /** High-level function which reads a page asynchronously from a file to the
@@ -435,23 +441,38 @@ an exclusive lock on the buffer frame. The flag is cleared and the x-lock
 released by the i/o-handler thread.
 @param[in]	page_id		page id
 @param[in]	page_size	page size
- at param[in]	sync		true if synchronous aio is desired
- at return TRUE if page has been read in, FALSE in case of failure */
-ibool
+ at param[in]	sync		true if synchronous aio is desired */
+void
 buf_read_page_background(
 	const page_id_t&	page_id,
 	const page_size_t&	page_size,
 	bool			sync)
 {
 	ulint		count;
-	dberr_t		err;
-	buf_page_t*	rbpage = NULL;
+	dberr_t		err = DB_SUCCESS;
 
 	count = buf_read_page_low(
 		&err, sync,
 		IORequest::DO_NOT_WAKE | IORequest::IGNORE_MISSING,
 		BUF_READ_ANY_PAGE,
-		page_id, page_size, false, &rbpage);
+		page_id, page_size, false);
+
+	switch(err) {
+	case DB_SUCCESS:
+		break;
+	case DB_TABLESPACE_DELETED:
+		ib::error() << "Background page read trying to read page " << page_id
+			<< " in nonexisting or being-dropped tablespace";
+		break;
+
+	case DB_DECRYPTION_FAILED:
+		ib::error()
+			<< "Background Page read failed to decrypt page "
+			<< page_id;
+		break;
+	default:
+		ut_error;
+	}
 
 	srv_stats.buf_pool_reads.add(count);
 
@@ -461,8 +482,6 @@ buf_read_page_background(
 	buffer pool. Since this function is called from buffer pool load
 	these IOs are deliberate and are not part of normal workload we can
 	ignore these in our heuristics. */
-
-	return(count > 0);
 }
 
 /** Applies linear read-ahead if in the buf_pool the page is a border page of
@@ -507,7 +526,7 @@ buf_read_ahead_linear(
 	ulint		new_offset;
 	ulint		fail_count;
 	ulint		low, high;
-	dberr_t		err;
+	dberr_t		err = DB_SUCCESS;
 	ulint		i;
 	const ulint	buf_read_ahead_linear_area
 		= BUF_READ_AHEAD_AREA(buf_pool);
@@ -712,19 +731,30 @@ buf_read_ahead_linear(
 		const page_id_t	cur_page_id(page_id.space(), i);
 
 		if (!ibuf_bitmap_page(cur_page_id, page_size)) {
-			buf_page_t* rpage = NULL;
 
 			count += buf_read_page_low(
 				&err, false,
 				IORequest::DO_NOT_WAKE,
-				ibuf_mode, cur_page_id, page_size, false, &rpage);
+				ibuf_mode, cur_page_id, page_size, false);
 
-			if (err == DB_TABLESPACE_DELETED) {
+			switch(err) {
+			case DB_SUCCESS:
+				break;
+			case DB_TABLESPACE_DELETED:
 				ib::warn() << "linear readahead trying to"
 					" access page "
 					<< page_id_t(page_id.space(), i)
 					<< " in nonexisting or being-dropped"
 					" tablespace";
+				break;
+
+			case DB_DECRYPTION_FAILED:
+				ib::error()
+					<< "Linear readahead failed to decrypt page "
+					<< page_id_t(page_id.space(), i);
+				break;
+			default:
+				ut_error;
 			}
 		}
 	}
@@ -779,7 +809,7 @@ buf_read_ibuf_merge_pages(
 		const page_id_t	page_id(space_ids[i], page_nos[i]);
 
 		buf_pool_t*	buf_pool = buf_pool_get(page_id);
-		buf_page_t*	rpage = NULL;
+		dberr_t 	err = DB_SUCCESS;
 
 		bool			found;
 		const page_size_t	page_size(fil_space_get_page_size(
@@ -798,19 +828,28 @@ buf_read_ibuf_merge_pages(
 			os_thread_sleep(500000);
 		}
 
-		dberr_t	err;
-
 		buf_read_page_low(&err,
 				  sync && (i + 1 == n_stored),
 				  0,
 				  BUF_READ_ANY_PAGE, page_id, page_size,
-				  true, &rpage);
+				  true);
 
-		if (err == DB_TABLESPACE_DELETED) {
+		switch(err) {
+		case DB_SUCCESS:
+			break;
+		case DB_TABLESPACE_DELETED:
 			/* We have deleted or are deleting the single-table
 			tablespace: remove the entries for that page */
 			ibuf_merge_or_delete_for_page(NULL, page_id,
 						      &page_size, FALSE);
+			break;
+		case DB_DECRYPTION_FAILED:
+			ib::error()
+				<< "Failed to decrypt insert buffer page "
+				<< page_id;
+			break;
+		default:
+			ut_error;
 		}
 	}
 
@@ -838,7 +877,7 @@ buf_read_recv_pages(
 	ulint		n_stored)
 {
 	ulint			count;
-	dberr_t			err;
+	dberr_t		err=DB_SUCCESS;
 	ulint			i;
 	fil_space_t*		space	= fil_space_get(space_id);
 
@@ -854,7 +893,6 @@ buf_read_recv_pages(
 	for (i = 0; i < n_stored; i++) {
 		buf_pool_t*		buf_pool;
 		const page_id_t	cur_page_id(space_id, page_nos[i]);
-		buf_page_t*		rpage = NULL;
 
 		count = 0;
 
@@ -881,13 +919,19 @@ buf_read_recv_pages(
 				&err, true,
 				0,
 				BUF_READ_ANY_PAGE,
-				cur_page_id, page_size, true, &rpage);
+				cur_page_id, page_size, true);
 		} else {
 			buf_read_page_low(
 				&err, false,
 				IORequest::DO_NOT_WAKE,
 				BUF_READ_ANY_PAGE,
-				cur_page_id, page_size, true, &rpage);
+				cur_page_id, page_size, true);
+		}
+
+		if (err == DB_DECRYPTION_FAILED) {
+			ib::error()
+				<< "Recovery failed to decrypt read page "
+				<< cur_page_id;
 		}
 	}
 
diff --git a/storage/innobase/dict/dict0crea.cc b/storage/innobase/dict/dict0crea.cc
index c1bd5c2d368..8b6cea6106f 100644
--- a/storage/innobase/dict/dict0crea.cc
+++ b/storage/innobase/dict/dict0crea.cc
@@ -890,7 +890,7 @@ dict_create_index_tree_step(
 
 	mtr_start(&mtr);
 
-	const bool	missing = index->table->ibd_file_missing
+	const bool	missing = index->table->file_unreadable
 		|| dict_table_is_discarded(index->table);
 
 	if (!missing) {
@@ -963,7 +963,7 @@ dict_create_index_tree_in_mem(
 
 	/* Currently this function is being used by temp-tables only.
 	Import/Discard of temp-table is blocked and so this assert. */
-	ut_ad(index->table->ibd_file_missing == 0
+	ut_ad(index->table->file_unreadable == 0
 	      && !dict_table_is_discarded(index->table));
 
 	page_no = btr_create(
diff --git a/storage/innobase/dict/dict0defrag_bg.cc b/storage/innobase/dict/dict0defrag_bg.cc
index 016b774217f..9f0c0397f0d 100644
--- a/storage/innobase/dict/dict0defrag_bg.cc
+++ b/storage/innobase/dict/dict0defrag_bg.cc
@@ -320,7 +320,7 @@ dict_stats_save_defrag_stats(
 {
 	dberr_t	ret;
 
-	if (index->table->ibd_file_missing) {
+	if (index->table->file_unreadable) {
 		ut_print_timestamp(stderr);
 		fprintf(stderr,
 			" InnoDB: Cannot save defragment stats because "
diff --git a/storage/innobase/dict/dict0dict.cc b/storage/innobase/dict/dict0dict.cc
index d7fcbdf3906..96f51e42b82 100644
--- a/storage/innobase/dict/dict0dict.cc
+++ b/storage/innobase/dict/dict0dict.cc
@@ -1180,7 +1180,7 @@ dict_table_open_on_name(
 
 		/* If table is encrypted return table */
 		if (ignore_err == DICT_ERR_IGNORE_NONE
-			&& table->is_encrypted) {
+			&& table->file_unreadable) {
 			/* Make life easy for drop table. */
 			dict_table_prevent_eviction(table);
 
@@ -6073,6 +6073,24 @@ dict_set_corrupted_by_space(
 }
 
 /**********************************************************************//**
+Flags a table with specified space_id encrypted in the data dictionary
+cache
+ at param[in]	space_id	Tablespace id */
+UNIV_INTERN
+void
+dict_set_encrypted_by_space(
+	ulint	space_id)
+{
+	dict_table_t*   table;
+
+	table = dict_find_single_table_by_space(space_id);
+
+	if (table) {
+		table->file_unreadable = true;
+	}
+}
+
+/**********************************************************************//**
 Flags an index corrupted both in the data dictionary cache
 and in the SYS_INDEXES */
 void
@@ -6581,7 +6599,8 @@ dict_table_schema_check(
 		}
 	}
 
-	if (table->ibd_file_missing) {
+	if (table->file_unreadable &&
+	    fil_space_get(table->space) == NULL) {
 		/* missing tablespace */
 
 		ut_snprintf(errstr, errstr_sz,
diff --git a/storage/innobase/dict/dict0load.cc b/storage/innobase/dict/dict0load.cc
index da72126793f..8b1271f6512 100644
--- a/storage/innobase/dict/dict0load.cc
+++ b/storage/innobase/dict/dict0load.cc
@@ -2430,7 +2430,7 @@ dict_load_indexes(
 			dict_mem_index_free(index);
 			goto func_exit;
 		} else if (index->page == FIL_NULL
-			   && !table->ibd_file_missing
+			   && !table->file_unreadable
 			   && (!(index->type & DICT_FTS))) {
 
 			ib::error() << "Trying to load index " << index->name
@@ -2551,7 +2551,7 @@ dict_load_table_low(table_name_t& name, const rec_t* rec, dict_table_t** table)
 	*table = dict_mem_table_create(
 		name.m_name, space_id, n_cols + n_v_col, n_v_col, flags, flags2);
 	(*table)->id = table_id;
-	(*table)->ibd_file_missing = FALSE;
+	(*table)->file_unreadable = false;
 
 	return(NULL);
 }
@@ -2709,7 +2709,7 @@ dict_load_tablespace(
 	if (table->flags2 & DICT_TF2_DISCARDED) {
 		ib::warn() << "Tablespace for table " << table->name
 			<< " is set as discarded.";
-		table->ibd_file_missing = TRUE;
+		table->file_unreadable = true;
 		return;
 	}
 
@@ -2754,7 +2754,7 @@ dict_load_tablespace(
 
 	if (err != DB_SUCCESS) {
 		/* We failed to find a sensible tablespace file */
-		table->ibd_file_missing = TRUE;
+		table->file_unreadable = true;
 	}
 
 	ut_free(filepath);
@@ -2886,9 +2886,10 @@ dict_load_table_one(
 	were not allowed while the table is being locked by a transaction. */
 	dict_err_ignore_t index_load_err =
 		!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
-		&& table->ibd_file_missing
+		&& table->file_unreadable
 		? DICT_ERR_IGNORE_ALL
 		: ignore_err;
+
 	err = dict_load_indexes(table, heap, index_load_err);
 
 	if (err == DB_INDEX_CORRUPT) {
@@ -2947,7 +2948,7 @@ dict_load_table_one(
 	of the error condition, since the user may want to dump data from the
 	clustered index. However we load the foreign key information only if
 	all indexes were loaded. */
-	if (!cached || table->ibd_file_missing) {
+	if (!cached || table->file_unreadable) {
 		/* Don't attempt to load the indexes from disk. */
 	} else if (err == DB_SUCCESS) {
 		err = dict_load_foreigns(table->name.m_name, NULL,
@@ -2981,7 +2982,7 @@ dict_load_table_one(
 			table = NULL;
 
 		} else if (dict_index_is_corrupted(index)
-			   && !table->ibd_file_missing) {
+			   && !table->file_unreadable) {
 
 			/* It is possible we force to load a corrupted
 			clustered index if srv_load_corrupted is set.
@@ -2995,7 +2996,7 @@ dict_load_table_one(
 
 	ut_ad(!table
 	      || ignore_err != DICT_ERR_IGNORE_NONE
-	      || table->ibd_file_missing
+	      || table->file_unreadable
 	      || !table->corrupted);
 
 	if (table && table->fts) {
diff --git a/storage/innobase/dict/dict0stats.cc b/storage/innobase/dict/dict0stats.cc
index 9eb033a7eb9..65fd2179c31 100644
--- a/storage/innobase/dict/dict0stats.cc
+++ b/storage/innobase/dict/dict0stats.cc
@@ -912,10 +912,12 @@ dict_stats_update_transient_for_index(
 
 		index->stat_n_leaf_pages = size;
 
-		/* We don't handle the return value since it will be false
-		only when some thread is dropping the table and we don't
-		have to empty the statistics of the to be dropped index */
-		btr_estimate_number_of_different_key_vals(index);
+		/* Do not continue if table decryption has failed or
+		table is already marked corrupted. */
+		if (!index->table->file_unreadable &&
+		    !index->table->corrupted) {
+			btr_estimate_number_of_different_key_vals(index);
+		}
 	}
 }
 
@@ -966,8 +968,10 @@ dict_stats_update_transient(
 			continue;
 		}
 
-		/* Do not continue if table decryption has failed. */
-		if (index->table->is_encrypted) {
+		/* Do not continue if table decryption has failed or
+		table is already marked corrupted. */
+		if (!index->table->file_unreadable &&
+		    !index->table->corrupted) {
 			break;
 		}
 
@@ -3152,7 +3156,7 @@ dict_stats_update(
 {
 	ut_ad(!mutex_own(&dict_sys->mutex));
 
-	if (table->ibd_file_missing) {
+	if (table->file_unreadable) {
 
 		ib::warn() << "Cannot calculate statistics for table "
 			<< table->name
diff --git a/storage/innobase/fil/fil0crypt.cc b/storage/innobase/fil/fil0crypt.cc
index a61d7439e2c..b447a768093 100644
--- a/storage/innobase/fil/fil0crypt.cc
+++ b/storage/innobase/fil/fil0crypt.cc
@@ -442,13 +442,15 @@ Parse a MLOG_FILE_WRITE_CRYPT_DATA log entry
 @param[in]	ptr		Log entry start
 @param[in]	end_ptr		Log entry end
 @param[in]	block		buffer block
+ at param[out]	err		DB_SUCCESS or DB_DECRYPTION_FAILED
 @return position on log buffer */
 UNIV_INTERN
 const byte*
 fil_parse_write_crypt_data(
 	const byte*		ptr,
 	const byte*		end_ptr,
-	const buf_block_t*	block)
+	const buf_block_t*	block,
+	dberr_t*		err)
 {
 	/* check that redo log entry is complete */
 	uint entry_size =
@@ -460,6 +462,8 @@ fil_parse_write_crypt_data(
 		4 +  // size of key_id
 		1; // fil_encryption_t
 
+	*err = DB_SUCCESS;
+
 	if (ptr + entry_size > end_ptr) {
 		return NULL;
 	}
@@ -506,6 +510,11 @@ fil_parse_write_crypt_data(
 		fil_space_release(space);
 	}
 
+	/* Check is used key found from encryption plugin */
+	if (crypt_data->should_encrypt() && !crypt_data->is_key_found()) {
+		*err = DB_DECRYPTION_FAILED;
+	}
+
 	return ptr;
 }
 
@@ -1783,11 +1792,11 @@ fil_crypt_rotate_page(
 		bool modified = false;
 		int needs_scrubbing = BTR_SCRUB_SKIP_PAGE;
 		lsn_t block_lsn = block->page.newest_modification;
-		uint kv =  block->page.key_version;
+		byte* frame = buf_block_get_frame(block);
+		uint kv =  mach_read_from_4(frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
 
 		/* check if tablespace is closing after reading page */
 		if (space->is_stopping()) {
-			byte* frame = buf_block_get_frame(block);
 
 			if (kv == 0 &&
 				fil_crypt_is_page_uninitialized(frame, page_size)) {
@@ -1809,9 +1818,6 @@ fil_crypt_rotate_page(
 					FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
 					space_id, MLOG_4BYTES, &mtr);
 
-				/* update block */
-				block->page.key_version = key_state->key_version;
-
 				/* statistics */
 				state->crypt_stat.pages_modified++;
 			} else {
diff --git a/storage/innobase/fil/fil0fil.cc b/storage/innobase/fil/fil0fil.cc
index 016d880297c..484c2d6f395 100644
--- a/storage/innobase/fil/fil0fil.cc
+++ b/storage/innobase/fil/fil0fil.cc
@@ -5450,7 +5450,25 @@ fil_aio_wait(
 		/* async single page writes from the dblwr buffer don't have
 		access to the page */
 		if (message != NULL) {
-			buf_page_io_complete(static_cast<buf_page_t*>(message));
+			buf_page_t *bpage = static_cast<buf_page_t*>(message);
+			const page_id_t page_id = bpage->id;
+			dberr_t err = buf_page_io_complete(bpage);
+
+			if (err != DB_SUCCESS) {
+
+				/* In crash recovery set log corruption on
+				and produce only an error to fail InnoDB startup. */
+				if (recv_recovery_is_on()) {
+					recv_sys->found_corrupt_log = true;
+				}
+
+				ib::error()
+					<< (type == IORequestRead ? "Read" : "Write")
+					<< "operation failed for " << node->name
+					<< " page " << page_id
+					<< " error= " << ut_strerr(err);
+			}
+
 		}
 		return;
 	case FIL_TYPE_LOG:
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index ab92b976dff..a686748c7eb 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -6626,7 +6626,7 @@ ha_innobase::open(
 	ib_table->thd = (void*)thd;
 
 	/* No point to init any statistics if tablespace is still encrypted. */
-	if (!ib_table->is_encrypted) {
+	if (!ib_table->file_unreadable) {
 		dict_stats_init(ib_table);
 	} else {
 		ib_table->stat_initialized = 1;
@@ -6634,7 +6634,8 @@ ha_innobase::open(
 
 	MONITOR_INC(MONITOR_TABLE_OPEN);
 
-	bool	no_tablespace;
+	bool	no_tablespace = false;
+	bool	ibd_missing = false;
 
 	if (dict_table_is_discarded(ib_table)) {
 
@@ -6649,7 +6650,7 @@ ha_innobase::open(
 
 		no_tablespace = false;
 
-	} else if (ib_table->ibd_file_missing) {
+	} else if (ib_table->file_unreadable && fil_space_get(ib_table->space) == NULL) {
 
 		ib_senderrf(
 			thd, IB_LOG_LEVEL_WARN,
@@ -6659,11 +6660,11 @@ ha_innobase::open(
 		file, best to play it safe. */
 
 		no_tablespace = true;
-	} else if (ib_table->is_encrypted) {
+		ibd_missing = true;
+	} else if (ib_table->file_unreadable) {
 		/* This means that tablespace was found but we could not
 		decrypt encrypted page. */
 		no_tablespace = true;
-		ib_table->ibd_file_missing = true;
 	} else {
 		no_tablespace = false;
 	}
@@ -6676,7 +6677,7 @@ ha_innobase::open(
 		/* If table has no talespace but it has crypt data, check
 		is tablespace made unaccessible because encryption service
 		or used key_id is not available. */
-		if (ib_table) {
+		if (ib_table && !ibd_missing) {
 			bool warning_pushed = false;
 			fil_space_crypt_t* crypt_data = ib_table->crypt_data;
 
@@ -6697,7 +6698,7 @@ ha_innobase::open(
 			/* If table is marked as encrypted then we push
 			warning if it has not been already done as used
 			key_id might be found but it is incorrect. */
-			if (ib_table->is_encrypted && !warning_pushed) {
+			if (ib_table->file_unreadable && !warning_pushed) {
 				push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
 					HA_ERR_DECRYPTION_FAILED,
 					"Table %s is encrypted but encryption service or"
@@ -6862,7 +6863,7 @@ ha_innobase::open(
 	if (m_prebuilt->table == NULL
 	    || dict_table_is_temporary(m_prebuilt->table)
 	    || m_prebuilt->table->persistent_autoinc
-	    || m_prebuilt->table->ibd_file_missing) {
+	    || m_prebuilt->table->file_unreadable) {
 	} else if (const Field* ai = table->found_next_number_field) {
 		initialize_auto_increment(m_prebuilt->table, ai);
 	}
@@ -10312,6 +10313,14 @@ ha_innobase::general_fetch(
 			DB_FORCED_ABORT, 0,  m_user_thd));
 	}
 
+	if (m_prebuilt->table->corrupted) {
+		DBUG_RETURN(HA_ERR_CRASHED);
+	}
+
+	if (m_prebuilt->table->file_unreadable) {
+		DBUG_RETURN(HA_ERR_DECRYPTION_FAILED);
+	}
+
 	innobase_srv_conc_enter_innodb(m_prebuilt);
 
 	ret = row_search_mvcc(
@@ -13653,7 +13662,7 @@ ha_innobase::discard_or_import_tablespace(
 		user may want to set the DISCARD flag in order to IMPORT
 		a new tablespace. */
 
-		if (dict_table->ibd_file_missing) {
+		if (dict_table->file_unreadable) {
 			ib_senderrf(
 				m_prebuilt->trx->mysql_thd,
 				IB_LOG_LEVEL_WARN, ER_TABLESPACE_MISSING,
@@ -13663,7 +13672,7 @@ ha_innobase::discard_or_import_tablespace(
 		err = row_discard_tablespace_for_mysql(
 			dict_table->name.m_name, m_prebuilt->trx);
 
-	} else if (!dict_table->ibd_file_missing) {
+	} else if (!dict_table->file_unreadable) {
 		/* Commit the transaction in order to
 		release the table lock. */
 		trx_commit_for_mysql(m_prebuilt->trx);
@@ -14270,7 +14279,8 @@ ha_innobase::records(
 		*num_rows = HA_POS_ERROR;
 		DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
 
-	} else if (m_prebuilt->table->ibd_file_missing) {
+	} else if (m_prebuilt->table->file_unreadable &&
+		   fil_space_get(m_prebuilt->table->space) == NULL) {
 		ib_senderrf(
 			m_user_thd, IB_LOG_LEVEL_ERROR,
 			ER_TABLESPACE_MISSING,
@@ -15423,7 +15433,8 @@ ha_innobase::check(
 
 		DBUG_RETURN(HA_ADMIN_CORRUPT);
 
-	} else if (m_prebuilt->table->ibd_file_missing) {
+	} else if (m_prebuilt->table->file_unreadable &&
+		   fil_space_get(m_prebuilt->table->space) == NULL) {
 
 		ib_senderrf(
 			thd, IB_LOG_LEVEL_ERROR,
diff --git a/storage/innobase/handler/handler0alter.cc b/storage/innobase/handler/handler0alter.cc
index fd61db9725d..a7515a3e9b0 100644
--- a/storage/innobase/handler/handler0alter.cc
+++ b/storage/innobase/handler/handler0alter.cc
@@ -4872,7 +4872,7 @@ prepare_inplace_alter_table_dict(
 		clustered index of the old table, later. */
 		if (new_clustered
 		    || !ctx->online
-		    || user_table->ibd_file_missing
+		    || user_table->file_unreadable
 		    || dict_table_is_discarded(user_table)) {
 			/* No need to allocate a modification log. */
 			ut_ad(!ctx->add_index[a]->online_log);
@@ -5554,7 +5554,8 @@ ha_innobase::prepare_inplace_alter_table(
 
 	indexed_table = m_prebuilt->table;
 
-	if (indexed_table->is_encrypted) {
+	if (indexed_table->file_unreadable &&
+	    fil_space_get(indexed_table->space) != NULL) {
 		String str;
 		const char* engine= table_type();
 		push_warning_printf(m_user_thd, Sql_condition::WARN_LEVEL_WARN,
@@ -6370,7 +6371,7 @@ ha_innobase::inplace_alter_table(
 
 	ctx->m_stage = UT_NEW_NOKEY(ut_stage_alter_t(pk));
 
-	if (m_prebuilt->table->ibd_file_missing
+	if (m_prebuilt->table->file_unreadable
 	    || dict_table_is_discarded(m_prebuilt->table)) {
 		goto all_done;
 	}
@@ -7778,7 +7779,7 @@ commit_try_rebuild(
 	/* The new table must inherit the flag from the
 	"parent" table. */
 	if (dict_table_is_discarded(user_table)) {
-		rebuilt_table->ibd_file_missing = true;
+		rebuilt_table->file_unreadable = true;
 		rebuilt_table->flags2 |= DICT_TF2_DISCARDED;
 	}
 
@@ -8290,15 +8291,15 @@ alter_stats_rebuild(
 
 	DBUG_EXECUTE_IF(
 		"ib_rename_index_fail2",
-		ibd_file_missing_orig = table->ibd_file_missing;
-		table->ibd_file_missing = TRUE;
+		ibd_file_missing_orig = table->file_unreadable;
+		table->file_unreadable = true;
 	);
 
 	dberr_t	ret = dict_stats_update(table, DICT_STATS_RECALC_PERSISTENT);
 
 	DBUG_EXECUTE_IF(
 		"ib_rename_index_fail2",
-		table->ibd_file_missing = ibd_file_missing_orig;
+		table->file_unreadable = ibd_file_missing_orig;
 	);
 
 	if (ret != DB_SUCCESS) {
@@ -8429,6 +8430,19 @@ ha_innobase::commit_inplace_alter_table(
 			= static_cast<ha_innobase_inplace_ctx*>(*pctx);
 		DBUG_ASSERT(ctx->prebuilt->trx == m_prebuilt->trx);
 
+		/* If decryption failed for old table or new table
+		fail here. */
+		if ((ctx->old_table->file_unreadable &&
+		     fil_space_get(ctx->old_table->space) != NULL)||
+		    (ctx->new_table->file_unreadable &&
+		     fil_space_get(ctx->new_table->space) != NULL)) {
+			String str;
+			const char* engine= table_type();
+			get_error_message(HA_ERR_DECRYPTION_FAILED, &str);
+			my_error(ER_GET_ERRMSG, MYF(0), HA_ERR_DECRYPTION_FAILED, str.c_ptr(), engine);
+			DBUG_RETURN(true);
+		}
+
 		/* Exclusively lock the table, to ensure that no other
 		transaction is holding locks on the table while we
 		change the table definition. The MySQL meta-data lock
diff --git a/storage/innobase/include/btr0btr.ic b/storage/innobase/include/btr0btr.ic
index 308fcfe9b03..bd4f2a40267 100644
--- a/storage/innobase/include/btr0btr.ic
+++ b/storage/innobase/include/btr0btr.ic
@@ -63,7 +63,7 @@ btr_block_get_func(
 
 	if (err == DB_DECRYPTION_FAILED) {
 		if (index && index->table) {
-			index->table->is_encrypted = true;
+			index->table->file_unreadable = true;
 		}
 	}
 
diff --git a/storage/innobase/include/buf0buf.h b/storage/innobase/include/buf0buf.h
index 3aab242606d..656b3878d41 100644
--- a/storage/innobase/include/buf0buf.h
+++ b/storage/innobase/include/buf0buf.h
@@ -835,17 +835,6 @@ buf_page_is_checksum_valid_none(
 	)
 	MY_ATTRIBUTE((nonnull(1), warn_unused_result));
 
-/********************************************************************//**
-Check if page is maybe compressed, encrypted or both when we encounter
-corrupted page. Note that we can't be 100% sure if page is corrupted
-or decrypt/decompress just failed.
- at param[in]	bpage		Page
- at return true if page corrupted, false if not */
-bool
-buf_page_check_corrupt(
-	buf_page_t*	bpage)	/*!< in/out: buffer page read from disk */
-	MY_ATTRIBUTE((nonnull, warn_unused_result));
-
 /** Checks if a page contains only zeroes.
 @param[in]	read_buf	database page
 @param[in]	page_size	page size
@@ -1326,13 +1315,17 @@ buf_page_init_for_read(
 /********************************************************************//**
 Completes an asynchronous read or write request of a file page to or from
 the buffer pool.
- at return true if successful */
-bool
+ at param[in,out]	bpage		pointer to the block in question
+ at param[in]	evict		true if page should be evicted from LRU
+ at return DB_SUCCESS if page has been read and is not corrupted,
+DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
+DB_DECRYPTION_FAILED if page post encryption checksum matches but
+after decryption normal page checksum does not match.*/
+dberr_t
 buf_page_io_complete(
-/*=================*/
-	buf_page_t*	bpage,	/*!< in: pointer to the block in question */
-	bool		evict = false);/*!< in: whether or not to evict
-				the page from LRU list. */
+	buf_page_t*	bpage,
+	bool		evict = false);
+
 /********************************************************************//**
 Calculates the index of a buffer pool to the buf_pool[] array.
 @return the position of the buffer pool in buf_pool[] */
@@ -1637,7 +1630,6 @@ class buf_page_t {
 					if written again we check is TRIM
 					operation needed. */
 
-	unsigned        key_version;	/*!< key version for this block */
 	bool            encrypted;	/*!< page is still encrypted */
 
 	ulint           real_size;	/*!< Real size of the page
diff --git a/storage/innobase/include/buf0rea.h b/storage/innobase/include/buf0rea.h
index 9c97a5147c1..8bc6a8ed9ef 100644
--- a/storage/innobase/include/buf0rea.h
+++ b/storage/innobase/include/buf0rea.h
@@ -1,7 +1,7 @@
 /*****************************************************************************
 
 Copyright (c) 1995, 2015, Oracle and/or its affiliates. All Rights Reserved.
-Copyright (c) 2015, MariaDB Corporation.
+Copyright (c) 2015, 2017, MariaDB Corporation.
 
 This program is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free Software
@@ -37,12 +37,14 @@ an exclusive lock on the buffer frame. The flag is cleared and the x-lock
 released by the i/o-handler thread.
 @param[in]	page_id		page id
 @param[in]	page_size	page size
- at return TRUE if page has been read in, FALSE in case of failure */
-ibool
+ at return DB_SUCCESS if page has been read and is not corrupted,
+DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
+DB_DECRYPTION_FAILED if page post encryption checksum matches but
+after decryption normal page checksum does not match.*/
+dberr_t
 buf_read_page(
 	const page_id_t&	page_id,
-	const page_size_t&	page_size,
-	buf_page_t**		bpage);
+	const page_size_t&	page_size);
 
 /********************************************************************//**
 High-level function which reads a page asynchronously from a file to the
@@ -51,9 +53,8 @@ an exclusive lock on the buffer frame. The flag is cleared and the x-lock
 released by the i/o-handler thread.
 @param[in]	page_id		page id
 @param[in]	page_size	page size
- at param[in]	sync		true if synchronous aio is desired
- at return TRUE if page has been read in, FALSE in case of failure */
-ibool
+ at param[in]	sync		true if synchronous aio is desired */
+void
 buf_read_page_background(
 	const page_id_t&	page_id,
 	const page_size_t&	page_size,
diff --git a/storage/innobase/include/db0err.h b/storage/innobase/include/db0err.h
index b0609991f61..984afa6e017 100644
--- a/storage/innobase/include/db0err.h
+++ b/storage/innobase/include/db0err.h
@@ -143,6 +143,8 @@ enum dberr_t {
 					of missing key management plugin,
 					or missing or incorrect key or
 					incorret AES method or algorithm. */
+	DB_PAGE_CORRUPTED,		/* Page read from tablespace is
+					corrupted. */
 
 	DB_IO_ERROR = 100,		/*!< Generic IO error */
 
diff --git a/storage/innobase/include/dict0dict.h b/storage/innobase/include/dict0dict.h
index 18578388723..a8d517025a8 100644
--- a/storage/innobase/include/dict0dict.h
+++ b/storage/innobase/include/dict0dict.h
@@ -1889,6 +1889,15 @@ dict_set_corrupted_by_space(
 /*========================*/
 	ulint		space_id);	/*!< in: space ID */
 
+/**********************************************************************//**
+Flags a table with specified space_id encrypted in the data dictionary
+cache
+ at param[in]	space_id	Tablespace id */
+UNIV_INTERN
+void
+dict_set_encrypted_by_space(
+	ulint	space_id);
+
 /** Sets merge_threshold in the SYS_INDEXES
 @param[in,out]	index		index
 @param[in]	merge_threshold	value to set */
diff --git a/storage/innobase/include/dict0mem.h b/storage/innobase/include/dict0mem.h
index 0630137bb4f..7f4b60ff063 100644
--- a/storage/innobase/include/dict0mem.h
+++ b/storage/innobase/include/dict0mem.h
@@ -1404,9 +1404,9 @@ struct dict_table_t {
 	unsigned				flags2:DICT_TF2_BITS;
 
 	/** TRUE if this is in a single-table tablespace and the .ibd file is
-	missing. Then we must return in ha_innodb.cc an error if the user
-	tries to query such an orphaned table. */
-	unsigned				ibd_file_missing:1;
+	missing or page in file is encrypted. Then we must return in
+	ha_innodb.cc an error if the usertries to query such an table. */
+	unsigned				file_unreadable:1;
 
 	/** TRUE if the table object has been added to the dictionary cache. */
 	unsigned				cached:1;
@@ -1727,7 +1727,9 @@ struct dict_table_t {
 	/** Timestamp of the last modification of this table. */
 	time_t					update_time;
 
-	bool					is_encrypted;
+	/** mysql_row_templ_t for base columns used for compute the virtual
+	columns */
+	dict_vcol_templ_t*			vc_templ;
 
 #ifdef UNIV_DEBUG
 	/** Value of 'magic_n'. */
@@ -1736,9 +1738,6 @@ struct dict_table_t {
 	/** Magic number. */
 	ulint					magic_n;
 #endif /* UNIV_DEBUG */
-	/** mysql_row_templ_t for base columns used for compute the virtual
-	columns */
-	dict_vcol_templ_t*			vc_templ;
 };
 
 /*******************************************************************//**
diff --git a/storage/innobase/include/fil0crypt.h b/storage/innobase/include/fil0crypt.h
index 831d61445d8..8b0a55b1aeb 100644
--- a/storage/innobase/include/fil0crypt.h
+++ b/storage/innobase/include/fil0crypt.h
@@ -293,14 +293,15 @@ Parse a MLOG_FILE_WRITE_CRYPT_DATA log entry
 @param[in]	ptr		Log entry start
 @param[in]	end_ptr		Log entry end
 @param[in]	block		buffer block
+ at param[out]	err		DB_SUCCESS or DB_DECRYPTION_FAILED
 @return position on log buffer */
 UNIV_INTERN
 const byte*
 fil_parse_write_crypt_data(
 	const byte*		ptr,
 	const byte*		end_ptr,
-	const buf_block_t*	block)
-	MY_ATTRIBUTE((warn_unused_result));
+	const buf_block_t*	block,
+	dberr_t*		err);
 
 /** Encrypt a buffer.
 @param[in,out]		crypt_data	Crypt data
diff --git a/storage/innobase/include/log0recv.h b/storage/innobase/include/log0recv.h
index 74ea6c95036..f56a40698e9 100644
--- a/storage/innobase/include/log0recv.h
+++ b/storage/innobase/include/log0recv.h
@@ -108,8 +108,9 @@ recv_sys_var_init(void);
 
 /** Apply the hash table of stored log records to persistent data pages.
 @param[in]	last_batch	whether the change buffer merge will be
-				performed as part of the operation */
-void
+				performed as part of the operation
+ at return DB_SUCCESS or DB_DECRYPTION_FAILED */
+dberr_t
 recv_apply_hashed_log_recs(bool last_batch);
 
 /** Block of log record data */
diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc
index adf2d3aca0a..a8534a5475b 100644
--- a/storage/innobase/log/log0recv.cc
+++ b/storage/innobase/log/log0recv.cc
@@ -1495,7 +1495,12 @@ recv_parse_or_apply_log_rec_body(
 		}
 		break;
 	case MLOG_FILE_WRITE_CRYPT_DATA:
-		ptr = const_cast<byte*>(fil_parse_write_crypt_data(ptr, end_ptr, block));
+		dberr_t err;
+		ptr = const_cast<byte*>(fil_parse_write_crypt_data(ptr, end_ptr, block, &err));
+
+		if (err != DB_SUCCESS) {
+			recv_sys->found_corrupt_log = TRUE;
+		}
 		break;
 	default:
 		ptr = NULL;
@@ -1980,8 +1985,9 @@ recv_read_in_area(
 
 /** Apply the hash table of stored log records to persistent data pages.
 @param[in]	last_batch	whether the change buffer merge will be
-				performed as part of the operation */
-void
+				performed as part of the operation
+ at return DB_SUCCESS or DB_DECRYPTION_FAILED */
+dberr_t
 recv_apply_hashed_log_recs(bool last_batch)
 {
 	for (;;) {
@@ -1991,6 +1997,11 @@ recv_apply_hashed_log_recs(bool last_batch)
 			break;
 		}
 
+		if (recv_sys->found_corrupt_log) {
+			mutex_exit(&recv_sys->mutex);
+			return(DB_DECRYPTION_FAILED);
+		}
+
 		mutex_exit(&recv_sys->mutex);
 		os_thread_sleep(500000);
 	}
@@ -2074,6 +2085,10 @@ recv_apply_hashed_log_recs(bool last_batch)
 
 		mutex_exit(&(recv_sys->mutex));
 
+		if (recv_sys->found_corrupt_log) {
+			return(DB_DECRYPTION_FAILED);
+		}
+
 		os_thread_sleep(500000);
 
 		mutex_enter(&(recv_sys->mutex));
@@ -2117,6 +2132,8 @@ recv_apply_hashed_log_recs(bool last_batch)
 	recv_sys_empty_hash();
 
 	mutex_exit(&recv_sys->mutex);
+
+	return (DB_SUCCESS);
 }
 
 /** Tries to parse a single log record.
diff --git a/storage/innobase/page/page0cur.cc b/storage/innobase/page/page0cur.cc
index f2ba1c64229..f6075999fe1 100644
--- a/storage/innobase/page/page0cur.cc
+++ b/storage/innobase/page/page0cur.cc
@@ -2062,7 +2062,7 @@ page_copy_rec_list_end_to_created_page(
 	mtr_log_t	log_mode;
 
 	if (dict_table_is_temporary(index->table)
-	    || index->table->ibd_file_missing /* IMPORT TABLESPACE */) {
+	    || index->table->file_unreadable /* IMPORT TABLESPACE */) {
 		log_mode = mtr_get_log_mode(mtr);
 	} else {
 		log_mode = mtr_set_log_mode(mtr, MTR_LOG_SHORT_INSERTS);
diff --git a/storage/innobase/row/row0import.cc b/storage/innobase/row/row0import.cc
index b70a1952f97..e340ab25253 100644
--- a/storage/innobase/row/row0import.cc
+++ b/storage/innobase/row/row0import.cc
@@ -2100,7 +2100,7 @@ row_import_discard_changes(
 		index->space = FIL_NULL;
 	}
 
-	table->ibd_file_missing = TRUE;
+	table->file_unreadable = true;
 
 	fil_close_tablespace(trx, table->space);
 }
@@ -3357,7 +3357,7 @@ row_import_for_mysql(
 
 	ut_a(table->space);
 	ut_ad(prebuilt->trx);
-	ut_a(table->ibd_file_missing);
+	ut_a(table->file_unreadable);
 
 	ibuf_delete_for_discarded_space(table->space);
 
@@ -3691,7 +3691,7 @@ row_import_for_mysql(
 		return(row_import_error(prebuilt, trx, err));
 	}
 
-	table->ibd_file_missing = false;
+	table->file_unreadable = false;
 	table->flags2 &= ~DICT_TF2_DISCARDED;
 
 	/* Set autoinc value read from .cfg file, if one was specified.
diff --git a/storage/innobase/row/row0ins.cc b/storage/innobase/row/row0ins.cc
index 9626645ebf2..9cc5b67e475 100644
--- a/storage/innobase/row/row0ins.cc
+++ b/storage/innobase/row/row0ins.cc
@@ -1652,7 +1652,7 @@ row_ins_check_foreign_constraint(
 	}
 
 	if (check_table == NULL
-	    || check_table->ibd_file_missing
+	    || check_table->file_unreadable
 	    || check_index == NULL) {
 
 		if (!srv_read_only_mode && check_ref) {
@@ -2928,7 +2928,7 @@ row_ins_sec_index_entry_low(
 				" used key_id is not available. "
 				" Can't continue reading table.",
 				index->table->name);
-			index->table->is_encrypted = true;
+			index->table->file_unreadable = true;
 		}
 		goto func_exit;
 	}
diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc
index 85f0ce2c9e6..1c18e2544f3 100644
--- a/storage/innobase/row/row0merge.cc
+++ b/storage/innobase/row/row0merge.cc
@@ -1995,7 +1995,8 @@ row_merge_read_clustered_index(
 		page_cur_t*	cur	= btr_pcur_get_page_cur(&pcur);
 
 		/* Do not continue if table pages are still encrypted */
-		if (old_table->is_encrypted || new_table->is_encrypted) {
+		if (old_table->file_unreadable ||
+		    new_table->file_unreadable) {
 			err = DB_DECRYPTION_FAILED;
 			trx->error_key_num = 0;
 			goto func_exit;
@@ -4295,7 +4296,7 @@ row_merge_rename_tables_dict(
 	renamed along with the table. */
 	if (err == DB_SUCCESS
 	    && dict_table_is_file_per_table(old_table)
-	    && !old_table->ibd_file_missing) {
+	    && fil_space_get(old_table->space) != NULL) {
 		/* Make pathname to update SYS_DATAFILES. */
 		char* tmp_path = row_make_new_pathname(old_table, tmp_name);
 
@@ -4772,13 +4773,14 @@ row_merge_build_indexes(
 	pct_cost = COST_READ_CLUSTERED_INDEX * 100 / (total_static_cost + total_dynamic_cost);
 
 	/* Do not continue if we can't encrypt table pages */
-	if (old_table->is_encrypted || new_table->is_encrypted) {
+	if (old_table->file_unreadable ||
+	    new_table->file_unreadable) {
 		error = DB_DECRYPTION_FAILED;
 		ib_push_warning(trx->mysql_thd, DB_DECRYPTION_FAILED,
 			"Table %s is encrypted but encryption service or"
 			" used key_id is not available. "
 			" Can't continue reading table.",
-			old_table->is_encrypted ? old_table->name : new_table->name);
+			old_table->file_unreadable ? old_table->name : new_table->name);
 		goto func_exit;
 	}
 
diff --git a/storage/innobase/row/row0mysql.cc b/storage/innobase/row/row0mysql.cc
index 8b7c64868b8..f6beeea63d6 100644
--- a/storage/innobase/row/row0mysql.cc
+++ b/storage/innobase/row/row0mysql.cc
@@ -1,7 +1,7 @@
 /*****************************************************************************
 
 Copyright (c) 2000, 2016, Oracle and/or its affiliates. All Rights Reserved.
-Copyright (c) 2017, MariaDB Corporation.
+Copyright (c) 2015, 2017, MariaDB Corporation.
 
 This program is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free Software
@@ -1432,13 +1432,14 @@ row_insert_for_mysql(
 
 		return(DB_TABLESPACE_DELETED);
 
-	} else if (prebuilt->table->ibd_file_missing) {
+	} else if (prebuilt->table->file_unreadable &&
+		   fil_space_get(prebuilt->table->space) == NULL) {
 
 		ib::error() << ".ibd file is missing for table "
 			<< prebuilt->table->name;
 
 		return(DB_TABLESPACE_NOT_FOUND);
-	} else if (prebuilt->table->is_encrypted) {
+	} else if (prebuilt->table->file_unreadable) {
 		ib_push_warning(trx, DB_DECRYPTION_FAILED,
 			"Table %s in tablespace %lu encrypted."
 			"However key management plugin or used key_id is not found or"
@@ -1856,7 +1857,8 @@ row_update_for_mysql_using_upd_graph(
 	ut_a(prebuilt->magic_n2 == ROW_PREBUILT_ALLOCATED);
 	UT_NOT_USED(mysql_rec);
 
-	if (prebuilt->table->ibd_file_missing) {
+	if (prebuilt->table->file_unreadable &&
+	    fil_space_get(prebuilt->table->space) == NULL) {
 		ib::error() << "MySQL is trying to use a table handle but the"
 			" .ibd file for table " << prebuilt->table->name
 			<< " does not exist. Have you deleted"
@@ -1864,13 +1866,13 @@ row_update_for_mysql_using_upd_graph(
 			" the MySQL datadir, or have you used DISCARD"
 			" TABLESPACE? " << TROUBLESHOOTING_MSG;
 		DBUG_RETURN(DB_ERROR);
-	} else if (prebuilt->table->is_encrypted) {
+	} else if (prebuilt->table->file_unreadable) {
 		ib_push_warning(trx, DB_DECRYPTION_FAILED,
 			"Table %s in tablespace %lu encrypted."
 			"However key management plugin or used key_id is not found or"
 			" used encryption algorithm or method does not match.",
 			prebuilt->table->name, prebuilt->table->space);
-		return (DB_TABLE_NOT_FOUND);
+		DBUG_RETURN(DB_TABLE_NOT_FOUND);
 	}
 
 	if(srv_force_recovery) {
@@ -3243,7 +3245,7 @@ row_discard_tablespace(
 		/* All persistent operations successful, update the
 		data dictionary memory cache. */
 
-		table->ibd_file_missing = TRUE;
+		table->file_unreadable = true;
 
 		table->flags2 |= DICT_TF2_DISCARDED;
 
@@ -3299,7 +3301,8 @@ row_discard_tablespace_for_mysql(
 
 	if (table == 0) {
 		err = DB_TABLE_NOT_FOUND;
-	} else if (table->is_encrypted) {
+	} else if (table->file_unreadable &&
+		   fil_space_get(table->space) != NULL) {
 		err = DB_DECRYPTION_FAILED;
 	} else if (dict_table_is_temporary(table)) {
 
@@ -3639,9 +3642,11 @@ row_drop_table_for_mysql(
 		err = DB_TABLE_NOT_FOUND;
 		goto funct_exit;
 	}
+
 	/* If table is encrypted and table page encryption failed
 	return error. */
-	if (table->is_encrypted) {
+	if (table->file_unreadable &&
+	    fil_space_get(table->space) != NULL) {
 
 		if (table->can_be_evicted) {
 			dict_table_move_from_lru_to_non_lru(table);
@@ -4048,7 +4053,7 @@ row_drop_table_for_mysql(
 
 	case DB_SUCCESS:
 		space_id = table->space;
-		ibd_file_missing = table->ibd_file_missing;
+		ibd_file_missing = table->file_unreadable;
 		is_discarded = dict_table_is_discarded(table);
 		table_flags = table->flags;
 		ut_ad(!dict_table_is_temporary(table));
@@ -4300,7 +4305,7 @@ row_drop_database_for_mysql(
 					<< table->name << ".frm' was lost.";
 			}
 
-			if (table->ibd_file_missing) {
+			if (table->file_unreadable) {
 				ib::warn() << "Missing .ibd file for table "
 					<< table->name << ".";
 			}
@@ -4556,7 +4561,7 @@ row_rename_table_for_mysql(
 		err = DB_TABLE_NOT_FOUND;
 		goto funct_exit;
 
-	} else if (table->ibd_file_missing
+	} else if (table->file_unreadable
 		   && !dict_table_is_discarded(table)) {
 
 		err = DB_TABLE_NOT_FOUND;
@@ -4623,7 +4628,7 @@ row_rename_table_for_mysql(
 	the table is in a single-table tablespace. */
 	if (err == DB_SUCCESS
 	    && dict_table_is_file_per_table(table)
-	    && !table->ibd_file_missing) {
+	    && !table->file_unreadable) {
 		/* Make a new pathname to update SYS_DATAFILES. */
 		char*	new_path = row_make_new_pathname(table, new_name);
 
diff --git a/storage/innobase/row/row0purge.cc b/storage/innobase/row/row0purge.cc
index ce9a265bd8c..5d312ba8fb1 100644
--- a/storage/innobase/row/row0purge.cc
+++ b/storage/innobase/row/row0purge.cc
@@ -871,7 +871,7 @@ row_purge_parse_undo_rec(
 		innobase_init_vc_templ(node->table);
 	}
 
-	if (node->table->ibd_file_missing) {
+	if (node->table->file_unreadable) {
 		/* We skip purge of missing .ibd files */
 
 		dict_table_close(node->table, FALSE, FALSE);
diff --git a/storage/innobase/row/row0sel.cc b/storage/innobase/row/row0sel.cc
index 229bd567c48..0ec1a3108bb 100644
--- a/storage/innobase/row/row0sel.cc
+++ b/storage/innobase/row/row0sel.cc
@@ -4191,13 +4191,14 @@ row_search_mvcc(
 
 		DBUG_RETURN(DB_TABLESPACE_DELETED);
 
-	} else if (prebuilt->table->ibd_file_missing) {
+	} else if (prebuilt->table->file_unreadable &&
+		   fil_space_get(prebuilt->table->space) == NULL) {
 
 		DBUG_RETURN(DB_TABLESPACE_NOT_FOUND);
 
-	} else if (prebuilt->table->is_encrypted) {
+	} else if (prebuilt->table->file_unreadable) {
 
-		return(DB_DECRYPTION_FAILED);
+		DBUG_RETURN(DB_DECRYPTION_FAILED);
 	} else if (!prebuilt->index_usable) {
 		DBUG_RETURN(DB_MISSING_HISTORY);
 
@@ -4673,7 +4674,7 @@ row_search_mvcc(
 					" used key_id is not available. "
 					" Can't continue reading table.",
 					prebuilt->table->name);
-				index->table->is_encrypted = true;
+				index->table->file_unreadable = true;
 			}
 			rec = NULL;
 			goto lock_wait_or_error;
diff --git a/storage/innobase/row/row0trunc.cc b/storage/innobase/row/row0trunc.cc
index 46cff288059..ec8b51f0328 100644
--- a/storage/innobase/row/row0trunc.cc
+++ b/storage/innobase/row/row0trunc.cc
@@ -1681,10 +1681,15 @@ row_truncate_sanity_checks(
 
 		return(DB_TABLESPACE_DELETED);
 
-	} else if (table->ibd_file_missing) {
+	} else if (table->file_unreadable &&
+		fil_space_get(table->space) == NULL) {
 
 		return(DB_TABLESPACE_NOT_FOUND);
 
+	} else if (table->file_unreadable) {
+
+		return(DB_DECRYPTION_FAILED);
+
 	} else if (dict_table_is_corrupted(table)) {
 
 		return(DB_TABLE_CORRUPT);
diff --git a/storage/innobase/row/row0uins.cc b/storage/innobase/row/row0uins.cc
index 25504e32087..5ad7e2ccb4a 100644
--- a/storage/innobase/row/row0uins.cc
+++ b/storage/innobase/row/row0uins.cc
@@ -345,7 +345,7 @@ row_undo_ins_parse_undo_rec(
 
 	/* Skip the UNDO if we can't find the table or the .ibd file. */
 	if (UNIV_UNLIKELY(node->table == NULL)) {
-	} else if (UNIV_UNLIKELY(node->table->ibd_file_missing)) {
+	} else if (UNIV_UNLIKELY(node->table->file_unreadable)) {
 close_table:
 		dict_table_close(node->table, dict_locked, FALSE);
 		node->table = NULL;
diff --git a/storage/innobase/row/row0umod.cc b/storage/innobase/row/row0umod.cc
index 378cad00b93..b804285f88e 100644
--- a/storage/innobase/row/row0umod.cc
+++ b/storage/innobase/row/row0umod.cc
@@ -1139,7 +1139,7 @@ row_undo_mod_parse_undo_rec(
 		return;
 	}
 
-	if (node->table->ibd_file_missing) {
+	if (node->table->file_unreadable) {
 		dict_table_close(node->table, dict_locked, FALSE);
 
 		/* We skip undo operations to missing .ibd files */
diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc
index 7c16ce1e58f..c968a4c3755 100644
--- a/storage/innobase/srv/srv0start.cc
+++ b/storage/innobase/srv/srv0start.cc
@@ -2221,7 +2221,12 @@ innobase_start_or_create_for_mysql(void)
 			respective file pages, for the last batch of
 			recv_group_scan_log_recs(). */
 
-			recv_apply_hashed_log_recs(true);
+			err = recv_apply_hashed_log_recs(true);
+
+			if (err != DB_SUCCESS) {
+				return (err);
+			}
+
 			DBUG_PRINT("ib_log", ("apply completed"));
 
 			if (recv_needed_recovery) {
diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc
index f7a488d7507..ac1c0702b28 100644
--- a/storage/innobase/trx/trx0trx.cc
+++ b/storage/innobase/trx/trx0trx.cc
@@ -775,7 +775,7 @@ trx_resurrect_table_locks(
 	     i != tables.end(); i++) {
 		if (dict_table_t* table = dict_table_open_on_id(
 			    *i, FALSE, DICT_TABLE_OP_LOAD_TABLESPACE)) {
-			if (table->ibd_file_missing
+			if (table->file_unreadable
 			    || dict_table_is_temporary(table)) {
 				mutex_enter(&dict_sys->mutex);
 				dict_table_close(table, TRUE, FALSE);
diff --git a/storage/innobase/ut/ut0ut.cc b/storage/innobase/ut/ut0ut.cc
index c352323eca9..54a45fcf2ee 100644
--- a/storage/innobase/ut/ut0ut.cc
+++ b/storage/innobase/ut/ut0ut.cc
@@ -744,6 +744,8 @@ ut_strerr(
 		return("Too many words in a FTS phrase or proximity search");
 	case DB_DECRYPTION_FAILED:
 		return("Table is encrypted but decrypt failed.");
+	case DB_PAGE_CORRUPTED:
+		return("Page read from tablespace is corrupted.");
 	case DB_IO_PARTIAL_FAILED:
 		return("Partial IO failed");
 	case DB_FORCED_ABORT:
diff --git a/storage/xtradb/api/api0api.cc b/storage/xtradb/api/api0api.cc
index a6dfad40150..2a46dd4b4c1 100644
--- a/storage/xtradb/api/api0api.cc
+++ b/storage/xtradb/api/api0api.cc
@@ -247,7 +247,7 @@ ib_open_table_by_id(
 
 	table = dict_table_open_on_id(table_id, TRUE, DICT_TABLE_OP_NORMAL);
 
-	if (table != NULL && table->ibd_file_missing) {
+	if (table != NULL && table->file_unreadable) {
 		table = NULL;
 	}
 
@@ -272,7 +272,7 @@ ib_open_table_by_name(
 	table = dict_table_open_on_name(name, FALSE, FALSE,
 					DICT_ERR_IGNORE_NONE);
 
-	if (table != NULL && table->ibd_file_missing) {
+	if (table != NULL && table->file_unreadable) {
 		table = NULL;
 	}
 
@@ -292,7 +292,7 @@ ib_lookup_table_by_name(
 
 	table = dict_table_get_low(name);
 
-	if (table != NULL && table->ibd_file_missing) {
+	if (table != NULL && table->file_unreadable) {
 		table = NULL;
 	}
 
diff --git a/storage/xtradb/btr/btr0btr.cc b/storage/xtradb/btr/btr0btr.cc
index 417eeb2c367..028cd8d9116 100644
--- a/storage/xtradb/btr/btr0btr.cc
+++ b/storage/xtradb/btr/btr0btr.cc
@@ -744,8 +744,8 @@ btr_root_block_get(
 
 	if (!block) {
 		if (index && index->table) {
-			index->table->is_encrypted = TRUE;
-			index->table->corrupted = FALSE;
+			index->table->file_unreadable = true;
+			index->table->corrupted = false;
 
 			ib_push_warning(index->table->thd, DB_DECRYPTION_FAILED,
 				"Table %s in tablespace %lu is encrypted but encryption service or"
@@ -5211,7 +5211,7 @@ btr_validate_index(
 
 	page_t*	root = btr_root_get(index, &mtr);
 
-	if (root == NULL && index->table->is_encrypted) {
+	if (root == NULL && index->table->file_unreadable) {
 		err = DB_DECRYPTION_FAILED;
 		mtr_commit(&mtr);
 		return err;
diff --git a/storage/xtradb/btr/btr0cur.cc b/storage/xtradb/btr/btr0cur.cc
index 454b085862c..66a1b791173 100644
--- a/storage/xtradb/btr/btr0cur.cc
+++ b/storage/xtradb/btr/btr0cur.cc
@@ -3,7 +3,7 @@
 Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
 Copyright (c) 2008, Google Inc.
 Copyright (c) 2012, Facebook Inc.
-Copyright (c) 2015, MariaDB Corporation.
+Copyright (c) 2015, 2017, MariaDB Corporation.
 
 Portions of this file contain modifications contributed and copyrighted by
 Google, Inc. Those modifications are gratefully acknowledged and are described
@@ -666,7 +666,7 @@ btr_cur_search_to_nth_level(
 				" used key_id is not available. "
 				" Can't continue reading table.",
 				index->table->name);
-			index->table->is_encrypted = true;
+			index->table->file_unreadable = true;
 		}
 
 		goto func_exit;
@@ -996,7 +996,7 @@ btr_cur_open_at_index_side_func(
 					" used key_id is not available. "
 					" Can't continue reading table.",
 					index->table->name);
-				index->table->is_encrypted = true;
+				index->table->file_unreadable = true;
 			}
 
 			goto exit_loop;
@@ -1169,7 +1169,7 @@ btr_cur_open_at_rnd_pos_func(
 					" used key_id is not available. "
 					" Can't continue reading table.",
 					index->table->name);
-				index->table->is_encrypted = true;
+				index->table->file_unreadable = true;
 			}
 			goto exit_loop;
 		}
@@ -3861,7 +3861,7 @@ btr_estimate_n_rows_in_range_on_level(
 					" used key_id is not available. "
 					" Can't continue reading table.",
 					index->table->name);
-				index->table->is_encrypted = true;
+				index->table->file_unreadable = true;
 			}
 
 			mtr_commit(&mtr);
@@ -4362,6 +4362,11 @@ btr_estimate_number_of_different_key_vals(
 
 		page = btr_cur_get_page(&cursor);
 
+		if (index->table->file_unreadable || index->table->corrupted) {
+			mtr_commit(&mtr);
+			goto exit_loop;
+		}
+
 		SRV_CORRUPT_TABLE_CHECK(page, goto exit_loop;);
 		DBUG_EXECUTE_IF("ib_corrupt_page_while_stats_calc",
 				page = NULL;);
diff --git a/storage/xtradb/btr/btr0defragment.cc b/storage/xtradb/btr/btr0defragment.cc
index 8de85e746ca..6bad13ff063 100644
--- a/storage/xtradb/btr/btr0defragment.cc
+++ b/storage/xtradb/btr/btr0defragment.cc
@@ -227,7 +227,7 @@ btr_defragment_add_index(
 		page = buf_block_get_frame(block);
 	}
 
-	if (page == NULL && index->table->is_encrypted) {
+	if (page == NULL && index->table->file_unreadable) {
 		mtr_commit(&mtr);
 		*err = DB_DECRYPTION_FAILED;
 		return NULL;
diff --git a/storage/xtradb/btr/btr0pcur.cc b/storage/xtradb/btr/btr0pcur.cc
index dd6ef484fc3..23ad9ada935 100644
--- a/storage/xtradb/btr/btr0pcur.cc
+++ b/storage/xtradb/btr/btr0pcur.cc
@@ -420,6 +420,11 @@ btr_pcur_move_to_next_page(
 	cursor->old_stored = BTR_PCUR_OLD_NOT_STORED;
 
 	page = btr_pcur_get_page(cursor);
+
+	if (!page) {
+		return;
+	}
+
 	next_page_no = btr_page_get_next(page, mtr);
 	space = buf_block_get_space(btr_pcur_get_block(cursor));
 	zip_size = buf_block_get_zip_size(btr_pcur_get_block(cursor));
@@ -429,6 +434,11 @@ btr_pcur_move_to_next_page(
 	next_block = btr_block_get(space, zip_size, next_page_no,
 				   cursor->latch_mode,
 				   btr_pcur_get_btr_cur(cursor)->index, mtr);
+
+	if (!next_block) {
+		return;
+	}
+
 	next_page = buf_block_get_frame(next_block);
 
 	SRV_CORRUPT_TABLE_CHECK(next_page,
diff --git a/storage/xtradb/buf/buf0buf.cc b/storage/xtradb/buf/buf0buf.cc
index c9a3f6aa6ec..bb19b447112 100644
--- a/storage/xtradb/buf/buf0buf.cc
+++ b/storage/xtradb/buf/buf0buf.cc
@@ -1181,7 +1181,6 @@ buf_block_init(
 	block->page.buf_fix_count = 0;
 	block->page.io_fix = BUF_IO_NONE;
 	block->page.encrypted = false;
-	block->page.key_version = 0;
 	block->page.real_size = 0;
 	block->page.write_size = 0;
 	block->modify_clock = 0;
@@ -2354,7 +2353,17 @@ buf_page_get_zip(
 		/* Page not in buf_pool: needs to be read from file */
 
 		ut_ad(!hash_lock);
-		buf_read_page(space, zip_size, offset, trx, NULL);
+		dberr_t err = buf_read_page(space, zip_size, offset, trx);
+
+		if (err != DB_SUCCESS) {
+			ib_logf(IB_LOG_LEVEL_ERROR,
+				"Reading compressed page " ULINTPF
+				" from tablespace " ULINTPF
+				" failed with error: %s (%d).",
+				offset, space, ut_strerr(err), err);
+
+			goto err_exit;
+		}
 
 #if defined UNIV_DEBUG || defined UNIV_BUF_DEBUG
 		ut_a(++buf_dbg_counter % 5771 || buf_validate());
@@ -2960,7 +2969,6 @@ buf_page_get_gen(
 	}
 
 	if (block == NULL) {
-		buf_page_t*	bpage=NULL;
 
 		/* Page not in buf_pool: needs to be read from file */
 
@@ -2996,7 +3004,18 @@ buf_page_get_gen(
 			return(NULL);
 		}
 
-		if (buf_read_page(space, zip_size, offset, trx, &bpage)) {
+		/* Call path is buf_read_page() -> buf_read_page_low()
+		(_fil_io()) -> buf_page_io_complete() ->
+		buf_decrypt_after_read() here fil_space_t* is used
+		and we decrypt -> buf_page_check_corrupt() where
+		page checksums are compared. Decryption/decompression
+		is handled lower level, error handling is handled on lower
+		level, here we need only to know is page really corrupted
+		or encrypted page with correct checksum. */
+
+		dberr_t local_err = buf_read_page(space, zip_size, offset, trx);
+
+		if (local_err == DB_SUCCESS) {
 			buf_read_ahead_random(space, zip_size, offset,
 					      ibuf_inside(mtr), trx);
 
@@ -3004,89 +3023,47 @@ buf_page_get_gen(
 		} else if (retries < BUF_PAGE_READ_MAX_RETRIES) {
 			++retries;
 
-			bool corrupted = false;
-
-			if (bpage) {
-				corrupted = buf_page_check_corrupt(bpage);
-			}
-
-			/* Do not try again for encrypted pages */
-			if (corrupted && bpage->encrypted) {
-				ib_mutex_t* pmutex = buf_page_get_mutex(bpage);
-				mutex_enter(&buf_pool->LRU_list_mutex);
-				mutex_enter(pmutex);
-
-				ut_ad(buf_pool->n_pend_reads > 0);
-				os_atomic_decrement_ulint(&buf_pool->n_pend_reads, 1);
-				buf_page_set_io_fix(bpage, BUF_IO_NONE);
-
-				if (!buf_LRU_free_page(bpage, true)) {
-					mutex_exit(&buf_pool->LRU_list_mutex);
-				}
-
-				mutex_exit(pmutex);
-				rw_lock_x_unlock_gen(&((buf_block_t*) bpage)->lock,
-					     BUF_IO_READ);
-
-				if (err) {
-					*err = DB_DECRYPTION_FAILED;
-				}
-
-				return (NULL);
-			}
-
 			DBUG_EXECUTE_IF(
 				"innodb_page_corruption_retries",
-				retries = BUF_PAGE_READ_MAX_RETRIES;
+				goto force_fail;
 			);
-		} else {
-			bool corrupted = false;
-
-			if (bpage) {
-				corrupted = buf_page_check_corrupt(bpage);
-			}
 
-			if (corrupted && !bpage->encrypted) {
-				ib_logf(IB_LOG_LEVEL_ERROR, "Unable"
-					" to read tablespace %lu page no"
-					" %lu into the buffer pool after"
-					" %lu attempts\n"
-					"InnoDB: The most probable cause"
-					" of this error may be that the"
-					" table has been corrupted.\n"
-					"InnoDB: You can try to fix this"
-					" problem by using"
-					" innodb_force_recovery.\n"
-					"InnoDB: Please see reference manual"
-					" for more details.\n"
-					"InnoDB: Aborting...\n",
-					space, offset,
-					BUF_PAGE_READ_MAX_RETRIES);
 
-				ut_error;
-			} else {
-				ib_mutex_t* pmutex = buf_page_get_mutex(bpage);
-				mutex_enter(&buf_pool->LRU_list_mutex);
-				mutex_enter(pmutex);
-
-				ut_ad(buf_pool->n_pend_reads > 0);
-				os_atomic_decrement_ulint(&buf_pool->n_pend_reads, 1);
-				buf_page_set_io_fix(bpage, BUF_IO_NONE);
-
-				if (!buf_LRU_free_page(bpage, true)) {
-					mutex_exit(&buf_pool->LRU_list_mutex);
-				}
-
-				mutex_exit(pmutex);
-				rw_lock_x_unlock_gen(&((buf_block_t*) bpage)->lock,
-					     BUF_IO_READ);
+		} else {
+			if (*err) {
+				*err = local_err;
+			}
 
-				if (err) {
-					*err = DB_DECRYPTION_FAILED;
-				}
+			/* Encrypted pages that are not corrupted are marked
+			as encrypted and that fact is later pushed to
+			user thread. */
+			if (local_err == DB_DECRYPTION_FAILED) {
+				return (NULL);
+			}
 
+			/* Try to set table as corrupted instead of
+			asserting. */
+			if (space > TRX_SYS_SPACE &&
+			    dict_set_corrupted_by_space(space)) {
 				return (NULL);
 			}
+
+force_fail:
+			ib_logf(IB_LOG_LEVEL_FATAL, "Unable"
+				" to read tablespace %lu page no"
+				" %lu into the buffer pool after"
+				" %lu attempts\n"
+				"InnoDB: The most probable cause"
+				" of this error may be that the"
+				" table has been corrupted.\n"
+				"InnoDB: You can try to fix this"
+				" problem by using"
+				" innodb_force_recovery.\n"
+				"InnoDB: Please see reference manual"
+				" for more details.\n"
+				"InnoDB: Aborting...\n",
+				space, offset,
+				BUF_PAGE_READ_MAX_RETRIES);
 		}
 
 #if defined UNIV_DEBUG || defined UNIV_BUF_DEBUG
@@ -3859,7 +3836,6 @@ buf_page_init_low(
 	bpage->oldest_modification = 0;
 	bpage->write_size = 0;
 	bpage->encrypted = false;
-	bpage->key_version = 0;
 	bpage->real_size = 0;
 
 	HASH_INVALIDATE(bpage, hash);
@@ -4502,61 +4478,54 @@ buf_page_monitor(
 /********************************************************************//**
 Mark a table with the specified space pointed by bpage->space corrupted.
 Also remove the bpage from LRU list.
- at return TRUE if successful */
+ at param[in,out]		bpage			Block */
 static
-ibool
+void
 buf_mark_space_corrupt(
-/*===================*/
-	buf_page_t*	bpage)	/*!< in: pointer to the block in question */
+	buf_page_t*	bpage)
 {
 	buf_pool_t*	buf_pool = buf_pool_from_bpage(bpage);
 	const ibool	uncompressed = (buf_page_get_state(bpage)
 					== BUF_BLOCK_FILE_PAGE);
 	ulint		space = bpage->space;
-	ibool		ret = TRUE;
 	const ulint	fold = buf_page_address_fold(bpage->space,
 						     bpage->offset);
 	prio_rw_lock_t*	hash_lock = buf_page_hash_lock_get(buf_pool, fold);
+	ib_mutex_t* block_mutex = buf_page_get_mutex(bpage);
 
 	/* First unfix and release lock on the bpage */
 	ut_ad(!mutex_own(&buf_pool->LRU_list_mutex));
 
-	if (!bpage->encrypted) {
-		mutex_enter(&buf_pool->LRU_list_mutex);
-		rw_lock_x_lock(hash_lock);
-		mutex_enter(buf_page_get_mutex(bpage));
-		ut_ad(buf_page_get_io_fix(bpage) == BUF_IO_READ);
-		ut_ad(bpage->buf_fix_count == 0);
+	mutex_enter(&buf_pool->LRU_list_mutex);
+	rw_lock_x_lock(hash_lock);
+	mutex_enter(block_mutex);
+	ut_ad(buf_page_get_io_fix(bpage) == BUF_IO_READ);
+	ut_ad(bpage->buf_fix_count == 0);
 
-		/* Set BUF_IO_NONE before we remove the block from LRU list */
-		buf_page_set_io_fix(bpage, BUF_IO_NONE);
+	/* Set BUF_IO_NONE before we remove the block from LRU list */
+	buf_page_set_io_fix(bpage, BUF_IO_NONE);
 
-		if (uncompressed) {
-			rw_lock_x_unlock_gen(
-				&((buf_block_t*) bpage)->lock,
-				BUF_IO_READ);
-		}
+	if (uncompressed) {
+		rw_lock_x_unlock_gen(
+			&((buf_block_t*) bpage)->lock,
+			BUF_IO_READ);
 	}
 
-	/* Find the table with specified space id, and mark it corrupted */
-	if (dict_set_corrupted_by_space(space)) {
-		if (!bpage->encrypted) {
-			buf_LRU_free_one_page(bpage);
-		}
+	/* If block is not encrypted find the table with specified
+	space id, and mark it corrupted. Encrypted tables
+	are marked unusable later e.g. in ::open(). */
+	if (!bpage->encrypted) {
+		dict_set_corrupted_by_space(space);
 	} else {
-		if (!bpage->encrypted) {
-			mutex_exit(buf_page_get_mutex(bpage));
-		}
-		ret = FALSE;
+		dict_set_encrypted_by_space(space);
 	}
 
-	if(!bpage->encrypted) {
-		mutex_exit(&buf_pool->LRU_list_mutex);
-		ut_ad(buf_pool->n_pend_reads > 0);
-		os_atomic_decrement_ulint(&buf_pool->n_pend_reads, 1);
-	}
+	/* After this point bpage can't be referenced. */
+	buf_LRU_free_one_page(bpage);
 
-	return(ret);
+	ut_ad(buf_pool->n_pend_reads > 0);
+	os_atomic_decrement_ulint(&buf_pool->n_pend_reads, 1);
+	mutex_exit(&buf_pool->LRU_list_mutex);
 }
 
 /********************************************************************//**
@@ -4564,9 +4533,12 @@ Check if page is maybe compressed, encrypted or both when we encounter
 corrupted page. Note that we can't be 100% sure if page is corrupted
 or decrypt/decompress just failed.
 @param[in,out]	bpage		Page
- at return true if page corrupted, false if not */
-UNIV_INTERN
-bool
+ at return DB_SUCCESS if page has been read and is not corrupted,
+DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
+DB_DECRYPTION_FAILED if page post encryption checksum matches but
+after decryption normal page checksum does not match.*/
+static
+dberr_t
 buf_page_check_corrupt(
 	buf_page_t*	bpage)
 {
@@ -4576,6 +4548,7 @@ buf_page_check_corrupt(
 	ulint space_id = bpage->space;
 	fil_space_t* space = fil_space_acquire_silent(space_id);
 	bool still_encrypted = false;
+	dberr_t err = DB_SUCCESS;
 	bool corrupted = false;
 	ulint page_type = mach_read_from_2(dst_frame + FIL_PAGE_TYPE);
 	fil_space_crypt_t* crypt_data = NULL;
@@ -4602,6 +4575,8 @@ buf_page_check_corrupt(
 
 		if (!corrupted) {
 			bpage->encrypted = false;
+		} else {
+			err = DB_PAGE_CORRUPTED;
 		}
 	}
 
@@ -4617,15 +4592,17 @@ buf_page_check_corrupt(
 			fil_get_page_type_name(page_type), page_type);
 	} else if (still_encrypted || (bpage->encrypted && corrupted)) {
 		bpage->encrypted = true;
-		corrupted = true;
+		err = DB_DECRYPTION_FAILED;
 
 		ib_logf(IB_LOG_LEVEL_ERROR,
 			"Block in space_id " ULINTPF " in file %s encrypted.",
 			space_id, (space && space->name) ? space->name : "NULL");
 		ib_logf(IB_LOG_LEVEL_ERROR,
-				"However key management plugin or used key_version %u is not found or"
-				" used encryption algorithm or method does not match.",
-				bpage->key_version);
+			"However key management plugin or used key_version " ULINTPF
+			" is not found or"
+			" used encryption algorithm or method does not match.",
+			mach_read_from_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION));
+
 		if (space_id > TRX_SYS_SPACE) {
 			ib_logf(IB_LOG_LEVEL_ERROR,
 				"Marking tablespace as missing. You may drop this table or"
@@ -4637,18 +4614,22 @@ buf_page_check_corrupt(
 		fil_space_release(space);
 	}
 
-	return corrupted;
+	return (err);
 }
 
 /********************************************************************//**
 Completes an asynchronous read or write request of a file page to or from
 the buffer pool.
- at return true if successful */
+ at param[in,out]		bpage		Page to complete
+ at return DB_SUCCESS if page has been read and is not corrupted,
+DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
+DB_DECRYPTION_FAILED if page post encryption checksum matches but
+after decryption normal page checksum does not match.
+in write only DB_SUCCESS is possible. */
 UNIV_INTERN
-bool
+dberr_t
 buf_page_io_complete(
-/*=================*/
-	buf_page_t*	bpage)	/*!< in: pointer to the block in question */
+	buf_page_t*	bpage)
 {
 	enum buf_io_fix	io_type;
 	buf_pool_t*	buf_pool = buf_pool_from_bpage(bpage);
@@ -4656,8 +4637,8 @@ buf_page_io_complete(
 					== BUF_BLOCK_FILE_PAGE);
 	bool		have_LRU_mutex = false;
 	fil_space_t*	space = NULL;
-	byte*	frame = NULL;
-	bool corrupted = false;
+	byte*		frame = NULL;
+	dberr_t 	err = DB_SUCCESS;
 
 	ut_a(buf_page_in_file(bpage));
 
@@ -4673,6 +4654,7 @@ buf_page_io_complete(
 	if (io_type == BUF_IO_READ) {
 		ulint	read_page_no;
 		ulint	read_space_id;
+		uint	key_version;
 
 		buf_page_decrypt_after_read(bpage);
 
@@ -4696,7 +4678,7 @@ buf_page_io_complete(
 					"Page %u in tablespace %u zip_decompress failure.",
 					bpage->offset, bpage->space);
 
-				corrupted = true;
+				err = DB_PAGE_CORRUPTED;
 
 				goto database_corrupted;
 			}
@@ -4712,6 +4694,8 @@ buf_page_io_complete(
 		read_page_no = mach_read_from_4(frame + FIL_PAGE_OFFSET);
 		read_space_id = mach_read_from_4(
 			frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
+		key_version = mach_read_from_4(
+			frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
 
 		if (bpage->space == TRX_SYS_SPACE
 		    && buf_dblwr_page_inside(bpage->offset)) {
@@ -4746,27 +4730,27 @@ buf_page_io_complete(
 
 		if (UNIV_LIKELY(!bpage->is_corrupt ||
 				!srv_pass_corrupt_table)) {
-			corrupted = buf_page_check_corrupt(bpage);
-
+			err = buf_page_check_corrupt(bpage);
 		}
 
 database_corrupted:
 
-		if (corrupted) {
+		if (err != DB_SUCCESS) {
 			/* Not a real corruption if it was triggered by
 			error injection */
 
 			DBUG_EXECUTE_IF("buf_page_is_corrupt_failure",
-				if (bpage->space > TRX_SYS_SPACE
-					&& buf_mark_space_corrupt(bpage)) {
+				if (bpage->space > TRX_SYS_SPACE) {
+					buf_mark_space_corrupt(bpage);
 					ib_logf(IB_LOG_LEVEL_INFO,
 						"Simulated page corruption");
-					return(true);
+					return(err);
 				}
+				err = DB_SUCCESS;
 				goto page_not_corrupt;
 			);
 
-			if (!bpage->encrypted) {
+			if (err == DB_PAGE_CORRUPTED) {
 				fil_system_enter();
 				space = fil_space_get_by_id(bpage->space);
 				fil_system_exit();
@@ -4828,31 +4812,14 @@ buf_page_io_complete(
 				/* If page space id is larger than TRX_SYS_SPACE
 				(0), we will attempt to mark the corresponding
 				table as corrupted instead of crashing server */
-				if (bpage->space > TRX_SYS_SPACE
-				    && buf_mark_space_corrupt(bpage)) {
-					return(false);
+				if (bpage->space > TRX_SYS_SPACE) {
+					buf_mark_space_corrupt(bpage);
+					return(err);
 				} else {
-					if (!bpage->encrypted) {
-						ib_logf(IB_LOG_LEVEL_ERROR,
-							"Ending processing because of a corrupt database page.");
-
-						ut_error;
-					}
-
-					ib_push_warning(innobase_get_trx(), DB_DECRYPTION_FAILED,
-						"Table in tablespace %lu encrypted."
-						"However key management plugin or used key_id %lu is not found or"
-						" used encryption algorithm or method does not match."
-						" Can't continue opening the table.",
-						bpage->space, bpage->key_version);
-
-					if (bpage->encrypted && bpage->space > TRX_SYS_SPACE) {
-						buf_mark_space_corrupt(bpage);
-					} else {
-						ut_error;
-					}
+					ib_logf(IB_LOG_LEVEL_ERROR,
+						"Ending processing because of a corrupt database page.");
 
-					return(false);
+					ut_error;
 				}
 			}
 		}
@@ -4872,11 +4839,11 @@ buf_page_io_complete(
 
 			if (bpage && bpage->encrypted) {
 				ib_logf(IB_LOG_LEVEL_WARN,
-					"Table in tablespace %lu encrypted."
+					"Table in tablespace " ULINTPF " encrypted."
 					"However key management plugin or used key_version %u is not found or"
 					" used encryption algorithm or method does not match."
 					" Can't continue opening the table.\n",
-					(ulint)bpage->space, bpage->key_version);
+					read_page_no, key_version);
 			} else {
 
 				ibuf_merge_or_delete_for_page(
@@ -4998,7 +4965,7 @@ buf_page_io_complete(
 
 	mutex_exit(block_mutex);
 
-	return(true);
+	return(err);
 }
 
 /*********************************************************************//**
@@ -5026,7 +4993,7 @@ buf_all_freed_instance(
 
 		mutex_exit(&buf_pool->LRU_list_mutex);
 
-		if (UNIV_LIKELY_NULL(block) && block->page.key_version == 0) {
+		if (UNIV_LIKELY_NULL(block)) {
 			fil_space_t* space = fil_space_get(block->page.space);
 			ib_logf(IB_LOG_LEVEL_ERROR,
 				"Page %u %u still fixed or dirty.",
@@ -6264,13 +6231,11 @@ buf_page_encrypt_before_write(
 
 	if (bpage->offset == 0) {
 		/* Page 0 of a tablespace is not encrypted/compressed */
-		ut_ad(bpage->key_version == 0);
 		return src_frame;
 	}
 
 	if (bpage->space == TRX_SYS_SPACE && bpage->offset == TRX_SYS_PAGE_NO) {
 		/* don't encrypt/compress page as it contains address to dblwr buffer */
-		bpage->key_version = 0;
 		return src_frame;
 	}
 
@@ -6300,7 +6265,6 @@ buf_page_encrypt_before_write(
 	/* Is encryption needed? */
 	if (crypt_data == NULL || crypt_data->type == CRYPT_SCHEME_UNENCRYPTED) {
 		/* An unencrypted table */
-		bpage->key_version = 0;
 		encrypted = false;
 	}
 
@@ -6330,9 +6294,6 @@ buf_page_encrypt_before_write(
 					      src_frame,
 					      dst_frame);
 
-		ulint key_version = mach_read_from_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
-		ut_ad(key_version == 0 || key_version >= bpage->key_version);
-		bpage->key_version = key_version;
 		bpage->real_size = page_size;
 		slot->out_buf = dst_frame = tmp;
 
@@ -6406,8 +6367,6 @@ buf_page_decrypt_after_read(
 	buf_pool_t* buf_pool = buf_pool_from_bpage(bpage);
 	bool success = true;
 
-	bpage->key_version = key_version;
-
 	if (bpage->offset == 0) {
 		/* File header pages are not encrypted/compressed */
 		return (true);
diff --git a/storage/xtradb/buf/buf0rea.cc b/storage/xtradb/buf/buf0rea.cc
index 15465699726..006376256ad 100644
--- a/storage/xtradb/buf/buf0rea.cc
+++ b/storage/xtradb/buf/buf0rea.cc
@@ -1,7 +1,7 @@
 /*****************************************************************************
 
 Copyright (c) 1995, 2013, Oracle and/or its affiliates. All Rights Reserved.
-Copyright (c) 2013, 2014, MariaDB Corporation. All Rights Reserved.
+Copyright (c) 2013, 2017, MariaDB Corporation. All Rights Reserved.
 
 This program is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free Software
@@ -103,36 +103,46 @@ flag is cleared and the x-lock released by an i/o-handler thread.
 in buf_pool, or if the page is in the doublewrite buffer blocks in
 which case it is never read into the pool, or if the tablespace does
 not exist or is being dropped
- at return 1 if read request is issued. 0 if it is not */
-UNIV_INTERN
-ulint
-buf_read_page_low(
-/*==============*/
-	dberr_t*	err,	/*!< out: DB_SUCCESS or DB_TABLESPACE_DELETED if we are
-			trying to read from a non-existent tablespace, or a
-			tablespace which is just now being dropped */
-	bool	sync,	/*!< in: true if synchronous aio is desired */
-	ulint	mode,	/*!< in: BUF_READ_IBUF_PAGES_ONLY, ...,
-			ORed to OS_AIO_SIMULATED_WAKE_LATER (see below
-			at read-ahead functions) */
-	ulint	space,	/*!< in: space id */
-	ulint	zip_size,/*!< in: compressed page size, or 0 */
-	ibool	unzip,	/*!< in: TRUE=request uncompressed page */
-	ib_int64_t tablespace_version, /*!< in: if the space memory object has
+
+ at param[out]	err	 	DB_SUCCESS, DB_TABLESPACE_DELETED if we are
+				trying to read from a non-existent tablespace, or a
+				tablespace which is just now being dropped,
+				DB_PAGE_CORRUPTED if page based on checksum
+				check is corrupted, or DB_DECRYPTION_FAILED
+				if page post encryption checksum matches but
+				after decryption normal page checksum does not match.
+ at param[in]	sync		true if synchronous aio is desired
+ at param[in]	mode		BUF_READ_IBUF_PAGES_ONLY, ...,
+				ORed to OS_AIO_SIMULATED_WAKE_LATER (see below
+				at read-ahead functions)
+ at param[in]	space		space id
+ at param[in]	zip_size	compressed page size, or 0
+ at param[in]	unzip		TRUE=request uncompressed page
+ at param[in]	tablespace_version	if the space memory object has
 			this timestamp different from what we are giving here,
 			treat the tablespace as dropped; this is a timestamp we
 			use to stop dangling page reads from a tablespace
-			which we have DISCARDed + IMPORTed back */
-	ulint	offset,	/*!< in: page number */
-	trx_t*	trx,	/*!< in: trx */
-	buf_page_t** rbpage) /*!< out: page */
+			which we have DISCARDed + IMPORTed back
+ at param[in]	offset		page number
+ at param[in]	trx		transaction
+ at return 1 if read request is issued. 0 if it is not */
+static
+ulint
+buf_read_page_low(
+	dberr_t*	err,
+	bool		sync,
+	ulint		mode,
+	ulint		space,
+	ulint		zip_size,
+	ibool		unzip,
+	ib_int64_t	tablespace_version,
+	ulint		offset,
+	trx_t*		trx = NULL)
 {
 	buf_page_t*	bpage;
 	ulint		wake_later;
 	ibool		ignore_nonexistent_pages;
 
-	*err = DB_SUCCESS;
-
 	wake_later = mode & OS_AIO_SIMULATED_WAKE_LATER;
 	mode = mode & ~OS_AIO_SIMULATED_WAKE_LATER;
 
@@ -259,18 +269,13 @@ buf_read_page_low(
 	if (sync) {
 		/* The i/o is already completed when we arrive from
 		fil_read */
-		if (!buf_page_io_complete(bpage)) {
-			if (rbpage) {
-				*rbpage = bpage;
-			}
+		*err = buf_page_io_complete(bpage);
+
+		if (*err != DB_SUCCESS) {
 			return(0);
 		}
 	}
 
-	if (rbpage) {
-		*rbpage = bpage;
-	}
-
 	return(1);
 }
 
@@ -307,7 +312,7 @@ buf_read_ahead_random(
 	ulint		ibuf_mode;
 	ulint		count;
 	ulint		low, high;
-	dberr_t		err;
+	dberr_t		err = DB_SUCCESS;
 	ulint		i;
 	const ulint	buf_read_ahead_random_area
 				= BUF_READ_AHEAD_AREA(buf_pool);
@@ -402,20 +407,33 @@ buf_read_ahead_random(
 		mode: hence FALSE as the first parameter */
 
 		if (!ibuf_bitmap_page(zip_size, i)) {
+
 			count += buf_read_page_low(
 				&err, false,
 				ibuf_mode | OS_AIO_SIMULATED_WAKE_LATER,
 				space, zip_size, FALSE,
-				tablespace_version, i, trx, NULL);
-			if (err == DB_TABLESPACE_DELETED) {
-				ut_print_timestamp(stderr);
-				fprintf(stderr,
-					"  InnoDB: Warning: in random"
-					" readahead trying to access\n"
-					"InnoDB: tablespace %lu page %lu,\n"
-					"InnoDB: but the tablespace does not"
-					" exist or is just being dropped.\n",
-					(ulong) space, (ulong) i);
+				tablespace_version, i, trx);
+
+			switch(err) {
+			case DB_SUCCESS:
+				break;
+			case DB_TABLESPACE_DELETED:
+				ib_logf(IB_LOG_LEVEL_WARN,
+					"In random"
+					" readahead trying to access"
+					" tablespace " ULINTPF ":" ULINTPF
+					" but the tablespace does not"
+					" exist or is just being dropped.",
+					space, i);
+				break;
+			case DB_DECRYPTION_FAILED:
+				ib_logf(IB_LOG_LEVEL_ERROR,
+					"Random readahead failed to decrypt page "
+					ULINTPF ":" ULINTPF ".",
+					i, space);
+				break;
+			default:
+				ut_error;
 			}
 		}
 	}
@@ -449,20 +467,27 @@ High-level function which reads a page asynchronously from a file to the
 buffer buf_pool if it is not already there. Sets the io_fix flag and sets
 an exclusive lock on the buffer frame. The flag is cleared and the x-lock
 released by the i/o-handler thread.
- at return TRUE if page has been read in, FALSE in case of failure */
+
+ at param[in]	space		space_id
+ at param[in]	zip_size	compressed page size in bytes, or 0
+ at param[in]	offset		page number
+ at param[in]	trx		transaction
+ at param[out]	encrypted	true if page encrypted
+ at return DB_SUCCESS if page has been read and is not corrupted,
+DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
+DB_DECRYPTION_FAILED if page post encryption checksum matches but
+after decryption normal page checksum does not match.*/
 UNIV_INTERN
-ibool
+dberr_t
 buf_read_page(
-/*==========*/
-	ulint	space,		/*!< in: space id */
-	ulint	zip_size,	/*!< in: compressed page size in bytes, or 0 */
-	ulint	offset,		/*!< in: page number */
-	trx_t*	trx,		/*!< in: trx */
-	buf_page_t** bpage)	/*!< out: page */
+	ulint	space,
+	ulint	zip_size,
+	ulint	offset,
+	trx_t*	trx)
 {
 	ib_int64_t	tablespace_version;
 	ulint		count;
-	dberr_t		err;
+	dberr_t		err=DB_SUCCESS;
 
 	tablespace_version = fil_space_get_version(space);
 
@@ -471,22 +496,23 @@ buf_read_page(
 
 	count = buf_read_page_low(&err, true, BUF_READ_ANY_PAGE, space,
 				  zip_size, FALSE,
-				  tablespace_version, offset, trx, bpage);
+				  tablespace_version, offset, trx);
+
 	srv_stats.buf_pool_reads.add(count);
+
 	if (err == DB_TABLESPACE_DELETED) {
-		ut_print_timestamp(stderr);
-		fprintf(stderr,
-			"  InnoDB: Error: trying to access"
-			" tablespace %lu page no. %lu,\n"
-			"InnoDB: but the tablespace does not exist"
-			" or is just being dropped.\n",
-			(ulong) space, (ulong) offset);
+		ib_logf(IB_LOG_LEVEL_ERROR,
+			"Trying to access"
+			" tablespace " ULINTPF " page no. " ULINTPF
+			" but the tablespace does not exist"
+			" or is just being dropped.",
+			space, offset);
 	}
 
 	/* Increment number of I/O operations used for LRU policy. */
 	buf_LRU_stat_inc_io();
 
-	return(count > 0);
+	return(err);
 }
 
 /********************************************************************//**
@@ -494,23 +520,23 @@ High-level function which reads a page asynchronously from a file to the
 buffer buf_pool if it is not already there. Sets the io_fix flag and sets
 an exclusive lock on the buffer frame. The flag is cleared and the x-lock
 released by the i/o-handler thread.
- at return TRUE if page has been read in, FALSE in case of failure */
+ at param[in]	space		Tablespace id
+ at param[in]	offset		Page no */
 UNIV_INTERN
-ibool
+void
 buf_read_page_async(
-/*================*/
-	ulint	space,	/*!< in: space id */
-	ulint	offset)	/*!< in: page number */
+	ulint	space,
+	ulint	offset)
 {
 	ulint		zip_size;
 	ib_int64_t	tablespace_version;
 	ulint		count;
-	dberr_t		err;
+	dberr_t		err = DB_SUCCESS;
 
 	zip_size = fil_space_get_zip_size(space);
 
 	if (zip_size == ULINT_UNDEFINED) {
-		return(FALSE);
+		return;
 	}
 
 	tablespace_version = fil_space_get_version(space);
@@ -519,7 +545,9 @@ buf_read_page_async(
 				  | OS_AIO_SIMULATED_WAKE_LATER
 				  | BUF_READ_IGNORE_NONEXISTENT_PAGES,
 				  space, zip_size, FALSE,
-				  tablespace_version, offset, NULL,NULL);
+				  tablespace_version, offset);
+
+	/* Possible decryption errors are already reported. */
 	srv_stats.buf_pool_reads.add(count);
 
 	/* We do not increment number of I/O operations used for LRU policy
@@ -528,8 +556,6 @@ buf_read_page_async(
 	buffer pool. Since this function is called from buffer pool load
 	these IOs are deliberate and are not part of normal workload we can
 	ignore these in our heuristics. */
-
-	return(count > 0);
 }
 
 /********************************************************************//**
@@ -580,7 +606,7 @@ buf_read_ahead_linear(
 	ulint		fail_count;
 	ulint		ibuf_mode;
 	ulint		low, high;
-	dberr_t		err;
+	dberr_t		err=DB_SUCCESS;
 	ulint		i;
 	const ulint	buf_read_ahead_linear_area
 		= BUF_READ_AHEAD_AREA(buf_pool);
@@ -784,20 +810,36 @@ buf_read_ahead_linear(
 		aio mode: hence FALSE as the first parameter */
 
 		if (!ibuf_bitmap_page(zip_size, i)) {
+
 			count += buf_read_page_low(
 				&err, false,
 				ibuf_mode,
-				space, zip_size, FALSE, tablespace_version, i, trx, NULL);
-			if (err == DB_TABLESPACE_DELETED) {
-				ut_print_timestamp(stderr);
-				fprintf(stderr,
-					"  InnoDB: Warning: in"
-					" linear readahead trying to access\n"
-					"InnoDB: tablespace %lu page %lu,\n"
-					"InnoDB: but the tablespace does not"
-					" exist or is just being dropped.\n",
-					(ulong) space, (ulong) i);
+				space, zip_size, FALSE, tablespace_version,
+				i, trx);
+
+			switch(err) {
+			case DB_SUCCESS:
+				break;
+			case DB_TABLESPACE_DELETED:
+				ib_logf(IB_LOG_LEVEL_WARN,
+					"In linear"
+					" readahead trying to access"
+					" tablespace " ULINTPF ":" ULINTPF
+					" but the tablespace does not"
+					" exist or is just being dropped.",
+					space, i);
+				break;
+
+			case DB_DECRYPTION_FAILED:
+				ib_logf(IB_LOG_LEVEL_ERROR,
+					"Linear readahead failed to decrypt page "
+					ULINTPF ":" ULINTPF ".",
+					i, space);
+				break;
+			default:
+				ut_error;
 			}
+
 		}
 	}
 
@@ -858,7 +900,7 @@ buf_read_ibuf_merge_pages(
 #endif
 
 	for (i = 0; i < n_stored; i++) {
-		dberr_t		err;
+		dberr_t		err=DB_SUCCESS;
 		buf_pool_t*	buf_pool;
 		ulint		zip_size = fil_space_get_zip_size(space_ids[i]);
 
@@ -877,7 +919,7 @@ buf_read_ibuf_merge_pages(
 		buf_read_page_low(&err, sync && (i + 1 == n_stored),
 				  BUF_READ_ANY_PAGE, space_ids[i],
 				  zip_size, TRUE, space_versions[i],
-				  page_nos[i], NULL, NULL);
+				  page_nos[i], NULL);
 
 		if (UNIV_UNLIKELY(err == DB_TABLESPACE_DELETED)) {
 tablespace_deleted:
@@ -888,6 +930,13 @@ buf_read_ibuf_merge_pages(
 						      page_nos[i],
 						      zip_size, FALSE);
 		}
+
+		if (err == DB_DECRYPTION_FAILED) {
+			ib_logf(IB_LOG_LEVEL_ERROR,
+				"Read page " ULINTPF " from tablespace " ULINTPF
+				" for insert buffer but page was encrypted.",
+				page_nos[i], space_ids[i]);
+		}
 	}
 
 	os_aio_simulated_wake_handler_threads();
@@ -924,7 +973,7 @@ buf_read_recv_pages(
 {
 	ib_int64_t	tablespace_version;
 	ulint		count;
-	dberr_t		err;
+	dberr_t		err=DB_SUCCESS;
 	ulint		i;
 
 	zip_size = fil_space_get_zip_size(space);
@@ -1017,12 +1066,21 @@ buf_read_recv_pages(
 		if ((i + 1 == n_stored) && sync) {
 			buf_read_page_low(&err, true, BUF_READ_ANY_PAGE, space,
 					  zip_size, TRUE, tablespace_version,
-					  page_nos[i], NULL, NULL);
+					  page_nos[i], NULL);
 		} else {
 			buf_read_page_low(&err, false, BUF_READ_ANY_PAGE
 					  | OS_AIO_SIMULATED_WAKE_LATER,
 					  space, zip_size, TRUE,
-					  tablespace_version, page_nos[i], NULL, NULL);
+					  tablespace_version, page_nos[i],
+					  NULL);
+		}
+
+		if (err == DB_DECRYPTION_FAILED) {
+			ib_logf(IB_LOG_LEVEL_ERROR,
+				"Recovery read page " ULINTPF
+				" from tablespace " ULINTPF
+				" but page was encrypted.",
+				page_nos[i], space);
 		}
 	}
 
diff --git a/storage/xtradb/dict/dict0crea.cc b/storage/xtradb/dict/dict0crea.cc
index 1ada35a89a2..6d5b12474eb 100644
--- a/storage/xtradb/dict/dict0crea.cc
+++ b/storage/xtradb/dict/dict0crea.cc
@@ -692,7 +692,7 @@ dict_create_index_tree_step(
 	dberr_t		err = DB_SUCCESS;
 	ulint		zip_size = dict_table_zip_size(index->table);
 
-	if (node->index->table->ibd_file_missing
+	if (node->index->table->file_unreadable
 	    || dict_table_is_discarded(node->index->table)) {
 
 		node->page_no = FIL_NULL;
diff --git a/storage/xtradb/dict/dict0dict.cc b/storage/xtradb/dict/dict0dict.cc
index 49de1cf7ef8..b0757055262 100644
--- a/storage/xtradb/dict/dict0dict.cc
+++ b/storage/xtradb/dict/dict0dict.cc
@@ -1187,7 +1187,7 @@ dict_table_open_on_name(
 
 		/* If table is encrypted return table */
 		if (ignore_err == DICT_ERR_IGNORE_NONE
-			&& table->is_encrypted) {
+			&& table->file_unreadable) {
 			/* Make life easy for drop table. */
 			if (table->can_be_evicted) {
 				dict_table_move_from_lru_to_non_lru(table);
@@ -6189,6 +6189,24 @@ dict_set_corrupted_by_space(
 }
 
 /**********************************************************************//**
+Flags a table with specified space_id encrypted in the data dictionary
+cache
+ at param[in]	space_id	Tablespace id */
+UNIV_INTERN
+void
+dict_set_encrypted_by_space(
+	ulint	space_id)
+{
+	dict_table_t*   table;
+
+	table = dict_find_table_by_space(space_id);
+
+	if (table) {
+		table->file_unreadable = true;
+	}
+}
+
+/**********************************************************************//**
 Flags an index corrupted both in the data dictionary cache
 and in the SYS_INDEXES */
 UNIV_INTERN
@@ -6683,7 +6701,8 @@ dict_table_schema_check(
 		}
 	}
 
-	if (table->ibd_file_missing) {
+	if (table->file_unreadable &&
+	    fil_space_get(table->space) == NULL) {
 		/* missing tablespace */
 
 		ut_snprintf(errstr, errstr_sz,
diff --git a/storage/xtradb/dict/dict0load.cc b/storage/xtradb/dict/dict0load.cc
index b843891f16c..30c026d94d1 100644
--- a/storage/xtradb/dict/dict0load.cc
+++ b/storage/xtradb/dict/dict0load.cc
@@ -1964,7 +1964,7 @@ dict_load_indexes(
 			dict_mem_index_free(index);
 			goto func_exit;
 		} else if (index->page == FIL_NULL
-			   && !table->ibd_file_missing
+			   && !table->file_unreadable
 			   && (!(index->type & DICT_FTS))) {
 
 			fprintf(stderr,
@@ -2193,7 +2193,7 @@ dict_load_table_low(
 
 	(*table)->id = mach_read_from_8(field);
 
-	(*table)->ibd_file_missing = FALSE;
+	(*table)->file_unreadable = false;
 
 	return(NULL);
 }
@@ -2380,7 +2380,7 @@ dict_load_table(
 			"Table '%s' tablespace is set as discarded.",
 			table_name);
 
-		table->ibd_file_missing = TRUE;
+		table->file_unreadable = true;
 
 	} else if (!fil_space_for_table_exists_in_mem(
 			table->space, name, false, true, heap,
@@ -2388,7 +2388,7 @@ dict_load_table(
 
 		if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_TEMPORARY)) {
 			/* Do not bother to retry opening temporary tables. */
-			table->ibd_file_missing = TRUE;
+			table->file_unreadable = true;
 
 		} else {
 			if (!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)) {
@@ -2423,7 +2423,7 @@ dict_load_table(
 				/* We failed to find a sensible
 				tablespace file */
 
-				table->ibd_file_missing = TRUE;
+				table->file_unreadable = true;
 			}
 			if (filepath) {
 				mem_free(filepath);
@@ -2448,9 +2448,10 @@ dict_load_table(
 	were not allowed while the table is being locked by a transaction. */
 	dict_err_ignore_t index_load_err =
 		!(ignore_err & DICT_ERR_IGNORE_RECOVER_LOCK)
-		&& table->ibd_file_missing
+		&& table->file_unreadable
 		? DICT_ERR_IGNORE_ALL
 		: ignore_err;
+
 	err = dict_load_indexes(table, heap, index_load_err);
 
 	if (err == DB_INDEX_CORRUPT) {
@@ -2485,7 +2486,7 @@ dict_load_table(
 	of the error condition, since the user may want to dump data from the
 	clustered index. However we load the foreign key information only if
 	all indexes were loaded. */
-	if (!cached || table->ibd_file_missing) {
+	if (!cached || table->file_unreadable) {
 		/* Don't attempt to load the indexes from disk. */
 	} else if (err == DB_SUCCESS) {
 		err = dict_load_foreigns(table->name, NULL, true, true,
@@ -2518,12 +2519,12 @@ dict_load_table(
 			table = NULL;
 
 		} else if (dict_index_is_corrupted(index)
-			   && !table->ibd_file_missing) {
+			   && !table->file_unreadable) {
 
 			/* It is possible we force to load a corrupted
 			clustered index if srv_load_corrupted is set.
 			Mark the table as corrupted in this case */
-			table->corrupted = TRUE;
+			table->corrupted = true;
 		}
 	}
 
@@ -2532,7 +2533,7 @@ dict_load_table(
 
 	ut_ad(!table
 	      || ignore_err != DICT_ERR_IGNORE_NONE
-	      || table->ibd_file_missing
+	      || table->file_unreadable
 	      || !table->corrupted);
 
 	if (table && table->fts) {
diff --git a/storage/xtradb/dict/dict0stats.cc b/storage/xtradb/dict/dict0stats.cc
index 6a28f3cdf8f..d5698ee0811 100644
--- a/storage/xtradb/dict/dict0stats.cc
+++ b/storage/xtradb/dict/dict0stats.cc
@@ -920,7 +920,11 @@ dict_stats_update_transient_for_index(
 
 		index->stat_n_leaf_pages = size;
 
-		btr_estimate_number_of_different_key_vals(index);
+		/* Do not continue if table decryption has failed or
+		table is already marked as corrupted. */
+		if (!index->table->file_unreadable && !index->table->corrupted) {
+			btr_estimate_number_of_different_key_vals(index);
+		}
 	}
 }
 
@@ -974,8 +978,9 @@ dict_stats_update_transient(
 			continue;
 		}
 
-		/* Do not continue if table decryption has failed. */
-		if (index->table->is_encrypted) {
+		/* Do not continue if table decryption has failed or
+		table is already marked as corrupted. */
+		if (index->table->file_unreadable || index->table->corrupted) {
 			break;
 		}
 
@@ -3192,7 +3197,7 @@ dict_stats_update(
 
 	ut_ad(!mutex_own(&dict_sys->mutex));
 
-	if (table->ibd_file_missing) {
+	if (table->file_unreadable) {
 		ut_print_timestamp(stderr);
 		fprintf(stderr,
 			" InnoDB: cannot calculate statistics for table %s "
@@ -3946,7 +3951,7 @@ dict_stats_save_defrag_stats(
 {
 	dberr_t	ret;
 
-	if (index->table->ibd_file_missing) {
+	if (index->table->file_unreadable) {
 		ut_print_timestamp(stderr);
 		fprintf(stderr,
 			" InnoDB: Cannot save defragment stats because "
diff --git a/storage/xtradb/fil/fil0crypt.cc b/storage/xtradb/fil/fil0crypt.cc
index 60ab067d105..1018b54b0dd 100644
--- a/storage/xtradb/fil/fil0crypt.cc
+++ b/storage/xtradb/fil/fil0crypt.cc
@@ -541,6 +541,11 @@ fil_parse_write_crypt_data(
 		fil_space_release(space);
 	}
 
+	/* Check is used key found from encryption plugin */
+	if (crypt_data->should_encrypt() && !crypt_data->is_key_found()) {
+		return NULL;
+	}
+
 	return ptr;
 }
 
@@ -1967,11 +1972,11 @@ fil_crypt_rotate_page(
 		bool modified = false;
 		int needs_scrubbing = BTR_SCRUB_SKIP_PAGE;
 		lsn_t block_lsn = block->page.newest_modification;
-		uint kv =  block->page.key_version;
+		byte* frame = buf_block_get_frame(block);
+		uint kv =  mach_read_from_4(frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
 
 		/* check if tablespace is closing after reading page */
 		if (space->is_stopping()) {
-			byte* frame = buf_block_get_frame(block);
 
 			if (kv == 0 &&
 				fil_crypt_is_page_uninitialized(frame, zip_size)) {
@@ -1993,9 +1998,6 @@ fil_crypt_rotate_page(
 					FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
 					space_id, MLOG_4BYTES, &mtr);
 
-				/* update block */
-				block->page.key_version = key_state->key_version;
-
 				/* statistics */
 				state->crypt_stat.pages_modified++;
 			} else {
diff --git a/storage/xtradb/fil/fil0fil.cc b/storage/xtradb/fil/fil0fil.cc
index a116bfad99d..c20374379cb 100644
--- a/storage/xtradb/fil/fil0fil.cc
+++ b/storage/xtradb/fil/fil0fil.cc
@@ -6252,8 +6252,15 @@ _fil_io(
 			mutex_exit(&fil_system->mutex);
 			if (mode == OS_AIO_NORMAL) {
 				ut_a(space->purpose == FIL_TABLESPACE);
-				buf_page_io_complete(static_cast<buf_page_t *>
-						     (message));
+				dberr_t err = buf_page_io_complete(static_cast<buf_page_t *>
+					(message));
+
+				if (err != DB_SUCCESS) {
+					ib_logf(IB_LOG_LEVEL_ERROR,
+						"Write operation failed for tablespace %s ("
+						ULINTPF ") offset " ULINTPF " error=%d.",
+						space->name, space->id, byte_offset, err);
+				}
 			}
 		}
 
@@ -6356,6 +6363,8 @@ fil_aio_wait(
 	mutex_enter(&fil_system->mutex);
 
 	fil_node_complete_io(fil_node, fil_system, type);
+	ulint purpose = fil_node->space->purpose;
+	space_id = fil_node->space->id;
 
 	mutex_exit(&fil_system->mutex);
 
@@ -6367,9 +6376,29 @@ fil_aio_wait(
 	deadlocks in the i/o system. We keep tablespace 0 data files always
 	open, and use a special i/o thread to serve insert buffer requests. */
 
-	if (fil_node->space->purpose == FIL_TABLESPACE) {
+	if (purpose == FIL_TABLESPACE) {
 		srv_set_io_thread_op_info(segment, "complete io for buf page");
-		buf_page_io_complete(static_cast<buf_page_t*>(message));
+		buf_page_t* bpage = static_cast<buf_page_t*>(message);
+		ulint offset = bpage ? bpage->offset : ULINT_UNDEFINED;
+
+		dberr_t err = buf_page_io_complete(bpage);
+
+		if (err != DB_SUCCESS) {
+			ib_log_level_t level = IB_LOG_LEVEL_FATAL;
+
+			/* In crash recovery set log corruption on
+			and produce only an error to fail InnoDB startup. */
+			if (recv_recovery_is_on()) {
+				recv_sys->found_corrupt_log = true;
+				level = IB_LOG_LEVEL_ERROR;
+			}
+
+			ib_logf(level,
+				"%s operation failed for tablespace %s"
+				" offset " ULINTPF " error=%s (%d).",
+				type == OS_FILE_WRITE ? "Write" : "Read",
+				fil_node->name, offset, ut_strerr(err), err);
+		}
 	} else {
 		srv_set_io_thread_op_info(segment, "complete io for log");
 		log_io_complete(static_cast<log_group_t*>(message));
diff --git a/storage/xtradb/handler/ha_innodb.cc b/storage/xtradb/handler/ha_innodb.cc
index 9445a066e25..81a6865278b 100644
--- a/storage/xtradb/handler/ha_innodb.cc
+++ b/storage/xtradb/handler/ha_innodb.cc
@@ -6458,7 +6458,7 @@ ha_innobase::open(
 	ib_table->thd = (void*)thd;
 
 	/* No point to init any statistics if tablespace is still encrypted. */
-	if (!ib_table->is_encrypted) {
+	if (!ib_table->file_unreadable) {
 		dict_stats_init(ib_table);
 	} else {
 		ib_table->stat_initialized = 1;
@@ -6466,7 +6466,8 @@ ha_innobase::open(
 
 	MONITOR_INC(MONITOR_TABLE_OPEN);
 
-	bool	no_tablespace;
+	bool	no_tablespace = false;
+	bool	ibd_missing = false;
 
 	if (dict_table_is_discarded(ib_table)) {
 
@@ -6481,7 +6482,7 @@ ha_innobase::open(
 
 		no_tablespace = false;
 
-	} else if (ib_table->ibd_file_missing) {
+	} else if (ib_table->file_unreadable && fil_space_get(ib_table->space) == NULL) {
 
 		ib_senderrf(
 			thd, IB_LOG_LEVEL_WARN,
@@ -6491,11 +6492,11 @@ ha_innobase::open(
 		file, best to play it safe. */
 
 		no_tablespace = true;
-	} else if (ib_table->is_encrypted) {
+		ibd_missing = true;
+	} else if (ib_table->file_unreadable) {
 		/* This means that tablespace was found but we could not
 		decrypt encrypted page. */
 		no_tablespace = true;
-		ib_table->ibd_file_missing = true;
 	} else {
 		no_tablespace = false;
 	}
@@ -6508,7 +6509,7 @@ ha_innobase::open(
 		/* If table has no talespace but it has crypt data, check
 		is tablespace made unaccessible because encryption service
 		or used key_id is not available. */
-		if (ib_table) {
+		if (ib_table && !ibd_missing) {
 			fil_space_crypt_t* crypt_data = ib_table->crypt_data;
 
 			if (crypt_data && crypt_data->should_encrypt()) {
@@ -6522,7 +6523,7 @@ ha_innobase::open(
 						ib_table->name, crypt_data->key_id);
 					ret_err = HA_ERR_DECRYPTION_FAILED;
 				}
-			} else if (ib_table->is_encrypted) {
+			} else if (ib_table->file_unreadable) {
 				push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
 					HA_ERR_DECRYPTION_FAILED,
 					"Table %s is encrypted but encryption service or"
@@ -6664,7 +6665,7 @@ ha_innobase::open(
 
 	/* Only if the table has an AUTOINC column. */
 	if (prebuilt->table != NULL
-	    && !prebuilt->table->ibd_file_missing
+	    && !prebuilt->table->file_unreadable
 	    && table->found_next_number_field != NULL) {
 		dict_table_autoinc_lock(prebuilt->table);
 
@@ -8651,7 +8652,7 @@ ha_innobase::write_row(
 
 	if (share->ib_table != prebuilt->table) {
 		fprintf(stderr,
-			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %lu.",
+			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %d.",
 			share->ib_table, prebuilt->table, prebuilt->table->name, prebuilt->table->is_corrupt);
 	}
 
@@ -8998,7 +8999,7 @@ ha_innobase::write_row(
 
 	if (share->ib_table != prebuilt->table) {
 		fprintf(stderr,
-			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %lu.",
+			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %d.",
 			share->ib_table, prebuilt->table, prebuilt->table->name, prebuilt->table->is_corrupt);
 	}
 
@@ -9418,7 +9419,7 @@ ha_innobase::update_row(
 
 	if (share->ib_table != prebuilt->table) {
 		fprintf(stderr,
-			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %lu.",
+			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %d.",
 			share->ib_table, prebuilt->table, prebuilt->table->name, prebuilt->table->is_corrupt);
 	}
 
@@ -9533,7 +9534,7 @@ ha_innobase::update_row(
 
 	if (share->ib_table != prebuilt->table) {
 		fprintf(stderr,
-			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %lu.",
+			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %d.",
 			share->ib_table, prebuilt->table, prebuilt->table->name, prebuilt->table->is_corrupt);
 	}
 
@@ -10237,6 +10238,15 @@ ha_innobase::general_fetch(
 
 	innobase_srv_conc_enter_innodb(prebuilt->trx);
 
+	if (prebuilt->table->corrupted) {
+		DBUG_RETURN(HA_ERR_CRASHED);
+	}
+
+	if (prebuilt->table->file_unreadable &&
+		fil_space_get(prebuilt->table->space) != NULL) {
+		DBUG_RETURN(HA_ERR_DECRYPTION_FAILED);
+	}
+
 	ret = row_search_for_mysql(
 		(byte*) buf, 0, prebuilt, match_mode, direction);
 
@@ -12975,7 +12985,7 @@ ha_innobase::discard_or_import_tablespace(
 		user may want to set the DISCARD flag in order to IMPORT
 		a new tablespace. */
 
-		if (dict_table->ibd_file_missing) {
+		if (dict_table->file_unreadable) {
 			ib_senderrf(
 				prebuilt->trx->mysql_thd,
 				IB_LOG_LEVEL_WARN, ER_TABLESPACE_MISSING,
@@ -12985,7 +12995,7 @@ ha_innobase::discard_or_import_tablespace(
 		err = row_discard_tablespace_for_mysql(
 			dict_table->name, prebuilt->trx);
 
-	} else if (!dict_table->ibd_file_missing) {
+	} else if (!dict_table->file_unreadable) {
 		/* Commit the transaction in order to
 		release the table lock. */
 		trx_commit_for_mysql(prebuilt->trx);
@@ -13064,7 +13074,7 @@ ha_innobase::truncate()
 
 	if (share->ib_table != prebuilt->table) {
 		fprintf(stderr,
-			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %lu.",
+			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %d.",
 			share->ib_table, prebuilt->table, prebuilt->table->name, prebuilt->table->is_corrupt);
 	}
 
@@ -13085,7 +13095,7 @@ ha_innobase::truncate()
 
 	if (share->ib_table != prebuilt->table) {
 		fprintf(stderr,
-			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %lu.",
+			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %d.",
 			share->ib_table, prebuilt->table, prebuilt->table->name, prebuilt->table->is_corrupt);
 	}
 
@@ -14484,7 +14494,7 @@ ha_innobase::analyze(
 
 	if (share->ib_table != prebuilt->table) {
 		fprintf(stderr,
-			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %lu.",
+			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %d.",
 			share->ib_table, prebuilt->table, prebuilt->table->name, prebuilt->table->is_corrupt);
 	}
 
@@ -14500,7 +14510,7 @@ ha_innobase::analyze(
 
 	if (share->ib_table != prebuilt->table) {
 		fprintf(stderr,
-			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %lu.",
+			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %d.",
 			share->ib_table, prebuilt->table, prebuilt->table->name, prebuilt->table->is_corrupt);
 	}
 
@@ -14610,7 +14620,7 @@ ha_innobase::check(
 
 		DBUG_RETURN(HA_ADMIN_CORRUPT);
 
-	} else if (prebuilt->table->ibd_file_missing) {
+	} else if (prebuilt->table->file_unreadable) {
 
 		ib_senderrf(
 			thd, IB_LOG_LEVEL_ERROR,
@@ -15813,7 +15823,7 @@ ha_innobase::transactional_table_lock(
 
 	if (share->ib_table != prebuilt->table) {
 		fprintf(stderr,
-			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %lu.",
+			"InnoDB: Warning: share->ib_table %p prebuilt->table %p table %s is_corrupt %d.",
 			share->ib_table, prebuilt->table, prebuilt->table->name, prebuilt->table->is_corrupt);
 	}
 
@@ -15830,7 +15840,7 @@ ha_innobase::transactional_table_lock(
 				ER_TABLESPACE_DISCARDED,
 				table->s->table_name.str);
 
-		} else if (prebuilt->table->ibd_file_missing) {
+		} else if (prebuilt->table->file_unreadable) {
 
 			ib_senderrf(
 				thd, IB_LOG_LEVEL_ERROR,
diff --git a/storage/xtradb/handler/handler0alter.cc b/storage/xtradb/handler/handler0alter.cc
index 8aaf5cd83bc..86c96d828b2 100644
--- a/storage/xtradb/handler/handler0alter.cc
+++ b/storage/xtradb/handler/handler0alter.cc
@@ -3138,7 +3138,7 @@ prepare_inplace_alter_table_dict(
 		clustered index of the old table, later. */
 		if (new_clustered
 		    || !ctx->online
-		    || user_table->ibd_file_missing
+		    || user_table->file_unreadable
 		    || dict_table_is_discarded(user_table)) {
 			/* No need to allocate a modification log. */
 			ut_ad(!ctx->add_index[a]->online_log);
@@ -4210,7 +4210,7 @@ ha_innobase::inplace_alter_table(
 	DBUG_ASSERT(ctx->trx);
 	DBUG_ASSERT(ctx->prebuilt == prebuilt);
 
-	if (prebuilt->table->ibd_file_missing
+	if (prebuilt->table->file_unreadable
 	    || dict_table_is_discarded(prebuilt->table)) {
 		goto all_done;
 	}
@@ -5232,7 +5232,7 @@ commit_try_rebuild(
 	/* The new table must inherit the flag from the
 	"parent" table. */
 	if (dict_table_is_discarded(user_table)) {
-		rebuilt_table->ibd_file_missing = true;
+		rebuilt_table->file_unreadable = true;
 		rebuilt_table->flags2 |= DICT_TF2_DISCARDED;
 	}
 
@@ -5764,9 +5764,9 @@ ha_innobase::commit_inplace_alter_table(
 	if (ha_alter_info->group_commit_ctx) {
 		ctx_array = ha_alter_info->group_commit_ctx;
 	} else {
-	ctx_single[0] = ctx0;
-	ctx_single[1] = NULL;
-	ctx_array = ctx_single;
+		ctx_single[0] = ctx0;
+		ctx_single[1] = NULL;
+		ctx_array = ctx_single;
 	}
 
 	DBUG_ASSERT(ctx0 == ctx_array[0]);
@@ -5795,6 +5795,19 @@ ha_innobase::commit_inplace_alter_table(
 			= static_cast<ha_innobase_inplace_ctx*>(*pctx);
 		DBUG_ASSERT(ctx->prebuilt->trx == prebuilt->trx);
 
+		/* If decryption failed for old table or new table
+		fail here. */
+		if ((ctx->old_table->file_unreadable &&
+		     fil_space_get(ctx->old_table->space) != NULL)||
+		    (ctx->new_table->file_unreadable &&
+		     fil_space_get(ctx->new_table->space) != NULL)) {
+			String str;
+			const char* engine= table_type();
+			get_error_message(HA_ERR_DECRYPTION_FAILED, &str);
+			my_error(ER_GET_ERRMSG, MYF(0), HA_ERR_DECRYPTION_FAILED, str.c_ptr(), engine);
+			DBUG_RETURN(true);
+		}
+
 		/* Exclusively lock the table, to ensure that no other
 		transaction is holding locks on the table while we
 		change the table definition. The MySQL meta-data lock
diff --git a/storage/xtradb/include/btr0btr.ic b/storage/xtradb/include/btr0btr.ic
index 62a24873482..0f5f025d6a3 100644
--- a/storage/xtradb/include/btr0btr.ic
+++ b/storage/xtradb/include/btr0btr.ic
@@ -61,7 +61,7 @@ btr_block_get_func(
 
 	if (err == DB_DECRYPTION_FAILED) {
 		if (index && index->table) {
-			index->table->is_encrypted = true;
+			index->table->file_unreadable = true;
 		}
 	}
 
diff --git a/storage/xtradb/include/btr0pcur.ic b/storage/xtradb/include/btr0pcur.ic
index 1cd13824542..87b48de6389 100644
--- a/storage/xtradb/include/btr0pcur.ic
+++ b/storage/xtradb/include/btr0pcur.ic
@@ -357,6 +357,11 @@ btr_pcur_move_to_next(
 			return(FALSE);
 		}
 
+		if (cursor->btr_cur.index->table->corrupted ||
+		    cursor->btr_cur.index->table->file_unreadable) {
+			return(FALSE);
+		}
+
 		btr_pcur_move_to_next_page(cursor, mtr);
 
 		return(TRUE);
diff --git a/storage/xtradb/include/buf0buf.h b/storage/xtradb/include/buf0buf.h
index 1774d9445ff..155556cd37c 100644
--- a/storage/xtradb/include/buf0buf.h
+++ b/storage/xtradb/include/buf0buf.h
@@ -690,17 +690,6 @@ buf_page_is_corrupted(
 	const fil_space_t* 	space)
 	MY_ATTRIBUTE((warn_unused_result));
 /********************************************************************//**
-Check if page is maybe compressed, encrypted or both when we encounter
-corrupted page. Note that we can't be 100% sure if page is corrupted
-or decrypt/decompress just failed.
- at param[in]	bpage		Page
- at return true if page corrupted, false if not */
-bool
-buf_page_check_corrupt(
-	buf_page_t*	bpage)	/*!< in/out: buffer page read from disk */
-	MY_ATTRIBUTE(( warn_unused_result));
-
-/********************************************************************//**
 Checks if a page is all zeroes.
 @return	TRUE if the page is all zeroes */
 bool
@@ -1273,12 +1262,15 @@ buf_page_init_for_read(
 /********************************************************************//**
 Completes an asynchronous read or write request of a file page to or from
 the buffer pool.
- at return true if successful */
+ at param[in,out]	bpage		pointer to the block in question
+ at return DB_SUCCESS if page has been read and is not corrupted,
+DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
+DB_DECRYPTION_FAILED if page post encryption checksum matches but
+after decryption normal page checksum does not match.*/
 UNIV_INTERN
-bool
+dberr_t
 buf_page_io_complete(
-/*=================*/
-	buf_page_t*	bpage);	/*!< in: pointer to the block in question */
+	buf_page_t*	bpage);
 /********************************************************************//**
 Calculates a folded value of a file page address to use in the page hash
 table.
@@ -1678,7 +1670,6 @@ struct buf_page_t{
 					if written again we check is TRIM
 					operation needed. */
 
-	unsigned        key_version;	/*!< key version for this block */
 	bool            encrypted;	/*!< page is still encrypted */
 
 	ulint           real_size;	/*!< Real size of the page
diff --git a/storage/xtradb/include/buf0rea.h b/storage/xtradb/include/buf0rea.h
index f0652b5d2cd..6b9b2ccce75 100644
--- a/storage/xtradb/include/buf0rea.h
+++ b/storage/xtradb/include/buf0rea.h
@@ -1,7 +1,7 @@
 /*****************************************************************************
 
 Copyright (c) 1995, 2013, Oracle and/or its affiliates. All Rights Reserved.
-Copyright (c) 2015, MariaDB Corporation.
+Copyright (c) 2015, 2017, MariaDB Corporation.
 
 This program is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free Software
@@ -35,29 +35,36 @@ High-level function which reads a page asynchronously from a file to the
 buffer buf_pool if it is not already there. Sets the io_fix flag and sets
 an exclusive lock on the buffer frame. The flag is cleared and the x-lock
 released by the i/o-handler thread.
- at return TRUE if page has been read in, FALSE in case of failure */
+
+ at param[in]	space		space_id
+ at param[in]	zip_size	compressed page size in bytes, or 0
+ at param[in]	offset		page number
+ at param[in]	trx		transaction
+ at return DB_SUCCESS if page has been read and is not corrupted,
+DB_PAGE_CORRUPTED if page based on checksum check is corrupted,
+DB_DECRYPTION_FAILED if page post encryption checksum matches but
+after decryption normal page checksum does not match.*/
 UNIV_INTERN
-ibool
+dberr_t
 buf_read_page(
-/*==========*/
-	ulint	space,		/*!< in: space id */
-	ulint	zip_size,	/*!< in: compressed page size in bytes, or 0 */
-	ulint	offset,		/*!< in: page number */
-	trx_t*	trx,		/*!< in: trx */
-	buf_page_t** bpage	/*!< out: page */
-);
+	ulint	space,
+	ulint	zip_size,
+	ulint	offset,
+	trx_t*	trx);
+
 /********************************************************************//**
 High-level function which reads a page asynchronously from a file to the
 buffer buf_pool if it is not already there. Sets the io_fix flag and sets
 an exclusive lock on the buffer frame. The flag is cleared and the x-lock
 released by the i/o-handler thread.
- at return TRUE if page has been read in, FALSE in case of failure */
+ at param[in]	space		Tablespace id
+ at param[in]	offset		Page number */
 UNIV_INTERN
-ibool
+void
 buf_read_page_async(
-/*================*/
-	ulint	space,	/*!< in: space id */
-	ulint	offset);/*!< in: page number */
+	ulint	space,
+	ulint	offset);
+
 /********************************************************************//**
 Applies a random read-ahead in buf_pool if there are at least a threshold
 value of accessed pages from the random read-ahead area. Does not read any
diff --git a/storage/xtradb/include/db0err.h b/storage/xtradb/include/db0err.h
index b11c5f4ea1a..8bd3beda110 100644
--- a/storage/xtradb/include/db0err.h
+++ b/storage/xtradb/include/db0err.h
@@ -138,6 +138,8 @@ enum dberr_t {
 					of missing key management plugin,
 					or missing or incorrect key or
 					incorret AES method or algorithm. */
+	DB_PAGE_CORRUPTED,		/* Page read from tablespace is
+					corrupted. */
 	/* The following are partial failure codes */
 	DB_FAIL = 1000,
 	DB_OVERFLOW,
diff --git a/storage/xtradb/include/dict0dict.h b/storage/xtradb/include/dict0dict.h
index 1622b927a76..7b0c9f4c465 100644
--- a/storage/xtradb/include/dict0dict.h
+++ b/storage/xtradb/include/dict0dict.h
@@ -1820,6 +1820,15 @@ dict_set_corrupted_by_space(
 /*========================*/
 	ulint		space_id);	/*!< in: space ID */
 
+/**********************************************************************//**
+Flags a table with specified space_id encrypted in the data dictionary
+cache
+ at param[in]	space_id	Tablespace id */
+UNIV_INTERN
+void
+dict_set_encrypted_by_space(
+	ulint	space_id);
+
 /********************************************************************//**
 Validate the table flags.
 @return	true if valid. */
diff --git a/storage/xtradb/include/dict0mem.h b/storage/xtradb/include/dict0mem.h
index 96c85cd8a99..9b347ff5741 100644
--- a/storage/xtradb/include/dict0mem.h
+++ b/storage/xtradb/include/dict0mem.h
@@ -2,7 +2,7 @@
 
 Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
 Copyright (c) 2012, Facebook Inc.
-Copyright (c) 2013, 2016, MariaDB Corporation.
+Copyright (c) 2013, 2017, MariaDB Corporation.
 
 This program is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free Software
@@ -1062,11 +1062,13 @@ struct dict_table_t{
 				table is placed */
 	unsigned	flags:DICT_TF_BITS;	/*!< DICT_TF_... */
 	unsigned	flags2:DICT_TF2_BITS;	/*!< DICT_TF2_... */
-	unsigned	ibd_file_missing:1;
-				/*!< TRUE if this is in a single-table
-				tablespace and the .ibd file is missing; then
-				we must return in ha_innodb.cc an error if the
-				user tries to query such an orphaned table */
+	unsigned	file_unreadable:1;
+				/*!< true if this is in a single-table
+				tablespace and the .ibd file is missing
+				or page decryption failed and page is
+				corrupted; then we must return in
+				ha_innodb.cc an error if the
+				user tries to query such table */
 	unsigned	cached:1;/*!< TRUE if the table object has been added
 				to the dictionary cache */
 	unsigned	to_be_dropped:1;
@@ -1364,8 +1366,8 @@ struct dict_table_t{
 	UT_LIST_BASE_NODE_T(lock_t)
 			locks;	/*!< list of locks on the table; protected
 				by lock_sys->mutex */
-	ibool		is_corrupt;
-	ibool		is_encrypted;
+	bool		is_corrupt;
+
 #endif /* !UNIV_HOTBACKUP */
 
 #ifdef UNIV_DEBUG
diff --git a/storage/xtradb/include/fil0crypt.h b/storage/xtradb/include/fil0crypt.h
index cfc2d850883..997c6368215 100644
--- a/storage/xtradb/include/fil0crypt.h
+++ b/storage/xtradb/include/fil0crypt.h
@@ -117,10 +117,8 @@ struct fil_space_crypt_t : st_encryption_scheme
 		min_key_version(new_min_key_version),
 		page0_offset(0),
 		encryption(new_encryption),
-		key_found(),
 		rotate_state()
 	{
-		key_found = new_min_key_version;
 		key_id = new_key_id;
 		my_random_bytes(iv, sizeof(iv));
 		mutex_create(fil_crypt_data_mutex_key,
@@ -136,6 +134,8 @@ struct fil_space_crypt_t : st_encryption_scheme
 			type = CRYPT_SCHEME_1;
 			min_key_version = key_get_latest_version();
 		}
+
+		key_found = min_key_version;
 	}
 
 	/** Destructor */
diff --git a/storage/xtradb/include/log0recv.h b/storage/xtradb/include/log0recv.h
index e7b6a937f01..8bbff97d7d0 100644
--- a/storage/xtradb/include/log0recv.h
+++ b/storage/xtradb/include/log0recv.h
@@ -303,9 +303,10 @@ recv_sys_var_init(void);
 #endif /* !UNIV_HOTBACKUP */
 /** Apply the hash table of stored log records to persistent data pages.
 @param[in]	last_batch	whether the change buffer merge will be
-				performed as part of the operation */
+				performed as part of the operation
+ at return DB_SUCCESS or DB_DECRYPTION_FAILED */
 UNIV_INTERN
-void
+dberr_t
 recv_apply_hashed_log_recs(bool last_batch);
 #ifdef UNIV_HOTBACKUP
 /*******************************************************************//**
diff --git a/storage/xtradb/log/log0recv.cc b/storage/xtradb/log/log0recv.cc
index 6b7c8d77824..369db3c5fd0 100644
--- a/storage/xtradb/log/log0recv.cc
+++ b/storage/xtradb/log/log0recv.cc
@@ -1384,6 +1384,10 @@ recv_parse_or_apply_log_rec_body(
 		break;
 	case MLOG_FILE_WRITE_CRYPT_DATA:
 		ptr = const_cast<byte*>(fil_parse_write_crypt_data(ptr, end_ptr, block));
+
+		if (!ptr) {
+			recv_sys->found_corrupt_log = TRUE;
+		}
 		break;
 	default:
 		ptr = NULL;
@@ -1856,9 +1860,10 @@ recv_read_in_area(
 
 /** Apply the hash table of stored log records to persistent data pages.
 @param[in]	last_batch	whether the change buffer merge will be
-				performed as part of the operation */
+				performed as part of the operation
+ at return DB_SUCCESS or DB_DECRYPTION_FAILED */
 UNIV_INTERN
-void
+dberr_t
 recv_apply_hashed_log_recs(bool last_batch)
 {
 	for (;;) {
@@ -1868,6 +1873,11 @@ recv_apply_hashed_log_recs(bool last_batch)
 			break;
 		}
 
+		if (recv_sys->found_corrupt_log) {
+			mutex_exit(&recv_sys->mutex);
+			return(DB_DECRYPTION_FAILED);
+		}
+
 		mutex_exit(&recv_sys->mutex);
 		os_thread_sleep(500000);
 	}
@@ -1932,6 +1942,10 @@ recv_apply_hashed_log_recs(bool last_batch)
 
 		mutex_exit(&(recv_sys->mutex));
 
+		if (recv_sys->found_corrupt_log) {
+			return(DB_DECRYPTION_FAILED);
+		}
+
 		os_thread_sleep(500000);
 
 		mutex_enter(&(recv_sys->mutex));
@@ -1978,6 +1992,8 @@ recv_apply_hashed_log_recs(bool last_batch)
 	recv_sys_empty_hash();
 
 	mutex_exit(&recv_sys->mutex);
+
+	return (DB_SUCCESS);
 }
 #else /* !UNIV_HOTBACKUP */
 /*******************************************************************//**
diff --git a/storage/xtradb/row/row0import.cc b/storage/xtradb/row/row0import.cc
index 6dc01907710..61fd55baf3f 100644
--- a/storage/xtradb/row/row0import.cc
+++ b/storage/xtradb/row/row0import.cc
@@ -2207,7 +2207,7 @@ row_import_discard_changes(
 		index->space = FIL_NULL;
 	}
 
-	table->ibd_file_missing = TRUE;
+	table->file_unreadable = true;
 
 	fil_close_tablespace(trx, table->space);
 }
@@ -3453,7 +3453,7 @@ row_import_for_mysql(
 
 	ut_a(table->space);
 	ut_ad(prebuilt->trx);
-	ut_a(table->ibd_file_missing);
+	ut_a(table->file_unreadable);
 
 	trx_start_if_not_started(prebuilt->trx);
 
@@ -3758,7 +3758,7 @@ row_import_for_mysql(
 		return(row_import_error(prebuilt, trx, err));
 	}
 
-	table->ibd_file_missing = false;
+	table->file_unreadable = false;
 	table->flags2 &= ~DICT_TF2_DISCARDED;
 
 	if (autoinc != 0) {
diff --git a/storage/xtradb/row/row0ins.cc b/storage/xtradb/row/row0ins.cc
index 8dbf093a17d..f4f96d32c50 100644
--- a/storage/xtradb/row/row0ins.cc
+++ b/storage/xtradb/row/row0ins.cc
@@ -1518,7 +1518,7 @@ row_ins_check_foreign_constraint(
 	}
 
 	if (check_table == NULL
-	    || check_table->ibd_file_missing
+	    || check_table->file_unreadable
 	    || check_index == NULL) {
 
 		if (!srv_read_only_mode && check_ref) {
@@ -2410,8 +2410,7 @@ row_ins_clust_index_entry_low(
 				    &cursor, 0, __FILE__, __LINE__, &mtr);
 
 	if (err != DB_SUCCESS) {
-		index->table->is_encrypted = true;
-		index->table->ibd_file_missing = true;
+		index->table->file_unreadable = true;
 		mtr_commit(&mtr);
 		goto func_exit;
 	}
@@ -2771,7 +2770,7 @@ row_ins_sec_index_entry_low(
 				" used key_id is not available. "
 				" Can't continue reading table.",
 				index->table->name);
-			index->table->is_encrypted = true;
+			index->table->file_unreadable = true;
 		}
 		goto func_exit;
 	}
diff --git a/storage/xtradb/row/row0merge.cc b/storage/xtradb/row/row0merge.cc
index 72be305d481..988eb6f3568 100644
--- a/storage/xtradb/row/row0merge.cc
+++ b/storage/xtradb/row/row0merge.cc
@@ -1624,7 +1624,8 @@ row_merge_read_clustered_index(
 		page_cur_t*	cur	= btr_pcur_get_page_cur(&pcur);
 
 		/* Do not continue if table pages are still encrypted */
-		if (old_table->is_encrypted || new_table->is_encrypted) {
+		if (old_table->file_unreadable ||
+		    new_table->file_unreadable) {
 			err = DB_DECRYPTION_FAILED;
 			trx->error_key_num = 0;
 			goto func_exit;
@@ -3694,7 +3695,7 @@ row_merge_rename_tables_dict(
 	table is in a non-system tablespace where space > 0. */
 	if (err == DB_SUCCESS
 	    && old_table->space != TRX_SYS_SPACE
-	    && !old_table->ibd_file_missing) {
+	    && fil_space_get(old_table->space) != NULL) {
 		/* Make pathname to update SYS_DATAFILES. */
 		char* tmp_path = row_make_new_pathname(old_table, tmp_name);
 
@@ -4067,13 +4068,15 @@ row_merge_build_indexes(
 	pct_cost = COST_READ_CLUSTERED_INDEX * 100 / (total_static_cost + total_dynamic_cost);
 
 	/* Do not continue if we can't encrypt table pages */
-	if (old_table->is_encrypted || new_table->is_encrypted) {
+	if (old_table->file_unreadable ||
+	    new_table->file_unreadable) {
 		error = DB_DECRYPTION_FAILED;
 		ib_push_warning(trx->mysql_thd, DB_DECRYPTION_FAILED,
 			"Table %s is encrypted but encryption service or"
 			" used key_id is not available. "
 			" Can't continue reading table.",
-			old_table->is_encrypted ? old_table->name : new_table->name);
+			old_table->file_unreadable ? old_table->name :
+			    new_table->name);
 		goto func_exit;
 	}
 
diff --git a/storage/xtradb/row/row0mysql.cc b/storage/xtradb/row/row0mysql.cc
index 7c8636d354f..440439fef80 100644
--- a/storage/xtradb/row/row0mysql.cc
+++ b/storage/xtradb/row/row0mysql.cc
@@ -1,7 +1,7 @@
 /*****************************************************************************
 
 Copyright (c) 2000, 2016, Oracle and/or its affiliates. All Rights Reserved.
-Copyright (c) 2017, MariaDB Corporation.
+Copyright (c) 2015, 2017, MariaDB Corporation.
 
 This program is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free Software
@@ -1308,14 +1308,15 @@ row_insert_for_mysql(
 
 		return(DB_TABLESPACE_DELETED);
 
-	} else if (prebuilt->table->ibd_file_missing) {
+	} else if (prebuilt->table->file_unreadable &&
+		   fil_space_get(prebuilt->table->space) == NULL) {
 
 		ib_logf(IB_LOG_LEVEL_ERROR,
 			".ibd file is missing for table %s",
 			prebuilt->table->name);
 
 		return(DB_TABLESPACE_NOT_FOUND);
-	} else if (prebuilt->table->is_encrypted) {
+	} else if (prebuilt->table->file_unreadable) {
 		ib_push_warning(trx, DB_DECRYPTION_FAILED,
 			"Table %s in tablespace %lu encrypted."
 			"However key management plugin or used key_id is not found or"
@@ -1712,7 +1713,8 @@ row_update_for_mysql(
 	ut_ad(trx != NULL);
 	UT_NOT_USED(mysql_rec);
 
-	if (prebuilt->table->ibd_file_missing) {
+	if (prebuilt->table->file_unreadable &&
+	    fil_space_get(prebuilt->table->space) == NULL) {
 		ut_print_timestamp(stderr);
 		fprintf(stderr, "  InnoDB: Error:\n"
 			"InnoDB: MySQL is trying to use a table handle"
@@ -1727,7 +1729,7 @@ row_update_for_mysql(
 			"InnoDB: how you can resolve the problem.\n",
 			prebuilt->table->name);
 		return(DB_ERROR);
-	} else if (prebuilt->table->is_encrypted) {
+	} else if (prebuilt->table->file_unreadable) {
 		ib_push_warning(trx, DB_DECRYPTION_FAILED,
 			"Table %s in tablespace %lu encrypted."
 			"However key management plugin or used key_id is not found or"
@@ -3128,7 +3130,7 @@ row_discard_tablespace(
 		/* All persistent operations successful, update the
 		data dictionary memory cache. */
 
-		table->ibd_file_missing = TRUE;
+		table->file_unreadable = true;
 
 		table->flags2 |= DICT_TF2_DISCARDED;
 
@@ -3185,7 +3187,8 @@ row_discard_tablespace_for_mysql(
 
 	if (table == 0) {
 		err = DB_TABLE_NOT_FOUND;
-	} else if (table->is_encrypted) {
+	} else if (table->file_unreadable &&
+		   fil_space_get(table->space) != NULL) {
 		err = DB_DECRYPTION_FAILED;
 	} else if (table->space == TRX_SYS_SPACE) {
 		char	table_name[MAX_FULL_NAME_LEN + 1];
@@ -3401,10 +3404,11 @@ row_truncate_table_for_mysql(
 
 	if (dict_table_is_discarded(table)) {
 		return(DB_TABLESPACE_DELETED);
-	} else if (table->is_encrypted) {
-		return(DB_DECRYPTION_FAILED);
-	} else if (table->ibd_file_missing) {
+	} else if (table->file_unreadable &&
+		   fil_space_get(table->space) == NULL) {
 		return(DB_TABLESPACE_NOT_FOUND);
+	} else if (table->file_unreadable) {
+		return(DB_DECRYPTION_FAILED);
 	}
 
 	trx_start_for_ddl(trx, TRX_DICT_OP_TABLE);
@@ -3570,7 +3574,7 @@ row_truncate_table_for_mysql(
 					"create a new tablespace",
 					table->name);
 
-				table->ibd_file_missing = 1;
+				table->file_unreadable = true;
 				err = DB_ERROR;
 				goto funct_exit;
 			}
@@ -3996,7 +4000,8 @@ row_drop_table_for_mysql(
 
 	/* If table is encrypted and table page encryption failed
 	return error. */
-	if (table->is_encrypted) {
+	if (table->file_unreadable &&
+	    fil_space_get(table->space) != NULL) {
 
 		if (table->can_be_evicted) {
 			dict_table_move_from_lru_to_non_lru(table);
@@ -4378,7 +4383,7 @@ row_drop_table_for_mysql(
 		from table->heap, which will be freed by
 		dict_table_remove_from_cache(table) below. */
 		space_id = table->space;
-		ibd_file_missing = table->ibd_file_missing;
+		ibd_file_missing = table->file_unreadable;
 
 		table_flags = table->flags;
 		is_temp = DICT_TF2_FLAG_IS_SET(table, DICT_TF2_TEMPORARY);
@@ -4389,7 +4394,7 @@ row_drop_table_for_mysql(
 		have a temp flag but not know the temp path */
 		ut_a(table->dir_path_of_temp_table == NULL || is_temp);
 		if (dict_table_is_discarded(table)
-		    || table->ibd_file_missing) {
+		    || table->file_unreadable) {
 			/* Do not attempt to drop known-to-be-missing
 			tablespaces. */
 			space_id = 0;
@@ -4801,7 +4806,7 @@ row_drop_database_for_mysql(
 					"'%s.frm' was lost.", table->name);
 			}
 
-			if (table->ibd_file_missing) {
+			if (table->file_unreadable) {
 				ib_logf(IB_LOG_LEVEL_WARN,
 					"Missing %s.ibd file for table %s.",
 					table->name, table->name);
@@ -5075,7 +5080,7 @@ row_rename_table_for_mysql(
 		      stderr);
 		goto funct_exit;
 
-	} else if (table->ibd_file_missing
+	} else if (table->file_unreadable
 		   && !dict_table_is_discarded(table)) {
 
 		err = DB_TABLE_NOT_FOUND;
@@ -5145,7 +5150,7 @@ row_rename_table_for_mysql(
 	which have space IDs > 0. */
 	if (err == DB_SUCCESS
 	    && table->space != TRX_SYS_SPACE
-	    && !table->ibd_file_missing) {
+	    && !table->file_unreadable) {
 		/* Make a new pathname to update SYS_DATAFILES. */
 		char*	new_path = row_make_new_pathname(table, new_name);
 
diff --git a/storage/xtradb/row/row0purge.cc b/storage/xtradb/row/row0purge.cc
index 35b3520749b..8a1dbd6f69f 100644
--- a/storage/xtradb/row/row0purge.cc
+++ b/storage/xtradb/row/row0purge.cc
@@ -777,7 +777,7 @@ row_purge_parse_undo_rec(
 		goto err_exit;
 	}
 
-	if (node->table->ibd_file_missing) {
+	if (node->table->file_unreadable) {
 		/* We skip purge of missing .ibd files */
 
 		dict_table_close(node->table, FALSE, FALSE);
diff --git a/storage/xtradb/row/row0sel.cc b/storage/xtradb/row/row0sel.cc
index 29569bb31e7..ed83dfd6d7a 100644
--- a/storage/xtradb/row/row0sel.cc
+++ b/storage/xtradb/row/row0sel.cc
@@ -62,6 +62,7 @@ Created 12/19/1997 Heikki Tuuri
 #include "srv0start.h"
 #include "m_string.h" /* for my_sys.h */
 #include "my_sys.h" /* DEBUG_SYNC_C */
+#include "fil0fil.h"
 
 #include "my_compare.h" /* enum icp_result */
 
@@ -3728,11 +3729,12 @@ row_search_for_mysql(
 
 		return(DB_TABLESPACE_DELETED);
 
-	} else if (prebuilt->table->ibd_file_missing) {
+	} else if (prebuilt->table->file_unreadable &&
+		fil_space_get(prebuilt->table->space) == NULL) {
 
 		return(DB_TABLESPACE_NOT_FOUND);
 
-	} else if (prebuilt->table->is_encrypted) {
+	} else if (prebuilt->table->file_unreadable) {
 
 		return(DB_DECRYPTION_FAILED);
 	} else if (!prebuilt->index_usable) {
@@ -4193,7 +4195,7 @@ row_search_for_mysql(
 					" used key_id is not available. "
 					" Can't continue reading table.",
 					prebuilt->table->name);
-				index->table->is_encrypted = true;
+				index->table->file_unreadable = true;
 			}
 			rec = NULL;
 			goto lock_wait_or_error;
diff --git a/storage/xtradb/row/row0uins.cc b/storage/xtradb/row/row0uins.cc
index 651042fb820..f14a4ef9bcf 100644
--- a/storage/xtradb/row/row0uins.cc
+++ b/storage/xtradb/row/row0uins.cc
@@ -320,7 +320,7 @@ row_undo_ins_parse_undo_rec(
 
 	/* Skip the UNDO if we can't find the table or the .ibd file. */
 	if (UNIV_UNLIKELY(node->table == NULL)) {
-	} else if (UNIV_UNLIKELY(node->table->ibd_file_missing)) {
+	} else if (UNIV_UNLIKELY(node->table->file_unreadable)) {
 close_table:
 		dict_table_close(node->table, dict_locked, FALSE);
 		node->table = NULL;
diff --git a/storage/xtradb/row/row0umod.cc b/storage/xtradb/row/row0umod.cc
index 19576d976bd..8deba4f00a5 100644
--- a/storage/xtradb/row/row0umod.cc
+++ b/storage/xtradb/row/row0umod.cc
@@ -1068,7 +1068,7 @@ row_undo_mod_parse_undo_rec(
 		return;
 	}
 
-	if (node->table->ibd_file_missing) {
+	if (node->table->file_unreadable) {
 		dict_table_close(node->table, dict_locked, FALSE);
 
 		/* We skip undo operations to missing .ibd files */
diff --git a/storage/xtradb/srv/srv0start.cc b/storage/xtradb/srv/srv0start.cc
index 5255a7454ea..9494fd21777 100644
--- a/storage/xtradb/srv/srv0start.cc
+++ b/storage/xtradb/srv/srv0start.cc
@@ -2530,7 +2530,12 @@ innobase_start_or_create_for_mysql(void)
 			respective file pages, for the last batch of
 			recv_group_scan_log_recs(). */
 
-			recv_apply_hashed_log_recs(true);
+			err = recv_apply_hashed_log_recs(true);
+
+			if (err != DB_SUCCESS) {
+				return (err);
+			}
+
 			DBUG_PRINT("ib_log", ("apply completed"));
 		}
 
diff --git a/storage/xtradb/trx/trx0trx.cc b/storage/xtradb/trx/trx0trx.cc
index 73c1172ff0f..dc4bba8f3a0 100644
--- a/storage/xtradb/trx/trx0trx.cc
+++ b/storage/xtradb/trx/trx0trx.cc
@@ -661,7 +661,7 @@ trx_resurrect_table_locks(
 	     i != tables.end(); i++) {
 		if (dict_table_t* table = dict_table_open_on_id(
 			    *i, FALSE, DICT_TABLE_OP_LOAD_TABLESPACE)) {
-			if (table->ibd_file_missing
+			if (table->file_unreadable
 			    || dict_table_is_temporary(table)) {
 				mutex_enter(&dict_sys->mutex);
 				dict_table_close(table, TRUE, FALSE);
diff --git a/storage/xtradb/ut/ut0ut.cc b/storage/xtradb/ut/ut0ut.cc
index acedb56879a..fd52537ae11 100644
--- a/storage/xtradb/ut/ut0ut.cc
+++ b/storage/xtradb/ut/ut0ut.cc
@@ -852,6 +852,8 @@ ut_strerr(
 		return("BLOB record length is greater than 10%% of redo log");
 	case DB_DECRYPTION_FAILED:
 		return("Table is encrypted but decrypt failed.");
+	case DB_PAGE_CORRUPTED:
+		return("Page read from tablespace is corrupted.");
 
 	/* do not add default: in order to produce a warning if new code
 	is added to the enum but not added here */



More information about the commits mailing list