Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,15 @@ public function test_precision_and_scale_produce_numeric(): void
static::assertTrue($table->column('amount')->type->isEqual(ColumnType::numeric(10, 2)));
}

public function test_prepend_drives_create_table_column_order(): void
{
$schema = schema(str_schema('name'), str_schema('email'))->prepend(int_schema('id'));

$table = (new SchemaConverter())->toPostgreSqlTable($schema, 'users');

static::assertSame(['id', 'name', 'email'], $table->columnNames());
}

public function test_primary_key_forces_not_null(): void
{
$table = (new SchemaConverter())->toPostgreSqlTable(
Expand Down
221 changes: 221 additions & 0 deletions src/core/etl/src/Flow/ETL/Schema.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
use function array_key_exists;
use function array_map;
use function array_merge;
use function array_splice;
use function array_values;
use function count;
use function Flow\ETL\DSL\definition_from_array;
use function Flow\ETL\DSL\schema;
use function implode;
use function is_array;
use function is_int;
use function sprintf;
use function str_starts_with;
use function substr;

final class Schema implements Countable
{
Expand Down Expand Up @@ -121,6 +125,68 @@ public function add(Definition ...$definitions): self
return $this;
}

/**
* Inserts new definitions right after an existing one.
*
* @param Definition<mixed> ...$definitions
*
* @throws SchemaDefinitionNotFoundException
*
* @return Schema
*/
public function addAfter(string|Reference $reference, Definition ...$definitions): self
{
$this->get($reference);

$target = EntryReference::init($reference);
$result = [];

foreach (array_values($this->definitions) as $definition) {
$result[] = $definition;

if ($definition->entry()->is($target)) {
foreach ($definitions as $new) {
$result[] = $new;
}
}
}

$this->setDefinitions(...$result);

return $this;
}

/**
* Inserts new definitions right before an existing one.
*
* @param Definition<mixed> ...$definitions
*
* @throws SchemaDefinitionNotFoundException
*
* @return Schema
*/
public function addBefore(string|Reference $reference, Definition ...$definitions): self
{
$this->get($reference);

$target = EntryReference::init($reference);
$result = [];

foreach (array_values($this->definitions) as $definition) {
if ($definition->entry()->is($target)) {
foreach ($definitions as $new) {
$result[] = $new;
}
}

$result[] = $definition;
}

$this->setDefinitions(...$result);

return $this;
}

/**
* Adds metadata to a given definition.
*
Expand Down Expand Up @@ -200,6 +266,34 @@ public function gracefulRemove(string|Reference ...$entries): self
return $this;
}

/**
* Inserts new definitions at an explicit position (0 = beginning, count() = end).
*
* @param Definition<mixed> ...$definitions
*
* @throws InvalidArgumentException
*
* @return Schema
*/
public function insertAt(int $index, Definition ...$definitions): self
{
$current = array_values($this->definitions);

if ($index < 0 || $index > count($current)) {
throw InvalidArgumentException::because(
'Cannot insert definitions at index %d, schema has %d definition(s)',
$index,
count($current),
);
}

array_splice($current, $index, 0, $definitions);

$this->setDefinitions(...$current);

return $this;
}

public function isSame(self $schema): bool
{
if (count($this->definitions) !== count($schema->definitions)) {
Expand Down Expand Up @@ -300,6 +394,89 @@ public function merge(self $schema): self
return $this;
}

/**
* Relocates an existing definition (preserving its metadata) to a new position.
* The position is either a numeric index (the final position after removal; use count() - 1 for
* the end) or a string anchor prefixed with "before:" / "after:" referencing another column.
*
* @throws InvalidArgumentException
* @throws SchemaDefinitionNotFoundException
*
* @return Schema
*/
public function move(string|Reference $name, int|string $position): self
{
$definition = $this->get($name);
$movedName = $definition->entry()->name();

$remaining = [];

foreach (array_values($this->definitions) as $next) {
if ($next->entry()->name() !== $movedName) {
$remaining[] = $next;
}
}

if (is_int($position)) {
if ($position < 0 || $position > count($remaining)) {
throw InvalidArgumentException::because(
'Cannot move "%s" to index %d, schema has %d definition(s)',
$movedName,
$position,
count($this->definitions),
);
}

array_splice($remaining, $position, 0, [$definition]);

$this->setDefinitions(...$remaining);

return $this;
}

if (str_starts_with($position, 'before:')) {
$anchor = EntryReference::init(substr($position, 7));
$before = true;
} elseif (str_starts_with($position, 'after:')) {
$anchor = EntryReference::init(substr($position, 6));
$before = false;
} else {
throw InvalidArgumentException::because(
'Move position must be an integer index or a string prefixed with "before:" or "after:", given: "%s"',
$position,
);
}

if ($anchor->name() === $movedName) {
throw InvalidArgumentException::because('Cannot move "%s" relative to itself', $movedName);
}

$result = [];
$found = false;

foreach ($remaining as $next) {
if ($before && $next->entry()->is($anchor)) {
$result[] = $definition;
$found = true;
}

$result[] = $next;

if (!$before && $next->entry()->is($anchor)) {
$result[] = $definition;
$found = true;
}
}

if (!$found) {
throw new SchemaDefinitionNotFoundException($anchor->name());
}

$this->setDefinitions(...$result);

return $this;
}

/**
* @return array<array-key, array<mixed>>
*/
Expand All @@ -314,6 +491,20 @@ public function normalize(): array
return $definitions;
}

/**
* Inserts new definitions at the beginning of the schema.
*
* @param Definition<mixed> ...$definitions
*
* @return Schema
*/
public function prepend(Definition ...$definitions): self
{
$this->setDefinitions(...array_merge($definitions, array_values($this->definitions)));

return $this;
}

public function references(): References
{
$refs = [];
Expand Down Expand Up @@ -375,6 +566,36 @@ public function rename(string|Reference $entry, string $newName): self
return $this;
}

/**
* Reorders definitions by name. Any unlisted definitions keep their relative order and are
* appended after the listed ones.
*
* @throws SchemaDefinitionNotFoundException
*
* @return Schema
*/
public function reorder(string|Reference ...$names): self
{
$ordered = [];
$reordered = [];

foreach ($names as $name) {
$definition = $this->get($name);
$ordered[] = $definition;
$reordered[$definition->entry()->name()] = true;
}

foreach (array_values($this->definitions) as $definition) {
if (!array_key_exists($definition->entry()->name(), $reordered)) {
$ordered[] = $definition;
}
}

$this->setDefinitions(...$ordered);

return $this;
}

/**
* @param Definition<mixed> $definition
*
Expand Down
Loading
Loading