博客
关于我
手写一个简单的分布式架构(javaweb+RPC)
阅读量:274 次
发布时间:2019-03-01

本文共 10300 字,大约阅读时间需要 34 分钟。

问题:还是一样,在学分布式计算,老师布置了个作业。

如下:

开发一个基于分布式架构的校园卡模拟交易和管理系统

系统架构至少三个角色
服务器-以RPC的形式提供服务调用,有三种服务(1、支付。2、开户。3、充值)。数据信息应可以持久化存储在服务器上。
卡务管理-提供WEB页面,实现开户和充值的交互人机界面,并通过RPC调用服务器的相关业务功能。
刷卡机-命令行应用程序,提供刷卡人机界面,可以接受键盘反复输入卡号、密码、金额,并通过调用RPC服务完成消费支付业务。
要求:整个系统具备基本的容错机制,如RPC服务宕机或卡号密码错误等。

看了一些视频和资料:

总体结构如下:
在这里插入图片描述
common包下是公共类、公共方法和接口
client包下是客户端
server包下是服务端

实际开发应当不会放到同一个项目下,应当分模块开发,由于本项目较为简单,只以一个项目的形式完成。

话不多说,直接上代码分析,

首先展示RPC的逻辑:
common公共端
RegistryCenter.java 用来封装注册中心的端口号和url

package edu.cn.common.config;public class RegistryCenter {   	private static final int REGISTRY_PORT=2021;	private static final String SERVER_ADDRESS="127.0.0.1";	private static final String SERVER_URL="TCP://"+SERVER_ADDRESS+":"+REGISTRY_PORT;	public static int getRegistryPort() {   		return REGISTRY_PORT;	}	public static String getServerAddress() {   		return SERVER_ADDRESS;	}	public static String getServerUrl() {   		return SERVER_URL;	}}

RemoteCall.java 用来封装调用服务过程中的参数(类名、方法名、参数类型、参数)

import java.io.Serializable;import java.util.Arrays;public class RemoteCall implements Serializable {   	private static  final long serialVersionUID = 1L;	String serviceClassName;//服务功能的类名	String serviceMethodName;//服务功能的方法名称	Class
[] serviceMethodArgsClass;//方法的参数列表 Object[] serviceMethodArgsValues;//方法的实际参数列表 public RemoteCall() { } public RemoteCall(String className, String methodName, Class
[] methodArgsClass, Object[] methodArgsValues) { this.serviceClassName=className; this.serviceMethodName=methodName; this.serviceMethodArgsClass=methodArgsClass; this.serviceMethodArgsValues=methodArgsValues; } public String getServiceClassName() { return serviceClassName; } public void setServiceClassName(String serviceClassName) { this.serviceClassName = serviceClassName; } public String getServiceMethodName() { return serviceMethodName; } public void setServiceMethodName(String serviceMethodName) { this.serviceMethodName = serviceMethodName; } public Class
[] getServiceMethodArgsClass() { return serviceMethodArgsClass; } public void setServiceMethodArgsClass(Class
[] serviceMethodArgsClass) { this.serviceMethodArgsClass = serviceMethodArgsClass; } public Object[] getServiceMethodArgsValues() { return serviceMethodArgsValues; } public void setServiceMethodArgsValues(Object[] serviceMethodArgsValues) { this.serviceMethodArgsValues = serviceMethodArgsValues; } @Override public String toString() { return "RemoteCall{" + "serviceClassName='" + serviceClassName + '\'' + ", serviceMethodName='" + serviceMethodName + '\'' + ", serviceMethodArgsClass=" + Arrays.toString(serviceMethodArgsClass) + ", serviceMethodArgsValues=" + Arrays.toString(serviceMethodArgsValues) + '}'; }}

RPC服务端:

ServerCenter.java

package edu.cn.server.rpc;public interface ServerCenter {   	void start();	void close();	void register(Class service, Class serviceImpl);}

ServerCenterImpl.java

package edu.cn.server.rpc;import edu.cn.common.config.RegistryCenter;import edu.cn.common.config.RemoteCall;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.Method;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.net.Socket;import java.util.HashMap;import java.util.Map;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ServerCenterImpl implements ServerCenter {   	//服务端的所有可供客户端访问的接口都注册到map中	private static Map
serverRegister = new HashMap
(); private static int port; private static int count = 0; public ServerCenterImpl(int port) { this.port = port; } //连接池,一个对象处理一个客户请求(连接池中大小与CPU有关) private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static boolean isRunning = false; //开启服务端服务 @Override public void start() { try { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(port)); isRunning = true; System.out.println("RPC服务端启动,URL:"+ RegistryCenter.getServerUrl()); while (true) { System.out.println("RPC服务端 线程 "+(++count)+" 启动..."); //接收客户端请求,处理请求,并返回结果 //客户端每发送一次请求,则服务端从连接池中获取一个线程 Socket socket = serverSocket.accept();//等待客户端连接 executor.execute(new ServiceTask(socket)); } } catch (Exception e) { System.out.println("start exception." + e); } } //关闭连接 @Override public void close() { isRunning = false; executor.shutdown(); } //服务端通过register方法将接口注册到注册中心,key为关键字,value为接口的具体实现; @Override public void register(Class service, Class serviceImpl) { serverRegister.put(service.getName(), serviceImpl); } private static class ServiceTask implements Runnable { private Socket socket; public ServiceTask() { } public ServiceTask(Socket socket) { this.socket = socket; } @Override public void run() { ObjectInputStream input = null; ObjectOutputStream output = null; try { input = new ObjectInputStream(socket.getInputStream()); RemoteCall remoteCall = (RemoteCall) input.readObject(); //根据客户端请求,到map中找到与之对应的具体接口 Class serviceClass = serverRegister.get(remoteCall.getServiceClassName()); Method method = serviceClass.getMethod(remoteCall.getServiceMethodName(), remoteCall.getServiceMethodArgsClass()); Object result = method.invoke(serviceClass.newInstance(), remoteCall.getServiceMethodArgsValues()); //将方法执行完毕的返回值传给客户端 System.out.println("服务端 线程 "+(count)+"处理后的结果:" + result); output = new ObjectOutputStream(socket.getOutputStream()); output.writeObject(result); } catch (Exception e) { System.out.println("ServerCenterImpl" + e); } finally { if (output != null) { try { output.close(); } catch (Exception e) { } } if (input != null) { try { input.close(); } catch (Exception e) { } } } } }}
import edu.cn.common.config.RegistryCenter;import edu.cn.common.service.CampusCardService;import edu.cn.server.service.impl.CampusCardServiceImpl;public class CardServerRPC extends Thread {   	final int port;//端口号	public CardServerRPC() {   		this.port = RegistryCenter.getRegistryPort();	}	public void run() {   		ServerCenter serverCenter = new ServerCenterImpl(this.port);		//将接口和实现类注册到服务中心		serverCenter.register(CampusCardService.class, CampusCardServiceImpl.class);		serverCenter.start();	}public static void main(String[] args) {           //开启RPC服务端		CardServerRPC demo=new CardServerRPC();		demo.start();	}}

客户端

MyAspect.java RPC代理类,用来调用远程服务的

package edu.cn.client.rpc;import edu.cn.common.config.RegistryCenter;import edu.cn.common.config.RemoteCall;import edu.cn.common.service.CampusCardService;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.net.Socket;public class MyAspect {   	/**	 *	 * @return 返回服务对象	 */	public static Object getService(){   		//本地调用远程服务		MyAspect myAspect = new MyAspect();		CampusCardService campusCardService = (CampusCardService) myAspect.rpc(CampusCardService.class);		return  campusCardService;	}	/**	 *  动态创建代理对象	 *  Object newProxyInstance(ClassLoader loader, Class
[] interfaces, InvocationHandler h) * 参数1:真实对象的类加载器 * 参数2:真实对象实现的所有的接口,接口是特殊的类,使用Class[]装载多个接口 * 参数3: 动态代理内部类对象 * @param clazz * @return */ public Object rpc(final Class clazz) { return Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{ clazz}, new MyHandler(clazz)); } class MyHandler implements InvocationHandler{ final Class clazz; public MyHandler(Class clazz){ this.clazz=clazz; } /** * @param proxy 代理对象 * @param method 代理的方法对象 * @param args 方法调用时参数 * @return * @throws Throwable */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket=null; try{ socket=new Socket(RegistryCenter.getServerAddress(), RegistryCenter.getRegistryPort()); }catch(Exception e){ System.out.println("服务器正在休息,请稍后再试一试吧!"); e.printStackTrace(); return null; } RemoteCall remoteCall=new RemoteCall(clazz.getName(),method.getName(),method.getParameterTypes(),args); ObjectOutputStream objectOutputStream=new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(remoteCall); ObjectInputStream objectInputStream=new ObjectInputStream(socket.getInputStream()); Object result=objectInputStream.readObject();// System.out.println("接收服务端处理后的返回值"+result); objectInputStream.close(); objectOutputStream.close(); return result; } }}

部分业务逻辑的具体实现

package edu.cn.client.rpc;import edu.cn.common.entity.LoginCard;import edu.cn.common.entity.UpdateCard;import edu.cn.common.service.CampusCardService;import java.util.Scanner;public class CardClientRPCInit extends Thread{   	private CampusCardService campusCardService = (CampusCardService) MyAspect.getService();	public void run() {   		Scanner sc1 = new Scanner(System.in);		Scanner sc2 = new Scanner(System.in);		Scanner sc3 = new Scanner(System.in);		LoginCard user = new LoginCard();		UpdateCard depend = new UpdateCard();		String id = null;		String password = null;		double spend = 0.0;		while (true) {   			System.out.println("--------我是刷卡机!--------");			System.out.print("您此次消费(元):");			spend = sc1.nextDouble();			System.out.print("您的卡号是:");			id = sc2.nextLine();			System.out.print("您的密码是:");			password = sc3.nextLine();			user.setLoginCard(id, password);			while (!campusCardService.login(user)) {   				System.out.println("-----卡号或密码错误!请重新输入!-----");				System.out.print("您的卡号是:");				id = sc2.nextLine();				System.out.print("您的密码是:");				password = sc3.nextLine();				user.setLoginCard(id, password);			}			System.out.println("密码正确!正在扣费----");			try {   				sleep(1000);			} catch (InterruptedException e) {   				e.printStackTrace();			}			depend.setUpdateCard(id,password, spend);			System.out.println(campusCardService.pay(depend));		}	}}

客户端刷卡机的main方法:

package edu.cn.client.main;import edu.cn.client.rpc.CardClientRPCInit;public class PosReader {   	public static void main(String[] args) {   		CardClientRPCInit card=new CardClientRPCInit();		card.start();	}}

启动顺序:先启动服务端后 再启动客户端,即可实现该系统

启动成功截图
在这里插入图片描述

服务端未启动时的结果截图:

在这里插入图片描述
还有web端的业务逻辑。由于细节过多,不再一一展示,在开发时要把握好总体的结构即可。

本次笔记就到这里了。需要源码的可私信或留言。

转载地址:http://dcsv.baihongyu.com/

你可能感兴趣的文章
MySQL 深度分页性能急剧下降,该如何优化?
查看>>
MySQL 深度分页性能急剧下降,该如何优化?
查看>>
MySQL 添加列,修改列,删除列
查看>>
mysql 添加索引
查看>>
MySQL 添加索引,删除索引及其用法
查看>>
mysql 状态检查,备份,修复
查看>>
MySQL 用 limit 为什么会影响性能?
查看>>
MySQL 用 limit 为什么会影响性能?有什么优化方案?
查看>>
MySQL 用户权限管理:授权、撤销、密码更新和用户删除(图文解析)
查看>>
mysql 用户管理和权限设置
查看>>
MySQL 的 varchar 水真的太深了!
查看>>
mysql 的GROUP_CONCAT函数的使用(group_by 如何显示分组之前的数据)
查看>>
MySQL 的instr函数
查看>>
MySQL 的mysql_secure_installation安全脚本执行过程介绍
查看>>
MySQL 的Rename Table语句
查看>>
MySQL 的全局锁、表锁和行锁
查看>>
mysql 的存储引擎介绍
查看>>
MySQL 的存储引擎有哪些?为什么常用InnoDB?
查看>>
Mysql 知识回顾总结-索引
查看>>
Mysql 笔记
查看>>