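Custom source development:
A source collects log events and stores them in a channel.
Flume offers two source mechanisms: PollableSource (polling) and EventDrivenSource (event-driven).
With EventDrivenSource, you can start extra threads in start() that keep pushing data into the channel. With PollableSource, the framework calls process() over and over, so you put the delivery logic in process().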
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation, convert to another type, ...)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop() {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  // Depending on the Flume version, PollableSource may also require
  // getBackOffSleepIncrement() and getMaxBackOffSleepInterval().

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    try {
      // This try clause includes whatever Channel/Event operations you want to do

      // Receive new data (getSomeData() is a placeholder for your own code)
      Event e = getSomeData();

      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e);

      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed
      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error) t;
      }
    }
    // No transaction is closed here: the original called txn.close() on an
    // undeclared variable. The ChannelProcessor manages channel transactions.
    return status;
  }
}
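To make the READY/BACKOFF return values more concrete, here is a simplified picture of how a polling loop might react to them. This is a sketch under stated assumptions, not Flume's actual runner: PollingLoopSketch, Pollable, drive and backoffMillis are hypothetical names, the linear back-off rule is an assumption, and delays are recorded instead of slept.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a polling runner. None of these types come from Flume.
class PollingLoopSketch {
    enum Status { READY, BACKOFF }

    // Hypothetical stand-in for a pollable source.
    interface Pollable {
        Status process();
    }

    // Assumed rule: the delay grows linearly with consecutive BACKOFF
    // results and is capped at maxMillis.
    static long backoffMillis(int consecutiveBackoffs, long incrementMillis, long maxMillis) {
        return Math.min(consecutiveBackoffs * incrementMillis, maxMillis);
    }

    // Calls process() 'rounds' times and returns the delay chosen after each call.
    // A real loop would sleep for that long; here the delays are only recorded.
    static List<Long> drive(Pollable source, int rounds, long incrementMillis, long maxMillis) {
        List<Long> delays = new ArrayList<>();
        int consecutiveBackoffs = 0;
        for (int i = 0; i < rounds; i++) {
            if (source.process() == Status.BACKOFF) {
                consecutiveBackoffs++;
                delays.add(backoffMillis(consecutiveBackoffs, incrementMillis, maxMillis));
            } else {
                consecutiveBackoffs = 0; // data was delivered, poll again immediately
                delays.add(0L);
            }
        }
        return delays;
    }
}
```

The point of the sketch is that returning BACKOFF when no data is available lets the caller slow down, while READY keeps the loop polling at full speed.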
Or, as an event-driven source:
package org.apache.flume;

import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;

public class TailSource extends AbstractSource implements EventDrivenSource, Configurable {

  @Override
  public void configure(Context context) {
  }

  @Override
  public synchronized void start() {
  }

  @Override
  public synchronized void stop() {
  }
}
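The event-driven skeleton leaves start() and stop() empty. A minimal sketch of the "start a thread that keeps pushing data" idea could look like the following. It deliberately avoids the Flume API so that it runs on its own: EventDrivenSketch and runDemo are hypothetical names, a BlockingQueue stands in for the channel, and plain strings stand in for events.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Stand-in for an event-driven source. This is NOT the Flume API.
class EventDrivenSketch {
    private final BlockingQueue<String> channel; // plays the role of the channel
    private final List<String> input;            // hypothetical data to deliver
    private volatile boolean running;
    private Thread worker;

    EventDrivenSketch(BlockingQueue<String> channel, List<String> input) {
        this.channel = channel;
        this.input = input;
    }

    // Mirrors start(): launch a background thread that keeps pushing data.
    synchronized void start() {
        running = true;
        worker = new Thread(() -> {
            for (String line : input) {
                if (!running) {
                    return; // stop() was requested
                }
                try {
                    channel.put(line);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "source-worker");
        worker.start();
    }

    // Mirrors stop(): ask the worker to finish, then wait for it to exit.
    // The demo queue is unbounded, so put() never blocks and join() returns.
    synchronized void stop() {
        running = false;
        if (worker == null) {
            return;
        }
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Starts the source, consumes every event from the queue, then stops it.
    static List<String> runDemo() {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        List<String> input = Arrays.asList("line-1", "line-2", "line-3");
        EventDrivenSketch source = new EventDrivenSketch(channel, input);
        source.start();
        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < input.size(); i++) {
                received.add(channel.take()); // blocks until the worker delivers
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            source.stop();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

In a real source the worker would hand events to the channel processor rather than to a queue, and stop() would also release whatever external connection start() opened.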
Custom sink:
A sink pulls log events from a channel and processes them.
process() is called repeatedly; you only need to take data from the channel inside process().
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop() {
    // Disconnect from the external repository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do

      // take() may return null when the channel is empty
      Event event = ch.take();

      // Send the Event to the external repository.
      // storeSomeData(event);

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed
      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error) t;
      }
    } finally {
      // Always close the transaction, whether it was committed or rolled back
      txn.close();
    }
    return status;
  }
}
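The take/commit/rollback sequence in process() is what keeps an event in the channel when the external store fails. The sketch below illustrates that behaviour with a tiny in-memory stand-in. It is an assumption-laden illustration, not the Flume API: SinkTxnSketch and TxnChannel are hypothetical names, and the single-threaded channel only mimics the commit and rollback semantics.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Stand-in for a sink's transactional take. None of these types come from Flume.
class SinkTxnSketch {
    enum Status { READY, BACKOFF }

    // Hypothetical single-threaded channel with commit/rollback semantics.
    static class TxnChannel {
        private final Deque<String> queue = new ArrayDeque<>();
        private final Deque<String> taken = new ArrayDeque<>();

        void put(String event) {
            queue.addLast(event);
        }

        // Removes the head event but remembers it until commit or rollback.
        String take() {
            String event = queue.pollFirst();
            if (event != null) {
                taken.addLast(event);
            }
            return event;
        }

        // Commit: the taken events are gone for good.
        void commit() {
            taken.clear();
        }

        // Rollback: put the taken events back at the head, in original order.
        void rollback() {
            while (!taken.isEmpty()) {
                queue.addFirst(taken.pollLast());
            }
        }

        int size() {
            return queue.size();
        }
    }

    // Mirrors the shape of process(): take, store, commit, or roll back on failure.
    static Status process(TxnChannel ch, Consumer<String> store) {
        try {
            String event = ch.take();
            if (event == null) {
                ch.commit();           // nothing to do, ask the caller to back off
                return Status.BACKOFF;
            }
            store.accept(event);       // may throw if the external store is down
            ch.commit();
            return Status.READY;
        } catch (RuntimeException e) {
            ch.rollback();             // the event stays available for a retry
            return Status.BACKOFF;
        }
    }
}
```

If the store throws, the event is rolled back and the next process() call sees it again, which is the reason the commit is placed after the write to the external repository.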
Integrating a custom sink with MySQL: http://www.iteblog.com/archives/1109