Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 218 additions & 0 deletions src/core/etl/src/Flow/ETL/Schema.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
use Flow\ETL\Schema\Metadata;

use function array_key_exists;
use function array_keys;
use function array_map;
use function array_merge;
use function array_search;
use function array_splice;
use function array_values;
use function count;
use function Flow\ETL\DSL\definition_from_array;
Expand Down Expand Up @@ -121,6 +124,34 @@ public function add(Definition ...$definitions): self
return $this;
}

/**
* Inserts definitions immediately after an existing column.
*
* @param Definition<mixed> ...$definitions
*
* @throws SchemaDefinitionNotFoundException
*
* @return Schema
*/
public function addAfter(string|Reference $reference, Definition ...$definitions): self
{
return $this->insertAt($this->indexOf($reference) + 1, ...$definitions);
}

/**
* Inserts definitions immediately before an existing column.
*
* @param Definition<mixed> ...$definitions
*
* @throws SchemaDefinitionNotFoundException
*
* @return Schema
*/
public function addBefore(string|Reference $reference, Definition ...$definitions): self
{
return $this->insertAt($this->indexOf($reference), ...$definitions);
}

/**
* Adds metadata to a given definition.
*
Expand Down Expand Up @@ -200,6 +231,35 @@ public function gracefulRemove(string|Reference ...$entries): self
return $this;
}

/**
* Inserts definitions at an explicit position. Index 0 prepends, an index equal to the
* number of definitions appends.
*
* @param Definition<mixed> ...$definitions
*
* @throws InvalidArgumentException
*
* @return Schema
*/
public function insertAt(int $index, Definition ...$definitions): self
{
$definitionsList = array_values($this->definitions);

if ($index < 0 || $index > count($definitionsList)) {
throw new InvalidArgumentException(sprintf(
'Cannot insert definitions at index %d, schema has %d definitions',
$index,
count($definitionsList),
));
}

array_splice($definitionsList, $index, 0, $definitions);

$this->setDefinitions(...$definitionsList);

return $this;
}

public function isSame(self $schema): bool
{
if (count($this->definitions) !== count($schema->definitions)) {
Expand Down Expand Up @@ -300,6 +360,66 @@ public function merge(self $schema): self
return $this;
}

/**
* Moves an existing column to immediately after another column, preserving its definition.
*
* @throws InvalidArgumentException
* @throws SchemaDefinitionNotFoundException
*
* @return Schema
*/
public function moveAfter(string|Reference $name, string|Reference $reference): self
{
return $this->moveRelative($name, $reference, 1);
}

/**
* Moves an existing column to immediately before another column, preserving its definition.
*
* @throws InvalidArgumentException
* @throws SchemaDefinitionNotFoundException
*
* @return Schema
*/
public function moveBefore(string|Reference $name, string|Reference $reference): self
{
return $this->moveRelative($name, $reference, 0);
}

/**
* Moves an existing column to an explicit position, preserving its definition.
* The index is the final position of the column in the resulting schema.
*
* @throws InvalidArgumentException
* @throws SchemaDefinitionNotFoundException
*
* @return Schema
*/
public function moveTo(string|Reference $name, int $index): self
{
$from = $this->indexOf($name);
$definitionsList = array_values($this->definitions);

if ($index < 0 || $index >= count($definitionsList)) {
throw new InvalidArgumentException(sprintf(
'Cannot move entry "%s" to index %d, schema has %d definitions',
(string) $name,
$index,
count($definitionsList),
));
}

// Mago infers array_splice()'s return as array<array-key, mixed>, dropping the element type,
// so we restore it explicitly. See https://github.com/carthage-software/mago/issues/1982
/** @var list<Definition<mixed>> $moved */
$moved = array_splice($definitionsList, $from, 1);
array_splice($definitionsList, $index, 0, $moved);

$this->setDefinitions(...$definitionsList);

return $this;
}

/**
* @return array<array-key, array<mixed>>
*/
Expand All @@ -314,6 +434,20 @@ public function normalize(): array
return $definitions;
}

/**
* Inserts definitions at the beginning of the schema.
*
* @param Definition<mixed> ...$definitions
*
* @return Schema
*/
public function prepend(Definition ...$definitions): self
{
$this->setDefinitions(...$definitions, ...array_values($this->definitions));

return $this;
}

public function references(): References
{
$refs = [];
Expand Down Expand Up @@ -375,6 +509,42 @@ public function rename(string|Reference $entry, string $newName): self
return $this;
}

/**
* Reorders columns by name. Any columns not listed keep their relative order and are appended.
*
* @throws InvalidArgumentException
* @throws SchemaDefinitionNotFoundException
*
* @return Schema
*/
public function reorder(string|Reference ...$names): self
{
$definitions = [];
$reordered = [];

foreach ($names as $name) {
$definition = $this->findDefinition($name) ?: throw new SchemaDefinitionNotFoundException((string) $name);
$key = $definition->entry()->name();

if (array_key_exists($key, $reordered)) {
throw new InvalidArgumentException(sprintf('Cannot reorder entry "%s" more than once', (string) $name));
}

$reordered[$key] = true;
$definitions[] = $definition;
}

foreach ($this->definitions as $key => $definition) {
if (!array_key_exists($key, $reordered)) {
$definitions[] = $definition;
}
}

$this->setDefinitions(...$definitions);

return $this;
}

/**
* @param Definition<mixed> $definition
*
Expand Down Expand Up @@ -415,6 +585,54 @@ public function setMetadata(string $definition, Metadata $metadata): self
return $this;
}

private function indexOf(string|Reference $reference): int
{
$index = array_search(EntryReference::init($reference)->name(), array_keys($this->definitions), true);

if ($index === false) {
throw new SchemaDefinitionNotFoundException((string) $reference);
}

return $index;
}

private function moveRelative(string|Reference $name, string|Reference $reference, int $offset): self
{
$from = $this->indexOf($name);
$referenceName = EntryReference::init($reference)->name();

if (!$this->findDefinition($reference)) {
throw new SchemaDefinitionNotFoundException((string) $reference);
}

if (EntryReference::init($name)->name() === $referenceName) {
throw new InvalidArgumentException(sprintf('Cannot move entry "%s" relative to itself', (string) $name));
}

$definitionsList = array_values($this->definitions);

// Mago infers array_splice()'s return as array<array-key, mixed>, dropping the element type,
// so we restore it explicitly. See https://github.com/carthage-software/mago/issues/1982
/** @var list<Definition<mixed>> $moved */
$moved = array_splice($definitionsList, $from, 1);

$referenceIndex = 0;

foreach ($definitionsList as $position => $definition) {
if ($definition->entry()->name() === $referenceName) {
$referenceIndex = $position;

break;
}
}

array_splice($definitionsList, $referenceIndex + $offset, 0, $moved);

$this->setDefinitions(...$definitionsList);

return $this;
}

/**
* @param Definition<mixed> ...$definitions
*/
Expand Down
Loading
Loading