[ Avaa Bypassed ]




Upload:

Command:

hmhc3928@18.191.205.190: ~ $
<?php

namespace MailPoet\Cron\Workers\SendingQueue;

// Stop immediately when the ABSPATH constant is not defined, i.e. when this file
// is requested directly instead of being loaded through WordPress.
if (!defined('ABSPATH')) exit;


use MailPoet\Cron\Workers\SimpleWorker;
use MailPoet\Mailer\MailerLog;
use MailPoet\Models\ScheduledTask;
use MailPoet\Models\ScheduledTaskSubscriber;
use MailPoet\Models\SendingQueue as SendingQueueModel;
use MailPoet\WP\Functions as WPFunctions;
use MailPoetVendor\Carbon\Carbon;

/**
 * Cron worker that migrates legacy sending queue data.
 *
 * Two things are migrated:
 *  - every sending queue that is not yet linked to a scheduled task gets a new
 *    scheduled task of type "sending" and is linked to it via `task_id`;
 *  - for in-progress sending tasks, subscribers stored in the serialized
 *    `subscribers` column of the queue are copied to the scheduled task
 *    subscribers table.
 *
 * Sending is paused while the migration runs and resumed afterwards, but only
 * if it was this worker that paused it.
 */
class Migration extends SimpleWorker {
  const TASK_TYPE = 'migration';
  const BATCH_SIZE = 20;

  /**
   * Tells whether the worker still has to run.
   *
   * @return bool True when no completed migration task was found.
   */
  public function checkProcessingRequirements() {
    // if migration was completed, don't run it again
    $completedTasks = $this->getCompletedTasks();
    return empty($completedTasks);
  }

  /**
   * Decides whether there is anything to migrate.
   *
   * When there is nothing to migrate the task is marked as completed, sending
   * is resumed and false is returned. Otherwise sending is paused and true is
   * returned.
   *
   * @param ScheduledTask $task The migration task being prepared.
   * @param mixed $timer Not used by this method.
   * @return bool
   */
  public function prepareTaskStrategy(ScheduledTask $task, $timer) {
    $unmigratedColumns = $this->checkUnmigratedColumnsExist();
    $unmigratedQueuesCount = 0;
    $unmigratedQueueSubscribers = [];

    if ($unmigratedColumns) {
      $unmigratedQueuesCount = (int)$this->getUnmigratedQueues()->count();
      $unmigratedQueueSubscribers = $this->getTaskIdsForUnmigratedSubscribers();
    }

    $nothingToMigrate = !$unmigratedColumns
      || ($unmigratedQueuesCount === 0 && empty($unmigratedQueueSubscribers));

    if ($nothingToMigrate) {
      // nothing to migrate, complete task
      $task->processedAt = WPFunctions::get()->currentTime('mysql');
      $task->status = ScheduledTask::STATUS_COMPLETED;
      $task->save();
      $this->resumeSending();
      return false;
    }

    // pause sending while the migration is in process
    $this->pauseSending();
    return true;
  }

  /**
   * Pauses sending and records a "migration" error in the mailer log.
   *
   * @return mixed False when sending is already paused, otherwise the result
   *   of MailerLog::pauseSending().
   */
  public function pauseSending() {
    $mailerLog = MailerLog::getMailerLog();
    if (MailerLog::isSendingPaused($mailerLog)) {
      // sending is already paused
      return false;
    }
    $mailerLog = MailerLog::setError(
      $mailerLog,
      'migration',
      WPFunctions::get()->__('Your sending queue data is being migrated to allow better performance, sending is paused while the migration is in progress and will resume automatically upon completion. This may take a few minutes.')
    );
    return MailerLog::pauseSending($mailerLog);
  }

  /**
   * Resumes sending, but only when it was paused by this migration.
   *
   * @return mixed False when sending is not paused or was paused for another
   *   reason, otherwise the result of MailerLog::resumeSending().
   */
  public function resumeSending() {
    $mailerLog = MailerLog::getMailerLog();
    if (!MailerLog::isSendingPaused($mailerLog)) {
      // sending is not paused
      return false;
    }
    $error = MailerLog::getError($mailerLog);
    // only resume sending if it was paused by migration
    if (isset($error['operation']) && $error['operation'] === 'migration') {
      return MailerLog::resumeSending();
    }
    // paused for a different reason: leave the pause in place
    return false;
  }

  /**
   * Runs both migration steps and resumes sending afterwards.
   *
   * @param ScheduledTask $task The migration task being processed.
   * @param mixed $timer Passed on to the execution limit checks.
   * @return bool Always true.
   */
  public function processTaskStrategy(ScheduledTask $task, $timer) {
    $this->migrateSendingQueues($timer);
    $this->migrateSubscribers($timer);
    $this->resumeSending();
    return true;
  }

  /**
   * Checks whether the sending queues table still has a `type` column.
   *
   * @return bool
   */
  private function checkUnmigratedColumnsExist() {
    global $wpdb;
    $existingColumns = $wpdb->get_col('DESC ' . SendingQueueModel::$_table);
    return in_array('type', $existingColumns, true);
  }

  /**
   * Builds the query for sending queues that are not linked to a task yet
   * (`task_id` is 0 and `type` is NULL).
   *
   * @return mixed The query object, not yet executed.
   */
  public function getUnmigratedQueues() {
    return SendingQueueModel::where('task_id', 0)
      ->whereNull('type');
  }

  /**
   * Finds task IDs of unfinished or paused sending tasks whose queue still
   * holds serialized subscribers and whose `count_total` exceeds the number of
   * rows already present in the task subscribers table.
   *
   * @return array Task IDs as returned by $wpdb->get_col().
   */
  public function getTaskIdsForUnmigratedSubscribers() {
    global $wpdb;
    // every fragment ends with a space so that the concatenated conditions
    // never run into each other
    $query = sprintf(
      'SELECT queues.`task_id` FROM %1$s queues INNER JOIN %2$s tasks ON queues.`task_id` = tasks.`id` ' .
      'WHERE tasks.`type` = "sending" AND (tasks.`status` IS NULL OR tasks.`status` = "paused") ' .
      'AND queues.`subscribers` != "" AND queues.`subscribers` != "N;" ' .
      'AND queues.`count_total` > (SELECT COUNT(*) FROM %3$s subs WHERE subs.`task_id` = queues.`task_id`)',
      MP_SENDING_QUEUES_TABLE,
      MP_SCHEDULED_TASKS_TABLE,
      MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE
    );
    return $wpdb->get_col($query);
  }

  /**
   * Migrate all sending queues without converting subscriber data.
   *
   * For every unmigrated queue a scheduled task of type "sending" is created
   * from the queue's own columns and the queue is linked to it via `task_id`.
   *
   * @param mixed $timer Passed on to the execution limit check.
   * @return bool Always true.
   */
  public function migrateSendingQueues($timer) {
    global $wpdb;

    $queues = $this->getUnmigratedQueues()
      ->select('id')
      ->findArray();

    if (empty($queues)) {
      return true;
    }

    // columns copied as-is from the queue to the new task
    $columnList = '`' . join('`, `', [
      'status',
      'priority',
      'scheduled_at',
      'processed_at',
      'created_at',
      'updated_at',
      'deleted_at',
    ]) . '`';

    foreach (array_chunk($queues, self::BATCH_SIZE) as $queueBatch) {
      // abort if execution limit is reached
      $this->cronHelper->enforceExecutionLimit($timer);

      foreach ($queueBatch as $queue) {
        $queueId = (int)$queue['id'];

        // create a new scheduled task of type "sending"
        $inserted = $wpdb->query(sprintf(
          'INSERT IGNORE INTO %1$s (`type`, %2$s) ' .
          'SELECT "sending", %2$s FROM %3$s WHERE `id` = %4$d',
          MP_SCHEDULED_TASKS_TABLE,
          $columnList,
          MP_SENDING_QUEUES_TABLE,
          $queueId
        ));
        $newTaskId = (int)$wpdb->insert_id; // phpcs:ignore Squiz.NamingConventions.ValidVariableName.MemberNotCamelCaps

        // When the insert failed or was ignored there is no new task; the
        // insert ID may then be 0 or still belong to a previous insert, so
        // the queue must not be linked to it.
        if (!$inserted || $newTaskId === 0) {
          continue;
        }

        // link the queue with the task via task_id
        $wpdb->query(sprintf(
          'UPDATE %1$s SET `task_id` = %2$d WHERE `id` = %3$d',
          MP_SENDING_QUEUES_TABLE,
          $newTaskId,
          $queueId
        ));
      }
    }

    return true;
  }

  /**
   * Migrate subscribers for in-progress sending tasks from the `subscribers`
   * field to a separate table.
   *
   * @param mixed $timer Passed on to the execution limit checks.
   * @return bool Always true.
   */
  public function migrateSubscribers($timer) {
    global $wpdb;

    // find in-progress queues that have serialized subscribers
    $taskIds = $this->getTaskIdsForUnmigratedSubscribers();

    // check if subscribers for each one were already migrated
    if (!empty($taskIds)) {
      $taskIds = $wpdb->get_col(sprintf(
        'SELECT queues.`task_id` FROM %1$s queues WHERE queues.`task_id` IN(' . join(',', array_map('intval', $taskIds)) . ') ' .
        'AND queues.`count_total` > (SELECT COUNT(*) FROM %2$s subs WHERE subs.`task_id` = queues.`task_id`)',
        MP_SENDING_QUEUES_TABLE,
        MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE
      ));
    }

    if (empty($taskIds)) {
      return true;
    }

    foreach ($taskIds as $taskId) {
      // abort if execution limit is reached
      $this->cronHelper->enforceExecutionLimit($timer);

      $this->migrateTaskSubscribers($taskId, $timer);
    }

    return true;
  }

  /**
   * Copies the serialized subscribers of one task to the task subscribers
   * table, skipping as many entries of each list as were already migrated.
   *
   * @param int|string $taskId ID of the sending task.
   * @param mixed $timer Passed on to the execution limit checks.
   * @return bool False when there is nothing (usable) left to migrate,
   *   true otherwise.
   */
  public function migrateTaskSubscribers($taskId, $timer) {
    global $wpdb;

    $migratedUnprocessedCount = (int)ScheduledTaskSubscriber::getUnprocessedCount($taskId);
    $migratedProcessedCount = (int)ScheduledTaskSubscriber::getProcessedCount($taskId);

    // fetch the serialized list only while one of the queue counters is still
    // ahead of the matching number of already migrated subscribers
    $subscribers = $wpdb->get_var(sprintf(
      'SELECT `subscribers` FROM %1$s WHERE `task_id` = %2$d ' .
      'AND (`count_processed` > %3$d OR `count_to_process` > %4$d)',
      MP_SENDING_QUEUES_TABLE,
      $taskId,
      $migratedProcessedCount,
      $migratedUnprocessedCount
    ));

    // sanity check
    if (empty($subscribers)) {
      return false;
    }

    // NOTE(review): unserialize() is applied to a value read from the database;
    // this is only safe as long as that column cannot hold attacker-controlled
    // data. Consider restricting allowed classes where the PHP version permits.
    $subscribers = unserialize($subscribers);

    // corrupted or unexpected data cannot be migrated
    if (!is_array($subscribers)) {
      return false;
    }

    if (!empty($subscribers['to_process'])) {
      $this->migrateSubscriberBatch(
        $taskId,
        array_slice($subscribers['to_process'], $migratedUnprocessedCount),
        ScheduledTaskSubscriber::STATUS_UNPROCESSED,
        $timer
      );
    }

    if (!empty($subscribers['processed'])) {
      $this->migrateSubscriberBatch(
        $taskId,
        array_slice($subscribers['processed'], $migratedProcessedCount),
        ScheduledTaskSubscriber::STATUS_PROCESSED,
        $timer
      );
    }

    return true;
  }

  /**
   * Stores the given subscriber IDs for a task with the given processed status.
   *
   * @param int|string $taskId ID of the sending task.
   * @param array $subscriberIds Subscriber IDs that still have to be stored.
   * @param mixed $processedStatus One of the ScheduledTaskSubscriber::STATUS_* values.
   * @param mixed $timer Passed on to the execution limit check.
   * @return void
   */
  private function migrateSubscriberBatch($taskId, $subscriberIds, $processedStatus, $timer) {
    foreach ($subscriberIds as $subId) {
      // abort if execution limit is reached
      $this->cronHelper->enforceExecutionLimit($timer);

      ScheduledTaskSubscriber::createOrUpdate([
        'task_id' => $taskId,
        'subscriber_id' => $subId,
        'processed' => $processedStatus,
      ]);
    }
  }

  /**
   * Returns the date of the next run, which is the current WordPress time.
   *
   * @param WPFunctions|null $wp Optional WordPress functions wrapper; a new
   *   instance is created when omitted.
   * @return Carbon
   */
  public function getNextRunDate($wp = null) {
    if (is_null($wp)) {
      $wp = new WPFunctions();
    }
    // run migration immediately
    return Carbon::createFromTimestamp($wp->currentTime('timestamp'));
  }
}

Filemanager

Name Type Size Permission Actions
Tasks Folder 0755
Migration.php File 7.89 KB 0644
SendingErrorHandler.php File 1.82 KB 0644
SendingQueue.php File 15.7 KB 0644
SendingThrottlingHandler.php File 2.81 KB 0644
index.php File 0 B 0644