aboutsummaryrefslogtreecommitdiffstats
path: root/imapsync
diff options
context:
space:
mode:
Diffstat (limited to 'imapsync')
-rwxr-xr-ximapsync318
1 files changed, 208 insertions, 110 deletions
diff --git a/imapsync b/imapsync
index 4cbc503..657050d 100755
--- a/imapsync
+++ b/imapsync
@@ -121,15 +121,15 @@ $DBH->do('PRAGMA foreign_keys = ON');
local => [
q{idx INTEGER NOT NULL PRIMARY KEY REFERENCES mailboxes(idx)},
q{UIDVALIDITY UNSIGNED INT NOT NULL CHECK (UIDVALIDITY > 0)},
- q{UIDNEXT UNSIGNED INT NOT NULL CHECK (UIDNEXT > 0)},
- q{HIGHESTMODSEQ UNSIGNED BIGINT NOT NULL CHECK (HIGHESTMODSEQ > 0)}
+ q{UIDNEXT UNSIGNED INT NOT NULL}, # 0 initially
+ q{HIGHESTMODSEQ UNSIGNED BIGINT NOT NULL} # 0 initially
# one-to-one correspondence between local.idx and remote.idx
],
remote => [
q{idx INTEGER NOT NULL PRIMARY KEY REFERENCES mailboxes(idx)},
q{UIDVALIDITY UNSIGNED INT NOT NULL CHECK (UIDVALIDITY > 0)},
- q{UIDNEXT UNSIGNED INT NOT NULL CHECK (UIDNEXT > 0)},
- q{HIGHESTMODSEQ UNSIGNED BIGINT NOT NULL CHECK (HIGHESTMODSEQ > 0)}
+ q{UIDNEXT UNSIGNED INT NOT NULL}, # 0 initially
+ q{HIGHESTMODSEQ UNSIGNED BIGINT NOT NULL} # 0 initially
# one-to-one correspondence between local.idx and remote.idx
],
mapping => [
@@ -138,7 +138,7 @@ $DBH->do('PRAGMA foreign_keys = ON');
q{rUID UNSIGNED INT NOT NULL CHECK (rUID > 0)},
q{PRIMARY KEY (idx,lUID)},
q{UNIQUE (idx,rUID)}
- # also, lUID < local.UIDNEXT and rUID < remote.UIDNEXT
+ # also, lUID < local.UIDNEXT and rUID < remote.UIDNEXT (except for interrupted syncs)
# mapping.idx must be found among local.idx (and remote.idx)
],
);
@@ -544,29 +544,30 @@ my $STH_UPDATE_LOCAL = $DBH->prepare(q{UPDATE local SET UIDNEXT = ?, HIGHESTMO
my $STH_UPDATE_REMOTE = $DBH->prepare(q{UPDATE remote SET UIDNEXT = ?, HIGHESTMODSEQ = ? WHERE idx = ?});
# Add a new mailbox.
-my $STH_NEWMAILBOX = $DBH->prepare(q{INSERT INTO mailboxes (mailbox,subscribed) VALUES (?,?)});
-my $STH_INSERT_LOCAL = $DBH->prepare(q{INSERT INTO local (idx,UIDVALIDITY,UIDNEXT,HIGHESTMODSEQ) VALUES (?,?,?,?)});
-my $STH_INSERT_REMOTE = $DBH->prepare(q{INSERT INTO remote (idx,UIDVALIDITY,UIDNEXT,HIGHESTMODSEQ) VALUES (?,?,?,?)});
+my $STH_INSERT_MAILBOX= $DBH->prepare(q{INSERT INTO mailboxes (mailbox,subscribed) VALUES (?,?)});
+my $STH_INSERT_LOCAL = $DBH->prepare(q{INSERT INTO local (idx,UIDVALIDITY,UIDNEXT,HIGHESTMODSEQ) VALUES (?,?,0,0)});
+my $STH_INSERT_REMOTE = $DBH->prepare(q{INSERT INTO remote (idx,UIDVALIDITY,UIDNEXT,HIGHESTMODSEQ) VALUES (?,?,0,0)});
# Insert or retrieve a (idx,lUID,rUID) association.
my $STH_INSERT_MAPPING = $DBH->prepare(q{INSERT INTO mapping (idx,lUID,rUID) VALUES (?,?,?)});
my $STH_GET_MAPPING = $DBH->prepare(q{SELECT lUID,rUID FROM mapping WHERE idx = ?});
+# Get the list of interrupted mailbox syncs.
+my $STH_LIST_INTERRUPTED = $DBH->prepare(q{
+ SELECT mbx.idx, mailbox
+ FROM mailboxes mbx JOIN local l ON mbx.idx = l.idx JOIN remote r ON mbx.idx = r.idx JOIN mapping ON mbx.idx = mapping.idx
+ WHERE (lUID >= l.UIDNEXT OR rUID >= r.UIDNEXT)
+ GROUP BY mbx.idx
+});
+
+# For an interrupted mailbox sync, get the pairs (lUID,rUID) that have
+# already been downloaded.
+my $STH_GET_INTERRUPTED_BY_IDX = $DBH->prepare(q{
+ SELECT lUID, rUID
+ FROM mapping m JOIN local l ON m.idx = l.idx JOIN remote r ON m.idx = r.idx
+ WHERE m.idx = ? AND (lUID >= l.UIDNEXT OR rUID >= r.UIDNEXT)
+});
-# Initialize $lIMAP and $rIMAP states to detect mailbox dirtyness.
-$STH_GET_CACHE->execute();
-while (defined (my $row = $STH_GET_CACHE->fetchrow_hashref())) {
- $lIMAP->set_cache($row->{mailbox},
- UIDVALIDITY => $row->{lUIDVALIDITY},
- UIDNEXT => $row->{lUIDNEXT},
- HIGHESTMODSEQ => ($CONFIG{check} ? 0 : $row->{lHIGHESTMODSEQ})
- );
- $rIMAP->set_cache($row->{mailbox},
- UIDVALIDITY => $row->{rUIDVALIDITY},
- UIDNEXT => $row->{rUIDNEXT},
- HIGHESTMODSEQ => ($CONFIG{check} ? 0 : $row->{rHIGHESTMODSEQ})
- );
-}
# Download some missing UIDs.
sub fix_missing($$$@) {
@@ -725,8 +726,8 @@ sub sync_known_messages($$) {
}
}
- $lIMAP->remove(@lToRemove) if @lToRemove;
- $rIMAP->remove(@rToRemove) if @rToRemove;
+ $lIMAP->remove_message(@lToRemove) if @lToRemove;
+ $rIMAP->remove_message(@rToRemove) if @rToRemove;
# remove existing mappings
foreach my $lUID (@$lVanished, @lToRemove) {
@@ -804,85 +805,99 @@ sub sync_known_messages($$) {
}
-# Sync known and new messages
-sub sync_messages($$) {
- my ($idx, $mailbox) = @_;
-
- my %mapping;
- foreach my $source (qw/remote local/) {
- my $target = $source eq 'local' ? $rIMAP : $lIMAP;
- my $multiappend;
-
- my @newmails;
- my $buffer = 0; # sum of the RFC822 sizes in @newmails
-
- my (@sUID, @tUID);
+# The callback to use when FETCHing new messages from $name to add it to
+# the other one.
+# If defined, the array reference $UIDs will be fed with the newly added
+# UIDs.
+# If defined, $buff contains the list of messages to be appended with
+# MULTIAPPEND. In that case callback_new_message_flush should be called
+# after the FETCH.
+sub callback_new_message($$$$;$$$) {
+ my ($idx, $mailbox, $name, $mail, $UIDs, $buff, $bufflen) = @_;
+ return unless exists $mail->{RFC822}; # not for us
+
+ my $length = length $mail->{RFC822};
+ if ($length == 0) {
+ warn "$name($mailbox): WARNING: Ignoring new 0-length message (UID $mail->{UID})\n";
+ return;
+ }
- # don't fetch again the messages we've just added
- my @ignore = $source eq 'local' ? keys %mapping : values %mapping;
+ my @UIDs;
+ unless (defined $buff) {
+ @UIDs = callback_new_message_flush($idx, $mailbox, $name, $mail);
+ }
+ else {
+ # use MULTIAPPEND (RFC 3502)
+ # proceed by batches of 1MB to save roundtrips without blowing up the memory
+ if (@$buff and $$bufflen + $length > 1048576) {
+ @UIDs = callback_new_message_flush($idx, $mailbox, $name, @$buff);
+ @$buff = ();
+ $$bufflen = 0;
+ }
+ push @$buff, $mail;
+ $$bufflen += $length;
+ }
+ push @$UIDs, @UIDs if defined $UIDs;
+}
- ($source eq 'local' ? $lIMAP : $rIMAP)->pull_new_messages(sub($) {
- my $mail = shift;
- return unless exists $mail->{RFC822}; # not for us
- my $length = length $mail->{RFC822};
- my $suid = $mail->{UID};
- if ($length == 0) {
- warn "$source($mailbox): WARNING: Ignoring new 0-length message (UID $suid)\n";
- return;
- }
+# Add the given @messages (multiple messages are only allowed for
+# MULTIAPPEND-capable servers) from $name to the other server.
+# Returns the list of newly allocated UIDs.
+sub callback_new_message_flush($$$@) {
+ my ($idx, $mailbox, $name, @messages) = @_;
- # use MULTIAPPEND if possible (RFC 3502) to save round-trips
- $multiappend //= !$target->incapable('MULTIAPPEND');
+ my $imap = $name eq 'local' ? $rIMAP : $lIMAP; # target client
+ my @sUID = map {$_->{UID}} @messages;
+ my @tUID = $imap->append($mailbox, @messages);
+ die unless $#sUID == $#tUID; # sanity check
- push @sUID, $suid;
- if (!$multiappend) {
- push @tUID, $target->append($mailbox, $mail);
- }
- else {
- # proceed by batch of 1MB to save roundtrips without blowing up the memory
- if (@newmails and $buffer + $length > 1048576) {
- push @tUID, $target->append($mailbox, @newmails);
- @newmails = ();
- $buffer = 0;
- }
- push @newmails, $mail;
- $buffer += $length;
- }
- }, @ignore);
- push @tUID, $target->append($mailbox, @newmails) if @newmails;
-
- die unless $#sUID == $#tUID; # sanity check
- foreach my $k (0 .. $#sUID) {
- my ($lUID,$rUID) = $source eq 'local' ? ($sUID[$k],$tUID[$k]) : ($tUID[$k],$sUID[$k]);
- die if exists $mapping{$lUID}; # sanity check
- $mapping{$lUID} = $rUID;
- }
+ my ($lUIDs, $rUIDs) = $name eq 'local' ? (\@sUID,\@tUID) : (\@tUID,\@sUID);
+ for (my $k=0; $k<=$#messages; $k++) {
+ print STDERR "Adding mapping (lUID,rUID) = ($lUIDs->[$k],$rUIDs->[$k]) for $mailbox\n"
+ if $CONFIG{debug};
+ $STH_INSERT_MAPPING->execute($idx, $lUIDs->[$k], $rUIDs->[$k]);
}
+ $DBH->commit(); # commit only once per batch
- # new mailbox
- if (!defined $$idx) {
- my $subscribed = (grep { $_ eq $mailbox} @SUBSCRIPTIONS) ? 1 : 0;
- $STH_NEWMAILBOX->execute($mailbox, $subscribed);
- $STH_GET_INDEX->execute($mailbox);
- ($$idx) = $STH_GET_INDEX->fetchrow_array();
- die if !defined $$idx or defined $STH_GET_INDEX->fetchrow_arrayref(); # sanity check
-
- # there might be flag updates pending
- sync_known_messages($$idx, $mailbox);
- $STH_INSERT_LOCAL->execute($$idx, $lIMAP->get_cache(qw/UIDVALIDITY UIDNEXT HIGHESTMODSEQ/));
- $STH_INSERT_REMOTE->execute($$idx, $rIMAP->get_cache(qw/UIDVALIDITY UIDNEXT HIGHESTMODSEQ/));
- }
- else {
- # update known mailbox
- sync_known_messages($$idx, $mailbox);
- $STH_UPDATE_LOCAL->execute($lIMAP->get_cache( qw/UIDNEXT HIGHESTMODSEQ/), $$idx);
- $STH_UPDATE_REMOTE->execute($rIMAP->get_cache(qw/UIDNEXT HIGHESTMODSEQ/), $$idx);
- }
+ return @tUID;
+}
- while (my ($lUID,$rUID) = each %mapping) {
- $STH_INSERT_MAPPING->execute($$idx, $lUID, $rUID);
- }
+
+# Sync both known and new messages
+# If the array references $lIgnore and $rIgnore are not empty, skip
+# the given UIDs.
+sub sync_messages($$;$$) {
+ my ($idx, $mailbox, $lIgnore, $rIgnore) = @_;
+ my ($buff, $bufflen, @lUIDs);
+
+ # get new messages from remote (except @$rIgnore) and APPEND them to local
+ ($buff, $bufflen) = ([], 0);
+ undef $buff if $lIMAP->incapable('MULTIAPPEND');
+ $rIMAP->pull_new_messages(sub($) {
+ callback_new_message($idx, $mailbox, 'remote', shift, \@lUIDs, $buff, \$bufflen)
+ }, @{$rIgnore // []});
+ push @lUIDs, callback_new_message_flush($idx, $mailbox, 'remote', @$buff)
+ if defined $buff and @$buff;
+
+ # get new messages from local (except @$lIgnore and the newly allocated local
+ # UIDs @lUIDs) and APPEND them to remote
+ ($buff, $bufflen) = ([], 0);
+ undef $buff if $rIMAP->incapable('MULTIAPPEND');
+ $lIMAP->pull_new_messages(sub($) {
+ callback_new_message($idx, $mailbox, 'local', shift, undef, $buff, \$bufflen)
+ }, @{$lIgnore // []}, @lUIDs);
+ callback_new_message_flush($idx, $mailbox, 'local', @$buff)
+ if defined $buff and @$buff;
+
+ # both local and remote UIDNEXT are now up to date; proceed with
+ # pending flag updates and vanished messages
+ sync_known_messages($idx, $mailbox);
+
+ # don't store the new UIDNEXTs before to avoid downloading these
+ # mails again in the event of a crash
+ $STH_UPDATE_LOCAL->execute($lIMAP->get_cache( qw/UIDNEXT HIGHESTMODSEQ/), $idx);
+ $STH_UPDATE_REMOTE->execute($rIMAP->get_cache(qw/UIDNEXT HIGHESTMODSEQ/), $idx);
$DBH->commit();
}
@@ -907,34 +922,117 @@ sub wait_notifications(;$) {
}
-my ($mailbox, $idx);
+# Resume interrupted mailbox syncs.
+my ($MAILBOX, $IDX);
+$STH_LIST_INTERRUPTED->execute();
+while (defined (my $row = $STH_LIST_INTERRUPTED->fetchrow_arrayref())) {
+ ($IDX, $MAILBOX) = @$row;
+ print STDERR "Resuming interrupted sync for $MAILBOX\n";
+
+ my %lUIDs;
+ $STH_GET_INTERRUPTED_BY_IDX->execute($IDX);
+ while (defined (my $row = $STH_GET_INTERRUPTED_BY_IDX->fetchrow_arrayref())) {
+ $lUIDs{$row->[0]} = $row->[1]; # pair ($lUID, $rUID)
+ }
+ die unless %lUIDs; # sanity check
+
+ $lIMAP->select($MAILBOX);
+ $rIMAP->select($MAILBOX);
+
+ # FETCH all messages with their FLAGS to detect messages that have
+ # vanished meanwhile, or for which there was a flag update.
+
+ my (%lList, %rList); # The lists of existing local and remote UIDs
+ my $attrs = '('.join(' ', qw/MODSEQ FLAGS/).')';
+ $lIMAP->fetch(compact_set(keys %lUIDs), $attrs, sub($){ $lList{shift->{UID}} = 1 });
+ $rIMAP->fetch(compact_set(values %lUIDs), $attrs, sub($){ $rList{shift->{UID}} = 1 });
+
+ my (@lToRemove, @rToRemove);
+ while (my ($lUID,$rUID) = each %lUIDs) {
+ next if $lList{$lUID} and $rList{$rUID}; # exists on both
+ push @lToRemove, $lUID if $lList{$lUID};
+ push @rToRemove, $rUID if $rList{$rUID};
+
+ my $r = $STH_DELETE_MAPPING->execute($IDX, $lUID);
+ die if $r > 1; # sanity check
+ warn "WARNING: Can't delete (idx,lUID) = ($IDX,$lUID) from the database\n" if $r == 0;
+ }
+
+ $lIMAP->remove_message(@lToRemove) if @lToRemove;
+ $rIMAP->remove_message(@rToRemove) if @rToRemove;
+ $DBH->commit() if @lToRemove or @rToRemove; # /!\ commit *after* remove_message!
+
+ # ignore deleted messages
+ delete @lList{@lToRemove};
+ delete @rList{@rToRemove};
+
+ # Resume the sync, but skip messages that have already been
+ # downloaded. Flag updates will be processed automatically since
+ # the _MODIFIED internal cache has been initialized with all our
+ # UIDs. (Since there is no reliable HIGHESTMODSEQ, any flag
+ # difference is treated as a conflict.)
+ sync_messages($IDX, $MAILBOX, [keys %lList], [keys %rList]);
+}
+
+
+
+# Initialize $lIMAP and $rIMAP states to detect mailbox dirtyness.
+$STH_GET_CACHE->execute();
+while (defined (my $row = $STH_GET_CACHE->fetchrow_hashref())) {
+ $lIMAP->set_cache($row->{mailbox},
+ UIDVALIDITY => $row->{lUIDVALIDITY},
+ UIDNEXT => $row->{lUIDNEXT},
+ HIGHESTMODSEQ => ($CONFIG{check} ? 0 : $row->{lHIGHESTMODSEQ})
+ );
+ $rIMAP->set_cache($row->{mailbox},
+ UIDVALIDITY => $row->{rUIDVALIDITY},
+ UIDNEXT => $row->{rUIDNEXT},
+ HIGHESTMODSEQ => ($CONFIG{check} ? 0 : $row->{rHIGHESTMODSEQ})
+ );
+}
+
+
+
while(1) {
while(1) {
my $cache;
my $update = 0;
- if (defined $mailbox and ($lIMAP->is_dirty($mailbox) or $rIMAP->is_dirty($mailbox))) {
- # $mailbox is dirty on either the local or remote mailbox
- sync_messages(\$idx, $mailbox);
+ if (defined $MAILBOX and ($lIMAP->is_dirty($MAILBOX) or $rIMAP->is_dirty($MAILBOX))) {
+ # $MAILBOX is dirty on either the local or remote mailbox
+ sync_messages($IDX, $MAILBOX);
}
else {
- $mailbox = $lIMAP->next_dirty_mailbox(@ARGV) // $rIMAP->next_dirty_mailbox(@ARGV) // last;
- $mailbox = 'INBOX' if uc $mailbox eq 'INBOX'; # INBOX is case insensitive
+ $MAILBOX = $lIMAP->next_dirty_mailbox(@ARGV) // $rIMAP->next_dirty_mailbox(@ARGV) // last;
+ $MAILBOX = 'INBOX' if uc $MAILBOX eq 'INBOX'; # INBOX is case insensitive
- $STH_GET_INDEX->execute($mailbox);
- ($idx) = $STH_GET_INDEX->fetchrow_array();
+ $STH_GET_INDEX->execute($MAILBOX);
+ ($IDX) = $STH_GET_INDEX->fetchrow_array();
die if defined $STH_GET_INDEX->fetch(); # sanity check
- $lIMAP->select($mailbox);
- $rIMAP->select($mailbox);
+ $lIMAP->select($MAILBOX);
+ $rIMAP->select($MAILBOX);
+
+ # new mailbox
+ if (!defined $IDX) {
+ my $subscribed = (grep { $_ eq $MAILBOX} @SUBSCRIPTIONS) ? 1 : 0;
+ $STH_INSERT_MAILBOX->execute($MAILBOX, $subscribed);
+ $STH_GET_INDEX->execute($MAILBOX);
+ ($IDX) = $STH_GET_INDEX->fetchrow_array();
+ die if !defined $IDX or defined $STH_GET_INDEX->fetchrow_arrayref(); # sanity check
- # sync updates to known messages before fetching new messages
- if (defined $idx and sync_known_messages($idx, $mailbox)) {
+ $STH_INSERT_LOCAL->execute( $IDX, $lIMAP->uidvalidity($MAILBOX));
+ $STH_INSERT_REMOTE->execute($IDX, $rIMAP->uidvalidity($MAILBOX));
+
+ # don't commit before the first mapping (lUID,rUID)
+ }
+ elsif (sync_known_messages($IDX, $MAILBOX)) {
+ # sync updates to known messages before fetching new messages
# get_cache is safe after pull_update
- $STH_UPDATE_LOCAL_HIGHESTMODSEQ->execute( $lIMAP->get_cache('HIGHESTMODSEQ'), $idx);
- $STH_UPDATE_REMOTE_HIGHESTMODSEQ->execute($rIMAP->get_cache('HIGHESTMODSEQ'), $idx);
+ $STH_UPDATE_LOCAL_HIGHESTMODSEQ->execute( $lIMAP->get_cache('HIGHESTMODSEQ'), $IDX);
+ $STH_UPDATE_REMOTE_HIGHESTMODSEQ->execute($rIMAP->get_cache('HIGHESTMODSEQ'), $IDX);
$DBH->commit();
}
- sync_messages(\$idx, $mailbox);
+ sync_messages($IDX, $MAILBOX);
}
}
# clean state!