#!/usr/bin/perl
#
# Replication consistency check / resync helper.
#
# For every database configured for the host this script runs on, it:
#   1. collects the tables in the "public" schema on the master and the slave
#      and verifies that their primary keys agree;
#   2. dumps the key columns of each table from the master (COPY ... TO STDOUT)
#      and looks every key up on the slave;
#   3. uploads the keys that are missing on the slave into a scratch table on
#      the primary and issues no-op UPDATEs (col = col) against those rows in
#      chunks of 1000, leaving the bucardo_add_delta trigger enabled;
#   4. optionally runs VACUUM ANALYSE / REINDEX TABLE before and/or after.
#
use strict;
use warnings;
use Data::Dumper;
use DBI;

my %agreements = (
    "replica.dba.example.org" => {    # host name of where the script is running
        "database1" => {              # database name which you wish to check
            primary       => "primary.dba.example.org",    # master server (at the top of the tree) - this is where the updates will happen
            primaryusr    => "postgres",
            primarypw     => "superpw",
            primaryindex  => "after",    # before/after/both (or empty for none) - will reindex the table before and/or after the replication checks
            primaryvacuum => "after",    # before/after/both (or empty for none) - will vacuum analyse the table before and/or after the replication checks

            master       => "replicahub.dba.example.org",    # the intermediary host (if any) you want to run the compare against (if there is no intermediary it should be the same as the primary)
            masterusr    => "postgres",
            masterpw     => "superpw",
            masterindex  => "",    # same as primaryindex, but for the intermediary host - if the master is set to the same as the primary, you should set one not the other or the reindex will run twice
            mastervacuum => "",    # same as primaryvacuum, but for the intermediary host - if the master is set to the same as the primary, you should set one not the other or the vacuum analyse will run twice.

            slave       => "replica.dba.example.org",    # this is the slave hostname (what you are checking for inconsistencies on) and is usually this host for maximum performance
            slaveusr    => "pgsql",
            slavepw     => "",
            slaveindex  => "",    # same as primaryindex, but for the slave host - if the master is set to the same as the primary, you should set one not the other or the reindex will run twice
            slavevacuum => "",    # same as primaryvacuum, but for the slave host - if the master is set to the same as the primary, you should set one not the other or the vacuum analyse will run twice.
        },
        "database2" => {    # second database name to check (if any) - has its own config block similar to above - you can add as many blocks as you wish
            primary       => "primary.dbb.example.org",
            primaryusr    => "pgsql",
            primarypw     => "superpw2",
            primaryindex  => "",
            primaryvacuum => "",

            master       => "replicahub2.dbb.example.org",
            masterusr    => "pgsql",
            masterpw     => "superpw2",
            masterindex  => "before",
            mastervacuum => "after",

            slave       => "replica.dba.example.org",
            slaveusr    => "pgsql",
            slavepw     => "",
            slaveindex  => "both",
            slavevacuum => "both",
        },
    },
    "replicahub2.dbb.example.org" => {    # configuration for another host where you might run this script (when running across the internet it's a good idea to run against each 'local' database server)
        "database2" => {
            primary       => "primary.dbb.example.org",
            primaryusr    => "pgsql",
            primarypw     => "superpw2",
            primaryindex  => "",
            primaryvacuum => "",

            master       => "primary.dbb.example.org",
            masterusr    => "pgsql",
            masterpw     => "superpw2",
            masterindex  => "before",
            mastervacuum => "after",

            slave       => "replicahub2.dbb.example.org",
            slaveusr    => "pgsql",
            slavepw     => "superpw2",
            slaveindex  => "before",
            slavevacuum => "both",
        },
    },
    "replica.dbremote.example.org" => {
        "database2" => {
            primary       => "primary.dbb.example.org",
            primaryusr    => "pgsql",
            primarypw     => "superpw2",
            primaryindex  => "",
            primaryvacuum => "",

            master       => "replicahub2.dbb.example.org",
            masterusr    => "pgsql",
            masterpw     => "superpw2",
            masterindex  => "",
            mastervacuum => "",

            slave       => "replica.dbremote.example.org",
            slaveusr    => "pgsql",
            slavepw     => "superpw2a",
            slaveindex  => "after",
            slavevacuum => "both",

            tables => [ "table1", "table2", "table3", "table4" ],    # override list of tables to check (used if you have a slave with just a subset of the tables on the master)
        },
    },
);

# Progress output is written piecemeal, so keep STDOUT unbuffered.
$| = 1;

my $host = `hostname`;
chomp($host);

if (!exists $agreements{$host}) {
    die("Cannot locate $host (this host) in the configuration parameters...\n");
}

for my $database (sort keys %{$agreements{$host}}) {
    my $cfg = $agreements{$host}->{$database};

    my $primary = $cfg->{primary};
    my $master  = $cfg->{master};
    my $slave   = $cfg->{slave};

    my $pdsn = "dbi:Pg:host=$primary;database=$database";
    my $mdsn = "dbi:Pg:host=$master;database=$database";    #;port=2345";
    my $sdsn = "dbi:Pg:host=$slave;database=$database";

    my $pdbh = DBI->connect($pdsn, $cfg->{primaryusr}, $cfg->{primarypw}) or die("PRIMARY: Cannot connect using: $pdsn");
    my $mdbh = DBI->connect($mdsn, $cfg->{masterusr},  $cfg->{masterpw})  or die("MASTER: Cannot connect using: $mdsn");
    my $sdbh = DBI->connect($sdsn, $cfg->{slaveusr},   $cfg->{slavepw})   or die("SLAVE: Cannot connect using: $sdsn");

    # Maintenance targets, in the order the statements are run
    # (primary, then master, then slave).
    my @targets = (
        {
            host   => $primary,
            dbh    => $pdbh,
            vacuum => maintenance_flags($cfg->{primaryvacuum}),
            index  => maintenance_flags($cfg->{primaryindex}),
        },
        {
            host   => $master,
            dbh    => $mdbh,
            vacuum => maintenance_flags($cfg->{mastervacuum}),
            index  => maintenance_flags($cfg->{masterindex}),
        },
        {
            host   => $slave,
            dbh    => $sdbh,
            vacuum => maintenance_flags($cfg->{slavevacuum}),
            index  => maintenance_flags($cfg->{slaveindex}),
        },
    );

    my $schema = "public";
    my $type   = "TABLE";
    my $tmp    = "/alternate/tmp/path";    # set and create this if you want to use a disk rather than /tmp (useful for large table dumps)
    if (! -w $tmp) {
        $tmp = "/tmp";
    }

    # first get the table list from both DBs
    my $msth = $mdbh->table_info(undef, $schema, undef, $type)
        or die("MASTER: Cannot fetch the table list from $master: $DBI::errstr");
    my $ssth = $sdbh->table_info(undef, $schema, undef, $type)
        or die("SLAVE: Cannot fetch the table list from $slave: $DBI::errstr");

    # if we have a tables list, skip any tables that are not in the list
    my $restrict = exists $cfg->{tables};
    my %wanted   = $restrict ? (map { $_ => 1 } @{$cfg->{tables}}) : ();

    # %mtable: table name => 0 (only on the master) or >0 (also on the slave)
    my %mtable = ();
    for my $rel (@{$msth->fetchall_arrayref({})}) {
        next if ($restrict && !$wanted{$rel->{TABLE_NAME}});
        #print Dumper $rel;
        $mtable{$rel->{TABLE_NAME}} = 0;
    }

    for my $rel (@{$ssth->fetchall_arrayref({})}) {
        next if ($restrict && !$wanted{$rel->{TABLE_NAME}});
        if (exists $mtable{$rel->{TABLE_NAME}}) {
            $mtable{$rel->{TABLE_NAME}}++;
        }
        else {
            #$mtable{$rel->{TABLE_NAME}} = 0;
            warn("Table $$rel{TABLE_NAME} does not exist on the master!!\n");
        }
    }

    # %tables: table name => array ref of the key columns to compare
    my %tables = ();
    print "Checking primary keys for ";
    foreach my $t (sort keys %mtable) {
        if (!$mtable{$t}) {
            print "\n";
            warn("Table $t exists on the master but not the slave!!\n");
            print "Continuing key check... ";
            next;
        }

        print "$t ";
        my @mtables = $mdbh->primary_key(undef, $schema, $t);
        my %stables = map { $_ => 1 } $sdbh->primary_key(undef, $schema, $t);
        # warn("Checking $t (" . join(", ", @mtables) . ")");
        if (!scalar @mtables) {
            print "skipping $t (no primary key!) ";
            next;
        }

        # Every master key column must exist on the slave, and vice versa.
        foreach my $ck (@mtables) {
            if (!exists $stables{$ck}) {
                die("Primary Keys for $t do not match!!! (Master DB ($master) has column $ck..) ABORTING!!");
            }
            delete $stables{$ck};
        }
        if (scalar keys %stables) {
            die("Primary Keys for $t do not match!!! (Slave DB ($slave) has column(s) " . join(", ", keys %stables) . ".. ABORTING!!");
        }

        # This is something you can enable/configure if you want specific tables to include
        # specific columns that are not part of the primary key - i.e. if you want to check
        # non-primary key data for inconsistencies
        #if ($t eq "table2" or $t eq "table1")
        #{
        #    push(@mtables, "col3");
        #    push(@mtables, $t eq "table2" ? "col1" : "cola");
        #    push(@mtables, "col4");
        #} elsif ($t eq "table3") {
        #    push(@mtables, "col2");
        #    push(@mtables, "col4");
        #}
        # in future versions it might be included as optional config blocks.
        $tables{$t} = \@mtables;
    }
    print "\n";

    # the following will get a list of triggers that are not replication related.
    # SELECT t.tgname, pg_catalog.pg_get_triggerdef(t.oid), t.tgenabled FROM pg_catalog.pg_trigger t WHERE t.tgconstraint = 0 AND t.tgrelid IN (SELECT c.oid FROM pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace WHERE c.relname ~ '^(networks)$' AND pg_catalog.pg_table_is_visible(c.oid)) AND t.tgname NOT ILIKE 'bucardo%' ORDER BY 1
    my $trigsql = "SELECT t.tgname, pg_catalog.pg_get_triggerdef(t.oid), t.tgenabled FROM pg_catalog.pg_trigger t WHERE t.tgconstraint = 0 AND t.tgrelid IN (SELECT c.oid FROM pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace WHERE c.relname ~ ? AND pg_catalog.pg_table_is_visible(c.oid)) AND t.tgname NOT ILIKE 'bucardo%' ORDER BY 1;";
    my $trig_sth = $pdbh->prepare($trigsql) or die("Cannot prepare trigger finding statement: $DBI::errstr");

    # Name component for the scratch table: the first label of this host's
    # name (or the whole name when it has no dot), with anything that is not
    # valid in an unquoted SQL identifier replaced by an underscore.
    (my $shorthost = $host) =~ s/\..*$//s;
    $shorthost =~ s/\W/_/g;

    foreach my $t (sort num_last_sort keys %tables) {
        # ok lets start checking the tables.
        my @keycols    = @{$tables{$t}};
        my $keylist    = join(", ", @keycols);
        my $keyfile    = "$tmp/$t-keys.txt";
        my $updatefile = "$tmp/$t-update_keys.txt";
        my $ttname     = "repl_sync_${shorthost}_$t";
        my $c          = 0;
        my $r          = 0;

        run_maintenance("before", $t, \@targets);

        # Dump the key columns of every master row into the key file.
        print "Checking $t (keys: " . $keylist . ")..";
        open(my $keys_out, '>', $keyfile) or die("Unable to open key file: $!");
        if (!defined $mdbh->do("COPY $t(" . $keylist . ") TO STDOUT;")) {
            die("Unable to COPY the keys of $t from $master: $DBI::errstr");
        }
        my $row;
        # pg_getcopydata returns -1 once the COPY has ended; only rows that
        # were actually fetched are written and counted.
        while (($r = $mdbh->pg_getcopydata($row)) >= 0) {
            print {$keys_out} $row;
            $c++;
            if (!($c % 10000)) {
                print " $c ..";
            }
        }
        print " $c rows..\n";
        close($keys_out) or die("Unable to write key file: $!");

        # Any trigger on the primary's table that is not a bucardo trigger
        # means triggers must be disabled around the no-op updates.  The table
        # name is anchored so that tables merely containing $t do not match.
        $trig_sth->execute('^(' . quotemeta($t) . ')$') or die "Cannot get trigger list for $t!! $DBI::errstr";
        my $disable_triggers = $trig_sth->rows();
        $trig_sth->finish();

        my $csql = "SELECT 1 FROM $t WHERE (" . $keylist . ") IN ((?" . (", ?" x ((scalar @keycols) - 1)) . "));";
        my $csth = $sdbh->prepare($csql) or die("Could not prepare the comparison statement for $t! $DBI::errstr");

        # No-op update (col = col) of the next 1000 keys held in the scratch table ...
        my $usql = "UPDATE $t SET" . join(",", map { " $_ = $_" } @keycols);
        $usql .= " WHERE (" . $keylist . ") IN (SELECT * FROM $ttname ORDER BY " . $keycols[0] . " LIMIT 1000);";
        # ... and removal of those same 1000 keys from the scratch table.
        my $dsql = "DELETE FROM $ttname WHERE (" . join(",", @keycols) . ") IN (SELECT * FROM $ttname ORDER BY " . $keycols[0] . " LIMIT 1000);";

        $c = 0;
        my $mod  = 0;    # missing keys found since the last progress report
        my $mods = 0;    # total missing keys found
        warn("Compare is using: $csql\n");
        print "Comparing..";
        open(my $keys_in,    '<', $keyfile)    or die "Cannot open input file!! $!";
        open(my $update_out, '>', $updatefile) or die("Unable to open Update Key File: $!");
        # NOTE(review): the key file is in COPY text format; values containing
        # backslash escapes are passed to the comparison unescaped - confirm
        # that key data never contains such characters.
        while (my $line = <$keys_in>) {
            chomp($line);
            # A negative limit keeps trailing empty fields, so the number of
            # parameters always matches the number of placeholders.
            my @params = split(/\t/, $line, -1);
            @params = ("") if (!@params);    # a single, empty key column
            $csth->execute(@params) or die("Could not execute comparison statement on $t! $DBI::errstr");
            $c++;
            if (!$csth->rows) {
                print " " if (!$mod);
                print {$update_out} $line . "\n";    # write keys to update file... we add the [LF] because we removed it at the beginning of this loop..
                $mod++;
                if (!($mod % 100)) {
                    print "*";
                }
            }
            if (!($c % 10000)) {
                print "* [$mod] .." if ($mod);
                print " $c ..";
                $mods += $mod;
                $mod = 0;
            }
        }
        $mods += $mod;
        $csth->finish();
        close($keys_in);
        unlink($keyfile);
        print " $c .. completed update list [$mods]...\n";
        close($update_out) or die("Unable to write Update Key File: $!");

        if ($mods > 0) {
            open(my $update_in, '<', $updatefile) or die "Cannot open input file!! $!";

            # Blind drop as we don't want to use anything left over by a previous process...
            $pdbh->do("DROP TABLE IF EXISTS $ttname;");
            # This is a cheat - selecting with a limit of 0 (zero) creates the table with no rows
            if (!defined $pdbh->do("SELECT " . join(",", @keycols) . " INTO $ttname FROM $t LIMIT 0;")) {
                die("Could not create the temporary table $ttname [$t]! $DBI::errstr");
            }
            # Now we copy everything from the update file into the temp table (no indexes so it should be fast!)
            if (!defined $pdbh->do("COPY $ttname FROM STDIN;")) {
                die("Could not start the COPY into $ttname [$t]! $DBI::errstr");
            }
            $c = 0;
            print "Uploading primary keys to temporary table $ttname...";
            while (my $line = <$update_in>) {
                $c++;
                $pdbh->pg_putcopydata($line);
                if (!($c % 10000)) {
                    print " $c ..";
                }
            }
            $pdbh->pg_putcopyend();
            close($update_in);
            print " $c rows..\n";
            # Now create a primary key - this will implicitly create indexes (another cheat) this is so we can run a sort
            $pdbh->do("ALTER TABLE $ttname ADD PRIMARY KEY (" . join(",", @keycols) . ");");

            my $usth = $pdbh->prepare($usql) or die("Could not prepare the update statement for $t! $DBI::errstr");
            my $dsth = $pdbh->prepare($dsql) or die("Could not prepare the delete statement for $ttname [$t]! $DBI::errstr");
            warn("Updating $mods rows...\n");
            $c   = 0;
            $mod = 0;
            warn("Update is using: $usql;\n");
            warn("Deleting from the temp table using: $dsql\n");
            if ($disable_triggers) {
                warn("Triggers for other than replication found, will have to disable them before the update!\n");
            }
            else {
                warn("Only Bucardo triggers found, so we don't need to use transactions or trigger disabling routines!\n");
            }
            print "Updating $t in chunks of 1000 rows... 0 .. ";
            # NOTE(review): transactions are driven with plain BEGIN/COMMIT
            # statements rather than begin_work()/commit() - confirm this is
            # acceptable for the DBD::Pg version in use.
            $pdbh->do("BEGIN;") if ($disable_triggers);
            while (1) {
                if (!$mod && $disable_triggers) {
                    # (Re)start of a transaction: silence everything except
                    # the trigger that records the change for replication.
                    $pdbh->do("ALTER TABLE $t DISABLE TRIGGER ALL;");
                    $pdbh->do("ALTER TABLE $t ENABLE TRIGGER bucardo_add_delta;");
                }
                my $changes = $usth->execute();
                # execute() returns the true string "0E0" when no rows were
                # affected, so the count has to be tested numerically.
                last if (!defined $changes || $changes == 0);
                $mod += $changes;
                my $deleted = $dsth->execute();
                # Without the delete the same chunk would be updated forever.
                die("Could not delete the processed keys from $ttname [$t]! $DBI::errstr") if (!defined $deleted);
                $c += $deleted;
                print "*";
                if (!($c % 10000)) {
                    if ($disable_triggers) {
                        $pdbh->do("ALTER TABLE $t ENABLE TRIGGER ALL;");
                        $pdbh->do("COMMIT;");
                    }
                    print " *[$mod]*" if ($mod);
                    print " $c ..";
                    # The next transaction belongs on the primary as well.
                    $pdbh->do("BEGIN;") if ($disable_triggers);
                    $mod = 0;
                }
            }
            if ($disable_triggers) {
                $pdbh->do("ALTER TABLE $t ENABLE TRIGGER ALL;");
                $pdbh->do("COMMIT;");
            }
            print " $c .. completed $t.\n";
            $pdbh->do("DROP TABLE $ttname;");
            print "DROPped temporary table '$ttname'...\n";
        }
        else {
            warn("No rows to modify in $t...\n");
        }

        run_maintenance("after", $t, \@targets);

        unlink($updatefile);
    }

    $pdbh->disconnect();
    $mdbh->disconnect();
    $sdbh->disconnect();

    warn("Completed resync between $master\[$database] and $slave\[$database]\n");
}
warn("Completed all resyncs... Exitting!!\n");

# Translate a before/after/both (or empty/undefined) setting into a hash ref
# with the boolean entries "before" and "after".
sub maintenance_flags {
    my ($when) = @_;
    $when = "" if (!defined $when);
    return {
        before => ($when eq "both" || $when eq "before") ? 1 : 0,
        after  => ($when eq "both" || $when eq "after")  ? 1 : 0,
    };
}

# Run the configured maintenance for one table at one stage.
#   $when    - "before" or "after"
#   $table   - name of the table
#   $targets - array ref of { host, dbh, vacuum, index } entries
# All VACUUM ANALYSE statements are issued first, then all REINDEX TABLE
# statements, each in the order the targets are listed.
sub run_maintenance {
    my ($when, $table, $targets) = @_;
    for my $op ([ "vacuum", "VACUUM ANALYSE" ], [ "index", "REINDEX TABLE" ]) {
        my ($flag, $sql) = @{$op};
        for my $target (@{$targets}) {
            next if (!$target->{$flag}->{$when});
            print "Running $sql $table on $target->{host}...";
            $target->{dbh}->do("$sql $table;");
            print "Done!\n";
        }
    }
    return;
}

# Sort comparator (uses the package variables $a and $b): names without any
# digit sort before names containing a digit; ties are broken alphabetically
# so that the ordering is consistent and deterministic.
sub num_last_sort {
    my $a_has_digit = ($a =~ /\d/) ? 1 : 0;
    my $b_has_digit = ($b =~ /\d/) ? 1 : 0;
    return ($a_has_digit <=> $b_has_digit) || ($a cmp $b);
}