[MQTT / Mosca] Mqtt Packet 에 Command Type 추가하기 (for authentication) #1

[MQTT / Mosca] Mqtt Packet 에 Command Type 추가하기 (for authentication) #2

[MQTT / Mosca] Mqtt Packet 에 Command Type 추가하기 (for authentication) #3

[MQTT / Mosca] Mqtt Packet 에 Command Type 추가하기 (for authentication) #4


경축축 이제 진짜 드디어 마지막 글이다!!


이제 Command Type 을 추가하는 게 끝났다.


하지만 거기까지 해서는


Authentication 을 위한 Packet 흐름


1. Connect Packet (Device -> Server)

2. Authenticate Packet (Server -> Device)

3. Authenticate Packet (Device -> Server)

4. Connect ACK Packet (Server -> Device)

에서의 네번째 단계가 아마 안 될 거다...





#Connect ACK Packet 받기


세번째 글에서 stream.pipe 란 애를 요주의라고 했었다.


그니까!! 

3단계까지 무사히 마쳤으니

Server 에서 authenticate packet 을 받아서 처리할 수 있는 상태가 되었다.


그래서 처리를 해주고, authentication 이 완료되면 Connect ACK Packet 을 device 에 보내줘야한다.

그래야 비로소 connection 이 완료되니까!


근데!!!!!! 지금 상태에선 Device 가 그놈의 Connect ACK Packet 을 못 받는다!!!!!


대체 왜인지 알기 위해 진짜 디버깅을 엄청 했다

Wire shark 도 켜서 막 패킷 캡쳐해보고... 별짓을 다했는데

확실해진 건 Server -> Device 로 가는 Connect ACK Packet 이 확실히 보내졌다는 거다.


그럼 문제가 뭘까?

Device 쪽 코드에서 받아서 뭔가 처리를 하는데 생긴 문제일텐데,

stream 에서 데이터를 아예 못받나? 싶어서

client 코드 들어가서 

this.stream.on('data', function(data) {
  console.log(data);
});


해봤더니 넘나 잘 나옴!!

심지어 Wire shark 에서 캡쳐한 Packet 이랑 byte array 도 일치한다 ㅠㅠ


그럼 뭐가 문제일까...

하다보니 문득 위의 this.stream 이 readable stream 이란 것과,

실제로 받은 packet 의 처리는 stream.pipe 에 destination 으로 설정됐던 writable stream 에서 한다는 생각이 스쳐갔다.



그럼 왜인지는 몰라도 저 pipe 가 한번만 동작하고 있는 건 아닐까?

싶어서 코드를 수정해봤다

/*
writable._write = function (buf, enc, done) {
  completeParse = done;
  parser.parse(buf);
  process();
};

this.stream.pipe(writable);
*/

this.stream.on('data', function(buf) {
  completeParse = function(err) {
    if (err) {
      console.log(err);
      return;
    }
  };
  parser.parse(buf);
  process();
});


원래 있던 pipe 관련 코드들을 다 주석처리하고

저렇게 직접 readable stream 에서 읽은 data buffer 를 처리하도록 해줬더니.............

세상에 넘나 잘 된다


대체 왜 pipe 가 한번만 동작하고 막힌 건지는 모르겠다.

어딘가에서 막힌 것 같은데 그것까지 찾아볼 생각도 없고...

기나긴 삽질에 지쳐 그럴 자신도 없다 ㅠㅠ




그래도 이제 authentication 을 위한 모든 준비가 끝났다!

암호화만 하면 된다. ㅠㅠㅠㅠㅠㅠㅠㅠㅠㅠㅠㅠ



나말고 읽을 사람이 있을지 의문이지만

혹시 이 대장정을 다 읽으셨다면


수고하셨습니다!!

[MQTT / Mosca] Mqtt Packet 에 Command Type 추가하기 (for authentication) #1

[MQTT / Mosca] Mqtt Packet 에 Command Type 추가하기 (for authentication) #2

[MQTT / Mosca] Mqtt Packet 에 Command Type 추가하기 (for authentication) #3



흐흐흐 드디어 네번째 글이다

정말 정리하기 넘 힘들군..


그래도 이번엔 드디어 3단계로 넘어간다!!

경축경축!!!



#MQTT Packet Command 추가하기



Authentication 을 위한 Packet 흐름


1. Connect Packet (Device -> Server)

2. Authenticate Packet (Server -> Device)

3. Authenticate Packet (Device -> Server)

4. Connect ACK Packet (Server -> Device)


크으으 이제 무려 3단계다


2단계에서 워낙 삽질을 많이 해서

3단계는 좀 수월하게 할 수 있다.



일단 2단계에서의 접근과 동일하게,

이번엔 Device 에서 Packet 을 generate 하는 부분을 봐야 되니까!

Device 쪽 mqtt 라이브러리를 봐야한다.



#5. mqtt (send)


내가 원하는 건

publish 할 때 처럼!!

그냥 client.authenticate(~) 해서 authenticate 요청을 날려버리는 거다!


그러니 publish function 이 어떻게 정의되어 있는지부터 한번 보자.


(mqtt/lib/client.js)

/**
 * publish - publish <message> to <topic>
 *
 * @param {String} topic - topic to publish to
 * @param {String, Buffer} message - message to publish
 * @param {Object} [opts] - publish options, includes:
 *    {Number} qos - qos level to publish on
 *    {Boolean} retain - whether or not to retain the message
 * @param {Function} [callback] - function(err){}
 *    called when publish succeeds or fails
 * @returns {MqttClient} this - for chaining
 * @api public
 *
 * @example client.publish('topic', 'message');
 * @example
 *     client.publish('topic', 'message', {qos: 1, retain: true});
 * @example client.publish('topic', 'message', console.log);
 */
MqttClient.prototype.publish = function (topic, message, opts, callback) {
  var packet;

  // .publish(topic, payload, cb);
  if ('function' === typeof opts) {
    callback = opts;
    opts = null;
  }

  // Default opts
  if (!opts) {
    opts = {qos: 0, retain: false};
  }

  if (this._checkDisconnecting(callback)) {
    return this;
  }

  packet = {
    cmd: 'publish',
    topic: topic,
    payload: message,
    qos: opts.qos,
    retain: opts.retain,
    messageId: this._nextId()
  };

  switch (opts.qos) {
    case 1:
    case 2:

      // Add to callbacks
      this.outgoing[packet.messageId] = callback || nop;
      this._sendPacket(packet);
      break;
    default:
      this._sendPacket(packet, callback);
      break;
  }

  return this;
};


안녕? publish function 아!

나는 너를 복붙해다가, 겁나 고쳐댈거야!


말 그대로다.

그냥 얘 복붙해서 맘에 들도록 짜깁기하면 된다.

2편에서 generate 할 때 그랬던 것처럼!



짜깁기 하여 완성된 코드▼

MqttClient.prototype.authenticate = function (clientId, username, secretKey, callback) {
  var packet;

  if (this._checkDisconnecting(callback)) {
    return this;
  }

  packet = {
    cmd: 'authenticate',
    keepalive: 1,
    qos: 0,
    retain: false,
    clientId: clientId,
    username: username,
    password: secretKey
  };

  this._sendPacket(packet, callback);

  return this;
};

비교적 매우 심플하다.

왜냐면 별 처리를 안 했거든..


근데 여기서는 packet에 들어갈 정보들만 명시해준 다음,

_sendPacket 이란 function 으로 넘겨준다.


그럼 저 function 이 뭘 하는 앤지 정도는 살펴봐야겠지?

저기서 막 또 cmd case 별로 나눠서 뭔 짓 해주면 어떡해..


보러 가자!

/**
 * _sendPacket - send or queue a packet
 * @param {String} type - packet type (see `protocol`)
 * @param {Object} packet - packet options
 * @param {Function} cb - callback when the packet is sent
 * @api private
 */
MqttClient.prototype._sendPacket = function (packet, cb) {
  if (!this.connected) {
    if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
      this.queue.push({ packet: packet, cb: cb })
    } else if (packet.qos > 0) {
      this.outgoingStore.put(packet, function (err) {
        if (err) {
          return cb && cb(err)
        }
      })
    } else if (cb) {
      cb(new Error('No connection to broker'))
    }

    return
  }

  // When sending a packet, reschedule the ping timer
  this._shiftPingInterval()

  if (packet.cmd !== 'publish') {
    sendPacket(this, packet, cb)
    return
  }

  switch (packet.qos) {
    case 2:
    case 1:
      storeAndSend(this, packet, cb)
      break
    /**
     * no need of case here since it will be caught by default
     * and jshint comply that before default it must be a break
     * anyway it will result in -1 evaluation
     */
    case 0:
      /* falls through */
    default:
      sendPacket(this, packet, cb)
      break
  }
}


거봐 확인 안 했으면 어쩔뻔 했어


나는 authenticate packet 을 client 가 connect 되기 전에 요청한다.

근데 저 function 에 따르면, connected 상태에 따라 처리해주는 부분이 있어버려서,

제대로 sendPacket 까지 도달하지 못하는 것 같다. (실제로 이 function 을 처리 안 해주고 그냥 돌리면 에러가 난다. 테스트 해봄)



이 코드를 수정하는 데에는 여러가지 방법이 있겠지만,

코드 하나하나 뜯어보고 무슨 의미인지 알아내기엔 이미 삽질을 너무 많이 해서

그냥 가장 간단한 방법으로 고쳤다.

/**
 * _sendPacket - send or queue a packet
 * @param {String} type - packet type (see `protocol`)
 * @param {Object} packet - packet options
 * @param {Function} cb - callback when the packet is sent
 * @api private
 */
MqttClient.prototype._sendPacket = function (packet, cb) {
  if (!this.connected && packet.cmd !== 'authenticate') {
    if (0 < packet.qos || 'publish' !== packet.cmd || this.queueQoSZero) {
      this.queue.push({ packet: packet, cb: cb });
    } else if (cb) {
      cb(new Error('No connection to broker'));
    }

    return;
  }

  // When sending a packet, reschedule the ping timer
  this._shiftPingInterval();

  switch (packet.qos) {
    case 2:
    case 1:
      storeAndSend(this, packet, cb);
      break;
    /**
     * no need of case here since it will be caught by default
     * and jshint comply that before default it must be a break
     * anyway it will result in -1 evaluation
     */
    case 0:
      /* falls through */
    default:
      sendPacket(this, packet, cb);
      break;
  }
};

다른 그림찾기 넘 고난이도일까봐 bold 처리 했다.


저렇게 그냥 단순히 cmd가 authenticate일 경우에는 connected 가 false 여도 다음 로직으로 넘어가도록 해버렸다.



이제 여기까지 하면!!

거의 다 됐는데!!!



이전편처럼 project directory 가 구분되어 있는 사람들은

Deivce project 쪽 mqtt-packet library 에서도 authenticate type packet generate 하는 부분을 다시 추가해줘야 한다.


근데 나만 그런 건지는 모르겠는데,

npm install mosca 로 설치된 Server project 쪽 mqtt-packet library

npm install mqtt 로 설치된 Device project 쪽 mqtt-packet library 가 

코드 구성이 조금 상이하다.


주의할 것.. 아마 버전 차이인 것 같다.


npm install mqtt 로 설치된 mqtt-packet library 에서는 

generate 하는 코드를 mqtt-packet/writeToStream.js 파일에서 찾을 수 있다.



방식이 좀 다르긴 한데

stream 이란 두번째 parameter 가 추가되었다.


그래도 이전에 했던 것처럼 connect 복붙해서 적당히 수정하면 된다.

진짜 똑같이 수정하면 됨!! 괜히 이상한 부분만 안 건드리면..


대신 진짜진짜 주의할 부분이 있다!!!!

나도 원래는 같은 project directory 에서 Server, Device process 둘 다 돌리면서 테스트 하고 분리했었는데,

이걸 안 건드려서 한참 삽질을 또 했다.


(Device side mqtt-packet/constants.js)

/* Authenticate */
protocol.AUTHENTICATE_HEADER = Buffer.from([protocol.codes['authenticate'] << protocol.CMD_SHIFT])

저 파일을 열고 밑으로 스크롤 죽죽 내려보면

HEADER 를 generate 해서 명시해두는 부분이 있는데,

이게 Server side 에서 쓰는 mqtt-packet library 버전이랑 달라서 ㅠㅠ

Server 에서는 그냥 connect function 적당히 수정하면 HEADER parsing 도 같이 처리됐었는데

여기선 따로 저렇게 constants.js 파일에서 관리한다.

꼭꼭 명시해줘야됨..


저걸 모르고 그냥 돌렸더니 세상에

authenticate 형식으로 잘 파싱해놓고

보낼 때 connect로 보내버린다.


저 코드를 constants.js 에 추가해줬다면

다시 writeToStream.js 파일로 돌아와서 복붙해서 짜깁기해둔 authenticate parsing function 에다가

  // Generate header
  stream.write(protocol.AUTHENTICATE_HEADER)

라고 수정해주면 된다.




여기까지하면 device에서 authenticate packet 을 보내는 것도 잘 된다!



Server 에서 저 authenticate packet 을 parsing 해서 받는 부분은

Device 에서 받을 때와 동일한 파일들을 동일하게 수정해주면 된다.

다행히 이 부분은 버전별 차이가 없다 ㅠㅠ 다행






길고 긴 여정이 끝났다!


이렇게 하면 

Mqtt Packet 에 Command Type 추가하기 완료!!!



흐하항행복해

이제 추가한 타입을 멋대로 가져다 쓰면 된다

이벤트도 받고 패킷도 쏘고~~~ 앗싸링!!


아 근데 글 하나 더 있다. ㅎ

[MQTT / Mosca] Mqtt Packet 에 Command Type 추가하기 (for authentication) #1

[MQTT / Mosca] Mqtt Packet 에 Command Type 추가하기 (for authentication) #2



하잇!

아 이어서 정리하려니까 넘 힘들다

언젠가는 내가 이걸 다시 보고 공부를 할까?

왜 자꾸 부질 없는 것 같지... 흑흑


그래도 내가 공부한 거 정리하는 거니까 마저 열씨미 해야징 ㅠㅠ





#MQTT Packet Command 추가하기



Authentication 을 위한 Packet 흐름


1. Connect Packet (Device -> Server)

2. Authenticate Packet (Server -> Device)

3. Authenticate Packet (Device -> Server)

4. Connect ACK Packet (Server -> Device)


2단계를 하고 있었다.

저번 글에서는 #4. mqtt-packet (generate) 에서 모듈 열어보고, authenticate type packet 을 generate 하는 코드 추가해주고 테스트까지 해봤다.


그럼 2단계인 Server -> Device 에서

"Server -" 정도 까지 된 거다.

남은 "> Device" 부분을 이 글에서 볼 거고!



이제 Server 측에서 authenticate type packet 을 보내는 걸 했으니!

Device 에서 받아야한다. 

해당 Packet 을 제대로 받으려면, Authenticate type 을 parsing 할 수 있어야겠지?


Device 쪽 코드에서는 mqtt library 를 쓴다.

그러니 이번에는 mqtt library를 열어볼 거야



#5. mqtt (receive)


Device 에서는 mqtt library를 이용해 client 를 생성하고,Server 에 Connect 한다.


그리고 client 에다 관련 event들을 handling 하는 callback function 들을 달아주는데,

우리는 authenticate type 의 packet 에 대한 event 도 받아야 한다!

그러려면 일단 event를 emit 하는 부분을 찾아봐야 하지 않을까?!


(mqtt/lib/client.js)

MqttClient.prototype._handleConnack = function (packet) {
  var rc = packet.returnCode,
    // TODO: move to protocol
    errors = [
      '',
      'Unacceptable protocol version',
      'Identifier rejected',
      'Server unavailable',
      'Bad username or password',
      'Not authorized'
    ];

  clearTimeout(this.connackTimer);

  if (0 === rc) {
    this.reconnecting = false;
    this.emit('connect', packet);
  } else if (0 < rc) {
    this.emit('error',
        new Error('Connection refused: ' + errors[rc]));
  }
};

그래서 evnet를 emit 하는 부분을 찾아보았다!

저거 말고도 많은데, 일단 connect event 를 emit 하는 부분을 찾아봤더니, 저렇게 되어있었다.


그럼 다음 수순은 뭘까?

저 function 을 호출하는 부분을 찾아야겠지!?


(mqtt/lib/client.js)

MqttClient.prototype._handlePacket = function (packet, done) {
  this.emit('packetreceive', packet);

  switch (packet.cmd) {
    case 'publish':
      this._handlePublish(packet, done);
      break;
    case 'authenticate':
      this.emit('authenticate', packet);
      break;
    case 'puback':
    case 'pubrec':
    case 'pubcomp':
    case 'suback':
    case 'unsuback':
      this._handleAck(packet);
      done();
      break;
    case 'pubrel':
      this._handlePubrel(packet, done);
      break;
    case 'connack':
      this._handleConnack(packet);
      done();
      break;
    case 'pingresp':
      this._handlePingresp(packet);
      done();
      break;
    default:
      // do nothing
      // maybe we should do an error handling
      // or just log it
      break;
  }
};


짜잔!


이제 packet type 에 따라 event를 발생시켜주는 부분을 알았으니,

저기다 authenticate event 를 emit 하는 코드를 추가해주면

client 에서 authenticate event 를 handling 할 수 있을 거야!!


그래서 authenticate case 를 추가해줬다.


이럼 끝일까?


젠젠,, ㅠ


Packet Parsing 이 남았다.

기껏 authenticate packet 을 전송하는 거랑, 해당 event 를 처리할 수 있도록 준비까지 다 마쳐놨으니!

제대로 authenticate packet 타입을 처리해줘야 한다.


parsing 을 어디서 하는지 알아보자.


일단 저 _handlePacket function이 packet 이란 parameter 를 받는 것으로 보아,

어딘가에서 packet을 parsing 해서 _handlePacket function 을 호출해주겠지?



(mqtt/lib/client.js) 요 파일 많이많이 본다

function process () {
  var packet = packets.shift(),
    done = completeParse;
  if (packet) {
    that._handlePacket(packet, process);
  } else {
    completeParse = null;
    done();
  }
}


여기다!

근데 packet 을 packets 라는 변수에서 꺼내오는 걸 보니

packet 은 이 function 이 호출되기 전에 이미 처리되어 packets 에 저장되는 것 같다.

그럼 또 이 process function 이 호출되는 곳을 봐야한다.



(mqtt/lib/client.js)

writable._write = function (buf, enc, done) {
  console.log(buf)
  completeParse = done;
  parser.parse(buf);
  process();
};

this.stream.pipe(writable);

요기 있지!!


여길 보면 stream.pipe 란 애가 있는데, 요주의..

아마 나중에 또 등장할 거다. 한 마지막편쯤에..


어쨌든 저 pipe 란 애의 원래 역할은, 

readable stream 에 pipe 로 writable stream 을 연결해주면,

readable stream 이 읽어들인 data buffer 를 writable stream 으로 토스해준다.

그리고 그 buffer 를 처리해주는 부분이 writable._write 로 정의된 저 function 인 것 같다.

자세한 건 document 참조


여튼 그렇게 읽어들인 buffer 를 처리하는 부분에

내가 찾던 키워드가 있다!


parser.parse(buf)...


저 parser 는 또 뭘까 해서 보니

2편에 나왔던 그 아이다

mqtt-packet..




#6. mqtt-packet (parse)


그럼 다시 봐야지 뭐..


(mqtt-packet/parser.js)

Parser.prototype._parsePayload = function () {
  var result = false

  // Do we have a payload? Do we have enough data to complete the payload?
  // PINGs have no payload
  if (this.packet.length === 0 || this._list.length >= this.packet.length) {

    this._pos = 0

    switch (this.packet.cmd) {
      case 'connect':
        this._parseConnect()
        break
      case 'authenticate':
        this._parseAuth()
        break
      case 'connack':
        this._parseConnack()
        break
      case 'publish':
        this._parsePublish()
        break
      case 'puback':
      case 'pubrec':
      case 'pubrel':
      case 'pubcomp':
        this._parseMessageId()
        break
      case 'subscribe':
        this._parseSubscribe()
        break
      case 'suback':
        this._parseSuback()
        break
      case 'unsubscribe':
        this._parseUnsubscribe()
        break
      case 'unsuback':
        this._parseUnsuback()
        break
      case 'pingreq':
      case 'pingresp':
      case 'disconnect':
        // these are empty, nothing to do
        break
      default:
        this._emitError(new Error('not supported'))
    }

    result = true
  }

  return result
}

흐흐 나 코드 따라가는 것 좀 잘 하는 것 같다.


여튼 여기 등장! 빠밤

그럼 여기다 또 언제나 그래왔듯 authenticate case 를 추가해주면 된다.


근데 또 못보던 function 이 은근슬쩍 등장했다.

그렇다. 만들어줘야한다.

진짜 개귀찮다. 내가 왜 이 짓을 시작했지

역시 코딩은 삽질의 연속이다

인정? 어인정~ 동의? 어 보감~ 동휘? 어 보검~~~~~~~~~~~~~~~



(mqtt-packet/parser.js)

Parser.prototype._parseAuth = function() {
  var protocolId // constants id
    , clientId // Client id
    , password // Password
    , username // Username
    , flags = {}
    , packet = this.packet

  // Parse constants id
  protocolId = this._parseString()
  if (protocolId === null)
    return this._emitError(new Error('cannot parse protocol id'))

  if (protocolId != 'MQTT' && protocolId != 'MQIsdp') {

    return this._emitError(new Error('invalid protocol id'))
  }

  packet.protocolId = protocolId

  // Parse constants version number
  if(this._pos >= this._list.length)
    return this._emitError(new Error('packet too short'))

  packet.protocolVersion = this._list.readUInt8(this._pos)

  if(packet.protocolVersion != 3 && packet.protocolVersion != 4) {

    return this._emitError(new Error('invalid protocol version'))
  }

  this._pos++
  if(this._pos >= this._list.length)
    return this._emitError(new Error('packet too short'))

  // Parse connect flags
  flags.username  = (this._list.readUInt8(this._pos) & constants.USERNAME_MASK)
  flags.password  = (this._list.readUInt8(this._pos) & constants.PASSWORD_MASK)

  packet.clean = (this._list.readUInt8(this._pos) & constants.CLEAN_SESSION_MASK) !== 0
  this._pos++

  // Parse keepalive
  packet.keepalive = this._parseNum()
  if(packet.keepalive === -1)
    return this._emitError(new Error('packet too short'))

  // Parse client ID
  clientId = this._parseString()
  if(clientId === null)
    return this._emitError(new Error('packet too short'))
  packet.clientId = clientId

  // Parse username
  if (flags.username) {
    username = this._parseString()
    if(username === null)
      return this._emitError(new Error('cannot parse username'))
    packet.username = username
  }

  // Parse password
  if(flags.password) {
    password = this._parseBuffer()
    if(password === null)
      return this._emitError(new Error('cannot parse username'))
    packet.password = password
  }

  return packet
}


이번에도 역시 _parseConnect 를 참고했다.

참고하되 저번에 만들어둔 Authenticate Generate Function 을 꼭 고려하면서 짜깁기해야한다!

그때 넣었던 데이터들 잘 참고해서 고쳐줘야함!


요까지 해주면 device 에서 authenticate packet 을 받는 것도 매우 잘 된다!


아래는 받는 코드

client.on('authenticate', function(packet) {
  console.log(packet);
});


아 근데 주의할 점이 있다.


혹여나 (가 아니고 대개 그렇겠지만)

Server process 를 돌리는 project directory 와

Device process 를 돌리는 project directory 가 다르다면

각각 node_modules directory 에서 적합한 library 를 수정해줘야 하는데,


지금까지 한 Authenticate Packet (Server -> Device) 단계에서

Server / Device 둘 모두에서 고쳐줘야 하는 부분이 있다.


constants.js !!


authenticate 타입을

Server 에선 추가해줬는데 Device 에서 안 해줬거나

혹은 그 반대의 상황이어도

보내거나 받는 게 제대로 안 된다.


꼭 확인!


ㅠㅠ 다음편엔 3단계로 넘어갈 예정.

정리 넘 힘들당 

+ Recent posts