Nodejs 实时监控文件内容的变化及按行读取文本文件

in 编程
关注公众号【好便宜】( ID:haopianyi222 ),领红包啦~
阿里云,国内最大的云服务商,注册就送数千元优惠券:https://t.cn/AiQe5A0g
腾讯云,良心云,价格优惠: https://t.cn/AieHwwKl
搬瓦工,CN2 GIA 优质线路,搭梯子、海外建站推荐: https://t.cn/AieHwfX9

需求问题

在使用nodejs开发项目的过程中,有一个需求需要实时监控指定文件的变更并按行读取最新的文件内容,在nodejs中有相应地API么?具体怎么使用呢~~~

fs.watchFile-实时监控文件变化

nodejs的fs模块提供了watchFile方法,在文件内容每次发生变化时就触发相应回调函数。回调函数提供2个参数-当前的文件状态对象fs.state和文件发生变化时的文件状态fs.state,比较2者的最后修改时间就能知道文件内容是否发生过变化。unwatchfile取消对指定文件的监控

fs.read/fs.createReadStream-读取文件内容

在nodejs中读取文件内容最基本的API为fs.read,也可以通过创建一个文件流,监听data/end事件读取文件内容

Readline模块-按行读取文件内容

nodejs提供readline模块按行读取文件流数据内容的方法。通过创建一个readline的interface对象,监听readline事件就可以获取input输入流的行数据。当输入流读取完毕后将触发close事件

var readline = require('readline');
var rl = readline.createInterface({
  input: fs.createReadStream(filename,{
      enconding:'utf8'
  }),
  output: null
});

rl.on('readline',function(line){
    console.log('got line content:%s',line);
});

解决方案

文件内容变化的监控用fs.watchFile方法实现,最新内容的读取就只有fs.read方法可以使用了。
因为无论是使用readline模块还是使用fs.createReadStream文件流接口都无法从特定的位置(也就是上次文件内容的最后更新位置)开始读取数据,只能从头开始读取文件内容,这个对于-文件监控并读取最新的内容-这个需求来说是不合适也是完全没有效率的。
使用fs.read方法需要提供一种机制能够按行解析出数据并且能够触发消息通知监听者,有行数据读取完成。并且在读取到的数据长度不够获取没有检测到换行标识符(\r\n)时,保留这些数据。

最终的方案是这样的:

  1. fs.watchFile检查到文件内容发生了变化

  2. 调用fs.read方法读取指定的字节数据到到buffer缓冲对象中

  3. 将获取的字节数据和上次read行解析后遗留的内容合并成一个新的buffer对象

  4. 将buffer对象通过换行符解析出行数据

  5. 每解析出一条行数据,就触发一个line事件,通知监听者已读到新的行数据

  6. 保留行解析后遗留数据

  7. 如果本次read读取到的实际数据长度小于buffer缓冲区长度,说明已经到达文件的末尾,没有更多地数据能够读取到了。回到1 等到内容变化的通知

  8. 否则回到2,并从上次读取的最后位置开始读取

  9. 如果读取行数据内容为===END===或者行解析后的遗留内容为===END===,那么将调用fs.unwatchFile停止文件内容的监控并不再调用fs.read

代码

var fs=require('fs');
var EM = require("events").EventEmitter;
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var newlines = [
    //13, // \r
    10  // \n
];
function createLineReader(fileName) {
    if (!(this instanceof createLineReader)) return new createLineReader(fileName,monitorFlag);
    var self=this;
    var currentFileUpdateFlag=0;
    var fileOPFlag="a+";
    fs.open(fileName,fileOPFlag,function(error,fd){
        var buffer;
        var remainder = null;
           fs.watchFile(fileName,{
           persistent: true,
           interval: 1000
        },function(curr, prev){
           //console.log('the current mtime is: ' + curr.mtime);
           //console.log('the previous mtime was: ' + prev.mtime);
           if(curr.mtime>prev.mtime){
               //文件内容有变化,那么通知相应的进程可以执行相关操作。例如读物文件写入数据库等
               continueReadData();
           }else{
               //console.log('curr.mtime<=prev.mtime');
           }

           });
            
        //先读取原来文件中内容
        continueReadData();

        function continueReadData(){
            //var fileUpdateFlag=fileUpdateFlagIn;
            buffer=new Buffer(2048);
            var start = 0,i=0,tmp;
            fs.read(fd,buffer,0,buffer.length,null,function(err, bytesRead, buffer){

                var data=buffer.slice(0,bytesRead)
                if(remainder != null){//append newly received data chunk
                    //console.log("remainder length:"+remainder.length);
                    tmp = new Buffer(remainder.length+bytesRead);
                    remainder.copy(tmp);
                    //data=buffer.slice(0,bytesRead);
                    data.copy(tmp,remainder.length)
                    data = tmp;
                }
                //console.log("data length:"+data.length);
                for(i=0; i<data.length; i++){
                    if(newlines.indexOf(data[i]) >=0){ //\r \n new line
                        var line = data.slice(start,i);
                        self.emit("line", line);
                        start = i+1;
                    }
                }

                if(start<data.length){
                    remainder = data.slice(start);
                    if(remainder.toString()==='===END==='){
                        self.emit("end");
                        stopWatch();
                        return;
                    }
                }else{
                    remainder = null;
                }

                if(bytesRead<buffer.length){
                    return;
                }else{
                    //console.log('~~continue~~');
                    continueReadData();
                }
            });


        }

        function stopWatch(){
            fs.unwatchFile(fileName);
        }
    });

}

util.inherits(createLineReader, EventEmitter);

module.exports=createLineReader;
关注公众号【好便宜】( ID:haopianyi222 ),领红包啦~
阿里云,国内最大的云服务商,注册就送数千元优惠券:https://t.cn/AiQe5A0g
腾讯云,良心云,价格优惠: https://t.cn/AieHwwKl
搬瓦工,CN2 GIA 优质线路,搭梯子、海外建站推荐: https://t.cn/AieHwfX9
扫一扫关注公众号添加购物返利助手,领红包
Comments are closed.

推荐使用阿里云服务器

超多优惠券

服务器最低一折,一年不到100!

朕已阅去看看