initCache(self::CACHE_HAS_MESSAGES_TIME, self::CACHE_HAS_MESSAGES_ID, self::CACHE_HAS_MESSAGES_DIR)) { $nextExec = $cache->getVars(); } elseif ($cache->startDataCache()) { $result = MessageTable::getList([ 'select' => ['ID', 'NEXT_EXEC'], 'filter' => [ '=SUCCESS_EXEC' => SuccessExec::NO, ], 'limit' => 1, 'order' => ['ID' => 'DESC'], ])->fetch(); if ($result) { if ($result['NEXT_EXEC'] instanceof DateTime) { $nextExec = $result['NEXT_EXEC']; } else { $nextExec = new DateTime(); } } else { $nextExec = (new DateTime())->add('+' . self::CACHE_HAS_MESSAGES_TIME . ' seconds'); } $cache->endDataCache($nextExec); } return $nextExec <= new DateTime(); } /** * @return string */ public static function run() { if ( defined('DisableMessageServiceCheck') && DisableMessageServiceCheck === true || ( !defined('DisableMessageServiceCheck') && defined("DisableEventsCheck") && DisableEventsCheck === true ) ) { return null; } if (static::isStopped() || !static::hasMessages()) { return ""; } Application::getInstance()->addBackgroundJob([static::class, "sendMessages"]); return ""; } /** * @return string|null */ public static function sendMessages(): ?string { if (static::isStopped()) { return ''; } $lockTag = 'b_messageservice_message'; if (!Application::getConnection()->lock($lockTag)) { return ''; } $event = new BeforeProcessQueueEvent(); $event->send(); if (!$event->canProcessQueue()) { Application::getConnection()->unlock($lockTag); return ''; } $limit = (int)Config\Option::get('messageservice', 'queue_limit'); if ($limit < 1) { $limit = 5; } $query = MessageTable::query() ->addSelect('ID') ->addSelect('TYPE') ->addSelect('SENDER_ID') ->addSelect('AUTHOR_ID') ->addSelect('MESSAGE_FROM') ->addSelect('MESSAGE_TO') ->addSelect('MESSAGE_HEADERS') ->addSelect('MESSAGE_BODY') ->addSelect('EXTERNAL_ID') ->where(Query::filter() ->logic('or') ->where(Query::filter() ->logic('and') ->where('SUCCESS_EXEC', SuccessExec::NO) ->where(Query::filter() ->logic('or') ->where('NEXT_EXEC', '<', new DateTime()) ->whereNull('NEXT_EXEC') ) ) ->where(Query::filter() ->logic('and') ->where('SUCCESS_EXEC', SuccessExec::PROCESSED) ->where('NEXT_EXEC', '<', (new DateTime())->add('-2 MINUTE')) ) ) ->addOrder('ID') ->setLimit($limit) ; if (defined('BX_CLUSTER_GROUP')) { $query->where('CLUSTER_GROUP', \BX_CLUSTER_GROUP); } $messageFieldsList = $query->fetchAll(); if (empty($messageFieldsList)) { Application::getConnection()->unlock($lockTag); return null; } $idList = array_column($messageFieldsList, 'ID'); MessageTable::updateMulti( $idList, [ 'SUCCESS_EXEC' => SuccessExec::PROCESSED, 'NEXT_EXEC' => (new DateTime())->add('+2 MINUTE'), ], true ); $hasDailyLimits = Sender\Limitation::hasDailyLimits(); if ($hasDailyLimits) { $counts = Internal\Entity\MessageTable::getAllDailyCount(); } else { $counts = []; } $nextDay = static::getNextExecTime(); foreach ($messageFieldsList as $messageFields) { $message = Message::createFromFields($messageFields); if ($hasDailyLimits) { $sender = $message->getSender(); if ($sender) { $limit = Sender\Limitation::getDailyLimit($sender->getId(), $messageFields['MESSAGE_FROM']); if ($limit > 0) { $serviceId = $sender->getId() . ':' . $messageFields['MESSAGE_FROM']; $counts[$serviceId] ??= 0; if ($counts[$serviceId] >= $limit) { $message->update([ 'STATUS_ID' => MessageStatus::DEFERRED, 'NEXT_EXEC' => $nextDay, ]); continue; } ++$counts[$serviceId]; } } } try { $result = static::sendMessage($messageFields); $message->updateWithSendResult($result, $nextDay); } catch (\Throwable $e) { Application::getInstance()->getExceptionHandler()->writeToLog($e); $message->update([ 'STATUS_ID' => MessageStatus::EXCEPTION, 'SUCCESS_EXEC' => SuccessExec::ERROR, 'DATE_EXEC' => new DateTime(), 'EXEC_ERROR' => $e->getMessage(), ]); break; } } $event = new AfterProcessQueueEvent(); $event->send(); Application::getConnection()->unlock($lockTag); return null; } /** * @param array $messageFields * @return SendMessage */ private static function sendMessage(array $messageFields) { $event = new BeforeSendMessageFromQueueEvent($messageFields); $event->send(); $sendResult = $event->processResults() ?? new SendMessage; if (!$sendResult->isSuccess()) { return $sendResult; } $type = $messageFields['TYPE']; if ($type === MessageType::SMS) { $sender = SmsManager::getSenderById($messageFields['SENDER_ID']); if (!$sender) { $sendResult->addError(new Error(Loc::getMessage("MESSAGESERVICE_QUEUE_SENDER_NOT_FOUND"))); } else { $sender->setSocketTimeout(6); $sender->setStreamTimeout(18); $sendResult = $sender->sendMessage($messageFields); } } else { $sendResult->addError(new Error(Loc::getMessage("MESSAGESERVICE_QUEUE_MESSAGE_TYPE_ERROR"))); } $event = new AfterSendMessageFromQueueEvent($messageFields, $sendResult); $event->send(); $event->sendAlias(static::EVENT_SEND_RESULT); return $sendResult; } /** * Returns next date to exec message, if it will be deferred due to the send limits. * * @return DateTime */ private static function getNextExecTime(): DateTime { $nextDay = DateTime::createFromTimestamp(time() + 86400); $retryTime = Sender\Limitation::getRetryTime(); if (!$retryTime['auto']) { if ($nextDay->getTimeZone()->getName() !== $retryTime['tz']) { try //if TZ is incorrect { $nextDay->setTimeZone(new \DateTimeZone($retryTime['tz'])); } catch (\Exception $e) {} } $nextDay->setTime($retryTime['h'], $retryTime['i'], 0); } return $nextDay; } public static function stop(): void { Config\Option::set(self::OPTION_QUEUE_STOP_MODULE, self::OPTION_QUEUE_STOP_NAME, 'Y'); } public static function resume(): void { Config\Option::set(self::OPTION_QUEUE_STOP_MODULE, self::OPTION_QUEUE_STOP_NAME, 'N'); } public static function isStopped(): bool { return Config\Option::get(self::OPTION_QUEUE_STOP_MODULE, self::OPTION_QUEUE_STOP_NAME) === 'Y'; } /** * @return string */ public static function cleanUpAgent(): string { $period = abs(intval(Config\Option::get("messageservice", "clean_up_period", 14))); $periodInSeconds = $period * 24 * 3600; if ($periodInSeconds > 0) { $connection = \Bitrix\Main\Application::getConnection(); $datetime = $connection->getSqlHelper()->addSecondsToDateTime('-' . $periodInSeconds); $connection->queryExecute("DELETE FROM b_messageservice_message WHERE DATE_EXEC <= {$datetime}"); } return __METHOD__.'();'; } }