Building an event streaming architecture using a simple and breakthrough solution

Luca Bindi
10 min read · Apr 3, 2024

— Sometimes a simpler and cheaper solution is just around the corner, or already available in your technology products —

We are now accustomed to talking about microservices, but we don’t always fully appreciate the complications of managing distributed systems, where messages must adhere to certain criteria and information must be persisted with its integrity guaranteed across the microservices, even in the face of potential issues.

Furthermore, in microservices architectures, or in any distributed system, we often face scenarios that bring together multiple technology components and services, which frequently communicate with each other only through asynchronous, stateless, distributed messaging via REST.

In a distributed system, concerns like reliability, consistency, latency, connectivity, fault tolerance, durability, transaction states, and the state of the communications among the services are extremely important.

To address these concerns, various guides in the literature can be followed, implementing compensating patterns, but each of them suffers from the latency gap between persisting the data and notifying the rest of the system about the status change, which can cause information to be lost or, in some cases, duplicated.

For this reason, maintaining the integrity and alignment of all services around the same information is a crucial and critical point.
This is especially significant when dealing with customers who place orders, where the entire chain must follow specific rules to ensure the order is duly acknowledged and confirmed, and the user who submitted it feels assured about their transaction.

It is common to see Kafka clusters in distributed systems, in microservices architectures, or wherever you need to handle large quantities of messages or logs in real time, as it fits stateless asynchronous distributed messaging very well. Kafka is particularly suited for real-time data pipelines and streaming/event-driven applications. However, Kafka is not without its drawbacks. The most noteworthy are the complexity it introduces into the system architecture, the scalability of the distributed broker, replication, specific configurations, and the various other factors that must be taken into account to ensure data consistency, not to mention the associated costs.

Hence, for instance, in an e-commerce process we could have an order management microservice that sends an asynchronous request via Kafka on an inventory topic to learn how many units of a specific product are in stock, in order to confirm to the end customer that the product they want to buy is available.

The next duty of order management could be to persist the information in a relational database, for example Oracle Database, also to start the accounting process. After it has written that information, the order management microservice needs to inform the other microservices taking part in the saga about how many units of that product have been sold.

The inventory service will update the product quantity in its JSON database, for example MongoDB, by reading the order management service’s message from the Kafka topic; in turn, the delivery service can initiate the process to pack and dispatch the order to the end user.

All of these steps seem to work well, but what if the connection between order management and Kafka suffers a connectivity or latency problem? Or what if the inventory service can read the product quantity from its JSON database but cannot communicate with Kafka to publish the updated amount for other orders?

Obviously there are configurations you can put on the table to mitigate those problems, such as event acknowledgment from the broker, idempotent producers and consumers, and so on, but they don’t come for free.
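On the Kafka side, for example, mitigating loss and duplication typically means producer settings like these (an illustrative snippet; the exact values depend on the delivery guarantees you need):

# Illustrative Kafka producer properties for safer delivery
acks=all                  # wait for acknowledgment from all in-sync replicas
enable.idempotence=true   # the broker de-duplicates retried sends
retries=2147483647        # keep retrying transient failures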

But how much simpler would it be to persist the data in the database and put the message in a topic in the same operation, or better, in the same “transaction”, in a completely consistent way? It would be far easier if the system took care of all of this for us, within a simpler architecture, using technology that is already at hand: you only have to use it.

How to do it? The answer is simple: with Oracle Database, which is a converged database.

What is a converged database?

It’s not another database: it’s the Oracle Database you already know. We like to call it converged because, for many years now, Oracle Database has been a multi-workload (or translytical), multi-model database that seamlessly handles relational data, JSON, vector embeddings, spatial and property graph data, and various other data models with the same engine. It provides versatility through both SQL and NoSQL approaches, without the need to choose among different technologies (like MongoDB or Neo4j): you can have the best of both worlds on the same engine, with the same expertise.

For that reason, we call Oracle Database a converged database.

And on Oracle Cloud Infrastructure (OCI), all these features are even simpler to use.

But did you know that the Oracle converged database has an incredible feature called Transactional Event Queues (TxEventQ, previously TEQ) directly inside the database, and that you can manage it via REST?

What is TxEventQ?

TxEventQ, Oracle Transactional Event Queues, is a message queuing system integrated into the Oracle Database that provides high performance, high throughput, event streaming, pub/sub, security, and more.

Applications can use TxEventQ to communicate with each other as part of a workflow or saga pattern, producing and consuming messages to address the requirements of data-driven and event-driven architectures in modern enterprise applications.

In a nutshell, TxEventQ is similar to Kafka, with the difference that TxEventQ is completely integrated into the Oracle Database and has all the features and scalability that Oracle Database can offer; you don’t need to worry about the complexity of a distributed broker architecture, and the overall architecture actually becomes simpler.

If you want, you can continue using Kafka, but it’s important to be aware that there are alternatives that could make your work easier.

Furthermore, a converged database has other features that make it easy to access important information about transaction state, especially when that information arrives as a stream. If you want to know more, you can read further by checking the link to my blog post about this topic in the useful links below.

Now you are probably thinking: “Are you asking me to move the communication/Kafka layer inside the database layer? To move a layer that sits on top of the application layer down to the backend?” Absolutely not: Oracle Database can expose the topics through ORDS, so the services can keep interacting with the topics via REST at the middleware layer, preserving the decoupling among layers and among services. At the same time, it lets a service interact with the topic from the database layer when it is persisting information in its own database, in a single commit operation.

What is ORDS?

ORDS (Oracle REST Data Services) is a separate, middle-tier layer that lets you interact with database features through REST verbs and provides many tools to make your work easier and faster. For example, you can use REST to efficiently manage the database lifecycle, query your data, create JSON collections and documents, and perform any desired operation, and also to create a TxEventQ topic and publish and read messages.

Let me now show you how you can use TxEventQ through ORDS to reach the goal. Remember, all these features (NoSQL / JSON / relational / columnar / spatial / graph / vector embeddings / TxEventQ) are inside your Oracle Database; you don’t need to enable anything, you just have to use them.

I’m using Oracle Database Free for developers. My ORDS is configured at the application layer on the server (158.111.175.18) on port 8080, my pluggable database is called freepdb1, and the schema I have REST-enabled is appl1:

http://158.111.175.18:8080/ords/freepdb1/appl1
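For reference, this is roughly how a schema is REST-enabled (a minimal PL/SQL sketch; the mapping values reflect my environment, adjust them to yours):

-- Run as the APPL1 user (or an ORDS administrator)
BEGIN
  ORDS.ENABLE_SCHEMA(
    p_enabled             => TRUE,
    p_schema              => 'APPL1',
    p_url_mapping_type    => 'BASE_PATH',
    p_url_mapping_pattern => 'appl1',
    p_auto_rest_auth      => TRUE);
  COMMIT;
END;
/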

Create an authorization key

First of all, we have to create a Basic authorization key from our schema and password:

echo -n "appl1:DataBase##1234" | base64
--output--
YXBwbDE6RGF0YUJhc2UjIzEyMzQ=

Create a Topic via REST

To create a topic on the database via REST from the application layer, you use the POST verb.

In my case, {cluster_id} is FREEPDB1, which is the db_unique_name:

http://158.111.175.18:8080/ords/freepdb1/appl1/_/db-api/stable/database/txeventq/clusters/FREEPDB1/topics

In the body of the POST we put the name of the topic and the number of partitions,

and in the header we pass the Basic authorization key we generated above.
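Putting it together, the request looks roughly like this (a sketch: the topic_name and partitions_count body fields are my reading of the Kafka-style TxEventQ REST API, so double-check them against the documentation linked at the end):

curl -i -X POST \
  -H "Authorization: Basic YXBwbDE6RGF0YUJhc2UjIzEyMzQ=" \
  -H "Content-Type: application/json" \
  -d '{"topic_name": "REST_TOPIC", "partitions_count": "1"}' \
  http://158.111.175.18:8080/ords/freepdb1/appl1/_/db-api/stable/database/txeventq/clusters/FREEPDB1/topics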

We can then check on the database the type of queue that was created:

SELECT name || ' -> Sharded ? = ' || sharded AS "QueueName -> Sharded"
FROM user_queues
WHERE queue_type = 'NORMAL_QUEUE'
AND name = 'REST_TOPIC';

-- you will see the topic is sharded

Create a consumer group

Now we have to create a consumer group, again using the POST verb:

http://158.111.175.18:8080/ords/freepdb1/appl1/_/db-api/stable/database/txeventq/clusters/FREEPDB1/consumer-groups/REST_CONS_GROUP
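Again as a sketch (the topic_name body field binding the group to the topic is an assumption on my part; verify it against the documentation):

curl -i -X POST \
  -H "Authorization: Basic YXBwbDE6RGF0YUJhc2UjIzEyMzQ=" \
  -H "Content-Type: application/json" \
  -d '{"topic_name": "REST_TOPIC"}' \
  http://158.111.175.18:8080/ords/freepdb1/appl1/_/db-api/stable/database/txeventq/clusters/FREEPDB1/consumer-groups/REST_CONS_GROUP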

Now we can choose to publish on the topic via REST after persisting the data in the database (as we would have done with Kafka), or perform these two operations together within the same commit statement in the database to ensure consistency.

Publish a message on the topic via REST

After you persist the data inside the database, you can publish a message on a topic via REST, simply using the POST verb with the message in the body:

http://158.111.175.18:8080/ords/freepdb1/appl1/_/db-api/stable/database/txeventq/topics/REST_TOPIC
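For example (a sketch; the records array follows the Kafka REST style that the TxEventQ API mirrors, so verify the exact body format in the documentation):

curl -i -X POST \
  -H "Authorization: Basic YXBwbDE6RGF0YUJhc2UjIzEyMzQ=" \
  -H "Content-Type: application/json" \
  -d '{"records": [{"key": "KEY 1", "value": "Order 1 Pixel 8 confirmed"}]}' \
  http://158.111.175.18:8080/ords/freepdb1/appl1/_/db-api/stable/database/txeventq/topics/REST_TOPIC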

You can then read the message directly from the queue table in the database, or use the DBMS_AQ package to dequeue it:

SELECT msg_state, expiration,
       utl_raw.cast_to_varchar2(dbms_lob.substr(user_data)) AS message,
       consumer_name
FROM AQ$REST_TOPIC;

Or read it via REST using GET:

http://158.111.175.18:8080/ords/freepdb1/appl1/_/db-api/stable/database/txeventq/consumers/REST_CONS_GROUP/instances/MY-Consumer-Instance_REST_CONS_GROUP/records
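For example (note that, depending on the API version, the consumer instance may need to be registered under the group before fetching records; see the documentation):

curl -i -X GET \
  -H "Authorization: Basic YXBwbDE6RGF0YUJhc2UjIzEyMzQ=" \
  http://158.111.175.18:8080/ords/freepdb1/appl1/_/db-api/stable/database/txeventq/consumers/REST_CONS_GROUP/instances/MY-Consumer-Instance_REST_CONS_GROUP/records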

After the dequeue, the message status changes to PROCESSED.

Write the information in the database and publish a message on a topic in one single operation/commit

I have a table ORDER_PRODUCT in the database (it could also be a JSON collection). I use this very simple PL/SQL block to persist into the database table and into a topic at the same time, just to show you the commands; you could of course wrap it in a procedure and simply call it.

set serveroutput on size 1000000
DECLARE
  msgprop      dbms_aq.message_properties_t;
  enqopt       dbms_aq.enqueue_options_t;
  enq_msgid    RAW(16);
  okafkaMsg    SYS.AQ$_JMS_BYTES_MESSAGE;
  key          varchar2(100) := 'KEY 1';
  qName        varchar2(100) := 'REST_TOPIC';
  Mess_value   varchar2(100) := 'Order 1 Pixel 8 confirmed';
  partition    number(9)     := 0;
  finalPayload raw(32767);
  keyLen       number;
  valueLen     number;
  keyLenRaw    raw(4);
  valueLenRaw  raw(4);
  keyRaw       raw(32767);
  valueRaw     raw(32767);
BEGIN
  -- Convert the key and value from varchar2 to RAW
  keyRaw   := UTL_RAW.cast_to_raw(key);
  valueRaw := UTL_RAW.cast_to_raw(Mess_value);

  -- Build the payload in the okafka message format:
  -- key length, key bytes, value length, value bytes
  keyLen       := UTL_RAW.LENGTH(keyRaw);
  valueLen     := UTL_RAW.LENGTH(valueRaw);
  keyLenRaw    := UTL_RAW.CAST_FROM_BINARY_INTEGER(keyLen);
  valueLenRaw  := UTL_RAW.CAST_FROM_BINARY_INTEGER(valueLen);
  finalPayload := utl_raw.concat(keyLenRaw, keyRaw, valueLenRaw, valueRaw);

  okafkaMsg := SYS.AQ$_JMS_BYTES_MESSAGE.CONSTRUCT();
  okafkaMsg.set_bytes(finalPayload);

  -- Set the message properties the okafka format expects
  okafkaMsg.set_Long_Property('AQINTERNAL_PARTITION', (partition * 2));
  okafkaMsg.set_Int_Property('AQINTERNAL_MESSAGEVERSION', 2);
  okafkaMsg.set_Long_Property('AQINTERNAL_HEADERCOUNT', 0);
  okafkaMsg.set_String_Property('AQINTERNAL_KEY_TYPE', 'STRING');
  okafkaMsg.set_String_Property('AQINTERNAL_VALUE_TYPE', 'STRING');

  -- Persist the order in the relational table...
  INSERT INTO ORDER_PRODUCT (ID, Confirmed, Message, product_type, amount, Time_S)
  VALUES (1, 'Y', Mess_value, 'Google Pixel', 1, sysdate);

  -- ...and enqueue the message on the topic in the same transaction
  dbms_aq.enqueue(qName, enqopt, msgprop, okafkaMsg, enq_msgid);
  dbms_output.put_line('Enqueued message with msg id ' || enq_msgid);

  -- commit;  -- intentionally omitted (see below)
END;
/

As you can see, I intentionally omitted the commit command at the end of the block, just to demonstrate from another session (the yellow terminal in the video) that the row is not in the ORDER_PRODUCT table, the message is not in the topic’s queue table, and neither can be accessed via REST until the commit is issued.
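From that second session, the check is as simple as this sketch:

-- Second session, while the first session has not yet committed:
SELECT * FROM ORDER_PRODUCT WHERE id = 1;   -- no rows
SELECT count(*) FROM AQ$REST_TOPIC;         -- the new message is not visible
-- As soon as the first session issues COMMIT, both the row and the
-- message become visible atomically, to SQL and to the REST consumers.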

Hence, as you saw, a microservice can persist data in its own database and publish messages to its topic in a single confirmed commit. Additionally, it can publish or read messages on other microservices’ topics via REST without relying on additional technology components, building a distributed system that avoids the complexity of amalgamating different, hard-to-manage pieces. Moreover, you can more easily address consistency, fault tolerance, resilience, reliability, scalability, security, and network latency or communication problems, while still leveraging the expertise and technology you already use in your data center or in the cloud (Exadata, OCI PaaS services, RAC, Data Guard, RMAN, compression, partitioning, and so on).

You can find all the endpoints to manage TxEventQ through REST in the Oracle documentation, which I suggest reading.

As always, the code is for educational purposes and could surely be written better; the checks needed for a production application haven’t been done, and consequently it comes with no support.

I hope it’s useful and … have fun

PS: Many thanks to my colleague Eugenio for the support!

Ciao.
Luca

Disclaimer

The views expressed in this article are my own and do not necessarily reflect the views of Oracle.

Useful links
https://www.oracle.com/database/advanced-queuing/
https://docs.oracle.com/en/database/oracle/oracle-database/19/adque/aq-introduction.html
https://docs.oracle.com/en/database/oracle/oracle-rest-data-services/23.4/orrst/api-oracle-transactional-event-queues.html
https://luca-bindi.medium.com/autonomous-flashback-time-travel-525f26847ffd
