Communication with the database via RabbitMQ messages in a Laravel Microservice application

Tamrakar Shreyaa
4 min readJun 9, 2024

--

Install RabbitMQ:

Verify the installation and running status of RabbitMQ by consulting the earlier article I provided on how to install RabbitMQ.

Install Laravel Project

Install Laravel on both the producer and consumer applications using Composer:

composer create-project --prefer-dist laravel/laravel producer-app
composer create-project --prefer-dist laravel/laravel consumer-app

Install RabbitMQ PHP Library:

In both Laravel applications, install the php-amqplib library to interact with RabbitMQ:

composer require php-amqplib/php-amqplib

Producer and Consumer Application Configuration:

Configure in both Laravel application to send messages to RabbitMQ. Update the .env file with RabbitMQ connection details:

RABBITMQ_HOST=your_rabbitmq_host
RABBITMQ_PORT=5672
RABBITMQ_USER=your_rabbitmq_user
RABBITMQ_PASSWORD=your_rabbitmq_password
RABBITMQ_VHOST=your_rabbitmq_vhost

Create a new RabbitMQ service class or use an existing one to handle message sending for Producer Laravel Application:

<?php

namespace App\Services;

use Illuminate\Support\Facades\Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RabbitMQService
{
private $connection;
private $channel;

public function __construct()
{
$this->connection = new AMQPStreamConnection(
env('RABBITMQ_HOST'),
env('RABBITMQ_PORT'),
env('RABBITMQ_LOGIN'),
env('RABBITMQ_PASSWORD'),
env('RABBITMQ_VHOST')
);

$this->channel = $this->connection->channel();
}


public function userActivity($userId='1', $activity='order',$productId='123')
{
$this->channel->queue_declare('user_activity_queue', false, true, false, false);

$messageBody = json_encode(['user_id' => $userId, 'activity' => $activity,'productId'=>$productId]);
$message = new AMQPMessage($messageBody);
Log::info($messageBody);

$this->channel->basic_publish($message, '', 'user_activity_queue');
}



public function __destruct()
{
$this->channel->close();
$this->connection->close();
}
}

In your producer laravel application, use the RabbitMQ service to send messages:

<?php

namespace App\Http\Controllers;

use App\Services\RabbitMQService;

class MessageController extends Controller
{

public function userActivity()
{
$rabbitmqService = new RabbitMQService();
$rabbitmqService->userActivity();
return response()->json(['message' => 'Message sent successfully']);
}
}

Now let’s makes our required routes in routes/web.php

use App\Http\Controllers\MessageController;
use Illuminate\Support\Facades\Route;

Route::get('/userActivity',[MessageController::class,'userActivity']);

Consumer Application Configuration:

Configure in consumer laravel application update the .env file with database connection details:

DB_CONNECTION=mysql
DB_HOST=127.0.0.1
DB_PORT=3306
DB_DATABASE=consumer
DB_USERNAME=xxxxx
DB_PASSWORD=xxxxxx

Create Migration for Consumer Laravel Application

php artisan make:model Inventory -m

Now In the Migration file,

<?php

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

return new class extends Migration
{
/**
* Run the migrations.
*/
public function up(): void
{
Schema::create('inventory', function (Blueprint $table) {
$table->id();
$table->integer('productId');
$table->integer('quantity');
$table->timestamps();
});
}

/**
* Reverse the migrations.
*/
public function down(): void
{
Schema::dropIfExists('table_inventory');
}
};

Next,this command use to in your terminal then this setup create to in your database.

php artisan migrate

The complete code in Inventory Model is given below.

<?php

namespace App\Models;

use Illuminate\Database\Eloquent\Factories\HasFactory;
use Illuminate\Database\Eloquent\Model;

class Inventory extends Model
{
use HasFactory;

protected $table="inventory";

}

Create a new RabbitMQ service class or use an existing one to handle message sending for Consumer Laravel Application:

<?php

namespace App\Services;

use PhpAmqpLib\Connection\AMQPStreamConnection;

class RabbitMQService
{
private $connection;
private $channel;

public function __construct()
{
$this->connection = new AMQPStreamConnection(
env('RABBITMQ_HOST'),
env('RABBITMQ_PORT'),
env('RABBITMQ_LOGIN'),
env('RABBITMQ_PASSWORD'),
env('RABBITMQ_VHOST')
);

$this->channel = $this->connection->channel();
}

public function consumeUserActivity(string $queue)
{
Log::info('Consuming messages from queue: ' . $queue);

$this->channel->queue_declare($queue, false, true, false, false);

$callback = function ($message) {
Log::info('Received message: ' . $message->body);

// Process the message
$data = json_decode($message->body, true);
Log::info($data);

$this->updateInventory($data['user_id'], $data['activity'], $data['productId']);

// Acknowledge the message after processing
if ($this->processingSuccessful()) {
$message->ack();
Log::info('Acknowledged message with delivery tag: ' . $message->delivery_info['delivery_tag']);
} else {
Log::error('Failed to process message. Rejecting message.');
$message->reject(false); // Reject the message without requeueing
}
};

$this->channel->basic_consume($queue, '', false, false, false, false, $callback);

while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}

private function updateInventory($userId, $activity, $productId)
{


Log::info('qweqw');

// Simulate updating inventory for user ID 1 buying a product
$quantityToReduce = 1; // Assuming the user buys one product

// Update inventory in the inventory database
$inventory = Inventory::where('productId', $productId)->first();
if ($inventory) {
Log::info('inventory');
$inventory->quantity -= $quantityToReduce;
$inventory->save();
// Log or perform any additional actions if needed
} else {
Log::info('issue');

// Handle case where the product is not found in inventory
// Log or perform any error handling as needed
}
}

private function processingSuccessful()
{
// Implement your logic to check if message processing was successful
// Return true if successful, false otherwise
return true; // Change this as per your implementation
}

public function __destruct()
{
$this->channel->close();
$this->connection->close();
}
}

Create an artisan command to consume user activity in consume laravel application:

<?php

namespace App\Console\Commands;


use App\Services\RabbitMQService;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Log;

class ConsumeUserActivity extends Command
{
protected $signature = 'consume:useractivity';
protected $description = 'Consume user activity messages from RabbitMQ';

public function handle()
{
Log::info('handle');

$rabbitmqService = new RabbitMQService();
$rabbitmqService->consumeUserActivity('user_activity_queue', function ($message) {
// Process the message here
Log::info('message');

$this->info("Received message: {$message->getBody()}");
});


}


}

Run the consumer command to start listening for messages in consume laravel application:

php artisan consume:useractivity

Output

--

--

Tamrakar Shreyaa
Tamrakar Shreyaa

Written by Tamrakar Shreyaa

Laravel | PHP | Node| nestjs |API | AJAX | jQuery | Laravel vue | Livewire | LAMP stack | CI CD

No responses yet