Ingest events published from a contract
Soroban RPC provides a getEvents method, which allows you to query events from a smart contract. However, the data retention window for these events is 7 days at most. If you need a longer-lived record of these events, you'll want to "ingest" them as they are published and maintain your own record or database.
There are many strategies you can use to ingest and keep the events published by a smart contract. Among the simplest might be a community-developed tool such as Mercury, which takes all the infrastructure work off your plate for a low subscription fee.
Another approach, which we'll explore here, is using a cron job to query Soroban RPC periodically and store the relevant events in a locally stored SQLite database. We are going to use an Object Relational Mapper (ORM), which allows us to write database queries directly in Python or JavaScript.
Setup
- Python
In a virtual environment, install the Python dependencies:
pip install sqlalchemy stellar-sdk
Set Up the Database Client
- Python
- JavaScript
To access the database, we will use SQLAlchemy, a widely used Python library for querying databases.
We are going to ingest events into a table named SorobanEvent. In SQLAlchemy, this translates into a class, also called a database model:
from typing import Any
from sqlalchemy import orm, JSON

class Base(orm.DeclarativeBase):
    # needed to tell SQLAlchemy to translate a dictionary into a JSON entry
    type_annotation_map = {
        dict[str, Any]: JSON,
    }

class Event(Base):
    __tablename__ = "SorobanEvent"
    id: orm.Mapped[int] = orm.mapped_column(primary_key=True)
    contract_id: orm.Mapped[str]
    ledger: orm.Mapped[int]
    topics: orm.Mapped[dict[str, Any]]
    value: orm.Mapped[str]
We will use an in-memory-only SQLite database for this guide, but thanks to the use of an ORM, we could be using any other supported database. We would simply need to change the connection string.
from sqlalchemy import create_engine
engine = create_engine("sqlite://", echo=True)
# the following creates the table in the DB
Base.metadata.create_all(engine)
By setting echo=True, we can see what is happening on the database. Creating the database table leads to the following logs:
BEGIN (implicit)
PRAGMA main.table_info("SorobanEvent")
...
PRAGMA temp.table_info("SorobanEvent")
...
CREATE TABLE SorobanEvent (
id INTEGER NOT NULL,
contract_id VARCHAR NOT NULL,
ledger INTEGER NOT NULL,
topics JSON NOT NULL,
value VARCHAR NOT NULL,
PRIMARY KEY (id)
)
...
COMMIT
The finer details of choosing a Prisma configuration are beyond the scope of this document. You can get a lot more information in the Prisma quickstart. Here is our Prisma schema's model:
model SorobanEvent {
  id          String  @id
  ledger      Int
  contract_id String
  topic_1     String?
  topic_2     String?
  topic_3     String?
  topic_4     String?
  value       String
}
Using a database model is very convenient as it allows us to control the database schema programmatically. If we need to change the schema, by adding a new column for instance, then using an ORM gives us access to very powerful migration tools.
We'll use this model to create and query for the events stored in our database.
Query Events from Soroban RPC
First, we'll need to query the events from Soroban RPC. This simple example makes an RPC request using the getEvents method, filtering for all transfer events that are emitted by the native XLM contract.
We are making some assumptions here. We'll assume that your contract sees enough activity, and that you are querying for events frequently enough that you aren't in danger of needing to figure out the oldest ledger Soroban RPC is aware of. The approach we're taking is to find the largest (most recent) ledger sequence number in the database and query for events starting there. Your use-case may require some logic to determine what the latest ledger is, and what the oldest ledger available is, etc.
- Python
- JavaScript
If we start from scratch, there is no known ledger so we can try to ingest roughly the last 7 days assuming a ledger closes every 6s.
import stellar_sdk
soroban_server = stellar_sdk.SorobanServer()
ledger = soroban_server.get_latest_ledger().sequence - int(3600 / 6 * 24 * 7)
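The arithmetic above can be wrapped in a small helper so the assumptions are explicit. This is only a sketch: the function name `initial_start_ledger` and its defaults are illustrative and not part of the SDK, and the 6-second close time is the same approximation used in the text. The RPC may still reject a start ledger older than what it actually retains, so the result may need adjusting.

```python
def initial_start_ledger(latest_sequence: int, days: int = 7, close_seconds: int = 6) -> int:
    """Estimate the first ledger inside the retention window (hypothetical helper)."""
    # Assumption: one ledger closes roughly every `close_seconds` seconds.
    ledgers_per_day = 24 * 3600 // close_seconds
    window = days * ledgers_per_day
    # Ledger sequence numbers start at 1, so never go below that.
    return max(1, latest_sequence - window)


# With a latest ledger of 200,000, a 7-day window is 100,800 ledgers wide.
print(initial_start_ledger(200_000))
```

Clamping at 1 keeps the helper usable on a freshly reset network, where the latest sequence can be smaller than the window.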
Later on, we will be able to start from the latest ingested ledger by making a query to our DB.
import sqlalchemy

# contract_id is the contract address defined in the next snippet
with orm.Session(engine) as session:
    stmt = sqlalchemy.select(Event.ledger).where(
        Event.contract_id == contract_id
    ).order_by(Event.ledger.desc())
    ledger = session.scalars(stmt).first()
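Note that `first()` returns `None` when the table is still empty, so the two starting points described above need to be combined somewhere. One possible way to do that, as a sketch: `choose_start_ledger` and `RETENTION_LEDGERS` are hypothetical names, and the retention estimate is the same rough "7 days at 6 seconds per ledger" assumption.

```python
from typing import Optional

# Assumption: about 7 days of history at roughly 6 seconds per ledger.
RETENTION_LEDGERS = 7 * 24 * 3600 // 6


def choose_start_ledger(last_ingested: Optional[int], latest_sequence: int) -> int:
    """Pick the ledger to start querying from (hypothetical helper)."""
    # Oldest ledger the RPC is likely to still serve, by our estimate.
    oldest_likely = max(1, latest_sequence - RETENTION_LEDGERS)
    if last_ingested is None:
        # Nothing stored yet: start at the edge of the retention window.
        return oldest_likely
    # If the script has not run for a long time, the stored ledger may have
    # fallen out of the window, so never start earlier than the estimate.
    return max(last_ingested, oldest_likely)


print(choose_start_ledger(None, 200_000))
print(choose_start_ledger(150_000, 200_000))
```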
Let's get events from Soroban RPC!
from stellar_sdk.soroban_rpc import EventFilter, EventFilterType
contract_id = "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC"
res = soroban_server.get_events(
    ledger,
    filters=[
        EventFilter(
            event_type=EventFilterType.CONTRACT,
            contract_ids=[contract_id],
            topics=[["AAAADwAAAAh0cmFuc2Zlcg==", "*", "*", "*"]],
        )
    ],
)
events = res.events
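The first topic in the filter, "AAAADwAAAAh0cmFuc2Zlcg==", is the base64-encoded XDR of the symbol `transfer`. To show what that string contains, here is a minimal sketch that builds it by hand with the standard library. The helper name `symbol_topic` is made up for this example; in a real script the SDK's own `scval` helpers are likely a better fit.

```python
import base64
import struct

SCV_SYMBOL = 15  # ScVal type discriminant for a symbol in Stellar's XDR


def symbol_topic(symbol: str) -> str:
    """Encode a symbol as a base64 XDR ScVal (hypothetical helper)."""
    raw = symbol.encode("ascii")
    # XDR pads variable-length data with zero bytes to a multiple of 4.
    padding = (4 - len(raw) % 4) % 4
    # Layout: 4-byte type tag, 4-byte length, then the padded bytes.
    xdr = struct.pack(">II", SCV_SYMBOL, len(raw)) + raw + b"\x00" * padding
    return base64.b64encode(xdr).decode("ascii")


print(symbol_topic("transfer"))  # AAAADwAAAAh0cmFuc2Zlcg==
```

Swapping in another symbol name gives the topic string for filtering a different kind of event.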
We use the @stellar/stellar-sdk library:
import { SorobanRpc } from "@stellar/stellar-sdk";
import { PrismaClient } from "@prisma/client";

const server = new SorobanRpc.Server("https://soroban-testnet.stellar.org");
const prisma = new PrismaClient();

let latestEventIngested = await prisma.sorobanEvent.findFirst({
  orderBy: [
    {
      ledger: "desc",
    },
  ],
});

let events = await server.getEvents({
  startLedger: latestEventIngested.ledger,
  filters: [
    {
      type: "contract",
      contractIds: ["CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC"],
      topics: [["AAAADwAAAAh0cmFuc2Zlcg==", "*", "*", "*"]],
    },
  ],
});
Store Events in the Database
Now, we'll check whether the events object contains any new events and, if so, store them. We're storing the event's topics and values as base64-encoded strings here, but you could decode the necessary topics and values into the appropriate data types for your use-case.
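As an illustration of what decoding could look like, the sketch below unpacks an i128 event value, such as the 'AAAACgAAAAAAAAAAAAAAAAAAAGQ=' that appears in the log output of this section, using only the standard library. `decode_i128` is a hypothetical helper, and it assumes the value really is an i128 ScVal; the SDK's own XDR helpers are probably the more robust choice in a real script.

```python
import base64
import struct

SCV_I128 = 10  # ScVal type discriminant for a 128-bit signed integer


def decode_i128(value_b64: str) -> int:
    """Decode a base64 XDR i128 ScVal into a Python int (hypothetical helper)."""
    raw = base64.b64decode(value_b64)
    if len(raw) != 20:
        raise ValueError(f"expected 20 bytes for an i128 ScVal, got {len(raw)}")
    # Layout: 4-byte type tag, signed 64-bit high half, unsigned 64-bit low half.
    scval_type, hi, lo = struct.unpack(">IqQ", raw)
    if scval_type != SCV_I128:
        raise ValueError(f"not an i128 ScVal (type tag {scval_type})")
    return (hi << 64) + lo


print(decode_i128("AAAACgAAAAAAAAAAAAAAAAAAAGQ="))  # 100
```

Storing the decoded integer alongside the raw string would make it possible to filter or sum amounts directly in SQL.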
- Python
- JavaScript
Make your life easier by using a SQLAlchemy sessionmaker for transactions (e.g., adding records).
import sqlalchemy
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(engine)
with Session.begin() as session:
    events_ = []
    for event in events:
        topic_ = event.topic
        value = event.value
        events_.append(Event(contract_id=contract_id, ledger=event.ledger, topics=topic_, value=value))
    session.add_all(events_)
BEGIN (implicit)
INFO sqlalchemy.engine.Engine COMMIT
INFO sqlalchemy.engine.Engine BEGIN (implicit)
INFO sqlalchemy.engine.Engine INSERT INTO "SorobanEvent" (contract_id, ledger, topics, value) VALUES (?, ?, ?, ?) RETURNING id
INFO sqlalchemy.engine.Engine [...] ('CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC', 3311, '["AAAADwAAAAh0cmFuc2Zlcg==", "AAAAEgAAAAAAAAAAJY16rJOcKxQayCR7ayNA80hW5q1U4ypIGOY7NktBfKU=", "AAAAEgAAAAHXkotywnA8z+r365/0701QSlWouXn8m0UOoshCtNHOYQ==", "AAAADgAAAAZuYXRpdmUAAA=="]', 'AAAACgAAAAAAAAAAAAAAAAAAAGQ=')
INFO sqlalchemy.engine.Engine INSERT INTO "SorobanEvent" (contract_id, ledger, topics, value) VALUES (?, ?, ?, ?) RETURNING id
INFO sqlalchemy.engine.Engine [...] ('CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC', 3325, '["AAAADwAAAAh0cmFuc2Zlcg==", "AAAAEgAAAAAAAAAAJY16rJOcKxQayCR7ayNA80hW5q1U4ypIGOY7NktBfKU=", "AAAAEgAAAAHXkotywnA8z+r365/0701QSlWouXn8m0UOoshCtNHOYQ==", "AAAADgAAAAZuYXRpdmUAAA=="]', 'AAAACgAAAAAAAAAAAAAAAAAAAGQ=')
...
COMMIT
if (events.events?.length) {
  // for...of waits for each insert; forEach would not await the async callbacks
  for (const event of events.events) {
    await prisma.sorobanEvent.create({
      data: {
        id: event.id,
        ledger: event.ledger,
        contract_id: event.contractId.toString(),
        // an event may have fewer than four topics, so guard each slot
        topic_1: event.topic[0]?.toXDR("base64") ?? null,
        topic_2: event.topic[1]?.toXDR("base64") ?? null,
        topic_3: event.topic[2]?.toXDR("base64") ?? null,
        topic_4: event.topic[3]?.toXDR("base64") ?? null,
        value: event.value.toXDR("base64"),
      },
    });
  }
}
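Because each run starts from the most recent ledger already stored, events from that ledger can be fetched a second time. One way to keep repeated runs from storing duplicates is to key rows on the event ID and skip IDs that already exist. The sketch below shows the idea with the standard library's sqlite3 module rather than an ORM; the table layout, the `open_db` and `store_events` helpers, and the sample event IDs are all made up for illustration.

```python
import sqlite3


def open_db() -> sqlite3.Connection:
    """Create an in-memory database with a table keyed on the event ID."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE soroban_event ("
        "id TEXT PRIMARY KEY, ledger INTEGER NOT NULL, value TEXT NOT NULL)"
    )
    return conn


def store_events(conn: sqlite3.Connection, events: list) -> int:
    """Insert events, skipping IDs already stored. Returns the number of new rows."""
    before = conn.total_changes
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT OR IGNORE INTO soroban_event (id, ledger, value) VALUES (?, ?, ?)",
            [(e["id"], e["ledger"], e["value"]) for e in events],
        )
    return conn.total_changes - before


conn = open_db()
# Hypothetical events; real IDs and values come from the getEvents response.
first_run = [
    {"id": "evt-a", "ledger": 3311, "value": "AAAACgAAAAAAAAAAAAAAAAAAAGQ="},
    {"id": "evt-b", "ledger": 3325, "value": "AAAACgAAAAAAAAAAAAAAAAAAAGQ="},
]
second_run = first_run[1:] + [
    {"id": "evt-c", "ledger": 3330, "value": "AAAACgAAAAAAAAAAAAAAAAAAAGQ="},
]
print(store_events(conn, first_run))
print(store_events(conn, second_run))
```

The same idea should carry over to an ORM-managed table, as long as the event ID is stored in a column with a uniqueness constraint.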
Run the Script with Cron
A cron entry is an excellent way to automate this script to gather and ingest events every so often. You could configure this script to run as (in)frequently as you want or need. This example would run the script once a day at 1:14 pm:
- Python
- JavaScript
14 13 * * * python /absolute/path/to/script.py
Here's another example that will run the script every 30 minutes:
*/30 * * * * python /absolute/path/to/script.py
14 13 * * * node /absolute/path/to/script.js
Here's another example that will run the script every 30 minutes:
*/30 * * * * node /absolute/path/to/script.js