diff --git a/Classes/Elasticsearch/AbstractIndex.php b/Classes/Elasticsearch/AbstractIndex.php index 678516a..923e68a 100644 --- a/Classes/Elasticsearch/AbstractIndex.php +++ b/Classes/Elasticsearch/AbstractIndex.php @@ -8,7 +8,7 @@ * All rights reserved. */ -use Elasticsearch\Client; +use Elastic\Elasticsearch\Client; use Neos\Flow\Annotations as Flow; use Neos\Flow\Log\Utility\LogEnvironment; use Psr\Log\LoggerInterface; diff --git a/Classes/Elasticsearch/ElasticsearchService.php b/Classes/Elasticsearch/ElasticsearchService.php index 3e4ed70..1238e2a 100644 --- a/Classes/Elasticsearch/ElasticsearchService.php +++ b/Classes/Elasticsearch/ElasticsearchService.php @@ -8,10 +8,10 @@ * All rights reserved. */ -use Elasticsearch\Common\Exceptions\Missing404Exception; +use Elastic\Elasticsearch\Client; +use Elastic\Elasticsearch\ClientBuilder; +use Elastic\Elasticsearch\Exception\ClientResponseException; use Neos\Flow\Annotations as Flow; -use Elasticsearch\Client; -use Elasticsearch\ClientBuilder; use Neos\Flow\Log\Utility\LogEnvironment; use Neos\Flow\Reflection\ReflectionService; use Psr\Log\LoggerInterface; @@ -28,10 +28,34 @@ class ElasticsearchService private $client; /** - * @var array - * @Flow\InjectConfiguration(path="elasticsearch.server") + * @var string + * @Flow\InjectConfiguration(path="elasticsearch.server.scheme") + */ + protected $clientScheme; + + /** + * @var string + * @Flow\InjectConfiguration(path="elasticsearch.server.host") + */ + protected $clientHost; + + /** + * @var int + * @Flow\InjectConfiguration(path="elasticsearch.server.port") + */ + protected $clientPort; + + /** + * @var string + * @Flow\InjectConfiguration(path="elasticsearch.server.user") */ - protected $clientConfiguration; + protected $clientUser; + + /** + * @var string + * @Flow\InjectConfiguration(path="elasticsearch.server.pass") + */ + protected $clientPassword; /** * @var array @@ -62,9 +86,10 @@ public function recreateElasticIndex(string $indexName): void $templateName = $indexName . '_template'; - $this->getClient()->indices()->putTemplate([ + $this->getClient()->indices()->putIndexTemplate([ 'name' => $templateName, - 'body' => $this->getIndexConfiguration($indexName) + 'index_patterns' => [$indexName . '*'], + 'template' => $this->getIndexConfiguration($indexName) ]); $this->logger->info(sprintf('Successfully transferred template %s', $templateName), LogEnvironment::fromMethodName(__METHOD__)); @@ -73,7 +98,10 @@ public function recreateElasticIndex(string $indexName): void try { $this->getClient()->indices()->delete(['index' => $indexPattern]); $this->logger->info(sprintf('Successfully removed indices with pattern %s', $indexPattern), LogEnvironment::fromMethodName(__METHOD__)); - } catch (Missing404Exception $exception) { + } catch (ClientResponseException $exception) { + if ($exception->getResponse()->getStatusCode() !== 404) { + throw $exception; + } $this->logger->info(sprintf('Index with pattern %s could not be removed as it is not found', $indexPattern), LogEnvironment::fromMethodName(__METHOD__)); } } @@ -84,9 +112,18 @@ public function recreateElasticIndex(string $indexName): void public function getClient(): Client { if ($this->client === null) { - $this->client = ClientBuilder::create() - ->setHosts([$this->clientConfiguration]) - ->build(); + $hostString = sprintf('%s://%s:%s', $this->clientScheme, $this->clientHost, $this->clientPort); + + $builder = ClientBuilder::create() + ->setHosts([$hostString]); + + if (isset($this->clientUser, $this->clientPassword)) { + $builder->setBasicAuthentication($this->clientUser, $this->clientPassword); + } else { + $this->logger->warning('No credentials for elastic client found. Is this correct?', LogEnvironment::fromMethodName(__METHOD__)); + } + + $this->client = $builder->build(); } return $this->client; diff --git a/Classes/Elasticsearch/KibanaService.php b/Classes/Elasticsearch/KibanaService.php index 40cae5b..10d5c8f 100644 --- a/Classes/Elasticsearch/KibanaService.php +++ b/Classes/Elasticsearch/KibanaService.php @@ -8,9 +8,9 @@ * All rights reserved. */ +use Elastic\Elasticsearch\Client; +use Elastic\Elasticsearch\ClientBuilder; use Neos\Flow\Annotations as Flow; -use Elasticsearch\Client; -use Elasticsearch\ClientBuilder; /** * @Flow\Scope("singleton") @@ -23,10 +23,34 @@ class KibanaService private $client; /** - * @var array - * @Flow\InjectConfiguration(path="kibana.server") + * @var string + * @Flow\InjectConfiguration(path="kibana.server.scheme") */ - protected $clientConfiguration; + protected $clientScheme; + + /** + * @var string + * @Flow\InjectConfiguration(path="kibana.server.host") + */ + protected $clientHost; + + /** + * @var int + * @Flow\InjectConfiguration(path="kibana.server.port") + */ + protected $clientPort; + + /** + * @var string + * @Flow\InjectConfiguration(path="kibana.server.user") + */ + protected $clientUser; + + /** + * @var string + * @Flow\InjectConfiguration(path="kibana.server.pass") + */ + protected $clientPassword; /** * @return Client @@ -34,9 +58,16 @@ class KibanaService public function getClient(): Client { if ($this->client === null) { - $this->client = ClientBuilder::create() - ->setHosts([$this->clientConfiguration]) - ->build(); + $hostString = sprintf('%s://%s:%s', $this->clientScheme, $this->clientHost, $this->clientPort); + + $builder = ClientBuilder::create() + ->setHosts([$hostString]); + + if (isset($this->clientUser, $this->clientPassword)) { + $builder->setBasicAuthentication($this->clientUser, $this->clientPassword); + } + + $this->client = $builder->build(); } return $this->client; diff --git a/Classes/Persistence/DataSource.php b/Classes/Persistence/DataSource.php index ca39908..2ff1012 100644 --- a/Classes/Persistence/DataSource.php +++ b/Classes/Persistence/DataSource.php @@ -75,7 +75,7 @@ public function connect(): self $this->entityManager = $this->entityManagerFactory->create(); if ($this->entityManager->getConnection()->getDatabasePlatform()->getName() === 'mysql') { - $this->entityManager->getConnection()->getWrappedConnection()->setAttribute(\PDO::MYSQL_ATTR_USE_BUFFERED_QUERY, false); + $this->entityManager->getConnection()->getNativeConnection()->setAttribute(\PDO::MYSQL_ATTR_USE_BUFFERED_QUERY, false); } $this->connection = $this->entityManager->getConnection(); diff --git a/Classes/Transfer/AbstractTransferJob.php b/Classes/Transfer/AbstractTransferJob.php index 171e0a0..bfd185f 100644 --- a/Classes/Transfer/AbstractTransferJob.php +++ b/Classes/Transfer/AbstractTransferJob.php @@ -135,7 +135,7 @@ protected function logStats(bool $summary = false): void protected function autoBulkIndex(array $document): void { $this->bulkIndexStorage['body'][] = [ - 'index' => [ + 'create' => [ '_index' => $document['index'], '_id' => $document['id'] ?? null ] @@ -163,7 +163,8 @@ public function flushBulkIndex(): void } foreach ($result['items'] as $resultDocument) { - if ($resultDocument['index']['_shards']['failed'] ?? 1 !== 0) { + $item = $resultDocument['create'] ?? $resultDocument['index'] ?? []; + if (($item['_shards']['failed'] ?? 1) !== 0) { $this->logger->error(sprintf('Ingesting a document in Elasticsearch failed. Details: %s', json_encode($resultDocument, JSON_THROW_ON_ERROR))); } } diff --git a/composer.json b/composer.json index 938be7a..485cf26 100644 --- a/composer.json +++ b/composer.json @@ -4,7 +4,7 @@ "name": "punktde/analytics", "require": { "neos/flow": "*", - "elasticsearch/elasticsearch": ">= 7.0", + "elasticsearch/elasticsearch": ">=8.19", "league/flysystem-webdav": "*" }, "autoload": {