
Run pragmatiq on Amazon SageMaker

Train the pragmatiq banking foundation model with an Amazon SageMaker training job and serve user embeddings from an NVIDIA Triton real-time endpoint, end to end, with pip install pragmatiq, on synthetic or your own data.

pragmatiq is an independent implementation inspired by the PRAGMA paper (arXiv 2604.08649) and is not affiliated with or endorsed by Revolut.

This guide runs pragmatiq entirely on Amazon SageMaker, with no servers to manage. You prep data in SageMaker Studio, train with a managed SageMaker training job, and serve embeddings from an NVIDIA Triton real-time endpoint. pragmatiq is on PyPI, so every environment (the notebook, the training job, the serving image) gets it with pip install pragmatiq.

| Step | SageMaker service | Output |
|---|---|---|
| 1. Prep data | Studio notebook | events.parquet … in S3 |
| 2. Train | Training job (PyTorch estimator) | a trained run (model.tar.gz) in S3 |
| 3. Serve | Triton real-time endpoint | embeddings over HTTPS |

It mirrors the local Pretrain and Serve with Triton tutorials (same commands, same Triton request shape) on SageMaker-managed infrastructure. It works with the synthetic generator or with your own bank data, because both follow the same four-file contract.

How pragmatiq works — a foundation model for events, not text

A large language model reads a stream of subword tokens and predicts the next one. pragmatiq reads a stream of timestamped key–value events — a card payment, an app screen, a transfer — and learns to reconstruct masked fields. It's the same transformer backbone, but because the input isn't text, three things change: the tokenizer, the notion of position, and the training objective.

| | Text LLM | pragmatiq |
|---|---|---|
| Unit | a subword piece | one (key, value, time) triple per field |
| Tokenizer | BPE over a text corpus | per field: numeric → percentile bins, categorical → id, text → BPE or a frozen-embedding sentinel |
| Vocabulary | ~50–250k subwords | ~28k: key tokens + value buckets/ids + specials |
| Position | integer index (1st, 2nd, 3rd token) | continuous elapsed time via TimeRoPE |
| Sequence | one flat stream | a hierarchy: fields → events → history, plus a profile state |
| Objective | next-/masked-token over the vocab | masked-value reconstruction from a 3-view head; MSE for text in the Nemotron variant |

Every stage of the stack is implemented in pragmatiq/models/. The first stage is the token embedding:

Every field becomes a (key, value, time) token. One shared table embeds both keys and values; a sinusoidal within-field position is added for multi-piece text.

Input: key_ids, value_ids, positions
Output: x, per-token vectors of shape [T, d]
  • x = E(key) + E(value) + sinusoidal(position)
  • The table is tied to the MLM output projection
  • Numeric → percentile bucket, categorical → id, text → BPE pieces (or one frozen-embedding sentinel in the Nemotron variant)
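Below is a minimal NumPy sketch of this embedding stage. It implements x = E(key) + E(value) + sinusoidal(position), with the same table reused as the output projection. It assumes a toy vocabulary, standard interleaved sin/cos encodings with base 10000, and hypothetical helper names (sinusoidal, embed, mlm_logits). It illustrates the formula and the weight tying, not pragmatiq's actual module.

```python
import numpy as np

def sinusoidal(positions: np.ndarray, d: int, base: float = 10000.0) -> np.ndarray:
    """Standard sin/cos position encoding of shape [T, d] (d must be even)."""
    i = np.arange(d // 2)
    angles = positions[:, None] / base ** (2 * i / d)   # [T, d/2]
    pe = np.empty((len(positions), d))
    pe[:, 0::2] = np.sin(angles)
    pe[:, 1::2] = np.cos(angles)
    return pe

def embed(E: np.ndarray, key_ids, value_ids, positions) -> np.ndarray:
    """x = E[key] + E[value] + sinusoidal(position): one shared table for keys and values."""
    key_ids, value_ids = np.asarray(key_ids), np.asarray(value_ids)
    positions = np.asarray(positions, dtype=float)
    return E[key_ids] + E[value_ids] + sinusoidal(positions, E.shape[1])

def mlm_logits(h: np.ndarray, E: np.ndarray) -> np.ndarray:
    """Tied output projection: score hidden states against the same table E."""
    return h @ E.T

rng = np.random.default_rng(0)
V, d = 32, 8                      # toy vocabulary size and model width (hypothetical)
E = rng.normal(size=(V, d))
# A 3-piece text field: the same key id repeated, within-field positions 0, 1, 2.
x = embed(E, key_ids=[5, 5, 5], value_ids=[10, 11, 12], positions=[0, 1, 2])
print(x.shape, mlm_logits(x, E).shape)   # (3, 8) (3, 32)
```

Because the MLM head reuses E, a masked value is scored against every key, bucket, and id in the same space the input was embedded in.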

The tokenizer: key–value–time, not subwords

A text tokenizer has one job: split a string into subword ids. pragmatiq's tokenizer fits one vocabulary over the whole dataset and turns each field into a key token plus a value representation chosen by the field's kind — keys and values share one embedding space:

  • Numeric values (amounts, balances) are percentile-binned with a dedicated zero bucket, so "no balance" is its own symbol and an unseen magnitude clips into the end buckets instead of failing.
  • Low-cardinality strings (country, channel) become one categorical token per value.
  • High-cardinality text (merchant names, device ids) is byte-level BPE by default — or, in the Nemotron variant, a single sentinel token whose raw string a frozen text model maps to a vector.

Unseen keys or values at inference map to [UNK] with a warning — the model never raises on vocabulary drift, which matters when you serve a live book.
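The per-kind rules above can be sketched in plain Python. Everything here is hypothetical: fit_edges, bucket, CategoricalVocab, the nearest-rank percentile rule, and four bins instead of 64. The real tokenizer fits its own edges and ids from your data.

The sketch shows the three behaviours that matter:

  • Zero gets its own bucket.
  • Out-of-range magnitudes clip into the end buckets.
  • Unseen categories fall back to [UNK] with a warning instead of raising.

```python
import warnings
from bisect import bisect_right

UNK = "[UNK]"
ZERO_BUCKET = 0

def fit_edges(values, n_bins):
    """Nearest-rank percentile edges over the non-zero values (zero has its own bucket)."""
    nz = sorted(v for v in values if v != 0)
    return [nz[int(len(nz) * q / n_bins)] for q in range(1, n_bins)]

def bucket(value, edges):
    """Map a number to 0 (zero bucket) or 1..len(edges)+1; out-of-range values clip to the ends."""
    if value == 0:
        return ZERO_BUCKET
    return 1 + bisect_right(edges, value)

class CategoricalVocab:
    """One id per seen value; id 0 is reserved for [UNK]."""

    def __init__(self, values):
        self.ids = {UNK: 0}
        for v in sorted(set(values)):
            self.ids.setdefault(v, len(self.ids))

    def encode(self, value):
        if value not in self.ids:
            warnings.warn(f"unseen value {value!r} mapped to {UNK}")
            return self.ids[UNK]
        return self.ids[value]

amounts = [0, 0, 5.0, 7.0, 12.5, 40.0, 42.1, 99.0, 250.0, 1200.0]
edges = fit_edges(amounts, n_bins=4)                                # [12.5, 42.1, 250.0]
print(bucket(0, edges), bucket(42.1, edges), bucket(1e9, edges))   # 0 3 4

currency = CategoricalVocab(["GBP", "EUR", "GBP", "USD"])
print(currency.encode("GBP"), currency.encode("JPY"))   # 2 0 (and a UserWarning)
```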

Example tokenization of one transaction (conceptual):
  • source: transaction
  • amount: bucket 41 / 64 (numeric → percentile bucket)
  • mcc: id #812 (categorical → id)
  • currency: id #3 (categorical → id)
  • merchant: TESCO · ▁STORES · ▁45 · 21 (text → BPE sub-word pieces)
  • time: 8·ln(1+Δt/8) ≈ 5.1

These ids and buckets are illustrative. The real tokenizer fits the vocabulary and bucket edges from your data. In Nemotron text mode, high-cardinality text such as the merchant collapses from BPE pieces to a single sentinel that a frozen encoder maps to a vector.

Time is the spine, not an afterthought

Banking events are wildly irregular — seconds apart, then months apart — so an integer position ("the previous event") throws away the strongest signal. pragmatiq compresses elapsed seconds with a log curve:

$$\tau(\Delta t) = 8 \cdot \ln\!\left(1 + \frac{\Delta t}{8}\right)$$

and feeds that as a continuous position for rotary embeddings (TimeRoPE). A token at log-seconds $p$ rotates frequency pair $i$ by $p \cdot \text{inv\_freq}_i$, so attention encodes the elapsed time between two events rather than their ordinal distance. Calendar features (hour, day-of-week, day-of-month) take a separate sin/cos → MLP path, so day/night and weekday/weekend structure is available independently of elapsed time.
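Both pieces can be checked numerically. The sketch below computes τ and applies a rotary rotation at a continuous position. It assumes interleaved frequency pairs and inv_freq_i = 10000^(-2i/d), which may differ from pragmatiq's TimeRoPE, and the helper names are hypothetical.

The final check shows the property RoPE relies on. Shifting both positions by the same amount leaves the query–key score unchanged, so the score depends only on the difference between the two time positions.

```python
import math

def tau(dt_seconds):
    """Log-compressed elapsed time: 8 * ln(1 + dt / 8)."""
    return 8.0 * math.log1p(dt_seconds / 8.0)

def rope(vec, p, base=10000.0):
    """Rotate each pair (vec[2i], vec[2i+1]) by angle p * inv_freq_i, with p continuous."""
    d = len(vec)
    out = list(vec)
    for i in range(d // 2):
        theta = p * base ** (-2 * i / d)          # p * inv_freq_i
        c, s = math.cos(theta), math.sin(theta)
        a, b = vec[2 * i], vec[2 * i + 1]
        out[2 * i], out[2 * i + 1] = a * c - b * s, a * s + b * c
    return out

def dot(u, v):
    return sum(x * y for x, y in zip(u, v))

# One second, one hour, one day, and 30 days compress to clearly different positions.
for dt in (1, 3_600, 86_400, 30 * 86_400):
    print(f"dt={dt:>9} s  tau={tau(dt):7.2f}")

q = [0.3, -1.2, 0.8, 0.5, -0.7, 0.1]
k = [1.0, 0.4, -0.2, 0.9, 0.6, -1.1]
p_q, p_k = tau(60), tau(30 * 86_400)      # e.g. a minute and a month before the latest event
score = dot(rope(q, p_q), rope(k, p_k))
shifted = dot(rope(q, p_q + 7.3), rope(k, p_k + 7.3))
print(abs(score - shifted) < 1e-9)        # True: only p_q - p_k matters
```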

Decision

RoPE over a continuous time position, not token index

Why: 'One second ago' and 'one month ago' must be genuinely different relative rotations — integer positions can't express that, and elapsed time is the dominant signal in a banking history.

Alternative considered: Standard integer-position RoPE or learned absolute positions, which treat 'the previous event' identically regardless of elapsed time.

Why the encoder stack differs from a vanilla transformer

Rather than one flat sequence, pragmatiq encodes a hierarchy so an event's fields don't bleed across event boundaries:

  1. The event encoder encodes each event independently (block-diagonal attention within an event, a prepended [EVT] marker); its output plus calendar features is the per-event vector.
  2. The profile-state encoder encodes static attributes + lifelong milestones under a [USR] marker.
  3. The history encoder runs over [profile, event…] with TimeRoPE on log-seconds-to-the-latest-event; the [USR] slot output is the user embedding.
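The block-diagonal attention in step 1 can be expressed with segment offsets. This pure-Python sketch, with hypothetical helper names, builds two things:

  • The cumulative offsets that varlen attention kernels such as flash-attn take as cu_seqlens.
  • The dense mask those offsets imply: a token may attend to another token only inside its own event.

```python
from itertools import accumulate

def cu_seqlens(lengths):
    """Offsets [0, l0, l0+l1, ...]: segment s occupies packed positions offsets[s]:offsets[s+1]."""
    return [0] + list(accumulate(lengths))

def block_diagonal_mask(lengths):
    """mask[i][j] is True iff packed tokens i and j belong to the same segment."""
    seg = [s for s, n in enumerate(lengths) for _ in range(n)]
    return [[seg[i] == seg[j] for j in range(len(seg))] for i in range(len(seg))]

# Three events packed back to back: [EVT] + 3 fields, [EVT] + 2 fields, [EVT] + 1 field.
lengths = [4, 3, 2]
print(cu_seqlens(lengths))        # [0, 4, 7, 9]
mask = block_diagonal_mask(lengths)
print(mask[0][3], mask[3][4])     # True False: token 3 ends event 0, token 4 starts event 1
```

Packing with offsets instead of padding means a batch only costs the tokens actually present. The dense mask grows quadratically and is only practical for small checks.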

All three are bidirectional, pre-norm, GELU, with ffn = 4d. The whole batch is packed without padding via cu_seqlens (flash-attn on GPU, an SDPA fallback on CPU, checked against a padded reference).

Pretraining masks 15% of tokens, 10% of whole events, and 10% of (user, key) groups. It then reconstructs each masked value from a 3-view head, concat[ẑ_e, z_h(event), z_h(USR)] ∈ ℝ^{3d} → Linear(3d→d), scored against the tied embedding table.
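The three masking levels compose naturally as a union. This is a hedged sketch under several assumptions:

  • Draws are independent Bernoulli per token, per event, and per (user, key) group, at the stated rates. pragmatiq may instead sample exact fractions.
  • How masked inputs are replaced is not shown.
  • Tokens are hypothetical (user, event_index, key) tuples.

```python
import random

def sample_mask(tokens, p_token=0.15, p_event=0.10, p_group=0.10, seed=0):
    """Return one bool per token: the union of token-, event-, and (user, key)-level masks.

    tokens: list of (user, event_index, key) tuples, one per packed token.
    """
    rng = random.Random(seed)
    events = sorted({(u, e) for u, e, _ in tokens})
    groups = sorted({(u, k) for u, _, k in tokens})
    masked_events = {ev for ev in events if rng.random() < p_event}   # whole events
    masked_groups = {g for g in groups if rng.random() < p_group}     # every value of a key for a user
    return [
        rng.random() < p_token or (u, e) in masked_events or (u, k) in masked_groups
        for u, e, k in tokens
    ]

tokens = [("u1", 0, "amount"), ("u1", 0, "mcc"), ("u1", 1, "amount"), ("u2", 0, "screen")]
mask = sample_mask(tokens, seed=3)
```

Event- and group-level masking forces the head to recover values from other events and from the profile rather than from neighbouring tokens inside the same event.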

The Nemotron variant

By default, high-cardinality text is BPE. The optional PRAGMA+Nemotron variant instead emits one sentinel token per text field and lets a frozen text model embed its raw string. Masked text tokens are reconstructed with MSE against that vector, so the loss becomes $\mathcal{L} = \text{CE} + \lambda \cdot \text{MSE}$.

The variant is switched from the data step alone and is off by default (the BPE path stays byte-identical). Tokenize with configs/data/tokenizer_nemotron.yaml and pip install "pragmatiq[extras]" for the frozen embedder. See the variant page.
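Here is a small sketch of how the two terms combine. It makes these assumptions:

  • CE is averaged over masked categorical and numeric tokens.
  • MSE is averaged over masked text sentinels.
  • λ is a free weight; its value in pragmatiq is not stated here.

The helper names are hypothetical. The MSE target is the frozen text model's vector for the field's raw string.

```python
import math

def cross_entropy(logits, target):
    """-log softmax(logits)[target], using log-sum-exp for stability."""
    m = max(logits)
    log_z = m + math.log(sum(math.exp(x - m) for x in logits))
    return log_z - logits[target]

def mse(pred, target):
    """Mean squared error between a predicted vector and the frozen-embedder vector."""
    return sum((p - t) ** 2 for p, t in zip(pred, target)) / len(pred)

def combined_loss(ce_terms, mse_terms, lam):
    """L = mean(CE) + lam * mean(MSE); an empty term contributes 0."""
    ce = sum(ce_terms) / len(ce_terms) if ce_terms else 0.0
    reg = sum(mse_terms) / len(mse_terms) if mse_terms else 0.0
    return ce + lam * reg

# Two masked non-text tokens scored against the vocabulary, one masked text sentinel.
ce_terms = [cross_entropy([2.0, 0.5, -1.0], 0), cross_entropy([0.1, 1.5, 0.3], 1)]
mse_terms = [mse([0.1, 0.2, -0.3], [0.0, 0.4, -0.1])]
print(combined_loss(ce_terms, mse_terms, lam=1.0))
```

With the BPE default there are no sentinel tokens, so the MSE list is empty and the loss reduces to plain CE.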

Set up SageMaker

Open SageMaker Studio

In the AWS console, open Amazon SageMaker Studio and launch a JupyterLab space — this is your driver environment. You'll also need a SageMaker execution role (Studio creates one) with access to your S3 bucket and ECR, and an S3 bucket for data and artifacts:

aws s3 mb s3://YOUR-pragmatiq-bucket

Install pragmatiq

In a Studio notebook cell, install pragmatiq (PyPI) and the SageMaker SDK:

%pip install pragmatiq sagemaker
import sagemaker
sess = sagemaker.Session()
role = sagemaker.get_execution_role()   # the Studio execution role
bucket = "YOUR-pragmatiq-bucket"        # the bucket created above

Get your data into S3

pragmatiq trains on a four-file parquet contract — the same files whether they're synthetic or yours. Produce them, then upload to S3.

Synthetic data: the generator is deterministic (same seed → byte-identical output) and writes the full contract: events.parquet, profiles.parquet, transfers.parquet, and labels/*.parquet.

from pragmatiq import api
api.synthesize({"n_users": 50000, "seed": 0}, out="data", n_workers=8)
sess.upload_data("data", bucket=bucket, key_prefix="data")   # -> s3://.../data/

To make the synthetic book resemble your real one without moving raw records, run pragmatiq synth calibrate --stats configs/data/aggregates.example.yaml, which fits the generator's priors to shareable aggregates.

Your own data: write the four files with the exact dtypes in the data contract. The key idea is that event fields and profile attributes are map<string,string>. Pass raw string values ("42.10", "5411") and let the tokenizer infer numeric / categorical / text per field; don't pre-bin or pre-encode.

import os
import pyarrow as pa, pyarrow.parquet as pq

os.makedirs("data", exist_ok=True)
fields = [                                        # one map<string,string> per event
    {"amount": "42.10", "mcc": "5411", "merchant": "TESCO STORES 4521"},
    {"screen": "home", "action": "view"},
]
events = pa.table({
    "user_id": ["u1", "u1"],
    "ts": pa.array([1_700_000_000_000_000, 1_700_003_600_000_000], type=pa.timestamp("us")),
    "source": ["transaction", "app"],            # groups schemas: transaction / app / trading …
    # Declare the map type explicitly: from plain dicts, pyarrow would infer a struct instead.
    "fields": pa.array([list(f.items()) for f in fields], type=pa.map_(pa.string(), pa.string())),
})
pq.write_table(events, "data/events.parquet")
# + profiles.parquet (user_id, as_of, attributes, lifelong),
#   optional transfers.parquet and labels/<task>.parquet (see the data contract).

Forecast label tables carry an eval_ts per user, so histories are truncated before embedding (a forecast, not a hindcast). Validate and upload:

import subprocess
subprocess.run(["pragmatiq", "validate", "data"], check=True)   # fails loudly on dtype/integrity issues
sess.upload_data("data", bucket=bucket, key_prefix="data")
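pragmatiq applies the eval_ts cut itself; the rule amounts to a per-user filter. This sketch uses hypothetical in-memory records instead of the parquet tables. It assumes the cutoff is strict, so an event stamped exactly at eval_ts is excluded.

```python
from datetime import datetime

def truncate_histories(events, eval_ts):
    """Keep each labelled user's events strictly before that user's eval_ts."""
    return [
        ev for ev in events
        if ev["user_id"] in eval_ts and ev["ts"] < eval_ts[ev["user_id"]]
    ]

events = [
    {"user_id": "u1", "ts": datetime(2024, 1, 5), "source": "transaction"},
    {"user_id": "u1", "ts": datetime(2024, 3, 1), "source": "app"},
    {"user_id": "u2", "ts": datetime(2024, 2, 1), "source": "transaction"},
]
eval_ts = {"u1": datetime(2024, 2, 1), "u2": datetime(2024, 2, 1)}
kept = truncate_histories(events, eval_ts)
print([(ev["user_id"], ev["ts"].date().isoformat()) for ev in kept])   # [('u1', '2024-01-05')]
```

Without the cut, the March event would leak information from after the prediction time into the user embedding.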

Your data stays in your account

Everything runs in your AWS account (and, if you configure it, inside your VPC). The generator and synth calibrate exist so you can develop against a realistic book without moving raw records.

Train with a SageMaker training job

A SageMaker training job runs your script on a GPU instance it provisions and tears down for you, reading the data from S3 and writing the trained run back to S3. Two small files define it.

The entry point and its requirements

src/train.py — tokenizes the mounted data channel and pretrains, writing the run to the SageMaker model directory (which SageMaker uploads to S3):

# src/train.py
import os
from pragmatiq import api

data = os.environ["SM_CHANNEL_DATA"]      # S3 "data" channel, mounted by SageMaker
out  = os.environ["SM_MODEL_DIR"]         # /opt/ml/model -> tarred to S3 as model.tar.gz

api.tokenize(data, "/tmp/tok", n_workers=8)
api.pretrain("/tmp/tok", "run", model_size="small",
             config={"max_steps": 4000, "token_budget": 16384}, runs_root=out)

src/requirements.txt — SageMaker installs this into the training container before your script runs. This is the pip install that pulls pragmatiq from PyPI into the managed job:

pragmatiq[serve]

Launch it

From the Studio notebook, point the PyTorch estimator at your script and the S3 data channel. The trainer auto-detects CUDA (bf16 on GPU), so the same code runs on any instance:

from sagemaker.pytorch import PyTorch

est = PyTorch(
    entry_point="train.py", source_dir="src",
    role=role, framework_version="2.4", py_version="py311",
    instance_type="ml.g5.2xlarge", instance_count=1,
)
est.fit({"data": f"s3://{bucket}/data"})
print(est.model_data)        # s3://.../model.tar.gz — the trained run

Turn on managed spot training for up to ~90% savings: set use_spot_instances=True on the estimator, together with max_wait and a checkpoint path (checkpoint_s3_uri). pragmatiq checkpoints capture the full state and resume bit-exactly, so an interruption is safe.

Serve with NVIDIA Triton on a SageMaker endpoint

pragmatiq's production serving path is a Triton python backend running the native varlen PyTorch model — the exact no-padding forward used in training (see Serve with Triton). SageMaker hosts it on a managed real-time endpoint: you supply a Triton model repository and a container with pragmatiq installed.

Build the serving image

The stock Triton image can't import pragmatiq, so build on the SageMaker Triton container (it already speaks SageMaker's /ping + /invocations contract) and pip install pragmatiq on top. Bake every dependency into the image, because SageMaker runs the container read-only:

# Dockerfile — base on the current SageMaker Triton image for your region (see the
# SageMaker Triton docs for the URI), then add pragmatiq from PyPI.
FROM <ACCOUNT>.dkr.ecr.<REGION>.amazonaws.com/sagemaker-tritonserver:<TAG>
RUN pip install --no-cache-dir "pragmatiq[serve]"

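Build it and push it to Amazon ECR. docker build also has to pull the base image, so make sure you are logged in to that registry too (see the SageMaker Triton docs):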

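aws ecr create-repository --repository-name pragmatiq-triton
aws ecr get-login-password --region <REGION> | docker login --username AWS --password-stdin <ACCOUNT>.dkr.ecr.<REGION>.amazonaws.com
docker build -t <ACCOUNT>.dkr.ecr.<REGION>.amazonaws.com/pragmatiq-triton:latest .
docker push <ACCOUNT>.dkr.ecr.<REGION>.amazonaws.com/pragmatiq-triton:latest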

Package the model repository

A Triton model repository has one folder per model. pragmatiq ships one: config.pbtxt plus a version directory 1/ holding model.py. Put the trained run in 1/run/ next to model.py, then tar the repository and upload it to S3:

base=https://raw.githubusercontent.com/dynamiq-ai/pragmatiq/main/deploy/triton/model_repository/pragmatiq_embedder
mkdir -p model_repository/pragmatiq_embedder/1/run
curl -sL $base/config.pbtxt -o model_repository/pragmatiq_embedder/config.pbtxt
curl -sL $base/1/model.py   -o model_repository/pragmatiq_embedder/1/model.py

aws s3 cp s3://YOUR-pragmatiq-bucket/.../model.tar.gz run.tgz   # est.model_data
tar -xzf run.tgz -C model_repository/pragmatiq_embedder/1/run --strip-components=1

tar -C model_repository -czf triton.tar.gz pragmatiq_embedder
aws s3 cp triton.tar.gz s3://YOUR-pragmatiq-bucket/triton/triton.tar.gz

Why the “1/” directory?

Triton expects a model's version files to live in a numbered version directory, here pragmatiq_embedder/1/. The number is the model version. By default Triton serves the highest one, so adding 2/, 3/ … rolls out a new version without editing config.pbtxt. The config.pbtxt sits one level up, at the model root.

Create the model, endpoint config, and endpoint

Point a SageMaker Model at your ECR image and the S3 artifact. The environment variables tell the SageMaker Triton container which model to load and where the run lives inside the unpacked archive, and size the python backend's shared memory:

aws sagemaker create-model --model-name pragmatiq-embedder \
  --execution-role-arn arn:aws:iam::<ACCOUNT>:role/<SageMakerExecutionRole> \
  --primary-container 'Image=<ACCOUNT>.dkr.ecr.<REGION>.amazonaws.com/pragmatiq-triton:latest,ModelDataUrl=s3://YOUR-pragmatiq-bucket/triton/triton.tar.gz,Environment={SAGEMAKER_TRITON_DEFAULT_MODEL_NAME=pragmatiq_embedder,PRAGMATIQ_RUN=/opt/ml/model/pragmatiq_embedder/1/run,SAGEMAKER_TRITON_SHM_DEFAULT_BYTE_SIZE=1073741824}'

aws sagemaker create-endpoint-config --endpoint-config-name pragmatiq-embedder \
  --production-variants VariantName=main,ModelName=pragmatiq-embedder,InstanceType=ml.g5.xlarge,InitialInstanceCount=1

aws sagemaker create-endpoint --endpoint-name pragmatiq-embedder \
  --endpoint-config-name pragmatiq-embedder
aws sagemaker wait endpoint-in-service --endpoint-name pragmatiq-embedder

Invoke it

The request body is the same records_json payload as local Triton, a JSON array of plain user records. The response is the [n_users, dim] fp32 embedding matrix. Batching happens inside the model (the varlen forward packs all users with no padding), and unseen keys/values map to [UNK] rather than raising.

The SageMaker Triton container adapts Triton's KServe v2 protocol to /invocations, so you POST it through the standard invoke-endpoint. With AWS CLI v2, --cli-binary-format raw-in-base64-out makes the CLI send the JSON body as-is instead of expecting base64:

aws sagemaker-runtime invoke-endpoint --endpoint-name pragmatiq-embedder \
  --content-type application/json \
  --cli-binary-format raw-in-base64-out \
  --body '{"inputs":[{"name":"records_json","shape":[1],"datatype":"BYTES","data":["[{\"user_id\":\"u1\",\"events\":[{\"ts\":1718200000000000,\"source\":\"transaction\",\"fields\":{\"amount\":\"42.50\",\"merchant\":\"TESCO\"}}],\"attributes\":{\"country\":\"GB\"},\"lifelong\":[]}]"]}]}' \
  out.json && cat out.json
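The escaped quotes in that body come from double encoding: the records are serialized to a JSON string, which then becomes one BYTES element of the KServe v2 request. A sketch of building the body and reading the response in Python follows.

It assumes the response is KServe v2 JSON whose first output holds the embeddings as flat, row-major data. It does not assume the output tensor's name. build_request and parse_embeddings are hypothetical helpers.

```python
import json

def build_request(records):
    """KServe v2 JSON body with one BYTES element: the records serialized as a JSON string."""
    return json.dumps({
        "inputs": [{
            "name": "records_json",
            "shape": [1],
            "datatype": "BYTES",
            "data": [json.dumps(records)],  # inner JSON string, hence the escaped quotes on the CLI
        }]
    })

def parse_embeddings(response_body):
    """Split the first output's flat, row-major data into n_users rows of length dim."""
    out = json.loads(response_body)["outputs"][0]
    n_users, dim = out["shape"]
    flat = out["data"]
    return [flat[i * dim:(i + 1) * dim] for i in range(n_users)]

records = [{
    "user_id": "u1",
    "events": [{"ts": 1718200000000000, "source": "transaction",
                "fields": {"amount": "42.50", "merchant": "TESCO"}}],
    "attributes": {"country": "GB"},
    "lifelong": [],
}]
body = build_request(records)
```

With boto3 you would pass this as Body to boto3.client("sagemaker-runtime").invoke_endpoint(EndpointName="pragmatiq-embedder", ContentType="application/json", Body=body). You would then call parse_embeddings(resp["Body"].read()), since json.loads accepts bytes.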

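Once the endpoint is live, you can add auto-scaling by registering the variant with Application Auto Scaling. Use resource ID endpoint/pragmatiq-embedder/variant/main and scalable dimension sagemaker:variant:DesiredInstanceCount.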

Clean up

A real-time endpoint bills per instance-hour while it's in service — delete it when you're done:

aws sagemaker delete-endpoint --endpoint-name pragmatiq-embedder
aws sagemaker delete-endpoint-config --endpoint-config-name pragmatiq-embedder
aws sagemaker delete-model --model-name pragmatiq-embedder

Training jobs bill only while running and stop on their own. Artifacts in S3 are cheap; delete them when finished.
