aboutsummaryrefslogtreecommitdiffstats
path: root/lib/Net
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Net')
-rw-r--r--lib/Net/IMAP/InterIMAP.pm180
1 files changed, 104 insertions, 76 deletions
diff --git a/lib/Net/IMAP/InterIMAP.pm b/lib/Net/IMAP/InterIMAP.pm
index 45253c1..a899831 100644
--- a/lib/Net/IMAP/InterIMAP.pm
+++ b/lib/Net/IMAP/InterIMAP.pm
@@ -35,7 +35,8 @@ BEGIN {
Net::SSLeay::SSLeay_add_ssl_algorithms();
Net::SSLeay::randomize();
- our @EXPORT_OK = qw/read_config compact_set $IMAP_text $IMAP_cond/;
+ our @EXPORT_OK = qw/read_config compact_set $IMAP_text $IMAP_cond
+ slurp is_dirty has_new_mails/;
}
@@ -313,6 +314,9 @@ sub new($%) {
foreach ($rd, $wd) {
close $_ or $self->panic("Can't close: $!");
}
+ foreach (qw/STDIN STDOUT/) {
+ binmode($self->{$_}) // $self->panic("binmode: $!")
+ }
}
else {
foreach (qw/host port/) {
@@ -320,28 +324,17 @@ sub new($%) {
}
my $socket = defined $self->{proxy} ? $self->_proxify(@$self{qw/proxy host port/})
: $self->_tcp_connect(@$self{qw/host port/});
- my ($cnt, $intvl) = (3, 5);
if (defined $self->{keepalive}) {
- # detect dead peers and drop the connection after 60 secs + $cnt*$intvl
setsockopt($socket, Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)
or $self->fail("Can't setsockopt SO_KEEPALIVE: $!");
setsockopt($socket, Socket::IPPROTO_TCP, Socket::TCP_KEEPIDLE, 60)
or $self->fail("Can't setsockopt TCP_KEEPIDLE: $!");
- setsockopt($socket, Socket::IPPROTO_TCP, Socket::TCP_KEEPCNT, $cnt)
- or $self->fail("Can't setsockopt TCP_KEEPCNT: $!");
- setsockopt($socket, Socket::IPPROTO_TCP, Socket::TCP_KEEPINTVL, $intvl)
- or $self->fail("Can't setsockopt TCP_KEEPINTVL: $!");
}
- # Abort after 15secs if write(2) isn't acknowledged
- # XXX Socket::TCP_USER_TIMEOUT isn't defined.
- # `grep TCP_USER_TIMEOUT /usr/include/linux/tcp.h` gives 18
- setsockopt($socket, Socket::IPPROTO_TCP, 18, 1000 * $cnt * $intvl)
- or $self->fail("Can't setsockopt TCP_USER_TIMEOUT: $!");
+ binmode($socket) // $self->panic("binmode: $!");
$self->_start_ssl($socket) if $self->{type} eq 'imaps';
$self->{$_} = $socket for qw/STDOUT STDIN/;
}
- binmode $self->{$_} foreach qw/STDIN STDOUT/;
# command counter
$self->{_TAG} = 0;
@@ -645,6 +638,7 @@ sub unselect($) {
# we'll get back to it
$self->{_VANISHED} = [];
$self->{_MODIFIED} = {};
+ $self->{_NEW} = 0;
}
@@ -916,91 +910,93 @@ sub fetch($$$;&) {
}
-# $self->notify(@specifications)
-# Issue a NOTIFY command with the given mailbox @specifications (cf RFC
-# 5465 section 6) to be monitored. Croak if the server did not
-# advertise "NOTIFY" (RFC 5465) in its CAPABILITY list.
-sub notify($@) {
+# $self->notify($arg, %specifications)
+# Issue a NOTIFY command with the given $arg ("SET", "SET STATUS" or
+# "NONE") and mailbox %specifications (cf RFC 5465 section 6) to be
+# monitored. Croak if the server did not advertise "NOTIFY" (RFC
+# 5465) in its CAPABILITY list.
+sub notify($$@) {
my $self = shift;
$self->fail("Server did not advertise NOTIFY (RFC 5465) capability.")
unless $self->_capable('NOTIFY');
- my $events = join ' ', qw/MessageNew MessageExpunge FlagChange MailboxName SubscriptionChange/;
- # Be notified of new messages with EXISTS/RECENT responses, but
- # don't receive unsolicited FETCH responses with a RFC822/BODY[].
- # It costs us an extra roundtrip, but we need to sync FLAG updates
- # and VANISHED responses in batch mode, update the HIGHESTMODSEQ,
- # and *then* issue an explicit UID FETCH command to get new message,
- # and process each FETCH response with a RFC822/BODY[] attribute as
- # they arrive.
- my $command = 'NOTIFY ';
- $command .= @_ ? ('SET '. join(' ', map {"($_ ($events))"} @_)) : 'NONE';
+ my $command = 'NOTIFY '.shift;
+ while (@_) {
+ $command .= " (".shift." (".join(' ', @{shift()})."))";
+ }
$self->_send($command);
}
-# $self->slurp([$callback, $cmd])
-# See if the server has sent some unprocessed data; try to as many
-# lines as possible, process them, and return the number of lines
-# read.
+# slurp($imap, $timeout, $stopwhen)
+# Keep reading untagged responses from the @$imap servers until the
+# $stopwhen condition becomes true (then return true), or until the
+# $timeout expires (then return false).
# This is mostly useful when waiting for notifications while no
# command is progress, cf. RFC 2177 (IDLE) or RFC 5465 (NOTIFY).
-sub slurp($;&$) {
- my ($self, $callback, $cmd) = @_;
- my $ssl = $self->{_SSL};
- my $read = 0;
+sub slurp($$$) {
+ my ($selfs, $timeout, $stopwhen) = @_;
+ my $aborted = 0;
+
+ my $rin = '';
+ vec($rin, fileno($_->{STDOUT}), 1) = 1 foreach @$selfs;
- vec(my $rin, fileno($self->{STDOUT}), 1) = 1;
while (1) {
- unless ((defined $self->{_OUTBUF} and $self->{_OUTBUF} ne '') or
- # Unprocessed data within the current TLS record would
- # cause select(2) to block/timeout due to the raw socket
- # not being ready.
- (defined $ssl and Net::SSLeay::pending($ssl) > 0)) {
- my $r = CORE::select($rin, undef, undef, 0);
+ # first, consider only unprocessed data without our own output
+ # buffer, or within the current TLS record: these would cause
+ # select(2) to block/timeout due to the raw socket not being
+ # ready.
+ my @ready = grep { (defined $_->{_OUTBUF} and $_->{_OUTBUF} ne '') or
+ (defined $_->{_SSL} and Net::SSLeay::pending($_->{_SSL}) > 0)
+ } @$selfs;
+ unless (@ready) {
+ my ($r, $timeleft) = CORE::select(my $rout = $rin, undef, undef, $timeout);
next if $r == -1 and $! == EINTR; # select(2) was interrupted
- $self->panic("Can't select: $!") if $r == -1;
- return $read if $r == 0; # nothing more to read
+ die "select: $!" if $r == -1;
+ return $aborted if $r == 0; # nothing more to read (timeout reached)
+ @ready = grep {vec($rout, fileno($_->{STDOUT}), 1)} @$selfs;
+ $timeout = $timeleft if $timeout > 0;
+ }
+
+ foreach my $imap (@ready) {
+ my $x = $imap->_getline();
+ $imap->_resp($x, sub($) {
+ if ($stopwhen->($imap, shift)) {
+ $aborted = 1;
+ $timeout = 0; # keep reading the handles while there is pending data
+ }
+ }, 'slurp');
}
- my $x = $self->_getline();
- $self->_resp($x, $callback, $cmd);
- $read++;
}
}
-# $self->idle([$timeout, $stopwhen])
+# $self->idle($timeout, $stopwhen)
# Enter IDLE (RFC 2177) for $timout seconds (by default 29 mins), or
# when the callback $stopwhen returns true.
-# Return false if the timeout was reached, and true if IDLE was
-# stopped due the callback.
-sub idle($;$&) {
+# Return true if the callback returned true (either aborting IDLE, or
+# after the $timeout) and false otherwise.
+sub idle($$$) {
my ($self, $timeout, $stopwhen) = @_;
- $timeout //= 1740; # 29 mins
- my $callback = sub() {$timeout = -1 if $stopwhen->()};
$self->fail("Server did not advertise IDLE (RFC 2177) capability.")
unless $self->_capable('IDLE');
my $tag = $self->_cmd_init('IDLE');
$self->_cmd_flush();
-
- for (; $timeout > 0; $timeout--) {
- $self->slurp($callback, 'IDLE');
- sleep 1 if $timeout > 0;
- }
+ my $r = slurp([$self], $timeout // 1740, $stopwhen); # 29 mins
# done idling
$self->_cmd_extend('DONE');
$self->_cmd_flush();
# run the callback again to update the return value if we received
# untagged responses between the DONE and the tagged response
- $self->_recv($tag, $callback, 'IDLE');
+ $self->_recv($tag, sub($) { $r = 1 if $stopwhen->($self, shift) }, 'slurp');
- return $timeout < 0 ? 1 : 0;
+ return $r;
}
-# $self->set_cache( $mailbox, STATE )
+# $self->set_cache($mailbox, STATE)
# Initialize or update the persistent cache, that is, associate a
# known $mailbox with the last known (synced) state:
# * UIDVALIDITY
@@ -1082,6 +1078,7 @@ sub get_cache($@) {
# persistent cache's values.
sub is_dirty($$) {
my ($self, $mailbox) = @_;
+ return 1 if $self->{_NEW};
$self->_updated_cache($mailbox, qw/HIGHESTMODSEQ UIDNEXT/);
}
@@ -1091,6 +1088,7 @@ sub is_dirty($$) {
# internal cache's UIDNEXT value differs from its persistent cache's.
sub has_new_mails($$) {
my ($self, $mailbox) = @_;
+ return 1 if $self->{_NEW};
$self->_updated_cache($mailbox, 'UIDNEXT');
}
@@ -1181,6 +1179,7 @@ sub pull_new_messages($$&@) {
my @ignore = sort { $a <=> $b } @_;
my $mailbox = $self->{_SELECTED} // $self->panic();
+ my $cache = $self->{_CACHE}->{$mailbox};
my $UIDNEXT;
do {
@@ -1205,19 +1204,20 @@ sub pull_new_messages($$&@) {
# 2^32-1: don't use '*' since the highest UID can be known already
$range .= "$since:4294967295";
- $UIDNEXT = $self->{_CACHE}->{$mailbox}->{UIDNEXT} // $self->panic(); # sanity check
+ $UIDNEXT = $cache->{UIDNEXT} // $self->panic(); # sanity check
$self->_send("UID FETCH $range ($attrs)", sub($) {
my $mail = shift;
$UIDNEXT = $mail->{UID} + 1 if $UIDNEXT <= $mail->{UID};
$callback->($mail) if defined $callback;
- }) if $first < $UIDNEXT;
+ }) if $first < $UIDNEXT or $self->{_NEW};
# update the persistent cache for UIDNEXT (not for HIGHESTMODSEQ
# since there might be pending updates)
$self->set_cache($mailbox, UIDNEXT => $UIDNEXT);
+ $self->{_NEW} = 0;
}
# loop if new messages were received in the meantime
- while ($UIDNEXT < $self->{_CACHE}->{$mailbox}->{UIDNEXT});
+ while ($self->{_NEW} or $UIDNEXT < $cache->{UIDNEXT});
}
@@ -1372,12 +1372,21 @@ sub _tcp_connect($$$) {
SOCKETS:
foreach my $ai (@res) {
socket (my $s, $ai->{family}, $ai->{socktype}, $ai->{protocol}) or $self->panic("connect: $!");
- # TODO: add a connection timeout
- # http://devpit.org/wiki/Connect%28%29_with_timeout_%28in_Perl%29
+
+ # timeout connect/read/write/... after 30s
+ # XXX we need to pack the struct timeval manually: not portable!
+ # https://stackoverflow.com/questions/8284243/how-do-i-set-so-rcvtimeo-on-a-socket-in-perl
+ my $timeout = pack('l!l!', 30, 0);
+ setsockopt($s, Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, $timeout)
+ or $self->fail("Can't setsockopt SO_RCVTIMEO: $!");
+ setsockopt($s, Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, $timeout)
+ or $self->fail("Can't setsockopt SO_RCVTIMEO: $!");
+
until (connect($s, $ai->{addr})) {
next if $! == EINTR; # try again if connect(2) was interrupted by a signal
next SOCKETS;
}
+
my $flags = fcntl($s, F_GETFD, 0) or $self->panic("fcntl F_GETFD: $!");
fcntl($s, F_SETFD, $flags | FD_CLOEXEC) or $self->panic("fcntl F_SETFD: $!");
return $s;
@@ -1908,11 +1917,11 @@ sub _send($$;&) {
my $tag = $self->_cmd_init($command);
$self->_cmd_flush();
+ my $cmd = $$command =~ /\AUID ($RE_ATOM_CHAR+) / ? $1 : $$command =~ /\A($RE_ATOM_CHAR+) / ? $1 : $$command;
if (!defined $callback) {
- $self->_recv($tag);
+ $self->_recv($tag, undef, $cmd);
}
else {
- my $cmd = $$command =~ /\AUID ($RE_ATOM_CHAR+) / ? $1 : $$command =~ /\A($RE_ATOM_CHAR+) / ? $1 : $$command;
my $set = $$command =~ /\AUID (?:FETCH|STORE) ([0-9:,*]+)/ ? $1 : undef;
$self->_recv($tag, $callback, $cmd, $set);
}
@@ -1993,6 +2002,7 @@ sub _open_mailbox($$) {
# we'll get back to it
$self->{_VANISHED} = [];
$self->{_MODIFIED} = {};
+ $self->{_NEW} = 0;
$self->{_SELECTED} = $mailbox;
$self->{_CACHE}->{$mailbox} //= {};
@@ -2219,6 +2229,7 @@ sub _resp($$;&$$) {
}
elsif (s/\A(?:OK|NO|BAD) //) {
$self->_resp_text($_);
+ $callback->($self->{_SELECTED}) if defined $self->{_SELECTED} and defined $callback and $cmd eq 'slurp';
}
elsif (/\ACAPABILITY((?: $RE_ATOM_CHAR+)+)\z/) {
$self->{_CAPABILITIES} = [ split / /, ($1 =~ s/^ //r) ];
@@ -2233,17 +2244,20 @@ sub _resp($$;&$$) {
# /!\ $cache->{EXISTS} MUST NOT be defined on SELECT
if (defined $cache->{EXISTS}) {
$self->panic("Unexpected EXISTS shrink $1 < $cache->{EXISTS}!") if $1 < $cache->{EXISTS};
- # the actual UIDNEXT is *at least* that
- $cache->{UIDNEXT} += $1 - $cache->{EXISTS} if defined $cache->{UIDNEXT};
+ $self->{_NEW} += $1 - $cache->{EXISTS} if $1 > $cache->{EXISTS}; # new mails
}
$cache->{EXISTS} = $1;
+ $callback->($self->{_SELECTED} // $self->panic()) if defined $callback and $cmd eq 'slurp';
}
elsif (/\A([0-9]+) EXPUNGE\z/) {
+ $self->panic() unless defined $cache->{EXISTS}; # sanity check
# /!\ No bookkeeping since there is no internal cache mapping sequence numbers to UIDs
if ($self->_enabled('QRESYNC')) {
$self->panic("$1 <= $cache->{EXISTS}") if $1 <= $cache->{EXISTS}; # sanity check
$self->fail("RFC 7162 violation! Got an EXPUNGE response with QRESYNC enabled.");
}
+ # the new message was expunged before it was synced
+ $self->{_NEW} = 0 if $self->{_NEW} == 1 and $cache->{EXISTS} == $1;
$cache->{EXISTS}--; # explicit EXISTS responses are optional
}
elsif (/\ASEARCH((?: [0-9]+)*)\z/) {
@@ -2266,11 +2280,20 @@ sub _resp($$;&$$) {
/\A \((\\?$RE_ATOM_CHAR+ [0-9]+(?: \\?$RE_ATOM_CHAR+ [0-9]+)*)?\)\z/ or $self->panic($_);
my %status = split / /, $1;
$mailbox = 'INBOX' if uc $mailbox eq 'INBOX'; # INBOX is case-insensitive
+ $self->panic("RFC 5465 violation! Missing HIGHESTMODSEQ data item in STATUS response")
+ if $self->_enabled('QRESYNC') and !defined $status{HIGHESTMODSEQ} and defined $cmd and
+ ($cmd eq 'NOTIFY' or $cmd eq 'slurp');
$self->_update_cache_for($mailbox, %status);
- $callback->($mailbox, %status) if defined $callback and $cmd eq 'STATUS';
+ if (defined $callback) {
+ if ($cmd eq 'STATUS') {
+ $callback->($mailbox, %status);
+ } elsif ($cmd eq 'slurp') {
+ $callback->($mailbox);
+ }
+ }
}
elsif (s/\A([0-9]+) FETCH \(//) {
- $self->panic("$1 <= $cache->{EXISTS}") unless $1 <= $cache->{EXISTS}; # sanity check
+ $cache->{EXISTS} = $1 if $1 > $cache->{EXISTS};
my ($seq, $first) = ($1, 1);
my %mail;
while ($_ ne ')') {
@@ -2313,8 +2336,13 @@ sub _resp($$;&$$) {
my $flags = join ' ', sort(grep {lc $_ ne '\recent'} @{$mail{FLAGS}}) if defined $mail{FLAGS};
$self->{_MODIFIED}->{$uid} = [ $mail{MODSEQ}, $flags ];
}
- $callback->(\%mail) if defined $callback and ($cmd eq 'FETCH' or $cmd eq 'STORE') and
- defined $uid and in_set($uid, $set);
+ if (defined $callback) {
+ if ($cmd eq 'FETCH' or $cmd eq 'STORE') {
+ $callback->(\%mail) if defined $uid and in_set($uid, $set);
+ } elsif ($cmd eq 'slurp') {
+ $callback->($self->{_SELECTED} // $self->panic())
+ }
+ }
}
elsif (/\AENABLED((?: $RE_ATOM_CHAR+)+)\z/) { # RFC 5161 ENABLE
$self->{_ENABLED} //= [];
@@ -2338,6 +2366,7 @@ sub _resp($$;&$$) {
push @{$self->{_VANISHED}}, ($min .. $max);
}
}
+ $callback->($self->{_SELECTED} // $self->panic()) if defined $callback and $cmd eq 'slurp';
}
}
elsif (s/\A\+// and ($_ eq '' or s/\A //)) {
@@ -2351,7 +2380,6 @@ sub _resp($$;&$$) {
else {
$self->panic("Unexpected response: ", $_);
}
- $callback->() if defined $callback and $cmd eq 'IDLE';
}