Spring+redis实现session集群

2018-04-14 20:51:28
999次阅读
0个评论
本文主要是在Spring中实现session集群,采用redis对session进行持久化管理,这样当应用部署的时候,不需要在Resin、Tomcat等容器里面进行分布式配置,方便加入新的节点服务器进行集群扩容,session不依赖各节点的服务器,可直接从redis获取。下面是功能的核心代码:

一、首先在web.xml里面配置

加入过滤器(Filter):


<filter>
    <filter-name>distributedSessionFilter</filter-name>
    <filter-class>DistributedSessionFilter</filter-class>
    <init-param>
        <!-- Required. Secret key. Two forms are accepted: a Spring bean reference written as
             "bean:" followed by the bean name, or a literal string such as afffrfgv. -->
        <param-name>key</param-name>
        <param-value>xxxxxxxx</param-value>
    </init-param>
    <init-param>
        <!-- Required. Spring bean used for redis access, written as "bean:" followed by the bean name.
             The bean must implement DistributedBaseInterFace, which performs the session persistence. -->
        <param-name>cacheBean</param-name>
        <param-value>bean:redisPersistent</param-value>
    </init-param>
    <init-param>
        <!-- Required. Name of the cookie from which the filter reads the session id. -->
        <param-name>cookieName</param-name>
        <param-value>TESTSESSIONID</param-value>
    </init-param>
</filter>
<filter-mapping>
    <filter-name>distributedSessionFilter</filter-name>
    <url-pattern>*.do</url-pattern>
</filter-mapping>

二、过滤器的实现,核心代码如下

主要有以下的几个类: 
DistributedSessionFilter, 
DistributedSessionManager, 
DistributedHttpSessionWrapper, 
DistributedHttpServletRequestWrapper

1、DistributedSessionFilter实现Filter:


import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

/**
 * Servlet filter state for the redis-backed distributed session support.
 * Only the fields are declared here; they hold the values taken from the
 * filter's init parameters.
 */
public class DistributedSessionFilter implements Filter {
    private static final Logger log = LoggerFactory.getLogger(DistributedSessionFilter.class);

    /** Key handed to the session manager when creating or verifying a session id. */
    private String key;

    /** Name of the cookie that carries the user-facing session id. */
    private String cookieName;

    /** Performs the session management operations (load, save, remove, expire). */
    private DistributedSessionManager distributedSessionManager;
}

容器启动时候的初始化方法:


/**
 * Initializes the filter at container start-up from its init parameters.
 *
 * <p>Reads {@code key}, {@code cookieName} and {@code cacheBean}, resolves the cache bean
 * (and the key, when it is given as a bean reference) from the Spring web application
 * context, and obtains the shared {@link DistributedSessionManager}.
 *
 * @param config filter configuration supplying the init parameters and servlet context
 * @throws ServletException if a required init parameter is missing or blank, or if
 *         {@code cacheBean} is not written as {@code bean:<beanName>}
 */
@Override
    public void init(FilterConfig config) throws ServletException {
        final String beanPrefix = "bean:";

        // Fail fast with a descriptive message instead of a NullPointerException later on.
        String keyParam = requireInitParameter(config, "key");
        String cookieNameParam = requireInitParameter(config, "cookieName");
        String cacheBeanParam = requireInitParameter(config, "cacheBean");
        if (!cacheBeanParam.startsWith(beanPrefix) || cacheBeanParam.length() == beanPrefix.length()) {
            throw new ServletException("Init parameter 'cacheBean' must have the form bean:<beanName> but was '"
                    + cacheBeanParam + "'");
        }

        WebApplicationContext wac = WebApplicationContextUtils.getRequiredWebApplicationContext(config
                .getServletContext());
        String cacheBeanName = cacheBeanParam.substring(beanPrefix.length());
        DistributedBaseInterFace distributedCache = (DistributedBaseInterFace) wac.getBean(cacheBeanName);

        // The key is either a bean reference ("bean:" + bean name) or the literal key itself.
        if (keyParam.startsWith(beanPrefix)) {
            this.key = (String) wac.getBean(keyParam.substring(beanPrefix.length()));
        } else {
            this.key = keyParam;
        }
        this.cookieName = cookieNameParam;
        this.distributedSessionManager = DistributedSessionManager.getInstance(distributedCache);
    }

    /**
     * Returns the named init parameter unchanged, rejecting a missing or blank value.
     */
    private static String requireInitParameter(FilterConfig config, String name) throws ServletException {
        String value = config.getInitParameter(name);
        if (value == null || value.trim().isEmpty()) {
            throw new ServletException("Missing required init parameter '" + name + "' for filter "
                    + config.getFilterName());
        }
        return value;
    }

进行实际的请求拦截:


/**
 * Wraps the incoming request so that it exposes the distributed session, passes it down
 * the chain, and afterwards persists, removes or refreshes the session.
 *
 * <p>Failures while building the wrapper or while running the rest of the chain are
 * propagated to the container rather than swallowed. A failure while handling the session
 * after the request is only logged, so that it cannot replace the outcome of the request.
 *
 * @throws ServletException if request wrapping or downstream processing fails
 * @throws IOException if request wrapping or downstream processing fails with an I/O error
 */
@Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
            throws ServletException, IOException {
        DistributedHttpServletRequestWrapper distReq = createDistributedRequest(servletRequest, servletResponse);
        try {
            filterChain.doFilter(distReq, servletResponse);
        } finally {
            try {
                // After the request has been processed, handle the session (mainly saving it).
                dealSessionAfterRequest(distReq.getSession());
            } catch (RuntimeException e) {
                log.error("Failed to handle the distributed session after the request", e);
            }
        }
    }

    /**
     * Builds the request wrapper that exposes the distributed session.
     *
     * <p>The user-facing session id is read from the configured cookie and checked by the
     * session manager. When no valid id results, a new one is created and written back as a
     * cookie. The session attributes are then loaded under the key {@code "sid:" + actualSid}.
     * If loading fails, the request continues with an empty attribute map and the cached
     * entry is removed on a best-effort basis.
     *
     * @param servletRequest incoming request; cast to {@link HttpServletRequest}
     * @param servletResponse outgoing response; cast to {@link HttpServletResponse}
     * @return the request wrapper carrying the distributed session
     */
    private DistributedHttpServletRequestWrapper createDistributedRequest(ServletRequest servletRequest,
            ServletResponse servletResponse) throws IOException, ServletException {
        HttpServletRequest request = (HttpServletRequest) servletRequest;
        HttpServletResponse response = (HttpServletResponse) servletResponse;
        String userSid = CookieUtil.getCookie(cookieName, request);
        String actualSid = distributedSessionManager.getActualSid(userSid, request, key);
        if (StringUtil.isBlank(actualSid)) {
            if (StringUtil.isNotBlank(userSid)) {
                log.info("userSid[{}]验证不通过", userSid);
            }
            // Create a fresh id pair: element 0 goes into the cookie, element 1 is the actual sid.
            String[] userSidArr = distributedSessionManager.createUserSid(request, key);
            userSid = userSidArr[0];
            CookieUtil.setCookie(cookieName, userSid, request, response);
            actualSid = userSidArr[1];
        }
        actualSid = "sid:" + actualSid;
        DistributedHttpSessionWrapper distSession;
        try {
            Map<String, Object> allAttribute = distributedSessionManager.getSession(actualSid, request.getSession()
                    .getMaxInactiveInterval());
            distSession = new DistributedHttpSessionWrapper(actualSid, request.getSession(), allAttribute);
        } catch (Throwable e) {
            // Loading failed: fall back to an empty session and drop the cached data.
            log.error(e.getMessage(), e);
            distSession = new DistributedHttpSessionWrapper(actualSid, request.getSession(),
                    new HashMap<String, Object>());
            try {
                distributedSessionManager.removeSession(distSession);
            } catch (RuntimeException removeFailure) {
                // The cache may be the very thing that is failing; keep the empty-session fallback.
                log.error("Failed to remove unreadable session data from the cache", removeFailure);
            }
        }
        return new DistributedHttpServletRequestWrapper(request, distSession);
    }

    /**
     * Applies the outcome of a request to the stored session.
     *
     * <p>An invalidated session is removed, even if attributes were also modified during the
     * request. Otherwise a changed session is saved, and an untouched session only has its
     * expiry refreshed.
     *
     * @param session the distributed session used by the request; {@code null} is ignored
     */
    private void dealSessionAfterRequest(DistributedHttpSessionWrapper session) {
        if (session == null) {
            return;
        }
        // Invalidation must win over modification, otherwise an invalidated session is re-saved.
        if (session.invalidated) {
            distributedSessionManager.removeSession(session);
        } else if (session.changed) {
            distributedSessionManager.saveSession(session);
        } else {
            distributedSessionManager.expire(session);
        }
    }

2、DistributedSessionManager,主要处理分布式session,核心代码:


/**
 * Singleton that loads, saves, removes and expires distributed sessions through a
 * {@link DistributedBaseInterFace} cache. Session attributes are stored as one JSON string
 * per session id.
 */
class DistributedSessionManager {
    protected static final Logger log = LoggerFactory.getLogger(DistributedSessionManager.class);

    // volatile so that a thread seeing a non-null reference also sees the fully built instance
    private static volatile DistributedSessionManager instance = null;

    // Cache interface used for the redis session operations; implement it as needed.
    private final DistributedBaseInterFace distributedBaseInterFace;

    private static final Object lock = new Object();

    private DistributedSessionManager(DistributedBaseInterFace distributedBaseInterFace) {
        this.distributedBaseInterFace = distributedBaseInterFace;
    }

    /**
     * Returns the shared manager, creating it on the first call.
     *
     * @param redis cache used by the manager; only the argument of the call that creates the
     *        instance is used, later arguments are ignored
     * @return the single manager instance
     */
    public static DistributedSessionManager getInstance(DistributedBaseInterFace redis) {
        DistributedSessionManager current = instance;
        if (current == null) {
            synchronized (lock) {
                current = instance;
                if (current == null) {
                    current = new DistributedSessionManager(redis);
                    instance = current;
                }
            }
        }
        return current;
    }

    /**
     * Loads the attributes stored for a session.
     *
     * @param sid session key in the cache
     * @param second value passed through to the cache's {@code get}; callers in this file pass
     *        the session's max inactive interval (presumably a TTL refresh; confirm against the
     *        cache implementation)
     * @return the deserialized attribute map, or a new empty map when nothing is stored
     */
    public Map<String, Object> getSession(String sid,int second) {
        String json = this.distributedBaseInterFace.get(sid,second);
        if (StringUtil.isNotBlank(json)) {
            return JsonUtil.unserializeMap(json);
        }
        return new HashMap<String, Object>(1);
    }

    /**
     * Stores the session's attributes as JSON under the session id, using the session's max
     * inactive interval as the {@code seconds} argument. When no attributes are left, the
     * cached entry is deleted so that previously stored attributes do not reappear on the
     * next request.
     *
     * @param session session whose attributes are persisted
     */
    public void saveSession(DistributedHttpSessionWrapper session) {
        Map<String, Object> map=session.allAttribute;
        if(MapUtil.isEmpty(map)){
            this.distributedBaseInterFace.del(session.getId());
            return;
        }
        String json = JsonUtil.serializeMap(map);
        this.distributedBaseInterFace.set(session.getId(), json, session.getMaxInactiveInterval());
    }

    /**
     * Deletes the cached entry of the given session.
     */
    public void removeSession(DistributedHttpSessionWrapper session) {
        distributedBaseInterFace.del(session.getId());
    }

    /**
     * Calls the cache's {@code expire} for the session id with the session's max inactive
     * interval.
     */
    public void expire(DistributedHttpSessionWrapper session) {
        distributedBaseInterFace.expire(session.getId(), session.getMaxInactiveInterval());
    }

    /**
     * Creates the sid for the cookie. The implementation is omitted here; the caller in the
     * filter uses element 0 as the cookie value and element 1 as the actual sid.
     */
    public String[] createUserSid(HttpServletRequest request, String key) {
        //...
    }

    /**
     * Derives the actual sid from the sid found in the cookie. The implementation is omitted
     * here; the caller in the filter treats a blank result as a failed verification.
     */
    public String getActualSid(String userSid, HttpServletRequest request, String key) {
        //...
    }
}

3、DistributedHttpSessionWrapper 实现了 HttpSession,进行分布式session包装,核心代码:


public class DistributedHttpSessionWrapper implements HttpSession {

    private HttpSession orgiSession;

    private String sid;

    boolean changed = false;

    boolean invalidated = false;

    Map<String, Object> allAttribute;

    public DistributedHttpSessionWrapper(String sid, HttpSession session, Map<String, Object> allAttribute) {
        this.orgiSession = session;
        this.sid = sid;
        this.allAttribute = allAttribute;
    }

    @Override
    public String getId() {
        return this.sid;
    }

    @Override
    public void setAttribute(String name, Object value) {
        changed = true;
        allAttribute.put(name, value);
    }

    @Override
    public Object getAttribute(String name) {
        return allAttribute.get(name);
    }

    @Override
    public Enumeration<String> getAttributeNames() {
        Set<String> set = allAttribute.keySet();
        Iterator<String> iterator = set.iterator();
        return new MyEnumeration<String>(iterator);
    }

    private class MyEnumeration<T> implements Enumeration<T> {
        Iterator<T> iterator;

        public MyEnumeration(Iterator<T> iterator) {
            super();
            this.iterator = iterator;
        }

        @Override
        public boolean hasMoreElements() {
            return iterator.hasNext();
        }

        @Override
        public T nextElement() {
            return iterator.next();
        }

    }

    @Override
    public void invalidate() {
        this.invalidated = true;
    }

    @Override
    public void removeAttribute(String name) {
        changed = true;
        allAttribute.remove(name);
    }

    @Override
    public long getCreationTime() {
        return orgiSession.getCreationTime();
    }

    @Override
    public long getLastAccessedTime() {
        return orgiSession.getLastAccessedTime();
    }

    @Override
    public int getMaxInactiveInterval() {
        return orgiSession.getMaxInactiveInterval();
    }

    @Override
    public ServletContext getServletContext() {
        return orgiSession.getServletContext();
    }

    @Override
    public Object getValue(String arg0) {
        return orgiSession.getValue(arg0);
    }

    @Override
    public String[] getValueNames() {
        return orgiSession.getValueNames();
    }

    @Override
    public boolean isNew() {
        return orgiSession.isNew();
    }

    @Override
    public void putValue(String arg0, Object arg1) {
        orgiSession.putValue(arg0, arg1);
    }

    @Override
    public void removeValue(String arg0) {
        orgiSession.removeValue(arg0);
    }

    @Override
    public void setMaxInactiveInterval(int arg0) {
        orgiSession.setMaxInactiveInterval(arg0);
    }

    @Override
    public HttpSessionContext getSessionContext() {
        return orgiSession.getSessionContext();
    }


4、DistributedHttpServletRequestWrapper 继承了 HttpServletRequestWrapper,包装处理过的session和原始request,核心代码:



public class DistributedHttpServletRequestWrapper extends javax.servlet.http.HttpServletRequestWrapper {
    private HttpServletRequest orgiRequest;
    private DistributedHttpSessionWrapper session;

    public DistributedHttpServletRequestWrapper(HttpServletRequest request, DistributedHttpSessionWrapper session) {
        super(request);
        if (session == null){
            //异常处理。。
        }
        if (request == null){
            //异常处理。。
        }
        this.orgiRequest = request;
        this.session = session;
    }

    public DistributedHttpSessionWrapper getSession(boolean create) {
        orgiRequest.getSession(create);
        return session;
    }

    public DistributedHttpSessionWrapper getSession() {
        return session;
    }

}

5、另外,定义DistributedBaseInterFace接口,用来处理session入redis进行持久化操作:


/**
 * Cache operations used to persist sessions in redis. Values are JSON strings stored per key.
 * The callers in the session manager pass the session's max inactive interval as
 * {@code seconds}.
 */
public interface DistributedBaseInterFace {

    /**
     * Fetches the cached data for a key.
     *
     * @param key cache key
     * @param seconds time value in seconds supplied by the caller (how it is applied is up
     *        to the implementation)
     * @return the cached string for the key
     */
    String get(String key, int seconds);

    /**
     * Updates the cached data for a key.
     *
     * @param key cache key
     * @param json data to store
     * @param seconds time value in seconds supplied by the caller
     */
    void set(String key, String json, int seconds);

    /**
     * Deletes the cached entry for a key.
     *
     * @param key cache key
     */
    void del(String key);

    /**
     * Sets the expiry of the data cached for a key.
     *
     * @param key cache key
     * @param seconds expiry value in seconds
     */
    void expire(String key, int seconds);
}
收藏00

登录 后评论。没有帐号? 注册 一个。