标签 HDFS 下的文章

HDFS在百度的使用

HDFS在百度的使用:
建议在火狐下查看
[crocodoc width=”700″ height=”500″]

利用hdfs搭建网盘–webserver开发

利用hdfs搭建网盘–webserver开发,描述下实现思路:
1、网盘系统中的webserver是用来给用户提供操作界面,接收用户指令,完成文件上传、下载、图片上传、下载和图片预览功能的。
2、其中关于存储相关的功能都是调用hdfs API来完成,而关于文件的相关结构化信息都存储在mysql关系型数据库中;
3、webserver起到的是连接客户和hdfs的作用
4、采用的是SSH框架(Struts2、spring、hibernate)、数据库为mysql,数据模型请参考:利用hdfs搭建网盘–数据模型设计
5、web调用hdfs API的思路是:利用java运行时 运行java jar包,可参考《利用HDFS java API增删改查操作》,例如:

process = Runtime.getRuntime().exec("java -jar /root/hdfs-0.0.1-SNAPSHOT.jar read "+fileInfo.getFilePath()+" /root/file-tmp/"+fileInfo.getFileName());

文件列表页面:

@Action(value = "right", results = { @Result(name = "success", location = "/WEB-INF/npage/right.jsp") })
	public String right() {
		HttpSession session = this.getRequest().getSession();
		User user = (User) session.getAttribute("user");
		if (user != null) {
			page = fileInfoServie.queryUserFileList(0, 20, user.getUserId());
		}
		return "success";
	}

文件下载:

@Action(value = "downloadFile", results = { @Result(name = "success", location = "/right", type = "redirectAction") })
	public String downloadFile(){
		// 获取用户信息
		HttpSession session = this.getRequest().getSession();
		User user = (User) session.getAttribute("user");
		//查出要删除的文件信息
		String fileId = getRequest().getParameter("fileId");
		List<FileInfo> lists = fileInfoServie.findBy("fileId", Long.parseLong(fileId), PropertyFilter.MatchType.EQ);
		FileInfo fileInfo = new FileInfo();
		if(lists != null && lists.size() > 0){
			fileInfo = lists.get(0);
		}
		//路径
		String path = "/root/file-tmp/"+fileInfo.getFileName();
		// 从hdfs取得文件
		Process process;
		try {
			process = Runtime.getRuntime().exec("java -jar /root/hdfs-0.0.1-SNAPSHOT.jar read "+fileInfo.getFilePath()+" /root/file-tmp/"+fileInfo.getFileName());
			InputStreamReader ir = new InputStreamReader(process.getInputStream());
			LineNumberReader input = new LineNumberReader(ir);
			String line;
			while ((line = input.readLine()) != null)
				System.out.println(line);
		} catch (IOException e) {
			e.printStackTrace();
		}
		 try {
            // path是指欲下载的文件的路径。
            File file = new File(path);
            // 取得文件名。
            String filename = file.getName();
            // 取得文件的后缀名。
            String ext = filename.substring(filename.lastIndexOf(".") + 1).toUpperCase();
            // 以流的形式下载文件。
            InputStream fis = new BufferedInputStream(new FileInputStream(path));
            byte[] buffer = new byte[fis.available()];
            fis.read(buffer);
            fis.close();
            // 清空response
            this.getResponse().reset();
            // 设置response的Header
            this.getResponse().addHeader("Content-Disposition", "attachment;filename=" + new String(filename.getBytes()));
            this.getResponse().addHeader("Content-Length", "" + file.length());
            OutputStream toClient = new BufferedOutputStream(this.getResponse().getOutputStream());
            this.getResponse().setContentType("application/octet-stream");
            toClient.write(buffer);
            toClient.flush();
            toClient.close();
	        } catch (IOException ex) {
	            ex.printStackTrace();
	        }
		return null;
	}

删除文件:

@Action(value = "deleteFile", results = { @Result(name = "success", location = "/right", type = "redirectAction") })
	public String deleteFile(){
		// 获取用户信息
		HttpSession session = this.getRequest().getSession();
		User user = (User) session.getAttribute("user");
		//查出要删除的文件信息
		String fileId = getRequest().getParameter("fileId");
		List<FileInfo> lists = fileInfoServie.findBy("fileId", Long.parseLong(fileId), PropertyFilter.MatchType.EQ);
		FileInfo fileInfo = new FileInfo();
		if(lists != null && lists.size() > 0){
			fileInfo = lists.get(0);
		}
		// 将文件从hadoop集群删除
		Process process;
		try {
			process = Runtime.getRuntime().exec("java -jar /root/hdfs-0.0.1-SNAPSHOT.jar delete "+fileInfo.getFilePath());
			InputStreamReader ir = new InputStreamReader(process.getInputStream());
			LineNumberReader input = new LineNumberReader(ir);
			String line;
			while ((line = input.readLine()) != null)
				System.out.println(line);
		} catch (IOException e) {
			e.printStackTrace();
		}
		//从数据库删除
		fileInfoServie.deleteById(fileInfo.getId());
		return "success";
	}

照片列表:

@Action(value = "goPicPage", results = { @Result(name = "success", location = "/WEB-INF/npage/pic-right.jsp") })
	public String goPicPage() {
		HttpSession session = this.getRequest().getSession();
		User user = (User) session.getAttribute("user");
		if (user != null) {
			page = fileInfoServie.queryPicFileList(0, 20, user.getUserId());
		}
		return "success";
	}

上传文件或者图片:

@Action(value = "uploadFile", results = { @Result(name = "success", location = "/right", type = "redirectAction") })
	public String uploadFile() throws IOException {
		// 获取用户信息
		HttpSession session = this.getRequest().getSession();
		User user = (User) session.getAttribute("user");
		boolean isPic = false;
		if (file_name.indexOf(".jpg") > 0 || file_name.indexOf(".gif") > 0) {
			isPic = true;
		}
		// 接收文件
		String path = "";
		if (null != file_name && !file_name.equals("")) {
			path = fileInfoServie.saveFile(methodFile, file_name);
		}
		String fileName = "";
		String[] strArr = path.split("/");
		if (strArr != null && strArr.length > 0) {
			fileName = strArr[strArr.length - 1];
		}
		// 将文件上传到hadoop集群
		Process process;
		try {
			process = Runtime.getRuntime().exec(
					"java -jar /root/hdfs-0.0.1-SNAPSHOT.jar upload " + path + " hdfs://hadoopm:9000/user/root/upload/"
							+ user.getUserName() + "/" + fileName);
			InputStreamReader ir = new InputStreamReader(process.getInputStream());
			LineNumberReader input = new LineNumberReader(ir);
			String line;
			while ((line = input.readLine()) != null)
				System.out.println(line);
		} catch (IOException e) {
			e.printStackTrace();
		}
		FileInfo fileInfo = new FileInfo();
		fileInfo.setCreateTime(new Date());
		fileInfo.setFileName(this.file_name);
		fileInfo.setFilePath("hdfs://hadoopm:9000/user/root/upload/" + user.getUserName() + "/" + fileName);
		File fileTemp = new File(path);
		fileInfo.setFileSize((fileTemp.length() / 1024) > 1 ? (fileTemp.length() / 1024) : 1);
		//判断是否为图片
		if (isPic) {
			fileInfo.setFileType(1);// 0 :普通文件 1:图片
			String tempPath = "/root/" + user.getUserName() + "/"+ System.currentTimeMillis() + "/" + this.file_name;
			ImageScale.resizeFix(new File(path),new File(tempPath), 250, 250);
			fileInfo.setImg(tempPath);
		} else {
			fileInfo.setFileType(0);
		}
		fileInfo.setUserId(user.getUserId());
		fileInfo.setFpos(null);
		fileInfo.setFileId(System.currentTimeMillis());
		fileInfoServie.save(fileInfo);
		return "success";
	}

获取缩略图:

@Action(value = "getsImg")
	public String getsImg() throws IOException {
		String fileId = getRequest().getParameter("fileId");
		List<FileInfo> lists = fileInfoServie.findBy("fileId", Long.parseLong(fileId), PropertyFilter.MatchType.EQ);
		FileInfo fileInfo = new FileInfo();
		if(lists != null && lists.size() > 0){
			fileInfo = lists.get(0);
		}
		FileInputStream is = new FileInputStream(fileInfo.getImg());
		int i = is.available(); // 得到文件大小
		byte data[] = new byte[i];
		is.read(data); // 读数据
		is.close();
		this.getResponse().setContentType("image/*"); // 设置返回的文件类型
		OutputStream toClient = this.getResponse().getOutputStream(); // 得到向客户端输出二进制数据的对象
		toClient.write(data); // 输出数据
		toClient.close();
		return null;
	}

获取大图:

@Action(value = "getbImg")
	public String getbImg() throws IOException{
		// 获取用户信息
		HttpSession session = this.getRequest().getSession();
		User user = (User) session.getAttribute("user");
		//查出要删除的文件信息
		String fileId = getRequest().getParameter("fileId");
		List<FileInfo> lists = fileInfoServie.findBy("fileId", Long.parseLong(fileId), PropertyFilter.MatchType.EQ);
		FileInfo fileInfo = new FileInfo();
		if(lists != null && lists.size() > 0){
			fileInfo = lists.get(0);
		}
		//路径
		String path = "/root/file-tmp/bpic/"+fileInfo.getFileName();
		// 从hdfs取得文件
		Process process;
		try {
			process = Runtime.getRuntime().exec("java -jar /root/hdfs-0.0.1-SNAPSHOT.jar read "+fileInfo.getFilePath()+" /root/file-tmp/bpic/"+fileInfo.getFileName());
			InputStreamReader ir = new InputStreamReader(process.getInputStream());
			LineNumberReader input = new LineNumberReader(ir);
			String line;
			while ((line = input.readLine()) != null)
				System.out.println(line);
		} catch (IOException e) {
			e.printStackTrace();
		}
		File f = new File(path);
        if (!f.exists()) {
            this.getResponse().sendError(404, "File not found!");
            return null;
        }
        BufferedInputStream br = new BufferedInputStream(new FileInputStream(f));
        byte[] buf = new byte[1024];
        int len = 0;
        this.getResponse().reset(); // 非常重要
        // 在线打开方式
        URL u = new URL("file:///" + path);
        this.getResponse().setContentType(u.openConnection().getContentType());
        this.getResponse().setHeader("Content-Disposition", "inline; filename=" + f.getName());
        // 文件名应该编码成UTF-8
        OutputStream out = this.getResponse().getOutputStream();
        while ((len = br.read(buf)) > 0)
            out.write(buf, 0, len);
        br.close();
        out.close();
		return null;
	}

完整Action类如下:

package org.nbc.storage.file.action;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.OutputStream;
import java.net.URL;
import java.util.Date;
import java.util.List;
import javax.annotation.Resource;
import javax.servlet.http.HttpSession;
import org.apache.struts2.convention.annotation.Action;
import org.apache.struts2.convention.annotation.Result;
import org.nbc.storage.common.action.BaseAction;
import org.nbc.storage.common.util.ImageScale;
import org.nbc.storage.file.model.FileInfo;
import org.nbc.storage.file.service.FileInfoServie;
import org.nbc.storage.user.model.User;
import com.sitech.core.orm.PropertyFilter;
import com.sitech.core.utils.Page;
public class FileAction extends BaseAction {
	private static final long serialVersionUID = 1L;
	private Page page;
	private String file_name;
	private File methodFile;
	/** 下载需要的 */
	private InputStream inputStream;
	private String downloadFileName;
	@Resource
	public FileInfoServie fileInfoServie;
	@Action(value = "files", results = { @Result(name = "success", location = "/WEB-INF/npage/main.jsp") })
	public String userFileList() {
		return "success";
	}
	@Action(value = "top", results = { @Result(name = "success", location = "/WEB-INF/npage/top.jsp") })
	public String top() {
		return "success";
	}
	@Action(value = "left", results = { @Result(name = "success", location = "/WEB-INF/npage/left.jsp") })
	public String left() {
		return "success";
	}
	@Action(value = "right_top", results = { @Result(name = "success", location = "/WEB-INF/npage/right_top.jsp") })
	public String right_top() {
		return "success";
	}
	@Action(value = "right", results = { @Result(name = "success", location = "/WEB-INF/npage/right.jsp") })
	public String right() {
		HttpSession session = this.getRequest().getSession();
		User user = (User) session.getAttribute("user");
		if (user != null) {
			page = fileInfoServie.queryUserFileList(0, 20, user.getUserId());
		}
		return "success";
	}
	/**
	 *
	 * 下载文件.
	 *
	 * @return
	 */
	@Action(value = "downloadFile", results = { @Result(name = "success", location = "/right", type = "redirectAction") })
	public String downloadFile(){
		// 获取用户信息
		HttpSession session = this.getRequest().getSession();
		User user = (User) session.getAttribute("user");
		//查出要删除的文件信息
		String fileId = getRequest().getParameter("fileId");
		List lists = fileInfoServie.findBy("fileId", Long.parseLong(fileId), PropertyFilter.MatchType.EQ);
		FileInfo fileInfo = new FileInfo();
		if(lists != null && lists.size() > 0){
			fileInfo = lists.get(0);
		}
		//路径
		String path = "/root/file-tmp/"+fileInfo.getFileName();
		// 从hdfs取得文件
		Process process;
		try {
			process = Runtime.getRuntime().exec("java -jar /root/hdfs-0.0.1-SNAPSHOT.jar read "+fileInfo.getFilePath()+" /root/file-tmp/"+fileInfo.getFileName());
			InputStreamReader ir = new InputStreamReader(process.getInputStream());
			LineNumberReader input = new LineNumberReader(ir);
			String line;
			while ((line = input.readLine()) != null)
				System.out.println(line);
		} catch (IOException e) {
			e.printStackTrace();
		}
		 try {
            // path是指欲下载的文件的路径。
            File file = new File(path);
            // 取得文件名。
            String filename = file.getName();
            // 取得文件的后缀名。
            String ext = filename.substring(filename.lastIndexOf(".") + 1).toUpperCase();
            // 以流的形式下载文件。
            InputStream fis = new BufferedInputStream(new FileInputStream(path));
            byte[] buffer = new byte[fis.available()];
            fis.read(buffer);
            fis.close();
            // 清空response
            this.getResponse().reset();
            // 设置response的Header
            this.getResponse().addHeader("Content-Disposition", "attachment;filename=" + new String(filename.getBytes()));
            this.getResponse().addHeader("Content-Length", "" + file.length());
            OutputStream toClient = new BufferedOutputStream(this.getResponse().getOutputStream());
            this.getResponse().setContentType("application/octet-stream");
            toClient.write(buffer);
            toClient.flush();
            toClient.close();
	        } catch (IOException ex) {
	            ex.printStackTrace();
	        }
		return null;
	}
	/**
	 *
	 * 删除文件.
	 *
	 * @return
	 */
	@Action(value = "deleteFile", results = { @Result(name = "success", location = "/right", type = "redirectAction") })
	public String deleteFile(){
		// 获取用户信息
		HttpSession session = this.getRequest().getSession();
		User user = (User) session.getAttribute("user");
		//查出要删除的文件信息
		String fileId = getRequest().getParameter("fileId");
		List lists = fileInfoServie.findBy("fileId", Long.parseLong(fileId), PropertyFilter.MatchType.EQ);
		FileInfo fileInfo = new FileInfo();
		if(lists != null && lists.size() > 0){
			fileInfo = lists.get(0);
		}
		// 将文件从hadoop集群删除
		Process process;
		try {
			process = Runtime.getRuntime().exec("java -jar /root/hdfs-0.0.1-SNAPSHOT.jar delete "+fileInfo.getFilePath());
			InputStreamReader ir = new InputStreamReader(process.getInputStream());
			LineNumberReader input = new LineNumberReader(ir);
			String line;
			while ((line = input.readLine()) != null)
				System.out.println(line);
		} catch (IOException e) {
			e.printStackTrace();
		}
		//从数据库删除
		fileInfoServie.deleteById(fileInfo.getId());
		return "success";
	}
	@Action(value = "goUploadPage", results = { @Result(name = "success", location = "/WEB-INF/npage/upload-right.jsp") })
	public String goUploadPage() {
		return "success";
	}
	@Action(value = "goPicPage", results = { @Result(name = "success", location = "/WEB-INF/npage/pic-right.jsp") })
	public String goPicPage() {
		HttpSession session = this.getRequest().getSession();
		User user = (User) session.getAttribute("user");
		if (user != null) {
			page = fileInfoServie.queryPicFileList(0, 20, user.getUserId());
		}
		return "success";
	}
	/**
	 *
	 * 上传文件或图片.
	 *
	 * @return
	 * @throws IOException
	 */
	@Action(value = "uploadFile", results = { @Result(name = "success", location = "/right", type = "redirectAction") })
	public String uploadFile() throws IOException {
		// 获取用户信息
		HttpSession session = this.getRequest().getSession();
		User user = (User) session.getAttribute("user");
		boolean isPic = false;
		if (file_name.indexOf(".jpg") > 0 || file_name.indexOf(".gif") > 0) {
			isPic = true;
		}
		// 接收文件
		String path = "";
		if (null != file_name && !file_name.equals("")) {
			path = fileInfoServie.saveFile(methodFile, file_name);
		}
		String fileName = "";
		String[] strArr = path.split("/");
		if (strArr != null && strArr.length > 0) {
			fileName = strArr[strArr.length - 1];
		}
		// 将文件上传到hadoop集群
		Process process;
		try {
			process = Runtime.getRuntime().exec(
					"java -jar /root/hdfs-0.0.1-SNAPSHOT.jar upload " + path + " hdfs://hadoopm:9000/user/root/upload/"
							+ user.getUserName() + "/" + fileName);
			InputStreamReader ir = new InputStreamReader(process.getInputStream());
			LineNumberReader input = new LineNumberReader(ir);
			String line;
			while ((line = input.readLine()) != null)
				System.out.println(line);
		} catch (IOException e) {
			e.printStackTrace();
		}
		FileInfo fileInfo = new FileInfo();
		fileInfo.setCreateTime(new Date());
		fileInfo.setFileName(this.file_name);
		fileInfo.setFilePath("hdfs://hadoopm:9000/user/root/upload/" + user.getUserName() + "/" + fileName);
		File fileTemp = new File(path);
		fileInfo.setFileSize((fileTemp.length() / 1024) > 1 ? (fileTemp.length() / 1024) : 1);
		//判断是否为图片
		if (isPic) {
			fileInfo.setFileType(1);// 0 :普通文件 1:图片
			String tempPath = "/root/" + user.getUserName() + "/"+ System.currentTimeMillis() + "/" + this.file_name;
			ImageScale.resizeFix(new File(path),new File(tempPath), 250, 250);
			fileInfo.setImg(tempPath);
		} else {
			fileInfo.setFileType(0);
		}
		fileInfo.setUserId(user.getUserId());
		fileInfo.setFpos(null);
		fileInfo.setFileId(System.currentTimeMillis());
		fileInfoServie.save(fileInfo);
		return "success";
	}
	/**
	 *
	 * 获取缩略图.
	 *
	 * @return
	 * @throws IOException
	 */
	@Action(value = "getsImg")
	public String getsImg() throws IOException {
		String fileId = getRequest().getParameter("fileId");
		List lists = fileInfoServie.findBy("fileId", Long.parseLong(fileId), PropertyFilter.MatchType.EQ);
		FileInfo fileInfo = new FileInfo();
		if(lists != null && lists.size() > 0){
			fileInfo = lists.get(0);
		}
		FileInputStream is = new FileInputStream(fileInfo.getImg());
		int i = is.available(); // 得到文件大小
		byte data[] = new byte[i];
		is.read(data); // 读数据
		is.close();
		this.getResponse().setContentType("image/*"); // 设置返回的文件类型
		OutputStream toClient = this.getResponse().getOutputStream(); // 得到向客户端输出二进制数据的对象
		toClient.write(data); // 输出数据
		toClient.close();
		return null;
	}
	/**
	 *
	 * 获取大图.
	 *
	 * @return
	 * @throws IOException
	 */
	@Action(value = "getbImg")
	public String getbImg() throws IOException{
		// 获取用户信息
		HttpSession session = this.getRequest().getSession();
		User user = (User) session.getAttribute("user");
		//查出要删除的文件信息
		String fileId = getRequest().getParameter("fileId");
		List lists = fileInfoServie.findBy("fileId", Long.parseLong(fileId), PropertyFilter.MatchType.EQ);
		FileInfo fileInfo = new FileInfo();
		if(lists != null && lists.size() > 0){
			fileInfo = lists.get(0);
		}
		//路径
		String path = "/root/file-tmp/bpic/"+fileInfo.getFileName();
		// 从hdfs取得文件
		Process process;
		try {
			process = Runtime.getRuntime().exec("java -jar /root/hdfs-0.0.1-SNAPSHOT.jar read "+fileInfo.getFilePath()+" /root/file-tmp/bpic/"+fileInfo.getFileName());
			InputStreamReader ir = new InputStreamReader(process.getInputStream());
			LineNumberReader input = new LineNumberReader(ir);
			String line;
			while ((line = input.readLine()) != null)
				System.out.println(line);
		} catch (IOException e) {
			e.printStackTrace();
		}
		File f = new File(path);
        if (!f.exists()) {
            this.getResponse().sendError(404, "File not found!");
            return null;
        }
        BufferedInputStream br = new BufferedInputStream(new FileInputStream(f));
        byte[] buf = new byte[1024];
        int len = 0;
        this.getResponse().reset(); // 非常重要
        // 在线打开方式
        URL u = new URL("file:///" + path);
        this.getResponse().setContentType(u.openConnection().getContentType());
        this.getResponse().setHeader("Content-Disposition", "inline; filename=" + f.getName());
        // 文件名应该编码成UTF-8
        OutputStream out = this.getResponse().getOutputStream();
        while ((len = br.read(buf)) > 0)
            out.write(buf, 0, len);
        br.close();
        out.close();
		return null;
	}
	public FileInfoServie getFileInfoServie() {
		return fileInfoServie;
	}
	public void setFileInfoServie(FileInfoServie fileInfoServie) {
		this.fileInfoServie = fileInfoServie;
	}
	public Page getPage() {
		return page;
	}
	public void setPage(Page page) {
		this.page = page;
	}
	public String getFile_name() {
		return file_name;
	}
	public void setFile_name(String file_name) {
		this.file_name = file_name;
	}
	public File getMethodFile() {
		return methodFile;
	}
	public void setMethodFile(File methodFile) {
		this.methodFile = methodFile;
	}
	public String getDownloadFileName() {
		return downloadFileName;
	}
	public void setDownloadFileName(String downloadFileName) {
		this.downloadFileName = downloadFileName;
	}
	public void setInputStream(InputStream inputStream) {
		this.inputStream = inputStream;
	}
	@Action(value = "filelist", results = { @Result(name = "success", location = "/WEB-INF/npage/filelist.jsp") })
	public String filelist() {
//		HttpSession session = this.getRequest().getSession();
//		User user = (User) session.getAttribute("user");
//		if (user != null) {
			page = fileInfoServie.queryUserFileList(0, 20, 1L);
//		}
		return "success";
	}
	@Action(value = "goUploadPageCC", results = { @Result(name = "success", location = "/WEB-INF/npage/goUploadPageCC.jsp") })
	public String goUploadPageCC() {
		return "success";
	}
	@Action(value = "uploadFileCC", results = { @Result(name = "success", location = "/filelist", type = "redirectAction") })
	public String uploadFileCC() throws IOException {
		// 获取用户信息
		HttpSession session = this.getRequest().getSession();
		User user = (User) session.getAttribute("user");
		boolean isPic = false;
		if (file_name.indexOf(".jpg") > 0 || file_name.indexOf(".gif") > 0) {
			isPic = true;
		}
		// 接收文件
		String path = "";
		if (null != file_name && !file_name.equals("")) {
			path = fileInfoServie.saveFile(methodFile, file_name);
		}
		String fileName = "";
		String[] strArr = path.split("/");
		if (strArr != null && strArr.length > 0) {
			fileName = strArr[strArr.length - 1];
		}
		// 将文件上传到hadoop集群
		Process process;
		try {
			process = Runtime.getRuntime().exec(
					"java -jar /root/hdfs-0.0.1-SNAPSHOT.jar upload " + path + " hdfs://hadoopm:9000/user/root/upload/"
							+ "lisn" + "/" + fileName);
			InputStreamReader ir = new InputStreamReader(process.getInputStream());
			LineNumberReader input = new LineNumberReader(ir);
			String line;
			while ((line = input.readLine()) != null)
				System.out.println(line);
		} catch (IOException e) {
			e.printStackTrace();
		}
		FileInfo fileInfo = new FileInfo();
		fileInfo.setCreateTime(new Date());
		fileInfo.setFileName(this.file_name);
		fileInfo.setFilePath("hdfs://hadoopm:9000/user/root/upload/" +"lisn" + "/" + fileName);
		File fileTemp = new File(path);
		fileInfo.setFileSize((fileTemp.length() / 1024) > 1 ? (fileTemp.length() / 1024) : 1);
		//判断是否为图片
		if (isPic) {
			fileInfo.setFileType(1);// 0 :普通文件 1:图片
			String tempPath = "/root/" + "lisn" + "/"+ System.currentTimeMillis() + "/" + this.file_name;
			ImageScale.resizeFix(new File(path),new File(tempPath), 250, 250);
			fileInfo.setImg(tempPath);
		} else {
			fileInfo.setFileType(0);
		}
		fileInfo.setUserId(1L);
		fileInfo.setFpos(null);
		fileInfo.setFileId(System.currentTimeMillis());
		fileInfoServie.save(fileInfo);
		return "success";
	}
}

完整services:

package org.nbc.storage.file.service.impl;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import javax.annotation.Resource;
import org.nbc.storage.file.model.FileInfo;
import org.nbc.storage.file.service.FileInfoServie;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.sitech.core.orm.BaseDAO;
import com.sitech.core.service.impl.BaseServiceImpl;
import com.sitech.core.utils.Page;
@Service("fileInfoServie")
@Transactional
public class FileInfoServieImpl extends BaseServiceImpl<FileInfo, Long> implements FileInfoServie {
	private static int BUFFER_SIZE = 16 * 1024;
	@Resource(name = "fileInfoDAO")
	public void setBaseDAO(BaseDAO<FileInfo, Long> baseDAO) {
		super.setBaseDAO(baseDAO);
	}
	@Override
	public Page<FileInfo> queryUserFileList(int pageNo,
			int pageSize, Long userId) {
		String hql = " from FileInfo where userId = ? order by createTime desc ";
		Page<FileInfo> page = new Page<FileInfo>(pageSize);
		page.setPageNo(pageNo);
		return this.getBaseDAO().findPage(page, hql, userId);
	}
	public long copyFile(File src, File dest) throws Exception {
		BufferedInputStream in = null;
		BufferedOutputStream out = null;
		byte[] buffer = new byte[BUFFER_SIZE];
		long total = 0;
		try {
			in = new BufferedInputStream(new FileInputStream(src), BUFFER_SIZE);
			out = new BufferedOutputStream(new FileOutputStream(dest),
					BUFFER_SIZE);
			int curSize = in.read(buffer);
			while (curSize > 0) {
				total += curSize;
				out.write(buffer, 0, curSize);
				curSize = in.read(buffer);
			}
		} catch (Exception e) {
		} finally {
			try {
				if (in != null) {
					in.close();
					in = null;
				}
				if (out != null) {
					out.close();
					out = null;
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		return total;
	}
	public String saveFile(File methodFile,String file_name){
		String savePath = "/root/lisn/";
		String middlePath = "";
		String separator = "/";
		String [] strArr = file_name.split("\.");
		if(strArr != null && strArr.length > 0){
			file_name = strArr[strArr.length-1];
		}
		//创建filename
		file_name = System.currentTimeMillis() + "."+file_name;
		File f = new File(savePath);
		if (!f.exists()) {
			f.mkdirs();
		}
		File file = new File(savePath+file_name);
		try {
			file.createNewFile();
			long total = copyFile(methodFile, file);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return savePath+file_name;
	}
	@Override
	public Page<FileInfo> queryPicFileList(int pageNo, int pageSize, Long userId) {
		String hql = " from FileInfo where userId = ? and fileType = 1 order by createTime desc ";
		Page<FileInfo> page = new Page<FileInfo>(pageSize);
		page.setPageNo(pageNo);
		return this.getBaseDAO().findPage(page, hql, userId);
	}
}

展示图片预览页面:

<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<%
String path = request.getContextPath();
String basePath = request.getScheme()+"://"+request.getServerName()+":"+request.getServerPort()+path+"/";
%>
<%@ taglib prefix="s" uri="/struts-tags" %>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>无标题文档</title>
<style type="text/css">
<!--
.STYLE1 {font-size: 12px}
.STYLE3 {color: #707070; font-size: 12px; }
.STYLE5 {color: #0a6e0c; font-size: 12px; }
body {
	margin-top: 0px;
	margin-bottom: 0px;
}
.STYLE7 {font-size: 12}
-->
</style>
</head>
<body>
<table width="100%" border="0" cellspacing="0" cellpadding="0">
  <tr>
    <td height="30"><table width="100%" border="0" cellspacing="0" cellpadding="0">
      <tr>
        <td>&nbsp;</td>
        <td style="padding-right:10px;"><div align="right">
          <table border="0" align="right" cellpadding="0" cellspacing="0">
            <tr>
              <td width="60"><table width="87%" border="0" cellpadding="0" cellspacing="0">
                  <tr>
                   <!--  <td class="STYLE1"><div align="center">
                        <input type="checkbox" name="checkbox62" value="checkbox" />
                    </div></td>-->
                    <td class="STYLE1" onclick="window.location.href='<%=request.getContextPath() %>/wangpan-web/goPicPage'"><div align="center" style="cursor:pointer;">图片列表</div></td>
                  </tr>
              </table></td>
              <td width="60"><table width="90%" border="0" cellpadding="0" cellspacing="0">
                  <tr>
                  <!--   <td class="STYLE1"><div align="center"><img src="<%=request.getContextPath() %>/images/001.gif" width="14" height="14" /></div></td> -->
                    <td class="STYLE1" onclick="window.location.href='<%=request.getContextPath() %>/wangpan-web/goUploadPage'"><div align="center" style="cursor:pointer;">上传文件</div></td>
                  </tr>
              </table></td>
              <td width="60"><table width="90%" border="0" cellpadding="0" cellspacing="0">
                  <tr>
                   <!--  <td class="STYLE1"><div align="center"><img src="<%=request.getContextPath() %>/images/114.gif" width="14" height="14" /></div></td> -->
                    <td class="STYLE1" onclick="window.location.href='<%=request.getContextPath() %>/wangpan-web/right'"><div align="center" style="cursor:pointer;">文件列表</div></td>
                  </tr>
              </table></td>
              <!--
              <td width="52"><table width="88%" border="0" cellpadding="0" cellspacing="0">
                  <tr>
                    <td class="STYLE1"><div align="center"><img src="<%=request.getContextPath() %>/images/083.gif" width="14" height="14" /></div></td>
                    <td class="STYLE1"><div align="center">删除</div></td>
                  </tr>
              </table></td>
               -->
            </tr>
          </table>
        </div></td>
      </tr>
    </table></td>
  </tr>
  <tr>
    <td><table width="100%" border="0" cellpadding="0" cellspacing="1" bgcolor="#c9c9c9">
      <tr>
        <td height="22" bgcolor="#FFFFFF" width="50%"><div align="center"><strong><span class="STYLE1">图片预览</span></strong></div></td>
      </tr>
      <s:iterator value="page.result" var="file">
      <tr>
        <td height="22" bgcolor="#FFFFFF" width="50%"><div align="center"><span class="STYLE3" style="cursor:pointer;"><img onclick="window.open('http://192.168.75.142:8080/wangpan-web/getbImg?fileId=<s:property value="fileId" />');" title="点击查看大图" alt="<s:property value="fileName" />" src="http://192.168.75.142:8080/wangpan-web/getsImg?fileId=<s:property value="fileId" />" >  </span></div></td>
      </tr>
      </s:iterator>
    </table></td>
  </tr>
  <tr>
    <td height="35"><table width="100%" border="0" cellspacing="0" cellpadding="0">
      <tr>
        <td width="25%" height="29" nowrap="nowrap"><table width="342" border="0" cellspacing="0" cellpadding="0">
          <tr>
            <td width="44%" class="STYLE1">当前页:1/2页 每页
              <input name="textfield2" type="text" class="STYLE1" style="height:14px; width:25px;" value="15" size="5" />            </td>
            <td width="14%" class="STYLE1"><img src="<%=request.getContextPath() %>/images/sz.gif" width="43" height="20" /></td>
            <td width="42%" class="STYLE1"><span class="STYLE7">数据总量 15 </span></td>
          </tr>
        </table></td>
        <td width="75%" valign="top" class="STYLE1"><div align="right">
            <table width="352" height="20" border="0" cellpadding="0" cellspacing="0">
              <tr>
                <td width="62" height="22" valign="middle"><div align="right"><img src="<%=request.getContextPath() %>/images/page_first_1.gif" width="48" height="20" /></div></td>
                <td width="50" height="22" valign="middle"><div align="right"><img src="<%=request.getContextPath() %>/images/page_back_1.gif" width="55" height="20" /></div></td>
                <td width="54" height="22" valign="middle"><div align="right"><img src="<%=request.getContextPath() %>/images/page_next.gif" width="58" height="20" /></div></td>
                <td width="49" height="22" valign="middle"><div align="right"><img src="<%=request.getContextPath() %>/images/page_last.gif" width="52" height="20" /></div></td>
                <td width="59" height="22" valign="middle"><div align="right">转到第</div></td>
                <td width="25" height="22" valign="middle"><span class="STYLE7">
                  <input name="textfield" type="text" class="STYLE1" style="height:10px; width:25px;" size="5" />
                </span></td>
                <td width="23" height="22" valign="middle">页</td>
                <td width="30" height="22" valign="middle"><img src="<%=request.getContextPath() %>/images/go.gif" width="26" height="20" /></td>
              </tr>
            </table>
        </div></td>
      </tr>
    </table></td>
  </tr>
</table>
</body>
</html>

文件列表页:

<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<%
String path = request.getContextPath();
String basePath = request.getScheme()+"://"+request.getServerName()+":"+request.getServerPort()+path+"/";
%>
<%@ taglib prefix="s" uri="/struts-tags" %>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>无标题文档</title>
<style type="text/css">
<!--
.STYLE1 {font-size: 12px}
.STYLE3 {color: #707070; font-size: 12px; }
.STYLE5 {color: #0a6e0c; font-size: 12px; }
body {
	margin-top: 0px;
	margin-bottom: 0px;
}
.STYLE7 {font-size: 12}
-->
</style>
</head>
<body>
<table width="100%" border="0" cellspacing="0" cellpadding="0">
  <tr>
    <td height="30"><table width="100%" border="0" cellspacing="0" cellpadding="0">
      <tr>
        <td>&nbsp;</td>
        <td style="padding-right:10px;"><div align="right">
          <table border="0" align="right" cellpadding="0" cellspacing="0">
            <tr>
              <td width="60"><table width="87%" border="0" cellpadding="0" cellspacing="0">
                  <tr>
                   <!--  <td class="STYLE1"><div align="center">
                        <input type="checkbox" name="checkbox62" value="checkbox" />
                    </div></td>-->
                    <td class="STYLE1" onclick="window.location.href='<%=request.getContextPath() %>/wangpan-web/goPicPage'"><div align="center" style="cursor:pointer;">图片列表</div></td>
                  </tr>
              </table></td>
              <td width="60"><table width="90%" border="0" cellpadding="0" cellspacing="0">
                  <tr>
                  <!--   <td class="STYLE1"><div align="center"><img src="<%=request.getContextPath() %>/images/001.gif" width="14" height="14" /></div></td> -->
                    <td class="STYLE1" onclick="window.location.href='<%=request.getContextPath() %>/wangpan-web/goUploadPage'"><div align="center" style="cursor:pointer;">上传文件</div></td>
                  </tr>
              </table></td>
              <td width="60"><table width="90%" border="0" cellpadding="0" cellspacing="0">
                  <tr>
                   <!--  <td class="STYLE1"><div align="center"><img src="<%=request.getContextPath() %>/images/114.gif" width="14" height="14" /></div></td> -->
                    <td class="STYLE1" onclick="window.location.href='<%=request.getContextPath() %>/wangpan-web/right'"><div align="center" style="cursor:pointer;">文件列表</div></td>
                  </tr>
              </table></td>
              <!--
              <td width="52"><table width="88%" border="0" cellpadding="0" cellspacing="0">
                  <tr>
                    <td class="STYLE1"><div align="center"><img src="<%=request.getContextPath() %>/images/083.gif" width="14" height="14" /></div></td>
                    <td class="STYLE1"><div align="center">删除</div></td>
                  </tr>
              </table></td>
               -->
            </tr>
          </table>
        </div></td>
      </tr>
    </table></td>
  </tr>
  <tr>
    <td><table width="100%" border="0" cellpadding="0" cellspacing="1" bgcolor="#c9c9c9">
      <tr>
        <td height="22" bgcolor="#FFFFFF" width="50%"><div align="center"><strong><span class="STYLE1">文件名</span></strong></div></td>
        <td height="22" bgcolor="#FFFFFF"><div align="center"><strong><span class="STYLE1">大小</span></strong></div></td>
        <td height="22" bgcolor="#FFFFFF"><div align="center"><strong><span class="STYLE1">上传日期</span></strong></div></td>
        <td height="22" bgcolor="#FFFFFF"><div align="center"><strong><span class="STYLE1">操作</span></strong></div></td>
      </tr>
      <s:iterator value="page.result" var="file">
      <tr>
        <td height="22" bgcolor="#FFFFFF" width="50%"><div align="center"><span class="STYLE3"><s:property value="fileName" /></span></div></td>
        <td height="22" bgcolor="#FFFFFF"><div align="center"><span class="STYLE3"><s:property value="fileSize" />&nbsp;KB</span></div></td>
        <td height="22" bgcolor="#FFFFFF"><div align="center"><span class="STYLE3"><s:date name="createTime" format="yyyy-MM-dd HH:mm" /></span></div></td>
        <td height="22" bgcolor="#FFFFFF">
        	<div align="center">
        		<span style="cursor:pointer;" class="STYLE3" onclick="window.location.href='<%=request.getContextPath() %>/wangpan-web/deleteFile?fileId=<s:property value="fileId" />'">删除</span>
        		<span style="cursor:pointer;" class="STYLE3" onclick="window.open('<%=request.getContextPath() %>/wangpan-web/downloadFile?fileId=<s:property value="fileId" />');">下载</span>
        		</div>
        </td>
      </tr>
      </s:iterator>
    </table></td>
  </tr>
  <tr>
    <td height="35"><table width="100%" border="0" cellspacing="0" cellpadding="0">
      <tr>
        <td width="25%" height="29" nowrap="nowrap"><table width="342" border="0" cellspacing="0" cellpadding="0">
          <tr>
            <td width="44%" class="STYLE1">当前页:1/2页 每页
              <input name="textfield2" type="text" class="STYLE1" style="height:14px; width:25px;" value="15" size="5" />            </td>
            <td width="14%" class="STYLE1"><img src="<%=request.getContextPath() %>/images/sz.gif" width="43" height="20" /></td>
            <td width="42%" class="STYLE1"><span class="STYLE7">数据总量 15 </span></td>
          </tr>
        </table></td>
        <td width="75%" valign="top" class="STYLE1"><div align="right">
            <table width="352" height="20" border="0" cellpadding="0" cellspacing="0">
              <tr>
                <td width="62" height="22" valign="middle"><div align="right"><img src="<%=request.getContextPath() %>/images/page_first_1.gif" width="48" height="20" /></div></td>
                <td width="50" height="22" valign="middle"><div align="right"><img src="<%=request.getContextPath() %>/images/page_back_1.gif" width="55" height="20" /></div></td>
                <td width="54" height="22" valign="middle"><div align="right"><img src="<%=request.getContextPath() %>/images/page_next.gif" width="58" height="20" /></div></td>
                <td width="49" height="22" valign="middle"><div align="right"><img src="<%=request.getContextPath() %>/images/page_last.gif" width="52" height="20" /></div></td>
                <td width="59" height="22" valign="middle"><div align="right">转到第</div></td>
                <td width="25" height="22" valign="middle"><span class="STYLE7">
                  <input name="textfield" type="text" class="STYLE1" style="height:10px; width:25px;" size="5" />
                </span></td>
                <td width="23" height="22" valign="middle">页</td>
                <td width="30" height="22" valign="middle"><img src="<%=request.getContextPath() %>/images/go.gif" width="26" height="20" /></td>
              </tr>
            </table>
        </div></td>
      </tr>
    </table></td>
  </tr>
</table>
</body>
</html>

利用hdfs搭建网盘–数据模型设计

先阐述下利用hdfs搭建网盘的思路:
(1)、首先要搭建hadoop集群,确保该集群正常运行
(2)、通过API访问文件到存储在hdfs中的文件,能对文件进行增删改查
(3)、文件的其他结构化信息,,比如:文件名称,上传时间,所属用户、文件类型等信息,需要存储在数据库里,我们使用mysql
(4)、用户需要通过操作界面来访问网盘系统,而不是直接操作hdfs,这里采用java、struts2框架来实现web端开发
(5)、有用户系统,存储用户相关信息,另外hdfs中文件存放的路径也和用户有直接关系
网盘系统的截图:http://pan.baidu.com/share/link?shareid=3253971941&uk=772112791
第一点已经在《Hadoop集群搭建详细简明教程》里详细写明步骤了,再次就不再提了
第二点已经在《利用HDFS java API增删改查操作》里详细阐述了
这篇文章先阐述下第三、第五点
网盘数据模型的设计,以及mysql在linux下的安装
网盘数据模型的设计:

file_info

file_info


user

user


tables

tables


目前只用到这file_ifno,user这两个表
mysql在linux:
我是在《Hadoop集群搭建详细简明教程》中搭建好的hadoopm主机上,通过yum安装的mysql,并设置mysql密码

yum install mysql
yum install mysql-server
yum install mysql-devel
chgrp -R mysql /var/lib/mysql
chmod -R 770 /var/lib/mysql
service mysqld start
mysql
SET PASSWORD FOR 'root'@'localhost' = PASSWORD('secret_password');

PS:设置密码的时候可能会报错:ERROR 1372 (HY000): Password hash should be a 41-digit hexadecimal number
可通过这个命令:select password(‘root’);得知root的 41-digit是什么
设置mysql开机启动:

chkconfig --levels 345 mysqld on

相关参考资料:
1、linux下安装mysql
2、设置用户
http://www.hackbase.com/tech/2011-09-09/65234.html
yum install mysql
http://www.cnblogs.com/xiaochaohuashengmi/archive/2011/10/16/2214272.html
3、远程连接
http://www.cnblogs.com/smallstone/archive/2010/04/29/1723838.html
4、创建数据库–奇怪的问题:必须得使用特别奇怪的引号,否则不生效
http://tdcq.iteye.com/blog/363955
http://blog.sina.com.cn/s/blog_5dc960cd0100ea2h.html
5、在数据库上创建表
http://www.51cto.com/html/2005/1129/12524.htm
6、mysql中文乱码
http://www.2cto.com/database/201108/101151.html
7、修改表结构
http://database.51cto.com/art/201005/201148.htm
8、2007-07-20 10:59 在mysql中查询一个数据库中的所有表
http://hi.baidu.com/aigyoo/item/d56a8c01dcb50c10cd34eacf
9、mysql查看表结构命令
http://www.blogjava.net/etlan/archive/2007/07/12/129794.html

利用HDFS java API增删改查操作

利用HDFS java API增删改查操作
在做这个实验的时候需要特别注意下面三个问题:
1、hdfs安全模式需要关闭 命令:./hadoop dfsadmin -safemode leave
2、工程中依赖的版本必须和集群的一致,否则也会报 version不一致错误
3、hadoop集群用户权限的问题,以及各个目录的作用
目前为什么会有这三个问题的原因待查!!!
未验证目前使用hadoop的版本(release-0.20.0)是否支持webhdfs,反正我是怎么都连接不上啊!!!
从这上面看,0.20.0 可能是不支持的
https://jira.springsource.org/browse/IMPALA-15?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
Serengeti Distro:
Apache Hadoop:1.0.1
GreenPlum HD:1.1(Apache Hadoop 1.0.0)
CloudEra: CDH3(Apache Hadoop 0.20.2, WebHDFS is not supported in this version)
Hortonworks: 1.0.7 (Apache Hadoop 1.0.2)
步骤如下:
工程结构,如图:

工程结构

工程结构


上代码 O(∩_∩)O哈哈~
pom.xml配置如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.yun.hdfs</groupId>
	<artifactId>hdfs</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>
	<build>
		<plugins>
			<plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<configuration>
					<appendAssemblyId>false</appendAssemblyId>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
					<archive>
						<manifest>
							<mainClass>com.yun.hdfs.WangPan</mainClass>
						</manifest>
					</archive>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>assembly</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
	<dependencies>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-core</artifactId>
			<version>0.20.2</version>
			<type>jar</type>
			<scope>compile</scope>
		</dependency>
	</dependencies>
</project>

WangPan.java 主方法用于调用:

package com.yun.hdfs;
import java.io.IOException;
public class WangPan {
	/*** 运行结果描述. */
	private static String result = "";
	public static void main(String[] args) {
		try {
			// 判断命令输入是否正确
			if (args[0] != null && !"".equals(args[0]) && args.length > 0) {
				if ("upload".equals(args[0])) {
					result = "upload:" + WangPanUtils.uploadFile(args);
				} else if ("delete".equals(args[0])) {
					result = "delete:" + WangPanUtils.deleteFile(args);
				} else if ("query".equals(args[0])) {
					if (WangPanUtils.listFile(args) == null) {
						result = "query:fail!";
					} else {
						result = "query:success";
					}
				} else if ("read".equals(args[0])) {
					result = "read:" + WangPanUtils.readFile(args);
				} else {
					System.out.println("sorry,wo have no this service!");
				}
				System.out.println(result);
			} else {
				System.out.println("fail!");
				System.exit(1);
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

WangPanUtils.java增删改查:

package com.yun.hdfs;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class WangPanUtils {
	/**
	 * 上传文件. java -jar /root/hdfs-0.0.1-SNAPSHOT.jar upload /root/test-hdfs.txt
	 * hdfs://hadoopm:9000/user/root/upload/12390po.txt
	 *
	 * @param args
	 * @return
	 * @throws IOException
	 */
	public static String uploadFile(String[] args) throws IOException {
		String loaclSrc = args[1];
		String dst = args[2];
		if (args.length < 3) {
			return "fail";
		}
		InputStream in = new BufferedInputStream(new FileInputStream(loaclSrc));
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(dst), conf);
		OutputStream out = fs.create(new Path(dst));
		IOUtils.copyBytes(in, out, 4096, true);
		return "success";
	}
	/**
	 * 查询文件列表. java -jar /root/hdfs-0.0.1-SNAPSHOT.jar query
	 * hdfs://hadoopm:9000/user/root/
	 *
	 * @param args
	 * @return
	 * @throws IOException
	 */
	public static Path[] listFile(String[] args) throws IOException {
		if (args.length < 2) {
			return null;
		}
		String dst = args[1];
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(dst), conf);
		FileStatus[] statu = fs.listStatus(new Path(dst));
		Path[] listPaths = FileUtil.stat2Paths(statu);
		return listPaths;
	}
	/**
	 * 删除文件.
	 * java -jar /root/hdfs-0.0.1-SNAPSHOT.jar delete hdfs://hadoopm:9000/user/root/upload/12390po.txt
	 *
	 * @param args
	 * @return
	 * @throws IOException
	 */
	public static String deleteFile(String[] args) throws IOException {
		if (args.length < 2) {
			return "fail";
		}
		String fileName = args[1];
		Configuration config = new Configuration();
		FileSystem hdfs = FileSystem.get(URI.create(fileName), config);
		Path path = new Path(fileName);
		if (!hdfs.exists(path)) {
			return "fail";
		}
		boolean isDeleted = hdfs.delete(path, false);
		if (isDeleted) {
			return "success";
		} else {
			return "fail";
		}
	}
	/**
	 * 读取文件.
	 * java -jar /root/hdfs-0.0.1-SNAPSHOT.jar read hdfs://hadoopm:9000/user/root/upload/123.txt /root/test-readfile898.txt
	 *
	 * @param args
	 * @return
	 * @throws IOException
	 */
	public static String readFile(String[] args) throws IOException {
		if(args.length < 3){
			return "fail";
		}
		String dst = args[1];
		String newPath = args[2];
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(dst), conf);
		FSDataInputStream hdfsInStream = fs.open(new Path(dst));
		OutputStream out = new FileOutputStream(newPath);
		byte[] ioBuffer = new byte[1024];
		int readLen = hdfsInStream.read(ioBuffer);
		while (-1 != readLen) {
			out.write(ioBuffer, 0, readLen);
			readLen = hdfsInStream.read(ioBuffer);
		}
		out.close();
		hdfsInStream.close();
		fs.close();
		return "success";
	}
}
/**
	 * 创建文件夹.
	 * java -jar /root/hdfs-0.0.1-SNAPSHOT.jar mkdir hdfs://hadoopm:9000/user/root/upload/test909
	 *
	 * @return
	 * @throws IOException
	 */
	public static String mkdir(String[] args) throws IOException{
		if(args.length < 2){
			return "fali";
		}
		String dst = args[1];
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(dst), conf);
		Path path = new Path(dst);
		if (fs.exists(path)) {
			return "fail";
		}
		fs.mkdirs(path);
		return "success";
	}

PS:需要注意的是,我们需要把这个工程利用maven打包成一个可运行的jar包,使用如下命令:

打包命令

打包命令


执行命令在每个方法注释上写明了,执行效果如下:
增删改查效果

增删改查效果


还需要访问 http://hadoopm:50070/ -> Browse the filesystem 查看hdfs文件操作是否真的成功
web hdfs

web hdfs

HDFS:Hadoop分布式文件系统的架构和设计

Hadoop分布式文件系统:架构和设计要点
原文:http://hadoop.apache.org/core/docs/current/hdfs_design.html
一、前提和设计目标
1、硬件错误是常态,而非异常情况,HDFS可能是有成百上千的server组成,任何一个组件都有可能一直失效,因此错误检测和快速、自动的恢复是HDFS的核心架构目标。
2、跑在HDFS上的应用与一般的应用不同,它们主要是以流式读为主,做批量处理;比之关注数据访问的低延迟问题,更关键的在于数据访问的高吞吐量。
3、HDFS以支持大数据集合为目标,一个存储在上面的典型文件大小一般都在千兆至T字节,一个单一HDFS实例应该能支撑数以千万计的文件。
4、 HDFS应用对文件要求的是write-one-read-many访问模型。一个文件经过创建、写,关闭之后就不需要改变。这一假设简化了数据一致性问 题,使高吞吐量的数据访问成为可能。典型的如MapReduce框架,或者一个web crawler应用都很适合这个模型。
5、移动计算的代价比之移动数据的代价低。一个应用请求的计算,离它操作的数据越近就越高效,这在数据达到海量级别的时候更是如此。将计算移动到数据附近,比之将数据移动到应用所在显然更好,HDFS提供给应用这样的接口。
6、在异构的软硬件平台间的可移植性。
二、Namenode和Datanode
HDFS采用master/slave架构。一个HDFS集群是有一个Namenode和一定数目的Datanode组成。Namenode是一个中心服 务器,负责管理文件系统的namespace和客户端对文件的访问。Datanode在集群中一般是一个节点一个,负责管理节点上它们附带的存储。在内 部,一个文件其实分成一个或多个block,这些block存储在Datanode集合里。Namenode执行文件系统的namespace操作,例如 打开、关闭、重命名文件和目录,同时决定block到具体Datanode节点的映射。Datanode在Namenode的指挥下进行block的创 建、删除和复制。Namenode和Datanode都是设计成可以跑在普通的廉价的运行linux的机器上。HDFS采用java语言开发,因此可以部 署在很大范围的机器上。一个典型的部署场景是一台机器跑一个单独的Namenode节点,集群中的其他机器各跑一个Datanode实例。这个架构并不排 除一台机器上跑多个Datanode,不过这比较少见。

hdfs架构

单一节点的Namenode大大简化了系统的架构。Namenode负责保管和管理所有的HDFS元数据,因而用户数据就不需要通过Namenode(也就是说文件数据的读写是直接在Datanode上)。
三、文件系统的namespace
HDFS支持传统的层次型文件组织,与大多数其他文件系统类似,用户可以创建目录,并在其间创建、删除、移动和重命名文件。HDFS不支持user quotas和访问权限,也不支持链接(link),不过当前的架构并不排除实现这些特性。Namenode维护文件系统的namespace,任何对文 件系统namespace和文件属性的修改都将被Namenode记录下来。应用可以设置HDFS保存的文件的副本数目,文件副本的数目称为文件的 replication因子,这个信息也是由Namenode保存。
四、数据复制
HDFS被设计成在一个大集群中可以跨机器地可靠地存储海量的文件。它将每个文件存储成block序列,除了最后一个block,所有的block都是同 样的大小。文件的所有block为了容错都会被复制。每个文件的block大小和replication因子都是可配置的。Replication因子可 以在文件创建的时候配置,以后也可以改变。HDFS中的文件是write-one,并且严格要求在任何时候只有一个writer。Namenode全权管 理block的复制,它周期性地从集群中的每个Datanode接收心跳包和一个Blockreport。心跳包的接收表示该Datanode节点正常工 作,而Blockreport包括了该Datanode上所有的block组成的列表。

hdfs datanodes

1、副本的存放,副本的存放是HDFS可靠性和性能的关键。HDFS采用一种称为rack-aware的策略来改进数据的可靠性、有效性和网络带宽的利 用。这个策略实现的短期目标是验证在生产环境下的表现,观察它的行为,构建测试和研究的基础,以便实现更先进的策略。庞大的HDFS实例一般运行在多个机 架的计算机形成的集群上,不同机架间的两台机器的通讯需要通过交换机,显然通常情况下,同一个机架内的两个节点间的带宽会比不同机架间的两台机器的带宽 大。
通过一个称为Rack Awareness的过程,Namenode决定了每个Datanode所属的rack id。一个简单但没有优化的策略就是将副本存放在单独的机架上。这样可以防止整个机架(非副本存放)失效的情况,并且允许读数据的时候可以从多个机架读 取。这个简单策略设置可以将副本分布在集群中,有利于组件失败情况下的负载均衡。但是,这个简单策略加大了写的代价,因为一个写操作需要传输block到 多个机架。
在大多数情况下,replication因子是3,HDFS的存放策略是将一个副本存放在本地机架上的节点,一个副本放在同一机架上的另一个节点,最后一 个副本放在不同机架上的一个节点。机架的错误远远比节点的错误少,这个策略不会影响到数据的可靠性和有效性。三分之一的副本在一个节点上,三分之二在一个 机架上,其他保存在剩下的机架中,这一策略改进了写的性能。
2、副本的选择,为了降低整体的带宽消耗和读延时,HDFS会尽量让reader读最近的副本。如果在reader的同一个机架上有一个副本,那么就读该副本。如果一个HDFS集群跨越多个数据中心,那么reader也将首先尝试读本地数据中心的副本。
3、SafeMode
Namenode启动后会进入一个称为SafeMode的特殊状态,处在这个状态的Namenode是不会进行数据块的复制的。Namenode从所有的 Datanode接收心跳包和Blockreport。Blockreport包括了某个Datanode所有的数据块列表。每个block都有指定的最 小数目的副本。当Namenode检测确认某个Datanode的数据块副本的最小数目,那么该Datanode就会被认为是安全的;如果一定百分比(这 个参数可配置)的数据块检测确认是安全的,那么Namenode将退出SafeMode状态,接下来它会确定还有哪些数据块的副本没有达到指定数目,并将 这些block复制到其他Datanode。
五、文件系统元数据的持久化
Namenode存储HDFS的元数据。对于任何对文件元数据产生修改的操作,Namenode都使用一个称为Editlog的事务日志记录下来。例如, 在HDFS中创建一个文件,Namenode就会在Editlog中插入一条记录来表示;同样,修改文件的replication因子也将往 Editlog插入一条记录。Namenode在本地OS的文件系统中存储这个Editlog。整个文件系统的namespace,包括block到文件 的映射、文件的属性,都存储在称为FsImage的文件中,这个文件也是放在Namenode所在系统的文件系统上。
Namenode在内存中保存着整个文件系统namespace和文件Blockmap的映像。这个关键的元数据设计得很紧凑,因而一个带有4G内存的 Namenode足够支撑海量的文件和目录。当Namenode启动时,它从硬盘中读取Editlog和FsImage,将所有Editlog中的事务作 用(apply)在内存中的FsImage ,并将这个新版本的FsImage从内存中flush到硬盘上,然后再truncate这个旧的Editlog,因为这个旧的Editlog的事务都已经 作用在FsImage上了。这个过程称为checkpoint。在当前实现中,checkpoint只发生在Namenode启动时,在不久的将来我们将 实现支持周期性的checkpoint。
Datanode并不知道关于文件的任何东西,除了将文件中的数据保存在本地的文件系统上。它把每个HDFS数据块存储在本地文件系统上隔离的文件中。 Datanode并不在同一个目录创建所有的文件,相反,它用启发式地方法来确定每个目录的最佳文件数目,并且在适当的时候创建子目录。在同一个目录创建 所有的文件不是最优的选择,因为本地文件系统可能无法高效地在单一目录中支持大量的文件。当一个Datanode启动时,它扫描本地文件系统,对这些本地 文件产生相应的一个所有HDFS数据块的列表,然后发送报告到Namenode,这个报告就是Blockreport。
六、通讯协议
所有的HDFS通讯协议都是构建在TCP/IP协议上。客户端通过一个可配置的端口连接到Namenode,通过ClientProtocol与 Namenode交互。而Datanode是使用DatanodeProtocol与Namenode交互。从ClientProtocol和 Datanodeprotocol抽象出一个远程调用(RPC),在设计上,Namenode不会主动发起RPC,而是是响应来自客户端和 Datanode 的RPC请求。
七、健壮性
HDFS的主要目标就是实现在失败情况下的数据存储可靠性。常见的三种失败:Namenode failures, Datanode failures和网络分割(network partitions)。
1、硬盘数据错误、心跳检测和重新复制
每个Datanode节点都向Namenode周期性地发送心跳包。网络切割可能导致一部分Datanode跟Namenode失去联系。 Namenode通过心跳包的缺失检测到这一情况,并将这些Datanode标记为dead,不会将新的IO请求发给它们。寄存在dead Datanode上的任何数据将不再有效。Datanode的死亡可能引起一些block的副本数目低于指定值,Namenode不断地跟踪需要复制的 block,在任何需要的情况下启动复制。在下列情况可能需要重新复制:某个Datanode节点失效,某个副本遭到损坏,Datanode上的硬盘错 误,或者文件的replication因子增大。
2、集群均衡
HDFS支持数据的均衡计划,如果某个Datanode节点上的空闲空间低于特定的临界点,那么就会启动一个计划自动地将数据从一个Datanode搬移 到空闲的Datanode。当对某个文件的请求突然增加,那么也可能启动一个计划创建该文件新的副本,并分布到集群中以满足应用的要求。这些均衡计划目前 还没有实现。
3、数据完整性
从某个Datanode获取的数据块有可能是损坏的,这个损坏可能是由于Datanode的存储设备错误、网络错误或者软件bug造成的。HDFS客户端 软件实现了HDFS文件内容的校验和。当某个客户端创建一个新的HDFS文件,会计算这个文件每个block的校验和,并作为一个单独的隐藏文件保存这些 校验和在同一个HDFS namespace下。当客户端检索文件内容,它会确认从Datanode获取的数据跟相应的校验和文件中的校验和是否匹配,如果不匹配,客户端可以选择 从其他Datanode获取该block的副本。
4、元数据磁盘错误
FsImage和Editlog是HDFS的核心数据结构。这些文件如果损坏了,整个HDFS实例都将失效。因而,Namenode可以配置成支持维护多 个FsImage和Editlog的拷贝。任何对FsImage或者Editlog的修改,都将同步到它们的副本上。这个同步操作可能会降低 Namenode每秒能支持处理的namespace事务。这个代价是可以接受的,因为HDFS是数据密集的,而非元数据密集。当Namenode重启的 时候,它总是选取最近的一致的FsImage和Editlog使用。
Namenode在HDFS是单点存在,如果Namenode所在的机器错误,手工的干预是必须的。目前,在另一台机器上重启因故障而停止服务的Namenode这个功能还没实现。
5、快照
快照支持某个时间的数据拷贝,当HDFS数据损坏的时候,可以恢复到过去一个已知正确的时间点。HDFS目前还不支持快照功能。
八、数据组织
1、数据块
兼容HDFS的应用都是处理大数据集合的。这些应用都是写数据一次,读却是一次到多次,并且读的速度要满足流式读。HDFS支持文件的write- once-read-many语义。一个典型的block大小是64MB,因而,文件总是按照64M切分成chunk,每个chunk存储于不同的 Datanode
2、步骤
某个客户端创建文件的请求其实并没有立即发给Namenode,事实上,HDFS客户端会将文件数据缓存到本地的一个临时文件。应用的写被透明地重定向到 这个临时文件。当这个临时文件累积的数据超过一个block的大小(默认64M),客户端才会联系Namenode。Namenode将文件名插入文件系 统的层次结构中,并且分配一个数据块给它,然后返回Datanode的标识符和目标数据块给客户端。客户端将本地临时文件flush到指定的 Datanode上。当文件关闭时,在临时文件中剩余的没有flush的数据也会传输到指定的Datanode,然后客户端告诉Namenode文件已经 关闭。此时Namenode才将文件创建操作提交到持久存储。如果Namenode在文件关闭前挂了,该文件将丢失。
上述方法是对通过对HDFS上运行的目标应用认真考虑的结果。如果不采用客户端缓存,由于网络速度和网络堵塞会对吞估量造成比较大的影响。
3、流水线复制
当某个客户端向HDFS文件写数据的时候,一开始是写入本地临时文件,假设该文件的replication因子设置为3,那么客户端会从Namenode 获取一张Datanode列表来存放副本。然后客户端开始向第一个Datanode传输数据,第一个Datanode一小部分一小部分(4kb)地接收数 据,将每个部分写入本地仓库,并且同时传输该部分到第二个Datanode节点。第二个Datanode也是这样,边收边传,一小部分一小部分地收,存储 在本地仓库,同时传给第三个Datanode,第三个Datanode就仅仅是接收并存储了。这就是流水线式的复制。
九、可访问性
HDFS给应用提供了多种访问方式,可以通过DFSShell通过命令行与HDFS数据进行交互,可以通过java API调用,也可以通过C语言的封装API访问,并且提供了浏览器访问的方式。正在开发通过WebDav协议访问的方式。具体使用参考文档。
十、空间的回收
1、文件的删除和恢复
用户或者应用删除某个文件,这个文件并没有立刻从HDFS中删除。相反,HDFS将这个文件重命名,并转移到/trash目录。当文件还在/trash目 录时,该文件可以被迅速地恢复。文件在/trash中保存的时间是可配置的,当超过这个时间,Namenode就会将该文件从namespace中删除。 文件的删除,也将释放关联该文件的数据块。注意到,在文件被用户删除和HDFS空闲空间的增加之间会有一个等待时间延迟。
当被删除的文件还保留在/trash目录中的时候,如果用户想恢复这个文件,可以检索浏览/trash目录并检索该文件。/trash目录仅仅保存被删除 文件的最近一次拷贝。/trash目录与其他文件目录没有什么不同,除了一点:HDFS在该目录上应用了一个特殊的策略来自动删除文件,目前的默认策略是 删除保留超过6小时的文件,这个策略以后会定义成可配置的接口。
2、Replication因子的减小
当某个文件的replication因子减小,Namenode会选择要删除的过剩的副本。下次心跳检测就将该信息传递给Datanode, Datanode就会移除相应的block并释放空间,同样,在调用setReplication方法和集群中的空闲空间增加之间会有一个时间延迟。
参考资料:
HDFS Java API: http://hadoop.apache.org/core/docs/current/api/
HDFS source code: http://hadoop.apache.org/core/version_control.html

Hadoop

hadoop

hadoop


一个分布式系统基础架构,由Apache基金会开发。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力高速运算和存储。Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS。HDFS有着高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上。而且它提供高传输率(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序。HDFS放宽了(relax)POSIX的要求(requirements)这样可以流的形式访问(streaming access)文件系统中的数据。
Hadoop 是一个能够对大量数据进行分布式处理的软件框架。但是 Hadoop 是以一种可靠、高效、可伸缩的方式进行处理的。Hadoop 是可靠的,因为它假设计算元素和存储会失败,因此它维护多个工作数据副本,确保能够针对失败的节点重新分布处理。Hadoop 是高效的,因为它以并行的方式工作,通过并行处理加快处理速度。Hadoop 还是可伸缩的,能够处理 PB 级数据。此外,Hadoop 依赖于社区服务器,因此它的成本比较低,任何人都可以使用。
Hadoop带有用 Java 语言编写的框架,因此运行在 Linux 生产平台上是非常理想的。Hadoop 上的应用程序也可以使用其他语言编写,比如 C++。
Hadoop并不仅仅是一个用于存储的分布式文件系统,而是设计用来在由通用计算设备组成的大型集群上执行分布式应用的框架。