mirror of https://github.com/v2ray/v2ray-core
				
				
				
			update activity
							parent
							
								
									f418b9bc20
								
							
						
					
					
						commit
						6f3362fc4c
					
				| 
						 | 
				
			
			@ -15,7 +15,6 @@ import (
 | 
			
		|||
	"v2ray.com/core/common/buf"
 | 
			
		||||
	"v2ray.com/core/common/errors"
 | 
			
		||||
	"v2ray.com/core/common/net"
 | 
			
		||||
	"v2ray.com/core/common/signal"
 | 
			
		||||
	"v2ray.com/core/proxy"
 | 
			
		||||
	"v2ray.com/core/transport/ray"
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -147,7 +146,7 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
 | 
			
		|||
		log.Trace(newError("failed to write first payload").Base(err))
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	if err := buf.Copy(signal.BackgroundTimer(), s.input, writer); err != nil {
 | 
			
		||||
	if err := buf.Copy(s.input, writer); err != nil {
 | 
			
		||||
		log.Trace(newError("failed to fetch all input").Base(err))
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -175,7 +174,7 @@ func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
func drain(reader *Reader) error {
 | 
			
		||||
	buf.Copy(signal.BackgroundTimer(), reader, buf.Discard)
 | 
			
		||||
	buf.Copy(reader, buf.Discard)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -199,7 +198,7 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *Reader) error {
 | 
			
		|||
	}
 | 
			
		||||
 | 
			
		||||
	if s, found := m.sessionManager.Get(meta.SessionID); found {
 | 
			
		||||
		return buf.Copy(signal.BackgroundTimer(), reader, s.output, buf.IgnoreWriterError())
 | 
			
		||||
		return buf.Copy(reader, s.output, buf.IgnoreWriterError())
 | 
			
		||||
	}
 | 
			
		||||
	return drain(reader)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -291,7 +290,7 @@ type ServerWorker struct {
 | 
			
		|||
 | 
			
		||||
func handle(ctx context.Context, s *Session, output buf.Writer) {
 | 
			
		||||
	writer := NewResponseWriter(s.ID, output)
 | 
			
		||||
	if err := buf.Copy(signal.BackgroundTimer(), s.input, writer); err != nil {
 | 
			
		||||
	if err := buf.Copy(s.input, writer); err != nil {
 | 
			
		||||
		log.Trace(newError("session ", s.ID, " ends: ").Base(err))
 | 
			
		||||
	}
 | 
			
		||||
	writer.Close()
 | 
			
		||||
| 
						 | 
				
			
			@ -323,7 +322,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
 | 
			
		|||
	w.sessionManager.Add(s)
 | 
			
		||||
	go handle(ctx, s, w.outboundRay.OutboundOutput())
 | 
			
		||||
	if meta.Option.Has(OptionData) {
 | 
			
		||||
		return buf.Copy(signal.BackgroundTimer(), reader, s.output, buf.IgnoreWriterError())
 | 
			
		||||
		return buf.Copy(reader, s.output, buf.IgnoreWriterError())
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -333,7 +332,7 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *Reader) err
 | 
			
		|||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	if s, found := w.sessionManager.Get(meta.SessionID); found {
 | 
			
		||||
		return buf.Copy(signal.BackgroundTimer(), reader, s.output, buf.IgnoreWriterError())
 | 
			
		||||
		return buf.Copy(reader, s.output, buf.IgnoreWriterError())
 | 
			
		||||
	}
 | 
			
		||||
	return drain(reader)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -71,7 +71,15 @@ func IgnoreWriterError() CopyOption {
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func copyInternal(timer signal.ActivityTimer, reader Reader, writer Writer, handler copyHandler) error {
 | 
			
		||||
func UpdateActivity(timer signal.ActivityTimer) CopyOption {
 | 
			
		||||
	return func(handler *copyHandler) {
 | 
			
		||||
		handler.onData = func() {
 | 
			
		||||
			timer.Update()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func copyInternal(reader Reader, writer Writer, handler copyHandler) error {
 | 
			
		||||
	for {
 | 
			
		||||
		buffer, err := reader.Read()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
| 
						 | 
				
			
			@ -81,7 +89,6 @@ func copyInternal(timer signal.ActivityTimer, reader Reader, writer Writer, hand
 | 
			
		|||
		}
 | 
			
		||||
 | 
			
		||||
		handler.onData()
 | 
			
		||||
		timer.Update()
 | 
			
		||||
 | 
			
		||||
		if buffer.IsEmpty() {
 | 
			
		||||
			buffer.Release()
 | 
			
		||||
| 
						 | 
				
			
			@ -99,12 +106,12 @@ func copyInternal(timer signal.ActivityTimer, reader Reader, writer Writer, hand
 | 
			
		|||
 | 
			
		||||
// Copy dumps all payload from reader to writer or stops when an error occurs.
 | 
			
		||||
// ActivityTimer gets updated as soon as there is a payload.
 | 
			
		||||
func Copy(timer signal.ActivityTimer, reader Reader, writer Writer, options ...CopyOption) error {
 | 
			
		||||
func Copy(reader Reader, writer Writer, options ...CopyOption) error {
 | 
			
		||||
	handler := copyHandler{}
 | 
			
		||||
	for _, option := range options {
 | 
			
		||||
		option(&handler)
 | 
			
		||||
	}
 | 
			
		||||
	err := copyInternal(timer, reader, writer, handler)
 | 
			
		||||
	err := copyInternal(reader, writer, handler)
 | 
			
		||||
	if err != nil && errors.Cause(err) != io.EOF {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -76,7 +76,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
 | 
			
		|||
 | 
			
		||||
		chunkReader := buf.NewReader(conn)
 | 
			
		||||
 | 
			
		||||
		if err := buf.Copy(timer, chunkReader, inboundRay.InboundInput()); err != nil {
 | 
			
		||||
		if err := buf.Copy(chunkReader, inboundRay.InboundInput(), buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
			return newError("failed to transport request").Base(err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -86,7 +86,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
 | 
			
		|||
	responseDone := signal.ExecuteAsync(func() error {
 | 
			
		||||
		v2writer := buf.NewWriter(conn)
 | 
			
		||||
 | 
			
		||||
		if err := buf.Copy(timer, inboundRay.InboundOutput(), v2writer); err != nil {
 | 
			
		||||
		if err := buf.Copy(inboundRay.InboundOutput(), v2writer, buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
			return newError("failed to transport response").Base(err)
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -118,7 +118,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
 | 
			
		|||
		} else {
 | 
			
		||||
			writer = buf.NewSequentialWriter(conn)
 | 
			
		||||
		}
 | 
			
		||||
		if err := buf.Copy(timer, input, writer); err != nil {
 | 
			
		||||
		if err := buf.Copy(input, writer, buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
			return newError("failed to process request").Base(err)
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
| 
						 | 
				
			
			@ -128,7 +128,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
 | 
			
		|||
		defer output.Close()
 | 
			
		||||
 | 
			
		||||
		v2reader := buf.NewReader(conn)
 | 
			
		||||
		if err := buf.Copy(timer, v2reader, output); err != nil {
 | 
			
		||||
		if err := buf.Copy(v2reader, output, buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
			return newError("failed to process response").Base(err)
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -141,7 +141,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
 | 
			
		|||
		defer ray.InboundInput().Close()
 | 
			
		||||
 | 
			
		||||
		v2reader := buf.NewReader(reader)
 | 
			
		||||
		if err := buf.Copy(timer, v2reader, ray.InboundInput()); err != nil {
 | 
			
		||||
		if err := buf.Copy(v2reader, ray.InboundInput(), buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
| 
						 | 
				
			
			@ -149,7 +149,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
 | 
			
		|||
 | 
			
		||||
	responseDone := signal.ExecuteAsync(func() error {
 | 
			
		||||
		v2writer := buf.NewWriter(writer)
 | 
			
		||||
		if err := buf.Copy(timer, ray.InboundOutput(), v2writer); err != nil {
 | 
			
		||||
		if err := buf.Copy(ray.InboundOutput(), v2writer, buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -105,7 +105,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
 | 
			
		|||
		}
 | 
			
		||||
 | 
			
		||||
		requestDone := signal.ExecuteAsync(func() error {
 | 
			
		||||
			if err := buf.Copy(timer, outboundRay.OutboundInput(), bodyWriter); err != nil {
 | 
			
		||||
			if err := buf.Copy(outboundRay.OutboundInput(), bodyWriter, buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
| 
						 | 
				
			
			@ -119,7 +119,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
 | 
			
		|||
				return err
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if err := buf.Copy(timer, responseReader, outboundRay.OutboundOutput()); err != nil {
 | 
			
		||||
			if err := buf.Copy(responseReader, outboundRay.OutboundOutput(), buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -141,7 +141,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
 | 
			
		|||
		})
 | 
			
		||||
 | 
			
		||||
		requestDone := signal.ExecuteAsync(func() error {
 | 
			
		||||
			if err := buf.Copy(timer, outboundRay.OutboundInput(), writer); err != nil {
 | 
			
		||||
			if err := buf.Copy(outboundRay.OutboundInput(), writer, buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
				return newError("failed to transport all UDP request").Base(err)
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
| 
						 | 
				
			
			@ -155,7 +155,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
 | 
			
		|||
				User:   user,
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if err := buf.Copy(timer, reader, outboundRay.OutboundOutput()); err != nil {
 | 
			
		||||
			if err := buf.Copy(reader, outboundRay.OutboundOutput(), buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
				return newError("failed to transport all UDP response").Base(err)
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -173,7 +173,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
 | 
			
		|||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if err := buf.Copy(timer, ray.InboundOutput(), responseWriter); err != nil {
 | 
			
		||||
		if err := buf.Copy(ray.InboundOutput(), responseWriter, buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
			return newError("failed to transport all TCP response").Base(err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -183,7 +183,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
 | 
			
		|||
	requestDone := signal.ExecuteAsync(func() error {
 | 
			
		||||
		defer ray.InboundInput().Close()
 | 
			
		||||
 | 
			
		||||
		if err := buf.Copy(timer, bodyReader, ray.InboundInput()); err != nil {
 | 
			
		||||
		if err := buf.Copy(bodyReader, ray.InboundInput(), buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
			return newError("failed to transport all TCP request").Base(err)
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -90,11 +90,11 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy.
 | 
			
		|||
	var responseFunc func() error
 | 
			
		||||
	if request.Command == protocol.RequestCommandTCP {
 | 
			
		||||
		requestFunc = func() error {
 | 
			
		||||
			return buf.Copy(timer, ray.OutboundInput(), buf.NewWriter(conn))
 | 
			
		||||
			return buf.Copy(ray.OutboundInput(), buf.NewWriter(conn), buf.UpdateActivity(timer))
 | 
			
		||||
		}
 | 
			
		||||
		responseFunc = func() error {
 | 
			
		||||
			defer ray.OutboundOutput().Close()
 | 
			
		||||
			return buf.Copy(timer, buf.NewReader(conn), ray.OutboundOutput())
 | 
			
		||||
			return buf.Copy(buf.NewReader(conn), ray.OutboundOutput(), buf.UpdateActivity(timer))
 | 
			
		||||
		}
 | 
			
		||||
	} else if request.Command == protocol.RequestCommandUDP {
 | 
			
		||||
		udpConn, err := dialer.Dial(ctx, udpRequest.Destination())
 | 
			
		||||
| 
						 | 
				
			
			@ -103,12 +103,12 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy.
 | 
			
		|||
		}
 | 
			
		||||
		defer udpConn.Close()
 | 
			
		||||
		requestFunc = func() error {
 | 
			
		||||
			return buf.Copy(timer, ray.OutboundInput(), buf.NewSequentialWriter(NewUDPWriter(request, udpConn)))
 | 
			
		||||
			return buf.Copy(ray.OutboundInput(), buf.NewSequentialWriter(NewUDPWriter(request, udpConn)), buf.UpdateActivity(timer))
 | 
			
		||||
		}
 | 
			
		||||
		responseFunc = func() error {
 | 
			
		||||
			defer ray.OutboundOutput().Close()
 | 
			
		||||
			reader := &UDPReader{reader: udpConn}
 | 
			
		||||
			return buf.Copy(timer, reader, ray.OutboundOutput())
 | 
			
		||||
			return buf.Copy(reader, ray.OutboundOutput(), buf.UpdateActivity(timer))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -124,7 +124,7 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
 | 
			
		|||
		defer input.Close()
 | 
			
		||||
 | 
			
		||||
		v2reader := buf.NewReader(reader)
 | 
			
		||||
		if err := buf.Copy(timer, v2reader, input); err != nil {
 | 
			
		||||
		if err := buf.Copy(v2reader, input, buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
			return newError("failed to transport all TCP request").Base(err)
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
| 
						 | 
				
			
			@ -132,7 +132,7 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
 | 
			
		|||
 | 
			
		||||
	responseDone := signal.ExecuteAsync(func() error {
 | 
			
		||||
		v2writer := buf.NewWriter(writer)
 | 
			
		||||
		if err := buf.Copy(timer, output, v2writer); err != nil {
 | 
			
		||||
		if err := buf.Copy(output, v2writer, buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
			return newError("failed to transport all TCP response").Base(err)
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -129,7 +129,7 @@ func transferRequest(timer signal.ActivityTimer, session *encoding.ServerSession
 | 
			
		|||
	defer output.Close()
 | 
			
		||||
 | 
			
		||||
	bodyReader := session.DecodeRequestBody(request, input)
 | 
			
		||||
	if err := buf.Copy(timer, bodyReader, output); err != nil {
 | 
			
		||||
	if err := buf.Copy(bodyReader, output, buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
| 
						 | 
				
			
			@ -157,7 +157,7 @@ func transferResponse(timer signal.ActivityTimer, session *encoding.ServerSessio
 | 
			
		|||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := buf.Copy(timer, input, bodyWriter); err != nil {
 | 
			
		||||
	if err := buf.Copy(input, bodyWriter, buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -123,7 +123,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
 | 
			
		|||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if err := buf.Copy(timer, input, bodyWriter); err != nil {
 | 
			
		||||
		if err := buf.Copy(input, bodyWriter, buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -147,7 +147,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
 | 
			
		|||
 | 
			
		||||
		reader.SetBuffered(false)
 | 
			
		||||
		bodyReader := session.DecodeResponseBody(request, reader)
 | 
			
		||||
		if err := buf.Copy(timer, bodyReader, output); err != nil {
 | 
			
		||||
		if err := buf.Copy(bodyReader, output, buf.UpdateActivity(timer)); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue