You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

217 lines
7.7 KiB

8 years ago
  1. <?php
  2. use BarnabyWalters\Mf2;
  /**
   * Deferred tasks run by the hub: verifying (un)subscription requests,
   * checking a feed for new content, and delivering that content to a
   * single subscriber.
   *
   * Every method writes progress messages to standard output with echo and
   * returns nothing.
   */
  class PushTask {

      /**
       * Verify a subscriber's intent by sending a GET request carrying a
       * challenge to its callback URL, then record the outcome on the
       * subscription row.
       *
       * @param mixed  $subscription_id ID of the row in the `subscriptions` table.
       * @param string $mode            'subscribe' to activate; any other value is
       *                                treated as 'unsubscribe'.
       * @return void
       */
      public static function verify_subscription($subscription_id, $mode) {
          $subscription = db\get_by_id('subscriptions', $subscription_id);
          if(!$subscription) {
              echo "Subscription not found\n";
              return;
          }

          $feed = db\get_by_id('feeds', $subscription->feed_id);
          if(!$feed) {
              // Guard against a subscription that points at a missing feed row,
              // mirroring the check done in notify_subscriber().
              echo "Feed not found\n";
              return;
          }

          // Choose the expiration for the subscription: seven days plus one hour.
          $lease_seconds = 86400*7 + 3600;
          $exp_date = date('Y-m-d H:i:s', time() + $lease_seconds);

          // Parameter names are sent either as "hub.xxx" or bare "xxx",
          // depending on the subscription's `namespaced` flag.
          $prefix = $subscription->namespaced ? 'hub.' : '';

          $push_params = [
              $prefix.'mode' => ($mode == 'subscribe' ? 'subscribe' : 'unsubscribe'),
              $prefix.'topic' => $feed->feed_url,
              $prefix.'challenge' => $subscription->challenge
          ];
          if($mode == 'subscribe') {
              $push_params[$prefix.'lease_seconds'] = $lease_seconds;
          }

          echo "Verifying subscription to ".$feed->feed_url." for callback ".$subscription->callback_url."\n";

          // Keep any query string already present on the callback URL.
          // NOTE(review): array_merge lets an existing callback parameter with the
          // same name override the hub's value, and parse_str rewrites dots in
          // parameter names to underscores - confirm both are acceptable.
          $url = parse_url($subscription->callback_url);
          if($q=k($url, 'query')) {
              parse_str($q, $existing_params);
              $push_params = array_merge($push_params, $existing_params);
          }
          $url['query'] = http_build_query($push_params);
          $url = build_url($url);
          echo "\t".$url."\n";

          $response = request\get_url($url, true);
          $subscription->challenge_response = $response['headers']."\n\n".$response['body'];

          // Compare the echoed challenge as exact strings. A loose == would let
          // PHP compare two numeric-looking strings by value ("0e1" == "0e2"),
          // accepting a response that does not actually match the challenge.
          $challenge_echoed = ((string)$response['body'] === (string)$subscription->challenge);

          if(request\response_is($response['status'], 2) && $challenge_echoed) {
              // The subscriber replied with a 2xx status code and confirmed the challenge string.
              if($mode == 'subscribe') {
                  // The subscription is confirmed and active.
                  $subscription->date_confirmed = db\now();
                  $subscription->lease_seconds = $lease_seconds;
                  $subscription->date_expires = $exp_date;
                  $subscription->active = 1;
                  echo "Subscriber verified the request and is now subscribed\n";
              } else {
                  $subscription->date_unsubscribed = db\now();
                  $subscription->date_expires = null;
                  $subscription->active = 0;
                  echo "Subscriber verified the request and is now unsubscribed\n";
              }
          } else {
              // The subscriber did not confirm the subscription, so reject the request
              echo "Subscriber did not echo the challenge\n";
          }

          // The raw challenge response is saved whether or not verification succeeded.
          db\set_updated($subscription);
          $subscription->save();
          print_r($response);
      }

      /**
       * Fetch a feed and, when its body differs from the last stored copy,
       * store the new content and queue one notify_subscriber job per active
       * subscription.
       *
       * @param mixed $feed_id ID of the row in the `feeds` table.
       * @return void
       */
      public static function publish($feed_id) {
          $feed = db\get_by_id('feeds', $feed_id);
          if(!$feed) {
              echo "Feed not found\n";
              return;
          }

          echo $feed->feed_url."\n";
          echo "Checking feed for updates...\n";

          // First check the feed to see if the content has changed since the last time we checked
          $response = request\get_url($feed->feed_url, true);
          if($response['status'] != 200) {
              echo "Feed did not return HTTP 200: ".$response['status'].". Skipping publish.\n";
              return;
          }

          $feed->last_retrieved = db\now();
          db\set_updated($feed);

          // Strict comparison: with loose ==, two different MD5 digests that both
          // look like "0e<digits>" compare equal and a real change would be missed.
          $content_hash = md5($response['body']);
          if($content_hash === $feed->content_hash) {
              // Still persist last_retrieved / updated timestamps.
              $feed->save();
              echo "Feed body has the same content hash as last time, not notifying subscribers\n";
              return;
          }

          $feed->content_hash = $content_hash;

          // Remember the Content-Type so it can be replayed to subscribers.
          // When the parsed header value is an array, the first entry is used.
          $headers = request\parse_headers($response['headers']);
          if(array_key_exists('Content-Type', $headers)) {
              $content_type = $headers['Content-Type'];
              $feed->content_type = is_array($content_type) ? $content_type[0] : $content_type;
          }

          $feed->content = $response['body'];
          $feed->save();

          $subscribers = ORM::for_table('subscriptions')->where('feed_id', $feed->id)->where('active', 1)->find_many();
          foreach($subscribers as $s) {
              echo "Queuing notification for feed_id=$feed_id ($feed->feed_url) subscription_id=$s->id ($s->callback_url)\n";
              DeferredTask::queue('PushTask', 'notify_subscriber', [$feed_id, $s->id, db\now()]);
          }
          if(count($subscribers) == 0) {
              echo "No active subscribers\n";
          }
      }

      /**
       * Deliver the stored feed content to one subscriber with a POST request
       * and record the result. Failed deliveries are re-queued with a doubling
       * delay; an HTTP 410 response or too many failures deactivates the
       * subscription.
       *
       * @param mixed  $feed_id         ID of the row in the `feeds` table.
       * @param mixed  $subscription_id ID of the row in the `subscriptions` table.
       * @param string $date_queued     Time the job was queued, in a format
       *                                strtotime() understands.
       * @return void
       */
      public static function notify_subscriber($feed_id, $subscription_id, $date_queued) {
          $feed = db\get_by_id('feeds', $feed_id);
          if(!$feed) {
              echo "Feed not found\n";
              return;
          }

          $subscription = db\get_by_id('subscriptions', $subscription_id);
          if(!$subscription) {
              echo "Subscription not found\n";
              return;
          }

          // If the job was put on the queue before the last ping was sent, ignore it.
          // This happens when there is a retry job in the delayed queue, and then the
          // publisher sends a new publish request and the subscriber responds to it immediately.
          if(strtotime($date_queued) < strtotime($subscription->date_last_ping_sent)) {
              echo "Job was queued before the last ping was sent by the publisher, skipping\n";
              return;
          }

          echo "Processing subscriber: " . $subscription->callback_url . "\n";

          // Subscription may be "active" but the expiration date may have passed.
          // If so, de-activate the subscription.
          if(strtotime($subscription->date_expires) < time()) {
              echo "Subscription expired!\n";
              $subscription->active = 0;
              db\set_updated($subscription);
              $subscription->save();
              return;
          }

          echo "Notifying subscriber!\n";

          $headers = [
              'Content-Type: ' . ($feed->content_type ?: 'text/plain'),
              'Link: <' . Config::$base_url . '/>; rel="hub", <' . $feed->feed_url . '>; rel="self"',
          ];

          // Sign the payload only when the subscriber registered a secret.
          if($subscription->secret) {
              $signature = hash_hmac('sha256', $feed->content, $subscription->secret);
              $headers[] = 'X-Hub-Signature: sha256=' . $signature;
          }

          // Stamp the ping time before sending; the stale-job check above
          // compares queued jobs against this value.
          $subscription->date_last_ping_sent = db\now();
          $response = request\post($subscription->callback_url, $feed->content, false, $headers);

          $subscription->last_ping_status = $response['status'];
          $subscription->last_ping_headers = $response['headers'];
          $subscription->last_ping_body = $response['body'];

          echo "Subscriber return a " . $response['status'] . " HTTP status\n";

          if(request\response_is($response['status'], 2)) {
              // Delivered: clear any backoff state.
              $subscription->last_ping_success = 1;
              $subscription->last_ping_error_delay = 0;
          } elseif($response['status'] == 410) {
              // If the subscriber returns HTTP 410, then take that as an indication that they no longer want more notifications and deactivate this subscription
              // https://github.com/w3c/websub/issues/106
              // https://github.com/w3c/websub/issues/103
              echo "Deactivating subscription based on HTTP 410 response returned\n";
              $subscription->last_ping_success = 0;
              $subscription->active = 0;
          } else {
              $subscription->last_ping_success = 0;

              // If the ping failed, queue another ping for a later time with exponential backoff
              if($subscription->last_ping_error_delay == 0) {
                  $subscription->last_ping_error_delay = 15;
              }

              // If it's timed out after 8 tries, de-activate the subscription
              if($subscription->last_ping_error_delay > 2000) {
                  echo "Ping failed after " . $subscription->last_ping_error_delay . " seconds. Deactivating this subscription.\n";
                  $subscription->active = 0;
              } else {
                  echo "Ping failed, trying again in " . $subscription->last_ping_error_delay . " seconds\n";
                  DeferredTask::queue('PushTask', 'notify_subscriber', [$feed_id, $subscription_id, db\now()], $subscription->last_ping_error_delay);
                  // Double the stored delay so the next failure waits twice as long.
                  $subscription->last_ping_error_delay = $subscription->last_ping_error_delay * 2;
              }
          }

          db\set_updated($subscription);
          $subscription->save();
      }
  }