Grokbase Groups Camel users June 2015
FAQ
camel-netty is based on Netty 3.x, whose package names start with org.jboss.netty.

It looks like you are using Netty 4.x (package names start with io.netty), so I suggest you use the camel-netty4 component instead of camel-netty.

--
Willem Jiang

Red Hat, Inc.
Web: http://www.redhat.com
Blog: http://willemjiang.blogspot.com (English)
http://jnn.iteye.com (Chinese)
Twitter: willemjiang
Weibo: 姜宁willem


On June 29, 2015 at 2:58:07 PM, SteveR (srichardson@vonage.com) wrote:
I have a linux Java7 stand-alone application using Camel 2.14 and the
camel-netty component. I have a route that receives via netty:udp and then
attempts to mirror the received packets also via netty:udp. I'm also using
the *LengthFieldBasedFrameDecoder* to decode the UDP packets into message
frames based on a 2-byte length field within each UDP message.

The UDP messages I'm receiving contain certain characters that don't decode
in UTF-8 (e.g. 0xa1 0xb2, 0xc3 ), so I've been trying to use the
*iso-8859-1* charset. I'm thinking that what I want in my
ServerPipelineFactory subclass is the
*io.netty.handler.codec.bytes.ByteArrayDecoder*, but I'm unable to get it to
compile with the *addLast()* method.

I've been using the approach outlined here

thus far, but then I run into this compile issue with *io.netty* versus
*org.jboss.netty*.

Which leads me to believe that I'm confused wrt imports for *io.netty*
versus *org.jboss.netty*? Below is my extended *ServerPipelineFactory*
class, in which I'd like to swap the StringDecoder/StringEncoder for
ByteArrayDecoder/ByteArrayEncoder.

Any help is greatly appreciated!

Thanks, SteveR


package multiprotocollistenerrouter;

//import io.netty.handler.codec.bytes.ByteArrayDecoder;
//import io.netty.handler.codec.bytes.ByteArrayEncoder;
//import io.netty.channel.ChannelPipeline;
//import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import io.netty.util.CharsetUtil; // Netty 4.x class (io.netty); the handlers below are Netty 3.x (org.jboss.netty)
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.apache.camel.component.netty.NettyConsumer;
import org.apache.camel.component.netty.ServerPipelineFactory;
import org.apache.camel.component.netty.handlers.ServerChannelHandler;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

//import org.jboss.netty.handler.logging.LoggingHandler;
//import org.jboss.netty.logging.InternalLogLevel;

import org.slf4j.Logger; // The org.slf4j.Logger interface is the main user entry point of SLF4J API.
import org.slf4j.LoggerFactory; // Utility class producing Loggers for various logging APIs, most notably for log4j.

/**
* Consumer linked channel pipeline factory for the MCQ source provider.
* Provides custom support for reception/parsing/processing of MCQ.
*
* @author steve
*
* @see http://camel.apache.org/netty.html
* @see
http://opensourceknowledge.blogspot.com/2010/08/customizing-netty-endpoints-using.html#
* @see
http://seeallhearall.blogspot.com/2012/06/netty-tutorial-part-15-on-channel.html
* @see
https://github.com/apache/camel/blob/master/components/camel-netty/src/test/
*
java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
*/
public class McqServerPipelineFactory extends ServerPipelineFactory {
private final static Logger logger =
LoggerFactory.getLogger(McqServerPipelineFactory.class);
private static final String NEW_LINE =
System.getProperty("line.separator");

// -------------------------------------------------------------------
// Stateless, singleton handler instances...re-used across connections
// -------------------------------------------------------------------
private static final ChannelHandler STR_ENCODER = new
StringEncoder(CharsetUtil.ISO_8859_1);
private static final ChannelHandler STR_DECODER = new
StringDecoder(CharsetUtil.ISO_8859_1);

// private static final ChannelHandler LOG_HANDLER = new
LoggingHandler(InternalLogLevel.INFO);

// private static final ByteArrayDecoder BYTES_DECODER = new
ByteArrayDecoder();
// private static final ByteArrayEncoder BYTES_ENCODER = new
ByteArrayEncoder();

private final NettyConsumer consumer;

private final int maxFrameLength;
private final int lengthFieldOffset;
private final int lengthFieldLength;
private final int lengthAdjustment;
private final int initialBytesToStrip;
private boolean invoked;
private String routeId;

@Override
public ChannelPipeline getPipeline() throws Exception {
logger.trace("getPipeline(): ENTER");

invoked = true;
ChannelPipeline channelPipeline = Channels.pipeline();

String theRouteId = consumer.getRoute().getId();
logger.info("getPipeline(): {}, routeId = {}", consumer.toString(),
theRouteId);

// -----------------------------------------------
// Add logger to print incoming and outgoing data.
// This is both an upstream/downstream handler.
// -----------------------------------------------
// String loggerName = "MCQ_LOGGING_HANDLER_" + theRouteId;
// channelPipeline.addLast(loggerName, LOG_HANDLER);

//
-----------------------------------------------------------------------------------
// Here we add the custom message decoder. Frame decoders are
typically
// stateful, thus we create a new instance with every pipeline.
// See
https://docs.jboss.org/netty/3.1/api/org/jboss/netty/buffer/ChannelBuffer.html
// See
https://docs.jboss.org/netty/3.1/api/org/jboss/netty/buffer/ChannelBuffers.html
//
-----------------------------------------------------------------------------------
String lengthFieldBasedFrameDecoderName =
"MCQ_LENGTH_FIELD_FRAME_BASED_DECODER_" + theRouteId;
channelPipeline.addLast(lengthFieldBasedFrameDecoderName,
new LengthFieldBasedFrameDecoder(maxFrameLength,
lengthFieldOffset,
lengthFieldLength,
lengthAdjustment,
initialBytesToStrip));

if(logger.isInfoEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("getPipeline(): Added
").append(lengthFieldBasedFrameDecoderName).append(":").
append(NEW_LINE).
append("maxFrameLength = ").append(maxFrameLength)
.append(NEW_LINE).
append("lengthFieldOffset =
").append(lengthFieldOffset).append(NEW_LINE).
append("lengthFieldLength =
").append(lengthFieldLength).append(NEW_LINE).
append("lengthAdjustment = ").append(lengthAdjustment)
.append(NEW_LINE).
append("initialBytesToStrip = ").append(initialBytesToStrip);
logger.info(sb.toString());
}

// -----------------------------------------------------------
// Add string encoder (downstream) / string decoder (upstream)
// -----------------------------------------------------------
String stringDecoderName = "MCQ_STRING_DECODER_" + theRouteId;
channelPipeline.addLast(stringDecoderName, STR_DECODER);
String stringEncoderName = "MCQ_STRING_ENCODER_" + theRouteId;
channelPipeline.addLast(stringEncoderName, STR_ENCODER);

// -----------------------------------------------------------
// Add byte[] encoder (downstream) / byte[] decoder (upstream)
// -----------------------------------------------------------
// String bytesDecoderName = "MCQ_BYTES_DECODER_" + theRouteId;
// channelPipeline.addLast(bytesDecoderName, BYTES_DECODER);
// String bytesEncoderName = "MCQ_BYTES_ENCODER_" + theRouteId;
// channelPipeline.addLast(bytesEncoderName, BYTES_ENCODER);

// ------------------------------------------------------
// Here we add the default Camel ServerChannelHandler for
// the consumer, to allow Camel to route the message, etc.
// ------------------------------------------------------
String serverChannelHandlerName = "MCQ_SERVER_CHANNEL_HANDLER_" +
theRouteId;
channelPipeline.addLast(serverChannelHandlerName, new
ServerChannelHandler(consumer));
logger.info("getPipeline(): Added default Camel {}",
serverChannelHandlerName);

logger.trace("getPipeline(): EXIT");
return channelPipeline;
}

/**
* Create the server pipeline factory.
*
* @param consumer
* @return ServerPipelineFactory
*/
@Override
public ServerPipelineFactory createPipelineFactory(NettyConsumer
consumer) {
logger.trace("createPipelineFactory(NettyConsumer): ENTER with {}",
(consumer == null) ? "null" : consumer);

if(consumer != null) {
// -------------------------------------------------------------
// - Get routeId from NettyConsumer input.
// - Get serverPipelineFactoryName from routeId.
// - Get LengthFieldBasedFrameDecoderBean from RouteManager.
// - Construct McqServerPipelineFactory from
// NettyConsumer and LengthFieldBasedFrameDecoderBean members.
// -------------------------------------------------------------
String theRouteId = consumer.getRoute().getId();

String serverPipelineFactoryName =
RouteManager.getServerPipelineFactoryName(theRouteId);

LengthFieldBasedFrameDecoderBean
lengthFieldBasedFrameDecoderBean =

RouteManager.getLengthFieldBasedFrameDecoderInfo(serverPipelineFactoryName);

Integer theMaxFrameLength =
lengthFieldBasedFrameDecoderBean.getMaxFrameLengthInBytes();
Integer theLengthFieldOffset =
lengthFieldBasedFrameDecoderBean.getFrameDecoderLengthFieldOffset();
Integer theLengthFieldLength =
lengthFieldBasedFrameDecoderBean.getFrameDecoderLengthFieldLength();
Integer theLengthAdjustment =
lengthFieldBasedFrameDecoderBean.getFrameDecoderLengthAdjustment();
Integer theInitialBytesToStrip =
lengthFieldBasedFrameDecoderBean.getFrameDecoderInitialBytesToStrip();

if(logger.isInfoEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Constructing McqServerPipelineFactory:")
.append(NEW_LINE).
append("consumer = ").append(consumer)
.append(NEW_LINE).
append("maxFrameLength =
").append(theMaxFrameLength) .append(NEW_LINE).
append("lengthFieldOffset =
").append(theLengthFieldOffset) .append(NEW_LINE).
append("lengthFieldLength =
").append(theLengthFieldLength) .append(NEW_LINE).
append("lengthAdjustment =
").append(theLengthAdjustment) .append(NEW_LINE).
append("initialBytesToStrip =
").append(theInitialBytesToStrip).append(NEW_LINE).
append("routeId = ").append(theRouteId);
logger.info(sb.toString());
}

logger.trace("createPipelineFactory(): EXIT");
return new McqServerPipelineFactory(consumer,
theMaxFrameLength,
theLengthFieldOffset,
theLengthFieldLength,
theLengthAdjustment,
theInitialBytesToStrip,
theRouteId);
}

logger.trace("createPipelineFactory(): EXIT");
return null;
}

public boolean isfactoryInvoked() {
return invoked;
}

public McqServerPipelineFactory(NettyConsumer consumer) {
logger.trace("McqServerPipelineFactory(NettyConsumer): ENTER with
{}", (consumer == null) ? "null" : consumer);

this.maxFrameLength = 0;
this.lengthFieldOffset = 0;
this.lengthFieldLength = 0;
this.lengthAdjustment = 0;
this.initialBytesToStrip = 0;
this.consumer = consumer;

logger.trace("McqServerPipelineFactory(NettyConsumer): EXIT");
}

public McqServerPipelineFactory(
NettyConsumer consumer,
Integer maxFrameLength,
Integer lengthFieldOffset,
Integer lengthFieldLength,
Integer lengthAdjustment,
Integer initialBytesToStrip,
String routeId) {
if(logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("McqServerPipelineFactory(): ENTER with:").
append(NEW_LINE).
append("consumer = ").append(consumer)
.append(NEW_LINE).
append("maxFrameLength = ").append(maxFrameLength)
.append(NEW_LINE).
append("lengthFieldOffset = ").append(lengthFieldOffset)
.append(NEW_LINE).
append("lengthFieldLength = ").append(lengthFieldLength)
.append(NEW_LINE).
append("lengthAdjustment = ").append(lengthAdjustment)
.append(NEW_LINE).
append("initialBytesToStrip =
").append(initialBytesToStrip).append(NEW_LINE).
append("routeId = ").append(routeId);
logger.trace(sb.toString());
}

this.consumer = consumer;
this.maxFrameLength = maxFrameLength;
this.lengthFieldOffset = lengthFieldOffset;
this.lengthFieldLength = lengthFieldLength;
this.lengthAdjustment = lengthAdjustment;
this.initialBytesToStrip = initialBytesToStrip;
this.routeId = routeId;

logger.trace("McqServerPipelineFactory(): EXIT");
}
}
// ------------- end --------------





--
View this message in context: http://camel.465427.n5.nabble.com/Camel-2-14-Netty-How-to-add-ByteArrayDecoder-to-ServerChannelPipeline-tp5768638.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Search Discussions

Discussion Posts

Previous

Follow ups

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 2 of 5 | next ›
Discussion Overview
groupusers @
categoriescamel
postedJun 29, '15 at 6:58a
activeJul 10, '15 at 1:46a
posts5
users2
websitecamel.apache.org

2 users in discussion

Willem Jiang: 3 posts SteveR: 2 posts

People

Translate

site design / logo © 2021 Grokbase