diff options
Diffstat (limited to 'imapsync')
-rwxr-xr-x | imapsync | 318 |
1 files changed, 208 insertions, 110 deletions
@@ -121,15 +121,15 @@ $DBH->do('PRAGMA foreign_keys = ON'); local => [ q{idx INTEGER NOT NULL PRIMARY KEY REFERENCES mailboxes(idx)}, q{UIDVALIDITY UNSIGNED INT NOT NULL CHECK (UIDVALIDITY > 0)}, - q{UIDNEXT UNSIGNED INT NOT NULL CHECK (UIDNEXT > 0)}, - q{HIGHESTMODSEQ UNSIGNED BIGINT NOT NULL CHECK (HIGHESTMODSEQ > 0)} + q{UIDNEXT UNSIGNED INT NOT NULL}, # 0 initially + q{HIGHESTMODSEQ UNSIGNED BIGINT NOT NULL} # 0 initially # one-to-one correspondence between local.idx and remote.idx ], remote => [ q{idx INTEGER NOT NULL PRIMARY KEY REFERENCES mailboxes(idx)}, q{UIDVALIDITY UNSIGNED INT NOT NULL CHECK (UIDVALIDITY > 0)}, - q{UIDNEXT UNSIGNED INT NOT NULL CHECK (UIDNEXT > 0)}, - q{HIGHESTMODSEQ UNSIGNED BIGINT NOT NULL CHECK (HIGHESTMODSEQ > 0)} + q{UIDNEXT UNSIGNED INT NOT NULL}, # 0 initially + q{HIGHESTMODSEQ UNSIGNED BIGINT NOT NULL} # 0 initially # one-to-one correspondence between local.idx and remote.idx ], mapping => [ @@ -138,7 +138,7 @@ $DBH->do('PRAGMA foreign_keys = ON'); q{rUID UNSIGNED INT NOT NULL CHECK (rUID > 0)}, q{PRIMARY KEY (idx,lUID)}, q{UNIQUE (idx,rUID)} - # also, lUID < local.UIDNEXT and rUID < remote.UIDNEXT + # also, lUID < local.UIDNEXT and rUID < remote.UIDNEXT (except for interrupted syncs) # mapping.idx must be found among local.idx (and remote.idx) ], ); @@ -544,29 +544,30 @@ my $STH_UPDATE_LOCAL = $DBH->prepare(q{UPDATE local SET UIDNEXT = ?, HIGHESTMO my $STH_UPDATE_REMOTE = $DBH->prepare(q{UPDATE remote SET UIDNEXT = ?, HIGHESTMODSEQ = ? WHERE idx = ?}); # Add a new mailbox. -my $STH_NEWMAILBOX = $DBH->prepare(q{INSERT INTO mailboxes (mailbox,subscribed) VALUES (?,?)}); -my $STH_INSERT_LOCAL = $DBH->prepare(q{INSERT INTO local (idx,UIDVALIDITY,UIDNEXT,HIGHESTMODSEQ) VALUES (?,?,?,?)}); -my $STH_INSERT_REMOTE = $DBH->prepare(q{INSERT INTO remote (idx,UIDVALIDITY,UIDNEXT,HIGHESTMODSEQ) VALUES (?,?,?,?)}); +my $STH_INSERT_MAILBOX= $DBH->prepare(q{INSERT INTO mailboxes (mailbox,subscribed) VALUES (?,?)}); +my $STH_INSERT_LOCAL = $DBH->prepare(q{INSERT INTO local (idx,UIDVALIDITY,UIDNEXT,HIGHESTMODSEQ) VALUES (?,?,0,0)}); +my $STH_INSERT_REMOTE = $DBH->prepare(q{INSERT INTO remote (idx,UIDVALIDITY,UIDNEXT,HIGHESTMODSEQ) VALUES (?,?,0,0)}); # Insert or retrieve a (idx,lUID,rUID) association. my $STH_INSERT_MAPPING = $DBH->prepare(q{INSERT INTO mapping (idx,lUID,rUID) VALUES (?,?,?)}); my $STH_GET_MAPPING = $DBH->prepare(q{SELECT lUID,rUID FROM mapping WHERE idx = ?}); +# Get the list of interrupted mailbox syncs. +my $STH_LIST_INTERRUPTED = $DBH->prepare(q{ + SELECT mbx.idx, mailbox + FROM mailboxes mbx JOIN local l ON mbx.idx = l.idx JOIN remote r ON mbx.idx = r.idx JOIN mapping ON mbx.idx = mapping.idx + WHERE (lUID >= l.UIDNEXT OR rUID >= r.UIDNEXT) + GROUP BY mbx.idx +}); + +# For an interrupted mailbox sync, get the pairs (lUID,rUID) that have +# already been downloaded. +my $STH_GET_INTERRUPTED_BY_IDX = $DBH->prepare(q{ + SELECT lUID, rUID + FROM mapping m JOIN local l ON m.idx = l.idx JOIN remote r ON m.idx = r.idx + WHERE m.idx = ? AND (lUID >= l.UIDNEXT OR rUID >= r.UIDNEXT) +}); -# Initialize $lIMAP and $rIMAP states to detect mailbox dirtyness. -$STH_GET_CACHE->execute(); -while (defined (my $row = $STH_GET_CACHE->fetchrow_hashref())) { - $lIMAP->set_cache($row->{mailbox}, - UIDVALIDITY => $row->{lUIDVALIDITY}, - UIDNEXT => $row->{lUIDNEXT}, - HIGHESTMODSEQ => ($CONFIG{check} ? 0 : $row->{lHIGHESTMODSEQ}) - ); - $rIMAP->set_cache($row->{mailbox}, - UIDVALIDITY => $row->{rUIDVALIDITY}, - UIDNEXT => $row->{rUIDNEXT}, - HIGHESTMODSEQ => ($CONFIG{check} ? 0 : $row->{rHIGHESTMODSEQ}) - ); -} # Download some missing UIDs. sub fix_missing($$$@) { @@ -725,8 +726,8 @@ sub sync_known_messages($$) { } } - $lIMAP->remove(@lToRemove) if @lToRemove; - $rIMAP->remove(@rToRemove) if @rToRemove; + $lIMAP->remove_message(@lToRemove) if @lToRemove; + $rIMAP->remove_message(@rToRemove) if @rToRemove; # remove existing mappings foreach my $lUID (@$lVanished, @lToRemove) { @@ -804,85 +805,99 @@ sub sync_known_messages($$) { } -# Sync known and new messages -sub sync_messages($$) { - my ($idx, $mailbox) = @_; - - my %mapping; - foreach my $source (qw/remote local/) { - my $target = $source eq 'local' ? $rIMAP : $lIMAP; - my $multiappend; - - my @newmails; - my $buffer = 0; # sum of the RFC822 sizes in @newmails - - my (@sUID, @tUID); +# The callback to use when FETCHing new messages from $name to add it to +# the other one. +# If defined, the array reference $UIDs will be fed with the newly added +# UIDs. +# If defined, $buff contains the list of messages to be appended with +# MULTIAPPEND. In that case callback_new_message_flush should be called +# after the FETCH. +sub callback_new_message($$$$;$$$) { + my ($idx, $mailbox, $name, $mail, $UIDs, $buff, $bufflen) = @_; + return unless exists $mail->{RFC822}; # not for us + + my $length = length $mail->{RFC822}; + if ($length == 0) { + warn "$name($mailbox): WARNING: Ignoring new 0-length message (UID $mail->{UID})\n"; + return; + } - # don't fetch again the messages we've just added - my @ignore = $source eq 'local' ? keys %mapping : values %mapping; + my @UIDs; + unless (defined $buff) { + @UIDs = callback_new_message_flush($idx, $mailbox, $name, $mail); + } + else { + # use MULTIAPPEND (RFC 3502) + # proceed by batches of 1MB to save roundtrips without blowing up the memory + if (@$buff and $$bufflen + $length > 1048576) { + @UIDs = callback_new_message_flush($idx, $mailbox, $name, @$buff); + @$buff = (); + $$bufflen = 0; + } + push @$buff, $mail; + $$bufflen += $length; + } + push @$UIDs, @UIDs if defined $UIDs; +} - ($source eq 'local' ? $lIMAP : $rIMAP)->pull_new_messages(sub($) { - my $mail = shift; - return unless exists $mail->{RFC822}; # not for us - my $length = length $mail->{RFC822}; - my $suid = $mail->{UID}; - if ($length == 0) { - warn "$source($mailbox): WARNING: Ignoring new 0-length message (UID $suid)\n"; - return; - } +# Add the given @messages (multiple messages are only allowed for +# MULTIAPPEND-capable servers) from $name to the other server. +# Returns the list of newly allocated UIDs. +sub callback_new_message_flush($$$@) { + my ($idx, $mailbox, $name, @messages) = @_; - # use MULTIAPPEND if possible (RFC 3502) to save round-trips - $multiappend //= !$target->incapable('MULTIAPPEND'); + my $imap = $name eq 'local' ? $rIMAP : $lIMAP; # target client + my @sUID = map {$_->{UID}} @messages; + my @tUID = $imap->append($mailbox, @messages); + die unless $#sUID == $#tUID; # sanity check - push @sUID, $suid; - if (!$multiappend) { - push @tUID, $target->append($mailbox, $mail); - } - else { - # proceed by batch of 1MB to save roundtrips without blowing up the memory - if (@newmails and $buffer + $length > 1048576) { - push @tUID, $target->append($mailbox, @newmails); - @newmails = (); - $buffer = 0; - } - push @newmails, $mail; - $buffer += $length; - } - }, @ignore); - push @tUID, $target->append($mailbox, @newmails) if @newmails; - - die unless $#sUID == $#tUID; # sanity check - foreach my $k (0 .. $#sUID) { - my ($lUID,$rUID) = $source eq 'local' ? ($sUID[$k],$tUID[$k]) : ($tUID[$k],$sUID[$k]); - die if exists $mapping{$lUID}; # sanity check - $mapping{$lUID} = $rUID; - } + my ($lUIDs, $rUIDs) = $name eq 'local' ? (\@sUID,\@tUID) : (\@tUID,\@sUID); + for (my $k=0; $k<=$#messages; $k++) { + print STDERR "Adding mapping (lUID,rUID) = ($lUIDs->[$k],$rUIDs->[$k]) for $mailbox\n" + if $CONFIG{debug}; + $STH_INSERT_MAPPING->execute($idx, $lUIDs->[$k], $rUIDs->[$k]); } + $DBH->commit(); # commit only once per batch - # new mailbox - if (!defined $$idx) { - my $subscribed = (grep { $_ eq $mailbox} @SUBSCRIPTIONS) ? 1 : 0; - $STH_NEWMAILBOX->execute($mailbox, $subscribed); - $STH_GET_INDEX->execute($mailbox); - ($$idx) = $STH_GET_INDEX->fetchrow_array(); - die if !defined $$idx or defined $STH_GET_INDEX->fetchrow_arrayref(); # sanity check - - # there might be flag updates pending - sync_known_messages($$idx, $mailbox); - $STH_INSERT_LOCAL->execute($$idx, $lIMAP->get_cache(qw/UIDVALIDITY UIDNEXT HIGHESTMODSEQ/)); - $STH_INSERT_REMOTE->execute($$idx, $rIMAP->get_cache(qw/UIDVALIDITY UIDNEXT HIGHESTMODSEQ/)); - } - else { - # update known mailbox - sync_known_messages($$idx, $mailbox); - $STH_UPDATE_LOCAL->execute($lIMAP->get_cache( qw/UIDNEXT HIGHESTMODSEQ/), $$idx); - $STH_UPDATE_REMOTE->execute($rIMAP->get_cache(qw/UIDNEXT HIGHESTMODSEQ/), $$idx); - } + return @tUID; +} - while (my ($lUID,$rUID) = each %mapping) { - $STH_INSERT_MAPPING->execute($$idx, $lUID, $rUID); - } + +# Sync both known and new messages +# If the array references $lIgnore and $rIgnore are not empty, skip +# the given UIDs. +sub sync_messages($$;$$) { + my ($idx, $mailbox, $lIgnore, $rIgnore) = @_; + my ($buff, $bufflen, @lUIDs); + + # get new messages from remote (except @$rIgnore) and APPEND them to local + ($buff, $bufflen) = ([], 0); + undef $buff if $lIMAP->incapable('MULTIAPPEND'); + $rIMAP->pull_new_messages(sub($) { + callback_new_message($idx, $mailbox, 'remote', shift, \@lUIDs, $buff, \$bufflen) + }, @{$rIgnore // []}); + push @lUIDs, callback_new_message_flush($idx, $mailbox, 'remote', @$buff) + if defined $buff and @$buff; + + # get new messages from local (except @$lIgnore and the newly allocated local + # UIDs @lUIDs) and APPEND them to remote + ($buff, $bufflen) = ([], 0); + undef $buff if $rIMAP->incapable('MULTIAPPEND'); + $lIMAP->pull_new_messages(sub($) { + callback_new_message($idx, $mailbox, 'local', shift, undef, $buff, \$bufflen) + }, @{$lIgnore // []}, @lUIDs); + callback_new_message_flush($idx, $mailbox, 'local', @$buff) + if defined $buff and @$buff; + + # both local and remote UIDNEXT are now up to date; proceed with + # pending flag updates and vanished messages + sync_known_messages($idx, $mailbox); + + # don't store the new UIDNEXTs before to avoid downloading these + # mails again in the event of a crash + $STH_UPDATE_LOCAL->execute($lIMAP->get_cache( qw/UIDNEXT HIGHESTMODSEQ/), $idx); + $STH_UPDATE_REMOTE->execute($rIMAP->get_cache(qw/UIDNEXT HIGHESTMODSEQ/), $idx); $DBH->commit(); } @@ -907,34 +922,117 @@ sub wait_notifications(;$) { } -my ($mailbox, $idx); +# Resume interrupted mailbox syncs. +my ($MAILBOX, $IDX); +$STH_LIST_INTERRUPTED->execute(); +while (defined (my $row = $STH_LIST_INTERRUPTED->fetchrow_arrayref())) { + ($IDX, $MAILBOX) = @$row; + print STDERR "Resuming interrupted sync for $MAILBOX\n"; + + my %lUIDs; + $STH_GET_INTERRUPTED_BY_IDX->execute($IDX); + while (defined (my $row = $STH_GET_INTERRUPTED_BY_IDX->fetchrow_arrayref())) { + $lUIDs{$row->[0]} = $row->[1]; # pair ($lUID, $rUID) + } + die unless %lUIDs; # sanity check + + $lIMAP->select($MAILBOX); + $rIMAP->select($MAILBOX); + + # FETCH all messages with their FLAGS to detect messages that have + # vanished meanwhile, or for which there was a flag update. + + my (%lList, %rList); # The lists of existing local and remote UIDs + my $attrs = '('.join(' ', qw/MODSEQ FLAGS/).')'; + $lIMAP->fetch(compact_set(keys %lUIDs), $attrs, sub($){ $lList{shift->{UID}} = 1 }); + $rIMAP->fetch(compact_set(values %lUIDs), $attrs, sub($){ $rList{shift->{UID}} = 1 }); + + my (@lToRemove, @rToRemove); + while (my ($lUID,$rUID) = each %lUIDs) { + next if $lList{$lUID} and $rList{$rUID}; # exists on both + push @lToRemove, $lUID if $lList{$lUID}; + push @rToRemove, $rUID if $rList{$rUID}; + + my $r = $STH_DELETE_MAPPING->execute($IDX, $lUID); + die if $r > 1; # sanity check + warn "WARNING: Can't delete (idx,lUID) = ($IDX,$lUID) from the database\n" if $r == 0; + } + + $lIMAP->remove_message(@lToRemove) if @lToRemove; + $rIMAP->remove_message(@rToRemove) if @rToRemove; + $DBH->commit() if @lToRemove or @rToRemove; # /!\ commit *after* remove_message! + + # ignore deleted messages + delete @lList{@lToRemove}; + delete @rList{@rToRemove}; + + # Resume the sync, but skip messages that have already been + # downloaded. Flag updates will be processed automatically since + # the _MODIFIED internal cache has been initialized with all our + # UIDs. (Since there is no reliable HIGHESTMODSEQ, any flag + # difference is treated as a conflict.) + sync_messages($IDX, $MAILBOX, [keys %lList], [keys %rList]); +} + + + +# Initialize $lIMAP and $rIMAP states to detect mailbox dirtyness. +$STH_GET_CACHE->execute(); +while (defined (my $row = $STH_GET_CACHE->fetchrow_hashref())) { + $lIMAP->set_cache($row->{mailbox}, + UIDVALIDITY => $row->{lUIDVALIDITY}, + UIDNEXT => $row->{lUIDNEXT}, + HIGHESTMODSEQ => ($CONFIG{check} ? 0 : $row->{lHIGHESTMODSEQ}) + ); + $rIMAP->set_cache($row->{mailbox}, + UIDVALIDITY => $row->{rUIDVALIDITY}, + UIDNEXT => $row->{rUIDNEXT}, + HIGHESTMODSEQ => ($CONFIG{check} ? 0 : $row->{rHIGHESTMODSEQ}) + ); +} + + + while(1) { while(1) { my $cache; my $update = 0; - if (defined $mailbox and ($lIMAP->is_dirty($mailbox) or $rIMAP->is_dirty($mailbox))) { - # $mailbox is dirty on either the local or remote mailbox - sync_messages(\$idx, $mailbox); + if (defined $MAILBOX and ($lIMAP->is_dirty($MAILBOX) or $rIMAP->is_dirty($MAILBOX))) { + # $MAILBOX is dirty on either the local or remote mailbox + sync_messages($IDX, $MAILBOX); } else { - $mailbox = $lIMAP->next_dirty_mailbox(@ARGV) // $rIMAP->next_dirty_mailbox(@ARGV) // last; - $mailbox = 'INBOX' if uc $mailbox eq 'INBOX'; # INBOX is case insensitive + $MAILBOX = $lIMAP->next_dirty_mailbox(@ARGV) // $rIMAP->next_dirty_mailbox(@ARGV) // last; + $MAILBOX = 'INBOX' if uc $MAILBOX eq 'INBOX'; # INBOX is case insensitive - $STH_GET_INDEX->execute($mailbox); - ($idx) = $STH_GET_INDEX->fetchrow_array(); + $STH_GET_INDEX->execute($MAILBOX); + ($IDX) = $STH_GET_INDEX->fetchrow_array(); die if defined $STH_GET_INDEX->fetch(); # sanity check - $lIMAP->select($mailbox); - $rIMAP->select($mailbox); + $lIMAP->select($MAILBOX); + $rIMAP->select($MAILBOX); + + # new mailbox + if (!defined $IDX) { + my $subscribed = (grep { $_ eq $MAILBOX} @SUBSCRIPTIONS) ? 1 : 0; + $STH_INSERT_MAILBOX->execute($MAILBOX, $subscribed); + $STH_GET_INDEX->execute($MAILBOX); + ($IDX) = $STH_GET_INDEX->fetchrow_array(); + die if !defined $IDX or defined $STH_GET_INDEX->fetchrow_arrayref(); # sanity check - # sync updates to known messages before fetching new messages - if (defined $idx and sync_known_messages($idx, $mailbox)) { + $STH_INSERT_LOCAL->execute( $IDX, $lIMAP->uidvalidity($MAILBOX)); + $STH_INSERT_REMOTE->execute($IDX, $rIMAP->uidvalidity($MAILBOX)); + + # don't commit before the first mapping (lUID,rUID) + } + elsif (sync_known_messages($IDX, $MAILBOX)) { + # sync updates to known messages before fetching new messages # get_cache is safe after pull_update - $STH_UPDATE_LOCAL_HIGHESTMODSEQ->execute( $lIMAP->get_cache('HIGHESTMODSEQ'), $idx); - $STH_UPDATE_REMOTE_HIGHESTMODSEQ->execute($rIMAP->get_cache('HIGHESTMODSEQ'), $idx); + $STH_UPDATE_LOCAL_HIGHESTMODSEQ->execute( $lIMAP->get_cache('HIGHESTMODSEQ'), $IDX); + $STH_UPDATE_REMOTE_HIGHESTMODSEQ->execute($rIMAP->get_cache('HIGHESTMODSEQ'), $IDX); $DBH->commit(); } - sync_messages(\$idx, $mailbox); + sync_messages($IDX, $MAILBOX); } } # clean state! |