diff --git a/controllers/push.php b/controllers/push.php index 32ff264..c06d03d 100644 --- a/controllers/push.php +++ b/controllers/push.php @@ -13,6 +13,10 @@ $app->get('/callback-success', function() use($app) { $app->response()->status(200); echo $params['hub_challenge']; }); +$app->post('/callback-success', function() use($app) { + $params = $app->request()->params(); + $app->response()->status(200); +}); $app->get('/callback-fail', function() use($app) { $params = $app->request()->params(); @@ -20,6 +24,18 @@ $app->get('/callback-fail', function() use($app) { }); /////////////////////////////////////////////////////////////// +function verify_push_topic_url($topic, &$app) { + // If we've already seen the topic, assume it's valid and don't check it again + if(!db\feed_from_url($topic)) { + $topic_head = request\get_head($topic); + if($topic_head && !request\response_is($topic_head['status'], 2)) { + push_error($app, "The topic URL returned a " . $topic_head['status'] . " status code"); + } else { + push_error($app, 'We tried to verify the topic URL exists but it didn\'t respond to a HEAD request.'); + } + } +} + $app->post('/', function() use($app) { $params = $app->request()->params(); @@ -41,15 +57,7 @@ $app->post('/', function() use($app) { } if($mode == 'subscribe') { - // If we've already seen the topic, assume it's valid and don't check it again - if(!db\feed_from_url($topic)) { - $topic_head = request\get_head($topic); - if($topic_head && !request\response_is($topic_head['status'], 2)) { - push_error($app, "The topic URL returned a " . $topic_head['status'] . " status code"); - } else { - push_error($app, 'We tried to verify the topic URL exists but it didn\'t respond to a HEAD request.'); - } - } + verify_push_topic_url($topic, $app); // Find or create the feed given the topic URL $feed = db\find_or_create('feeds', ['feed_url'=>$topic], [ @@ -63,6 +71,7 @@ $app->post('/', function() use($app) { // Always set a new requested date and challenge $subscription->date_requested = db\now(); $subscription->challenge = db\random_hash(); + db\set_updated($subscription); $subscription->save(); // Queue the worker to validate the subscription @@ -89,9 +98,37 @@ $app->post('/', function() use($app) { break; + case 'publish': + + // Sanity check the request params + $url = k($params, 'hub_url'); + + if(!is_valid_push_url($url)) { + push_error($app, 'URL was invalid'); + } + + verify_push_topic_url($url, $app); + + // Find or create the feed given the topic URL + $feed = db\find_or_create('feeds', ['feed_url'=>$url], [ + 'hash' => db\random_hash(), + ], true); + + $num_subscribers = ORM::for_table('subscriptions')->where('feed_id', $feed->id)->where('active', 1)->count(); + + $feed->push_last_ping_received = db\now(); + db\set_updated($feed); + $feed->save(); + + // Queue the worker to ping all the subscribers about the new content + DeferredTask::queue('PushTask', 'publish', $feed->id); + + $app->response()->status(202); + echo "There are currently $num_subscribers active subscriptions for this feed.\n"; + echo "The hub is checking the feed for new content and notifying the subscribers.\nCheck the status here:\n"; + echo Config::$base_url . '/feed/' . $feed->hash . "\n"; break; } - }); diff --git a/lib/PushTask.php b/lib/PushTask.php index 7c6a921..737222b 100644 --- a/lib/PushTask.php +++ b/lib/PushTask.php @@ -62,8 +62,109 @@ class PushTask { $subscription->save(); print_r($response); + } else { + echo "Subscription not found\n"; } } + public static function publish($feed_id) { + $feed = db\get_by_id('feeds', $feed_id); + if($feed) { + + // First check the feed to see if the content has changed since the last time we checked + $response = request\get_url($feed->feed_url, true); + + $feed->last_retrieved = db\now(); + db\set_updated($feed); + $content_hash = md5($response['body']); + + if($content_hash != $feed->content_hash) { + $feed->content_hash = $content_hash; + + $subscribers = ORM::for_table('subscriptions')->where('feed_id', $feed->id)->where('active', 1)->find_many(); + foreach($subscribers as $s) { + echo "Queuing notification for feed_id=$feed_id subscription_id=$s->id\n"; + DeferredTask::queue('PushTask', 'notify_subscriber', [$feed_id, $s->id, db\now()]); + } + + } else { + echo "Feed body has the same content hash as last time, not notifying subscribers\n"; + } + + $feed->save(); + + } else { + echo "Feed not found\n"; + } + } + + public static function notify_subscriber($feed_id, $subscription_id, $date_queued) { + $feed = db\get_by_id('feeds', $feed_id); + if(!$feed) { + echo "Feed not found\n"; + return; + } + + $subscription = db\get_by_id('subscriptions', $subscription_id); + if(!$subscription) { + echo "Subscription not found\n"; + return; + } + + // If the job was put on the queue before the last ping was sent, ignore it. + // This happens when there is a retry job in the delayed queue, and then the + // publisher sends a new publish request and the subscriber responds to it immediately. + if(strtotime($date_queued) < strtotime($subscription->date_last_ping_sent)) { + echo "Job was queued before the last ping was sent by the publisher, skipping\n"; + return; + } + + echo "Processing subscriber: " . $subscription->callback_url . "\n"; + + // Subscription may be "active" but the expiration date may have passed. + // If so, de-activate the subscription. + if(strtotime($subscription->date_expires) < time()) { + echo "Subscription expired!\n"; + $subscription->active = 0; + db\set_updated($subscription); + $subscription->save(); + return; + } + + echo "Notifying subscriber!\n"; + + $subscription->date_last_ping_sent = db\now(); + $response = request\post($subscription->callback_url, []); + $subscription->last_ping_status = $response['status']; + $subscription->last_ping_headers = $response['headers']; + $subscription->last_ping_body = $response['body']; + + echo "Subscriber return a " . $response['status'] . " HTTP status\n"; + + if(request\response_is($response['status'], 2)) { + $subscription->last_ping_success = 1; + $subscription->last_ping_error_delay = 0; + } else { + $subscription->last_ping_success = 0; + // If the ping failed, queue another ping for a later time with exponential backoff + if($subscription->last_ping_error_delay == 0) + $subscription->last_ping_error_delay = 15; + + // If it's timed out after 8 tries, de-activate the subscription + if($subscription->last_ping_error_delay > 2000) { + echo "Ping failed after " . $subscription->last_ping_error_delay . " seconds. Deactivating this subscription.\n"; + $subscription->active = 0; + } else { + echo "Ping failed, trying again in " . $subscription->last_ping_error_delay . " seconds\n"; + DeferredTask::queue('PushTask', 'notify_subscriber', [$feed_id, $subscription_id, db\now()], $subscription->last_ping_error_delay); + $subscription->last_ping_error_delay = $subscription->last_ping_error_delay * 2; + } + } + + db\set_updated($subscription); + $subscription->save(); + + } + } diff --git a/lib/helpers.php b/lib/helpers.php index cc4d81c..df657d7 100644 --- a/lib/helpers.php +++ b/lib/helpers.php @@ -11,13 +11,16 @@ function friendly_url($url) { } function friendly_date($date_string, $tz_offset) { + if(!$date_string) + return ''; + $date = new DateTime($date_string); if($tz_offset > 0) $date->add(new DateInterval('PT'.$tz_offset.'S')); elseif($tz_offset < 0) $date->sub(new DateInterval('PT'.abs($tz_offset).'S')); $tz = ($tz_offset < 0 ? '-' : '+') . sprintf('%02d:%02d', abs($tz_offset/60/60), ($tz_offset/60)%60); - return $date->format('F j, Y g:ia') . ' ' . $tz; + return $date->format('F j, Y H:i:s') . ' ' . $tz; } function build_url($parsed_url) { diff --git a/public/css/style.css b/public/css/style.css index 28260ca..6d343ef 100644 --- a/public/css/style.css +++ b/public/css/style.css @@ -71,6 +71,10 @@ body.logged-out { border: 1px #ccc solid; } +.subscription-status pre { + max-width: 500px; + overflow: pre-wrap; +} /** * Bootstrap callouts diff --git a/views/subscription-status.php b/views/subscription-status.php index 8589129..770dd70 100644 --- a/views/subscription-status.php +++ b/views/subscription-status.php @@ -1,3 +1,4 @@ +
subscription->active): ?> @@ -5,10 +6,9 @@
This subscription is not active
- - - +

Subscription

+
@@ -19,7 +19,7 @@ - + @@ -27,7 +27,7 @@ - + subscription->date_unsubscribed): ?> @@ -44,4 +44,33 @@
Feed URL (Topic) feed->feed_url ?>
Date Subscription was Requestedsubscription->date_requested ? friendly_date($this->subscription->date_requested, $tz) : '' ?>subscription->date_requested, $tz) ?>
Subscription Verification Response
(from your server)
Date Subscription was Confirmedsubscription->date_confirmed ? friendly_date($this->subscription->date_confirmed, $tz) : '' ?>subscription->date_confirmed, $tz) ?>
+

Ping Info

+ + + + + + + + + + + + + + + + + + subscription->last_ping_success == 0): ?> + + + + + +
Last ping received from publisherfeed->push_last_ping_received, $tz) ?>
Last ping sent to subscribersubscription->date_last_ping_sent, $tz) ?>
Last Response
(from your server)
subscription->last_ping_headers."\n\n".$this->subscription->last_ping_body) ?>
Last ping was successful?subscription->last_ping_success ? 'Yes' : 'No' ?>
(Subscriber must return 2xx on success)
Retrying ping in + subscription->last_ping_error_delay/2 ?> seconds
+ (Will be de-activated after 1 hour from first failed ping) +
+
\ No newline at end of file