You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

170 lines
5.9 KiB

  1. <?php
  2. use BarnabyWalters\Mf2;
  3. class PushTask {
  4. public static function verify_subscription($subscription_id, $mode) {
  5. $subscription = db\get_by_id('subscriptions', $subscription_id);
  6. if($subscription) {
  7. $feed = db\get_by_id('feeds', $subscription->feed_id);
  8. // Choose the expiration for the subscription
  9. $lease_seconds = 86400*3;
  10. $exp_ts = time() + $lease_seconds;
  11. $exp_date = date('Y-m-d H:i:s', $exp_ts);
  12. $push_params = [
  13. 'hub.mode' => ($mode == 'subscribe' ? 'subscribe' : 'unsubscribe'),
  14. 'hub.topic' => $feed->feed_url,
  15. 'hub.challenge' => $subscription->challenge
  16. ];
  17. if($mode == 'subscribe') {
  18. $push_params['hub.lease_seconds'] = $lease_seconds;
  19. }
  20. $url = parse_url($subscription->callback_url);
  21. if($q=k($url, 'query')) {
  22. parse_str($q, $existing_params);
  23. $push_params = array_merge($push_params, $existing_params);
  24. }
  25. $url['query'] = http_build_query($push_params);
  26. $url = build_url($url);
  27. $response = request\get_url($url, true);
  28. $subscription->challenge_response = $response['headers']."\n\n".$response['body'];
  29. if(request\response_is($response['status'], 2) && $response['body'] == $subscription->challenge) {
  30. // The subscriber replied with a 2xx status code and confirmed the challenge string.
  31. if($mode == 'subscribe') {
  32. // The subscription is confirmed and active.
  33. $subscription->date_confirmed = db\now();
  34. $subscription->lease_seconds = $lease_seconds;
  35. $subscription->date_expires = $exp_date;
  36. $subscription->active = 1;
  37. echo "Subscriber verified the request and is now subscribed\n";
  38. } else {
  39. $subscription->date_unsubscribed = db\now();
  40. $subscription->date_expires = null;
  41. $subscription->active = 0;
  42. echo "Subscriber verified the request and is now unsubscribed\n";
  43. }
  44. } else {
  45. // The subscriber did not confirm the subscription, so reject the request
  46. echo "Subscriber did not echo the challenge\n";
  47. }
  48. db\set_updated($subscription);
  49. $subscription->save();
  50. print_r($response);
  51. } else {
  52. echo "Subscription not found\n";
  53. }
  54. }
  55. public static function publish($feed_id) {
  56. $feed = db\get_by_id('feeds', $feed_id);
  57. if($feed) {
  58. // First check the feed to see if the content has changed since the last time we checked
  59. $response = request\get_url($feed->feed_url, true);
  60. $feed->last_retrieved = db\now();
  61. db\set_updated($feed);
  62. $content_hash = md5($response['body']);
  63. if($content_hash != $feed->content_hash) {
  64. $feed->content_hash = $content_hash;
  65. $subscribers = ORM::for_table('subscriptions')->where('feed_id', $feed->id)->where('active', 1)->find_many();
  66. foreach($subscribers as $s) {
  67. echo "Queuing notification for feed_id=$feed_id subscription_id=$s->id\n";
  68. DeferredTask::queue('PushTask', 'notify_subscriber', [$feed_id, $s->id, db\now()]);
  69. }
  70. } else {
  71. echo "Feed body has the same content hash as last time, not notifying subscribers\n";
  72. }
  73. $feed->save();
  74. } else {
  75. echo "Feed not found\n";
  76. }
  77. }
  78. public static function notify_subscriber($feed_id, $subscription_id, $date_queued) {
  79. $feed = db\get_by_id('feeds', $feed_id);
  80. if(!$feed) {
  81. echo "Feed not found\n";
  82. return;
  83. }
  84. $subscription = db\get_by_id('subscriptions', $subscription_id);
  85. if(!$subscription) {
  86. echo "Subscription not found\n";
  87. return;
  88. }
  89. // If the job was put on the queue before the last ping was sent, ignore it.
  90. // This happens when there is a retry job in the delayed queue, and then the
  91. // publisher sends a new publish request and the subscriber responds to it immediately.
  92. if(strtotime($date_queued) < strtotime($subscription->date_last_ping_sent)) {
  93. echo "Job was queued before the last ping was sent by the publisher, skipping\n";
  94. return;
  95. }
  96. echo "Processing subscriber: " . $subscription->callback_url . "\n";
  97. // Subscription may be "active" but the expiration date may have passed.
  98. // If so, de-activate the subscription.
  99. if(strtotime($subscription->date_expires) < time()) {
  100. echo "Subscription expired!\n";
  101. $subscription->active = 0;
  102. db\set_updated($subscription);
  103. $subscription->save();
  104. return;
  105. }
  106. echo "Notifying subscriber!\n";
  107. $subscription->date_last_ping_sent = db\now();
  108. $response = request\post($subscription->callback_url, []);
  109. $subscription->last_ping_status = $response['status'];
  110. $subscription->last_ping_headers = $response['headers'];
  111. $subscription->last_ping_body = $response['body'];
  112. echo "Subscriber return a " . $response['status'] . " HTTP status\n";
  113. if(request\response_is($response['status'], 2)) {
  114. $subscription->last_ping_success = 1;
  115. $subscription->last_ping_error_delay = 0;
  116. } else {
  117. $subscription->last_ping_success = 0;
  118. // If the ping failed, queue another ping for a later time with exponential backoff
  119. if($subscription->last_ping_error_delay == 0)
  120. $subscription->last_ping_error_delay = 15;
  121. // If it's timed out after 8 tries, de-activate the subscription
  122. if($subscription->last_ping_error_delay > 2000) {
  123. echo "Ping failed after " . $subscription->last_ping_error_delay . " seconds. Deactivating this subscription.\n";
  124. $subscription->active = 0;
  125. } else {
  126. echo "Ping failed, trying again in " . $subscription->last_ping_error_delay . " seconds\n";
  127. DeferredTask::queue('PushTask', 'notify_subscriber', [$feed_id, $subscription_id, db\now()], $subscription->last_ping_error_delay);
  128. $subscription->last_ping_error_delay = $subscription->last_ping_error_delay * 2;
  129. }
  130. }
  131. db\set_updated($subscription);
  132. $subscription->save();
  133. }
  134. }