欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Rabbitmq 分布式集群环境搭建 博客分类: Rabbitmq  

程序员文章站 2024-03-19 10:34:10
...

 1、MQ

 

       MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。

2、安装

http://www.rabbitmq.com/download.html 选择通用二进制办解压安装

4、启动

sbin/rabbitmq-server -detached

 5、启用web管理界面

sbin/rabbitmq-plugins enable rabbitmq_management

 web访问: http://IP:15672/

6、设置账号密码

 

# 删除guest账号
sbin/rabbitmqctl delete_user guest
# 新增管理账号 admin
sbin/rabbitmqctl add_user admin 123456
# 设置admin为超级管理员
sbin/rabbitmqctl set_user_tags admin administrator
# 查看当前用户角色列表
 sbin/rabbitmqctl list_users 

 7、Java 客户端代码

a、加入Jar包

<dependency>
	        <groupId>com.rabbitmq</groupId>
	        <artifactId>amqp-client</artifactId>
	        <version>3.0.4</version>
	</dependency>

 

b、生成着代码

import java.io.IOException;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Send {  
    private final static String QUEUE_NAME = "hello";  
  
    public static void main(String[] args) throws IOException {  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("10.0.0.104");  
        factory.setUsername("rabbitmq");
        factory.setPassword("123456"); 
        factory.setPort( 5672); 
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        // 这里第二个参数是指消息是否持久化,持久化传递true,这里和消费者保持一致
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        for(int x=0;x<20;x++){
	        String message = "Hello World!";  
	        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
	        System.out.println(" [x] Sent '" + message + "'");  
        }
        channel.close();  
        connection.close();  
    }  
}  

 c、消费者代码

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;  
   
  
public class Reqv {  
    private final static String QUEUE_NAME = "hello";  
  
    public static void main(String[] argv) throws Exception {  
  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("10.0.0.104");  
        factory.setUsername("rabbitmq");
        factory.setPassword("123456"); 
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        // 这里第二个参数是指消息是否持久化,持久化传递true,这里和生产者保持一致
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  
         DefaultConsumer consumer = new DefaultConsumer(channel){
        	
        	@Override
        	public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
        			throws IOException {
        		System.out.println(" [m] Received '" + new String(body, "UTF-8") + "'" ); 
        	}
        };
        boolean autoAck = true;
        
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }  
}  

8、spring boot amqp 

a、引入Jar包

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

b、配置文件:

spring:
    rabbitmq:
      host: 10.0.0.104
      port: 5672
      username: admin
      password: 123456

  

c、生产者代码

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageQueueService {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	public void send(String msg) {
		System.out.println("Sender : " + msg);
		rabbitTemplate.convertAndSend( "hello" , msg);
	}
	
}

d、消费者代码:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = { "hello" } ) 
public class ReceiverQueue {
	@RabbitHandler
	public void process(String hello) {
		System.out.println("Receiver : " + hello);
	}
}

 

 

9、如果提示没有权限,进入web管理界面,选择Admin,将来Can access virtual hosts列设置成,当然也可以根据目录结构控制权限


Rabbitmq 分布式集群环境搭建
            
    
    博客分类: Rabbitmq  
 

10、rabbitmq 分布式搭建

a、准备两台机器server1和server2 或者docker环境,两台机器上都安装上面服rabbitmq,然后运行命令

两台机器都要启动rabbitmq

#停止rabbitmq应用,并不是server
sbin/rabbitmqctl stop_app
#链接server1
sbin/rabbitmqctl join_cluster rabbit@server1
# 这里可能会提示下面错误
Error: unable to connect to nodes [rabbit@rabbitmqmaster]: nodedown

DIAGNOSTICS
===========

attempted to contact: [rabbit@rabbitmqmaster]

rabbit@rabbitmqmaster:
  * connected to epmd (port 4369) on rabbitmqmaster
  * epmd reports node 'rabbit' running on port 25672
  * TCP connection succeeded but Erlang distribution failed

  * Authentication failed (rejected by the remote node), please check the Erlang cookie


current node details:
- node name: 'rabbitmq-cli-88@rabbitmqsalver2'
- home dir: /root
- cookie hash: 55WeQTsiv71JcEDq/JcE9Q==

找到home dir 下面的.erlang.cookie,这里的是 /root/.erlang.cookie 讲将两台机器的该文件内容保持一致,再次运行该命令成功。

# 查看集群状态
sbin/rabbitmqct cluster_status

 11、命令设置账号可权限

#设置
sbin/rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
#查看权限
sbin/rabbitmqctl list_permissions

 

 

  • Rabbitmq 分布式集群环境搭建
            
    
    博客分类: Rabbitmq  
  • 大小: 19 KB