diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index d7d1baa2730d27130ccfd6d9115078ab50ef5051..7db07970d585e9d1cefdfd7bc68796037ccc8569 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -5660,9 +5660,9 @@ dependencies = [
 
 [[package]]
 name = "parity-db"
-version = "0.3.13"
+version = "0.3.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "55a7901b85874402471e131de3332dde0e51f38432c69a3853627c8e25433048"
+checksum = "2bb474d0ed0836e185cb998a6b140ed1073d1fbf27d690ecf9ede8030289382c"
 dependencies = [
  "blake2-rfc",
  "crc32fast",
@@ -7228,6 +7228,7 @@ dependencies = [
  "sp-transaction-pool",
  "sp-trie",
  "substrate-prometheus-endpoint",
+ "tempfile",
  "thiserror",
  "tracing-gum",
  "westend-runtime",
diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml
index 59bf3b5204a8b98f0b9f582cf9ad7de99ef3935b..f12f4568e27bb3ce2a1d066fa18a816009358bbd 100644
--- a/polkadot/node/service/Cargo.toml
+++ b/polkadot/node/service/Cargo.toml
@@ -69,7 +69,7 @@ serde_json = "1.0.81"
 thiserror = "1.0.31"
 kvdb = "0.11.0"
 kvdb-rocksdb = { version = "0.15.2", optional = true }
-parity-db = { version = "0.3.13", optional = true }
+parity-db = { version = "0.3.16", optional = true }
 async-trait = "0.1.53"
 lru = "0.7"
 
@@ -128,6 +128,7 @@ polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
 env_logger = "0.9.0"
 log = "0.4.17"
 assert_matches = "1.5.0"
+tempfile = "3.2"
 
 [features]
 default = ["db", "full-node", "polkadot-native"]
diff --git a/polkadot/node/service/src/parachains_db/mod.rs b/polkadot/node/service/src/parachains_db/mod.rs
index f6f6864a0e78a170b776a0dfed2394aa124d9997..de12a8ac1a3247e8fe6ecf641479f543a334bb18 100644
--- a/polkadot/node/service/src/parachains_db/mod.rs
+++ b/polkadot/node/service/src/parachains_db/mod.rs
@@ -21,20 +21,25 @@ use {
 #[cfg(feature = "full-node")]
 mod upgrade;
 
+const LOG_TARGET: &str = "parachain::db";
+
 #[cfg(any(test, feature = "full-node"))]
 pub(crate) mod columns {
 	pub mod v0 {
 		pub const NUM_COLUMNS: u32 = 3;
 	}
-	pub const NUM_COLUMNS: u32 = 5;
-
-	pub const COL_AVAILABILITY_DATA: u32 = 0;
-	pub const COL_AVAILABILITY_META: u32 = 1;
-	pub const COL_APPROVAL_DATA: u32 = 2;
-	pub const COL_CHAIN_SELECTION_DATA: u32 = 3;
-	pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4;
-	pub const ORDERED_COL: &[u32] =
-		&[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA];
+
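+	/// Version 1 of the column layout: five columns, up from the three used by version 0.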
+	pub mod v1 {
+		pub const NUM_COLUMNS: u32 = 5;
+
+		pub const COL_AVAILABILITY_DATA: u32 = 0;
+		pub const COL_AVAILABILITY_META: u32 = 1;
+		pub const COL_APPROVAL_DATA: u32 = 2;
+		pub const COL_CHAIN_SELECTION_DATA: u32 = 3;
+		pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4;
+		pub const ORDERED_COL: &[u32] =
+			&[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA];
+	}
 }
 
 /// Columns used by different subsystems.
@@ -56,13 +61,19 @@ pub struct ColumnsConfig {
 /// The real columns used by the parachains DB.
 #[cfg(any(test, feature = "full-node"))]
 pub const REAL_COLUMNS: ColumnsConfig = ColumnsConfig {
-	col_availability_data: columns::COL_AVAILABILITY_DATA,
-	col_availability_meta: columns::COL_AVAILABILITY_META,
-	col_approval_data: columns::COL_APPROVAL_DATA,
-	col_chain_selection_data: columns::COL_CHAIN_SELECTION_DATA,
-	col_dispute_coordinator_data: columns::COL_DISPUTE_COORDINATOR_DATA,
+	col_availability_data: columns::v1::COL_AVAILABILITY_DATA,
+	col_availability_meta: columns::v1::COL_AVAILABILITY_META,
+	col_approval_data: columns::v1::COL_APPROVAL_DATA,
+	col_chain_selection_data: columns::v1::COL_CHAIN_SELECTION_DATA,
+	col_dispute_coordinator_data: columns::v1::COL_DISPUTE_COORDINATOR_DATA,
 };
 
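+/// The kind of database backend the parachains DB is opened with.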
+#[derive(PartialEq)]
+pub(crate) enum DatabaseKind {
+	ParityDB,
+	RocksDB,
+}
+
 /// The cache size for each column, in megabytes.
 #[derive(Debug, Clone)]
 pub struct CacheSizes {
@@ -95,27 +106,29 @@ pub fn open_creating_rocksdb(
 
 	let path = root.join("parachains").join("db");
 
-	let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS);
+	let mut db_config = DatabaseConfig::with_columns(columns::v1::NUM_COLUMNS);
 
 	let _ = db_config
 		.memory_budget
-		.insert(columns::COL_AVAILABILITY_DATA, cache_sizes.availability_data);
+		.insert(columns::v1::COL_AVAILABILITY_DATA, cache_sizes.availability_data);
 	let _ = db_config
 		.memory_budget
-		.insert(columns::COL_AVAILABILITY_META, cache_sizes.availability_meta);
+		.insert(columns::v1::COL_AVAILABILITY_META, cache_sizes.availability_meta);
 	let _ = db_config
 		.memory_budget
-		.insert(columns::COL_APPROVAL_DATA, cache_sizes.approval_data);
+		.insert(columns::v1::COL_APPROVAL_DATA, cache_sizes.approval_data);
 
 	let path_str = path
 		.to_str()
 		.ok_or_else(|| other_io_error(format!("Bad database path: {:?}", path)))?;
 
 	std::fs::create_dir_all(&path_str)?;
-	upgrade::try_upgrade_db(&path)?;
+	upgrade::try_upgrade_db(&path, DatabaseKind::RocksDB)?;
 	let db = Database::open(&db_config, &path_str)?;
-	let db =
-		polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, columns::ORDERED_COL);
+	let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(
+		db,
+		columns::v1::ORDERED_COL,
+	);
 
 	Ok(Arc::new(db))
 }
@@ -132,18 +145,14 @@ pub fn open_creating_paritydb(
 		.ok_or_else(|| other_io_error(format!("Bad database path: {:?}", path)))?;
 
 	std::fs::create_dir_all(&path_str)?;
+	upgrade::try_upgrade_db(&path, DatabaseKind::ParityDB)?;
 
-	let mut options = parity_db::Options::with_columns(&path, columns::NUM_COLUMNS as u8);
-	for i in columns::ORDERED_COL {
-		options.columns[*i as usize].btree_index = true;
-	}
-
-	let db = parity_db::Db::open_or_create(&options)
+	let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_1_config(&path))
 		.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?;
 
 	let db = polkadot_node_subsystem_util::database::paritydb_impl::DbAdapter::new(
 		db,
-		columns::ORDERED_COL,
+		columns::v1::ORDERED_COL,
 	);
 	Ok(Arc::new(db))
 }
diff --git a/polkadot/node/service/src/parachains_db/upgrade.rs b/polkadot/node/service/src/parachains_db/upgrade.rs
index 0ba84103885fbc1f7fafb2effb5244ab7b4a4f8d..ad995f41ed82a584618ffb0d119175f6b30c9a3a 100644
--- a/polkadot/node/service/src/parachains_db/upgrade.rs
+++ b/polkadot/node/service/src/parachains_db/upgrade.rs
@@ -15,6 +15,7 @@
 
 #![cfg(feature = "full-node")]
 
+use super::{columns, other_io_error, DatabaseKind, LOG_TARGET};
 use std::{
 	fs, io,
 	path::{Path, PathBuf},
@@ -49,13 +50,23 @@ impl From<Error> for io::Error {
 }
 
 /// Try upgrading parachain's database to the current version.
-pub fn try_upgrade_db(db_path: &Path) -> Result<(), Error> {
+pub(crate) fn try_upgrade_db(db_path: &Path, db_kind: DatabaseKind) -> Result<(), Error> {
 	let is_empty = db_path.read_dir().map_or(true, |mut d| d.next().is_none());
 	if !is_empty {
-		match current_version(db_path)? {
-			0 => migrate_from_version_0_to_1(db_path)?,
-			CURRENT_VERSION => (),
-			v => return Err(Error::FutureVersion { current: CURRENT_VERSION, got: v }),
+		match get_db_version(db_path)? {
+			// 0 -> 1 migration
+			Some(0) => migrate_from_version_0_to_1(db_path, db_kind)?,
+			// Already at current version, do nothing.
+			Some(CURRENT_VERSION) => (),
+			// This is a future version we don't support; return an error.
+			Some(v) => return Err(Error::FutureVersion { current: CURRENT_VERSION, got: v }),
+			// No version file. For `RocksDB` we don't need to do anything.
+			None if db_kind == DatabaseKind::RocksDB => (),
+			// No version file. `ParityDB` did not previously have a version defined,
+			// so we handle this as a `0 -> 1` migration.
+			None if db_kind == DatabaseKind::ParityDB =>
+				migrate_from_version_0_to_1(db_path, db_kind)?,
+			None => unreachable!(),
 		}
 	}
 
@@ -63,12 +74,14 @@ pub fn try_upgrade_db(db_path: &Path) -> Result<(), Error> {
 }
 
 /// Reads current database version from the file at given path.
-/// If the file does not exist, assumes the current version.
-fn current_version(path: &Path) -> Result<Version, Error> {
+/// If the file does not exist, returns `None`; otherwise returns the version stored in the file.
+fn get_db_version(path: &Path) -> Result<Option<Version>, Error> {
 	match fs::read_to_string(version_file_path(path)) {
-		Err(ref err) if err.kind() == io::ErrorKind::NotFound => Ok(CURRENT_VERSION),
+		Err(ref err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
 		Err(err) => Err(err.into()),
-		Ok(content) => u32::from_str(&content).map_err(|_| Error::CorruptedVersionFile),
+		Ok(content) => u32::from_str(&content)
+			.map(Some)
+			.map_err(|_| Error::CorruptedVersionFile),
 	}
 }
 
@@ -86,9 +99,22 @@ fn version_file_path(path: &Path) -> PathBuf {
 	file_path
 }
 
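+/// Runs the `0 -> 1` migration for the given database backend, logging start and completion.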
+fn migrate_from_version_0_to_1(path: &Path, db_kind: DatabaseKind) -> Result<(), Error> {
+	gum::info!(target: LOG_TARGET, "Migrating parachains db from version 0 to version 1 ...");
+
+	match db_kind {
+		DatabaseKind::ParityDB => paritydb_migrate_from_version_0_to_1(path),
+		DatabaseKind::RocksDB => rocksdb_migrate_from_version_0_to_1(path),
+	}
+	.and_then(|result| {
+		gum::info!(target: LOG_TARGET, "Migration complete!");
+		Ok(result)
+	})
+}
+
 /// Migration from version 0 to version 1:
 /// * the number of columns has changed from 3 to 5;
-fn migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {
+fn rocksdb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {
 	use kvdb_rocksdb::{Database, DatabaseConfig};
 
 	let db_path = path
@@ -102,3 +128,131 @@ fn migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {
 
 	Ok(())
 }
+
+// This currently clears any column whose configuration was altered between versions.
+// Only the columns listed in `allowed_columns` are considered for clearing.
+fn paritydb_fix_columns(
+	path: &Path,
+	options: parity_db::Options,
+	allowed_columns: Vec<u32>,
+) -> io::Result<()> {
+	// Figure out which columns to delete. This will be determined by inspecting
+	// the metadata file.
+	if let Some(metadata) = parity_db::Options::load_metadata(&path)
+		.map_err(|e| other_io_error(format!("Error reading metadata {:?}", e)))?
+	{
+		let columns_to_clear = metadata
+			.columns
+			.into_iter()
+			.enumerate()
+			.filter(|(idx, _)| allowed_columns.contains(&(*idx as u32)))
+			.filter_map(|(idx, opts)| {
+				let changed = opts != options.columns[idx];
+				if changed {
+					gum::debug!(
+						target: LOG_TARGET,
+						"Column {} will be cleared. Old options: {:?}, New options: {:?}",
+						idx,
+						opts,
+						options.columns[idx]
+					);
+					Some(idx)
+				} else {
+					None
+				}
+			})
+			.collect::<Vec<_>>();
+
+		if !columns_to_clear.is_empty() {
+			gum::debug!(
+				target: LOG_TARGET,
+				"Database column changes detected, need to clean up {} columns.",
+				columns_to_clear.len()
+			);
+		}
+
+		for column in columns_to_clear {
+			gum::debug!(target: LOG_TARGET, "Clearing column {}", column);
+			parity_db::clear_column(path, column.try_into().expect("Invalid column ID"))
+				.map_err(|e| other_io_error(format!("Error clearing column {:?}", e)))?;
+		}
+
+		// Write the updated column options.
+		options
+			.write_metadata(path, &metadata.salt)
+			.map_err(|e| other_io_error(format!("Error writing metadata {:?}", e)))?;
+	}
+
+	Ok(())
+}
+
+/// Database configuration for version 1.
+pub(crate) fn paritydb_version_1_config(path: &Path) -> parity_db::Options {
+	let mut options =
+		parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8);
+	for i in columns::v1::ORDERED_COL {
+		options.columns[*i as usize].btree_index = true;
+	}
+
+	options
+}
+
+/// Database configuration for version 0. This is only used in tests.
+#[cfg(test)]
+pub(crate) fn paritydb_version_0_config(path: &Path) -> parity_db::Options {
+	let mut options =
+		parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8);
+	options.columns[super::columns::v1::COL_AVAILABILITY_META as usize].btree_index = true;
+	options.columns[super::columns::v1::COL_CHAIN_SELECTION_DATA as usize].btree_index = true;
+
+	options
+}
+
+/// Migration from version 0 to version 1.
+/// Cases covered:
+/// - upgrading from v0.9.23 or earlier -> the `dispute coordinator` column was changed
+/// - upgrading from v0.9.24+ -> this is a no-op, assuming the DB has been manually fixed as per
+///   the release notes
+fn paritydb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {
+	// Clear the `dispute coordinator` column if needed (i.e. if its column configuration has changed).
+	paritydb_fix_columns(
+		path,
+		paritydb_version_1_config(path),
+		vec![super::columns::v1::COL_DISPUTE_COORDINATOR_DATA],
+	)?;
+
+	Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+	#[test]
+	fn test_paritydb_migrate_0_1() {
+		use super::{columns::v1::*, *};
+		use parity_db::Db;
+
+		let db_dir = tempfile::tempdir().unwrap();
+		let path = db_dir.path();
+		{
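+			// Create a version 0 database and write one value into two of its columns.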
+			let db = Db::open_or_create(&paritydb_version_0_config(&path)).unwrap();
+
+			db.commit(vec![
+				(COL_DISPUTE_COORDINATOR_DATA as u8, b"1234".to_vec(), Some(b"somevalue".to_vec())),
+				(COL_AVAILABILITY_META as u8, b"5678".to_vec(), Some(b"somevalue".to_vec())),
+			])
+			.unwrap();
+		}
+
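+		// There is no version file yet, so `ParityDB` is treated as version 0 and migrated to 1.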
+		try_upgrade_db(&path, DatabaseKind::ParityDB).unwrap();
+
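+		// The changed dispute coordinator column was cleared by the migration, while the
+		// availability meta column keeps its data.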
+		let db = Db::open(&paritydb_version_1_config(&path)).unwrap();
+		assert_eq!(
+			db.get(super::columns::v1::COL_DISPUTE_COORDINATOR_DATA as u8, b"1234").unwrap(),
+			None
+		);
+		assert_eq!(
+			db.get(super::columns::v1::COL_AVAILABILITY_META as u8, b"5678").unwrap(),
+			Some("somevalue".as_bytes().to_vec())
+		);
+	}
+}