Communication with the database via RabbitMQ messages in a Laravel Microservice application
Install RabbitMQ:
Verify the installation and running status of RabbitMQ by consulting the earlier article I provided on how to install RabbitMQ.
Install Laravel Project
Install Laravel on both the producer and consumer applications using Composer:
composer create-project --prefer-dist laravel/laravel producer-app
composer create-project --prefer-dist laravel/laravel consumer-app
Install RabbitMQ PHP Library:
In both Laravel applications, install the php-amqplib
library to interact with RabbitMQ:
composer require php-amqplib/php-amqplib
Producer and Consumer Application Configuration:
Configure in both Laravel application to send messages to RabbitMQ. Update the .env
file with RabbitMQ connection details:
RABBITMQ_HOST=your_rabbitmq_host
RABBITMQ_PORT=5672
RABBITMQ_USER=your_rabbitmq_user
RABBITMQ_PASSWORD=your_rabbitmq_password
RABBITMQ_VHOST=your_rabbitmq_vhost
Create a new RabbitMQ service class or use an existing one to handle message sending for Producer Laravel Application:
<?php
namespace App\Services;
use Illuminate\Support\Facades\Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitMQService
{
private $connection;
private $channel;
public function __construct()
{
$this->connection = new AMQPStreamConnection(
env('RABBITMQ_HOST'),
env('RABBITMQ_PORT'),
env('RABBITMQ_LOGIN'),
env('RABBITMQ_PASSWORD'),
env('RABBITMQ_VHOST')
);
$this->channel = $this->connection->channel();
}
public function userActivity($userId='1', $activity='order',$productId='123')
{
$this->channel->queue_declare('user_activity_queue', false, true, false, false);
$messageBody = json_encode(['user_id' => $userId, 'activity' => $activity,'productId'=>$productId]);
$message = new AMQPMessage($messageBody);
Log::info($messageBody);
$this->channel->basic_publish($message, '', 'user_activity_queue');
}
public function __destruct()
{
$this->channel->close();
$this->connection->close();
}
}
In your producer laravel application, use the RabbitMQ service to send messages:
<?php
namespace App\Http\Controllers;
use App\Services\RabbitMQService;
class MessageController extends Controller
{
public function userActivity()
{
$rabbitmqService = new RabbitMQService();
$rabbitmqService->userActivity();
return response()->json(['message' => 'Message sent successfully']);
}
}
Now let’s makes our required routes in routes/web.php
use App\Http\Controllers\MessageController;
use Illuminate\Support\Facades\Route;
Route::get('/userActivity',[MessageController::class,'userActivity']);
Consumer Application Configuration:
Configure in consumer laravel application update the .env
file with database connection details:
DB_CONNECTION=mysql
DB_HOST=127.0.0.1
DB_PORT=3306
DB_DATABASE=consumer
DB_USERNAME=xxxxx
DB_PASSWORD=xxxxxx
Create Migration for Consumer Laravel Application
php artisan make:model Inventory -m
Now In the Migration file,
<?php
use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;
return new class extends Migration
{
/**
* Run the migrations.
*/
public function up(): void
{
Schema::create('inventory', function (Blueprint $table) {
$table->id();
$table->integer('productId');
$table->integer('quantity');
$table->timestamps();
});
}
/**
* Reverse the migrations.
*/
public function down(): void
{
Schema::dropIfExists('table_inventory');
}
};
Next,this command use to in your terminal then this setup create to in your database.
php artisan migrate
The complete code in Inventory Model is given below.
<?php
namespace App\Models;
use Illuminate\Database\Eloquent\Factories\HasFactory;
use Illuminate\Database\Eloquent\Model;
class Inventory extends Model
{
use HasFactory;
protected $table="inventory";
}
Create a new RabbitMQ service class or use an existing one to handle message sending for Consumer Laravel Application:
<?php
namespace App\Services;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class RabbitMQService
{
private $connection;
private $channel;
public function __construct()
{
$this->connection = new AMQPStreamConnection(
env('RABBITMQ_HOST'),
env('RABBITMQ_PORT'),
env('RABBITMQ_LOGIN'),
env('RABBITMQ_PASSWORD'),
env('RABBITMQ_VHOST')
);
$this->channel = $this->connection->channel();
}
public function consumeUserActivity(string $queue)
{
Log::info('Consuming messages from queue: ' . $queue);
$this->channel->queue_declare($queue, false, true, false, false);
$callback = function ($message) {
Log::info('Received message: ' . $message->body);
// Process the message
$data = json_decode($message->body, true);
Log::info($data);
$this->updateInventory($data['user_id'], $data['activity'], $data['productId']);
// Acknowledge the message after processing
if ($this->processingSuccessful()) {
$message->ack();
Log::info('Acknowledged message with delivery tag: ' . $message->delivery_info['delivery_tag']);
} else {
Log::error('Failed to process message. Rejecting message.');
$message->reject(false); // Reject the message without requeueing
}
};
$this->channel->basic_consume($queue, '', false, false, false, false, $callback);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
private function updateInventory($userId, $activity, $productId)
{
Log::info('qweqw');
// Simulate updating inventory for user ID 1 buying a product
$quantityToReduce = 1; // Assuming the user buys one product
// Update inventory in the inventory database
$inventory = Inventory::where('productId', $productId)->first();
if ($inventory) {
Log::info('inventory');
$inventory->quantity -= $quantityToReduce;
$inventory->save();
// Log or perform any additional actions if needed
} else {
Log::info('issue');
// Handle case where the product is not found in inventory
// Log or perform any error handling as needed
}
}
private function processingSuccessful()
{
// Implement your logic to check if message processing was successful
// Return true if successful, false otherwise
return true; // Change this as per your implementation
}
public function __destruct()
{
$this->channel->close();
$this->connection->close();
}
}
Create an artisan command to consume user activity in consume laravel application:
<?php
namespace App\Console\Commands;
use App\Services\RabbitMQService;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Log;
class ConsumeUserActivity extends Command
{
protected $signature = 'consume:useractivity';
protected $description = 'Consume user activity messages from RabbitMQ';
public function handle()
{
Log::info('handle');
$rabbitmqService = new RabbitMQService();
$rabbitmqService->consumeUserActivity('user_activity_queue', function ($message) {
// Process the message here
Log::info('message');
$this->info("Received message: {$message->getBody()}");
});
}
}
Run the consumer command to start listening for messages in consume laravel application:
php artisan consume:useractivity