本文共 10300 字,大约阅读时间需要 34 分钟。
问题:还是一样,在学分布式计算,老师布置了个作业。
如下:开发一个基于分布式架构的校园卡模拟交易和管理系统 系统架构至少三个角色 服务器-以RPC的形式提供服务调用,有三种服务(1、支付。2、开户。3、充值)。数据信息应可以持久化存储在服务器上。 卡务管理-提供WEB页面,实现开户和充值的交互人机界面,并通过RPC调用服务器的相关业务功能。 刷卡机-命令行应用程序,提供刷卡人机界面,可以接受键盘反复输入卡号、密码、金额,并通过调用RPC服务完成消费支付业务。 要求:整个系统具备基本的容错机制,如RPC服务宕机或卡号密码错误等。
看了一些视频和资料:
总体结构如下:实际开发应当不会放到同一个项目下,应当分模块开发,由于本项目较为简单,只以一个项目的形式完成。
话不多说,直接上代码分析,
首先展示RPC的逻辑: common公共端 RegistryCenter.java 用来封装注册中心的端口号和urlpackage edu.cn.common.config;public class RegistryCenter { private static final int REGISTRY_PORT=2021; private static final String SERVER_ADDRESS="127.0.0.1"; private static final String SERVER_URL="TCP://"+SERVER_ADDRESS+":"+REGISTRY_PORT; public static int getRegistryPort() { return REGISTRY_PORT; } public static String getServerAddress() { return SERVER_ADDRESS; } public static String getServerUrl() { return SERVER_URL; }}
RemoteCall.java 用来封装调用服务过程中的参数(类名、方法名、参数类型、参数)
import java.io.Serializable;import java.util.Arrays;public class RemoteCall implements Serializable { private static final long serialVersionUID = 1L; String serviceClassName;//服务功能的类名 String serviceMethodName;//服务功能的方法名称 Class [] serviceMethodArgsClass;//方法的参数列表 Object[] serviceMethodArgsValues;//方法的实际参数列表 public RemoteCall() { } public RemoteCall(String className, String methodName, Class [] methodArgsClass, Object[] methodArgsValues) { this.serviceClassName=className; this.serviceMethodName=methodName; this.serviceMethodArgsClass=methodArgsClass; this.serviceMethodArgsValues=methodArgsValues; } public String getServiceClassName() { return serviceClassName; } public void setServiceClassName(String serviceClassName) { this.serviceClassName = serviceClassName; } public String getServiceMethodName() { return serviceMethodName; } public void setServiceMethodName(String serviceMethodName) { this.serviceMethodName = serviceMethodName; } public Class [] getServiceMethodArgsClass() { return serviceMethodArgsClass; } public void setServiceMethodArgsClass(Class [] serviceMethodArgsClass) { this.serviceMethodArgsClass = serviceMethodArgsClass; } public Object[] getServiceMethodArgsValues() { return serviceMethodArgsValues; } public void setServiceMethodArgsValues(Object[] serviceMethodArgsValues) { this.serviceMethodArgsValues = serviceMethodArgsValues; } @Override public String toString() { return "RemoteCall{" + "serviceClassName='" + serviceClassName + '\'' + ", serviceMethodName='" + serviceMethodName + '\'' + ", serviceMethodArgsClass=" + Arrays.toString(serviceMethodArgsClass) + ", serviceMethodArgsValues=" + Arrays.toString(serviceMethodArgsValues) + '}'; }}
RPC服务端:
ServerCenter.javapackage edu.cn.server.rpc;public interface ServerCenter { void start(); void close(); void register(Class service, Class serviceImpl);}
ServerCenterImpl.java
package edu.cn.server.rpc;import edu.cn.common.config.RegistryCenter;import edu.cn.common.config.RemoteCall;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.Method;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.net.Socket;import java.util.HashMap;import java.util.Map;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ServerCenterImpl implements ServerCenter { //服务端的所有可供客户端访问的接口都注册到map中 private static MapserverRegister = new HashMap (); private static int port; private static int count = 0; public ServerCenterImpl(int port) { this.port = port; } //连接池,一个对象处理一个客户请求(连接池中大小与CPU有关) private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static boolean isRunning = false; //开启服务端服务 @Override public void start() { try { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(port)); isRunning = true; System.out.println("RPC服务端启动,URL:"+ RegistryCenter.getServerUrl()); while (true) { System.out.println("RPC服务端 线程 "+(++count)+" 启动..."); //接收客户端请求,处理请求,并返回结果 //客户端每发送一次请求,则服务端从连接池中获取一个线程 Socket socket = serverSocket.accept();//等待客户端连接 executor.execute(new ServiceTask(socket)); } } catch (Exception e) { System.out.println("start exception." + e); } } //关闭连接 @Override public void close() { isRunning = false; executor.shutdown(); } //服务端通过register方法将接口注册到注册中心,key为关键字,value为接口的具体实现; @Override public void register(Class service, Class serviceImpl) { serverRegister.put(service.getName(), serviceImpl); } private static class ServiceTask implements Runnable { private Socket socket; public ServiceTask() { } public ServiceTask(Socket socket) { this.socket = socket; } @Override public void run() { ObjectInputStream input = null; ObjectOutputStream output = null; try { input = new ObjectInputStream(socket.getInputStream()); RemoteCall remoteCall = (RemoteCall) input.readObject(); //根据客户端请求,到map中找到与之对应的具体接口 Class serviceClass = serverRegister.get(remoteCall.getServiceClassName()); Method method = serviceClass.getMethod(remoteCall.getServiceMethodName(), remoteCall.getServiceMethodArgsClass()); Object result = method.invoke(serviceClass.newInstance(), remoteCall.getServiceMethodArgsValues()); //将方法执行完毕的返回值传给客户端 System.out.println("服务端 线程 "+(count)+"处理后的结果:" + result); output = new ObjectOutputStream(socket.getOutputStream()); output.writeObject(result); } catch (Exception e) { System.out.println("ServerCenterImpl" + e); } finally { if (output != null) { try { output.close(); } catch (Exception e) { } } if (input != null) { try { input.close(); } catch (Exception e) { } } } } }}
import edu.cn.common.config.RegistryCenter;import edu.cn.common.service.CampusCardService;import edu.cn.server.service.impl.CampusCardServiceImpl;public class CardServerRPC extends Thread { final int port;//端口号 public CardServerRPC() { this.port = RegistryCenter.getRegistryPort(); } public void run() { ServerCenter serverCenter = new ServerCenterImpl(this.port); //将接口和实现类注册到服务中心 serverCenter.register(CampusCardService.class, CampusCardServiceImpl.class); serverCenter.start(); }public static void main(String[] args) { //开启RPC服务端 CardServerRPC demo=new CardServerRPC(); demo.start(); }}
客户端
MyAspect.java RPC代理类,用来调用远程服务的package edu.cn.client.rpc;import edu.cn.common.config.RegistryCenter;import edu.cn.common.config.RemoteCall;import edu.cn.common.service.CampusCardService;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.net.Socket;public class MyAspect { /** * * @return 返回服务对象 */ public static Object getService(){ //本地调用远程服务 MyAspect myAspect = new MyAspect(); CampusCardService campusCardService = (CampusCardService) myAspect.rpc(CampusCardService.class); return campusCardService; } /** * 动态创建代理对象 * Object newProxyInstance(ClassLoader loader, Class [] interfaces, InvocationHandler h) * 参数1:真实对象的类加载器 * 参数2:真实对象实现的所有的接口,接口是特殊的类,使用Class[]装载多个接口 * 参数3: 动态代理内部类对象 * @param clazz * @return */ public Object rpc(final Class clazz) { return Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{ clazz}, new MyHandler(clazz)); } class MyHandler implements InvocationHandler{ final Class clazz; public MyHandler(Class clazz){ this.clazz=clazz; } /** * @param proxy 代理对象 * @param method 代理的方法对象 * @param args 方法调用时参数 * @return * @throws Throwable */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket=null; try{ socket=new Socket(RegistryCenter.getServerAddress(), RegistryCenter.getRegistryPort()); }catch(Exception e){ System.out.println("服务器正在休息,请稍后再试一试吧!"); e.printStackTrace(); return null; } RemoteCall remoteCall=new RemoteCall(clazz.getName(),method.getName(),method.getParameterTypes(),args); ObjectOutputStream objectOutputStream=new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(remoteCall); ObjectInputStream objectInputStream=new ObjectInputStream(socket.getInputStream()); Object result=objectInputStream.readObject();// System.out.println("接收服务端处理后的返回值"+result); objectInputStream.close(); objectOutputStream.close(); return result; } }}
部分业务逻辑的具体实现
package edu.cn.client.rpc;import edu.cn.common.entity.LoginCard;import edu.cn.common.entity.UpdateCard;import edu.cn.common.service.CampusCardService;import java.util.Scanner;public class CardClientRPCInit extends Thread{ private CampusCardService campusCardService = (CampusCardService) MyAspect.getService(); public void run() { Scanner sc1 = new Scanner(System.in); Scanner sc2 = new Scanner(System.in); Scanner sc3 = new Scanner(System.in); LoginCard user = new LoginCard(); UpdateCard depend = new UpdateCard(); String id = null; String password = null; double spend = 0.0; while (true) { System.out.println("--------我是刷卡机!--------"); System.out.print("您此次消费(元):"); spend = sc1.nextDouble(); System.out.print("您的卡号是:"); id = sc2.nextLine(); System.out.print("您的密码是:"); password = sc3.nextLine(); user.setLoginCard(id, password); while (!campusCardService.login(user)) { System.out.println("-----卡号或密码错误!请重新输入!-----"); System.out.print("您的卡号是:"); id = sc2.nextLine(); System.out.print("您的密码是:"); password = sc3.nextLine(); user.setLoginCard(id, password); } System.out.println("密码正确!正在扣费----"); try { sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } depend.setUpdateCard(id,password, spend); System.out.println(campusCardService.pay(depend)); } }}
客户端刷卡机的main方法:
package edu.cn.client.main;import edu.cn.client.rpc.CardClientRPCInit;public class PosReader { public static void main(String[] args) { CardClientRPCInit card=new CardClientRPCInit(); card.start(); }}
启动顺序:先启动服务端后 再启动客户端,即可实现该系统
启动成功截图服务端未启动时的结果截图:
本次笔记就到这里了。需要源码的可私信或留言。
转载地址:http://dcsv.baihongyu.com/